From e33b78205257483dabb14663ef4791e1ddac3cd1 Mon Sep 17 00:00:00 2001 From: Crim Date: Tue, 10 Oct 2017 13:50:06 +0900 Subject: [PATCH 1/8] wip --- .../controller/api/ApiController.java | 108 +++++++++++++----- .../controller/browser/BrowserController.java | 65 +---------- .../manager/kafka/KafkaConsumerFactory.java | 18 --- .../resources/templates/browser/index.html | 19 ++- src/main/resources/templates/layout.html | 25 ++-- 5 files changed, 108 insertions(+), 127 deletions(-) diff --git a/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java b/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java index da2c803d..40733f73 100644 --- a/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java +++ b/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java @@ -13,8 +13,14 @@ import com.darksci.kafkaview.manager.kafka.TransactionalKafkaClient; import com.darksci.kafkaview.manager.kafka.config.FilterConfig; import com.darksci.kafkaview.manager.kafka.filter.AFilter; +import com.darksci.kafkaview.manager.plugin.DeserializerLoader; +import com.darksci.kafkaview.manager.plugin.exception.LoaderException; +import com.darksci.kafkaview.manager.ui.FlashMessage; import com.darksci.kafkaview.model.Cluster; +import com.darksci.kafkaview.model.MessageFormat; +import com.darksci.kafkaview.model.View; import com.darksci.kafkaview.repository.ClusterRepository; +import com.darksci.kafkaview.repository.ViewRepository; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +29,7 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import java.util.ArrayList; @@ -36,49 +43,45 @@ public class ApiController extends BaseController { private static final Logger logger = LoggerFactory.getLogger(ApiController.class); + @Autowired + private ViewRepository viewRepository; + + @Autowired + private DeserializerLoader deserializerLoader; + @Autowired private ClusterRepository clusterRepository; /** * GET kafka results */ - @RequestMapping(path = "/consume", method = RequestMethod.GET, produces = "application/json") + @RequestMapping(path = "/consume/view/{id}", method = RequestMethod.GET, produces = "application/json") @ResponseBody - public KafkaResults consume() { - // Create the configs - final String topicName = "TestTopic"; - final String consumerId = "BobsYerAunty"; - - // Defines the Cluster - final ClusterConfig clusterConfig = getClusterConfig(); - - // Define our Deserializer - final DeserializerConfig deserializerConfig = DeserializerConfig.defaultConfig(); - - // Defines our Topic - final TopicConfig topicConfig = new TopicConfig(clusterConfig, deserializerConfig, topicName); - - // Defines any filters - //final FilterConfig filterConfig = FilterConfig.withNoFilters(); - final FilterConfig filterConfig = FilterConfig.withFilters(AFilter.class); - - // Defines our client - final ClientConfig clientConfig = new ClientConfig(topicConfig, filterConfig, consumerId); - - // Build a consumer - final KafkaConsumer kafkaConsumer = new KafkaConsumerFactory(clientConfig).createAndSubscribe(); + public KafkaResults consume( + final @PathVariable Long id, + final @RequestParam(name = "action", required = false) String action) { + + // Retrieve the view definition + final View view = viewRepository.findOne(id); + if (view == null) { + // TODO Return some kind of error. + } // Create consumer final KafkaResults results; - try (final TransactionalKafkaClient transactionalKafkaClient = new TransactionalKafkaClient(kafkaConsumer, clientConfig)) { + try (final TransactionalKafkaClient transactionalKafkaClient = setup(view)) { + // move directions if needed + if ("n".equals(action)) { + transactionalKafkaClient.next(); + } else if ("p".equals(action)) { + transactionalKafkaClient.previous(); + } else if ("r".equals(action)) { + transactionalKafkaClient.toHead(); + } // Poll - results = transactionalKafkaClient.consume(); + results = transactionalKafkaClient.consumePerPartition(); } - - // Debug log - logger.info("Consumed records: {}", results); - return results; } @@ -101,6 +104,51 @@ public List getTopics(final @PathVariable Long id) { } } + private TransactionalKafkaClient setup(final View view) { + // Construct a consumerId based on user + final String consumerId = "MyUserId1"; + + // Grab our relevant bits + final Cluster cluster = view.getCluster(); + final MessageFormat keyMessageFormat = view.getKeyMessageFormat(); + final MessageFormat valueMessageFormat = view.getValueMessageFormat(); + + final Class keyDeserializerClass; + try { + if (keyMessageFormat.isDefaultFormat()) { + keyDeserializerClass = deserializerLoader.getDeserializerClass(keyMessageFormat.getClasspath()); + } else { + keyDeserializerClass = deserializerLoader.getDeserializerClass(keyMessageFormat.getJar(), keyMessageFormat.getClasspath()); + } + } catch (final LoaderException exception) { + throw new RuntimeException(exception.getMessage(), exception); + } + + final Class valueDeserializerClass; + try { + if (valueMessageFormat.isDefaultFormat()) { + valueDeserializerClass = deserializerLoader.getDeserializerClass(valueMessageFormat.getClasspath()); + } else { + valueDeserializerClass = deserializerLoader.getDeserializerClass(valueMessageFormat.getJar(), valueMessageFormat.getClasspath()); + } + } catch (final LoaderException exception) { + throw new RuntimeException(exception.getMessage(), exception); + } + + final ClusterConfig clusterConfig = new ClusterConfig(cluster.getBrokerHosts()); + final DeserializerConfig deserializerConfig = new DeserializerConfig(keyDeserializerClass, valueDeserializerClass); + final TopicConfig topicConfig = new TopicConfig(clusterConfig, deserializerConfig, view.getTopic()); + final ClientConfig clientConfig = new ClientConfig(topicConfig, FilterConfig.withNoFilters(), consumerId); + + // Create the damn consumer + final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory(clientConfig); + final KafkaConsumer kafkaConsumer = kafkaConsumerFactory.createAndSubscribe(); + + // Create consumer + final KafkaResults results; + return new TransactionalKafkaClient(kafkaConsumer, clientConfig); + } + private ClusterConfig getClusterConfig() { return new ClusterConfig("localhost:9092"); } diff --git a/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java b/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java index e3063bc5..49b7c120 100644 --- a/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java +++ b/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java @@ -37,10 +37,9 @@ public class BrowserController extends BaseController { /** * GET Displays main configuration index. */ - @RequestMapping(path = "/{id}/{direction}", method = RequestMethod.GET) + @RequestMapping(path = "/{id}", method = RequestMethod.GET) public String index( final @PathVariable Long id, - final @PathVariable String direction, final RedirectAttributes redirectAttributes, final Model model) { @@ -54,68 +53,6 @@ public String index( return "redirect:/"; } model.addAttribute("view", view); - - // Create consumer - final KafkaResults results; - try (final TransactionalKafkaClient transactionalKafkaClient = setup(view)) { - // move directions if needed - if ("n".equals(direction)) { - transactionalKafkaClient.next(); - } else if ("p".equals(direction)) { - transactionalKafkaClient.previous(); - } else if ("r".equals(direction)) { - transactionalKafkaClient.toHead(); - } - - // Poll - results = transactionalKafkaClient.consumePerPartition(); - } - model.addAttribute("results", results.getResults()); return "browser/index"; } - - private TransactionalKafkaClient setup(final View view) { - // Construct a consumerId based on user - final String consumerId = "MyUserId1"; - - // Grab our relevant bits - final Cluster cluster = view.getCluster(); - final MessageFormat keyMessageFormat = view.getKeyMessageFormat(); - final MessageFormat valueMessageFormat = view.getValueMessageFormat(); - - final Class keyDeserializerClass; - try { - if (keyMessageFormat.isDefaultFormat()) { - keyDeserializerClass = deserializerLoader.getDeserializerClass(keyMessageFormat.getClasspath()); - } else { - keyDeserializerClass = deserializerLoader.getDeserializerClass(keyMessageFormat.getJar(), keyMessageFormat.getClasspath()); - } - } catch (final LoaderException exception) { - throw new RuntimeException(exception.getMessage(), exception); - } - - final Class valueDeserializerClass; - try { - if (valueMessageFormat.isDefaultFormat()) { - valueDeserializerClass = deserializerLoader.getDeserializerClass(valueMessageFormat.getClasspath()); - } else { - valueDeserializerClass = deserializerLoader.getDeserializerClass(valueMessageFormat.getJar(), valueMessageFormat.getClasspath()); - } - } catch (final LoaderException exception) { - throw new RuntimeException(exception.getMessage(), exception); - } - - final ClusterConfig clusterConfig = new ClusterConfig(cluster.getBrokerHosts()); - final DeserializerConfig deserializerConfig = new DeserializerConfig(keyDeserializerClass, valueDeserializerClass); - final TopicConfig topicConfig = new TopicConfig(clusterConfig, deserializerConfig, view.getTopic()); - final ClientConfig clientConfig = new ClientConfig(topicConfig, FilterConfig.withNoFilters(), consumerId); - - // Create the damn consumer - final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory(clientConfig); - final KafkaConsumer kafkaConsumer = kafkaConsumerFactory.createAndSubscribe(); - - // Create consumer - final KafkaResults results; - return new TransactionalKafkaClient(kafkaConsumer, clientConfig); - } } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java index ddd17670..17ee02a1 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java @@ -46,24 +46,6 @@ public KafkaConsumer createAndSubscribe() { // Assign them. kafkaConsumer.assign(topicPartitions); - // TODO: Seek somewhere? - //kafkaConsumer.seekToBeginning(topicPartitions); - - -// final ArrayList topics = new ArrayList<>(); -// topics.add(clientConfig.getTopicConfig().getTopicName()); -// kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() { -// @Override -// public void onPartitionsRevoked(final Collection partitions) { -// -// } -// -// @Override -// public void onPartitionsAssigned(final Collection partitions) { -// -// } -// }); - // Return the kafka consumer. return kafkaConsumer; } diff --git a/src/main/resources/templates/browser/index.html b/src/main/resources/templates/browser/index.html index 27048bfd..dedb4a40 100644 --- a/src/main/resources/templates/browser/index.html +++ b/src/main/resources/templates/browser/index.html @@ -11,6 +11,14 @@
+
@@ -19,7 +27,14 @@ Kafka Data
- +
+
+
+
+
@@ -38,7 +53,7 @@
-
diff --git a/src/main/resources/templates/layout.html b/src/main/resources/templates/layout.html index ec6fc9e5..96c233a6 100644 --- a/src/main/resources/templates/layout.html +++ b/src/main/resources/templates/layout.html @@ -38,6 +38,9 @@ + + + From 758a332b64a611b633e548f65408104836aecff5 Mon Sep 17 00:00:00 2001 From: Crim Date: Tue, 10 Oct 2017 14:27:00 +0900 Subject: [PATCH 3/8] minor cleanup and tweaks --- src/main/resources/templates/browser/index.html | 2 +- src/main/resources/templates/configuration/view/index.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/resources/templates/browser/index.html b/src/main/resources/templates/browser/index.html index 230c4876..7b58c0c4 100644 --- a/src/main/resources/templates/browser/index.html +++ b/src/main/resources/templates/browser/index.html @@ -81,7 +81,7 @@
- Kafka Data + View [[${view.name}]] over topic [[${view.topic}]]
diff --git a/src/main/resources/templates/configuration/view/index.html b/src/main/resources/templates/configuration/view/index.html index 7e086e4a..c74c64dc 100644 --- a/src/main/resources/templates/configuration/view/index.html +++ b/src/main/resources/templates/configuration/view/index.html @@ -39,7 +39,7 @@ - + From 1acfe0f14228fc48c614100b52f1d55c589e8a35 Mon Sep 17 00:00:00 2001 From: Crim Date: Tue, 10 Oct 2017 15:32:59 +0900 Subject: [PATCH 4/8] some UI tweaking --- .../controller/api/ApiController.java | 23 +- .../manager/kafka/KafkaOperations.java | 10 + .../kafka/TransactionalKafkaClient.java | 7 +- .../manager/kafka/dto/KafkaResults.java | 42 ++- .../resources/templates/browser/index.html | 253 +++++++++++++++ src/main/resources/templates/layout.html | 287 +----------------- 6 files changed, 329 insertions(+), 293 deletions(-) diff --git a/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java b/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java index ee5f64da..08b71fcb 100644 --- a/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java +++ b/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java @@ -89,7 +89,7 @@ public KafkaResults consume( } /** - * GET kafka topics for a requested cluster. + * GET listing of all available kafka topics for a requested cluster. */ @RequestMapping(path = "/cluster/{id}/topics/list", method = RequestMethod.GET, produces = "application/json") @ResponseBody @@ -107,6 +107,27 @@ public List getTopics(final @PathVariable Long id) { } } + /** + * GET Details about a cluster. + */ + @RequestMapping(path = "/cluster/{id}/info", method = RequestMethod.GET, produces = "application/json") + @ResponseBody + public List getClusterInfomration(final @PathVariable Long id) { + // Retrieve cluster + final Cluster cluster = clusterRepository.findOne(id); + if (cluster == null) { + // Handle error by returning empty list? + new ArrayList<>(); + } + + try (final KafkaOperations operations = KafkaOperations.newKafkaOperationalInstance(cluster.getBrokerHosts())) { + final TopicList topics = operations.getAvailableTopics(); + return topics.getAllTopics(); + } + } + + + private TransactionalKafkaClient setup(final View view) { // Construct a consumerId based on user final String consumerId = "MyUserId1"; diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaOperations.java b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaOperations.java index d9150955..0e851ab5 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaOperations.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaOperations.java @@ -8,6 +8,8 @@ import com.darksci.kafkaview.manager.kafka.dto.PartitionDetails; import com.darksci.kafkaview.manager.kafka.dto.TopicDetails; import com.darksci.kafkaview.manager.kafka.dto.TopicList; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.slf4j.Logger; @@ -16,6 +18,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Properties; public class KafkaOperations implements AutoCloseable { private final static Logger logger = LoggerFactory.getLogger(KafkaOperations.class); @@ -27,6 +30,9 @@ public KafkaOperations(final KafkaConsumer kafkaConsumer) { this.kafkaConsumer = kafkaConsumer; } + /** + * Retrieve all available topics within cluster. + */ public TopicList getAvailableTopics() { final Map> allTopics = kafkaConsumer.listTopics(); @@ -48,6 +54,10 @@ public TopicList getAvailableTopics() { return new TopicList(topicDetails); } + public void getClusterInfo() { + + } + public void close() { kafkaConsumer.close(); } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java b/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java index 9fa7e684..a5ed7b28 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java @@ -13,12 +13,9 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.SortedSet; import java.util.TreeMap; public class TransactionalKafkaClient implements AutoCloseable { @@ -52,7 +49,7 @@ public KafkaResults consume() { } // Commit offsets commit(); - return new KafkaResults(kafkaResultList); + return KafkaResults.newInstance(kafkaResultList); } public KafkaResults consumePerPartition() { @@ -76,7 +73,7 @@ public KafkaResults consumePerPartition() { allResults.addAll(results); } - return new KafkaResults(allResults); + return KafkaResults.newInstance(allResults); } private List getAllPartitions() { diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResults.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResults.java index 6dbf163b..da7e7e80 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResults.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResults.java @@ -1,22 +1,60 @@ package com.darksci.kafkaview.manager.kafka.dto; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class KafkaResults { private final List results; + private final Map startOffsets; + private final Map endOffsets; - public KafkaResults(final List results) { - this.results = results; + public KafkaResults(final List results, final Map startOffsets, final Map endOffsets) { + this.results = Collections.unmodifiableList(results); + this.startOffsets = Collections.unmodifiableMap(startOffsets); + this.endOffsets = Collections.unmodifiableMap(endOffsets); } public List getResults() { return results; } + public Map getStartOffsets() { + return startOffsets; + } + + public Map getEndOffsets() { + return endOffsets; + } + @Override public String toString() { return "KafkaResults{" + "results=" + results + + ", startOffsets=" + startOffsets + + ", endOffsets=" + endOffsets + '}'; } + + public static KafkaResults newInstance(final List kafkaResults) { + // Find first & last offsets + final Map firstOffsets = new HashMap<>(); + final Map lastOffsets = new HashMap<>(); + + // Loop over the results and build map + for (final KafkaResult kafkaResult: kafkaResults) { + final int partition = kafkaResult.getPartition(); + final long offset = kafkaResult.getOffset(); + + if (firstOffsets.getOrDefault(partition, Long.MAX_VALUE) > offset) { + firstOffsets.put(partition, offset); + } + if (lastOffsets.getOrDefault(partition, -1L) < offset) { + lastOffsets.put(partition, offset); + } + } + + return new KafkaResults(kafkaResults, firstOffsets, lastOffsets); + } } diff --git a/src/main/resources/templates/browser/index.html b/src/main/resources/templates/browser/index.html index 7b58c0c4..97cfea7d 100644 --- a/src/main/resources/templates/browser/index.html +++ b/src/main/resources/templates/browser/index.html @@ -131,5 +131,258 @@
+ + + \ No newline at end of file diff --git a/src/main/resources/templates/layout.html b/src/main/resources/templates/layout.html index 96c233a6..47160e66 100644 --- a/src/main/resources/templates/layout.html +++ b/src/main/resources/templates/layout.html @@ -226,291 +226,8 @@ -
@@ -136,13 +245,13 @@ @@ -150,7 +259,7 @@
-
+
View Details @@ -164,20 +273,22 @@ data-toggle="collapse" href="#collapseCluster" aria-expanded="false" aria-controls="collapseCluster"> - Cluster + Cluster
-
    -
  • Name
  • -
  • - Brokers -
      -
    • broker1
    • -
    • broker2
    • -
    • broker3
    • -
    -
  • -
+ + + + + + + + + + + + +
idhost
Loading...
@@ -189,39 +300,23 @@ data-toggle="collapse" href="#collapseTopic" aria-expanded="false" aria-controls="collapseTopic"> - Topic + Topic
-
    -
  • name
  • -
  • - 10 Partitions -
  • -
  • - Partition 0 -
      -
    • Leader
    • -
    • Follower1
    • -
    • Follower2
    • -
    -
  • -
  • - Partition 1 -
      -
    • Leader
    • -
    • Follower1
    • -
    • Follower2
    • -
    -
  • -
  • - Partition 2 -
      -
    • Leader
    • -
    • Follower1
    • -
    • Follower2
    • -
    -
  • -
+ + + + + + + + + + + + + +
#ReplicasISRs
Loading...
@@ -233,49 +328,59 @@ data-toggle="collapse" href="#collapseView" aria-expanded="false" aria-controls="collapseView"> - View + View
-
    -
  • - Name -
  • -
  • - Key Deserializer -
  • -
  • - Value Deserializer -
  • -
+ + + + + + + + + + + + + + + + + +
FieldValue
Key
Value

- -
+ +
- Partition Offsets + Consumer State

-
- Partition 0 -
-
-
-
-
- Partition 1 -
-
-
-
-
- Partition 2 -
+ + + + + + + + + + + + +
Partition + Offset + + + +
Loading...

From 2e5c6f7b4e01cc8cd873bb1db3e7e57abd5b6165 Mon Sep 17 00:00:00 2001 From: Crim Date: Tue, 10 Oct 2017 22:50:18 +0900 Subject: [PATCH 7/8] add partition filtering --- .../controller/api/ApiController.java | 14 ++- .../controller/browser/BrowserController.java | 12 --- .../configuration/view/ViewController.java | 17 +++ .../configuration/view/forms/ViewForm.java | 16 +++ .../manager/kafka/KafkaConsumerFactory.java | 5 +- .../kafka/TransactionalKafkaClient.java | 30 +++++- .../manager/kafka/config/ClientConfig.java | 79 ++++++++++++++ .../manager/kafka/dto/KafkaResult.java | 9 +- .../com/darksci/kafkaview/model/View.java | 29 +++++ src/main/resources/schema/schema.sql | 1 + src/main/resources/static/js/app.js | 12 +++ .../resources/templates/api/location.html | 0 .../resources/templates/browser/index.html | 57 ++++++++-- .../templates/configuration/view/create.html | 100 ++++++++++++++---- .../templates/configuration/view/index.html | 2 + 15 files changed, 335 insertions(+), 48 deletions(-) delete mode 100644 src/main/resources/templates/api/location.html diff --git a/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java b/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java index 8f84cfc4..24e6d860 100644 --- a/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java +++ b/src/main/java/com/darksci/kafkaview/controller/api/ApiController.java @@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; +import javax.servlet.http.HttpSession; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -67,7 +68,8 @@ public class ApiController extends BaseController { @ResponseBody public KafkaResults consume( final @PathVariable Long id, - final @RequestParam(name = "action", required = false) String action) { + final @RequestParam(name = "action", required = false) String action, + final @RequestParam(name = "partitions", required = false) String partitions) { // Retrieve the view definition final View view = viewRepository.findOne(id); @@ -87,7 +89,7 @@ public KafkaResults consume( } else if ("head".equals(action)) { transactionalKafkaClient.toHead(); } else if ("tail".equals(action)) { - // todo - go to tail + transactionalKafkaClient.toTail(); } // Poll @@ -215,7 +217,13 @@ private TransactionalKafkaClient setup(final View view) { final ClusterConfig clusterConfig = new ClusterConfig(cluster.getBrokerHosts()); final DeserializerConfig deserializerConfig = new DeserializerConfig(keyDeserializerClass, valueDeserializerClass); final TopicConfig topicConfig = new TopicConfig(clusterConfig, deserializerConfig, view.getTopic()); - final ClientConfig clientConfig = new ClientConfig(topicConfig, FilterConfig.withNoFilters(), consumerId); + + final ClientConfig clientConfig = ClientConfig.newBuilder() + .withTopicConfig(topicConfig) + .withNoFilters() + .withConsumerId(consumerId) + .withPartitions(view.getPartitionsAsSet()) + .build(); // Create the damn consumer final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory(clientConfig); diff --git a/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java b/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java index 5bea6e5b..8fd634f3 100644 --- a/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java +++ b/src/main/java/com/darksci/kafkaview/controller/browser/BrowserController.java @@ -1,22 +1,10 @@ package com.darksci.kafkaview.controller.browser; import com.darksci.kafkaview.controller.BaseController; -import com.darksci.kafkaview.manager.kafka.KafkaConsumerFactory; -import com.darksci.kafkaview.manager.kafka.TransactionalKafkaClient; -import com.darksci.kafkaview.manager.kafka.config.ClientConfig; -import com.darksci.kafkaview.manager.kafka.config.ClusterConfig; -import com.darksci.kafkaview.manager.kafka.config.DeserializerConfig; -import com.darksci.kafkaview.manager.kafka.config.FilterConfig; -import com.darksci.kafkaview.manager.kafka.config.TopicConfig; -import com.darksci.kafkaview.manager.kafka.dto.KafkaResults; import com.darksci.kafkaview.manager.plugin.DeserializerLoader; -import com.darksci.kafkaview.manager.plugin.exception.LoaderException; import com.darksci.kafkaview.manager.ui.FlashMessage; -import com.darksci.kafkaview.model.Cluster; -import com.darksci.kafkaview.model.MessageFormat; import com.darksci.kafkaview.model.View; import com.darksci.kafkaview.repository.ViewRepository; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; diff --git a/src/main/java/com/darksci/kafkaview/controller/configuration/view/ViewController.java b/src/main/java/com/darksci/kafkaview/controller/configuration/view/ViewController.java index 41b8e906..e074fdbc 100644 --- a/src/main/java/com/darksci/kafkaview/controller/configuration/view/ViewController.java +++ b/src/main/java/com/darksci/kafkaview/controller/configuration/view/ViewController.java @@ -5,6 +5,7 @@ import com.darksci.kafkaview.manager.kafka.KafkaAdminFactory; import com.darksci.kafkaview.manager.kafka.KafkaOperations; import com.darksci.kafkaview.manager.kafka.config.ClusterConfig; +import com.darksci.kafkaview.manager.kafka.dto.TopicDetails; import com.darksci.kafkaview.manager.kafka.dto.TopicList; import com.darksci.kafkaview.manager.ui.FlashMessage; import com.darksci.kafkaview.model.Cluster; @@ -26,6 +27,10 @@ import javax.validation.Valid; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.StringJoiner; +import java.util.stream.Collectors; @Controller @RequestMapping("/configuration/view") @@ -67,6 +72,7 @@ public String createViewForm(final ViewForm viewForm, final Model model) { // If we have a cluster Id model.addAttribute("topics", new ArrayList<>()); + model.addAttribute("partitions", new ArrayList<>()); if (viewForm.getClusterId() != null) { // Lets load the topics now // Retrieve cluster @@ -79,6 +85,12 @@ public String createViewForm(final ViewForm viewForm, final Model model) { try (final KafkaOperations operations = new KafkaOperations(adminClient)) { final TopicList topics = operations.getAvailableTopics(); model.addAttribute("topics", topics.getTopics()); + + // If we have a selected topic + if (viewForm.getTopic() != null && !"!".equals(viewForm.getTopic())) { + final TopicDetails topicDetails = operations.getTopicDetails(viewForm.getTopic()); + model.addAttribute("partitions", topicDetails.getPartitions()); + } } } } @@ -113,6 +125,7 @@ public String editViewForm( viewForm.setKeyMessageFormatId(view.getKeyMessageFormat().getId()); viewForm.setValueMessageFormatId(view.getValueMessageFormat().getId()); viewForm.setTopic(view.getTopic()); + viewForm.setPartitions(view.getPartitionsAsSet()); return createViewForm(viewForm, model); } @@ -172,11 +185,15 @@ public String updateView( final MessageFormat valueMessageFormat = messageFormatRepository.findOne(viewForm.getValueMessageFormatId()); final Cluster cluster = clusterRepository.findOne(viewForm.getClusterId()); + final Set partitionIds = viewForm.getPartitions(); + final String partitionsStr = partitionIds.stream().map(Object::toString).collect(Collectors.joining(",")); + view.setName(viewForm.getName()); view.setTopic(viewForm.getTopic()); view.setKeyMessageFormat(keyMessageFormat); view.setValueMessageFormat(valueMessageFormat); view.setCluster(cluster); + view.setPartitions(partitionsStr); viewRepository.save(view); // Set flash message diff --git a/src/main/java/com/darksci/kafkaview/controller/configuration/view/forms/ViewForm.java b/src/main/java/com/darksci/kafkaview/controller/configuration/view/forms/ViewForm.java index f9c204b5..25822b87 100644 --- a/src/main/java/com/darksci/kafkaview/controller/configuration/view/forms/ViewForm.java +++ b/src/main/java/com/darksci/kafkaview/controller/configuration/view/forms/ViewForm.java @@ -8,6 +8,8 @@ import javax.persistence.ManyToOne; import javax.validation.constraints.NotNull; import javax.validation.constraints.Size; +import java.util.HashSet; +import java.util.Set; public class ViewForm { private Long id = null; @@ -29,6 +31,12 @@ public class ViewForm { @Size(min = 1, max = 255) private String topic; + /** + * Empty set means ALL partitions. + */ + @NotNull + private Set partitions = new HashSet<>(); + public Long getId() { return id; } @@ -77,6 +85,14 @@ public void setTopic(final String topic) { this.topic = topic; } + public Set getPartitions() { + return partitions; + } + + public void setPartitions(final Set partitions) { + this.partitions = partitions; + } + public boolean exists() { return getId() != null; } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java index 17ee02a1..24de1eb8 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/KafkaConsumerFactory.java @@ -40,7 +40,10 @@ public KafkaConsumer createAndSubscribe() { // Pull out partitions, convert to browser partitions final List topicPartitions = new ArrayList<>(); for (final PartitionInfo partitionInfo: partitionInfos) { - topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + // Skip filtered partitions + if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) { + topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } } // Assign them. diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java b/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java index 300bdfae..6e3f512b 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/TransactionalKafkaClient.java @@ -45,6 +45,7 @@ public KafkaResults consume() { new KafkaResult( consumerRecord.partition(), consumerRecord.offset(), + consumerRecord.timestamp(), consumerRecord.key(), consumerRecord.value() ) @@ -108,7 +109,10 @@ private List getAllPartitions() { // Pull out partitions, convert to browser partitions final List topicPartitions = new ArrayList<>(); for (final PartitionInfo partitionInfo: partitionInfos) { - topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + // Skip filtered partitions + if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) { + topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } } return topicPartitions; } @@ -172,7 +176,7 @@ public void next() { commit(); } - public void toHead() { + public ConsumerState toHead() { // Get all available partitions final List topicPartitions = getAllPartitions(); @@ -188,5 +192,27 @@ public void toHead() { kafkaConsumer.seek(topicPartition, newOffset); } commit(); + + return getConsumerState(); + } + + public ConsumerState toTail() { + // Get all available partitions + final List topicPartitions = getAllPartitions(); + + // Get head offsets for each partition + final Map tailOffsets = kafkaConsumer.endOffsets(topicPartitions); + + // Loop over each partition + for (final TopicPartition topicPartition: topicPartitions) { + final long newOffset = tailOffsets.get(topicPartition); + logger.info("Resetting Partition: {} To Tail Offset: {}", topicPartition.partition(), newOffset); + + // Seek to earlier offset + kafkaConsumer.seek(topicPartition, newOffset); + } + commit(); + + return getConsumerState(); } } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/config/ClientConfig.java b/src/main/java/com/darksci/kafkaview/manager/kafka/config/ClientConfig.java index 50dfe7d9..6a02da92 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/config/ClientConfig.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/config/ClientConfig.java @@ -1,5 +1,12 @@ package com.darksci.kafkaview.manager.kafka.config; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + public class ClientConfig { /** * Holds details about what topic we're consuming from. @@ -16,6 +23,8 @@ public class ClientConfig { */ private final String consumerId; + private final Set partitionIds; + /** * Defines how many records to retrieve. */ @@ -24,9 +33,16 @@ public class ClientConfig { private boolean isAutoCommitEnabled = false; public ClientConfig(final TopicConfig topicConfig, final FilterConfig filterConfig, final String consumerId) { + this(topicConfig, filterConfig, consumerId, new ArrayList<>()); + } + + public ClientConfig(final TopicConfig topicConfig, final FilterConfig filterConfig, final String consumerId, final Collection partitionIds) { this.topicConfig = topicConfig; this.filterConfig = filterConfig; this.consumerId = consumerId; + final Set tempSet = new HashSet<>(); + tempSet.addAll(partitionIds); + this.partitionIds = Collections.unmodifiableSet(tempSet); } public TopicConfig getTopicConfig() { @@ -49,6 +65,21 @@ public boolean isAutoCommitEnabled() { return isAutoCommitEnabled; } + /** + * Should we limit what partitions we read from? + */ + public boolean hasFilteredPartitions() { + // Empty means allow all. + return !partitionIds.isEmpty(); + } + + public boolean isPartitionFiltered(final int partitionId) { + if (!hasFilteredPartitions()) { + return false; + } + return !partitionIds.contains(partitionId); + } + @Override public String toString() { return "ClientConfig{" + @@ -57,4 +88,52 @@ public String toString() { ", consumerId='" + consumerId + '\'' + '}'; } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private TopicConfig topicConfig; + private FilterConfig filterConfig; + private String consumerId; + private Set limitPartitions = new HashSet<>(); + + private Builder() { + + } + + public Builder withTopicConfig(final TopicConfig topicConfig) { + this.topicConfig = topicConfig; + return this; + } + + public Builder withFilterConfig(final FilterConfig filterConfig) { + this.filterConfig = filterConfig; + return this; + } + + public Builder withNoFilters() { + return withFilterConfig(FilterConfig.withNoFilters()); + } + + public Builder withConsumerId(final String consumerId) { + this.consumerId = consumerId; + return this; + } + + public Builder withPartition(final int partitionId) { + limitPartitions.add(partitionId); + return this; + } + + public Builder withPartitions(final Collection partitionIds) { + limitPartitions.addAll(partitionIds); + return this; + } + + public ClientConfig build() { + return new ClientConfig(topicConfig, filterConfig, consumerId, limitPartitions); + } + } } diff --git a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResult.java b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResult.java index 9efa260c..1f9b2f66 100644 --- a/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResult.java +++ b/src/main/java/com/darksci/kafkaview/manager/kafka/dto/KafkaResult.java @@ -3,12 +3,14 @@ public class KafkaResult { private final int partition; private final long offset; + private final long timestamp; private final Object key; private final Object value; - public KafkaResult(final int partition, final long offset, final Object key, final Object value) { + public KafkaResult(final int partition, final long offset, final long timestamp, final Object key, final Object value) { this.partition = partition; this.offset = offset; + this.timestamp = timestamp; this.key = key; this.value = value; } @@ -29,11 +31,16 @@ public Object getValue() { return value; } + public long getTimestamp() { + return timestamp; + } + @Override public String toString() { return "KafkaResult{" + "partition=" + partition + ", offset=" + offset + + ", timestamp=" + timestamp + ", key=" + key + ", value=" + value + '}'; diff --git a/src/main/java/com/darksci/kafkaview/model/View.java b/src/main/java/com/darksci/kafkaview/model/View.java index 6adf59d1..79594e8e 100644 --- a/src/main/java/com/darksci/kafkaview/model/View.java +++ b/src/main/java/com/darksci/kafkaview/model/View.java @@ -7,7 +7,10 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.ManyToOne; +import javax.persistence.Transient; import java.sql.Timestamp; +import java.util.HashSet; +import java.util.Set; @Entity public class View { @@ -30,6 +33,9 @@ public class View { @Column(nullable = false) private String topic; + @Column(nullable = false) + private String partitions; + @Column(nullable = false) private Timestamp createdAt; @@ -84,6 +90,29 @@ public void setTopic(final String topic) { this.topic = topic; } + public String getPartitions() { + return partitions; + } + + public void setPartitions(final String partitions) { + this.partitions = partitions; + } + + @Transient + public Set getPartitionsAsSet() { + final Set partitionsSet = new HashSet<>(); + final String[] partitions = getPartitions().split(","); + + for (final String partitionStr: partitions) { + try { + partitionsSet.add(Integer.parseInt(partitionStr)); + } catch (NumberFormatException e) { + // Ignore? + } + } + return partitionsSet; + } + public Timestamp getCreatedAt() { return createdAt; } diff --git a/src/main/resources/schema/schema.sql b/src/main/resources/schema/schema.sql index f9da90e3..f6f05b04 100644 --- a/src/main/resources/schema/schema.sql +++ b/src/main/resources/schema/schema.sql @@ -44,6 +44,7 @@ CREATE TABLE IF NOT EXISTS `view` ( key_message_format_id INT(11) UNSIGNED NOT NULL, value_message_format_id INT(11) UNSIGNED NOT NULL, topic TEXT NOT NULL, + partitions TEXT NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), created_by INT(11) UNSIGNED DEFAULT NULL, updated_at TIMESTAMP NOT NULL DEFAULT NOW(), diff --git a/src/main/resources/static/js/app.js b/src/main/resources/static/js/app.js index 892f18c3..5c5bf3ee 100755 --- a/src/main/resources/static/js/app.js +++ b/src/main/resources/static/js/app.js @@ -184,6 +184,18 @@ var ApiClient = { }, getTopicDetails: function(clusterId, topic, callback) { jQuery.getJSON('/api/cluster/' + clusterId + '/topic/' + topic + '/details', '', callback); + }, + getTopics: function(clusterId, callback) { + jQuery.getJSON('/api/cluster/' + clusterId + '/topics/list', '', callback); + } +}; + +var DateTools = { + localTimezone: Intl.DateTimeFormat().resolvedOptions().timeZone, + showPrettyDates: true, + displayTimestamp: function(timestampMs) { + // Adjusts timestamp into local timezone and locate + return new Date(timestampMs).toLocaleString(); } }; diff --git a/src/main/resources/templates/api/location.html b/src/main/resources/templates/api/location.html deleted file mode 100644 index e69de29b..00000000 diff --git a/src/main/resources/templates/browser/index.html b/src/main/resources/templates/browser/index.html index 87c589a1..84652011 100644 --- a/src/main/resources/templates/browser/index.html +++ b/src/main/resources/templates/browser/index.html @@ -42,6 +42,13 @@ executeConsume('tail'); }); + jQuery('#dateFormatToggle').change(function() { + DateTools.showPrettyDates = jQuery(this).is(":checked"); + + // Reload results + executeConsume('previous'); + }); + jQuery('#toggleStateEditor').click(function() { var rows = jQuery('#consumerState-details tr'); var inputs = jQuery(rows).find('input'); @@ -111,7 +118,17 @@ // Loop over results jQuery.each(results.results, function (index, result) { // Generate html from template - var resultHtml = template(result); + var properties = { + partition: result.partition, + offset: result.offset, + key: result.key, + value: result.value, + timestamp: result.timestamp, + date: DateTools.displayTimestamp(result.timestamp), + timezone: DateTools.localTimezone, + showPrettyDates: DateTools.showPrettyDates + }; + var resultHtml = template(properties); // Append it to our table jQuery(table).append(resultHtml); @@ -186,6 +203,7 @@ Partition Offset + Timestamp Key Value @@ -233,6 +251,13 @@ {{partition}} {{offset}} + + {{#if showPrettyDates}} + {{date}} + {{else}} + {{timestamp}} + {{/if}} + {{key}} {{value}} @@ -339,14 +364,28 @@ - - Key - - - - Value - - + + Partitions + + + + Key + + + + Value + + + + Format Dates + + + +
diff --git a/src/main/resources/templates/configuration/view/create.html b/src/main/resources/templates/configuration/view/create.html index 1e885bb1..e8ca6f31 100644 --- a/src/main/resources/templates/configuration/view/create.html +++ b/src/main/resources/templates/configuration/view/create.html @@ -5,13 +5,72 @@ layout:decorate="~{layout}"> - Message Format + View
+ +
@@ -48,25 +107,7 @@ > + + + +
+
+
+
+
+ +
+ \ No newline at end of file diff --git a/src/main/resources/templates/layout.html b/src/main/resources/templates/layout.html index 47160e66..690d890b 100644 --- a/src/main/resources/templates/layout.html +++ b/src/main/resources/templates/layout.html @@ -110,8 +110,9 @@ - + @@ -132,53 +133,66 @@ + + + + + + +