diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java deleted file mode 100644 index 5f33584cd805..000000000000 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.tools.consumer.group; - -import kafka.api.BaseConsumerTest; -import kafka.server.KafkaConfig; -import kafka.utils.TestUtils; - -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.GroupProtocol; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.RangeAssignor; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.params.provider.Arguments; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import scala.collection.JavaConverters; -import scala.collection.Seq; - -public class ConsumerGroupCommandTest extends kafka.integration.KafkaServerTestHarness { - public static final String TOPIC = "foo"; - public static final String GROUP = "test.group"; - public static final String PROTOCOL_GROUP = "protocol-group"; - - List consumerGroupService = new ArrayList<>(); - List consumerGroupExecutors = new ArrayList<>(); - - @Override - public Seq generateConfigs() { - List cfgs = new ArrayList<>(); - - TestUtils.createBrokerConfigs( - 1, - zkConnectOrNull(), - false, - true, - scala.None$.empty(), - scala.None$.empty(), - scala.None$.empty(), - true, - false, - false, - false, - scala.collection.immutable.Map$.MODULE$.empty(), - 1, - false, - 1, - (short) 1, - 0, - false - ).foreach(props -> { - if (isNewGroupCoordinatorEnabled()) { - props.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); - props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer"); - } - cfgs.add(KafkaConfig.fromProps(props)); - return null; - }); - - return seq(cfgs); - } - - @BeforeEach - @Override - public void setUp(TestInfo testInfo) { - super.setUp(testInfo); - createTopic(TOPIC, 1, 1, new Properties(), listenerName(), new Properties()); - } - - @AfterEach - @Override - public void tearDown() { - consumerGroupService.forEach(ConsumerGroupCommand.ConsumerGroupService::close); - consumerGroupExecutors.forEach(AbstractConsumerGroupExecutor::shutdown); - super.tearDown(); - } - - Map committedOffsets(String topic, String group) { - try (Consumer consumer = createNoAutoCommitConsumer(group)) { - Set partitions = consumer.partitionsFor(topic).stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) - .collect(Collectors.toSet()); - return consumer.committed(partitions).entrySet().stream() - .filter(e -> e.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); - } - } - - Consumer createNoAutoCommitConsumer(String group) { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServers(listenerName())); - props.put("group.id", group); - props.put("enable.auto.commit", "false"); - return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); - } - - ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { - ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args); - ConsumerGroupCommand.ConsumerGroupService service = new ConsumerGroupCommand.ConsumerGroupService( - opts, - Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) - ); - - consumerGroupService.add(0, service); - return service; - } - - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String groupProtocol) { - return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol); - } - - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String groupProtocol, Optional remoteAssignor) { - return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, RangeAssignor.class.getName(), remoteAssignor, Optional.empty(), false, groupProtocol); - } - - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String topic, String group, String groupProtocol) { - return addConsumerGroupExecutor(numConsumers, topic, group, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol); - } - - ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String topic, String group, String strategy, Optional remoteAssignor, - Optional customPropsOpt, boolean syncCommit, String groupProtocol) { - ConsumerGroupExecutor executor = new ConsumerGroupExecutor(bootstrapServers(listenerName()), numConsumers, group, groupProtocol, - topic, strategy, remoteAssignor, customPropsOpt, syncCommit); - addExecutor(executor); - return executor; - } - - SimpleConsumerGroupExecutor addSimpleGroupExecutor(Collection partitions, String group) { - SimpleConsumerGroupExecutor executor = new SimpleConsumerGroupExecutor(bootstrapServers(listenerName()), group, partitions); - addExecutor(executor); - return executor; - } - - private AbstractConsumerGroupExecutor addExecutor(AbstractConsumerGroupExecutor executor) { - consumerGroupExecutors.add(0, executor); - return executor; - } - - abstract class AbstractConsumerRunnable implements Runnable { - final String broker; - final String groupId; - final Optional customPropsOpt; - final boolean syncCommit; - - final Properties props = new Properties(); - KafkaConsumer consumer; - - boolean configured = false; - - public AbstractConsumerRunnable(String broker, String groupId, Optional customPropsOpt, boolean syncCommit) { - this.broker = broker; - this.groupId = groupId; - this.customPropsOpt = customPropsOpt; - this.syncCommit = syncCommit; - } - - void configure() { - configured = true; - configure(props); - customPropsOpt.ifPresent(props::putAll); - consumer = new KafkaConsumer<>(props); - } - - void configure(Properties props) { - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - } - - abstract void subscribe(); - - @Override - public void run() { - assert configured : "Must call configure before use"; - try { - subscribe(); - while (true) { - consumer.poll(Duration.ofMillis(Long.MAX_VALUE)); - if (syncCommit) - consumer.commitSync(); - } - } catch (WakeupException e) { - // OK - } finally { - consumer.close(); - } - } - - void shutdown() { - consumer.wakeup(); - } - } - - class ConsumerRunnable extends AbstractConsumerRunnable { - final String topic; - final String groupProtocol; - final String strategy; - final Optional remoteAssignor; - - public ConsumerRunnable(String broker, String groupId, String groupProtocol, String topic, String strategy, - Optional remoteAssignor, Optional customPropsOpt, boolean syncCommit) { - super(broker, groupId, customPropsOpt, syncCommit); - - this.topic = topic; - this.groupProtocol = groupProtocol; - this.strategy = strategy; - this.remoteAssignor = remoteAssignor; - } - - @Override - void configure(Properties props) { - super.configure(props); - props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); - if (groupProtocol.toUpperCase(Locale.ROOT).equals(GroupProtocol.CONSUMER.toString())) { - remoteAssignor.ifPresent(assignor -> props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, assignor)); - } else { - props.put("partition.assignment.strategy", strategy); - } - } - - @Override - void subscribe() { - consumer.subscribe(Collections.singleton(topic)); - } - } - - class SimpleConsumerRunnable extends AbstractConsumerRunnable { - final Collection partitions; - - public SimpleConsumerRunnable(String broker, String groupId, Collection partitions) { - super(broker, groupId, Optional.empty(), false); - - this.partitions = partitions; - } - - @Override - void subscribe() { - consumer.assign(partitions); - } - } - - class AbstractConsumerGroupExecutor { - final int numThreads; - final ExecutorService executor; - final List consumers = new ArrayList<>(); - - public AbstractConsumerGroupExecutor(int numThreads) { - this.numThreads = numThreads; - this.executor = Executors.newFixedThreadPool(numThreads); - } - - void submit(AbstractConsumerRunnable consumerThread) { - consumers.add(consumerThread); - executor.submit(consumerThread); - } - - void shutdown() { - consumers.forEach(AbstractConsumerRunnable::shutdown); - executor.shutdown(); - try { - executor.awaitTermination(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - class ConsumerGroupExecutor extends AbstractConsumerGroupExecutor { - public ConsumerGroupExecutor(String broker, int numConsumers, String groupId, String groupProtocol, String topic, String strategy, - Optional remoteAssignor, Optional customPropsOpt, boolean syncCommit) { - super(numConsumers); - IntStream.rangeClosed(1, numConsumers).forEach(i -> { - ConsumerRunnable th = new ConsumerRunnable(broker, groupId, groupProtocol, topic, strategy, remoteAssignor, customPropsOpt, syncCommit); - th.configure(); - submit(th); - }); - } - } - - class SimpleConsumerGroupExecutor extends AbstractConsumerGroupExecutor { - public SimpleConsumerGroupExecutor(String broker, String groupId, Collection partitions) { - super(1); - - SimpleConsumerRunnable th = new SimpleConsumerRunnable(broker, groupId, partitions); - th.configure(); - submit(th); - } - } - - - public static Stream getTestQuorumAndGroupProtocolParametersAll() { - return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll(); - } - - @SuppressWarnings({"deprecation"}) - static Seq seq(Collection seq) { - return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq(); - } -} diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 7cfd659401ce..852a6434a1c1 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -16,834 +16,985 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.junit.ClusterTestExtensions; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.test.TestUtils.RANDOM; -import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; -public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { +@Tag("integration") +@ExtendWith(value = ClusterTestExtensions.class) +public class DescribeConsumerGroupTest { + private static final String TOPIC_PREFIX = "test.topic."; + private static final String GROUP_PREFIX = "test.group."; private static final List> DESCRIBE_TYPE_OFFSETS = Arrays.asList(Collections.singletonList(""), Collections.singletonList("--offsets")); private static final List> DESCRIBE_TYPE_MEMBERS = Arrays.asList(Collections.singletonList("--members"), Arrays.asList("--members", "--verbose")); private static final List> DESCRIBE_TYPE_STATE = Collections.singletonList(Collections.singletonList("--state")); - private static final List> DESCRIBE_TYPES; + private static final List> DESCRIBE_TYPES = Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, DESCRIBE_TYPE_STATE).flatMap(Collection::stream).collect(Collectors.toList()); + private ClusterInstance clusterInstance; - static { - List> describeTypes = new ArrayList<>(); - - describeTypes.addAll(DESCRIBE_TYPE_OFFSETS); - describeTypes.addAll(DESCRIBE_TYPE_MEMBERS); - describeTypes.addAll(DESCRIBE_TYPE_STATE); - - DESCRIBE_TYPES = describeTypes; + private static List generator() { + return ConsumerGroupCommandTestUtils.generator(); } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeNonExistingGroup(String quorum, String groupProtocol) { - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeNonExistingGroup(ClusterInstance clusterInstance) { String missingGroup = "missing.group"; for (List describeType : DESCRIBE_TYPES) { // note the group to be queried is a different (non-existing) group - List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", missingGroup)); + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup)); cgcArgs.addAll(describeType); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); - - String output = ToolsTestUtils.grabConsoleOutput(describeGroups(service)); - assertTrue(output.contains("Consumer group '" + missingGroup + "' does not exist."), - "Expected error was not detected for describe option '" + String.join(" ", describeType) + "'"); + try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]))) { + String output = ToolsTestUtils.grabConsoleOutput(describeGroups(service)); + assertTrue(output.contains("Consumer group '" + missingGroup + "' does not exist."), + "Expected error was not detected for describe option '" + String.join(" ", describeType) + "'"); + } } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeWithMultipleSubActions(String quorum) { - AtomicInteger exitStatus = new AtomicInteger(0); - AtomicReference exitMessage = new AtomicReference<>(""); - Exit.setExitProcedure((status, err) -> { - exitStatus.set(status); - exitMessage.set(err); - throw new RuntimeException(); - }); - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--members", "--state"}; - try { - assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main(cgcArgs)); - } finally { - Exit.resetExitProcedure(); + @ClusterTemplate("generator") + public void testDescribeOffsetsOfNonExistingGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + String missingGroup = "missing.group"; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); + + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + // note the group to be queried is a different (non-existing) group + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) + ) { + Entry, Optional>> res = service.collectGroupOffsets(missingGroup); + assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."); + } } - assertEquals(1, exitStatus.get()); - assertTrue(exitMessage.get().contains("Option [describe] takes at most one of these options")); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeWithStateValue(String quorum) { - AtomicInteger exitStatus = new AtomicInteger(0); - AtomicReference exitMessage = new AtomicReference<>(""); - Exit.setExitProcedure((status, err) -> { - exitStatus.set(status); - exitMessage.set(err); - throw new RuntimeException(); - }); - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--all-groups", "--state", "Stable"}; - try { - assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main(cgcArgs)); - } finally { - Exit.resetExitProcedure(); + @ClusterTemplate("generator") + public void testDescribeMembersOfNonExistingGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + String missingGroup = "missing.group"; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + // note the group to be queried is a different (non-existing) group + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) + ) { + Entry, Optional>> res = service.collectGroupMembers(missingGroup, false); + assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."); + + Entry, Optional>> res2 = service.collectGroupMembers(missingGroup, true); + assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), + "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "' (verbose option)."); + } } - assertEquals(1, exitStatus.get()); - assertTrue(exitMessage.get().contains("Option [describe] does not take a value for [state]")); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testPrintVersion(String quorum) { - ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); - Exit.setExitProcedure(exitProcedure); - try { - String out = ToolsTestUtils.captureStandardOut(() -> ConsumerGroupCommandOptions.fromArgs(new String[]{"--version"})); - assertEquals(0, exitProcedure.statusCode()); - assertEquals(AppInfoParser.getVersion(), out); - } finally { - Exit.resetExitProcedure(); + @ClusterTemplate("generator") + public void testDescribeStateOfNonExistingGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + String missingGroup = "missing.group"; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + // note the group to be queried is a different (non-existing) group + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) + ) { + GroupState state = service.collectGroupState(missingGroup); + assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && state.numMembers == 0 && + state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id()), + "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'." + ); + } } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeOffsetsOfNonExistingGroup(String quorum, String groupProtocol) throws Exception { - String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - Entry, Optional>> res = service.collectGroupOffsets(group); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + group + "'."); + @ClusterTemplate("generator") + public void testDescribeExistingGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + createTopic(topic); + for (List describeType : DESCRIBE_TYPES) { + String protocolGroup = GROUP_PREFIX + groupProtocol.name() + "." + String.join("", describeType); + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic, Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", protocolGroup}) + ) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty(); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeMembersOfNonExistingGroup(String quorum, String groupProtocol) throws Exception { - String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeExistingGroups(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + createTopic(topic); - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - Entry, Optional>> res = service.collectGroupMembers(group, false); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + group + "'."); - - Entry, Optional>> res2 = service.collectGroupMembers(group, true); - assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option)."); + // Create N single-threaded consumer groups from a single-partition topic + List protocolConsumerGroupExecutors = new ArrayList<>(); + try { + List groups = new ArrayList<>(); + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP_PREFIX + groupProtocol.name() + "." + String.join("", describeType); + groups.addAll(Arrays.asList("--group", group)); + protocolConsumerGroupExecutors.add(consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap())); + } + + int expectedNumLines = DESCRIBE_TYPES.size() * 2; + + for (List describeType : DESCRIBE_TYPES) { + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe")); + cgcArgs.addAll(groups); + cgcArgs.addAll(describeType); + try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]))) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count(); + return (numLines == expectedNumLines) && res.getValue().isEmpty(); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } finally { + for (AutoCloseable protocolConsumerGroupExecutor : protocolConsumerGroupExecutors) { + protocolConsumerGroupExecutor.close(); + } + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeStateOfNonExistingGroup(String quorum, String groupProtocol) throws Exception { - String group = "missing.group"; - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // note the group to be queried is a different (non-existing) group - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + @ClusterTemplate("generator") + public void testDescribeAllExistingGroups(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + createTopic(topic); - GroupState state = service.collectGroupState(group); - assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && state.numMembers == 0 && - state.coordinator != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator.id()).isEmpty(), - "Expected the state to be 'Dead', with no members in the group '" + group + "'." - ); + // Create N single-threaded consumer groups from a single-partition topic + List protocolConsumerGroupExecutors = new ArrayList<>(); + List groups = new ArrayList<>(); + try { + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP_PREFIX + groupProtocol.name() + "." + String.join("", describeType); + groups.add(group); + protocolConsumerGroupExecutors.add(consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap())); + } + int expectedNumLines = DESCRIBE_TYPES.size() * 2; + for (List describeType : DESCRIBE_TYPES) { + try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--all-groups"})) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count(); + return (numLines == expectedNumLines) && res.getValue().isEmpty(); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } finally { + for (AutoCloseable protocolConsumerGroupExecutor : protocolConsumerGroupExecutors) { + protocolConsumerGroupExecutor.close(); + } + // remove previous consumer groups, so we can have a clean cluster for next consumer group protocol test. + deleteConsumerGroups(groups); + deleteTopic(topic); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeExistingGroup(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeOffsetsOfExistingGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - for (List describeType : DESCRIBE_TYPES) { - String group = GROUP + String.join("", describeType); // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, TOPIC, group, groupProtocol); - List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group)); - cgcArgs.addAll(describeType); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); - - TestUtils.waitForCondition(() -> { - Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty(); - }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); - } - } + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> groupOffsets = service.collectGroupOffsets(group); + Optional state = groupOffsets.getKey(); + Optional> assignments = groupOffsets.getValue(); - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeExistingGroups(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); + Predicate isGrp = s -> Objects.equals(s.group, group); - // Create N single-threaded consumer groups from a single-partition topic - List groups = new ArrayList<>(); + boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + assignments.isPresent() && + assignments.get().stream().filter(isGrp).count() == 1; - for (List describeType : DESCRIBE_TYPES) { - String group = GROUP + String.join("", describeType); - addConsumerGroupExecutor(1, TOPIC, group, groupProtocol); - groups.addAll(Arrays.asList("--group", group)); - } + if (!res) + return false; - int expectedNumLines = DESCRIBE_TYPES.size() * 2; + Optional maybePartitionState = assignments.get().stream().filter(isGrp).findFirst(); + if (!maybePartitionState.isPresent()) + return false; - for (List describeType : DESCRIBE_TYPES) { - List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe")); - cgcArgs.addAll(groups); - cgcArgs.addAll(describeType); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + PartitionAssignmentState partitionState = maybePartitionState.get(); - TestUtils.waitForCondition(() -> { - Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count(); - return (numLines == expectedNumLines) && res.getValue().isEmpty(); - }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + return !partitionState.consumerId.map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + !partitionState.clientId.map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + !partitionState.host.map(h -> h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); + }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group " + group + "."); + } } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeAllExistingGroups(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeMembersOfExistingGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - // Create N single-threaded consumer groups from a single-partition topic - for (List describeType : DESCRIBE_TYPES) { - String group = GROUP + String.join("", describeType); - addConsumerGroupExecutor(1, TOPIC, group, groupProtocol); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}); + Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers())) + ) { + TestUtils.waitForCondition(() -> { + ConsumerGroupDescription consumerGroupDescription = admin.describeConsumerGroups(Collections.singleton(group)).describedGroups().get(group).get(); + return consumerGroupDescription.members().size() == 1 && consumerGroupDescription.members().iterator().next().assignment().topicPartitions().size() == 1; + }, "Expected a 'Stable' group status, rows and valid member information for group " + group + "."); + + Entry, Optional>> res = service.collectGroupMembers(group, true); + + assertTrue(res.getValue().isPresent()); + assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment.size() == 1, + "Expected a topic partition assigned to the single group member for group " + group); + } } + } - int expectedNumLines = DESCRIBE_TYPES.size() * 2; - - for (List describeType : DESCRIBE_TYPES) { - List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--all-groups")); - cgcArgs.addAll(describeType); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + @ClusterTemplate("generator") + public void testDescribeStateOfExistingGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - TestUtils.waitForCondition(() -> { - Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(s -> !s.isEmpty()).count(); - return (numLines == expectedNumLines) && res.getValue().isEmpty(); - }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range")); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + GroupState state = service.collectGroupState(group); + return Objects.equals(state.state, ConsumerGroupState.STABLE) && + state.numMembers == 1 && + state.coordinator != null && + clusterInstance.brokerIds().contains(state.coordinator.id()); + }, "Expected a 'Stable' group status, with one member for group " + group + "."); + } } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeOffsetsOfExistingGroup(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - Entry, Optional>> groupOffsets = service.collectGroupOffsets(GROUP); - Optional state = groupOffsets.getKey(); - Optional> assignments = groupOffsets.getValue(); - - Predicate isGrp = s -> Objects.equals(s.group, GROUP); - - boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && - assignments.isPresent() && - assignments.get().stream().filter(isGrp).count() == 1; - - if (!res) - return false; + @ClusterTemplate("generator") + public void testDescribeStateOfExistingGroupWithNonDefaultAssignor(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - Optional maybePartitionState = assignments.get().stream().filter(isGrp).findFirst(); - if (!maybePartitionState.isPresent()) - return false; - - PartitionAssignmentState partitionState = maybePartitionState.get(); - - return !partitionState.consumerId.map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - !partitionState.clientId.map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - !partitionState.host.map(h -> h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); - }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group " + GROUP + "."); + // run one consumer in the group consuming from a single-partition topic + AutoCloseable protocolConsumerGroupExecutor = null; + try { + String expectedName; + if (groupProtocol.equals(GroupProtocol.CONSUMER)) { + protocolConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CONSUMER, group, topic, Collections.singletonMap(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range")); + expectedName = RangeAssignor.RANGE_ASSIGNOR_NAME; + } else { + protocolConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, group, topic, Collections.singletonMap(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName())); + expectedName = RoundRobinAssignor.ROUNDROBIN_ASSIGNOR_NAME; + } + + try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})) { + TestUtils.waitForCondition(() -> { + GroupState state = service.collectGroupState(group); + return Objects.equals(state.state, ConsumerGroupState.STABLE) && + state.numMembers == 1 && + Objects.equals(state.assignmentStrategy, expectedName) && + state.coordinator != null && + clusterInstance.brokerIds().contains(state.coordinator.id()); + }, "Expected a 'Stable' group status, with one member and " + expectedName + " assignment strategy for group " + group + "."); + } + } finally { + if (protocolConsumerGroupExecutor != null) { + protocolConsumerGroupExecutor.close(); + } + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeMembersOfExistingGroup(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - Entry, Optional>> groupMembers = service.collectGroupMembers(GROUP, false); - Optional state = groupMembers.getKey(); - Optional> assignments = groupMembers.getValue(); - - Predicate isGrp = s -> Objects.equals(s.group, GROUP); - - boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && - assignments.isPresent() && - assignments.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1; - - if (!res) - return false; - - Optional maybeAssignmentState = assignments.get().stream().filter(isGrp).findFirst(); - if (!maybeAssignmentState.isPresent()) - return false; - - MemberAssignmentState assignmentState = maybeAssignmentState.get(); - - return !Objects.equals(assignmentState.consumerId, ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - !Objects.equals(assignmentState.clientId, ConsumerGroupCommand.MISSING_COLUMN_VALUE) && - !Objects.equals(assignmentState.host, ConsumerGroupCommand.MISSING_COLUMN_VALUE); - }, "Expected a 'Stable' group status, rows and valid member information for group " + GROUP + "."); - - Entry, Optional>> res = service.collectGroupMembers(GROUP, true); - - if (res.getValue().isPresent()) { - assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment.size() == 1, - "Expected a topic partition assigned to the single group member for group " + GROUP); - } else { - fail("Expected partition assignments for members of group " + GROUP); + @ClusterTemplate("generator") + public void testDescribeExistingGroupWithNoMembers(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + createTopic(topic); + + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP_PREFIX + groupProtocol.name() + String.join("", describeType); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty(); + }, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'"); + + protocolConsumerGroupExecutor.close(); + TestUtils.waitForCondition( + () -> ToolsTestUtils.grabConsoleError(describeGroups(service)).contains("Consumer group '" + group + "' has no active members."), + "Expected no active member in describe group results with describe type " + String.join(" ", describeType)); + } + } } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeStateOfExistingGroup(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor( - 1, - groupProtocol, - // This is only effective when new protocol is used. - Optional.of("range") - ); - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(GROUP); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && - state.numMembers == 1 && - Objects.equals(state.assignmentStrategy, "range") && - state.coordinator != null && - brokers().count(s -> s.config().brokerId() == state.coordinator.id()) > 0; - }, "Expected a 'Stable' group status, with one member and round robin assignment strategy for group " + GROUP + "."); - } - - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeStateOfExistingGroupWithNonDefaultAssignor(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - String expectedName; - if (groupProtocol.equals("consumer")) { - addConsumerGroupExecutor(1, groupProtocol, Optional.of("range")); - expectedName = "range"; - } else { - addConsumerGroupExecutor(1, TOPIC, GROUP, RoundRobinAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol); - expectedName = "roundrobin"; - } - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(GROUP); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && - state.numMembers == 1 && - Objects.equals(state.assignmentStrategy, expectedName) && - state.coordinator != null && - brokers().count(s -> s.config().brokerId() == state.coordinator.id()) > 0; - }, "Expected a 'Stable' group status, with one member and " + expectedName + " assignment strategy for group " + GROUP + "."); - } - - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeOffsetsOfExistingGroupWithNoMembers(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - for (List describeType : DESCRIBE_TYPES) { - String group = GROUP + String.join("", describeType); // run one consumer in the group consuming from a single-partition topic - ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, groupProtocol); - List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group)); - cgcArgs.addAll(describeType); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); - - TestUtils.waitForCondition(() -> { - Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty(); - }, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'"); - - // stop the consumer so the group has no active member anymore - executor.shutdown(); - TestUtils.waitForCondition( - () -> ToolsTestUtils.grabConsoleError(describeGroups(service)).contains("Consumer group '" + group + "' has no active members."), - "Expected no active member in describe group results with describe type " + String.join(" ", describeType)); + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> res = service.collectGroupOffsets(group); + return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) + && res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, group) && assignment.offset.isPresent())).orElse(false); + }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); + + // stop the consumer so the group has no active member anymore + protocolConsumerGroupExecutor.close(); + + TestUtils.waitForCondition(() -> { + Entry, Optional>> offsets = service.collectGroupOffsets(group); + Optional state = offsets.getKey(); + Optional> assignments = offsets.getValue(); + List testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, group)).collect(Collectors.toList()); + PartitionAssignmentState assignment = testGroupAssignments.get(0); + return state.map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && + testGroupAssignments.size() == 1 && + assignment.consumerId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone + assignment.clientId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + assignment.host.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); + }, "failed to collect group offsets"); + } } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeOffsetsOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), true, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + @ClusterTemplate("generator") + public void testDescribeMembersOfExistingGroupWithNoMembers(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupOffsets(GROUP); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) - && res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, GROUP) && assignment.offset.isPresent())).orElse(false); - }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); - - // stop the consumer so the group has no active member anymore - executor.shutdown(); - - TestUtils.waitForCondition(() -> { - Entry, Optional>> offsets = service.collectGroupOffsets(GROUP); - Optional state = offsets.getKey(); - Optional> assignments = offsets.getValue(); - List testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, GROUP)).collect(Collectors.toList()); - PartitionAssignmentState assignment = testGroupAssignments.get(0); - return state.map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && - testGroupAssignments.size() == 1 && - assignment.consumerId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone - assignment.clientId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - assignment.host.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); - }, "failed to collect group offsets"); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> res = service.collectGroupMembers(group, false); + return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) + && res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, group))).orElse(false); + }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); + + // stop the consumer so the group has no active member anymore + protocolConsumerGroupExecutor.close(); + + TestUtils.waitForCondition(() -> { + Entry, Optional>> res = service.collectGroupMembers(group, false); + return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty(); + }, "Expected no member in describe group members results for group '" + group + "'"); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeMembersOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupMembers(GROUP, false); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) - && res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, GROUP))).orElse(false); - }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); + @ClusterTemplate("generator") + public void testDescribeStateOfExistingGroupWithNoMembers(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - // stop the consumer so the group has no active member anymore - executor.shutdown(); - - TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupMembers(GROUP, false); - return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty(); - }, "Expected no member in describe group members results for group '" + GROUP + "'"); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + GroupState state = service.collectGroupState(group); + return Objects.equals(state.state, ConsumerGroupState.STABLE) && + state.numMembers == 1 && + state.coordinator != null && + clusterInstance.brokerIds().contains(state.coordinator.id()); + }, "Expected the group to initially become stable, and have a single member."); + + // stop the consumer so the group has no active member anymore + protocolConsumerGroupExecutor.close(); + + TestUtils.waitForCondition(() -> { + GroupState state = service.collectGroupState(group); + return Objects.equals(state.state, ConsumerGroupState.EMPTY) && state.numMembers == 0; + }, "Expected the group to become empty after the only member leaving."); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeStateOfExistingGroupWithNoMembers(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run one consumer in the group consuming from a single-partition topic - ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(GROUP); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && - state.numMembers == 1 && - state.coordinator != null && - brokers().count(s -> s.config().brokerId() == state.coordinator.id()) > 0; - }, "Expected the group '" + GROUP + "' to initially become stable, and have a single member."); - - // stop the consumer so the group has no active member anymore - executor.shutdown(); - - TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(GROUP); - return Objects.equals(state.state, ConsumerGroupState.EMPTY) && state.numMembers == 0; - }, "Expected the group '" + GROUP + "' to become empty after the only member leaving."); + @ClusterTemplate("generator") + public void testDescribeWithConsumersWithoutAssignedPartitions(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + createTopic(topic); + + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP_PREFIX + groupProtocol.name() + String.join("", describeType); + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group)); + cgcArgs.addAll(describeType); + // run two consumers in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0])) + ) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2; + return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows; + }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); + } + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeOffsetsWithConsumersWithoutAssignedPartitions(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - for (List describeType : DESCRIBE_TYPES) { - String group = GROUP + String.join("", describeType); // run two consumers in the group consuming from a single-partition topic - addConsumerGroupExecutor(2, TOPIC, group, groupProtocol); - List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group)); - cgcArgs.addAll(describeType); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); - - TestUtils.waitForCondition(() -> { - Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2; - return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows; - }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> res = service.collectGroupOffsets(group); + return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).isPresent() && + res.getValue().isPresent() && + res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 1 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 1; + }, "Expected rows for consumers with no assigned partitions in describe group results"); + } } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeOffsetsWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeMembersWithConsumersWithoutAssignedPartitions(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - // run two consumers in the group consuming from a single-partition topic - addConsumerGroupExecutor(2, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupOffsets(GROUP); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).isPresent() && - res.getValue().isPresent() && - res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 1; - }, "Expected rows for consumers with no assigned partitions in describe group results"); + // run two consumers in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> res = service.collectGroupMembers(group, false); + return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + res.getValue().isPresent() && + res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 1 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 0).count() == 1 && + res.getValue().get().stream().allMatch(s -> s.assignment.isEmpty()); + }, "Expected rows for consumers with no assigned partitions in describe group results"); + + Entry, Optional>> res = service.collectGroupMembers(group, true); + assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) + && res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false), + "Expected additional columns in verbose version of describe members"); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeMembersWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run two consumers in the group consuming from a single-partition topic - addConsumerGroupExecutor(2, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupMembers(GROUP, false); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && - res.getValue().isPresent() && - res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 1 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0).count() == 1 && - res.getValue().get().stream().allMatch(s -> s.assignment.isEmpty()); - }, "Expected rows for consumers with no assigned partitions in describe group results"); + @ClusterTemplate("generator") + public void testDescribeStateWithConsumersWithoutAssignedPartitions(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - Entry, Optional>> res = service.collectGroupMembers(GROUP, true); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) - && res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false), - "Expected additional columns in verbose version of describe members"); + // run two consumers in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + GroupState state = service.collectGroupState(group); + return Objects.equals(state.state, ConsumerGroupState.STABLE) && state.numMembers == 2; + }, "Expected two consumers in describe group results"); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeStateWithConsumersWithoutAssignedPartitions(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - - // run two consumers in the group consuming from a single-partition topic - addConsumerGroupExecutor(2, groupProtocol); - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - - TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(GROUP); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && state.numMembers == 2; - }, "Expected two consumers in describe group results"); + @ClusterTemplate("generator") + public void testDescribeWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + createTopic(topic, 2); + + for (List describeType : DESCRIBE_TYPES) { + String group = GROUP_PREFIX + groupProtocol.name() + String.join("", describeType); + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group)); + cgcArgs.addAll(describeType); + // run two consumers in the group consuming from a two-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0])) + ) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3; + return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows; + }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); + } + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - String topic2 = "foo2"; - createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic, 2); - for (List describeType : DESCRIBE_TYPES) { - String group = GROUP + String.join("", describeType); // run two consumers in the group consuming from a two-partition topic - addConsumerGroupExecutor(2, topic2, group, groupProtocol); - List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group)); - cgcArgs.addAll(describeType); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); - - TestUtils.waitForCondition(() -> { - Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3; - return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows; - }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> res = service.collectGroupOffsets(group); + return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + res.getValue().isPresent() && + res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2 && + res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && !x.partition.isPresent()); + }, "Expected two rows (one row per consumer) in describe group results."); + } } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - String topic2 = "foo2"; - createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic, 2); - // run two consumers in the group consuming from a two-partition topic - addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol); + // run two consumers in the group consuming from a two-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> res = service.collectGroupMembers(group, false); + return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + res.getValue().isPresent() && + res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 2 && + res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && x.numPartitions == 0); + }, "Expected two rows (one row per consumer) in describe group members results."); + + Entry, Optional>> res = service.collectGroupMembers(group, true); + assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0, + "Expected additional columns in verbose version of describe members"); + } + } + } - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + @ClusterTemplate("generator") + public void testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic, 2); - TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupOffsets(GROUP); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && - res.getValue().isPresent() && - res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 2 && - res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && !x.partition.isPresent()); - }, "Expected two rows (one row per consumer) in describe group results."); + // run two consumers in the group consuming from a two-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + GroupState state = service.collectGroupState(group); + return Objects.equals(state.state, ConsumerGroupState.STABLE) && Objects.equals(state.group, group) && state.numMembers == 2; + }, "Expected a stable group with two members in describe group state result."); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - String topic2 = "foo2"; - createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); + @ClusterTemplate("generator") + public void testDescribeSimpleConsumerGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + // Ensure that the offsets of consumers which don't use group management are still displayed + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic, 2); + + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, group, new HashSet<>(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1))), Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> res = service.collectGroupOffsets(group); + return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) + && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2; + }, "Expected a stable group with two members in describe group state result."); + } + } + } - // run two consumers in the group consuming from a two-partition topic - addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol); + @ClusterTemplate("generator") + public void testDescribeGroupWithShortInitializationTimeout(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + createTopic(topic); - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't + // complete before the timeout expires + List describeType = DESCRIBE_TYPES.get(RANDOM.nextInt(DESCRIBE_TYPES.size())); + String group = GROUP_PREFIX + groupProtocol.name() + String.join("", describeType); - TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupMembers(GROUP, false); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && - res.getValue().isPresent() && - res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 2 && - res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0); - }, "Expected two rows (one row per consumer) in describe group members results."); + // set the group initialization timeout too low for the group to stabilize + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--timeout", "1", "--group", group)); + cgcArgs.addAll(describeType); - Entry, Optional>> res = service.collectGroupMembers(GROUP, true); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0, - "Expected additional columns in verbose version of describe members"); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0])) + ) { + ExecutionException e = assertThrows(ExecutionException.class, service::describeGroups); + assertInstanceOf(TimeoutException.class, e.getCause()); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); - String topic2 = "foo2"; - createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); - - // run two consumers in the group consuming from a two-partition topic - addConsumerGroupExecutor(2, topic2, GROUP, groupProtocol); + @ClusterTemplate("generator") + public void testDescribeGroupOffsetsWithShortInitializationTimeout(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't + // complete before the timeout expires - TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(GROUP); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && Objects.equals(state.group, GROUP) && state.numMembers == 2; - }, "Expected a stable group with two members in describe group state result."); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + // set the group initialization timeout too low for the group to stabilize + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--timeout", "1"}) + ) { + Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupOffsets(group)); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft", "kraft+kip848"}) - public void testDescribeSimpleConsumerGroup(String quorum) throws Exception { - // Ensure that the offsets of consumers which don't use group management are still displayed - - createOffsetsTopic(listenerName(), new Properties()); - String topic2 = "foo2"; - createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); - addSimpleGroupExecutor(Arrays.asList(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)), GROUP); + @ClusterTemplate("generator") + public void testDescribeGroupMembersWithShortInitializationTimeout(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't + // complete before the timeout expires - TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupOffsets(GROUP); - return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) - && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2; - }, "Expected a stable group with two members in describe group state result."); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + // set the group initialization timeout too low for the group to stabilize + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--timeout", "1"}) + ) { + Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupMembers(group, false)); + assertEquals(TimeoutException.class, e.getCause().getClass()); + e = assertThrows(ExecutionException.class, () -> service.collectGroupMembers(group, true)); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeGroupWithShortInitializationTimeout(String quorum, String groupProtocol) { - // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't - // complete before the timeout expires + @ClusterTemplate("generator") + public void testDescribeGroupStateWithShortInitializationTimeout(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - List describeType = DESCRIBE_TYPES.get(RANDOM.nextInt(DESCRIBE_TYPES.size())); - String group = GROUP + String.join("", describeType); - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); - // set the group initialization timeout too low for the group to stabilize - List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--timeout", "1", "--group", group)); - cgcArgs.addAll(describeType); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); + // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't + // complete before the timeout expires - ExecutionException e = assertThrows(ExecutionException.class, service::describeGroups); - assertInstanceOf(TimeoutException.class, e.getCause()); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()); + // set the group initialization timeout too low for the group to stabilize + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--timeout", "1"}) + ) { + Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupState(group)); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeGroupOffsetsWithShortInitializationTimeout(String quorum, String groupProtocol) { - // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't - // complete before the timeout expires + @ClusterTemplate("generator") + public void testDescribeNonOffsetCommitGroup(ClusterInstance clusterInstance) throws Exception { + this.clusterInstance = clusterInstance; + for (GroupProtocol groupProtocol: clusterInstance.supportedGroupProtocols()) { + String topic = TOPIC_PREFIX + groupProtocol.name(); + String group = GROUP_PREFIX + groupProtocol.name(); + createTopic(topic); - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); + // run one consumer in the group consuming from a single-partition topic + try (AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, group, topic, Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")); + ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) + ) { + TestUtils.waitForCondition(() -> { + Entry, Optional>> groupOffsets = service.collectGroupOffsets(group); - // set the group initialization timeout too low for the group to stabilize - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + Predicate isGrp = s -> Objects.equals(s.group, group); - Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupOffsets(GROUP)); - assertEquals(TimeoutException.class, e.getCause().getClass()); - } + boolean res = groupOffsets.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + groupOffsets.getValue().isPresent() && + groupOffsets.getValue().get().stream().filter(isGrp).count() == 1; - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeGroupMembersWithShortInitializationTimeout(String quorum, String groupProtocol) { - // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't - // complete before the timeout expires + if (!res) + return false; - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); + Optional maybeAssignmentState = groupOffsets.getValue().get().stream().filter(isGrp).findFirst(); + if (!maybeAssignmentState.isPresent()) + return false; - // set the group initialization timeout too low for the group to stabilize - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + PartitionAssignmentState assignmentState = maybeAssignmentState.get(); - Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupMembers(GROUP, false)); - assertEquals(TimeoutException.class, e.getCause().getClass()); - e = assertThrows(ExecutionException.class, () -> service.collectGroupMembers(GROUP, true)); - assertEquals(TimeoutException.class, e.getCause().getClass()); + return assignmentState.consumerId.map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + assignmentState.clientId.map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + assignmentState.host.map(h -> !h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); + }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group " + group + "."); + } + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeGroupStateWithShortInitializationTimeout(String quorum, String groupProtocol) { - // Let creation of the offsets topic happen during group initialization to ensure that initialization doesn't - // complete before the timeout expires - - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, groupProtocol); + @Test + public void testDescribeWithUnrecognizedNewConsumerOption() { + String group = GROUP_PREFIX + "unrecognized"; + String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", "localhost:9092", "--describe", "--group", group}; + assertThrows(joptsimple.OptionException.class, () -> ConsumerGroupCommandOptions.fromArgs(cgcArgs)); + } - // set the group initialization timeout too low for the group to stabilize - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP, "--timeout", "1"}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + @Test + public void testDescribeWithMultipleSubActions() { + String group = GROUP_PREFIX + "multiple.sub.actions"; + AtomicInteger exitStatus = new AtomicInteger(0); + AtomicReference exitMessage = new AtomicReference<>(""); + Exit.setExitProcedure((status, err) -> { + exitStatus.set(status); + exitMessage.set(err); + throw new RuntimeException(); + }); + String[] cgcArgs = new String[]{"--bootstrap-server", "localhost:9092", "--describe", "--group", group, "--members", "--state"}; + try { + assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main(cgcArgs)); + } finally { + Exit.resetExitProcedure(); + } + assertEquals(1, exitStatus.get()); + assertTrue(exitMessage.get().contains("Option [describe] takes at most one of these options")); + } - Throwable e = assertThrows(ExecutionException.class, () -> service.collectGroupState(GROUP)); - assertEquals(TimeoutException.class, e.getCause().getClass()); + @Test + public void testDescribeWithStateValue() { + AtomicInteger exitStatus = new AtomicInteger(0); + AtomicReference exitMessage = new AtomicReference<>(""); + Exit.setExitProcedure((status, err) -> { + exitStatus.set(status); + exitMessage.set(err); + throw new RuntimeException(); + }); + String[] cgcArgs = new String[]{"--bootstrap-server", "localhost:9092", "--describe", "--all-groups", "--state", "Stable"}; + try { + assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main(cgcArgs)); + } finally { + Exit.resetExitProcedure(); + } + assertEquals(1, exitStatus.get()); + assertTrue(exitMessage.get().contains("Option [describe] does not take a value for [state]")); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDescribeWithUnrecognizedNewConsumerOption(String quorum) { - String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - assertThrows(joptsimple.OptionException.class, () -> getConsumerGroupService(cgcArgs)); + @Test + public void testPrintVersion() { + ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); + Exit.setExitProcedure(exitProcedure); + try { + String out = ToolsTestUtils.captureStandardOut(() -> ConsumerGroupCommandOptions.fromArgs(new String[]{"--version"})); + assertEquals(0, exitProcedure.statusCode()); + assertEquals(AppInfoParser.getVersion(), out); + } finally { + Exit.resetExitProcedure(); + } } - @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) - @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"}) - public void testDescribeNonOffsetCommitGroup(String quorum, String groupProtocol) throws Exception { - createOffsetsTopic(listenerName(), new Properties()); + private static ConsumerGroupCommand.ConsumerGroupService consumerGroupService(String[] args) { + return new ConsumerGroupCommand.ConsumerGroupService( + ConsumerGroupCommandOptions.fromArgs(args), + Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) + ); + } - Properties customProps = new Properties(); - // create a consumer group that never commits offsets - customProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - // run one consumer in the group consuming from a single-partition topic - addConsumerGroupExecutor(1, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.of(customProps), false, groupProtocol); + private void createTopic(String topic) { + createTopic(topic, 1); + } - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", GROUP}; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + private void createTopic(String topic, int numPartitions) { + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + Assertions.assertDoesNotThrow(() -> admin.createTopics(Collections.singletonList(new NewTopic(topic, numPartitions, (short) 1))).topicId(topic).get()); + } + } - TestUtils.waitForCondition(() -> { - Entry, Optional>> groupOffsets = service.collectGroupOffsets(GROUP); + private void deleteConsumerGroups(Collection groupIds) { + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + Assertions.assertDoesNotThrow(() -> admin.deleteConsumerGroups(groupIds).all().get()); + } + } - Predicate isGrp = s -> Objects.equals(s.group, GROUP); + private void deleteTopic(String topic) { + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + Assertions.assertDoesNotThrow(() -> admin.deleteTopics(Collections.singletonList(topic)).topicNameValues().get(topic).get()); + } + } - boolean res = groupOffsets.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && - groupOffsets.getValue().isPresent() && - groupOffsets.getValue().get().stream().filter(isGrp).count() == 1; + private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, Set topicPartitions, Map customConfigs) { + Map configs = composeConfigs( + groupId, + protocol.name, + customConfigs + ); + return ConsumerGroupCommandTestUtils.buildConsumers( + 1, + topicPartitions, + () -> new KafkaConsumer(configs) + ); + } - if (!res) - return false; + private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, String topicName, Map customConfigs) { + return consumerGroupClosable(protocol, groupId, topicName, customConfigs, 1); + } - Optional maybeAssignmentState = groupOffsets.getValue().get().stream().filter(isGrp).findFirst(); - if (!maybeAssignmentState.isPresent()) - return false; + private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, String topicName, Map customConfigs, int numConsumers) { + Map configs = composeConfigs( + groupId, + protocol.name, + customConfigs + ); + return ConsumerGroupCommandTestUtils.buildConsumers( + numConsumers, + false, + topicName, + () -> new KafkaConsumer(configs) + ); + } - PartitionAssignmentState assignmentState = maybeAssignmentState.get(); + private Map composeConfigs(String groupId, String groupProtocol, Map customConfigs) { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); + configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); - return assignmentState.consumerId.map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - assignmentState.clientId.map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - assignmentState.host.map(h -> !h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); - }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group " + GROUP + "."); + configs.putAll(customConfigs); + return configs; } private Runnable describeGroups(ConsumerGroupCommand.ConsumerGroupService service) { - return () -> { - try { - service.describeGroups(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; + return () -> Assertions.assertDoesNotThrow(service::describeGroups); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java index 7f8dc536c935..ed13dfffdb1e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java @@ -45,7 +45,6 @@ import scala.collection.JavaConverters; import scala.collection.Seq; -import static org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTest.seq; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -55,7 +54,7 @@ public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { public static final int NUM_PARTITIONS = 1; public static final int BROKER_COUNT = 1; public static final String KAFKA_CLIENT_SASL_MECHANISM = "SCRAM-SHA-256"; - private static final Seq KAFKA_SERVER_SASL_MECHANISMS = seq(Collections.singletonList(KAFKA_CLIENT_SASL_MECHANISM)); + private static final Seq KAFKA_SERVER_SASL_MECHANISMS = JavaConverters.asScalaIteratorConverter(Collections.singletonList(KAFKA_CLIENT_SASL_MECHANISM).iterator()).asScala().toSeq(); @SuppressWarnings({"deprecation"}) private Consumer createConsumer() {