1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## 2.4.0 (07/02/2019)
#### New Features
- [PR-180](https://github.com/SourceLabOrg/kafka-webview/issues/180) Consumer Group page now shows average rate of consumption per partition.
- [ISSUE-184](https://github.com/SourceLabOrg/kafka-webview/issues/184) Cluster Kafka Consumer View for multiple topics.

#### Bug Fixes
- [ISSUE-175](https://github.com/SourceLabOrg/kafka-webview/issues/175) Update multi-threaded consumers with unique consumerId [PR](https://github.com/SourceLabOrg/kafka-webview/pull/176).
2 changes: 1 addition & 1 deletion dev-cluster/pom.xml
@@ -52,7 +52,7 @@
<dependency>
<groupId>com.salesforce.kafka.test</groupId>
<artifactId>kafka-junit-core</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
</dependency>

<!-- Include Kafka 1.1.x -->
@@ -47,7 +47,12 @@
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
@@ -146,15 +151,11 @@ public static void main(final String[] args) throws Exception {
if (topicNames != null && topicNames.length > 0) {
final KafkaTestUtils testUtils = new KafkaTestUtils(kafkaTestCluster);
if (cmd.hasOption("consumer")) {
for (final String topicName : topicNames) {
runEndlessConsumer(topicName, testUtils);
}
runEndlessConsumer(Arrays.asList(topicNames), testUtils);
}

if (cmd.hasOption("producer")) {
for (final String topicName : topicNames) {
runEndlessProducer(topicName, testUtils);
}
runEndlessProducer(Arrays.asList(topicNames), testUtils);
}
}

@@ -237,46 +238,57 @@ private static CommandLine parseArguments(final String[] args) throws ParseException {

/**
* Fire up a new thread running an endless producer script into the given topics and partitions.
* @param topicName Name of the topic to produce records into.
* @param topicNames Names of the topics to produce records into.
* @param utils KafkaUtils instance.
*/
private static void runEndlessProducer(
final String topicName,
final Collection<String> topicNames,
final KafkaTestUtils utils
) {
final Thread producerThread = new Thread(() -> {
// Determine how many partitions there are
final TopicDescription topicDescription = utils.describeTopic(topicName);
final Map<String, TopicDescription> topicDescriptions = new HashMap<>();

// Gather details about topics
for (final String topicName : topicNames) {
// Determine how many partitions there are
topicDescriptions.put(topicName, utils.describeTopic(topicName));
}

do {
// Publish some data into that topic for each partition.
for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
utils.produceRecords(1000, topicName, partitionInfo.partition());
for (final Map.Entry<String, TopicDescription> entry : topicDescriptions.entrySet()) {
final String topicName = entry.getKey();
final TopicDescription topicDescription = entry.getValue();

for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
utils.produceRecords(1000, topicName, partitionInfo.partition());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
return;
}
}
try {
Thread.sleep(1000L);
Thread.sleep(5000L);
} catch (InterruptedException e) {
return;
}
}
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
return;
}

} while (true);
});

logger.info("Starting endless producer for topic {}", topicName);
producerThread.setName("Endless producer for topic " + topicName);
logger.info("Starting endless producer for topic {}", topicNames);
producerThread.setName("Endless producer for topic " + topicNames);
producerThread.start();
}

/**
* Fire up a new thread running an endless consumer script that reads from the given topics.
* @param topicName Topic to consume from.
* @param topicNames Topics to consume from.
* @param utils KafkaUtils instance.
*/
private static void runEndlessConsumer(final String topicName, final KafkaTestUtils utils) {
private static void runEndlessConsumer(final Collection<String> topicNames, final KafkaTestUtils utils) {
final Thread consumerThread = new Thread(() -> {
// Start a consumer
final Properties properties = new Properties();
@@ -286,7 +298,7 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUtils utils) {
try (final KafkaConsumer<String, String> consumer
= utils.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, properties)) {

consumer.subscribe(Collections.singleton(topicName));
consumer.subscribe(topicNames);
do {
final ConsumerRecords<String, String> records = consumer.poll(1000);
consumer.commitSync();
@@ -305,8 +317,8 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUtils utils) {
}
});

logger.info("Starting endless consumer for topic {}", topicName);
consumerThread.setName("Endless consumer for topic " + topicName);
logger.info("Starting endless consumer for topic {}", topicNames);
consumerThread.setName("Endless consumer for topic " + topicNames);
consumerThread.start();
}
}
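
The DevCluster change above replaces the old one-thread-per-topic loops with a single producer thread and a single consumer thread that service every requested topic. The consumer side works because KafkaConsumer.subscribe() accepts a whole collection of topic names. A minimal standalone sketch of that pattern, outside the test-cluster helpers (the broker address, group id, and topic names are placeholder values):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MultiTopicConsumerSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("group.id", "dev-cluster-example");       // placeholder group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // One subscribe call covers all topics; partitions are balanced across the group.
            consumer.subscribe(Arrays.asList("topicA", "topicB"));
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.printf("Polled %d records%n", records.count());
                consumer.commitSync();
            }
        }
    }
}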
2 changes: 1 addition & 1 deletion kafka-webview-ui/pom.xml
@@ -218,7 +218,7 @@
<dependency>
<groupId>com.salesforce.kafka.test</groupId>
<artifactId>kafka-junit4</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -554,7 +554,8 @@ public ConsumerGroupOffsets getConsumerOffsets(
final Cluster cluster = retrieveClusterById(id);

try (final KafkaOperations operations = createOperationsClient(cluster)) {
return operations.getConsumerGroupOffsets(consumerGroupId);
final ConsumerGroupOffsets offsets = operations.getConsumerGroupOffsets(consumerGroupId);
return offsets;
} catch (final Exception exception) {
throw new ApiException("ClusterNodes", exception);
}
@@ -50,6 +50,7 @@
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupIdentifier;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.CreateTopic;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeDetails;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeList;
@@ -455,27 +456,26 @@ public List<ConsumerGroupDetails> getConsumerGroupDetails(final List<String> con
*/
public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId) {
try {
// Create new builder
final ConsumerGroupOffsets.Builder builder = ConsumerGroupOffsets.newBuilder()
.withConsumerId(consumerGroupId);

// Make request
final ListConsumerGroupOffsetsResult results = adminClient.listConsumerGroupOffsets(consumerGroupId);

final List<PartitionOffset> offsetList = new ArrayList<>();
String topic = null;

// Fetch the results of the request
final Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> partitionsToOffsets = results
.partitionsToOffsetAndMetadata()
.get();

// Loop over results
for (final Map.Entry<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> entry : partitionsToOffsets.entrySet()) {
offsetList.add(new PartitionOffset(
entry.getKey().partition(), entry.getValue().offset()
));
if (topic == null) {
topic = entry.getKey().topic();
}
builder.withOffset(
entry.getKey().topic(), entry.getKey().partition(), entry.getValue().offset()
);
}

return new ConsumerGroupOffsets(consumerGroupId, topic, offsetList);
return builder.build();
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
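
The rewrite above removes the old single-topic assumption: rather than collecting a flat PartitionOffset list and remembering only the first topic encountered, every (topic, partition, offset) triple is handed to the builder, which groups offsets by topic. A hypothetical illustration, using only the builder methods visible in this hunk with made-up values:

// Illustrative only: the group id, topics, and offsets are invented.
final ConsumerGroupOffsets offsets = ConsumerGroupOffsets.newBuilder()
    .withConsumerId("example-group")
    .withOffset("topicA", 0, 1000L)  // topic, partition, committed offset
    .withOffset("topicA", 1, 1042L)
    .withOffset("topicB", 0, 77L)
    .build();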
@@ -490,24 +490,30 @@ public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId) {
*/
public ConsumerGroupOffsetsWithTailPositions getConsumerGroupOffsetsWithTailOffsets(final String consumerGroupId) {
final ConsumerGroupOffsets consumerGroupOffsets = getConsumerGroupOffsets(consumerGroupId);
final TailOffsets tailOffsets = getTailOffsets(consumerGroupOffsets.getTopic(), consumerGroupOffsets.getPartitions());

final List<PartitionOffsetWithTailPosition> offsetsWithPartitions = new ArrayList<>();
// Create builder
final ConsumerGroupOffsetsWithTailPositions.Builder builder = ConsumerGroupOffsetsWithTailPositions.newBuilder()
.withConsumerId(consumerGroupId)
.withTimestamp(System.currentTimeMillis());

for (final PartitionOffset entry : tailOffsets.getOffsets()) {
offsetsWithPartitions.add(new PartitionOffsetWithTailPosition(
entry.getPartition(),
consumerGroupOffsets.getOffsetForPartition(entry.getPartition()),
entry.getOffset()
));
// Loop over each topic
for (final String topicName : consumerGroupOffsets.getTopicNames()) {
final ConsumerGroupTopicOffsets topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName);

// Retrieve tail offsets
final TailOffsets tailOffsets = getTailOffsets(topicName, topicOffsets.getPartitions());

// Build offsets with partitions
for (final PartitionOffset entry : tailOffsets.getOffsets()) {
builder.withOffsetsForTopic(topicName, new PartitionOffsetWithTailPosition(
entry.getPartition(),
topicOffsets.getOffsetForPartition(entry.getPartition()),
entry.getOffset()
));
}
}

return new ConsumerGroupOffsetsWithTailPositions(
consumerGroupId,
consumerGroupOffsets.getTopic(),
offsetsWithPartitions,
System.currentTimeMillis()
);
return builder.build();
}
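
Carrying the tail position alongside each committed offset is what lets the Consumer Group page report per-partition lag and, per the changelog entry above, consumption rate. The lag arithmetic is simply the distance between the log tail and the committed offset; a trivial helper as a sketch, taking plain long values since the accessor names on PartitionOffsetWithTailPosition are not shown in this diff:

// How far a consumer's committed offset trails the end of the log for one partition.
// Inputs are plain longs; the DTO getter names are assumptions, so none are used here.
static long computeLag(final long tailPosition, final long committedOffset) {
    return Math.max(0L, tailPosition - committedOffset);
}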
