diff --git a/CHANGELOG.md b/CHANGELOG.md
index c479b0e7..9ced09ac 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## 2.4.0 (07/02/2019)
#### New Features
- [PR-180](https://github.com/SourceLabOrg/kafka-webview/issues/180) Consumer Group page now shows average rate of consumption per partition.
+- [ISSUE-184](https://github.com/SourceLabOrg/kafka-webview/issues/184) Cluster Kafka Consumer View for multiple topics.
#### Bug Fixes
- [ISSUE-175](https://github.com/SourceLabOrg/kafka-webview/issues/175) Update multi-threaded consumers with unique consumerId [PR](https://github.com/SourceLabOrg/kafka-webview/pull/176).
diff --git a/dev-cluster/pom.xml b/dev-cluster/pom.xml
index 3c8d00b1..34d6628d 100644
--- a/dev-cluster/pom.xml
+++ b/dev-cluster/pom.xml
@@ -52,7 +52,7 @@
com.salesforce.kafka.test
kafka-junit-core
- 3.1.1
+ 3.1.2
diff --git a/dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java b/dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java
index 830781fb..7d3b3282 100644
--- a/dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java
+++ b/dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java
@@ -47,7 +47,12 @@
import org.slf4j.LoggerFactory;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
/**
@@ -146,15 +151,11 @@ public static void main(final String[] args) throws Exception {
if (topicNames != null && topicNames.length > 0) {
final KafkaTestUtils testUtils = new KafkaTestUtils(kafkaTestCluster);
if (cmd.hasOption("consumer")) {
- for (final String topicName : topicNames) {
- runEndlessConsumer(topicName, testUtils);
- }
+ runEndlessConsumer(Arrays.asList(topicNames), testUtils);
}
if (cmd.hasOption("producer")) {
- for (final String topicName : topicNames) {
- runEndlessProducer(topicName, testUtils);
- }
+ runEndlessProducer(Arrays.asList(topicNames), testUtils);
}
}
@@ -237,46 +238,57 @@ private static CommandLine parseArguments(final String[] args) throws ParseExcep
/**
* Fire up a new thread running an endless producer script into the given topic and partitions.
- * @param topicName Name of the topic to produce records into.
+ * @param topicNames Names of the topics to produce records into.
* @param utils KafkaUtils instance.
*/
private static void runEndlessProducer(
- final String topicName,
+ final Collection topicNames,
final KafkaTestUtils utils
) {
final Thread producerThread = new Thread(() -> {
- // Determine how many partitions there are
- final TopicDescription topicDescription = utils.describeTopic(topicName);
+ final Map topicDescriptions = new HashMap<>();
+
+ // Gather details about topics
+ for (final String topicName : topicNames) {
+ // Determine how many partitions there are
+ topicDescriptions.put(topicName, utils.describeTopic(topicName));
+ }
do {
// Publish some data into that topic for each partition.
- for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
- utils.produceRecords(1000, topicName, partitionInfo.partition());
+ for (final Map.Entry entry : topicDescriptions.entrySet()) {
+ final String topicName = entry.getKey();
+ final TopicDescription topicDescription = entry.getValue();
+
+ for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
+ utils.produceRecords(1000, topicName, partitionInfo.partition());
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
try {
- Thread.sleep(1000L);
+ Thread.sleep(5000L);
} catch (InterruptedException e) {
return;
}
}
- try {
- Thread.sleep(5000L);
- } catch (InterruptedException e) {
- return;
- }
+
} while (true);
});
- logger.info("Starting endless producer for topic {}", topicName);
- producerThread.setName("Endless producer for topic " + topicName);
+ logger.info("Starting endless producer for topic {}", topicNames);
+ producerThread.setName("Endless producer for topic " + topicNames);
producerThread.start();
}
/**
* Fire up a new thread running an enless consumer script that reads from the given topic.
- * @param topicName Topic to consume from.
+ * @param topicNames Topics to consume from.
* @param utils KafkaUtils instance.
*/
- private static void runEndlessConsumer(final String topicName, final KafkaTestUtils utils) {
+ private static void runEndlessConsumer(final Collection topicNames, final KafkaTestUtils utils) {
final Thread consumerThread = new Thread(() -> {
// Start a consumer
final Properties properties = new Properties();
@@ -286,7 +298,7 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUt
try (final KafkaConsumer consumer
= utils.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, properties)) {
- consumer.subscribe(Collections.singleton(topicName));
+ consumer.subscribe(topicNames);
do {
final ConsumerRecords records = consumer.poll(1000);
consumer.commitSync();
@@ -305,8 +317,8 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUt
}
});
- logger.info("Starting endless consumer for topic {}", topicName);
- consumerThread.setName("Endless consumer for topic " + topicName);
+ logger.info("Starting endless consumer for topic {}", topicNames);
+ consumerThread.setName("Endless consumer for topic " + topicNames);
consumerThread.start();
}
}
diff --git a/kafka-webview-ui/pom.xml b/kafka-webview-ui/pom.xml
index 0b86979c..0d0fa581 100644
--- a/kafka-webview-ui/pom.xml
+++ b/kafka-webview-ui/pom.xml
@@ -218,7 +218,7 @@
com.salesforce.kafka.test
kafka-junit4
- 3.1.1
+ 3.1.2
test
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java
index 2327bbb3..1c2f4335 100644
--- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java
@@ -554,7 +554,8 @@ public ConsumerGroupOffsets getConsumerOffsets(
final Cluster cluster = retrieveClusterById(id);
try (final KafkaOperations operations = createOperationsClient(cluster)) {
- return operations.getConsumerGroupOffsets(consumerGroupId);
+ final ConsumerGroupOffsets offsets = operations.getConsumerGroupOffsets(consumerGroupId);
+ return offsets;
} catch (final Exception exception) {
throw new ApiException("ClusterNodes", exception);
}
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java
index f219dc63..b899d0ff 100644
--- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java
@@ -50,6 +50,7 @@
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupIdentifier;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.CreateTopic;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeDetails;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeList;
@@ -455,27 +456,26 @@ public List getConsumerGroupDetails(final List con
*/
public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId) {
try {
+ // Create new builder
+ final ConsumerGroupOffsets.Builder builder = ConsumerGroupOffsets.newBuilder()
+ .withConsumerId(consumerGroupId);
+
// Make request
final ListConsumerGroupOffsetsResult results = adminClient.listConsumerGroupOffsets(consumerGroupId);
- final List offsetList = new ArrayList<>();
- String topic = null;
-
// Iterate over results
final Map partitionsToOffsets = results
.partitionsToOffsetAndMetadata()
.get();
+ // Loop over results
for (final Map.Entry entry : partitionsToOffsets.entrySet()) {
- offsetList.add(new PartitionOffset(
- entry.getKey().partition(), entry.getValue().offset()
- ));
- if (topic == null) {
- topic = entry.getKey().topic();
- }
+ builder.withOffset(
+ entry.getKey().topic(), entry.getKey().partition(), entry.getValue().offset()
+ );
}
- return new ConsumerGroupOffsets(consumerGroupId, topic, offsetList);
+ return builder.build();
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
@@ -490,24 +490,30 @@ public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId
*/
public ConsumerGroupOffsetsWithTailPositions getConsumerGroupOffsetsWithTailOffsets(final String consumerGroupId) {
final ConsumerGroupOffsets consumerGroupOffsets = getConsumerGroupOffsets(consumerGroupId);
- final TailOffsets tailOffsets = getTailOffsets(consumerGroupOffsets.getTopic(), consumerGroupOffsets.getPartitions());
- final List offsetsWithPartitions = new ArrayList<>();
+ // Create builder
+ final ConsumerGroupOffsetsWithTailPositions.Builder builder = ConsumerGroupOffsetsWithTailPositions.newBuilder()
+ .withConsumerId(consumerGroupId)
+ .withTimestamp(System.currentTimeMillis());
- for (final PartitionOffset entry : tailOffsets.getOffsets()) {
- offsetsWithPartitions.add(new PartitionOffsetWithTailPosition(
- entry.getPartition(),
- consumerGroupOffsets.getOffsetForPartition(entry.getPartition()),
- entry.getOffset()
- ));
+ // Loop over each topic
+ for (final String topicName : consumerGroupOffsets.getTopicNames()) {
+ final ConsumerGroupTopicOffsets topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName);
+
+ // Retrieve tail offsets
+ final TailOffsets tailOffsets = getTailOffsets(topicName, topicOffsets.getPartitions());
+
+ // Build offsets with partitions
+ for (final PartitionOffset entry : tailOffsets.getOffsets()) {
+ builder.withOffsetsForTopic(topicName, new PartitionOffsetWithTailPosition(
+ entry.getPartition(),
+ topicOffsets.getOffsetForPartition(entry.getPartition()),
+ entry.getOffset()
+ ));
+ }
}
- return new ConsumerGroupOffsetsWithTailPositions(
- consumerGroupId,
- consumerGroupOffsets.getTopic(),
- offsetsWithPartitions,
- System.currentTimeMillis()
- );
+ return builder.build();
}
/**
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java
index 56b4c09a..2b43fca2 100644
--- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsets.java
@@ -30,97 +30,109 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
/**
- * Represents details about a consumer group offset positions.
+ * Contains details about a consumer group's current offsets for one or more topics they are consuming from.
*/
public class ConsumerGroupOffsets {
private final String consumerId;
- private final String topic;
- private final Map offsetMap;
+ private final Map topics;
- /**
- * Constructor.
- * @param consumerId id of consumer group.
- * @param topic name of the topic.
- * @param offsets details about each partition and offset.
- */
- public ConsumerGroupOffsets(final String consumerId, final String topic, final Collection offsets) {
+ public ConsumerGroupOffsets(final String consumerId, final Map topics) {
this.consumerId = consumerId;
- this.topic = topic;
-
- final Map offsetMap = new HashMap<>();
- for (final PartitionOffset offset : offsets) {
- offsetMap.put(
- offset.getPartition(),
- offset
- );
- }
- this.offsetMap = Collections.unmodifiableMap(offsetMap);
+ this.topics = topics;
}
public String getConsumerId() {
return consumerId;
}
- public String getTopic() {
- return topic;
- }
-
/**
- * Marked private to keep from being serialized in responses.
+ * All topic names being consumed from.
+ * @return Immutable list of all topic names being consumed.
*/
- private Map getOffsetMap() {
- return offsetMap;
+ public Collection getTopicNames() {
+ // Get all topic names sorted.
+ return Collections.unmodifiableList(topics.keySet().stream()
+ .sorted()
+ .collect(Collectors.toList())
+ );
}
/**
- * @return List of offsets.
+ * Return all offsets per topic as an immutable list.
+ * @return Immutable list of offsets for each topic and partition.
*/
- public List getOffsets() {
- final List offsetList = new ArrayList<>(offsetMap.values());
-
- // Sort by partition
- offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition()));
- return Collections.unmodifiableList(offsetList);
+ public List getTopics() {
+ return Collections.unmodifiableList(new ArrayList<>(topics.values()));
}
/**
- * Get offset for the requested partition.
- * @param partition id of partition.
- * @return offset stored
- * @throws RuntimeException if requested invalid partition.
+ * Return the consumer group offsets for the given topic.
+ * @param topicName name of the topic to retrieve offsets for.
+ * @return Offsets for the topic.
*/
- public long getOffsetForPartition(final int partition) {
- final Optional offsetOptional = getOffsetMap()
- .values()
- .stream()
- .filter((offset) -> offset.getPartition() == partition)
- .findFirst();
-
- if (offsetOptional.isPresent()) {
- return offsetOptional.get().getOffset();
+ public ConsumerGroupTopicOffsets getOffsetsForTopic(final String topicName) {
+ if (!topics.containsKey(topicName)) {
+ throw new IllegalArgumentException("No topic defined: " + topicName);
}
- throw new RuntimeException("Unable to find partition " + partition);
+ return topics.get(topicName);
}
/**
- * @return Set of all available partitions.
+ * Builder instance.
+ * @return new builder instance.
*/
- public Set getPartitions() {
- final TreeSet partitions = new TreeSet<>(offsetMap.keySet());
- return Collections.unmodifiableSet(partitions);
+ public static Builder newBuilder() {
+ return new Builder();
}
- @Override
- public String toString() {
- return "ConsumerGroupOffsets{"
- + "consumerId='" + consumerId + '\''
- + ", topic='" + topic + '\''
- + ", offsetMap=" + offsetMap
- + '}';
+ /**
+ * Builder instance.
+ */
+ public static final class Builder {
+ private String consumerId;
+ private Map> topicOffsets = new HashMap<>();
+
+ private Builder() {
+ }
+
+ public Builder withConsumerId(final String consumerId) {
+ this.consumerId = consumerId;
+ return this;
+ }
+
+ /**
+ * Add offsets entry.
+ * @param topic name of topic.
+ * @param partition which partition on the topic.
+ * @param offset The current offset for the topic and partition.
+ * @return Builder instance.
+ */
+ public Builder withOffset(final String topic, final int partition, final long offset) {
+ if (!topicOffsets.containsKey(topic)) {
+ topicOffsets.put(topic, new ArrayList<>());
+ }
+ topicOffsets.get(topic).add(new PartitionOffset(
+ partition, offset
+ ));
+ return this;
+ }
+
+ /**
+ * Build a ConsumerGroupOffsets instance.
+ * @return ConsumerGroupOffsets instance.
+ */
+ public ConsumerGroupOffsets build() {
+ final Map topicOffsetsMap = new HashMap<>();
+ for (final Map.Entry> entry : topicOffsets.entrySet()) {
+ topicOffsetsMap.put(
+ entry.getKey(), new ConsumerGroupTopicOffsets(entry.getKey(), entry.getValue())
+ );
+ }
+
+ return new ConsumerGroupOffsets(consumerId, topicOffsetsMap);
+ }
}
}
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java
index c5b90515..a50a61bd 100644
--- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsWithTailPositions.java
@@ -25,13 +25,12 @@
package org.sourcelab.kafka.webview.ui.manager.kafka.dto;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
/**
* Represents details about a consumer group offset positions, including the current tail offset positions for
@@ -39,42 +38,46 @@
*/
public class ConsumerGroupOffsetsWithTailPositions {
private final String consumerId;
- private final String topic;
- private final Map offsetMap;
+ private final Map topicToOffsetMap;
private final long timestamp;
/**
* Constructor.
* @param consumerId id of consumer group.
- * @param topic name of the topic.
- * @param offsets details about each partition and offset.
+ * @param topicToOffsets details about each partition and offset mapped by topic.
* @param timestamp Timestamp offsets were retrieved.
*/
public ConsumerGroupOffsetsWithTailPositions(
final String consumerId,
- final String topic,
- final Iterable offsets,
+ final Map> topicToOffsets,
final long timestamp) {
this.consumerId = consumerId;
- this.topic = topic;
this.timestamp = timestamp;
- final Map copiedMap = new HashMap<>();
- for (final PartitionOffsetWithTailPosition offset : offsets) {
- copiedMap.put(
- offset.getPartition(),
- offset
- );
+ final Map tempMap = new HashMap<>();
+
+ for (final Map.Entry> entry : topicToOffsets.entrySet()) {
+ final String topic = entry.getKey();
+ final Iterable offsets = entry.getValue();
+
+ final Map copiedMap = new HashMap<>();
+ for (final PartitionOffsetWithTailPosition offset : offsets) {
+ copiedMap.put(
+ offset.getPartition(),
+ offset
+ );
+ }
+ tempMap.put(topic, new ConsumerGroupTopicOffsetsWithTailPositions(topic, Collections.unmodifiableMap(copiedMap)));
}
- this.offsetMap = Collections.unmodifiableMap(copiedMap);
+ this.topicToOffsetMap = Collections.unmodifiableMap(tempMap);
}
- public String getConsumerId() {
- return consumerId;
+ public static Builder newBuilder() {
+ return new Builder();
}
- public String getTopic() {
- return topic;
+ public String getConsumerId() {
+ return consumerId;
}
public long getTimestamp() {
@@ -82,76 +85,86 @@ public long getTimestamp() {
}
/**
- * Marked private to keep from being serialized in responses.
+ * All topic names that the consumer is currently consuming/subscribed to.
+ * @return Immutable list of all topic names.
*/
- private Map getOffsetMap() {
- return offsetMap;
- }
-
- /**
- * @return List of offsets.
- */
- public List getOffsets() {
- final List offsetList = new ArrayList<>(offsetMap.values());
-
- // Sort by partition
- offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition()));
- return Collections.unmodifiableList(offsetList);
- }
-
- /**
- * Get offset for the requested partition.
- * @param partition id of partition.
- * @return offset stored
- * @throws RuntimeException if requested invalid partition.
- */
- public long getOffsetForPartition(final int partition) {
- final Optional offsetOptional = getOffsetMap()
- .values()
- .stream()
- .filter((offset) -> offset.getPartition() == partition)
- .findFirst();
-
- if (offsetOptional.isPresent()) {
- return offsetOptional.get().getOffset();
- }
- throw new RuntimeException("Unable to find partition " + partition);
+ public Collection getTopicNames() {
+ return Collections.unmodifiableList(topicToOffsetMap.keySet().stream()
+ .sorted()
+ .collect(Collectors.toList())
+ );
}
/**
- * Get offset for the requested partition.
- * @param partition id of partition.
- * @return offset stored
- * @throws RuntimeException if requested invalid partition.
+ * For a given topic, return its offset map.
+ * @param topic topic to return map for.
+ * @return Offsets and tail positions for the given topic.
*/
- public long getTailOffsetForPartition(final int partition) {
- final Optional offsetOptional = getOffsetMap()
- .values()
- .stream()
- .filter((offset) -> offset.getPartition() == partition)
- .findFirst();
-
- if (offsetOptional.isPresent()) {
- return offsetOptional.get().getTail();
+ public ConsumerGroupTopicOffsetsWithTailPositions getOffsetsForTopic(final String topic) {
+ if (!topicToOffsetMap.containsKey(topic)) {
+ throw new IllegalArgumentException("No topic exists: " + topic);
}
- throw new RuntimeException("Unable to find partition " + partition);
+ return topicToOffsetMap.get(topic);
}
- /**
- * @return Set of all available partitions.
- */
- public Set getPartitions() {
- final TreeSet partitions = new TreeSet<>(offsetMap.keySet());
- return Collections.unmodifiableSet(partitions);
+ public Map getTopicToOffsetMap() {
+ return topicToOffsetMap;
}
@Override
public String toString() {
return "ConsumerGroupOffsetsWithTailPositions{"
+ "consumerId='" + consumerId + '\''
- + ", topic='" + topic + '\''
+ ", timestamp='" + timestamp + '\''
- + ", offsetMap=" + offsetMap
+ + ", offsetMap=" + topicToOffsetMap
+ '}';
}
+
+ /**
+ * Builder instance.
+ */
+ public static final class Builder {
+ private String consumerId;
+ private Map> topicToOffsetMap = new HashMap<>();
+ private long timestamp;
+
+ private Builder() {
+ }
+
+ public Builder withConsumerId(final String consumerId) {
+ this.consumerId = consumerId;
+ return this;
+ }
+
+ public Builder withTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ /**
+ * Add new tail position offsets for a given topic.
+ * @param topic topic to add tail position for.
+ * @param partitionOffsetWithTailPosition partition and tail offsets.
+ * @return Builder instance
+ */
+ public Builder withOffsetsForTopic(final String topic, final PartitionOffsetWithTailPosition partitionOffsetWithTailPosition) {
+ if (!topicToOffsetMap.containsKey(topic)) {
+ topicToOffsetMap.put(topic, new ArrayList<>());
+ }
+ topicToOffsetMap.get(topic).add(partitionOffsetWithTailPosition);
+ return this;
+ }
+
+ /**
+ * Build a new ConsumerGroupOffsetsWithTailPositions instance.
+ * @return new ConsumerGroupOffsetsWithTailPositions instance.
+ */
+ public ConsumerGroupOffsetsWithTailPositions build() {
+ return new ConsumerGroupOffsetsWithTailPositions(
+ consumerId,
+ topicToOffsetMap,
+ timestamp
+ );
+ }
+ }
}
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsets.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsets.java
new file mode 100644
index 00000000..16b7e5c4
--- /dev/null
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsets.java
@@ -0,0 +1,118 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.sourcelab.kafka.webview.ui.manager.kafka.dto;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Represents details about a consumer group's offset positions for one or more topics the consumer is subscribed to.
+ */
+public class ConsumerGroupTopicOffsets {
+ private final String topic;
+ private final Map offsetMap;
+
+ /**
+ * Constructor. Preferred to use Builder instance.
+ * @param topic name of the topic.
+ * @param offsets details about each partition and offset.
+ */
+ public ConsumerGroupTopicOffsets(final String topic, final Collection offsets) {
+ this.topic = topic;
+
+ final Map offsetMap = new HashMap<>();
+ for (final PartitionOffset offset : offsets) {
+ offsetMap.put(
+ offset.getPartition(),
+ offset
+ );
+ }
+ this.offsetMap = Collections.unmodifiableMap(offsetMap);
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * Marked private to keep from being serialized in responses.
+ */
+ private Map getOffsetMap() {
+ return offsetMap;
+ }
+
+ /**
+ * @return List of offsets.
+ */
+ public List getOffsets() {
+ final List offsetList = new ArrayList<>(offsetMap.values());
+
+ // Sort by partition
+ offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition()));
+ return Collections.unmodifiableList(offsetList);
+ }
+
+ /**
+ * Get offset for the requested partition.
+ * @param partition id of partition.
+ * @return offset stored
+ * @throws RuntimeException if requested invalid partition.
+ */
+ public long getOffsetForPartition(final int partition) {
+ final Optional offsetOptional = getOffsetMap()
+ .values()
+ .stream()
+ .filter((offset) -> offset.getPartition() == partition)
+ .findFirst();
+
+ if (offsetOptional.isPresent()) {
+ return offsetOptional.get().getOffset();
+ }
+ throw new RuntimeException("Unable to find partition " + partition);
+ }
+
+ /**
+ * @return Set of all available partitions.
+ */
+ public Set getPartitions() {
+ final TreeSet partitions = new TreeSet<>(offsetMap.keySet());
+ return Collections.unmodifiableSet(partitions);
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerGroupOffsets{"
+ + ", topic='" + topic + '\''
+ + ", offsetMap=" + offsetMap
+ + '}';
+ }
+}
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsetsWithTailPositions.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsetsWithTailPositions.java
new file mode 100644
index 00000000..fb122825
--- /dev/null
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupTopicOffsetsWithTailPositions.java
@@ -0,0 +1,115 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.sourcelab.kafka.webview.ui.manager.kafka.dto;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Defines a Consumer's Offsets and tail positions for a single topic it is consuming from.
+ */
+public class ConsumerGroupTopicOffsetsWithTailPositions {
+ private final String topic;
+ private final Map offsetMap;
+
+ public ConsumerGroupTopicOffsetsWithTailPositions(final String topic, final Map offsetMap) {
+ this.topic = topic;
+ this.offsetMap = offsetMap;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * Get offset for the requested partition.
+ * @param partition id of partition.
+ * @return offset stored
+ * @throws RuntimeException if requested invalid partition.
+ */
+ public long getOffsetForPartition(final int partition) {
+ final Optional offsetOptional = getOffsetMap()
+ .values()
+ .stream()
+ .filter((offset) -> offset.getPartition() == partition)
+ .findFirst();
+
+ if (offsetOptional.isPresent()) {
+ return offsetOptional.get().getOffset();
+ }
+ throw new RuntimeException("Unable to find partition " + partition);
+ }
+
+ /**
+ * Get offset for the requested partition.
+ * @param partition id of partition.
+ * @return offset stored
+ * @throws RuntimeException if requested invalid partition.
+ */
+ public long getTailOffsetForPartition(final int partition) {
+ final Optional offsetOptional = getOffsetMap()
+ .values()
+ .stream()
+ .filter((offset) -> offset.getPartition() == partition)
+ .findFirst();
+
+ if (offsetOptional.isPresent()) {
+ return offsetOptional.get().getTail();
+ }
+ throw new RuntimeException("Unable to find partition " + partition);
+ }
+
+ /**
+ * @return List of offsets.
+ */
+ public List getOffsets() {
+ final List offsetList = new ArrayList<>(offsetMap.values());
+
+ // Sort by partition
+ offsetList.sort((o1, o2) -> Integer.valueOf(o1.getPartition()).compareTo(o2.getPartition()));
+ return Collections.unmodifiableList(offsetList);
+ }
+
+ /**
+ * @return Set of all available partitions.
+ */
+ public Set getPartitions() {
+ final TreeSet partitions = new TreeSet<>(offsetMap.keySet());
+ return Collections.unmodifiableSet(partitions);
+ }
+
+ /**
+ * Marked private to keep from being serialized in responses.
+ */
+ private Map getOffsetMap() {
+ return offsetMap;
+ }
+
+}
diff --git a/kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html b/kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html
index 364714b9..2b0b6a84 100644
--- a/kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html
+++ b/kafka-webview-ui/src/main/resources/templates/cluster/readConsumer.html
@@ -1,9 +1,9 @@
+ xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout"
+ xmlns:sec="http://www.thymeleaf.org/thymeleaf-extras-springsecurity4"
+ xmlns:th="http://www.thymeleaf.org"
+ layout:decorate="~{layout}">
Cluster Explorer
@@ -84,10 +84,24 @@
});
});
+ // Generate unique list of topics.
var uniqueTopics = topics.filter(function (value, index, self) {
return self.indexOf(value) === index;
});
+ // Populate topic selector only once.
+ if (jQuery('#topicSelector option').length === 0) {
+ var selectorSource = jQuery('#consumer-topic-select-template').html();
+ var selectorTemplate = Handlebars.compile(selectorSource);
+ jQuery.each(uniqueTopics, function (index, topic) {
+ var properties = {
+ topic: topic
+ };
+ var resultHtml = selectorTemplate(properties);
+ jQuery('#topicSelector').append(resultHtml);
+ });
+ }
+
// Generate html from template
var properties = {
clusterName: ConsumerInfo.clusterName,
@@ -168,8 +182,46 @@
ApiClient.getConsumerOffsetsWithTailPositions(ConsumerInfo.clusterId, ConsumerInfo.groupId, ConsumerInfo.handleConsumerOffsets);
},
+ resetGraph : function(graphInstance, topicName) {
+ // Clear labels
+ graphInstance.graphData.labels = [];
+
+ // Clear data points
+ jQuery.each(graphInstance.graphData.datasets, function(index, dataSet) {
+ dataSet.data = [];
+ });
+
+ // Reset title
+ graphInstance.graphOptions.title.text = graphInstance.graphOptions.title.srcText.replace('{{topic}}', topicName);
+ },
+
// Handle ConsumerOffsets Results
handleConsumerOffsets: function(results) {
+ // Determine which topic we're watching
+ console.log("Refreshing for topic: " + jQuery('#topicSelector').val());
+ var currentTopic = jQuery('#topicSelector').val();
+ if (ConsumerInfo.lastTopicSelected === null || ConsumerInfo.lastTopicSelected !== currentTopic) {
+ console.log("Resetting Graphs");
+ // Reset all data/graphs.
+ ConsumerInfo.lastOffsetData = null;
+ ConsumerInfo.rateData = [];
+
+ // For each graph
+ ConsumerInfo.resetGraph(ConsumerPositionsGraph, currentTopic);
+ ConsumerInfo.resetGraph(ConsumerRateGraph, currentTopic);
+ ConsumerInfo.resetGraph(TailRateGraph, currentTopic);
+ ConsumerInfo.resetGraph(LagGraph, currentTopic);
+ }
+ ConsumerInfo.lastTopicSelected = currentTopic;
+
+ // Get results for specific topic
+ var topicResults = results.topicToOffsetMap[currentTopic];
+
+ // Gracefully handle when a consumer group is not consuming ANY topics.
+ if (topicResults === undefined || topicResults === null) {
+ topicResults = {offsets: {}};
+ }
+
// Get consumer offsets table
var table = jQuery('#consumer-offsets-tbody');
@@ -192,7 +244,7 @@
totalConsumerLag = 0;
// Process each partition
- jQuery.each(results.offsets, function(index, offsetData) {
+ jQuery.each(topicResults.offsets, function(index, offsetData) {
// Calculate differences
var differences = ConsumerInfo.findOffsetDifference(
offsetData.partition,
@@ -247,7 +299,7 @@
jQuery('td#consumer-total-rate').text(totalConsumerRate.toFixed(2) + '/sec');
// Retain previous
- ConsumerInfo.lastOffsetData = results.offsets;
+ ConsumerInfo.lastOffsetData = topicResults.offsets;
// Hide loaders
jQuery('#consumer-offsets-loader').toggle(false);
@@ -257,7 +309,7 @@
ConsumerInfo.refreshOffsetsTimer = setTimeout(ConsumerInfo.refreshConsumerOffsets, ConsumerInfo.refreshRateConsumerOffsets);
// Push update to graphs
- ConsumerInfo.updateConsumerPositionsGraph(results.offsets);
+ ConsumerInfo.updateConsumerPositionsGraph(topicResults.offsets);
ConsumerInfo.updateRateGraph(ConsumerRateGraph, "ConsumerRateChart", rateData, "consumerRatePerSec")
ConsumerInfo.updateRateGraph(TailRateGraph, "TailRateChart", rateData, "tailRatePerSec")
ConsumerInfo.updateRateGraph(LagGraph, "LagChart", lagData, "lag")
@@ -321,7 +373,7 @@
},
// Calculate the rate of change (msgs per second).
- findRate(partition, timestamp, consumerDifference, tailDifference) {
+ findRate: function (partition, timestamp, consumerDifference, tailDifference) {
// If we poll once per 10 seconds, then we would average over 3 polls, or ~ 30 secs.
var numberOfEntriesToAverage = 3;
@@ -531,8 +583,9 @@
responsive: true,
maintainAspectRatio: false,
title: {
- display: true,
- text: "Consumer Lag (Percent Consumed)"
+ display: true,
+ text: "Consumer Lag (Percent Consumed)",
+ srcText: "Consumer Lag (Percent Consumed) on topic {{topic}}"
},
animation: {
duration: 0
@@ -600,7 +653,8 @@
maintainAspectRatio: false,
title: {
display: true,
- text: "Consumer Rate (Msgs/Sec)"
+ text: "Consumer Rate (Msgs/Sec)",
+ srcText: "Consumer Rate (Msgs/Sec) on topic {{topic}}"
},
scales: {
xAxes: [{
@@ -642,7 +696,8 @@
maintainAspectRatio: false,
title: {
display: true,
- text: "Producer Rate (Msgs/Sec)"
+ text: "Producer Rate (Msgs/Sec)",
+ srcText: "Producer Rate (Msgs/Sec) on topic {{topic}}"
},
scales: {
xAxes: [{
@@ -684,7 +739,8 @@
maintainAspectRatio: false,
title: {
display: true,
- text: "Lag Per Partition (messages)"
+ text: "Lag Per Partition (messages)",
+ srcText: "Lag Per Partition (messages) on topic {{topic}}"
},
scales: {
xAxes: [{
@@ -811,7 +867,11 @@
diff --git a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/controller/api/ApiControllerTest.java b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/controller/api/ApiControllerTest.java
index 265bfe61..2a7dd8ac 100644
--- a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/controller/api/ApiControllerTest.java
+++ b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/controller/api/ApiControllerTest.java
@@ -47,8 +47,11 @@
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -260,7 +263,7 @@ public void test_listConsumers() throws Exception {
);
// Create a consumer with state on the cluster.
- final String consumerId = createConsumerWithState();
+ final String consumerId = createConsumerWithState(new String[] {"TestTopic-" + System.currentTimeMillis()});
// Hit end point
mockMvc
@@ -289,7 +292,7 @@ public void test_removeConsumer_withAdminRole() throws Exception {
);
// Create a consumer with state on the cluster.
- final String consumerId = createConsumerWithState();
+ final String consumerId = createConsumerWithState(new String[] {"TestTopic-" + System.currentTimeMillis()});
// Construct payload
final String payload = "{ \"consumerId\": \"" + consumerId + "\", \"clusterId\": \"" + cluster.getId() + "\"}";
@@ -337,7 +340,7 @@ public void test_removeConsumer_withNonAdminRole() throws Exception {
);
// Create a consumer with state on the cluster.
- final String consumerId = createConsumerWithState();
+ final String consumerId = createConsumerWithState(new String[] {"TestTopic-" + System.currentTimeMillis()});
// Construct payload
final String payload = "{ \"consumerId\": \"" + consumerId + "\", \"clusterId\": \"" + cluster.getId() + "\"}";
@@ -382,7 +385,10 @@ public void test_listConsumersAndDetails() throws Exception {
);
// Create a consumer with state on the cluster.
- final String consumerId = createConsumerWithState();
+ final String consumerId = createConsumerWithState(new String[] {
+ "TestTopic-A" + System.currentTimeMillis(),
+ "TestTopic-B" + System.currentTimeMillis()
+ });
// Hit end point
mockMvc
@@ -419,7 +425,7 @@ public void test_specificConsumerDetails() throws Exception {
);
// Create a consumer with state on the cluster.
- final String consumerId = createConsumerWithState();
+ final String consumerId = createConsumerWithState(new String[] {"TestTopic-" + System.currentTimeMillis()});
// Hit end point
mockMvc
@@ -456,7 +462,9 @@ public void test_specificConsumerOffsets() throws Exception {
);
// Create a consumer with state on the cluster.
- final String consumerId = createConsumerWithState();
+ final String topicNameA = "TestTopicA-" + System.currentTimeMillis();
+ final String topicNameB = "TestTopicB-" + System.currentTimeMillis();
+ final String consumerId = createConsumerWithState(new String[] {topicNameA, topicNameB});
// Hit end point
mockMvc
@@ -469,13 +477,16 @@ public void test_specificConsumerOffsets() throws Exception {
.andExpect(status().isOk())
// Should have content similar to:
- // {"consumerId":"test-consumer-id-1543909610144","topic":"TestTopic-1543909610145","offsets":[{"partition":0,"offset":10}],"partitions":[0]}
+ // {"consumerId":"MyConsumerId","topics":[{"topic":"topic-a","partitions":[0,1],"offsets":[{"partition":0,"offset":0},{"partition":1,"offset":1}]},{"topic":"topic-b","partitions":[0,1],"offsets":[{"partition":0,"offset":2},{"partition":1,"offset":3}]}],"topicNames":["topic-a","topic-b"]}
// Validate results seem right.
.andExpect(content().string(containsString("\"consumerId\":\"" + consumerId )))
- .andExpect(content().string(containsString("\"topic\":\"TestTopic-")))
+ .andExpect(content().string(containsString("\"topic\":\"" + topicNameA + "\"")))
+ .andExpect(content().string(containsString("\"topic\":\"" + topicNameB + "\"")))
+ .andExpect(content().string(containsString("\"topicNames\":[\"" + topicNameA + "\",\"" + topicNameB + "\"]")))
.andExpect(content().string(containsString("\"offsets\":[{\"partition\":0,\"offset\":10}]")))
.andExpect(content().string(containsString("\"partitions\":[0]")));
+
}
/**
@@ -491,7 +502,9 @@ public void test_specificConsumerOffsetsWithTailOffsets() throws Exception {
);
// Create a consumer with state on the cluster.
- final String consumerId = createConsumerWithState();
+ final String topicNameA = "TestTopicA-" + System.currentTimeMillis();
+ final String topicNameB = "TestTopicB-" + System.currentTimeMillis();
+ final String consumerId = createConsumerWithState(new String[] {topicNameA, topicNameB});
// Hit end point
mockMvc
@@ -504,13 +517,15 @@ public void test_specificConsumerOffsetsWithTailOffsets() throws Exception {
.andExpect(status().isOk())
// Should have content similar to:
- // {"consumerId":"test-consumer-id-1544775318028","topic":"TestTopic-1544775318028","offsets":[{"partition":0,"offset":10,"tail":10}],"partitions":[0]}
+ // {"consumerId":"test-consumer-id-1573204049314","topicToOffsetMap":{"TestTopic-1573204049314":{"topic":"TestTopic-1573204049314","offsets":[{"partition":0,"offset":10,"tail":10}],"partitions":[0]}},"timestamp":1573204054752,"topicNames":["TestTopic-1573204049314"]}
// Validate results seem right.
.andExpect(content().string(containsString("\"consumerId\":\"" + consumerId )))
- .andExpect(content().string(containsString("\"topic\":\"TestTopic-")))
+ .andExpect(content().string(containsString("\"topic\":\"" + topicNameA + "\"")))
+ .andExpect(content().string(containsString("\"topic\":\"" + topicNameB + "\"")))
.andExpect(content().string(containsString("\"offsets\":[{\"partition\":0,\"offset\":10,\"tail\":10}]")))
- .andExpect(content().string(containsString("\"partitions\":[0]")));
+ .andExpect(content().string(containsString("\"partitions\":[0]")))
+ .andExpect(content().string(containsString("\"topicNames\":[\"" + topicNameA + "\",\"" + topicNameB + "\"]")));
}
/**
@@ -610,20 +625,21 @@ public void test_listTopics_withSearchString() throws Exception {
* Helper method to create a consumer with state on the given cluster.
* @return Consumer group id created.
*/
- private String createConsumerWithState() {
+ private String createConsumerWithState(final String[] topics) {
final int totalRecords = 10;
final String consumerId = "test-consumer-id-" + System.currentTimeMillis();
// Define our new topic name
- final String newTopic = "TestTopic-" + System.currentTimeMillis();
- sharedKafkaTestResource
- .getKafkaTestUtils()
- .createTopic(newTopic, 1, (short)1);
-
- // Publish records into topic
- sharedKafkaTestResource
- .getKafkaTestUtils()
- .produceRecords(totalRecords, newTopic, 0);
+ for (final String topic : topics) {
+ sharedKafkaTestResource
+ .getKafkaTestUtils()
+ .createTopic(topic, 1, (short) 1);
+
+ // Publish records into topic
+ sharedKafkaTestResource
+ .getKafkaTestUtils()
+ .produceRecords(totalRecords, topic, 0);
+ }
// Create a consumer and consume from the records, maintaining state.
final Properties consumerProperties = new Properties();
@@ -635,7 +651,7 @@ private String createConsumerWithState() {
.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, consumerProperties)) {
// Consume
- consumer.subscribe(Collections.singleton(newTopic));
+ consumer.subscribe(Arrays.asList(topics));
consumer.poll(Duration.ofSeconds(5));
// Save state.
diff --git a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java
index 31b003d9..1278f613 100644
--- a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java
+++ b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperationsTest.java
@@ -45,6 +45,8 @@
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupIdentifier;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsetsWithTailPositions;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.CreateTopic;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeDetails;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeList;
@@ -59,6 +61,8 @@
import org.sourcelab.kafka.webview.ui.manager.socket.StartingPosition;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -567,18 +571,23 @@ public void testGetConsumerGroupOffsets() {
assertNotNull(consumerGroupOffsets);
// Validate bits
- assertEquals(topicName, consumerGroupOffsets.getTopic());
+ assertTrue(consumerGroupOffsets.getTopicNames().contains(topicName));
+ assertEquals(1, consumerGroupOffsets.getTopicNames().size());
+
+ ConsumerGroupTopicOffsets topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName);
+
+ assertEquals(topicName, topicOffsets.getTopic());
assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId());
- assertEquals(2, consumerGroupOffsets.getOffsets().size());
- assertEquals(10, consumerGroupOffsets.getOffsetForPartition(0));
- assertEquals(10, consumerGroupOffsets.getOffsetForPartition(1));
+ assertEquals(2, topicOffsets.getOffsets().size());
+ assertEquals(10, topicOffsets.getOffsetForPartition(0));
+ assertEquals(10, topicOffsets.getOffsetForPartition(1));
- final PartitionOffset offsetsPartition0 = consumerGroupOffsets.getOffsets().get(0);
+ final PartitionOffset offsetsPartition0 = topicOffsets.getOffsets().get(0);
assertNotNull(offsetsPartition0);
assertEquals(0, offsetsPartition0.getPartition());
assertEquals(10, offsetsPartition0.getOffset());
- final PartitionOffset offsetsPartition1 = consumerGroupOffsets.getOffsets().get(1);
+ final PartitionOffset offsetsPartition1 = topicOffsets.getOffsets().get(1);
assertNotNull(offsetsPartition1);
assertEquals(1, offsetsPartition1.getPartition());
assertEquals(10, offsetsPartition1.getOffset());
@@ -586,10 +595,11 @@ public void testGetConsumerGroupOffsets() {
}
/**
- * Test getting details about a consumer with tail offset positions incuded.
+ * Test getting details about a consumer with tail offset positions included for a consumer
+ * consuming from a single topic.
*/
@Test
- public void testGetConsumerGroupOffsetsWithTailPositions() {
+ public void testGetConsumerGroupOffsetsWithTailPositions_singleTopic() {
// First need to create a topic.
final String topicName = "AnotherTestTopic-" + System.currentTimeMillis();
@@ -621,19 +631,23 @@ public void testGetConsumerGroupOffsetsWithTailPositions() {
assertNotNull(consumerGroupOffsets);
// Validate bits
- assertEquals(topicName, consumerGroupOffsets.getTopic());
+ assertTrue(topicName, consumerGroupOffsets.getTopicNames().contains(topicName));
+ assertEquals("Should have a single topic", 1, consumerGroupOffsets.getTopicNames().size());
+
+ final ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName);
+
assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId());
- assertEquals(2, consumerGroupOffsets.getOffsets().size());
- assertEquals(10, consumerGroupOffsets.getOffsetForPartition(0));
- assertEquals(10, consumerGroupOffsets.getOffsetForPartition(1));
+ assertEquals(2, topicOffsets.getOffsets().size());
+ assertEquals(10, topicOffsets.getOffsetForPartition(0));
+ assertEquals(10, topicOffsets.getOffsetForPartition(1));
- final PartitionOffsetWithTailPosition offsetsPartition0 = consumerGroupOffsets.getOffsets().get(0);
+ final PartitionOffsetWithTailPosition offsetsPartition0 = topicOffsets.getOffsets().get(0);
assertNotNull(offsetsPartition0);
assertEquals(0, offsetsPartition0.getPartition());
assertEquals(10, offsetsPartition0.getOffset());
assertEquals(10, offsetsPartition0.getTail());
- final PartitionOffsetWithTailPosition offsetsPartition1 = consumerGroupOffsets.getOffsets().get(1);
+ final PartitionOffsetWithTailPosition offsetsPartition1 = topicOffsets.getOffsets().get(1);
assertNotNull(offsetsPartition1);
assertEquals(1, offsetsPartition1.getPartition());
assertEquals(10, offsetsPartition1.getOffset());
@@ -641,6 +655,106 @@ public void testGetConsumerGroupOffsetsWithTailPositions() {
}
}
+ /**
+ * Test getting details about a consumer with tail offset positions included for a consumer
+ * consuming from multiple topics.
+ */
+ @Test
+ public void testGetConsumerGroupOffsetsWithTailPositions_multipleTopics() {
+ // First need to create a topic.
+ final String topicName1 = "AnotherTestTopic1-" + System.currentTimeMillis();
+ final String topicName2 = "AnotherTestTopic2-" + System.currentTimeMillis();
+
+ final Collection<String> topicNames = new ArrayList<>();
+ topicNames.add(topicName1);
+ topicNames.add(topicName2);
+
+ // Create topics
+ sharedKafkaTestResource
+ .getKafkaTestUtils()
+ .createTopic(topicName1, 2, (short) 1);
+
+ sharedKafkaTestResource
+ .getKafkaTestUtils()
+ .createTopic(topicName2, 2, (short) 1);
+
+ // Publish data into the topics
+ sharedKafkaTestResource
+ .getKafkaTestUtils()
+ .produceRecords(10, topicName1, 0);
+ sharedKafkaTestResource
+ .getKafkaTestUtils()
+ .produceRecords(10, topicName1, 1);
+
+ // Publish data into the topics
+ sharedKafkaTestResource
+ .getKafkaTestUtils()
+ .produceRecords(20, topicName2, 0);
+ sharedKafkaTestResource
+ .getKafkaTestUtils()
+ .produceRecords(20, topicName2, 1);
+
+ final String consumerId1 = "ConsumerA-" + System.currentTimeMillis();
+ final String consumerPrefix = "TestConsumer";
+ final String finalConsumerId = consumerPrefix + "-" + consumerId1;
+
+ // Create consumer, consume from topic, keep alive.
+ try (final KafkaConsumer<String, String> consumer = consumeFromTopics(topicNames, consumerId1, consumerPrefix)) {
+
+ // Ask for list of offsets.
+ final ConsumerGroupOffsetsWithTailPositions consumerGroupOffsets
+ = kafkaOperations.getConsumerGroupOffsetsWithTailOffsets(finalConsumerId);
+
+ // We should have one
+ assertNotNull(consumerGroupOffsets);
+
+ // Validate bits
+ assertTrue(topicName1, consumerGroupOffsets.getTopicNames().contains(topicName1));
+ assertTrue(topicName2, consumerGroupOffsets.getTopicNames().contains(topicName2));
+ assertEquals("Should have two topics", 2, consumerGroupOffsets.getTopicNames().size());
+
+ // Validate topic 1
+ ConsumerGroupTopicOffsetsWithTailPositions topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName1);
+
+ assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId());
+ assertEquals(2, topicOffsets.getOffsets().size());
+ assertEquals(10, topicOffsets.getOffsetForPartition(0));
+ assertEquals(10, topicOffsets.getOffsetForPartition(1));
+
+ PartitionOffsetWithTailPosition offsetsPartition0 = topicOffsets.getOffsets().get(0);
+ assertNotNull(offsetsPartition0);
+ assertEquals(0, offsetsPartition0.getPartition());
+ assertEquals(10, offsetsPartition0.getOffset());
+ assertEquals(10, offsetsPartition0.getTail());
+
+ PartitionOffsetWithTailPosition offsetsPartition1 = topicOffsets.getOffsets().get(1);
+ assertNotNull(offsetsPartition1);
+ assertEquals(1, offsetsPartition1.getPartition());
+ assertEquals(10, offsetsPartition1.getOffset());
+ assertEquals(10, offsetsPartition1.getTail());
+
+ // Validate topic 2
+ topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName2);
+
+ assertEquals(finalConsumerId, consumerGroupOffsets.getConsumerId());
+ assertEquals(2, topicOffsets.getOffsets().size());
+ assertEquals(20, topicOffsets.getOffsetForPartition(0));
+ assertEquals(20, topicOffsets.getOffsetForPartition(1));
+
+ offsetsPartition0 = topicOffsets.getOffsets().get(0);
+ assertNotNull(offsetsPartition0);
+ assertEquals(0, offsetsPartition0.getPartition());
+ assertEquals(20, offsetsPartition0.getOffset());
+ assertEquals(20, offsetsPartition0.getTail());
+
+ offsetsPartition1 = topicOffsets.getOffsets().get(1);
+ assertNotNull(offsetsPartition1);
+ assertEquals(1, offsetsPartition1.getPartition());
+ assertEquals(20, offsetsPartition1.getOffset());
+ assertEquals(20, offsetsPartition1.getTail());
+ }
+ }
+
/**
* Test getting tail offsets for a topic.
*/
@@ -718,6 +832,16 @@ private void validateNode(final NodeDetails node, final int expectedId, final St
* @param consumerPrefix Any consumer Id prefix.
*/
private KafkaConsumer<String, String> consumeFromTopic(final String topic, final String consumerId, final String consumerPrefix) {
+ return consumeFromTopics(Collections.singleton(topic), consumerId, consumerPrefix);
+ }
+
+ /**
+ * Helper method to consume records from the given topics.
+ * @param topics topics to consume from.
+ * @param consumerId Consumer's consumerId
+ * @param consumerPrefix Any consumer Id prefix.
+ */
+ private KafkaConsumer<String, String> consumeFromTopics(final Collection<String> topics, final String consumerId, final String consumerPrefix) {
// Create cluster config.
final ClusterConfig clusterConfig = ClusterConfig.newBuilder()
.withBrokerHosts(sharedKafkaTestResource.getKafkaConnectString())
@@ -741,6 +865,7 @@ private KafkaConsumer consumeFromTopic(final String topic, final
.build();
// Create Topic Config
+ final String topic = topics.iterator().next();
final org.sourcelab.kafka.webview.ui.manager.kafka.config.TopicConfig topicConfig = new org.sourcelab.kafka.webview.ui.manager.kafka.config.TopicConfig(clusterConfig, deserializerConfig, topic);
// Create FilterConfig
@@ -760,10 +885,9 @@ private KafkaConsumer consumeFromTopic(final String topic, final
final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory(new KafkaClientConfigUtil("not/used", consumerPrefix));
final KafkaConsumer<String, String> consumer = kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig);
- // "Subscribe" to topic.
+ // subscribe to all topics.
consumer.unsubscribe();
- consumer.subscribe(Collections.singletonList(topicConfig.getTopicName()));
-
+ consumer.subscribe(topics);
// consume and commit offsets.
// Wait for assignment to complete.
diff --git a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsTest.java b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsTest.java
new file mode 100644
index 00000000..018f0920
--- /dev/null
+++ b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupOffsetsTest.java
@@ -0,0 +1,118 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+
+package org.sourcelab.kafka.webview.ui.manager.kafka.dto;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Simple test cases over how ConsumerGroupOffsets gets serialized.
+ */
+@SpringBootTest
+@RunWith(SpringRunner.class)
+public class ConsumerGroupOffsetsTest {
+
+ /**
+ * Reference to springboot's configured object mapper.
+ */
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ /**
+ * Validates the builder constructs an appropriate object.
+ */
+ @Test
+ public void testBuilder() {
+ final ConsumerGroupOffsets.Builder builder = ConsumerGroupOffsets.newBuilder()
+ .withConsumerId("MyConsumerId")
+ .withOffset("topic-a", 0, 0)
+ .withOffset("topic-a", 1, 1)
+ .withOffset("topic-b", 0, 2)
+ .withOffset("topic-b", 1, 3);
+
+ final ConsumerGroupOffsets offsets = builder.build();
+
+ // Validate constructed properly.
+ assertEquals("Should have proper identifier", "MyConsumerId", offsets.getConsumerId());
+ assertTrue("Should have topic a", offsets.getTopicNames().contains("topic-a"));
+ assertTrue("Should have topic b", offsets.getTopicNames().contains("topic-b"));
+ assertEquals("Should have only 2 topics", 2, offsets.getTopicNames().size());
+
+ // Validate topic A
+ final ConsumerGroupTopicOffsets topicAOffsets = offsets.getOffsetsForTopic("topic-a");
+ assertEquals("Has correct topic name", "topic-a", topicAOffsets.getTopic());
+ assertTrue("has partition 0", topicAOffsets.getPartitions().contains(0));
+ assertTrue("has partition 1", topicAOffsets.getPartitions().contains(1));
+ assertEquals("Only has 2 partitions", 2, topicAOffsets.getPartitions().size());
+ assertEquals("Only has 2 partitions", 2, topicAOffsets.getOffsets().size());
+ assertEquals("Has correct offset for partition 0", 0, topicAOffsets.getOffsetForPartition(0));
+ assertEquals("Has correct offset for partition 1", 1, topicAOffsets.getOffsetForPartition(1));
+
+ // Validate topic B
+ final ConsumerGroupTopicOffsets topicBOffsets = offsets.getOffsetsForTopic("topic-b");
+ assertEquals("Has correct topic name", "topic-b", topicBOffsets.getTopic());
+ assertTrue("has partition 0", topicBOffsets.getPartitions().contains(0));
+ assertTrue("has partition 1", topicBOffsets.getPartitions().contains(1));
+ assertEquals("Only has 2 partitions", 2, topicBOffsets.getPartitions().size());
+ assertEquals("Only has 2 partitions", 2, topicBOffsets.getOffsets().size());
+ assertEquals("Has correct offset for partition 0", 2, topicBOffsets.getOffsetForPartition(0));
+ assertEquals("Has correct offset for partition 1", 3, topicBOffsets.getOffsetForPartition(1));
+ }
+
+ /**
+ * Validates the object serializes to json correctly.
+ */
+ @Test
+ public void testSerialization() throws JsonProcessingException {
+ // Define our expected output.
+ final String expectedResult = "{\"consumerId\":\"MyConsumerId\",\"topics\":[{\"topic\":\"topic-a\",\"partitions\":[0,1],\"offsets\":[{\"partition\":0,\"offset\":0},{\"partition\":1,\"offset\":1}]},{\"topic\":\"topic-b\",\"partitions\":[0,1],\"offsets\":[{\"partition\":0,\"offset\":2},{\"partition\":1,\"offset\":3}]}],\"topicNames\":[\"topic-a\",\"topic-b\"]}";
+
+ final ConsumerGroupOffsets offsets = ConsumerGroupOffsets.newBuilder()
+ .withConsumerId("MyConsumerId")
+ .withOffset("topic-a", 0, 0)
+ .withOffset("topic-a", 1, 1)
+ .withOffset("topic-b", 0, 2)
+ .withOffset("topic-b", 1, 3)
+ .build();
+
+ // Now attempt to serialize
+ assertTrue("Should be able to serialize", objectMapper.canSerialize(ConsumerGroupOffsets.class));
+
+ // Attempt to serialize
+ final String result = objectMapper.writeValueAsString(offsets);
+
+ // Validate
+ assertEquals("Should have expected serialized value", expectedResult, result);
+ }
+}
\ No newline at end of file
diff --git a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/KafkaResultTest.java b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/KafkaResultTest.java
index 15132375..170aaa45 100644
--- a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/KafkaResultTest.java
+++ b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/KafkaResultTest.java
@@ -47,7 +47,6 @@ public class KafkaResultTest {
@Autowired
private ObjectMapper objectMapper;
-
/**
* Verify that we serialize using jackson appropriately using a 'known'/string value.
*/