From 54c0e9e9e1b49ded1d3b2ac748cc1ead45277ced Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 17 Apr 2024 16:57:50 +0800 Subject: [PATCH] debug --- ...tsConsumerGroupCommandIntegrationTest.java | 94 ++++++++++++------- 1 file changed, 61 insertions(+), 33 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java index 5b1c139e0867..6903d0c6dd45 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java @@ -16,7 +16,14 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -29,8 +36,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.config.Defaults; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Collections; import java.util.Map; @@ -42,19 +49,40 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { + @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") +}) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { + private final ClusterInstance clusterInstance; + public static final String TOPIC = "foo"; + public static final String GROUP = "test.group"; + + DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections + this.clusterInstance = clusterInstance; + } + String[] getArgs(String group, String topic) { return new String[] { - "--bootstrap-server", bootstrapServers(listenerName()), + "--bootstrap-server", clusterInstance.bootstrapServers(), "--delete-offsets", "--group", group, "--topic", topic }; } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsNonExistingGroup(String quorum) { + ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { + ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args); + + return new ConsumerGroupCommand.ConsumerGroupService( + opts, + Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) + ); + } + + @ClusterTest + public void testDeleteOffsetsNonExistingGroup() { String group = "missing.group"; String topic = "foo:1"; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic)); @@ -63,51 +91,47 @@ public void testDeleteOffsetsNonExistingGroup(String quorum) { assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey()); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) { + @ClusterTest + public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() { + createTopic(TOPIC); testWithStableConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly(String quorum) { + @ClusterTest + public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() { + createTopic(TOPIC); testWithStableConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition(String quorum) { + @ClusterTest + public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() { testWithStableConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly(String quorum) { + @ClusterTest + public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() { testWithStableConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition(String quorum) { + @ClusterTest + public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() { + createTopic(TOPIC); testWithEmptyConsumerGroup(TOPIC, 0, 0, Errors.NONE); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly(String quorum) { + @ClusterTest + public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() { + createTopic(TOPIC); testWithEmptyConsumerGroup(TOPIC, -1, 0, Errors.NONE); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition(String quorum) { + @ClusterTest + public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() { testWithEmptyConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly(String quorum) { + @ClusterTest + public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() { testWithEmptyConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION); } @@ -193,7 +217,7 @@ private void withEmptyConsumerGroup(Runnable body) { } private KafkaProducer createProducer(Properties config) { - config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(listenerName())); + config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); config.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1"); config.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); config.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); @@ -202,7 +226,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { - config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(listenerName())); + config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); config.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, GROUP); config.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); @@ -213,4 +237,8 @@ private Consumer createConsumer(Properties config) { return new KafkaConsumer<>(config); } + + private void createTopic(String topic) { + TestUtils.createTopicWithAdminRaw(clusterInstance.createAdminClient(), topic, 1, 1, scala.collection.immutable.Map$.MODULE$.empty(), new Properties()); + } }