partitions) {
+ return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
+ }
+
/**
* Elect the preferred replica as leader for topic partitions.
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 35a781e1deb5..107eb56d63ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -217,7 +217,7 @@ public static ConfigDef configDef() {
}
public static void main(String[] args) {
- System.out.println(CONFIG.toHtmlTable());
+ System.out.println(CONFIG.toHtml());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
index 4fada35c790c..85c0a905240b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
@@ -18,9 +18,11 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
import java.util.Collection;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* The result of {@link Admin#createTopics(Collection)}.
@@ -29,9 +31,11 @@
*/
@InterfaceStability.Evolving
public class CreateTopicsResult {
- private final Map> futures;
+ final static int UNKNOWN = -1;
- CreateTopicsResult(Map> futures) {
+ private final Map> futures;
+
+ CreateTopicsResult(Map> futures) {
this.futures = futures;
}
@@ -40,7 +44,8 @@ public class CreateTopicsResult {
* topic creations.
*/
public Map> values() {
- return futures;
+ return futures.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().thenApply(v -> (Void) null)));
}
/**
@@ -49,4 +54,84 @@ public Map> values() {
public KafkaFuture all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
}
+
+ /**
+ * Returns a future that provides topic configs for the topic when the request completes.
+ *
+ * If broker version doesn't support replication factor in the response, throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ * If broker returned an error for topic configs, throw appropriate exception. For example,
+ * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
+ * have permission to describe topic configs.
+ */
+ public KafkaFuture config(String topic) {
+ return futures.get(topic).thenApply(TopicMetadataAndConfig::config);
+ }
+
+ /**
+ * Returns a future that provides number of partitions in the topic when the request completes.
+ *
+ * If broker version doesn't support replication factor in the response, throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ * If broker returned an error for topic configs, throw appropriate exception. For example,
+ * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
+ * have permission to describe topic configs.
+ */
+ public KafkaFuture numPartitions(String topic) {
+ return futures.get(topic).thenApply(TopicMetadataAndConfig::numPartitions);
+ }
+
+ /**
+ * Returns a future that provides replication factor for the topic when the request completes.
+ *
+ * If broker version doesn't support replication factor in the response, throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ * If broker returned an error for topic configs, throw appropriate exception. For example,
+ * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
+ * have permission to describe topic configs.
+ */
+ public KafkaFuture replicationFactor(String topic) {
+ return futures.get(topic).thenApply(TopicMetadataAndConfig::replicationFactor);
+ }
+
+ static class TopicMetadataAndConfig {
+ private final ApiException exception;
+ private final int numPartitions;
+ private final int replicationFactor;
+ private final Config config;
+
+ TopicMetadataAndConfig(int numPartitions, int replicationFactor, Config config) {
+ this.exception = null;
+ this.numPartitions = numPartitions;
+ this.replicationFactor = replicationFactor;
+ this.config = config;
+ }
+
+ TopicMetadataAndConfig(ApiException exception) {
+ this.exception = exception;
+ this.numPartitions = UNKNOWN;
+ this.replicationFactor = UNKNOWN;
+ this.config = null;
+ }
+
+ public int numPartitions() {
+ ensureSuccess();
+ return numPartitions;
+ }
+
+ public int replicationFactor() {
+ ensureSuccess();
+ return replicationFactor;
+ }
+
+ public Config config() {
+ ensureSuccess();
+ return config;
+ }
+
+ private void ensureSuccess() {
+ if (exception != null)
+ throw exception;
+ }
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsOptions.java
new file mode 100644
index 000000000000..63e6b4be84ba
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Set;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the {@link Admin#deleteConsumerGroupOffsets(String, Set)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DeleteConsumerGroupOffsetsOptions extends AbstractOptions {
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java
new file mode 100644
index 000000000000..433f478c3862
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Set;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.KafkaFuture.BaseFunction;
+import org.apache.kafka.common.KafkaFuture.BiConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
+
+/**
+ * The result of the {@link Admin#deleteConsumerGroupOffsets(String, Set)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DeleteConsumerGroupOffsetsResult {
+ private final KafkaFuture