diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
@@ -18,6 +18,10 @@
/**
* client.init.timeout.ms
*/
public static final String KAFKACLIENT_INIT_TIMEOUT_CONFIG = "client.init.timeout.ms";
+ /**
+ * client.request.timeout.ms
+ */
+ public static final String KAFKACLIENT_REQUEST_TIMEOUT_CONFIG = "client.request.timeout.ms";
public static final String ZOOKEEPER_SET_ACL_CONFIG = "zookeeper.set.acl";
public static final String KAFKACLIENT_SECURITY_PROTOCOL_CONFIG =
@@ -247,6 +251,9 @@ public class KafkaRestConfig extends RestConfig {
protected static final String KAFKACLIENT_INIT_TIMEOUT_DOC =
"The timeout for initialization of the Kafka store, including creation of the Kafka topic "
+ "that stores schema data.";
+ protected static final String KAFKACLIENT_REQUEST_TIMEOUT_DOC =
+ "The timeout for sending any admin-client request to Kafka cluster including waiting for"
+ + " the response on client side.";
protected static final String KAFKACLIENT_TIMEOUT_DOC =
"The timeout for an operation on the Kafka store";
protected static final String
@@ -450,6 +457,14 @@ protected static ConfigDef baseKafkaRestConfigDef() {
Importance.MEDIUM,
KAFKACLIENT_INIT_TIMEOUT_DOC
)
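+ // Consumed by AdminClientWrapper to bound consumer-group list/describe requests.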
+ .define(
+ KAFKACLIENT_REQUEST_TIMEOUT_CONFIG,
+ Type.INT,
+ 60000,
+ Range.atLeast(0),
+ Importance.MEDIUM,
+ KAFKACLIENT_REQUEST_TIMEOUT_DOC
+ )
.define(
KAFKACLIENT_TIMEOUT_CONFIG,
Type.INT,
diff --git a/kafka-rest-common/src/main/java/io/confluent/kafkarest/entities/Topic.java b/kafka-rest-common/src/main/java/io/confluent/kafkarest/entities/Topic.java
index 084dac9722..85b98badfb 100644
--- a/kafka-rest-common/src/main/java/io/confluent/kafkarest/entities/Topic.java
+++ b/kafka-rest-common/src/main/java/io/confluent/kafkarest/entities/Topic.java
@@ -15,6 +15,7 @@
package io.confluent.kafkarest.entities;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.hibernate.validator.constraints.NotEmpty;
@@ -23,17 +24,14 @@
import java.util.Objects;
import java.util.Properties;
-import javax.validation.constraints.NotNull;
-
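+// Null fields (configs/partitions) are omitted from JSON so topic stubs that only
+// carry a name, as built by GroupMetadataObserver, serialize compactly.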
+@JsonInclude(JsonInclude.Include.NON_NULL)
public class Topic {
@NotEmpty
private String name;
- @NotNull
private Properties configs;
- @NotEmpty
private List<Partition> partitions;
public Topic(
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/AdminClientWrapper.java b/kafka-rest/src/main/java/io/confluent/kafkarest/AdminClientWrapper.java
index 54ae893170..34832f5c37 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/AdminClientWrapper.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/AdminClientWrapper.java
@@ -16,18 +16,22 @@
package io.confluent.kafkarest;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.Vector;
@@ -39,15 +43,18 @@
public class AdminClientWrapper {
- private AdminClient adminClient;
- private int initTimeOut;
+ private final AdminClient adminClient;
+ private final int initTimeOut;
+ private final int requestTimeOut;
public AdminClientWrapper(KafkaRestConfig kafkaRestConfig, AdminClient adminClient) {
this.adminClient = adminClient;
this.initTimeOut = kafkaRestConfig.getInt(KafkaRestConfig.KAFKACLIENT_INIT_TIMEOUT_CONFIG);
+ this.requestTimeOut = kafkaRestConfig.getInt(
+ KafkaRestConfig.KAFKACLIENT_REQUEST_TIMEOUT_CONFIG);
}
- public static Properties adminProperties(KafkaRestConfig kafkaRestConfig) {
+ static Properties adminProperties(KafkaRestConfig kafkaRestConfig) {
Properties properties = new Properties();
properties.putAll(kafkaRestConfig.getAdminProperties());
properties.put(KafkaRestConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -67,36 +74,32 @@ public List<Integer> getBrokerIds() throws Exception {
}
public Collection<String> getTopicNames() throws Exception {
- Collection<String> allTopics = null;
- allTopics = new TreeSet<>(
+ return new TreeSet<>(
adminClient.listTopics().names().get(initTimeOut, TimeUnit.MILLISECONDS));
- return allTopics;
}
public boolean topicExists(String topic) throws Exception {
- Collection<String> allTopics = getTopicNames();
- return allTopics.contains(topic);
+ return getTopicNames().contains(topic);
}
public Topic getTopic(String topicName) throws Exception {
- Topic topic = null;
if (topicExists(topicName)) {
TopicDescription topicDescription = getTopicDescription(topicName);
- topic = buildTopic(topicName, topicDescription);
+ return buildTopic(topicName, topicDescription);
+ } else {
+ return null;
}
- return topic;
}
public List<Partition> getTopicPartitions(String topicName) throws Exception {
TopicDescription topicDescription = getTopicDescription(topicName);
- List<Partition> partitions = buildPartitonsData(topicDescription.partitions(), null);
- return partitions;
+ return buildPartitionsData(topicDescription.partitions(), null);
}
public Partition getTopicPartition(String topicName, int partition) throws Exception {
TopicDescription topicDescription = getTopicDescription(topicName);
- List<Partition> partitions = buildPartitonsData(topicDescription.partitions(), partition);
+ List<Partition> partitions = buildPartitionsData(topicDescription.partitions(), partition);
if (partitions.isEmpty()) {
return null;
}
@@ -108,22 +111,33 @@ public boolean partitionExists(String topicName, int partition) throws Exception
return (partition >= 0 && partition < topic.getPartitions().size());
}
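+ // Both consumer-group calls below cap the broker request and the client-side
+ // wait with the configured client.request.timeout.ms (requestTimeOut).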
+ public Collection<ConsumerGroupListing> listConsumerGroups() throws Exception {
+ return adminClient.listConsumerGroups(new ListConsumerGroupsOptions()
+ .timeoutMs(requestTimeOut)).all().get(requestTimeOut, TimeUnit.MILLISECONDS);
+ }
+
+ public Map<String, ConsumerGroupDescription> describeConsumerGroups(
+ Collection<String> groupIds) throws Exception {
+ return adminClient.describeConsumerGroups(groupIds,
+ new DescribeConsumerGroupsOptions().timeoutMs(requestTimeOut))
+ .all().get(requestTimeOut, TimeUnit.MILLISECONDS);
+ }
+
private Topic buildTopic(String topicName, TopicDescription topicDescription) throws Exception {
- List<Partition> partitions = buildPartitonsData(topicDescription.partitions(), null);
+ List<Partition> partitions = buildPartitionsData(topicDescription.partitions(), null);
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
Config config = adminClient.describeConfigs(
- Collections.unmodifiableList(Arrays.asList(topicResource))
+ Collections.singletonList(topicResource)
).values().get(topicResource).get();
Properties topicProps = new Properties();
for (ConfigEntry configEntry : config.entries()) {
topicProps.put(configEntry.name(), configEntry.value());
}
- Topic topic = new Topic(topicName, topicProps, partitions);
- return topic;
+ return new Topic(topicName, topicProps, partitions);
}
- private List<Partition> buildPartitonsData(
+ private List<Partition> buildPartitionsData(
List<TopicPartitionInfo> partitions,
Integer partitionsFilter
) {
@@ -153,7 +167,7 @@ private List<Partition> buildPartitonsData(
}
private TopicDescription getTopicDescription(String topicName) throws Exception {
- return adminClient.describeTopics(Collections.unmodifiableList(Arrays.asList(topicName)))
+ return adminClient.describeTopics(Collections.singletonList(topicName))
.values().get(topicName).get(initTimeOut, TimeUnit.MILLISECONDS);
}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/DefaultKafkaRestContext.java b/kafka-rest/src/main/java/io/confluent/kafkarest/DefaultKafkaRestContext.java
index fd85d570c8..212eb4f776 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/DefaultKafkaRestContext.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/DefaultKafkaRestContext.java
@@ -30,6 +30,7 @@ public class DefaultKafkaRestContext implements KafkaRestContext {
private ProducerPool producerPool;
private KafkaConsumerManager kafkaConsumerManager;
private AdminClientWrapper adminClientWrapper;
+ private GroupMetadataObserver groupMetadataObserver;
public DefaultKafkaRestContext(
@@ -37,6 +38,7 @@ public DefaultKafkaRestContext(
ProducerPool producerPool,
KafkaConsumerManager kafkaConsumerManager,
AdminClientWrapper adminClientWrapper,
+ GroupMetadataObserver groupMetadataObserver,
ScalaConsumersContext scalaConsumersContext
) {
@@ -44,6 +46,7 @@ public DefaultKafkaRestContext(
this.producerPool = producerPool;
this.kafkaConsumerManager = kafkaConsumerManager;
this.adminClientWrapper = adminClientWrapper;
+ this.groupMetadataObserver = groupMetadataObserver;
this.scalaConsumersContext = scalaConsumersContext;
}
@@ -54,7 +57,7 @@ public KafkaRestConfig getConfig() {
}
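+ // Lazily-built members are created inside synchronized getters so that two
+ // concurrent first calls cannot construct duplicate instances.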
@Override
- public ProducerPool getProducerPool() {
+ public synchronized ProducerPool getProducerPool() {
if (producerPool == null) {
producerPool = new ProducerPool(config);
}
@@ -77,7 +80,7 @@ public SimpleConsumerManager getSimpleConsumerManager() {
}
@Override
- public KafkaConsumerManager getKafkaConsumerManager() {
+ public synchronized KafkaConsumerManager getKafkaConsumerManager() {
if (kafkaConsumerManager == null) {
kafkaConsumerManager = new KafkaConsumerManager(config);
}
@@ -85,7 +88,7 @@ public KafkaConsumerManager getKafkaConsumerManager() {
}
@Override
- public AdminClientWrapper getAdminClientWrapper() {
+ public synchronized AdminClientWrapper getAdminClientWrapper() {
if (adminClientWrapper == null) {
adminClientWrapper = new AdminClientWrapper(config,
AdminClient.create(AdminClientWrapper.adminProperties(config)));
@@ -93,6 +96,14 @@ public AdminClientWrapper getAdminClientWrapper() {
return adminClientWrapper;
}
+ @Override
+ public synchronized GroupMetadataObserver getGroupMetadataObserver() {
+ if (groupMetadataObserver == null) {
+ groupMetadataObserver = new GroupMetadataObserver(config, getAdminClientWrapper());
+ }
+ return groupMetadataObserver;
+ }
+
@Override
public void shutdown() {
if (kafkaConsumerManager != null) {
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/GroupMetadataObserver.java b/kafka-rest/src/main/java/io/confluent/kafkarest/GroupMetadataObserver.java
new file mode 100644
index 0000000000..fbac601676
--- /dev/null
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/GroupMetadataObserver.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.kafkarest;
+
+import io.confluent.kafkarest.entities.ConsumerGroup;
+import io.confluent.kafkarest.entities.ConsumerGroupCoordinator;
+import io.confluent.kafkarest.entities.ConsumerGroupSubscription;
+import io.confluent.kafkarest.entities.ConsumerTopicPartitionDescription;
+import io.confluent.kafkarest.entities.Topic;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
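+/**
+ * Observes consumer group metadata (groups, coordinators, subscribed topics,
+ * offsets and lag) via the admin client and a short-lived offset-lookup consumer.
+ */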
+public class GroupMetadataObserver {
+
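+ // Builds a short-lived consumer in the target group, used only to read committed
+ // and end offsets; auto-commit is disabled so inspection never moves offsets.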
+ private static KafkaConsumer<?, ?> createConsumer(String groupId,
+ KafkaRestConfig appConfig) {
+ final Properties properties = new Properties();
+ String deserializer = StringDeserializer.class.getName();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ RestConfigUtils.bootstrapBrokers(appConfig));
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
+ properties.putAll(appConfig.getConsumerProperties());
+
+ return new KafkaConsumer<>(properties);
+ }
+
+ private final Logger log = LoggerFactory.getLogger(GroupMetadataObserver.class);
+
+ private final KafkaRestConfig config;
+ private final AdminClientWrapper adminClientWrapper;
+
+ public GroupMetadataObserver(KafkaRestConfig config, AdminClientWrapper adminClientWrapper) {
+ this.config = Objects.requireNonNull(config);
+ this.adminClientWrapper = Objects.requireNonNull(adminClientWrapper);
+ }
+
+ /**
+ * <p>Get consumer group list restricted by paging parameters
+ *
+ * @param startPos - position of the first group to return
+ * @param count - maximum number of groups to return
+ * @return list of consumer groups
+ */
+ public List<ConsumerGroup> getPagedConsumerGroupList(Integer startPos,
+ Integer count)
+ throws Exception {
+ return getConsumerGroups(getPagedConsumerGroup(startPos, count));
+ }
+
+ /**
+ * <p>Get consumer group list
+ *
+ * @return list of consumer groups
+ */
+ public List<ConsumerGroup> getConsumerGroupList() throws Exception {
+ return getConsumerGroups(adminClientWrapper.listConsumerGroups());
+ }
+
+ private List<ConsumerGroup> getConsumerGroups(Collection<ConsumerGroupListing> groupsOverview)
+ throws Exception {
+ final List<ConsumerGroup> result = new ArrayList<>();
+ List<String> groupIds = groupsOverview.stream().map(ConsumerGroupListing::groupId)
+ .collect(Collectors.toList());
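+ // A single describeConsumerGroups round trip resolves each group's coordinator.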
+ for (Entry<String, ConsumerGroupDescription> eachGroupInfo :
+ adminClientWrapper.describeConsumerGroups(groupIds).entrySet()) {
+ final Node node = eachGroupInfo.getValue().coordinator();
+ result.add(new ConsumerGroup(eachGroupInfo.getKey(),
+ new ConsumerGroupCoordinator(node.host(), node.port())));
+ }
+ return result;
+ }
+
+ private Collection<ConsumerGroupListing> getPagedConsumerGroup(Integer startPosition,
+ Integer count)
+ throws Exception {
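+ // Sort by group id so successive pages stay consistent across requests.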
+ final List<ConsumerGroupListing> consumerGroupListings =
+ new ArrayList<>(adminClientWrapper.listConsumerGroups());
+ consumerGroupListings.sort(Comparator.comparing(ConsumerGroupListing::groupId));
+ return consumerGroupListings.subList(startPosition,
+ Math.min(consumerGroupListings.size(), startPosition + count));
+ }
+
+ /**
+ * <p>Get topic information for a consumer group
+ *
+ * @param groupId - group name
+ * @return topics consumed by the group
+ */
+ public Set<Topic> getConsumerGroupTopicInformation(String groupId)
+ throws Exception {
+ final Set<Topic> result = getConsumerGroupTopics(groupId);
+ log.debug("Get topic list {}", result);
+ return result;
+ }
+
+ /**
+ * <p>Get topic information for a consumer group, restricted by paging parameters
+ *
+ * @param groupId - group name
+ * @param startPos - position of the first topic to return
+ * @param count - maximum number of topics to return
+ * @return topics consumed by the group
+ */
+ public Set<Topic> getPagedConsumerGroupTopicInformation(String groupId,
+ Integer startPos,
+ Integer count)
+ throws Exception {
+ final Set<Topic> result = getConsumerGroupTopics(groupId);
+ log.debug("Get topic list {}", result);
+ return result.stream()
+ .skip(startPos)
+ .limit(count)
+ .collect(Collectors.toSet());
+ }
+
+ private Set<Topic> getConsumerGroupTopics(String groupId) throws Exception {
+ final Set<Topic> result = new HashSet<>();
+ final Collection<MemberDescription> memberDescriptions =
+ adminClientWrapper.describeConsumerGroups(Collections.singleton(groupId))
+ .get(groupId).members();
+ if (memberDescriptions.isEmpty()) {
+ return Collections.emptySet();
+ }
+ for (MemberDescription eachSummary : memberDescriptions) {
+ for (TopicPartition topicPartition : eachSummary.assignment().topicPartitions()) {
+ result.add(new Topic(topicPartition.topic(), null, null));
+ }
+ }
+ return result;
+ }
+
+ /**
+ * <p>Get consumer group description
+ *
+ * @param groupId - group name
+ * @return description of consumer group
+ * (all consumed topics with all partition offset information)
+ */
+ public ConsumerGroupSubscription getConsumerGroupInformation(String groupId) throws Exception {
+ return getConsumerGroupInformation(groupId, Collections.emptyList());
+ }
+
+ /**
+ * <p>Get consumer group description
+ *
+ * @param groupId - group name
+ * @param topics - topic names to filter by (empty means all topics)
+ * @param offsetOpt - offset into each member's partition description
+ * collection, for paging
+ * @param countOpt - maximum number of partition descriptions to return
+ * for each member, for paging
+ * @return description of consumer group
+ */
+ public ConsumerGroupSubscription getConsumerGroupInformation(
+ String groupId,
+ Collection<String> topics,
+ Integer offsetOpt,
+ Integer countOpt) throws Exception {
+ final ConsumerGroupDescription consumerGroupSummary =
+ adminClientWrapper.describeConsumerGroups(Collections.singleton(groupId))
+ .get(groupId);
+ final Collection<MemberDescription> summaries = consumerGroupSummary.members();
+ if (summaries.isEmpty()) {
+ return ConsumerGroupSubscription.empty();
+ }
+ log.debug("Get summary list {}", summaries);
+ try (KafkaConsumer<?, ?> kafkaConsumer = createConsumer(groupId, config)) {
+ final List<ConsumerTopicPartitionDescription> consumerTopicPartitionDescriptions =
+ getConsumerTopicPartitionDescriptions(topics, summaries, kafkaConsumer);
+ final Node coordinatorNode = consumerGroupSummary.coordinator();
+ return new ConsumerGroupSubscription(
+ getPagedTopicPartitionList(consumerTopicPartitionDescriptions, offsetOpt, countOpt),
+ consumerTopicPartitionDescriptions.size(),
+ new ConsumerGroupCoordinator(coordinatorNode.host(), coordinatorNode.port()));
+ }
+ }
+
+ /**
+ * <p>Get consumer group description
+ *
+ * @param groupId - group name
+ * @param topics - topic names to filter by (empty means all topics)
+ * @return description of consumer group
+ */
+ public ConsumerGroupSubscription getConsumerGroupInformation(
+ String groupId,
+ Collection<String> topics) throws Exception {
+ final ConsumerGroupDescription consumerGroupSummary =
+ adminClientWrapper.describeConsumerGroups(Collections.singleton(groupId))
+ .get(groupId);
+ final Collection<MemberDescription> summaries = consumerGroupSummary.members();
+ if (summaries.isEmpty()) {
+ return ConsumerGroupSubscription.empty();
+ }
+ log.debug("Get summary list {}", summaries);
+ try (KafkaConsumer<?, ?> kafkaConsumer = createConsumer(groupId, config)) {
+ final List<ConsumerTopicPartitionDescription> consumerTopicPartitionDescriptions =
+ getConsumerTopicPartitionDescriptions(topics, summaries, kafkaConsumer);
+ final Node coordinatorNode = consumerGroupSummary.coordinator();
+ return new ConsumerGroupSubscription(
+ consumerTopicPartitionDescriptions,
+ consumerTopicPartitionDescriptions.size(),
+ new ConsumerGroupCoordinator(coordinatorNode.host(), coordinatorNode.port()));
+ }
+ }
+
+ private List<ConsumerTopicPartitionDescription> getConsumerTopicPartitionDescriptions(
+ Collection<String> topics,
+ Collection<MemberDescription> consumerGroupMembers,
+ KafkaConsumer<?, ?> kafkaConsumer) {
+ final List<ConsumerTopicPartitionDescription> consumerTopicPartitionDescriptions =
+ new ArrayList<>();
+ for (MemberDescription summary : consumerGroupMembers) {
+ final Set<TopicPartition> assignedTopicPartitions =
+ summary.assignment().topicPartitions();
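+ // Keep only partitions whose topic passes the filter; an empty filter keeps all.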
+ final List<TopicPartition> filteredTopicPartitions = new ArrayList<>();
+ if (!topics.isEmpty()) {
+ final List<TopicPartition> newTopicPartitions = new ArrayList<>();
+ for (TopicPartition topicPartition : assignedTopicPartitions) {
+ if (topics.contains(topicPartition.topic())) {
+ newTopicPartitions.add(topicPartition);
+ }
+ }
+ filteredTopicPartitions.addAll(newTopicPartitions);
+ } else {
+ filteredTopicPartitions.addAll(assignedTopicPartitions);
+ }
+ filteredTopicPartitions.sort(Comparator.comparingInt(TopicPartition::partition));
+ kafkaConsumer.assign(filteredTopicPartitions);
+ consumerTopicPartitionDescriptions.addAll(
+ createConsumerTopicPartitionDescriptions(kafkaConsumer,
+ summary, filteredTopicPartitions));
+ }
+ consumerTopicPartitionDescriptions.sort(
+ Comparator.comparingInt(ConsumerTopicPartitionDescription::getPartitionId));
+ return consumerTopicPartitionDescriptions;
+ }
+
+ private List<ConsumerTopicPartitionDescription> createConsumerTopicPartitionDescriptions(
+ KafkaConsumer<?, ?> kafkaConsumer,
+ MemberDescription summary,
+ List<TopicPartition> filteredTopicPartitions) {
+ final List<ConsumerTopicPartitionDescription> result = new ArrayList<>();
+ for (TopicPartition topicPartition : filteredTopicPartitions) {
+ final OffsetAndMetadata metadata = kafkaConsumer.committed(topicPartition);
+ // Current offset: the last committed offset, or 0 if the group never committed here.
+ final long currentOffset = metadata != null ? metadata.offset() : 0L;
+ // Seek to the end of the partition (without committing) and read the end offset.
+ kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
+ final long totalOffset = kafkaConsumer.position(topicPartition);
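+ // Reported lag is the end offset minus the current committed offset.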
+ result.add(
+ new ConsumerTopicPartitionDescription(summary.consumerId(),
+ summary.host(),
+ topicPartition.topic(),
+ topicPartition.partition(),
+ currentOffset,
+ totalOffset - currentOffset,
+ totalOffset
+ ));
+ }
+ return result;
+ }
+
+ private List<ConsumerTopicPartitionDescription> getPagedTopicPartitionList(
+ List<ConsumerTopicPartitionDescription> topicPartitionList,
+ Integer offsetOpt,
+ Integer countOpt) {
+ return topicPartitionList.subList(offsetOpt,
+ Math.min(topicPartitionList.size(), offsetOpt + countOpt));
+ }
+}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestApplication.java b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestApplication.java
index 7c72fe27ae..fa26763033 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestApplication.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestApplication.java
@@ -23,6 +23,7 @@
import io.confluent.kafkarest.resources.BrokersResource;
+import io.confluent.kafkarest.resources.ConsumerGroupsResource;
import io.confluent.kafkarest.resources.ConsumersResource;
import io.confluent.kafkarest.resources.PartitionsResource;
import io.confluent.kafkarest.resources.RootResource;
import io.confluent.kafkarest.resources.TopicsResource;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
@@ -65,7 +66,7 @@ public KafkaRestApplication(KafkaRestConfig config) {
@Override
public void setupResources(Configurable<?> config, KafkaRestConfig appConfig) {
setupInjectedResources(config, appConfig, null,
- null, null, null
+ null, null, null, null
);
}
@@ -78,6 +79,7 @@ protected void setupInjectedResources(
ProducerPool producerPool,
KafkaConsumerManager kafkaConsumerManager,
AdminClientWrapper adminClientWrapperInjected,
+ GroupMetadataObserver groupMetadataObserver,
ScalaConsumersContext scalaConsumersContext
) {
if (StringUtil.isBlank(appConfig.getString(KafkaRestConfig.BOOTSTRAP_SERVERS_CONFIG))
@@ -87,9 +89,10 @@ protected void setupInjectedResources(
+ KafkaRestConfig.ZOOKEEPER_CONNECT_CONFIG
+ " needs to be configured");
}
+
KafkaRestContextProvider.initialize(config, appConfig, producerPool,
- kafkaConsumerManager, adminClientWrapperInjected, scalaConsumersContext
- );
+ kafkaConsumerManager, adminClientWrapperInjected,
+ groupMetadataObserver, scalaConsumersContext);
ContextInvocationHandler contextInvocationHandler = new ContextInvocationHandler();
KafkaRestContext context =
(KafkaRestContext) Proxy.newProxyInstance(
@@ -102,6 +105,7 @@ protected void setupInjectedResources(
config.register(new TopicsResource(context));
config.register(new PartitionsResource(context));
config.register(new ConsumersResource(context));
+ config.register(new ConsumerGroupsResource(context));
config.register(new io.confluent.kafkarest.resources.v2.ConsumersResource(context));
config.register(new io.confluent.kafkarest.resources.v2.PartitionsResource(context));
config.register(KafkaRestCleanupFilter.class);
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestContext.java b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestContext.java
index e46041ee2d..396c665834 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestContext.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestContext.java
@@ -18,22 +18,24 @@
import io.confluent.kafkarest.v2.KafkaConsumerManager;
public interface KafkaRestContext {
- public KafkaRestConfig getConfig();
+ KafkaRestConfig getConfig();
- public ProducerPool getProducerPool();
+ ProducerPool getProducerPool();
@Deprecated
- public ScalaConsumersContext getScalaConsumersContext();
+ ScalaConsumersContext getScalaConsumersContext();
@Deprecated
- public ConsumerManager getConsumerManager();
+ ConsumerManager getConsumerManager();
@Deprecated
- public SimpleConsumerManager getSimpleConsumerManager();
+ SimpleConsumerManager getSimpleConsumerManager();
- public KafkaConsumerManager getKafkaConsumerManager();
+ KafkaConsumerManager getKafkaConsumerManager();
- public AdminClientWrapper getAdminClientWrapper();
+ AdminClientWrapper getAdminClientWrapper();
+
+ GroupMetadataObserver getGroupMetadataObserver();
void shutdown();
}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroup.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroup.java
new file mode 100644
index 0000000000..d510b81e59
--- /dev/null
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroup.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.kafkarest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class ConsumerGroup {
+
+ private final String groupId;
+ private final ConsumerGroupCoordinator coordinator;
+
+ @JsonCreator
+ public ConsumerGroup(@JsonProperty("groupId") String groupId,
+ @JsonProperty("coordinator") ConsumerGroupCoordinator coordinator) {
+ this.groupId = groupId;
+ this.coordinator = coordinator;
+ }
+
+ @JsonProperty
+ public String getGroupId() {
+ return groupId;
+ }
+
+ @JsonProperty
+ public ConsumerGroupCoordinator getCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerGroup{"
+ + "groupId='" + groupId + '\''
+ + ", coordinator=" + coordinator
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ConsumerGroup that = (ConsumerGroup) o;
+
+ if (!groupId.equals(that.groupId)) {
+ return false;
+ }
+ return coordinator.equals(that.coordinator);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = groupId.hashCode();
+ result = 31 * result + coordinator.hashCode();
+ return result;
+ }
+}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroupCoordinator.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroupCoordinator.java
new file mode 100644
index 0000000000..f655f6c875
--- /dev/null
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroupCoordinator.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.kafkarest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class ConsumerGroupCoordinator {
+
+ public static ConsumerGroupCoordinator empty() {
+ return new ConsumerGroupCoordinator("", -1);
+ }
+
+ private final String host;
+ private final Integer port;
+
+ @JsonCreator
+ public ConsumerGroupCoordinator(@JsonProperty("host") String host,
+ @JsonProperty("port") Integer port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @JsonProperty
+ public String getHost() {
+ return host;
+ }
+
+ @JsonProperty
+ public Integer getPort() {
+ return port;
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerGroupCoordinator{"
+ + "host='" + host + '\''
+ + ", port=" + port
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ConsumerGroupCoordinator that = (ConsumerGroupCoordinator) o;
+
+ if (!host.equals(that.host)) {
+ return false;
+ }
+ return port.equals(that.port);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = host.hashCode();
+ result = 31 * result + port.hashCode();
+ return result;
+ }
+}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroupSubscription.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroupSubscription.java
new file mode 100644
index 0000000000..ddd446f57c
--- /dev/null
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerGroupSubscription.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.kafkarest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ConsumerGroupSubscription {
+
+ public static ConsumerGroupSubscription empty() {
+ return new ConsumerGroupSubscription(
+ Collections.emptyList(),
+ 0,
+ ConsumerGroupCoordinator.empty());
+ }
+
+ private final List<ConsumerTopicPartitionDescription> topicPartitionList;
+ private final Integer topicPartitionCount;
+ private final ConsumerGroupCoordinator coordinator;
+
+ @JsonCreator
+ public ConsumerGroupSubscription(
+ @JsonProperty("topicPartitions") List
+ topicPartitionList,
+ @JsonProperty("topicPartitionCount") Integer topicPartitionCount,
+ @JsonProperty("coordinator") ConsumerGroupCoordinator coordinator) {
+ this.topicPartitionList = topicPartitionList;
+ this.topicPartitionCount = topicPartitionCount;
+ this.coordinator = coordinator;
+ }
+
+ @JsonProperty
+ public List<ConsumerTopicPartitionDescription> getTopicPartitionList() {
+ return topicPartitionList;
+ }
+
+ @JsonProperty
+ public Integer getTopicPartitionCount() {
+ return topicPartitionCount;
+ }
+
+ @JsonProperty
+ public ConsumerGroupCoordinator getCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumerEntity{"
+ + "topicPartitionList=" + topicPartitionList
+ + ", topicPartitionCount=" + topicPartitionCount
+ + ", coordinator=" + coordinator
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ConsumerGroupSubscription that = (ConsumerGroupSubscription) o;
+
+ if (!topicPartitionList.equals(that.topicPartitionList)) {
+ return false;
+ }
+ if (!topicPartitionCount.equals(that.topicPartitionCount)) {
+ return false;
+ }
+ return coordinator.equals(that.coordinator);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = topicPartitionList.hashCode();
+ result = 31 * result + topicPartitionCount.hashCode();
+ result = 31 * result + coordinator.hashCode();
+ return result;
+ }
+}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerTopicPartitionDescription.java b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerTopicPartitionDescription.java
new file mode 100644
index 0000000000..36d934cf02
--- /dev/null
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/entities/ConsumerTopicPartitionDescription.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.kafkarest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class ConsumerTopicPartitionDescription {
+
+ private final String consumerId;
+ private final String consumerIp;
+ private final String topicName;
+ private final Integer partitionId;
+ private final Long currentOffset;
+ private final Long lag;
+ private final Long endOffset;
+
+ @JsonCreator
+ public ConsumerTopicPartitionDescription(@JsonProperty("consumerId") String consumerId,
+ @JsonProperty("consumerIp") String consumerIp,
+ @JsonProperty("topicName") String topicName,
+ @JsonProperty("partitionId") Integer partitionId,
+ @JsonProperty("currentOffset") Long currentOffset,
+ @JsonProperty("lag") Long lag,
+ @JsonProperty("endOffset") Long endOffset) {
+ this.consumerId = consumerId;
+ this.consumerIp = consumerIp;
+ this.topicName = topicName;
+ this.partitionId = partitionId;
+ this.currentOffset = currentOffset;
+ this.lag = lag;
+ this.endOffset = endOffset;
+ }
+
+ @JsonProperty
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ @JsonProperty
+ public String getConsumerIp() {
+ return consumerIp;
+ }
+
+ @JsonProperty
+ public String getTopicName() {
+ return topicName;
+ }
+
+ @JsonProperty
+ public Integer getPartitionId() {
+ return partitionId;
+ }
+
+ @JsonProperty
+ public Long getCurrentOffset() {
+ return currentOffset;
+ }
+
+ @JsonProperty
+ public Long getLag() {
+ return lag;
+ }
+
+ @JsonProperty
+ public Long getEndOffset() {
+ return endOffset;
+ }
+
+ @Override
+ public String toString() {
+ return "TopicPartitionEntity{"
+ + "consumerId='" + consumerId + '\''
+ + ", consumerIp='" + consumerIp + '\''
+ + ", topicName='" + topicName + '\''
+ + ", partitionId=" + partitionId
+ + ", currentOffset=" + currentOffset
+ + ", lag=" + lag
+ + ", endOffset=" + endOffset
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ConsumerTopicPartitionDescription that = (ConsumerTopicPartitionDescription) o;
+
+ return consumerId.equals(that.consumerId)
+ && consumerIp.equals(that.consumerIp)
+ && topicName.equals(that.topicName)
+ && partitionId.equals(that.partitionId)
+ && currentOffset.equals(that.currentOffset)
+ && lag.equals(that.lag)
+ && endOffset.equals(that.endOffset);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = consumerId.hashCode();
+ result = 31 * result + consumerIp.hashCode();
+ result = 31 * result + topicName.hashCode();
+ result = 31 * result + partitionId.hashCode();
+ result = 31 * result + currentOffset.hashCode();
+ result = 31 * result + lag.hashCode();
+ result = 31 * result + endOffset.hashCode();
+ return result;
+ }
+}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/extension/KafkaRestContextProvider.java b/kafka-rest/src/main/java/io/confluent/kafkarest/extension/KafkaRestContextProvider.java
index 76793f5121..3854dfc62c 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/extension/KafkaRestContextProvider.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/extension/KafkaRestContextProvider.java
@@ -17,13 +17,15 @@
import io.confluent.kafkarest.AdminClientWrapper;
import io.confluent.kafkarest.DefaultKafkaRestContext;
+import io.confluent.kafkarest.GroupMetadataObserver;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.ProducerPool;
import io.confluent.kafkarest.ScalaConsumersContext;
-import io.confluent.kafkarest.v2.KafkaConsumerManager;
import java.util.concurrent.atomic.AtomicBoolean;
+import io.confluent.kafkarest.v2.KafkaConsumerManager;
+
import javax.ws.rs.core.Configurable;
public class KafkaRestContextProvider {
@@ -41,6 +43,7 @@ public static void initialize(
ProducerPool producerPool,
KafkaConsumerManager kafkaConsumerManager,
AdminClientWrapper adminClientWrapper,
+ GroupMetadataObserver groupMetadataObserver,
ScalaConsumersContext scalaConsumersContext
) {
if (initialized.compareAndSet(false, true)) {
@@ -50,8 +53,9 @@ public static void initialize(
ScalaConsumersContext.registerExceptionMappers(config, appConfig);
}
defaultContext =
- new DefaultKafkaRestContext(appConfig, producerPool, kafkaConsumerManager,
- adminClientWrapper, scalaConsumersContext);
+ new DefaultKafkaRestContext(appConfig, producerPool,
+ kafkaConsumerManager, adminClientWrapper,
+ groupMetadataObserver, scalaConsumersContext);
defaultAppConfig = appConfig;
}
}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/resources/ConsumerGroupsResource.java b/kafka-rest/src/main/java/io/confluent/kafkarest/resources/ConsumerGroupsResource.java
new file mode 100644
index 0000000000..c2ba69280c
--- /dev/null
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/resources/ConsumerGroupsResource.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.confluent.kafkarest.resources;
+
+import io.confluent.kafkarest.KafkaRestContext;
+import io.confluent.kafkarest.Versions;
+import io.confluent.kafkarest.entities.ConsumerGroup;
+import io.confluent.kafkarest.entities.ConsumerGroupSubscription;
+import io.confluent.kafkarest.entities.Topic;
+import io.confluent.rest.annotations.PerformanceMetric;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Provides metadata about consumer groups.
+ */
+@Path("/groups")
+@Produces({Versions.KAFKA_V1_JSON_WEIGHTED, Versions.KAFKA_DEFAULT_JSON_WEIGHTED,
+ Versions.JSON_WEIGHTED, Versions.KAFKA_V2_JSON_WEIGHTED})
+@Consumes({Versions.KAFKA_V1_JSON, Versions.KAFKA_DEFAULT_JSON, Versions.JSON,
+ Versions.GENERIC_REQUEST, Versions.KAFKA_V2_JSON})
+public class ConsumerGroupsResource {
+
+ private final KafkaRestContext context;
+
+ /**
+ *