diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index bd6322fd79dc..db2215a9dd37 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -35,13 +35,14 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtendWith; -import java.util.Arrays; -import java.util.HashMap; import java.util.Map; -import java.util.Set; -import java.util.HashSet; +import java.util.List; import java.util.Collections; import java.util.concurrent.ExecutionException; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; +import java.util.Arrays; import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; @@ -62,10 +63,10 @@ public class ClusterTestExtensionsTest { } // Static methods can generate cluster configurations - static void generate1(ClusterGenerator clusterGenerator) { + static List generate1() { Map serverProperties = new HashMap<>(); serverProperties.put("foo", "bar"); - clusterGenerator.accept(ClusterConfig.defaultBuilder() + return Collections.singletonList(ClusterConfig.defaultBuilder() .setTypes(Collections.singleton(Type.ZK)) .setServerProperties(serverProperties) .setTags(Collections.singletonList("Generated Test")) diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java index 3d35b30ebd25..f93e5753bfe4 100644 --- a/core/src/test/java/kafka/test/annotation/Type.java +++ b/core/src/test/java/kafka/test/annotation/Type.java @@ -22,7 +22,6 @@ import kafka.test.junit.ZkClusterInvocationContext; import org.junit.jupiter.api.extension.TestTemplateInvocationContext; -import java.util.function.Consumer; /** * The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations. @@ -30,22 +29,22 @@ public enum Type { KRAFT { @Override - public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, false)); + public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) { + return new RaftClusterInvocationContext(baseDisplayName, config, false); } }, CO_KRAFT { @Override - public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, true)); + public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) { + return new RaftClusterInvocationContext(baseDisplayName, config, true); } }, ZK { @Override - public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config)); + public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) { + return new ZkClusterInvocationContext(baseDisplayName, config); } }; - public abstract void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer invocationConsumer); + public abstract TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config); } diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java index 2524c41ebafb..2290a0a99eb1 100644 --- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java +++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java @@ -18,7 +18,6 @@ package kafka.test.junit; import kafka.test.ClusterConfig; -import kafka.test.ClusterGenerator; import kafka.test.annotation.ClusterTest; import kafka.test.annotation.ClusterTestDefaults; import kafka.test.annotation.ClusterTests; @@ -32,7 +31,6 @@ import org.junit.platform.commons.util.ReflectionUtils; import java.lang.reflect.Method; -import java.util.function.Consumer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -94,24 +92,19 @@ public Stream provideTestTemplateInvocationContex // Process the @ClusterTemplate annotation ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class); if (clusterTemplateAnnot != null) { - processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add); - if (generatedContexts.isEmpty()) { - throw new IllegalStateException("ClusterConfig generator method should provide at least one config"); - } + generatedContexts.addAll(processClusterTemplate(context, clusterTemplateAnnot)); } // Process single @ClusterTest annotation ClusterTest clusterTestAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTest.class); if (clusterTestAnnot != null) { - processClusterTest(context, clusterTestAnnot, defaults, generatedContexts::add); + generatedContexts.addAll(processClusterTest(context, clusterTestAnnot, defaults)); } // Process multiple @ClusterTest annotation within @ClusterTests ClusterTests clusterTestsAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTests.class); if (clusterTestsAnnot != null) { - for (ClusterTest annot : clusterTestsAnnot.value()) { - processClusterTest(context, annot, defaults, generatedContexts::add); - } + generatedContexts.addAll(processClusterTests(context, clusterTestsAnnot, defaults)); } if (generatedContexts.isEmpty()) { @@ -122,31 +115,54 @@ public Stream provideTestTemplateInvocationContex return generatedContexts.stream(); } - void processClusterTemplate(ExtensionContext context, ClusterTemplate annot, - Consumer testInvocations) { - // If specified, call cluster config generated method (must be static) - List generatedClusterConfigs = new ArrayList<>(); + + + List processClusterTemplate(ExtensionContext context, ClusterTemplate annot) { if (annot.value().trim().isEmpty()) { throw new IllegalStateException("ClusterTemplate value can't be empty string."); } - generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add); String baseDisplayName = context.getRequiredTestMethod().getName(); - generatedClusterConfigs.forEach(config -> { - for (Type type: config.clusterTypes()) { - type.invocationContexts(baseDisplayName, config, testInvocations); - } - }); + List contexts = generateClusterConfigurations(context, annot.value()) + .stream().flatMap(config -> config.clusterTypes().stream() + .map(type -> type.invocationContexts(baseDisplayName, config))).collect(Collectors.toList()); + + if (contexts.isEmpty()) { + throw new IllegalStateException("ClusterConfig generator method should provide at least one config"); + } + + return contexts; } - private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) { + @SuppressWarnings("unchecked") + private List generateClusterConfigurations(ExtensionContext context, String generateClustersMethods) { Object testInstance = context.getTestInstance().orElse(null); - Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods, ClusterGenerator.class); - ReflectionUtils.invokeMethod(method, testInstance, generator); + Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods); + return (List) ReflectionUtils.invokeMethod(method, testInstance); } - private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults, - Consumer testInvocations) { + private List processClusterTests(ExtensionContext context, ClusterTests annots, ClusterTestDefaults defaults) { + + List ret = Arrays.stream(annots.value()) + .flatMap(annot -> processClusterTestInternal(context, annot, defaults).stream()).collect(Collectors.toList()); + + if (ret.isEmpty()) { + throw new IllegalStateException("processClusterTests method should provide at least one config"); + } + + return ret; + } + + private List processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) { + List ret = processClusterTestInternal(context, annot, defaults); + + if (ret.isEmpty()) { + throw new IllegalStateException("processClusterTest method should provide at least one config"); + } + + return ret; + } + private List processClusterTestInternal(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) { Type[] types = annot.types().length == 0 ? defaults.types() : annot.types(); Map serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties())) .filter(e -> e.id() == -1) @@ -169,9 +185,9 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu .setMetadataVersion(annot.metadataVersion()) .setTags(Arrays.asList(annot.tags())) .build(); - for (Type type : types) { - type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations); - } + + return Arrays.stream(types).map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config)) + .collect(Collectors.toList()); } private ClusterTestDefaults getClusterTestDefaults(Class testClass) { diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java index 9e61556dc3f9..7a1ae920a6f4 100644 --- a/core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java +++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java @@ -22,27 +22,24 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.TestTemplateInvocationContext; -import java.util.function.Consumer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ClusterTestExtensionsUnitTest { @Test - @SuppressWarnings("unchecked") void testProcessClusterTemplate() { ClusterTestExtensions ext = new ClusterTestExtensions(); ExtensionContext context = mock(ExtensionContext.class); - Consumer testInvocations = mock(Consumer.class); + ClusterTemplate annot = mock(ClusterTemplate.class); when(annot.value()).thenReturn("").thenReturn(" "); Assertions.assertThrows(IllegalStateException.class, () -> - ext.processClusterTemplate(context, annot, testInvocations) + ext.processClusterTemplate(context, annot) ); Assertions.assertThrows(IllegalStateException.class, () -> - ext.processClusterTemplate(context, annot, testInvocations) + ext.processClusterTemplate(context, annot) ); } } diff --git a/core/src/test/java/kafka/test/junit/README.md b/core/src/test/java/kafka/test/junit/README.md index 491e0de17529..1a2432fabd21 100644 --- a/core/src/test/java/kafka/test/junit/README.md +++ b/core/src/test/java/kafka/test/junit/README.md @@ -42,24 +42,27 @@ annotation takes a single string value which references a static method on the t produce any number of test configurations using a fluent builder style API. ```java -@ClusterTemplate("generateConfigs") -void testSomething() { ... } +import java.util.Arrays; -static void generateConfigs(ClusterGenerator clusterGenerator) { - clusterGenerator.accept(ClusterConfig.defaultClusterBuilder() - .name("Generated Test 1") - .serverProperties(props1) - .ibp("2.7-IV1") - .build()); - clusterGenerator.accept(ClusterConfig.defaultClusterBuilder() - .name("Generated Test 2") - .serverProperties(props2) - .ibp("2.7-IV2") - .build()); - clusterGenerator.accept(ClusterConfig.defaultClusterBuilder() - .name("Generated Test 3") - .serverProperties(props3) - .build()); +@ClusterTemplate("generateConfigs") +void testSomething() { ...} + +static List generateConfigs() { + ClusterConfig config1 = ClusterConfig.defaultClusterBuilder() + .name("Generated Test 1") + .serverProperties(props1) + .ibp("2.7-IV1") + .build(); + ClusterConfig config2 = ClusterConfig.defaultClusterBuilder() + .name("Generated Test 2") + .serverProperties(props2) + .ibp("2.7-IV2") + .build(); + ClusterConfig config3 = ClusterConfig.defaultClusterBuilder() + .name("Generated Test 3") + .serverProperties(props3) + .build(); + return Arrays.asList(config1, config2, config3); } ``` diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala index 0d84892dff69..639c095c9c9a 100644 --- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala @@ -19,7 +19,7 @@ package kafka.coordinator.transaction import kafka.network.SocketServer import kafka.server.IntegrationTestUtils -import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} +import kafka.test.{ClusterConfig, ClusterInstance} import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.common.message.InitProducerIdRequestData @@ -38,19 +38,19 @@ import scala.concurrent.duration.DurationInt import scala.jdk.CollectionConverters._ object ProducerIdsIntegrationTest { - def uniqueProducerIdsBumpIBP(clusterGenerator: ClusterGenerator): Unit = { + def uniqueProducerIdsBumpIBP(): java.util.List[ClusterConfig] = { val serverProperties = java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8") val perBrokerProperties: java.util.Map[Integer, java.util.Map[String, String]] = java.util.Collections.singletonMap(0, java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0")) - clusterGenerator.accept(ClusterConfig.defaultBuilder() + List(ClusterConfig.defaultBuilder() .setTypes(Set(Type.ZK).asJava) .setBrokers(3) .setAutoStart(false) .setServerProperties(serverProperties) .setPerServerProperties(perBrokerProperties) - .build()) + .build()).asJava } } diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 464df8ab2ed5..5fd32d8f4bac 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -17,7 +17,7 @@ package kafka.zk import kafka.server.KRaftCachedControllerId -import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} +import kafka.test.{ClusterConfig, ClusterInstance} import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type} import kafka.test.junit.ClusterTestExtensions import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance @@ -63,8 +63,7 @@ import scala.collection.Seq import scala.jdk.CollectionConverters._ object ZkMigrationIntegrationTest { - - def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): Unit = { + def zkClustersForAllMigrationVersions(): java.util.List[ClusterConfig] = { Seq( MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_5_IV2, @@ -74,19 +73,19 @@ object ZkMigrationIntegrationTest { MetadataVersion.IBP_3_7_IV2, MetadataVersion.IBP_3_7_IV4, MetadataVersion.IBP_3_8_IV0 - ).foreach { mv => + ).map { mv => val serverProperties = new util.HashMap[String, String]() serverProperties.put("inter.broker.listener.name", "EXTERNAL") serverProperties.put("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") serverProperties.put("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") serverProperties.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - clusterGenerator.accept(ClusterConfig.defaultBuilder() + ClusterConfig.defaultBuilder() .setMetadataVersion(mv) .setBrokers(3) .setServerProperties(serverProperties) .setTypes(Set(Type.ZK).asJava) - .build()) - } + .build() + }.asJava } } diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index 625e7a1fdbd2..0f593ad43e6e 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} +import kafka.test.{ClusterConfig, ClusterInstance} import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest @@ -26,6 +26,7 @@ import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtendWith +import scala.jdk.CollectionConverters._ object ApiVersionsRequestTest { @@ -39,42 +40,42 @@ object ApiVersionsRequestTest { serverProperties } - def testApiVersionsRequestTemplate(clusterGenerator: ClusterGenerator): Unit = { + def testApiVersionsRequestTemplate(): java.util.List[ClusterConfig] = { val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties() serverProperties.put("unstable.api.versions.enable", "false") serverProperties.put("unstable.metadata.versions.enable", "true") - clusterGenerator.accept(ClusterConfig.defaultBuilder() + List(ClusterConfig.defaultBuilder() .setTypes(java.util.Collections.singleton(Type.ZK)) .setServerProperties(serverProperties) .setMetadataVersion(MetadataVersion.IBP_3_8_IV0) - .build()) + .build()).asJava } - def testApiVersionsRequestIncludesUnreleasedApisTemplate(clusterGenerator: ClusterGenerator): Unit = { + def testApiVersionsRequestIncludesUnreleasedApisTemplate(): java.util.List[ClusterConfig] = { val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties() serverProperties.put("unstable.api.versions.enable", "true") serverProperties.put("unstable.metadata.versions.enable", "true") - clusterGenerator.accept(ClusterConfig.defaultBuilder() + List(ClusterConfig.defaultBuilder() .setTypes(java.util.Collections.singleton(Type.ZK)) .setServerProperties(serverProperties) - .build()) + .build()).asJava } - def testApiVersionsRequestValidationV0Template(clusterGenerator: ClusterGenerator): Unit = { + def testApiVersionsRequestValidationV0Template(): java.util.List[ClusterConfig] = { val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties() serverProperties.put("unstable.api.versions.enable", "false") serverProperties.put("unstable.metadata.versions.enable", "false") - clusterGenerator.accept(ClusterConfig.defaultBuilder() + List(ClusterConfig.defaultBuilder() .setTypes(java.util.Collections.singleton(Type.ZK)) .setMetadataVersion(MetadataVersion.IBP_3_7_IV4) - .build()) + .build()).asJava } - def zkApiVersionsRequest(clusterGenerator: ClusterGenerator): Unit = { - clusterGenerator.accept(ClusterConfig.defaultBuilder() + def zkApiVersionsRequest(): java.util.List[ClusterConfig] = { + List(ClusterConfig.defaultBuilder() .setTypes(java.util.Collections.singleton(Type.ZK)) .setServerProperties(controlPlaneListenerProperties()) - .build()) + .build()).asJava } } diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index adb528ee83b7..493b1c216d8e 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -22,7 +22,7 @@ import kafka.api.{KafkaSasl, SaslSetup} import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms} import kafka.test.annotation.{ClusterTemplate, Type} import kafka.test.junit.ClusterTestExtensions -import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} +import kafka.test.{ClusterConfig, ClusterInstance} import kafka.utils.JaasTestUtils import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs @@ -44,7 +44,7 @@ object SaslApiVersionsRequestTest { val controlPlaneListenerName = "CONTROL_PLANE" val securityProtocol = SecurityProtocol.SASL_PLAINTEXT - def saslApiVersionsRequestClusterConfig(clusterGenerator: ClusterGenerator): Unit = { + def saslApiVersionsRequestClusterConfig(): java.util.List[ClusterConfig] = { val saslServerProperties = new java.util.HashMap[String, String]() saslServerProperties.put(KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism) saslServerProperties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(",")) @@ -59,13 +59,13 @@ object SaslApiVersionsRequestTest { serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") - clusterGenerator.accept(ClusterConfig.defaultBuilder + List(ClusterConfig.defaultBuilder .setSecurityProtocol(securityProtocol) .setTypes(Set(Type.ZK).asJava) .setSaslServerProperties(saslServerProperties) .setSaslClientProperties(saslClientProperties) .setServerProperties(serverProperties) - .build()) + .build()).asJava } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java index 5b48f5f6958e..ffd33789c9c1 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java @@ -18,7 +18,6 @@ package org.apache.kafka.tools.consumer.group; import kafka.test.ClusterConfig; -import kafka.test.ClusterGenerator; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.utils.Utils; @@ -29,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Collections; +import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -50,7 +50,7 @@ class ConsumerGroupCommandTestUtils { private ConsumerGroupCommandTestUtils() { } - static void generator(ClusterGenerator clusterGenerator) { + static List generator() { Map serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); @@ -61,7 +61,6 @@ static void generator(ClusterGenerator clusterGenerator) { .setServerProperties(serverProperties) .setTags(Collections.singletonList("classicGroupCoordinator")) .build(); - clusterGenerator.accept(classicGroupCoordinator); // Following are test case config with new group coordinator serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); @@ -71,7 +70,7 @@ static void generator(ClusterGenerator clusterGenerator) { .setServerProperties(serverProperties) .setTags(Collections.singletonList("newGroupCoordinator")) .build(); - clusterGenerator.accept(consumerGroupCoordinator); + return Arrays.asList(classicGroupCoordinator, consumerGroupCoordinator); } static AutoCloseable buildConsumers(int numberOfConsumers, diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java index c37a9a6a9c04..8f94f78931d7 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; -import kafka.test.ClusterGenerator; +import kafka.test.ClusterConfig; import kafka.test.ClusterInstance; import kafka.test.annotation.ClusterTemplate; import kafka.test.junit.ClusterTestExtensions; @@ -36,6 +36,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; +import java.util.List; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -67,8 +68,8 @@ @ExtendWith(value = ClusterTestExtensions.class) public class DeleteConsumerGroupsTest { - private static void generator(ClusterGenerator clusterGenerator) { - ConsumerGroupCommandTestUtils.generator(clusterGenerator); + private static List generator() { + return ConsumerGroupCommandTestUtils.generator(); } @Test diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java index c15a467a188e..0c5c9e7b937a 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterConfig; import kafka.test.ClusterInstance; -import kafka.test.ClusterGenerator; import kafka.test.annotation.ClusterTemplate; import kafka.test.junit.ClusterTestExtensions; import org.apache.kafka.clients.CommonClientConfigs; @@ -44,6 +44,7 @@ import java.time.Duration; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; @@ -64,8 +65,8 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest { this.clusterInstance = clusterInstance; } - private static void generator(ClusterGenerator clusterGenerator) { - ConsumerGroupCommandTestUtils.generator(clusterGenerator); + private static List generator() { + return ConsumerGroupCommandTestUtils.generator(); } @ClusterTemplate("generator")