diff --git a/src/main/java/com/darksci/kafkaview/controller/BaseController.java b/src/main/java/com/darksci/kafkaview/controller/BaseController.java index 9049396a..7fda6451 100644 --- a/src/main/java/com/darksci/kafkaview/controller/BaseController.java +++ b/src/main/java/com/darksci/kafkaview/controller/BaseController.java @@ -1,15 +1,31 @@ package com.darksci.kafkaview.controller; import com.darksci.kafkaview.configuration.CustomUserDetails; +import com.darksci.kafkaview.model.Cluster; +import com.darksci.kafkaview.model.View; +import com.darksci.kafkaview.repository.ClusterRepository; +import com.darksci.kafkaview.repository.ViewRepository; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.authentication.AnonymousAuthenticationToken; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.ui.Model; +import org.springframework.web.bind.annotation.ModelAttribute; + +import java.util.ArrayList; +import java.util.List; /** * Base Controller w/ common code. */ public abstract class BaseController { + @Autowired + private ClusterRepository clusterRepository; + + @Autowired + private ViewRepository viewRepository; + /** * Determine if the current user is logged in or not. * @return True if so, false if not. @@ -29,4 +45,15 @@ protected boolean isLoggedIn() { protected CustomUserDetails getLoggedInUser() { return (CustomUserDetails) SecurityContextHolder.getContext().getAuthentication().getPrincipal(); } + + @ModelAttribute + public void addAttributes(Model model) { + // TODO put a limit on these + final Iterable clusters = clusterRepository.findAllByOrderByNameAsc(); + final Iterable views = viewRepository.findAllByOrderByNameAsc(); + + model.addAttribute("MenuClusters", clusters); + model.addAttribute("MenuViews", views); + } + } diff --git a/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java b/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java index da2c803d..d67a0b1a 100644 --- a/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java +++ b/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java @@ -1,93 +1,127 @@ package com.darksci.kafkaview.controller.api; import com.darksci.kafkaview.controller.BaseController; +import com.darksci.kafkaview.manager.kafka.KafkaAdminFactory; import com.darksci.kafkaview.manager.kafka.KafkaOperations; import com.darksci.kafkaview.manager.kafka.config.ClientConfig; import com.darksci.kafkaview.manager.kafka.config.ClusterConfig; import com.darksci.kafkaview.manager.kafka.config.DeserializerConfig; import com.darksci.kafkaview.manager.kafka.config.TopicConfig; +import com.darksci.kafkaview.manager.kafka.dto.ConsumerState; import com.darksci.kafkaview.manager.kafka.dto.KafkaResults; +import com.darksci.kafkaview.manager.kafka.dto.NodeDetails; +import com.darksci.kafkaview.manager.kafka.dto.NodeList; import com.darksci.kafkaview.manager.kafka.dto.TopicDetails; import com.darksci.kafkaview.manager.kafka.dto.TopicList; import com.darksci.kafkaview.manager.kafka.KafkaConsumerFactory; import com.darksci.kafkaview.manager.kafka.TransactionalKafkaClient; import com.darksci.kafkaview.manager.kafka.config.FilterConfig; +import com.darksci.kafkaview.manager.kafka.dto.TopicListing; import com.darksci.kafkaview.manager.kafka.filter.AFilter; +import com.darksci.kafkaview.manager.plugin.DeserializerLoader; +import 
com.darksci.kafkaview.manager.plugin.exception.LoaderException; +import com.darksci.kafkaview.manager.ui.FlashMessage; import com.darksci.kafkaview.model.Cluster; +import com.darksci.kafkaview.model.MessageFormat; +import com.darksci.kafkaview.model.View; import com.darksci.kafkaview.repository.ClusterRepository; +import com.darksci.kafkaview.repository.ViewRepository; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; +import javax.servlet.http.HttpSession; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Handles API requests. */ @Controller @RequestMapping("/api") -public class ApiController extends BaseController { +public class ApiController { private static final Logger logger = LoggerFactory.getLogger(ApiController.class); + @Autowired + private ViewRepository viewRepository; + + @Autowired + private DeserializerLoader deserializerLoader; + @Autowired private ClusterRepository clusterRepository; /** * GET kafka results */ - @RequestMapping(path = "/consume", method = RequestMethod.GET, produces = "application/json") + @RequestMapping(path = "/consumer/view/{id}", method = RequestMethod.GET, produces = "application/json") @ResponseBody - public KafkaResults consume() { - // Create the configs - final String topicName = "TestTopic"; - final String consumerId = "BobsYerAunty"; - - // Defines the Cluster - final ClusterConfig clusterConfig = getClusterConfig(); - - // Define our Deserializer - final DeserializerConfig deserializerConfig = DeserializerConfig.defaultConfig(); - - // Defines our Topic - final TopicConfig topicConfig = new TopicConfig(clusterConfig, deserializerConfig, topicName); - - // Defines any filters - //final FilterConfig filterConfig = FilterConfig.withNoFilters(); - final FilterConfig filterConfig = FilterConfig.withFilters(AFilter.class); - - // Defines our client - final ClientConfig clientConfig = new ClientConfig(topicConfig, filterConfig, consumerId); - - // Build a consumer - final KafkaConsumer kafkaConsumer = new KafkaConsumerFactory(clientConfig).createAndSubscribe(); + public KafkaResults consume( + final @PathVariable Long id, + final @RequestParam(name = "action", required = false) String action, + final @RequestParam(name = "partitions", required = false) String partitions) { + + // Retrieve the view definition + final View view = viewRepository.findOne(id); + if (view == null) { + // TODO Return some kind of error. + } // Create consumer final KafkaResults results; - try (final TransactionalKafkaClient transactionalKafkaClient = new TransactionalKafkaClient(kafkaConsumer, clientConfig)) { + try (final TransactionalKafkaClient transactionalKafkaClient = setup(view)) { + // move directions if needed + if ("next".equals(action)) { + // Do nothing! 
+                //transactionalKafkaClient.next();
+            } else if ("previous".equals(action)) {
+                transactionalKafkaClient.previous();
+            } else if ("head".equals(action)) {
+                transactionalKafkaClient.toHead();
+            } else if ("tail".equals(action)) {
+                transactionalKafkaClient.toTail();
+            }
 
             // Poll
-            results = transactionalKafkaClient.consume();
+            results = transactionalKafkaClient.consumePerPartition();
         }
+        return results;
+    }
 
-        // Debug log
-        logger.info("Consumed records: {}", results);
+    /**
+     * POST manually set a consumer's offsets.
+     */
+    @RequestMapping(path = "/consumer/view/{id}/offsets", method = RequestMethod.POST, produces = "application/json")
+    @ResponseBody
+    public ConsumerState setConsumerOffsets(final @PathVariable Long id, final @RequestBody Map partitionOffsetMap) {
+        // Retrieve the view definition
+        final View view = viewRepository.findOne(id);
+        if (view == null) {
+            // TODO Return some kind of error.
+        }
 
-        return results;
+        // Create consumer
+        try (final TransactionalKafkaClient transactionalKafkaClient = setup(view)) {
+            return transactionalKafkaClient.seek(partitionOffsetMap);
+        }
     }
 
     /**
-     * GET kafka topics for a requested cluster.
+     * GET listing of all available kafka topics for a requested cluster.
      */
     @RequestMapping(path = "/cluster/{id}/topics/list", method = RequestMethod.GET, produces = "application/json")
     @ResponseBody
-    public List getTopics(final @PathVariable Long id) {
+    public List getTopics(final @PathVariable Long id) {
         // Retrieve cluster
         final Cluster cluster = clusterRepository.findOne(id);
         if (cluster == null) {
@@ -95,10 +129,109 @@ public List getTopics(final @PathVariable Long id) {
             new ArrayList<>();
         }
 
-        try (final KafkaOperations operations = KafkaOperations.newKafkaOperationalInstance(cluster.getBrokerHosts())) {
+        // Create new Operational Client
+        try (final KafkaOperations operations = createOperationsClient(cluster)) {
             final TopicList topics = operations.getAvailableTopics();
-            return topics.getAllTopics();
+            return topics.getTopics();
+        }
+    }
+
+    /**
+     * GET details for a specific topic.
+     */
+    @RequestMapping(path = "/cluster/{id}/topic/{topic}/details", method = RequestMethod.GET, produces = "application/json")
+    @ResponseBody
+    public TopicDetails getTopicDetails(final @PathVariable Long id, final @PathVariable String topic) {
+        // Retrieve cluster
+        final Cluster cluster = clusterRepository.findOne(id);
+        if (cluster == null) {
+            // No such cluster; return null until proper error handling exists.
+            return null;
+        }
+
+        // Create new Operational Client
+        try (final KafkaOperations operations = createOperationsClient(cluster)) {
+            final TopicDetails topicDetails = operations.getTopicDetails(topic);
+            return topicDetails;
+        }
+    }
+
+    /**
+     * GET Nodes within a cluster.
+     */
+    @RequestMapping(path = "/cluster/{id}/nodes", method = RequestMethod.GET, produces = "application/json")
+    @ResponseBody
+    public List getClusterNodes(final @PathVariable Long id) {
+        // Retrieve cluster
+        final Cluster cluster = clusterRepository.findOne(id);
+        if (cluster == null) {
+            // Handle error by returning an empty list.
+            return new ArrayList<>();
+        }
+
+        try (final KafkaOperations operations = createOperationsClient(cluster)) {
+            final NodeList nodes = operations.getClusterNodes();
+            return nodes.getNodes();
+        }
+    }
+
+    private KafkaOperations createOperationsClient(final Cluster cluster) {
+        // Create new Operational Client
+        final ClusterConfig clusterConfig = new ClusterConfig(cluster.getBrokerHosts());
+        final AdminClient adminClient = new KafkaAdminFactory(clusterConfig, "BobsYerAunty").create();
+
+        return new KafkaOperations(adminClient);
+    }
+
+    private TransactionalKafkaClient setup(final View view) {
+        // Construct a consumerId based on user
+        final String consumerId = "MyUserId1";
+
+        // Grab our relevant bits
+        final Cluster cluster = view.getCluster();
+        final MessageFormat keyMessageFormat = view.getKeyMessageFormat();
+        final MessageFormat valueMessageFormat = view.getValueMessageFormat();
+
+        final Class keyDeserializerClass;
+        try {
+            if (keyMessageFormat.isDefaultFormat()) {
+                keyDeserializerClass = deserializerLoader.getDeserializerClass(keyMessageFormat.getClasspath());
+            } else {
+                keyDeserializerClass = deserializerLoader.getDeserializerClass(keyMessageFormat.getJar(), keyMessageFormat.getClasspath());
+            }
+        } catch (final LoaderException exception) {
+            throw new RuntimeException(exception.getMessage(), exception);
+        }
+
+        final Class valueDeserializerClass;
+        try {
+            if (valueMessageFormat.isDefaultFormat()) {
+                valueDeserializerClass = deserializerLoader.getDeserializerClass(valueMessageFormat.getClasspath());
+            } else {
+                valueDeserializerClass = deserializerLoader.getDeserializerClass(valueMessageFormat.getJar(), valueMessageFormat.getClasspath());
+            }
+        } catch (final LoaderException exception) {
+            throw new RuntimeException(exception.getMessage(), exception);
+        }
+
+        final ClusterConfig clusterConfig = new ClusterConfig(cluster.getBrokerHosts());
+        final DeserializerConfig deserializerConfig = new DeserializerConfig(keyDeserializerClass, valueDeserializerClass);
+        final TopicConfig topicConfig = new TopicConfig(clusterConfig, deserializerConfig, view.getTopic());
+
+        final ClientConfig clientConfig = ClientConfig.newBuilder()
+            .withTopicConfig(topicConfig)
+            .withNoFilters()
+            .withConsumerId(consumerId)
+            .withPartitions(view.getPartitionsAsSet())
+            .build();
+
+        // Create and subscribe the consumer
+        final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory(clientConfig);
+        final KafkaConsumer kafkaConsumer = kafkaConsumerFactory.createAndSubscribe();
+
+        // Wrap it in a transactional client
+        return new TransactionalKafkaClient(kafkaConsumer, clientConfig);
     }
 
     private ClusterConfig getClusterConfig() {
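For reference, a minimal sketch of driving the new consumer endpoints from a Java client using Spring's `RestTemplate`; the base URL and view id are placeholder assumptions, and the offsets map mirrors the raw `Map` the offsets endpoint binds from its JSON body:

```java
import java.util.HashMap;
import java.util.Map;
import org.springframework.web.client.RestTemplate;

public class ConsumerApiExample {
    public static void main(final String[] args) {
        final RestTemplate rest = new RestTemplate();

        // Page forward through view 1 (placeholder id); "previous", "head" and "tail" work the same way.
        final String consumeUrl = "http://localhost:8080/api/consumer/view/1?action=next";
        System.out.println(rest.getForObject(consumeUrl, String.class));

        // Manually seek: partition id -> desired offset, POSTed as JSON.
        final Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 100L);
        offsets.put(1, 250L);
        System.out.println(rest.postForObject("http://localhost:8080/api/consumer/view/1/offsets", offsets, String.class));
    }
}
```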
diff --git a/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java b/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java
index e3063bc5..8fd634f3 100644
--- a/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java
+++ b/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java
@@ -1,22 +1,10 @@
 package com.darksci.kafkaview.controller.browser;
 
 import com.darksci.kafkaview.controller.BaseController;
-import com.darksci.kafkaview.manager.kafka.KafkaConsumerFactory;
-import com.darksci.kafkaview.manager.kafka.TransactionalKafkaClient;
-import com.darksci.kafkaview.manager.kafka.config.ClientConfig;
-import com.darksci.kafkaview.manager.kafka.config.ClusterConfig;
-import com.darksci.kafkaview.manager.kafka.config.DeserializerConfig;
-import 
com.darksci.kafkaview.manager.kafka.config.FilterConfig; -import com.darksci.kafkaview.manager.kafka.config.TopicConfig; -import com.darksci.kafkaview.manager.kafka.dto.KafkaResults; import com.darksci.kafkaview.manager.plugin.DeserializerLoader; -import com.darksci.kafkaview.manager.plugin.exception.LoaderException; import com.darksci.kafkaview.manager.ui.FlashMessage; -import com.darksci.kafkaview.model.Cluster; -import com.darksci.kafkaview.model.MessageFormat; import com.darksci.kafkaview.model.View; import com.darksci.kafkaview.repository.ViewRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; @@ -37,10 +25,9 @@ public class BrowserController extends BaseController { /** * GET Displays main configuration index. */ - @RequestMapping(path = "/{id}/{direction}", method = RequestMethod.GET) + @RequestMapping(path = "/{id}", method = RequestMethod.GET) public String index( final @PathVariable Long id, - final @PathVariable String direction, final RedirectAttributes redirectAttributes, final Model model) { @@ -54,68 +41,8 @@ public String index( return "redirect:/"; } model.addAttribute("view", view); + model.addAttribute("cluster", view.getCluster()); - // Create consumer - final KafkaResults results; - try (final TransactionalKafkaClient transactionalKafkaClient = setup(view)) { - // move directions if needed - if ("n".equals(direction)) { - transactionalKafkaClient.next(); - } else if ("p".equals(direction)) { - transactionalKafkaClient.previous(); - } else if ("r".equals(direction)) { - transactionalKafkaClient.toHead(); - } - - // Poll - results = transactionalKafkaClient.consumePerPartition(); - } - model.addAttribute("results", results.getResults()); return "browser/index"; } - - private TransactionalKafkaClient setup(final View view) { - // Construct a consumerId based on user - final String consumerId = "MyUserId1"; - - // Grab our relevant bits - final Cluster cluster = view.getCluster(); - final MessageFormat keyMessageFormat = view.getKeyMessageFormat(); - final MessageFormat valueMessageFormat = view.getValueMessageFormat(); - - final Class keyDeserializerClass; - try { - if (keyMessageFormat.isDefaultFormat()) { - keyDeserializerClass = deserializerLoader.getDeserializerClass(keyMessageFormat.getClasspath()); - } else { - keyDeserializerClass = deserializerLoader.getDeserializerClass(keyMessageFormat.getJar(), keyMessageFormat.getClasspath()); - } - } catch (final LoaderException exception) { - throw new RuntimeException(exception.getMessage(), exception); - } - - final Class valueDeserializerClass; - try { - if (valueMessageFormat.isDefaultFormat()) { - valueDeserializerClass = deserializerLoader.getDeserializerClass(valueMessageFormat.getClasspath()); - } else { - valueDeserializerClass = deserializerLoader.getDeserializerClass(valueMessageFormat.getJar(), valueMessageFormat.getClasspath()); - } - } catch (final LoaderException exception) { - throw new RuntimeException(exception.getMessage(), exception); - } - - final ClusterConfig clusterConfig = new ClusterConfig(cluster.getBrokerHosts()); - final DeserializerConfig deserializerConfig = new DeserializerConfig(keyDeserializerClass, valueDeserializerClass); - final TopicConfig topicConfig = new TopicConfig(clusterConfig, deserializerConfig, view.getTopic()); - final ClientConfig clientConfig = new ClientConfig(topicConfig, FilterConfig.withNoFilters(), 
consumerId); - - // Create the damn consumer - final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory(clientConfig); - final KafkaConsumer kafkaConsumer = kafkaConsumerFactory.createAndSubscribe(); - - // Create consumer - final KafkaResults results; - return new TransactionalKafkaClient(kafkaConsumer, clientConfig); - } } diff --git a/src/main/java/com/darksci/kafkaview/controller/configuration/view/ViewController.java b/src/main/java/com/darksci/kafkaview/controller/configuration/view/ViewController.java index 350212b7..e074fdbc 100644 --- a/src/main/java/com/darksci/kafkaview/controller/configuration/view/ViewController.java +++ b/src/main/java/com/darksci/kafkaview/controller/configuration/view/ViewController.java @@ -1,10 +1,11 @@ package com.darksci.kafkaview.controller.configuration.view; import com.darksci.kafkaview.controller.BaseController; -import com.darksci.kafkaview.controller.configuration.cluster.forms.ClusterForm; -import com.darksci.kafkaview.controller.configuration.messageFormat.forms.MessageFormatForm; import com.darksci.kafkaview.controller.configuration.view.forms.ViewForm; +import com.darksci.kafkaview.manager.kafka.KafkaAdminFactory; import com.darksci.kafkaview.manager.kafka.KafkaOperations; +import com.darksci.kafkaview.manager.kafka.config.ClusterConfig; +import com.darksci.kafkaview.manager.kafka.dto.TopicDetails; import com.darksci.kafkaview.manager.kafka.dto.TopicList; import com.darksci.kafkaview.manager.ui.FlashMessage; import com.darksci.kafkaview.model.Cluster; @@ -13,6 +14,7 @@ import com.darksci.kafkaview.repository.ClusterRepository; import com.darksci.kafkaview.repository.MessageFormatRepository; import com.darksci.kafkaview.repository.ViewRepository; +import org.apache.kafka.clients.admin.AdminClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; @@ -25,6 +27,10 @@ import javax.validation.Valid; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.StringJoiner; +import java.util.stream.Collectors; @Controller @RequestMapping("/configuration/view") @@ -66,14 +72,25 @@ public String createViewForm(final ViewForm viewForm, final Model model) { // If we have a cluster Id model.addAttribute("topics", new ArrayList<>()); + model.addAttribute("partitions", new ArrayList<>()); if (viewForm.getClusterId() != null) { // Lets load the topics now // Retrieve cluster final Cluster cluster = clusterRepository.findOne(viewForm.getClusterId()); if (cluster != null) { - try (final KafkaOperations operations = KafkaOperations.newKafkaOperationalInstance(cluster.getBrokerHosts())) { + // Create a new Operational Client + final ClusterConfig clusterConfig = new ClusterConfig(cluster.getBrokerHosts()); + final AdminClient adminClient = new KafkaAdminFactory(clusterConfig, "BobsYerAunty").create(); + + try (final KafkaOperations operations = new KafkaOperations(adminClient)) { final TopicList topics = operations.getAvailableTopics(); - model.addAttribute("topics", topics.getAllTopics()); + model.addAttribute("topics", topics.getTopics()); + + // If we have a selected topic + if (viewForm.getTopic() != null && !"!".equals(viewForm.getTopic())) { + final TopicDetails topicDetails = operations.getTopicDetails(viewForm.getTopic()); + model.addAttribute("partitions", topicDetails.getPartitions()); + } } } } @@ -108,6 +125,7 @@ public String editViewForm( 
viewForm.setKeyMessageFormatId(view.getKeyMessageFormat().getId()); viewForm.setValueMessageFormatId(view.getValueMessageFormat().getId()); viewForm.setTopic(view.getTopic()); + viewForm.setPartitions(view.getPartitionsAsSet()); return createViewForm(viewForm, model); } @@ -167,11 +185,15 @@ public String updateView( final MessageFormat valueMessageFormat = messageFormatRepository.findOne(viewForm.getValueMessageFormatId()); final Cluster cluster = clusterRepository.findOne(viewForm.getClusterId()); + final Set partitionIds = viewForm.getPartitions(); + final String partitionsStr = partitionIds.stream().map(Object::toString).collect(Collectors.joining(",")); + view.setName(viewForm.getName()); view.setTopic(viewForm.getTopic()); view.setKeyMessageFormat(keyMessageFormat); view.setValueMessageFormat(valueMessageFormat); view.setCluster(cluster); + view.setPartitions(partitionsStr); viewRepository.save(view); // Set flash message diff --git a/src/main/java/com/darksci/kafkaview/controller/configuration/view/forms/ViewForm.java b/src/main/java/com/darksci/kafkaview/controller/configuration/view/forms/ViewForm.java index f9c204b5..25822b87 100644 --- a/src/main/java/com/darksci/kafkaview/controller/configuration/view/forms/ViewForm.java +++ b/src/main/java/com/darksci/kafkaview/controller/configuration/view/forms/ViewForm.java @@ -8,6 +8,8 @@ import javax.persistence.ManyToOne; import javax.validation.constraints.NotNull; import javax.validation.constraints.Size; +import java.util.HashSet; +import java.util.Set; public class ViewForm { private Long id = null; @@ -29,6 +31,12 @@ public class ViewForm { @Size(min = 1, max = 255) private String topic; + /** + * Empty set means ALL partitions. + */ + @NotNull + private Set partitions = new HashSet<>(); + public Long getId() { return id; } @@ -77,6 +85,14 @@ public void setTopic(final String topic) { this.topic = topic; } + public Set getPartitions() { + return partitions; + } + + public void setPartitions(final Set partitions) { + this.partitions = partitions; + } + public boolean exists() { return getId() != null; } diff --git a/src/main/java/com/darksci/kafkaview/controller/home/HomeController.java b/src/main/java/com/darksci/kafkaview/controller/home/HomeController.java index 84e3274b..0bf638d4 100644 --- a/src/main/java/com/darksci/kafkaview/controller/home/HomeController.java +++ b/src/main/java/com/darksci/kafkaview/controller/home/HomeController.java @@ -1,5 +1,6 @@ package com.darksci.kafkaview.controller.home; +import com.darksci.kafkaview.controller.BaseController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Controller; @@ -7,7 +8,7 @@ import org.springframework.web.bind.annotation.RequestMethod; @Controller -public class HomeController { +public class HomeController extends BaseController { private static final Logger logger = LoggerFactory.getLogger(HomeController.class); @RequestMapping(path = "/", method = RequestMethod.GET) diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaAdminFactory.java b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaAdminFactory.java new file mode 100644 index 00000000..3662c8aa --- /dev/null +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaAdminFactory.java @@ -0,0 +1,28 @@ +package com.darksci.kafkaview.manager.kafka; + +import com.darksci.kafkaview.manager.kafka.config.ClusterConfig; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import 
org.apache.kafka.clients.admin.KafkaAdminClient; + +import java.util.HashMap; +import java.util.Map; + +public class KafkaAdminFactory { + private final ClusterConfig clusterConfig; + private final String clientId; + + public KafkaAdminFactory(final ClusterConfig clusterConfig, final String clientId) { + this.clusterConfig = clusterConfig; + this.clientId = clientId; + } + + public AdminClient create() { + // Create a map + Map config = new HashMap<>(); + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterConfig.getConnectString()); + config.put(AdminClientConfig.CLIENT_ID_CONFIG, clientId); + + return KafkaAdminClient.create(config); + } +} diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java index ddd17670..24de1eb8 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java @@ -40,30 +40,15 @@ public KafkaConsumer createAndSubscribe() { // Pull out partitions, convert to browser partitions final List topicPartitions = new ArrayList<>(); for (final PartitionInfo partitionInfo: partitionInfos) { - topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + // Skip filtered partitions + if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) { + topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } } // Assign them. kafkaConsumer.assign(topicPartitions); - // TODO: Seek somewhere? - //kafkaConsumer.seekToBeginning(topicPartitions); - - -// final ArrayList topics = new ArrayList<>(); -// topics.add(clientConfig.getTopicConfig().getTopicName()); -// kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() { -// @Override -// public void onPartitionsRevoked(final Collection partitions) { -// -// } -// -// @Override -// public void onPartitionsAssigned(final Collection partitions) { -// -// } -// }); - // Return the kafka consumer. 
return kafkaConsumer; } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaOperations.java b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaOperations.java index d9150955..f0fe99a0 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaOperations.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaOperations.java @@ -1,64 +1,133 @@ package com.darksci.kafkaview.manager.kafka; -import com.darksci.kafkaview.manager.kafka.config.ClientConfig; -import com.darksci.kafkaview.manager.kafka.config.ClusterConfig; -import com.darksci.kafkaview.manager.kafka.config.DeserializerConfig; -import com.darksci.kafkaview.manager.kafka.config.FilterConfig; -import com.darksci.kafkaview.manager.kafka.config.TopicConfig; +import com.darksci.kafkaview.manager.kafka.dto.NodeDetails; +import com.darksci.kafkaview.manager.kafka.dto.NodeList; import com.darksci.kafkaview.manager.kafka.dto.PartitionDetails; import com.darksci.kafkaview.manager.kafka.dto.TopicDetails; import com.darksci.kafkaview.manager.kafka.dto.TopicList; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartitionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; public class KafkaOperations implements AutoCloseable { private final static Logger logger = LoggerFactory.getLogger(KafkaOperations.class); private final static long TIMEOUT = 5000L; - private final KafkaConsumer kafkaConsumer; + private final AdminClient adminClient; - public KafkaOperations(final KafkaConsumer kafkaConsumer) { - this.kafkaConsumer = kafkaConsumer; + public KafkaOperations(final AdminClient adminClient) { + this.adminClient = adminClient; } + /** + * Retrieve all available topics within cluster. + */ public TopicList getAvailableTopics() { - final Map> allTopics = kafkaConsumer.listTopics(); + final List topicListings = new ArrayList<>(); - final List topicDetails = new ArrayList<>(); + try { + final Collection results = adminClient.listTopics().listings().get(); + for (final TopicListing entry: results) { + topicListings.add( + new com.darksci.kafkaview.manager.kafka.dto.TopicListing(entry.name(), entry.isInternal()) + ); + } + return new TopicList(topicListings); + } catch (InterruptedException | ExecutionException e) { + // TODO Handle + throw new RuntimeException(e.getMessage(), e); + } + } - // Loop over - for (final Map.Entry> entry: allTopics.entrySet()) { - final String topicName = entry.getKey(); - final List partitionInfos = entry.getValue(); + /** + * Get information about brokers within the cluster. 
+ */ + public NodeList getClusterNodes() { + final List nodeDetails = new ArrayList<>(); - // Loop over each partition - final List partitionDetails = new ArrayList<>(); - for (final PartitionInfo partitionInfo: partitionInfos) { - partitionDetails.add(new PartitionDetails(partitionInfo.topic(), partitionInfo.partition())); + try { + final Collection nodes = adminClient.describeCluster().nodes().get(); + for (final Node node: nodes) { + nodeDetails.add(new NodeDetails(node.id(), node.host(), node.port())); } - topicDetails.add(new TopicDetails(topicName, partitionDetails)); + return new NodeList(nodeDetails); + } catch (InterruptedException | ExecutionException e) { + // TODO Handle + throw new RuntimeException(e.getMessage(), e); } - - return new TopicList(topicDetails); } - public void close() { - kafkaConsumer.close(); - } + /** + * Get information about a specific topic. + */ + public TopicDetails getTopicDetails(final String topic) { + try { + final Map topicDescriptionMap = adminClient.describeTopics(Collections.singleton(topic)).all().get(); + final TopicDescription topicDescription = topicDescriptionMap.get(topic); + + List partitionDetails = new ArrayList<>(); + + for (final TopicPartitionInfo partitionInfo: topicDescription.partitions()) { + final List isrNodes = new ArrayList<>(); + final List replicaNodes = new ArrayList<>(); - public static KafkaOperations newKafkaOperationalInstance(final String kafkaBrokers) { - final ClusterConfig clusterConfig = new ClusterConfig(kafkaBrokers); - final DeserializerConfig deserializerConfig = DeserializerConfig.defaultConfig(); - final TopicConfig topicConfig = new TopicConfig(clusterConfig, deserializerConfig, "NotUsed"); + // Translate Leader + final NodeDetails leaderNode = new NodeDetails( + partitionInfo.leader().id(), partitionInfo.leader().host(), partitionInfo.leader().port() + ); - final ClientConfig clientConfig = new ClientConfig(topicConfig, FilterConfig.withNoFilters(), "BobsYerAunty"); - return new KafkaOperations(new KafkaConsumerFactory(clientConfig).create()); + // Translate ISR nodes + for (final Node node: partitionInfo.isr()) { + isrNodes.add( + new NodeDetails(node.id(), node.host(), node.port()) + ); + } + + // Translate Replicas nodes + for (final Node node: partitionInfo.replicas()) { + replicaNodes.add( + new NodeDetails(node.id(), node.host(), node.port()) + ); + } + + // Create the details + partitionDetails.add( + new PartitionDetails( + topicDescription.name(), + partitionInfo.partition(), + leaderNode, + replicaNodes, + isrNodes + ) + ); + } + + // Return it + return new TopicDetails( + topicDescription.name(), + topicDescription.isInternal(), + partitionDetails + ); + + } catch (InterruptedException | ExecutionException e) { + // TODO Handle this + throw new RuntimeException(e.getMessage(), e); + } } + public void close() { + adminClient.close(); + } } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java b/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java index 9fa7e684..6e3f512b 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java @@ -1,6 +1,7 @@ package com.darksci.kafkaview.manager.kafka; import com.darksci.kafkaview.manager.kafka.config.ClientConfig; +import com.darksci.kafkaview.manager.kafka.dto.ConsumerState; import com.darksci.kafkaview.manager.kafka.dto.KafkaResult; import 
com.darksci.kafkaview.manager.kafka.dto.KafkaResults;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -15,10 +16,9 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
 import java.util.TreeMap;
 
 public class TransactionalKafkaClient implements AutoCloseable {
@@ -45,6 +45,7 @@
                 new KafkaResult(
                     consumerRecord.partition(),
                     consumerRecord.offset(),
+                    consumerRecord.timestamp(),
                     consumerRecord.key(),
                     consumerRecord.value()
                 )
@@ -52,7 +53,7 @@
         }
         // Commit offsets
         commit();
-        return new KafkaResults(kafkaResultList);
+        return KafkaResults.newInstance(kafkaResultList);
     }
 
     public KafkaResults consumePerPartition() {
@@ -76,7 +77,29 @@
             allResults.addAll(results);
         }
-        return new KafkaResults(allResults);
+        return KafkaResults.newInstance(allResults);
+    }
+
+    public ConsumerState seek(final Map partitionOffsetMap) {
+        for (final Map.Entry entry: partitionOffsetMap.entrySet()) {
+            kafkaConsumer.seek(
+                new TopicPartition(clientConfig.getTopicConfig().getTopicName(), entry.getKey()),
+                entry.getValue()
+            );
+        }
+        commit();
+        return getConsumerState();
+    }
+
+    public ConsumerState getConsumerState() {
+        final Map partitionOffsetMap = new LinkedHashMap<>();
+
+        for (final TopicPartition topicPartition: getAllPartitions()) {
+            final long offset = kafkaConsumer.position(topicPartition);
+            partitionOffsetMap.put(topicPartition.partition(), offset);
+        }
+
+        return new ConsumerState(clientConfig.getTopicConfig().getTopicName(), partitionOffsetMap);
     }
 
     private List getAllPartitions() {
@@ -86,7 +109,10 @@
         // Pull out partitions, convert to browser partitions
         final List topicPartitions = new ArrayList<>();
         for (final PartitionInfo partitionInfo: partitionInfos) {
-            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+            // Skip filtered partitions
+            if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) {
+                topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+            }
         }
         return topicPartitions;
     }
@@ -150,7 +176,7 @@ public void next() {
         commit();
     }
 
-    public void toHead() {
+    public ConsumerState toHead() {
         // Get all available partitions
         final List topicPartitions = getAllPartitions();
@@ -166,5 +192,27 @@
             kafkaConsumer.seek(topicPartition, newOffset);
         }
         commit();
+
+        return getConsumerState();
+    }
+
+    public ConsumerState toTail() {
+        // Get all available partitions
+        final List topicPartitions = getAllPartitions();
+
+        // Get tail offsets for each partition
+        final Map tailOffsets = kafkaConsumer.endOffsets(topicPartitions);
+
+        // Loop over each partition
+        for (final TopicPartition topicPartition: topicPartitions) {
+            final long newOffset = tailOffsets.get(topicPartition);
+            logger.info("Resetting Partition: {} To Tail Offset: {}", topicPartition.partition(), newOffset);
+
+            // Seek to the tail offset
+            kafkaConsumer.seek(topicPartition, newOffset);
+        }
+        commit();
+
+        return getConsumerState();
     }
 }
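The ClientConfig diff below introduces the partition limiting that getAllPartitions() consults above. A minimal sketch of the intended semantics, assuming a TopicConfig built as in ApiController.setup():

```java
import com.darksci.kafkaview.manager.kafka.config.ClientConfig;
import com.darksci.kafkaview.manager.kafka.config.TopicConfig;

public class PartitionFilterDemo {
    // Hypothetical helper; topicConfig would be built as in ApiController.setup().
    public static void demo(final TopicConfig topicConfig) {
        final ClientConfig config = ClientConfig.newBuilder()
                .withTopicConfig(topicConfig)
                .withNoFilters()
                .withConsumerId("example-client")
                .withPartition(0)
                .withPartition(2)
                .build();

        // An empty partition set means "consume everything"; a non-empty set
        // causes every other partition to be reported as filtered.
        System.out.println(config.hasFilteredPartitions()); // true
        System.out.println(config.isPartitionFiltered(1));  // true  (skipped)
        System.out.println(config.isPartitionFiltered(2));  // false (consumed)
    }
}
```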
diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/config/ClientConfig.java b/src/main/java/com/darksci/kafkaview/manager/kafka/config/ClientConfig.java
index 50dfe7d9..6a02da92 100644
--- a/src/main/java/com/darksci/kafkaview/manager/kafka/config/ClientConfig.java
+++ b/src/main/java/com/darksci/kafkaview/manager/kafka/config/ClientConfig.java
@@ -1,5 +1,12 @@
 package com.darksci.kafkaview.manager.kafka.config;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 public class ClientConfig {
     /**
      * Holds details about what topic we're consuming from.
@@ -16,6 +23,8 @@
      */
     private final String consumerId;
 
+    private final Set partitionIds;
+
     /**
      * Defines how many records to retrieve.
      */
@@ -24,9 +33,16 @@
     private boolean isAutoCommitEnabled = false;
 
     public ClientConfig(final TopicConfig topicConfig, final FilterConfig filterConfig, final String consumerId) {
+        this(topicConfig, filterConfig, consumerId, new ArrayList<>());
+    }
+
+    public ClientConfig(final TopicConfig topicConfig, final FilterConfig filterConfig, final String consumerId, final Collection partitionIds) {
         this.topicConfig = topicConfig;
         this.filterConfig = filterConfig;
         this.consumerId = consumerId;
+        final Set tempSet = new HashSet<>();
+        tempSet.addAll(partitionIds);
+        this.partitionIds = Collections.unmodifiableSet(tempSet);
     }
 
     public TopicConfig getTopicConfig() {
@@ -49,6 +65,21 @@
         return isAutoCommitEnabled;
     }
 
+    /**
+     * Should we limit what partitions we read from?
+     */
+    public boolean hasFilteredPartitions() {
+        // Empty means allow all.
+        return !partitionIds.isEmpty();
+    }
+
+    public boolean isPartitionFiltered(final int partitionId) {
+        if (!hasFilteredPartitions()) {
+            return false;
+        }
+        return !partitionIds.contains(partitionId);
+    }
+
     @Override
     public String toString() {
         return "ClientConfig{" +
@@ -57,4 +88,52 @@
             ", consumerId='" + consumerId + '\'' +
             '}';
     }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private TopicConfig topicConfig;
+        private FilterConfig filterConfig;
+        private String consumerId;
+        private Set limitPartitions = new HashSet<>();
+
+        private Builder() {
+        }
+
+        public Builder withTopicConfig(final TopicConfig topicConfig) {
+            this.topicConfig = topicConfig;
+            return this;
+        }
+
+        public Builder withFilterConfig(final FilterConfig filterConfig) {
+            this.filterConfig = filterConfig;
+            return this;
+        }
+
+        public Builder withNoFilters() {
+            return withFilterConfig(FilterConfig.withNoFilters());
+        }
+
+        public Builder withConsumerId(final String consumerId) {
+            this.consumerId = consumerId;
+            return this;
+        }
+
+        public Builder withPartition(final int partitionId) {
+            limitPartitions.add(partitionId);
+            return this;
+        }
+
+        public Builder withPartitions(final Collection partitionIds) {
+            limitPartitions.addAll(partitionIds);
+            return this;
+        }
+
+        public ClientConfig build() {
+            return new ClientConfig(topicConfig, filterConfig, consumerId, limitPartitions);
+        }
+    }
 }
diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/ConsumerState.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/ConsumerState.java
new file mode 100644
index 00000000..e8c064cd
--- /dev/null
+++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/ConsumerState.java
@@ -0,0 +1,30 @@
+package com.darksci.kafkaview.manager.kafka.dto;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class ConsumerState {
+    private final String topic;
+    private final Map offsets;
+
+    public ConsumerState(final String topic, final Map offsets) {
+        this.topic = topic;
+        
this.offsets = Collections.unmodifiableMap(offsets); + } + + public String getTopic() { + return topic; + } + + public Map getOffsets() { + return offsets; + } + + @Override + public String toString() { + return "ConsumerState{" + + "topic='" + topic + '\'' + + ", offsets=" + offsets + + '}'; + } +} diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResult.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResult.java index 9efa260c..1f9b2f66 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResult.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResult.java @@ -3,12 +3,14 @@ public class KafkaResult { private final int partition; private final long offset; + private final long timestamp; private final Object key; private final Object value; - public KafkaResult(final int partition, final long offset, final Object key, final Object value) { + public KafkaResult(final int partition, final long offset, final long timestamp, final Object key, final Object value) { this.partition = partition; this.offset = offset; + this.timestamp = timestamp; this.key = key; this.value = value; } @@ -29,11 +31,16 @@ public Object getValue() { return value; } + public long getTimestamp() { + return timestamp; + } + @Override public String toString() { return "KafkaResult{" + "partition=" + partition + ", offset=" + offset + + ", timestamp=" + timestamp + ", key=" + key + ", value=" + value + '}'; diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResults.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResults.java index 6dbf163b..da7e7e80 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResults.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResults.java @@ -1,22 +1,60 @@ package com.darksci.kafkaview.manager.kafka.dto; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class KafkaResults { private final List results; + private final Map startOffsets; + private final Map endOffsets; - public KafkaResults(final List results) { - this.results = results; + public KafkaResults(final List results, final Map startOffsets, final Map endOffsets) { + this.results = Collections.unmodifiableList(results); + this.startOffsets = Collections.unmodifiableMap(startOffsets); + this.endOffsets = Collections.unmodifiableMap(endOffsets); } public List getResults() { return results; } + public Map getStartOffsets() { + return startOffsets; + } + + public Map getEndOffsets() { + return endOffsets; + } + @Override public String toString() { return "KafkaResults{" + "results=" + results + + ", startOffsets=" + startOffsets + + ", endOffsets=" + endOffsets + '}'; } + + public static KafkaResults newInstance(final List kafkaResults) { + // Find first & last offsets + final Map firstOffsets = new HashMap<>(); + final Map lastOffsets = new HashMap<>(); + + // Loop over the results and build map + for (final KafkaResult kafkaResult: kafkaResults) { + final int partition = kafkaResult.getPartition(); + final long offset = kafkaResult.getOffset(); + + if (firstOffsets.getOrDefault(partition, Long.MAX_VALUE) > offset) { + firstOffsets.put(partition, offset); + } + if (lastOffsets.getOrDefault(partition, -1L) < offset) { + lastOffsets.put(partition, offset); + } + } + + return new KafkaResults(kafkaResults, firstOffsets, lastOffsets); + } } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/NodeDetails.java 
b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/NodeDetails.java new file mode 100644 index 00000000..d35f5c3c --- /dev/null +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/NodeDetails.java @@ -0,0 +1,34 @@ +package com.darksci.kafkaview.manager.kafka.dto; + +public class NodeDetails { + private final int id; + private final String host; + private final int port; + + public NodeDetails(final int id, final String host, final int port) { + this.id = id; + this.host = host; + this.port = port; + } + + public int getId() { + return id; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + @Override + public String toString() { + return "NodeDetails{" + + "id=" + id + + ", host='" + host + '\'' + + ", port=" + port + + '}'; + } +} diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/NodeList.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/NodeList.java new file mode 100644 index 00000000..948cc122 --- /dev/null +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/NodeList.java @@ -0,0 +1,23 @@ +package com.darksci.kafkaview.manager.kafka.dto; + +import java.util.Collections; +import java.util.List; + +public class NodeList { + final List nodes; + + public NodeList(final List nodes) { + this.nodes = Collections.unmodifiableList(nodes); + } + + public List getNodes() { + return nodes; + } + + @Override + public String toString() { + return "NodeList{" + + "nodes=" + nodes + + '}'; + } +} diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/PartitionDetails.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/PartitionDetails.java index 37fa621d..532b8439 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/PartitionDetails.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/PartitionDetails.java @@ -1,12 +1,21 @@ package com.darksci.kafkaview.manager.kafka.dto; +import java.util.Collections; +import java.util.List; + public class PartitionDetails { private final String topic; private final int partition; + private final NodeDetails leader; + private final List replicas; + private final List isr; - public PartitionDetails(final String topic, final int partition) { + public PartitionDetails(final String topic, final int partition, final NodeDetails leader, final List replicas, final List isr) { this.topic = topic; this.partition = partition; + this.leader = leader; + this.replicas = Collections.unmodifiableList(replicas); + this.isr = Collections.unmodifiableList(isr); } public String getTopic() { @@ -17,11 +26,26 @@ public int getPartition() { return partition; } + public NodeDetails getLeader() { + return leader; + } + + public List getReplicas() { + return replicas; + } + + public List getIsr() { + return isr; + } + @Override public String toString() { return "PartitionDetails{" + - "browser='" + topic + '\'' + + "topic='" + topic + '\'' + ", partition=" + partition + + ", leader=" + leader + + ", replicas=" + replicas + + ", isr=" + isr + '}'; } } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicDetails.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicDetails.java index 8ebad102..e5b110ad 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicDetails.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicDetails.java @@ -1,14 +1,17 @@ package com.darksci.kafkaview.manager.kafka.dto; +import java.util.Collections; import java.util.List; public class TopicDetails { private 
final String name; + private final boolean isInternal; private final List partitions; - public TopicDetails(final String name, final List partitions) { + public TopicDetails(final String name, final boolean isInternal, final List partitions) { this.name = name; - this.partitions = partitions; + this.isInternal = isInternal; + this.partitions = Collections.unmodifiableList(partitions); } public String getName() { @@ -19,10 +22,15 @@ public List getPartitions() { return partitions; } + public boolean isInternal() { + return isInternal; + } + @Override public String toString() { return "TopicDetails{" + "name='" + name + '\'' + + ", isInternal=" + isInternal + ", partitions=" + partitions + '}'; } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicList.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicList.java index 3ecd5d45..a72277f0 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicList.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicList.java @@ -3,20 +3,20 @@ import java.util.List; public class TopicList { - private final List topicDetailsList; + private final List topics; - public TopicList(final List topicDetailsList) { - this.topicDetailsList = topicDetailsList; + public TopicList(final List topics) { + this.topics = topics; } - public List getAllTopics() { - return topicDetailsList; + public List getTopics() { + return topics; } @Override public String toString() { return "TopicList{" + - "topicDetailsList=" + topicDetailsList + + "topics=" + topics + '}'; } } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicListing.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicListing.java new file mode 100644 index 00000000..f9468686 --- /dev/null +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/TopicListing.java @@ -0,0 +1,27 @@ +package com.darksci.kafkaview.manager.kafka.dto; + +public class TopicListing { + private final String name; + private final boolean isInternal; + + public TopicListing(final String name, final boolean isInternal) { + this.name = name; + this.isInternal = isInternal; + } + + public String getName() { + return name; + } + + public boolean isInternal() { + return isInternal; + } + + @Override + public String toString() { + return "TopicListing{" + + "name='" + name + '\'' + + ", isInternal=" + isInternal + + '}'; + } +} diff --git a/src/main/java/com/darksci/kafkaview/model/View.java b/src/main/java/com/darksci/kafkaview/model/View.java index 6adf59d1..79594e8e 100644 --- a/src/main/java/com/darksci/kafkaview/model/View.java +++ b/src/main/java/com/darksci/kafkaview/model/View.java @@ -7,7 +7,10 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.ManyToOne; +import javax.persistence.Transient; import java.sql.Timestamp; +import java.util.HashSet; +import java.util.Set; @Entity public class View { @@ -30,6 +33,9 @@ public class View { @Column(nullable = false) private String topic; + @Column(nullable = false) + private String partitions; + @Column(nullable = false) private Timestamp createdAt; @@ -84,6 +90,29 @@ public void setTopic(final String topic) { this.topic = topic; } + public String getPartitions() { + return partitions; + } + + public void setPartitions(final String partitions) { + this.partitions = partitions; + } + + @Transient + public Set getPartitionsAsSet() { + final Set partitionsSet = new HashSet<>(); + final String[] partitions = getPartitions().split(","); + + for (final String 
partitionStr: partitions) {
+            try {
+                partitionsSet.add(Integer.parseInt(partitionStr));
+            } catch (NumberFormatException e) {
+                // Skip entries that are not valid integers.
+            }
+        }
+        return partitionsSet;
+    }
+
     public Timestamp getCreatedAt() {
         return createdAt;
     }
 }
diff --git a/src/main/java/com/darksci/kafkaview/repository/ViewRepository.java b/src/main/java/com/darksci/kafkaview/repository/ViewRepository.java
index beb5a95d..72e997d5 100644
--- a/src/main/java/com/darksci/kafkaview/repository/ViewRepository.java
+++ b/src/main/java/com/darksci/kafkaview/repository/ViewRepository.java
@@ -7,4 +7,5 @@
 @Repository
 public interface ViewRepository extends CrudRepository {
     View findByName(final String name);
+    Iterable findAllByOrderByNameAsc();
 }
diff --git a/src/main/resources/schema/schema.sql b/src/main/resources/schema/schema.sql
index f9da90e3..f6f05b04 100644
--- a/src/main/resources/schema/schema.sql
+++ b/src/main/resources/schema/schema.sql
@@ -44,6 +44,7 @@ CREATE TABLE IF NOT EXISTS `view` (
     key_message_format_id INT(11) UNSIGNED NOT NULL,
     value_message_format_id INT(11) UNSIGNED NOT NULL,
     topic TEXT NOT NULL,
+    partitions TEXT NOT NULL,
     created_at TIMESTAMP NOT NULL DEFAULT NOW(),
     created_by INT(11) UNSIGNED DEFAULT NULL,
     updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
diff --git a/src/main/resources/static/js/app.js b/src/main/resources/static/js/app.js
index 7914f5a0..5c5bf3ee 100755
--- a/src/main/resources/static/js/app.js
+++ b/src/main/resources/static/js/app.js
@@ -142,5 +142,60 @@ function init(url) {
     /* ---------- Popover ---------- */
     $('[rel="popover"],[data-rel="popover"],[data-toggle="popover"]').popover();
-    }
+
+// Client Properties
+var ApiClient = {
+    consume: function(viewId, action, callback) {
+        var actionStr = '';
+        if (action != null) {
+            actionStr = 'action=' + action;
+        }
+        jQuery.getJSON('/api/consumer/view/' + viewId, actionStr, callback);
+    },
+    consumeNext: function(viewId, callback) {
+        ApiClient.consume(viewId, 'next', callback);
+    },
+    consumePrevious: function(viewId, callback) {
+        ApiClient.consume(viewId, 'previous', callback);
+    },
+    consumeTail: function(viewId, callback) {
+        ApiClient.consume(viewId, 'tail', callback);
+    },
+    consumeHead: function(viewId, callback) {
+        ApiClient.consume(viewId, 'head', callback);
+    },
+    setConsumerState: function(viewId, partitionOffsetJson, callback) {
+        //jQuery.post('/api/consumer/view/' + viewId + '/offsets', partitionOffsetMap, callback);
+        jQuery.ajax({
+            type: 'POST',
+            url: '/api/consumer/view/' + viewId + '/offsets',
+            data: partitionOffsetJson,
+            dataType: 'json',
+            success: callback,
+            beforeSend: function(xhr) {
+                xhr.setRequestHeader('Content-type', 'application/json; charset=utf-8');
+            }
+        });
+    },
+    // Retrieve cluster node info
+    getClusterNodes: function(clusterId, callback) {
+        jQuery.getJSON('/api/cluster/' + clusterId + '/nodes', '', callback);
+    },
+    getTopicDetails: function(clusterId, topic, callback) {
+        jQuery.getJSON('/api/cluster/' + clusterId + '/topic/' + topic + '/details', '', callback);
+    },
+    getTopics: function(clusterId, callback) {
+        jQuery.getJSON('/api/cluster/' + clusterId + '/topics/list', '', callback);
+    }
+};
+
+var DateTools = {
+    localTimezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
+    showPrettyDates: true,
+    displayTimestamp: function(timestampMs) {
+        // Adjusts the timestamp into the local timezone and locale
+        return new Date(timestampMs).toLocaleString();
+    }
+};
+
diff --git a/src/main/resources/static/package-lock.json b/src/main/resources/static/package-lock.json
index 61959e55..d2a569d4 
100644 --- a/src/main/resources/static/package-lock.json +++ b/src/main/resources/static/package-lock.json @@ -36,11 +36,20 @@ "json-stable-stringify": "1.0.1" } }, + "align-text": { + "version": "0.1.4", + "resolved": "https://registry.npmjs.org/align-text/-/align-text-0.1.4.tgz", + "integrity": "sha1-DNkKVhCT810KmSVsIrcGlDP60Rc=", + "requires": { + "kind-of": "3.2.2", + "longest": "1.0.1", + "repeat-string": "1.6.1" + } + }, "amdefine": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/amdefine/-/amdefine-1.0.1.tgz", - "integrity": "sha1-SlKCrBZHKek2Gbz9OtFR+BfOkfU=", - "dev": true + "integrity": "sha1-SlKCrBZHKek2Gbz9OtFR+BfOkfU=" }, "ansi-regex": { "version": "2.1.1", @@ -173,8 +182,7 @@ "async": { "version": "1.5.2", "resolved": "https://registry.npmjs.org/async/-/async-1.5.2.tgz", - "integrity": "sha1-7GphrlZIDAw8skHJVhjiCJL5Zyo=", - "dev": true + "integrity": "sha1-7GphrlZIDAw8skHJVhjiCJL5Zyo=" }, "async-each": { "version": "1.0.1", @@ -408,8 +416,7 @@ "camelcase": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/camelcase/-/camelcase-1.2.1.tgz", - "integrity": "sha1-m7UwTS4LVmmLLHWLCKPqqdqlijk=", - "dev": true + "integrity": "sha1-m7UwTS4LVmmLLHWLCKPqqdqlijk=" }, "camelcase-keys": { "version": "2.1.0", @@ -435,6 +442,16 @@ "integrity": "sha1-G2gcIf+EAzyCZUMJBolCDRhxUdw=", "dev": true }, + "center-align": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/center-align/-/center-align-0.1.3.tgz", + "integrity": "sha1-qg0yYptu6XIgBBHL1EYckHvCt60=", + "optional": true, + "requires": { + "align-text": "0.1.4", + "lazy-cache": "1.0.4" + } + }, "chalk": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/chalk/-/chalk-1.1.3.tgz", @@ -748,8 +765,7 @@ "decamelize": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/decamelize/-/decamelize-1.2.0.tgz", - "integrity": "sha1-9lNNFRSCabIDUue+4m9QH5oZEpA=", - "dev": true + "integrity": "sha1-9lNNFRSCabIDUue+4m9QH5oZEpA=" }, "defaults": { "version": "1.0.3", @@ -3331,6 +3347,81 @@ "glogg": "1.0.0" } }, + "handlebars": { + "version": "4.0.10", + "resolved": "https://registry.npmjs.org/handlebars/-/handlebars-4.0.10.tgz", + "integrity": "sha1-PTDHGLCaPZbyPqTMH0A8TTup/08=", + "requires": { + "async": "1.5.2", + "optimist": "0.6.1", + "source-map": "0.4.4", + "uglify-js": "2.8.29" + }, + "dependencies": { + "cliui": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/cliui/-/cliui-2.1.0.tgz", + "integrity": "sha1-S0dXYP+AJkx2LDoXGQMukcf+oNE=", + "optional": true, + "requires": { + "center-align": "0.1.3", + "right-align": "0.1.3", + "wordwrap": "0.0.2" + } + }, + "source-map": { + "version": "0.4.4", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.4.4.tgz", + "integrity": "sha1-66T12pwNyZneaAMti092FzZSA2s=", + "requires": { + "amdefine": "1.0.1" + } + }, + "uglify-js": { + "version": "2.8.29", + "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-2.8.29.tgz", + "integrity": "sha1-KcVzMUgFe7Th913zW3qcty5qWd0=", + "optional": true, + "requires": { + "source-map": "0.5.7", + "uglify-to-browserify": "1.0.2", + "yargs": "3.10.0" + }, + "dependencies": { + "source-map": { + "version": "0.5.7", + "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.5.7.tgz", + "integrity": "sha1-igOdLRAh0i0eoUyA2OpGi6LvP8w=", + "optional": true + } + } + }, + "window-size": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/window-size/-/window-size-0.1.0.tgz", + "integrity": "sha1-VDjNLqk7IC76Ohn+iIeu58lPnJ0=", + "optional": true + }, + 
"wordwrap": { + "version": "0.0.2", + "resolved": "https://registry.npmjs.org/wordwrap/-/wordwrap-0.0.2.tgz", + "integrity": "sha1-t5Zpu0LstAn4PVg8rVLKF+qhZD8=", + "optional": true + }, + "yargs": { + "version": "3.10.0", + "resolved": "https://registry.npmjs.org/yargs/-/yargs-3.10.0.tgz", + "integrity": "sha1-9+572FfdfB0tOMDnTvvWgdFDH9E=", + "optional": true, + "requires": { + "camelcase": "1.2.1", + "cliui": "2.1.0", + "decamelize": "1.2.0", + "window-size": "0.1.0" + } + } + } + }, "har-schema": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/har-schema/-/har-schema-1.0.5.tgz", @@ -3548,8 +3639,7 @@ "is-buffer": { "version": "1.1.5", "resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-1.1.5.tgz", - "integrity": "sha1-Hzsm72E7IUuIy8ojzGwB2Hlh7sw=", - "dev": true + "integrity": "sha1-Hzsm72E7IUuIy8ojzGwB2Hlh7sw=" }, "is-builtin-module": { "version": "1.0.0", @@ -3860,11 +3950,16 @@ "version": "3.2.2", "resolved": "https://registry.npmjs.org/kind-of/-/kind-of-3.2.2.tgz", "integrity": "sha1-MeohpzS6ubuw8yRm2JOupR5KPGQ=", - "dev": true, "requires": { "is-buffer": "1.1.5" } }, + "lazy-cache": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/lazy-cache/-/lazy-cache-1.0.4.tgz", + "integrity": "sha1-odePw6UEdMuAhF07O24dpJpEbo4=", + "optional": true + }, "lazystream": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/lazystream/-/lazystream-1.0.0.tgz", @@ -4254,6 +4349,11 @@ } } }, + "longest": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/longest/-/longest-1.0.1.tgz", + "integrity": "sha1-MKCy2jj3N3DoKUoNIuZiXtd9AJc=" + }, "loud-rejection": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/loud-rejection/-/loud-rejection-1.6.0.tgz", @@ -4940,6 +5040,22 @@ "pinkie-promise": "2.0.1" } }, + "optimist": { + "version": "0.6.1", + "resolved": "https://registry.npmjs.org/optimist/-/optimist-0.6.1.tgz", + "integrity": "sha1-2j6nRob6IaGaERwybpDrFaAZZoY=", + "requires": { + "minimist": "0.0.10", + "wordwrap": "0.0.3" + }, + "dependencies": { + "minimist": { + "version": "0.0.10", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.10.tgz", + "integrity": "sha1-3j+YVD2/lggr5IrRoMfNqDYwHc8=" + } + } + }, "options": { "version": "0.0.6", "resolved": "https://registry.npmjs.org/options/-/options-0.0.6.tgz", @@ -5360,8 +5476,7 @@ "repeat-string": { "version": "1.6.1", "resolved": "https://registry.npmjs.org/repeat-string/-/repeat-string-1.6.1.tgz", - "integrity": "sha1-jcrkcOHIirwtYA//Sndihtp15jc=", - "dev": true + "integrity": "sha1-jcrkcOHIirwtYA//Sndihtp15jc=" }, "repeating": { "version": "2.0.1", @@ -5480,6 +5595,15 @@ "minimatch": "3.0.4" } }, + "right-align": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/right-align/-/right-align-0.1.3.tgz", + "integrity": "sha1-YTObci/mo1FWiSENJOFMlhSGE+8=", + "optional": true, + "requires": { + "align-text": "0.1.4" + } + }, "rimraf": { "version": "2.6.2", "resolved": "https://registry.npmjs.org/rimraf/-/rimraf-2.6.2.tgz", @@ -6250,6 +6374,12 @@ "source-map": "0.5.7" } }, + "uglify-to-browserify": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/uglify-to-browserify/-/uglify-to-browserify-1.0.2.tgz", + "integrity": "sha1-bgkk1r2mta/jSeOabWMoUKD4grc=", + "optional": true + }, "ultron": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/ultron/-/ultron-1.0.2.tgz", @@ -6517,6 +6647,11 @@ "integrity": "sha1-+OGqHuWlPsW/FR/6CXQqatdpeHY=", "dev": true }, + "wordwrap": { + "version": "0.0.3", + "resolved": 
"https://registry.npmjs.org/wordwrap/-/wordwrap-0.0.3.tgz", + "integrity": "sha1-o9XabNXAvAAI03I0u68b7WMFkQc=" + }, "wrap-ansi": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-2.1.0.tgz", diff --git a/src/main/resources/static/package.json b/src/main/resources/static/package.json index 71104c48..4b82bec7 100755 --- a/src/main/resources/static/package.json +++ b/src/main/resources/static/package.json @@ -7,6 +7,7 @@ "bootstrap": "^4.0.0-beta", "chart.js": "^2.7.0", "font-awesome": "^4.7.0", + "handlebars": "^4.0.10", "jquery": "^3.2.1", "pace-progress": "^1.0.2", "popper.js": "^1.12.5", diff --git a/src/main/resources/templates/api/location.html b/src/main/resources/templates/api/location.html deleted file mode 100644 index e69de29b..00000000 diff --git a/src/main/resources/templates/browser/index.html b/src/main/resources/templates/browser/index.html index 27048bfd..1da8fcde 100644 --- a/src/main/resources/templates/browser/index.html +++ b/src/main/resources/templates/browser/index.html @@ -11,43 +11,529 @@
-    Kafka Data
+    View [[${view.name}]] over topic [[${view.topic}]]
[The rest of this template's markup was lost in extraction. The hunk replaces the static results table with a client-side browser UI (next/previous/head/tail paging and per-partition offset controls) backed by the ApiClient helpers added in app.js.]
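The create/edit form changes below persist the selected partitions through the comma-separated View.partitions column shown earlier. A self-contained sketch of that round-trip:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionCsvExample {
    public static void main(final String[] args) {
        // ViewController persists the form's selection as a comma-separated string.
        final Set<Integer> selected = new HashSet<>(Arrays.asList(0, 2, 3));
        final String csv = selected.stream().map(Object::toString).collect(Collectors.joining(","));
        System.out.println(csv); // e.g. "0,2,3" (set order is not guaranteed)

        // View.getPartitionsAsSet() reverses the encoding, skipping malformed entries.
        final Set<Integer> parsed = new HashSet<>();
        for (final String part : csv.split(",")) {
            try {
                parsed.add(Integer.parseInt(part));
            } catch (final NumberFormatException e) {
                // Ignored, mirroring the entity's behaviour.
            }
        }
        System.out.println(parsed); // [0, 2, 3]; an empty set means "all partitions"
    }
}
```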
diff --git a/src/main/resources/templates/configuration/view/create.html b/src/main/resources/templates/configuration/view/create.html
index 1e885bb1..e8ca6f31 100644
--- a/src/main/resources/templates/configuration/view/create.html
+++ b/src/main/resources/templates/configuration/view/create.html
@@ -5,13 +5,72 @@
     layout:decorate="~{layout}">
-    Message Format
+    View
[The rest of this template's markup was lost in extraction. The hunks retitle the form from "Message Format" to "View" and add a partition-selection control backing the new ViewForm.partitions field.]
@@ -48,25 +107,7 @@
[markup lost in extraction]
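Taken together, the cluster-facing changes move KafkaOperations from a throwaway KafkaConsumer onto Kafka's AdminClient. A minimal usage sketch of the new path; the broker address, client id, and topic name are placeholders:

```java
import com.darksci.kafkaview.manager.kafka.KafkaAdminFactory;
import com.darksci.kafkaview.manager.kafka.KafkaOperations;
import com.darksci.kafkaview.manager.kafka.config.ClusterConfig;
import org.apache.kafka.clients.admin.AdminClient;

public class OperationsExample {
    public static void main(final String[] args) {
        // Placeholder broker address and client id.
        final ClusterConfig clusterConfig = new ClusterConfig("localhost:9092");
        final AdminClient adminClient = new KafkaAdminFactory(clusterConfig, "example-client").create();

        // KafkaOperations now wraps an AdminClient rather than a consumer instance.
        try (final KafkaOperations operations = new KafkaOperations(adminClient)) {
            System.out.println(operations.getAvailableTopics());       // topic names + isInternal flags
            System.out.println(operations.getClusterNodes());          // broker id / host / port
            System.out.println(operations.getTopicDetails("MyTopic")); // partitions with leader, replicas, ISR
        }
    }
}
```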