From 2c9533727681805b01a9c28e1fc469dc6c40a814 Mon Sep 17 00:00:00 2001 From: Crim Date: Tue, 15 Oct 2019 23:05:51 +0900 Subject: [PATCH 01/10] Start refactoring backend consumer code to support multiple topics --- .../kafka/devcluster/DevCluster.java | 62 +++-- .../ui/controller/api/ApiController.java | 1 + .../ui/manager/kafka/KafkaOperations.java | 51 ++-- .../kafka/dto/ConsumerGroupOffsets.java | 120 ++++----- ...ConsumerGroupOffsetsWithTailPositions.java | 231 ++++++++++++------ .../kafka/dto/ConsumerGroupTopicOffsets.java | 118 +++++++++ .../ui/manager/kafka/KafkaOperationsTest.java | 158 ++++++++++-- 7 files changed, 529 insertions(+), 212 deletions(-) create mode 100644 kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsets.java diff --git a/dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java b/dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java index 830781fb..7d3b3282 100644 --- a/dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java +++ b/dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java @@ -47,7 +47,12 @@ import org.slf4j.LoggerFactory; import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; /** @@ -146,15 +151,11 @@ public static void main(final String[] args) throws Exception { if (topicNames != null && topicNames.length > 0) { final KafkaTestUtils testUtils = new KafkaTestUtils(kafkaTestCluster); if (cmd.hasOption("consumer")) { - for (final String topicName : topicNames) { - runEndlessConsumer(topicName, testUtils); - } + runEndlessConsumer(Arrays.asList(topicNames), testUtils); } if (cmd.hasOption("producer")) { - for (final String topicName : topicNames) { - runEndlessProducer(topicName, testUtils); - } + runEndlessProducer(Arrays.asList(topicNames), testUtils); } } @@ -237,46 +238,57 @@ private static CommandLine parseArguments(final String[] args) throws ParseExcep /** * Fire up a new thread running an endless producer script into the given topic and partitions. - * @param topicName Name of the topic to produce records into. + * @param topicNames Names of the topic to produce records into. * @param utils KafkaUtils instance. */ private static void runEndlessProducer( - final String topicName, + final Collection topicNames, final KafkaTestUtils utils ) { final Thread producerThread = new Thread(() -> { - // Determine how many partitions there are - final TopicDescription topicDescription = utils.describeTopic(topicName); + final Map topicDescriptions = new HashMap<>(); + + // Gather details about topics + for (final String topicName : topicNames) { + // Determine how many partitions there are + topicDescriptions.put(topicName, utils.describeTopic(topicName)); + } do { // Publish some data into that topic for each partition. 
- for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) { - utils.produceRecords(1000, topicName, partitionInfo.partition()); + for (final Map.Entry entry : topicDescriptions.entrySet()) { + final String topicName = entry.getKey(); + final TopicDescription topicDescription = entry.getValue(); + + for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) { + utils.produceRecords(1000, topicName, partitionInfo.partition()); + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + return; + } + } try { - Thread.sleep(1000L); + Thread.sleep(5000L); } catch (InterruptedException e) { return; } } - try { - Thread.sleep(5000L); - } catch (InterruptedException e) { - return; - } + } while (true); }); - logger.info("Starting endless producer for topic {}", topicName); - producerThread.setName("Endless producer for topic " + topicName); + logger.info("Starting endless producer for topic {}", topicNames); + producerThread.setName("Endless producer for topic " + topicNames); producerThread.start(); } /** * Fire up a new thread running an enless consumer script that reads from the given topic. - * @param topicName Topic to consume from. + * @param topicNames Topics to consume from. * @param utils KafkaUtils instance. */ - private static void runEndlessConsumer(final String topicName, final KafkaTestUtils utils) { + private static void runEndlessConsumer(final Collection topicNames, final KafkaTestUtils utils) { final Thread consumerThread = new Thread(() -> { // Start a consumer final Properties properties = new Properties(); @@ -286,7 +298,7 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUt try (final KafkaConsumer consumer = utils.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, properties)) { - consumer.subscribe(Collections.singleton(topicName)); + consumer.subscribe(topicNames); do { final ConsumerRecords records = consumer.poll(1000); consumer.commitSync(); @@ -305,8 +317,8 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUt } }); - logger.info("Starting endless consumer for topic {}", topicName); - consumerThread.setName("Endless consumer for topic " + topicName); + logger.info("Starting endless consumer for topic {}", topicNames); + consumerThread.setName("Endless consumer for topic " + topicNames); consumerThread.start(); } } diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java index 2327bbb3..750bef4a 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java @@ -45,6 +45,7 @@ import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupDetails; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupIdentifier; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets; +import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerState; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.CreateTopic; diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java 
b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java index f219dc63..0123452f 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java @@ -49,6 +49,7 @@ import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupDetails; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupIdentifier; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets; +import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.CreateTopic; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeDetails; @@ -455,11 +456,14 @@ public List getConsumerGroupDetails(final List con */ public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId) { try { + // Create new builder + final ConsumerGroupOffsets.Builder builder = ConsumerGroupOffsets.newBuilder() + .withConsumerId(consumerGroupId); + // Make request final ListConsumerGroupOffsetsResult results = adminClient.listConsumerGroupOffsets(consumerGroupId); final List offsetList = new ArrayList<>(); - String topic = null; // Iterate over results final Map partitionsToOffsets = results @@ -467,15 +471,12 @@ public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId .get(); for (final Map.Entry entry : partitionsToOffsets.entrySet()) { - offsetList.add(new PartitionOffset( - entry.getKey().partition(), entry.getValue().offset() - )); - if (topic == null) { - topic = entry.getKey().topic(); - } + builder.withOffsets( + entry.getKey().topic(), entry.getKey().partition(), entry.getValue().offset() + ); } - return new ConsumerGroupOffsets(consumerGroupId, topic, offsetList); + return builder.build(); } catch (final InterruptedException | ExecutionException e) { throw new RuntimeException(e.getMessage(), e); } @@ -490,24 +491,30 @@ public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId */ public ConsumerGroupOffsetsWithTailPositions getConsumerGroupOffsetsWithTailOffsets(final String consumerGroupId) { final ConsumerGroupOffsets consumerGroupOffsets = getConsumerGroupOffsets(consumerGroupId); - final TailOffsets tailOffsets = getTailOffsets(consumerGroupOffsets.getTopic(), consumerGroupOffsets.getPartitions()); - final List offsetsWithPartitions = new ArrayList<>(); + // Create builder + final ConsumerGroupOffsetsWithTailPositions.Builder builder = ConsumerGroupOffsetsWithTailPositions.newBuilder() + .withConsumerId(consumerGroupId) + .withTimestamp(System.currentTimeMillis()); + + // Loop over each topic + for (final String topicName : consumerGroupOffsets.getTopicNames()) { + final ConsumerGroupTopicOffsets topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName); - for (final PartitionOffset entry : tailOffsets.getOffsets()) { - offsetsWithPartitions.add(new PartitionOffsetWithTailPosition( - entry.getPartition(), - consumerGroupOffsets.getOffsetForPartition(entry.getPartition()), - entry.getOffset() - )); + // Retrieve tail offsets + final TailOffsets tailOffsets = getTailOffsets(topicName, topicOffsets.getPartitions()); + + // Build offsets with partitions + for (final PartitionOffset entry : tailOffsets.getOffsets()) { + builder.withOffsetsForTopic(topicName, new 
PartitionOffsetWithTailPosition( + entry.getPartition(), + topicOffsets.getOffsetForPartition(entry.getPartition()), + entry.getOffset() + )); + } } - return new ConsumerGroupOffsetsWithTailPositions( - consumerGroupId, - consumerGroupOffsets.getTopic(), - offsetsWithPartitions, - System.currentTimeMillis() - ); + return builder.build(); } /** diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java index 56b4c09a..71425ba1 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java @@ -26,101 +26,79 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.TreeSet; +import java.util.stream.Collectors; /** - * Represents details about a consumer group offset positions. + * */ -public class ConsumerGroupOffsets { +public class ConsumerGroupOffsets +{ private final String consumerId; - private final String topic; - private final Map offsetMap; + private final Map topics; - /** - * Constructor. - * @param consumerId id of consumer group. - * @param topic name of the topic. - * @param offsets details about each partition and offset. - */ - public ConsumerGroupOffsets(final String consumerId, final String topic, final Collection offsets) { + public ConsumerGroupOffsets(final String consumerId, final Map topics) { this.consumerId = consumerId; - this.topic = topic; - - final Map offsetMap = new HashMap<>(); - for (final PartitionOffset offset : offsets) { - offsetMap.put( - offset.getPartition(), - offset - ); - } - this.offsetMap = Collections.unmodifiableMap(offsetMap); + this.topics = topics; } public String getConsumerId() { return consumerId; } - public String getTopic() { - return topic; + public Collection getTopicNames() { + // Get all topic names sorted. + return topics.keySet().stream() + .sorted() + .collect(Collectors.toList()); } - /** - * Marked private to keep from being serialized in responses. - */ - private Map getOffsetMap() { - return offsetMap; + public ConsumerGroupTopicOffsets getOffsetsForTopic(final String topicName) { + if (!topics.containsKey(topicName)) { + throw new IllegalArgumentException("No topic defined: " + topicName); + } + return topics.get(topicName); } - /** - * @return List of offsets. - */ - public List getOffsets() { - final List offsetList = new ArrayList<>(offsetMap.values()); - - // Sort by partition - offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition())); - return Collections.unmodifiableList(offsetList); + public static Builder newBuilder() + { + return new Builder(); } - /** - * Get offset for the requested partition. - * @param partition id of partition. - * @return offset stored - * @throws RuntimeException if requested invalid partition. 
- */ - public long getOffsetForPartition(final int partition) { - final Optional offsetOptional = getOffsetMap() - .values() - .stream() - .filter((offset) -> offset.getPartition() == partition) - .findFirst(); - if (offsetOptional.isPresent()) { - return offsetOptional.get().getOffset(); + public static final class Builder { + private String consumerId; + private Map> topicOffsets = new HashMap<>(); + + private Builder() { } - throw new RuntimeException("Unable to find partition " + partition); - } - /** - * @return Set of all available partitions. - */ - public Set getPartitions() { - final TreeSet partitions = new TreeSet<>(offsetMap.keySet()); - return Collections.unmodifiableSet(partitions); - } + public Builder withConsumerId(String consumerId) { + this.consumerId = consumerId; + return this; + } + + public Builder withOffsets(String topic, int partition, long offset) { + if (!topicOffsets.containsKey(topic)) { + topicOffsets.put(topic, new ArrayList<>()); + } + topicOffsets.get(topic).add(new PartitionOffset( + partition, offset + )); + return this; + } + + public ConsumerGroupOffsets build() { + final Map topicOffsetsMap = new HashMap<>(); + for (final Map.Entry> entry : topicOffsets.entrySet()) { + topicOffsetsMap.put( + entry.getKey(), new ConsumerGroupTopicOffsets(entry.getKey(), entry.getValue()) + ); + } - @Override - public String toString() { - return "ConsumerGroupOffsets{" - + "consumerId='" + consumerId + '\'' - + ", topic='" + topic + '\'' - + ", offsetMap=" + offsetMap - + '}'; + return new ConsumerGroupOffsets(consumerId, topicOffsetsMap); + } } } diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java index c5b90515..3376ef5f 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java @@ -25,6 +25,7 @@ package org.sourcelab.kafka.webview.ui.manager.kafka.dto; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -32,6 +33,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.stream.Collectors; /** * Represents details about a consumer group offset positions, including the current tail offset positions for @@ -39,119 +41,194 @@ */ public class ConsumerGroupOffsetsWithTailPositions { private final String consumerId; - private final String topic; - private final Map offsetMap; + private final Map topicToOffsetMap; private final long timestamp; /** * Constructor. * @param consumerId id of consumer group. - * @param topic name of the topic. - * @param offsets details about each partition and offset. + * @param topicToOffsets details about each partition and offset mapped by topic. * @param timestamp Timestamp offsets were retrieved. 
*/ public ConsumerGroupOffsetsWithTailPositions( final String consumerId, - final String topic, - final Iterable offsets, + final Map> topicToOffsets, final long timestamp) { this.consumerId = consumerId; - this.topic = topic; this.timestamp = timestamp; - final Map copiedMap = new HashMap<>(); - for (final PartitionOffsetWithTailPosition offset : offsets) { - copiedMap.put( - offset.getPartition(), - offset - ); + final Map tempMap = new HashMap<>(); + + for (final Map.Entry> entry : topicToOffsets.entrySet()) { + final String topic = entry.getKey(); + final Iterable offsets = entry.getValue(); + + final Map copiedMap = new HashMap<>(); + for (final PartitionOffsetWithTailPosition offset : offsets) { + copiedMap.put( + offset.getPartition(), + offset + ); + } + tempMap.put(topic, new ConsumerGroupTopicOffsetsWithTailPositions(topic, Collections.unmodifiableMap(copiedMap))); } - this.offsetMap = Collections.unmodifiableMap(copiedMap); + this.topicToOffsetMap = Collections.unmodifiableMap(tempMap); } - public String getConsumerId() { - return consumerId; + public static Builder newBuilder() { + return new Builder(); } - public String getTopic() { - return topic; + public String getConsumerId() { + return consumerId; } public long getTimestamp() { return timestamp; } - /** - * Marked private to keep from being serialized in responses. - */ - private Map getOffsetMap() { - return offsetMap; + public Collection getTopicNames() { + return topicToOffsetMap.keySet().stream() + .sorted() + .collect(Collectors.toList()); } - /** - * @return List of offsets. - */ - public List getOffsets() { - final List offsetList = new ArrayList<>(offsetMap.values()); - - // Sort by partition - offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition())); - return Collections.unmodifiableList(offsetList); - } - - /** - * Get offset for the requested partition. - * @param partition id of partition. - * @return offset stored - * @throws RuntimeException if requested invalid partition. - */ - public long getOffsetForPartition(final int partition) { - final Optional offsetOptional = getOffsetMap() - .values() - .stream() - .filter((offset) -> offset.getPartition() == partition) - .findFirst(); - - if (offsetOptional.isPresent()) { - return offsetOptional.get().getOffset(); + public ConsumerGroupTopicOffsetsWithTailPositions getOffsetsForTopic(final String topic) { + if (!topicToOffsetMap.containsKey(topic)) { + throw new IllegalArgumentException("No topic exists: " + topic); } - throw new RuntimeException("Unable to find partition " + partition); + return topicToOffsetMap.get(topic); } - /** - * Get offset for the requested partition. - * @param partition id of partition. - * @return offset stored - * @throws RuntimeException if requested invalid partition. - */ - public long getTailOffsetForPartition(final int partition) { - final Optional offsetOptional = getOffsetMap() - .values() - .stream() - .filter((offset) -> offset.getPartition() == partition) - .findFirst(); - - if (offsetOptional.isPresent()) { - return offsetOptional.get().getTail(); - } - throw new RuntimeException("Unable to find partition " + partition); - } - - /** - * @return Set of all available partitions. 
- */ - public Set getPartitions() { - final TreeSet partitions = new TreeSet<>(offsetMap.keySet()); - return Collections.unmodifiableSet(partitions); + public Map getTopicToOffsetMap() { + return topicToOffsetMap; } @Override public String toString() { return "ConsumerGroupOffsetsWithTailPositions{" + "consumerId='" + consumerId + '\'' - + ", topic='" + topic + '\'' + ", timestamp='" + timestamp + '\'' - + ", offsetMap=" + offsetMap + + ", offsetMap=" + topicToOffsetMap + '}'; } + + /** + * Defines a Consumer's Offsets and tail positions for a single topic it is consuming from. + */ + public static class ConsumerGroupTopicOffsetsWithTailPositions { + private final String topic; + private final Map offsetMap; + + public ConsumerGroupTopicOffsetsWithTailPositions(final String topic, final Map offsetMap) { + this.topic = topic; + this.offsetMap = offsetMap; + } + + public String getTopic() { + return topic; + } + + /** + * Get offset for the requested partition. + * @param partition id of partition. + * @return offset stored + * @throws RuntimeException if requested invalid partition. + */ + public long getOffsetForPartition(final int partition) { + final Optional offsetOptional = getOffsetMap() + .values() + .stream() + .filter((offset) -> offset.getPartition() == partition) + .findFirst(); + + if (offsetOptional.isPresent()) { + return offsetOptional.get().getOffset(); + } + throw new RuntimeException("Unable to find partition " + partition); + } + + /** + * Get offset for the requested partition. + * @param partition id of partition. + * @return offset stored + * @throws RuntimeException if requested invalid partition. + */ + public long getTailOffsetForPartition(final int partition) { + final Optional offsetOptional = getOffsetMap() + .values() + .stream() + .filter((offset) -> offset.getPartition() == partition) + .findFirst(); + + if (offsetOptional.isPresent()) { + return offsetOptional.get().getTail(); + } + throw new RuntimeException("Unable to find partition " + partition); + } + + /** + * @return List of offsets. + */ + public List getOffsets() { + final List offsetList = new ArrayList<>(offsetMap.values()); + + // Sort by partition + offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition())); + return Collections.unmodifiableList(offsetList); + } + + /** + * @return Set of all available partitions. + */ + public Set getPartitions() { + final TreeSet partitions = new TreeSet<>(offsetMap.keySet()); + return Collections.unmodifiableSet(partitions); + } + + /** + * Marked private to keep from being serialized in responses. 
+ */ + private Map getOffsetMap() { + return offsetMap; + } + + } + + public static final class Builder { + private String consumerId; + private Map> topicToOffsetMap = new HashMap<>(); + private long timestamp; + + private Builder() { + } + + public Builder withConsumerId(String consumerId) { + this.consumerId = consumerId; + return this; + } + + public Builder withTimestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + public Builder withOffsetsForTopic(final String topic, PartitionOffsetWithTailPosition partitionOffsetWithTailPosition) { + if (!topicToOffsetMap.containsKey(topic)) { + topicToOffsetMap.put(topic, new ArrayList<>()); + } + topicToOffsetMap.get(topic).add(partitionOffsetWithTailPosition); + return this; + } + + + + public ConsumerGroupOffsetsWithTailPositions build() { + return new ConsumerGroupOffsetsWithTailPositions( + consumerId, + topicToOffsetMap, + timestamp + ); + } + } } diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsets.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsets.java new file mode 100644 index 00000000..16b7e5c4 --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsets.java @@ -0,0 +1,118 @@ +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.sourcelab.kafka.webview.ui.manager.kafka.dto; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; + +/** + * Represents details about a consumer group offset positions for 1 or more topics the consumer is subscribed to. + */ +public class ConsumerGroupTopicOffsets { + private final String topic; + private final Map offsetMap; + + /** + * Constructor. Preferred to use Builder instance. + * @param topic name of the topic. + * @param offsets details about each partition and offset. 
+ */ + public ConsumerGroupTopicOffsets(final String topic, final Collection offsets) { + this.topic = topic; + + final Map offsetMap = new HashMap<>(); + for (final PartitionOffset offset : offsets) { + offsetMap.put( + offset.getPartition(), + offset + ); + } + this.offsetMap = Collections.unmodifiableMap(offsetMap); + } + + public String getTopic() { + return topic; + } + + /** + * Marked private to keep from being serialized in responses. + */ + private Map getOffsetMap() { + return offsetMap; + } + + /** + * @return List of offsets. + */ + public List getOffsets() { + final List offsetList = new ArrayList<>(offsetMap.values()); + + // Sort by partition + offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition())); + return Collections.unmodifiableList(offsetList); + } + + /** + * Get offset for the requested partition. + * @param partition id of partition. + * @return offset stored + * @throws RuntimeException if requested invalid partition. + */ + public long getOffsetForPartition(final int partition) { + final Optional offsetOptional = getOffsetMap() + .values() + .stream() + .filter((offset) -> offset.getPartition() == partition) + .findFirst(); + + if (offsetOptional.isPresent()) { + return offsetOptional.get().getOffset(); + } + throw new RuntimeException("Unable to find partition " + partition); + } + + /** + * @return Set of all available partitions. + */ + public Set getPartitions() { + final TreeSet partitions = new TreeSet<>(offsetMap.keySet()); + return Collections.unmodifiableSet(partitions); + } + + @Override + public String toString() { + return "ConsumerGroupOffsets{" + + ", topic='" + topic + '\'' + + ", offsetMap=" + offsetMap + + '}'; + } +} diff --git a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java index 31b003d9..e7d5214b 100644 --- a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java +++ b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java @@ -26,6 +26,7 @@ import com.salesforce.kafka.test.junit4.SharedKafkaTestResource; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; @@ -44,6 +45,7 @@ import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupDetails; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupIdentifier; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets; +import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.CreateTopic; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeDetails; @@ -59,6 +61,8 @@ import org.sourcelab.kafka.webview.ui.manager.socket.StartingPosition; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -567,18 +571,23 @@ public void testGetConsumerGroupOffsets() { assertNotNull(consumerGroupOffsets); // Validate bits - assertEquals(topicName, consumerGroupOffsets.getTopic()); + 
assertTrue(consumerGroupOffsets.getTopicNames().contains(topicName)); + assertEquals(1, consumerGroupOffsets.getTopicNames().size()); + + ConsumerGroupTopicOffsets topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName); + + assertEquals(topicName, topicOffsets.getTopic()); assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId()); - assertEquals(2, consumerGroupOffsets.getOffsets().size()); - assertEquals(10, consumerGroupOffsets.getOffsetForPartition(0)); - assertEquals(10, consumerGroupOffsets.getOffsetForPartition(1)); + assertEquals(2, topicOffsets.getOffsets().size()); + assertEquals(10, topicOffsets.getOffsetForPartition(0)); + assertEquals(10, topicOffsets.getOffsetForPartition(1)); - final PartitionOffset offsetsPartition0 = consumerGroupOffsets.getOffsets().get(0); + final PartitionOffset offsetsPartition0 = topicOffsets.getOffsets().get(0); assertNotNull(offsetsPartition0); assertEquals(0, offsetsPartition0.getPartition()); assertEquals(10, offsetsPartition0.getOffset()); - final PartitionOffset offsetsPartition1 = consumerGroupOffsets.getOffsets().get(1); + final PartitionOffset offsetsPartition1 = topicOffsets.getOffsets().get(1); assertNotNull(offsetsPartition1); assertEquals(1, offsetsPartition1.getPartition()); assertEquals(10, offsetsPartition1.getOffset()); @@ -586,10 +595,11 @@ public void testGetConsumerGroupOffsets() { } /** - * Test getting details about a consumer with tail offset positions incuded. + * Test getting details about a consumer with tail offset positions included for a consumer + * consuming from a single topic. */ @Test - public void testGetConsumerGroupOffsetsWithTailPositions() { + public void testGetConsumerGroupOffsetsWithTailPositions_singleTopic() { // First need to create a topic. final String topicName = "AnotherTestTopic-" + System.currentTimeMillis(); @@ -621,19 +631,23 @@ public void testGetConsumerGroupOffsetsWithTailPositions() { assertNotNull(consumerGroupOffsets); // Validate bits - assertEquals(topicName, consumerGroupOffsets.getTopic()); + assertTrue(topicName, consumerGroupOffsets.getTopicNames().contains(topicName)); + assertEquals("Should have a single topic", 1, consumerGroupOffsets.getTopicNames().size()); + + final ConsumerGroupOffsetsWithTailPositions.ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName); + assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId()); - assertEquals(2, consumerGroupOffsets.getOffsets().size()); - assertEquals(10, consumerGroupOffsets.getOffsetForPartition(0)); - assertEquals(10, consumerGroupOffsets.getOffsetForPartition(1)); + assertEquals(2, topicOffsets.getOffsets().size()); + assertEquals(10, topicOffsets.getOffsetForPartition(0)); + assertEquals(10, topicOffsets.getOffsetForPartition(1)); - final PartitionOffsetWithTailPosition offsetsPartition0 = consumerGroupOffsets.getOffsets().get(0); + final PartitionOffsetWithTailPosition offsetsPartition0 = topicOffsets.getOffsets().get(0); assertNotNull(offsetsPartition0); assertEquals(0, offsetsPartition0.getPartition()); assertEquals(10, offsetsPartition0.getOffset()); assertEquals(10, offsetsPartition0.getTail()); - final PartitionOffsetWithTailPosition offsetsPartition1 = consumerGroupOffsets.getOffsets().get(1); + final PartitionOffsetWithTailPosition offsetsPartition1 = topicOffsets.getOffsets().get(1); assertNotNull(offsetsPartition1); assertEquals(1, offsetsPartition1.getPartition()); assertEquals(10, offsetsPartition1.getOffset()); @@ -641,6 +655,106 @@ public 
void testGetConsumerGroupOffsetsWithTailPositions() { } } + /** + * Test getting details about a consumer with tail offset positions included for a consumer + * consuming from multiple topics. + */ + @Test + public void testGetConsumerGroupOffsetsWithTailPositions_multipleTopics() { + // First need to create a topic. + final String topicName1 = "AnotherTestTopic1-" + System.currentTimeMillis(); + final String topicName2 = "AnotherTestTopic2-" + System.currentTimeMillis(); + + final Collection topicNames = new ArrayList<>(); + topicNames.add(topicName1); + topicNames.add(topicName2); + + // Create topics + sharedKafkaTestResource + .getKafkaTestUtils() + .createTopic(topicName1, 2, (short) 1); + + sharedKafkaTestResource + .getKafkaTestUtils() + .createTopic(topicName2, 2, (short) 1); + + // Publish data into the topics + sharedKafkaTestResource + .getKafkaTestUtils() + .produceRecords(10, topicName1, 0); + sharedKafkaTestResource + .getKafkaTestUtils() + .produceRecords(10, topicName1, 1); + + // Publish data into the topics + sharedKafkaTestResource + .getKafkaTestUtils() + .produceRecords(20, topicName2, 0); + sharedKafkaTestResource + .getKafkaTestUtils() + .produceRecords(20, topicName2, 1); + + final String consumerId1 = "ConsumerA-" + System.currentTimeMillis(); + final String consumerPrefix = "TestConsumer"; + final String finalConsumerId = consumerPrefix + "-" + consumerId1; + + // Create consumer, consume from topic, keep alive. + try (final KafkaConsumer consumer = consumeFromTopics(topicNames, consumerId1, consumerPrefix)) { + + // Ask for list of offsets. + final ConsumerGroupOffsetsWithTailPositions consumerGroupOffsets + = kafkaOperations.getConsumerGroupOffsetsWithTailOffsets(finalConsumerId); + + // We should have one + assertNotNull(consumerGroupOffsets); + + // Validate bits + assertTrue(topicName1, consumerGroupOffsets.getTopicNames().contains(topicName1)); + assertTrue(topicName2, consumerGroupOffsets.getTopicNames().contains(topicName2)); + assertEquals("Should have two topics", 2, consumerGroupOffsets.getTopicNames().size()); + + // Validate topic 1 + ConsumerGroupOffsetsWithTailPositions.ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName1); + + assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId()); + assertEquals(2, topicOffsets.getOffsets().size()); + assertEquals(10, topicOffsets.getOffsetForPartition(0)); + assertEquals(10, topicOffsets.getOffsetForPartition(1)); + + PartitionOffsetWithTailPosition offsetsPartition0 = topicOffsets.getOffsets().get(0); + assertNotNull(offsetsPartition0); + assertEquals(0, offsetsPartition0.getPartition()); + assertEquals(10, offsetsPartition0.getOffset()); + assertEquals(10, offsetsPartition0.getTail()); + + PartitionOffsetWithTailPosition offsetsPartition1 = topicOffsets.getOffsets().get(1); + assertNotNull(offsetsPartition1); + assertEquals(1, offsetsPartition1.getPartition()); + assertEquals(10, offsetsPartition1.getOffset()); + assertEquals(10, offsetsPartition1.getTail()); + + // Validate topic 2 + topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName2); + + assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId()); + assertEquals(2, topicOffsets.getOffsets().size()); + assertEquals(20, topicOffsets.getOffsetForPartition(0)); + assertEquals(20, topicOffsets.getOffsetForPartition(1)); + + offsetsPartition0 = topicOffsets.getOffsets().get(0); + assertNotNull(offsetsPartition0); + assertEquals(0, offsetsPartition0.getPartition()); + 
assertEquals(20, offsetsPartition0.getOffset()); + assertEquals(20, offsetsPartition0.getTail()); + + offsetsPartition1 = topicOffsets.getOffsets().get(1); + assertNotNull(offsetsPartition1); + assertEquals(1, offsetsPartition1.getPartition()); + assertEquals(20, offsetsPartition1.getOffset()); + assertEquals(20, offsetsPartition1.getTail()); + } + } + /** * Test getting tail offsets for a topic. */ @@ -718,6 +832,16 @@ private void validateNode(final NodeDetails node, final int expectedId, final St * @param consumerPrefix Any consumer Id prefix. */ private KafkaConsumer consumeFromTopic(final String topic, final String consumerId, final String consumerPrefix) { + return consumeFromTopics(Collections.singleton(topic), consumerId, consumerPrefix); + } + + /** + * Helper method to consumer records from a topic. + * @param topics topics to consume from. + * @param consumerId Consumer's consumerId + * @param consumerPrefix Any consumer Id prefix. + */ + private KafkaConsumer consumeFromTopics(final Collection topics, final String consumerId, final String consumerPrefix) { // Create cluster config. final ClusterConfig clusterConfig = ClusterConfig.newBuilder() .withBrokerHosts(sharedKafkaTestResource.getKafkaConnectString()) @@ -741,6 +865,7 @@ private KafkaConsumer consumeFromTopic(final String topic, final .build(); // Create Topic Config + final String topic = topics.iterator().next(); final org.sourcelab.kafka.webview.ui.manager.kafka.config.TopicConfig topicConfig = new org.sourcelab.kafka.webview.ui.manager.kafka.config.TopicConfig(clusterConfig, deserializerConfig, topic); // Create FilterConfig @@ -760,10 +885,9 @@ private KafkaConsumer consumeFromTopic(final String topic, final final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory(new KafkaClientConfigUtil("not/used", consumerPrefix)); final KafkaConsumer consumer = kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig); - // "Subscribe" to topic. + // subscribe to all topics. consumer.unsubscribe(); - consumer.subscribe(Collections.singletonList(topicConfig.getTopicName())); - + consumer.subscribe(topics); // consume and commit offsets. // Wait for assignment to complete. 
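Before the follow-up patches below, a minimal sketch of how the multi-topic ConsumerGroupOffsets API added in this patch is intended to be consumed. The class name BuilderUsageSketch, the consumer group id, the topic names, and the offset values are made-up placeholders; the builder and accessor methods (newBuilder, withConsumerId, withOffsets, getTopicNames, getOffsetsForTopic) are the ones introduced above.

    import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets;
    import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets;
    import org.sourcelab.kafka.webview.ui.manager.kafka.dto.PartitionOffset;

    public class BuilderUsageSketch {
        public static void main(final String[] args) {
            // Collect offsets for several topics into a single DTO (values are illustrative).
            final ConsumerGroupOffsets offsets = ConsumerGroupOffsets.newBuilder()
                .withConsumerId("ExampleConsumerGroup")
                .withOffsets("TopicA", 0, 100L)
                .withOffsets("TopicA", 1, 150L)
                .withOffsets("TopicB", 0, 42L)
                .build();

            // Topic names come back sorted; offsets are then read per topic and partition.
            for (final String topic : offsets.getTopicNames()) {
                final ConsumerGroupTopicOffsets topicOffsets = offsets.getOffsetsForTopic(topic);
                for (final PartitionOffset partitionOffset : topicOffsets.getOffsets()) {
                    System.out.println(
                        topic + "-" + partitionOffset.getPartition() + " => " + partitionOffset.getOffset());
                }
            }
        }
    }

This per-topic shape replaces the single topic field the DTO previously exposed.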
From 48a894130f075ddd7484f6e735b388de446d7bfd Mon Sep 17 00:00:00 2001 From: Crim Date: Tue, 15 Oct 2019 23:07:00 +0900 Subject: [PATCH 02/10] Start refactoring backend consumer code to support multiple topics --- ...ConsumerGroupOffsetsWithTailPositions.java | 85 ------------- ...merGroupTopicOffsetsWithTailPositions.java | 118 ++++++++++++++++++ .../ui/manager/kafka/KafkaOperationsTest.java | 6 +- 3 files changed, 121 insertions(+), 88 deletions(-) create mode 100644 kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsetsWithTailPositions.java diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java index 3376ef5f..f81d9f6b 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java @@ -30,9 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.TreeSet; import java.util.stream.Collectors; /** @@ -113,88 +110,6 @@ public String toString() { + '}'; } - /** - * Defines a Consumer's Offsets and tail positions for a single topic it is consuming from. - */ - public static class ConsumerGroupTopicOffsetsWithTailPositions { - private final String topic; - private final Map offsetMap; - - public ConsumerGroupTopicOffsetsWithTailPositions(final String topic, final Map offsetMap) { - this.topic = topic; - this.offsetMap = offsetMap; - } - - public String getTopic() { - return topic; - } - - /** - * Get offset for the requested partition. - * @param partition id of partition. - * @return offset stored - * @throws RuntimeException if requested invalid partition. - */ - public long getOffsetForPartition(final int partition) { - final Optional offsetOptional = getOffsetMap() - .values() - .stream() - .filter((offset) -> offset.getPartition() == partition) - .findFirst(); - - if (offsetOptional.isPresent()) { - return offsetOptional.get().getOffset(); - } - throw new RuntimeException("Unable to find partition " + partition); - } - - /** - * Get offset for the requested partition. - * @param partition id of partition. - * @return offset stored - * @throws RuntimeException if requested invalid partition. - */ - public long getTailOffsetForPartition(final int partition) { - final Optional offsetOptional = getOffsetMap() - .values() - .stream() - .filter((offset) -> offset.getPartition() == partition) - .findFirst(); - - if (offsetOptional.isPresent()) { - return offsetOptional.get().getTail(); - } - throw new RuntimeException("Unable to find partition " + partition); - } - - /** - * @return List of offsets. - */ - public List getOffsets() { - final List offsetList = new ArrayList<>(offsetMap.values()); - - // Sort by partition - offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition())); - return Collections.unmodifiableList(offsetList); - } - - /** - * @return Set of all available partitions. - */ - public Set getPartitions() { - final TreeSet partitions = new TreeSet<>(offsetMap.keySet()); - return Collections.unmodifiableSet(partitions); - } - - /** - * Marked private to keep from being serialized in responses. 
- */ - private Map getOffsetMap() { - return offsetMap; - } - - } - public static final class Builder { private String consumerId; private Map> topicToOffsetMap = new HashMap<>(); diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsetsWithTailPositions.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsetsWithTailPositions.java new file mode 100644 index 00000000..c4ae0dca --- /dev/null +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsetsWithTailPositions.java @@ -0,0 +1,118 @@ +/** + * MIT License + * + * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/) + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.sourcelab.kafka.webview.ui.manager.kafka.dto; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; + +/** + * Defines a Consumer's Offsets and tail positions for a single topic it is consuming from. + */ +public class ConsumerGroupTopicOffsetsWithTailPositions { + private final String topic; + private final Map offsetMap; + + public ConsumerGroupTopicOffsetsWithTailPositions(final String topic, final Map offsetMap) { + this.topic = topic; + this.offsetMap = offsetMap; + } + + public String getTopic() { + return topic; + } + + /** + * Get offset for the requested partition. + * @param partition id of partition. + * @return offset stored + * @throws RuntimeException if requested invalid partition. + */ + public long getOffsetForPartition(final int partition) { + final Optional offsetOptional = getOffsetMap() + .values() + .stream() + .filter((offset) -> offset.getPartition() == partition) + .findFirst(); + + if (offsetOptional.isPresent()) { + return offsetOptional.get().getOffset(); + } + throw new RuntimeException("Unable to find partition " + partition); + } + + /** + * Get offset for the requested partition. + * @param partition id of partition. + * @return offset stored + * @throws RuntimeException if requested invalid partition. 
+ */ + public long getTailOffsetForPartition(final int partition) { + final Optional offsetOptional = getOffsetMap() + .values() + .stream() + .filter((offset) -> offset.getPartition() == partition) + .findFirst(); + + if (offsetOptional.isPresent()) { + return offsetOptional.get().getTail(); + } + throw new RuntimeException("Unable to find partition " + partition); + } + + /** + * @return List of offsets. + */ + public List getOffsets() { + final List offsetList = new ArrayList<>(offsetMap.values()); + + // Sort by partition + offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition())); + return Collections.unmodifiableList(offsetList); + } + + /** + * @return Set of all available partitions. + */ + public Set getPartitions() { + final TreeSet partitions = new TreeSet<>(offsetMap.keySet()); + return Collections.unmodifiableSet(partitions); + } + + /** + * Marked private to keep from being serialized in responses. + */ + private Map getOffsetMap() { + return offsetMap; + } + +} diff --git a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java index e7d5214b..7bbb6774 100644 --- a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java +++ b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java @@ -26,7 +26,6 @@ import com.salesforce.kafka.test.junit4.SharedKafkaTestResource; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; @@ -47,6 +46,7 @@ import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions; +import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsetsWithTailPositions; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.CreateTopic; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeDetails; import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeList; @@ -634,7 +634,7 @@ public void testGetConsumerGroupOffsetsWithTailPositions_singleTopic() { assertTrue(topicName, consumerGroupOffsets.getTopicNames().contains(topicName)); assertEquals("Should have a single topic", 1, consumerGroupOffsets.getTopicNames().size()); - final ConsumerGroupOffsetsWithTailPositions.ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName); + final ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName); assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId()); assertEquals(2, topicOffsets.getOffsets().size()); @@ -714,7 +714,7 @@ public void testGetConsumerGroupOffsetsWithTailPositions_multipleTopics() { assertEquals("Should have two topics", 2, consumerGroupOffsets.getTopicNames().size()); // Validate topic 1 - ConsumerGroupOffsetsWithTailPositions.ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName1); + ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName1); 
assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId()); assertEquals(2, topicOffsets.getOffsets().size()); From 2ae8f064ebe7c299cb3cd6c0628545b8b943effb Mon Sep 17 00:00:00 2001 From: Crim Date: Tue, 15 Oct 2019 23:16:20 +0900 Subject: [PATCH 03/10] code cleanup --- .../kafka/dto/ConsumerGroupOffsets.java | 42 +++++++++++++++---- ...ConsumerGroupOffsetsWithTailPositions.java | 33 ++++++++++++--- 2 files changed, 61 insertions(+), 14 deletions(-) diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java index 71425ba1..b35af2e1 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java @@ -26,16 +26,16 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** - * + * Contains details about a consumer group's current offsets for one or more topics they are consuming from. */ -public class ConsumerGroupOffsets -{ +public class ConsumerGroupOffsets { private final String consumerId; private final Map topics; @@ -48,13 +48,23 @@ public String getConsumerId() { return consumerId; } + /** + * All topic names consuming from. + * @return Immutable list of all topic names being consumed. + */ public Collection getTopicNames() { // Get all topic names sorted. - return topics.keySet().stream() + return Collections.unmodifiableList(topics.keySet().stream() .sorted() - .collect(Collectors.toList()); + .collect(Collectors.toList()) + ); } + /** + * For a given topic, return the consumer group offsets for it. + * @param topicName name of the topic to retrieve offsets for. + * @return Offsets for the topic. + */ public ConsumerGroupTopicOffsets getOffsetsForTopic(final String topicName) { if (!topics.containsKey(topicName)) { throw new IllegalArgumentException("No topic defined: " + topicName); @@ -62,12 +72,17 @@ public ConsumerGroupTopicOffsets getOffsetsForTopic(final String topicName) { return topics.get(topicName); } - public static Builder newBuilder() - { + /** + * Builder instance. + * @return new builder instance. + */ + public static Builder newBuilder() { return new Builder(); } - + /** + * Builder instance. + */ public static final class Builder { private String consumerId; private Map> topicOffsets = new HashMap<>(); @@ -80,6 +95,13 @@ public Builder withConsumerId(String consumerId) { return this; } + /** + * Add offsets entry. + * @param topic name of topic. + * @param partition which partition on the topic. + * @param offset The current offset for the topic and partition. + * @return Builder instance. + */ public Builder withOffsets(String topic, int partition, long offset) { if (!topicOffsets.containsKey(topic)) { topicOffsets.put(topic, new ArrayList<>()); @@ -90,6 +112,10 @@ public Builder withOffsets(String topic, int partition, long offset) { return this; } + /** + * Build a ConsumerGroupOffsets instance. + * @return ConsumerGroupOffsets instance. 
+ */ public ConsumerGroupOffsets build() { final Map topicOffsetsMap = new HashMap<>(); for (final Map.Entry> entry : topicOffsets.entrySet()) { diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java index f81d9f6b..a50a61bd 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java @@ -84,12 +84,22 @@ public long getTimestamp() { return timestamp; } + /** + * All topic names that the consumer is currently consuming/subscribed to. + * @return Immutable list of all topic names. + */ public Collection getTopicNames() { - return topicToOffsetMap.keySet().stream() + return Collections.unmodifiableList(topicToOffsetMap.keySet().stream() .sorted() - .collect(Collectors.toList()); + .collect(Collectors.toList()) + ); } + /** + * For a given topic, return its offset map. + * @param topic topic to return map for. + * @return Offsets and tail positions for the given topic. + */ public ConsumerGroupTopicOffsetsWithTailPositions getOffsetsForTopic(final String topic) { if (!topicToOffsetMap.containsKey(topic)) { throw new IllegalArgumentException("No topic exists: " + topic); @@ -110,6 +120,9 @@ public String toString() { + '}'; } + /** + * Builder instance. + */ public static final class Builder { private String consumerId; private Map> topicToOffsetMap = new HashMap<>(); @@ -118,7 +131,7 @@ public static final class Builder { private Builder() { } - public Builder withConsumerId(String consumerId) { + public Builder withConsumerId(final String consumerId) { this.consumerId = consumerId; return this; } @@ -128,7 +141,13 @@ public Builder withTimestamp(long timestamp) { return this; } - public Builder withOffsetsForTopic(final String topic, PartitionOffsetWithTailPosition partitionOffsetWithTailPosition) { + /** + * Add new tail position offsets for a given topic. + * @param topic topic to add tail position for. + * @param partitionOffsetWithTailPosition partition and tail offsets. + * @return Builder instance + */ + public Builder withOffsetsForTopic(final String topic, final PartitionOffsetWithTailPosition partitionOffsetWithTailPosition) { if (!topicToOffsetMap.containsKey(topic)) { topicToOffsetMap.put(topic, new ArrayList<>()); } @@ -136,8 +155,10 @@ public Builder withOffsetsForTopic(final String topic, PartitionOffsetWithTailPo return this; } - - + /** + * Build a new ConsumerGroupOffsetsWithTailPositions instance. + * @return new ConsumerGroupOffsetsWithTailPositions instance. 
+ */ public ConsumerGroupOffsetsWithTailPositions build() { return new ConsumerGroupOffsetsWithTailPositions( consumerId, From d290789f19f38f2ab24bbfc24bc466b7c66cdd10 Mon Sep 17 00:00:00 2001 From: Crim Date: Sun, 27 Oct 2019 08:22:37 -0400 Subject: [PATCH 04/10] wip --- .../main/resources/templates/cluster/readConsumer.html | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html b/kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html index 364714b9..f17d666b 100644 --- a/kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html +++ b/kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html @@ -1,9 +1,9 @@ + xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout" + xmlns:sec="http://www.thymeleaf.org/thymeleaf-extras-springsecurity4" + xmlns:th="http://www.thymeleaf.org" + layout:decorate="~{layout}"> Cluster Explorer @@ -863,7 +863,7 @@ - +
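For reference while reviewing the template changes, a minimal sketch of how the multi-topic ConsumerGroupOffsetsWithTailPositions built in patches 01 through 03 can be read on the consuming side, for example to derive per-partition lag. The class name LagSketch, the consumer group id, the topic name, and all offset values are hypothetical; the builder, the per-topic accessor, and the head/tail getters are those introduced above, and lag is computed here simply as tail position minus committed offset.

    import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions;
    import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsetsWithTailPositions;
    import org.sourcelab.kafka.webview.ui.manager.kafka.dto.PartitionOffsetWithTailPosition;

    public class LagSketch {
        public static void main(final String[] args) {
            // Assemble the DTO the same way getConsumerGroupOffsetsWithTailOffsets() now does:
            // partition, committed consumer offset, tail (log end) offset. Values are illustrative.
            final ConsumerGroupOffsetsWithTailPositions offsets = ConsumerGroupOffsetsWithTailPositions.newBuilder()
                .withConsumerId("ExampleConsumerGroup")
                .withTimestamp(System.currentTimeMillis())
                .withOffsetsForTopic("TopicA", new PartitionOffsetWithTailPosition(0, 10L, 25L))
                .withOffsetsForTopic("TopicA", new PartitionOffsetWithTailPosition(1, 12L, 12L))
                .build();

            // Per-topic view: lag for each partition is the tail position minus the committed offset.
            for (final String topic : offsets.getTopicNames()) {
                final ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = offsets.getOffsetsForTopic(topic);
                for (final int partition : topicOffsets.getPartitions()) {
                    final long lag = topicOffsets.getTailOffsetForPartition(partition)
                        - topicOffsets.getOffsetForPartition(partition);
                    System.out.println(topic + "-" + partition + " lag=" + lag);
                }
            }
        }
    }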