diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index 212e60d26bf5..e8cbe118825f 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -78,8 +78,7 @@ public ConfigCommandIntegrationTest(ClusterInstance cluster) { } @ClusterTests({ - @ClusterTest(clusterType = Type.ZK), - @ClusterTest(clusterType = Type.KRAFT) + @ClusterTest(clusterTypes = {Type.ZK, Type.KRAFT}), }) public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( @@ -92,7 +91,7 @@ public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() { } @ClusterTests({ - @ClusterTest(clusterType = Type.ZK) + @ClusterTest(clusterTypes = {Type.ZK}) }) public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( @@ -164,7 +163,7 @@ void deleteAndVerifyConfig(KafkaZkClient zkClient, Set configNames, Opti verifyConfig(zkClient, Collections.emptyMap(), brokerId); } - @ClusterTest(clusterType = Type.ZK) + @ClusterTest(clusterTypes = {Type.ZK}) public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception { cluster.shutdownBroker(0); String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect(); diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java index eb89b0ec567a..e80d641b56cc 100644 --- a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java +++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java @@ -44,7 +44,7 @@ @SuppressWarnings("dontUseSystemExit") @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ALL) +@ClusterTestDefaults(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}) public class UserScramCredentialsCommandTest { private static final String USER1 = "user1"; private static final String USER2 = "user2"; diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java index 4a713f35928c..5b5d5ae7c088 100644 --- a/core/src/test/java/kafka/test/ClusterConfig.java +++ b/core/src/test/java/kafka/test/ClusterConfig.java @@ -24,10 +24,12 @@ import java.io.File; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; /** @@ -35,7 +37,7 @@ */ public class ClusterConfig { - private final Type type; + private final Set types; private final int brokers; private final int controllers; private final int disksPerBroker; @@ -56,7 +58,7 @@ public class ClusterConfig { private final Map> perBrokerOverrideProperties; @SuppressWarnings("checkstyle:ParameterNumber") - private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart, + private ClusterConfig(Set types, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart, SecurityProtocol securityProtocol, String listenerName, File trustStoreFile, MetadataVersion metadataVersion, Map serverProperties, Map producerProperties, Map consumerProperties, Map adminClientProperties, Map saslServerProperties, @@ -66,7 +68,7 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero."); if (disksPerBroker <= 0) throw new IllegalArgumentException("Number of disks must be greater than zero."); - this.type = Objects.requireNonNull(type); + this.types = Objects.requireNonNull(types); this.brokers = brokers; this.controllers = controllers; this.disksPerBroker = disksPerBroker; @@ -85,8 +87,8 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties); } - public Type clusterType() { - return type; + public Set clusterTypes() { + return types; } public int numBrokers() { @@ -164,7 +166,7 @@ public Map nameTags() { public static Builder defaultBuilder() { return new Builder() - .setType(Type.ZK) + .setTypes(Collections.singleton(Type.ZK)) .setBrokers(1) .setControllers(1) .setDisksPerBroker(1) @@ -179,7 +181,7 @@ public static Builder builder() { public static Builder builder(ClusterConfig clusterConfig) { return new Builder() - .setType(clusterConfig.type) + .setTypes(clusterConfig.types) .setBrokers(clusterConfig.brokers) .setControllers(clusterConfig.controllers) .setDisksPerBroker(clusterConfig.disksPerBroker) @@ -199,7 +201,7 @@ public static Builder builder(ClusterConfig clusterConfig) { } public static class Builder { - private Type type; + private Set types; private int brokers; private int controllers; private int disksPerBroker; @@ -219,8 +221,8 @@ public static class Builder { private Builder() {} - public Builder setType(Type type) { - this.type = type; + public Builder setTypes(Set types) { + this.types = Collections.unmodifiableSet(new HashSet<>(types)); return this; } @@ -307,7 +309,7 @@ public Builder setPerBrokerProperties(Map> perBroke } public ClusterConfig build() { - return new ClusterConfig(type, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName, + return new ClusterConfig(types, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties, adminClientProperties, saslServerProperties, saslClientProperties, perBrokerOverrideProperties); diff --git a/core/src/test/java/kafka/test/ClusterConfigTest.java b/core/src/test/java/kafka/test/ClusterConfigTest.java index 790ca7d0e5e7..7d3fa029a25e 100644 --- a/core/src/test/java/kafka/test/ClusterConfigTest.java +++ b/core/src/test/java/kafka/test/ClusterConfigTest.java @@ -46,7 +46,7 @@ public void testCopy() throws IOException { File trustStoreFile = TestUtils.tempFile(); ClusterConfig clusterConfig = ClusterConfig.builder() - .setType(Type.KRAFT) + .setTypes(Collections.singleton(Type.KRAFT)) .setBrokers(3) .setControllers(2) .setDisksPerBroker(1) diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index 1bc56df5b71a..f9657496bfed 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -45,7 +45,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG; -@ClusterTestDefaults(clusterType = Type.ZK, serverProperties = { +@ClusterTestDefaults(clusterTypes = {Type.ZK}, serverProperties = { @ClusterConfigProperty(key = "default.key", value = "default.value"), }) // Set defaults for a few params in @ClusterTest(s) @ExtendWith(ClusterTestExtensions.class) @@ -87,20 +87,15 @@ public void testClusterTemplate() { // Multiple @ClusterTest can be used with @ClusterTests @ClusterTests({ - @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = { + @ClusterTest(name = "cluster-tests-1", clusterTypes = {Type.ZK}, serverProperties = { @ClusterConfigProperty(key = "foo", value = "bar"), @ClusterConfigProperty(key = "spam", value = "eggs") }), - @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = { + @ClusterTest(name = "cluster-tests-2", clusterTypes = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), @ClusterConfigProperty(key = "default.key", value = "overwrite.value") }), - @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = { - @ClusterConfigProperty(key = "foo", value = "baz"), - @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") - }) }) public void testClusterTests() { if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) { @@ -117,12 +112,8 @@ public void testClusterTests() { } @ClusterTests({ - @ClusterTest(clusterType = Type.ZK), - @ClusterTest(clusterType = Type.ZK, disksPerBroker = 2), - @ClusterTest(clusterType = Type.KRAFT), - @ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2), - @ClusterTest(clusterType = Type.CO_KRAFT), - @ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2) + @ClusterTest(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}), + @ClusterTest(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2), }) public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException { Admin admin = clusterInstance.createAdminClient(); @@ -146,21 +137,21 @@ public void testDefaults(ClusterInstance clusterInstance) { } @ClusterTests({ - @ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = { + @ClusterTest(name = "enable-new-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), }), - @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = { + @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), }), - @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = { + @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), }), - @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = { + @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"), @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), }), - @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = { + @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"), }), @@ -174,13 +165,13 @@ public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) { } @ClusterTests({ - @ClusterTest(name = "disable-new-coordinator", clusterType = Type.ALL, serverProperties = { + @ClusterTest(name = "disable-new-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), }), - @ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = { + @ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), }), - @ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = { + @ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"), @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"), }), diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java index c3c9364acd09..c5b63090e326 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTest.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java @@ -33,7 +33,7 @@ @Retention(RUNTIME) @TestTemplate public @interface ClusterTest { - Type clusterType() default Type.DEFAULT; + Type[] clusterTypes() default {}; int brokers() default 0; int controllers() default 0; int disksPerBroker() default 0; diff --git a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java index af45b877652f..44bafc374594 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java @@ -35,7 +35,7 @@ @Target({TYPE}) @Retention(RUNTIME) public @interface ClusterTestDefaults { - Type clusterType() default Type.ZK; + Type[] clusterTypes() default {Type.ZK}; int brokers() default 1; int controllers() default 1; int disksPerBroker() default 1; diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java index 27c2b792d85d..3d35b30ebd25 100644 --- a/core/src/test/java/kafka/test/annotation/Type.java +++ b/core/src/test/java/kafka/test/annotation/Type.java @@ -45,20 +45,6 @@ public void invocationContexts(String baseDisplayName, ClusterConfig config, Con public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer invocationConsumer) { invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config)); } - }, - ALL { - @Override - public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer invocationConsumer) { - invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, false)); - invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, true)); - invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config)); - } - }, - DEFAULT { - @Override - public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer invocationConsumer) { - throw new UnsupportedOperationException("Cannot create invocation contexts for DEFAULT type"); - } }; public abstract void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer invocationConsumer); diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java index bdf3b2b9522e..155908cd3e2d 100644 --- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java +++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java @@ -33,7 +33,9 @@ import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -129,7 +131,11 @@ void processClusterTemplate(ExtensionContext context, ClusterTemplate annot, generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add); String baseDisplayName = context.getRequiredTestMethod().getName(); - generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testInvocations)); + generatedClusterConfigs.forEach(config -> { + for (Type type: config.clusterTypes()) { + type.invocationContexts(baseDisplayName, config, testInvocations); + } + }); } private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) { @@ -140,7 +146,7 @@ private void generateClusterConfigurations(ExtensionContext context, String gene private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults, Consumer testInvocations) { - Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType(); + Type[] types = annot.clusterTypes().length == 0 ? defaults.clusterTypes() : annot.clusterTypes(); Map serverProperties = new HashMap<>(); for (ClusterConfigProperty property : defaults.serverProperties()) { @@ -150,7 +156,7 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu serverProperties.put(property.key(), property.value()); } ClusterConfig config = ClusterConfig.builder() - .setType(type) + .setTypes(new HashSet<>(Arrays.asList(types))) .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers()) .setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers()) .setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker()) @@ -161,7 +167,9 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu .setSecurityProtocol(annot.securityProtocol()) .setMetadataVersion(annot.metadataVersion()) .build(); - type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations); + for (Type type : types) { + type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations); + } } private ClusterTestDefaults getClusterTestDefaults(Class testClass) { diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala index 5cf90516af2c..f792ef286ce8 100644 --- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala @@ -45,7 +45,7 @@ object ProducerIdsIntegrationTest { java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0")) clusterGenerator.accept(ClusterConfig.defaultBuilder() - .setType(Type.ZK) + .setTypes(Set(Type.ZK).asJava) .setBrokers(3) .setAutoStart(false) .setServerProperties(serverProperties) @@ -61,9 +61,9 @@ object ProducerIdsIntegrationTest { class ProducerIdsIntegrationTest { @ClusterTests(Array( - new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1), - new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0) + new ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1), + new ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0), + new ClusterTest(clusterTypes = Array(Type.KRAFT), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0) )) def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = { verifyUniqueIds(clusterInstance) @@ -76,7 +76,7 @@ class ProducerIdsIntegrationTest { clusterInstance.stop() } - @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array( new ClusterConfigProperty(key = "num.io.threads", value = "1") )) @Timeout(20) @@ -87,7 +87,7 @@ class ProducerIdsIntegrationTest { } @Disabled // TODO: Enable once producer id block size is configurable (KAFKA-15029) - @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array( new ClusterConfigProperty(key = "num.io.threads", value = "2") )) def testMultipleAllocateProducerIdsRequest(clusterInstance: ClusterInstance): Unit = { diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala index 8ed6064e4885..aec2c3135757 100644 --- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala @@ -47,7 +47,7 @@ import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class KafkaServerKRaftRegistrationTest { - @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @@ -95,7 +95,7 @@ class KafkaServerKRaftRegistrationTest { } } - @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0) + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0) def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): Unit = { // Bootstrap the ZK cluster ID into KRaft val clusterId = zkCluster.clusterId() diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala index 5640f432ed66..b4d211ed783e 100644 --- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala @@ -32,12 +32,12 @@ import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class MetadataVersionIntegrationTest { @ClusterTests(value = Array( - new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0), - new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1), - new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV2), - new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV3), - new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_4_IV0), - new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_4_IV0) + new ClusterTest(clusterTypes = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0), + new ClusterTest(clusterTypes = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV1), + new ClusterTest(clusterTypes = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV2), + new ClusterTest(clusterTypes = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV3), + new ClusterTest(clusterTypes = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_4_IV0), + new ClusterTest(clusterTypes = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_4_IV0) )) def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = { val admin = clusterInstance.createAdminClient() @@ -60,7 +60,7 @@ class MetadataVersionIntegrationTest { }, "Never saw metadata.version increase on broker") } - @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0) + @ClusterTest(clusterTypes = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0) def testUpgradeSameVersion(clusterInstance: ClusterInstance): Unit = { val admin = clusterInstance.createAdminClient() val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue @@ -69,7 +69,7 @@ class MetadataVersionIntegrationTest { updateResult.all().get() } - @ClusterTest(clusterType = Type.KRAFT) + @ClusterTest(clusterTypes = Array(Type.KRAFT)) def testDefaultIsLatestVersion(clusterInstance: ClusterInstance): Unit = { val admin = clusterInstance.createAdminClient() val describeResult = admin.describeFeatures() diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 6e4cd7d29938..8fc4be71cb3e 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -84,7 +84,7 @@ object ZkMigrationIntegrationTest { .setMetadataVersion(mv) .setBrokers(3) .setServerProperties(serverProperties) - .setType(Type.ZK) + .setTypes(Set(Type.ZK).asJava) .build()) } } @@ -113,7 +113,7 @@ class ZkMigrationIntegrationTest { } @ClusterTest( - brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES, + brokers = 3, clusterTypes = Array(Type.ZK), autoStart = AutoStart.YES, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( new ClusterConfigProperty(key="authorizer.class.name", value="kafka.security.authorizer.AclAuthorizer"), @@ -156,7 +156,7 @@ class ZkMigrationIntegrationTest { } @ClusterTest( - brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES, + brokers = 3, clusterTypes = Array(Type.ZK), autoStart = AutoStart.YES, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( new ClusterConfigProperty(key = "authorizer.class.name", value = "kafka.security.authorizer.AclAuthorizer"), @@ -219,7 +219,7 @@ class ZkMigrationIntegrationTest { * and modifies data using AdminClient. The ZkMigrationClient is then used to read the metadata from ZK * as would happen during a migration. The generated records are then verified. */ - @ClusterTest(brokers = 3, clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_4_IV0) + @ClusterTest(brokers = 3, clusterTypes = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_4_IV0) def testMigrate(clusterInstance: ClusterInstance): Unit = { val admin = clusterInstance.createAdminClient() val newTopics = new util.ArrayList[NewTopic]() @@ -429,7 +429,7 @@ class ZkMigrationIntegrationTest { } // SCRAM and Quota are intermixed. Test SCRAM Only here - @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @@ -492,7 +492,7 @@ class ZkMigrationIntegrationTest { } } - @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @@ -665,7 +665,7 @@ class ZkMigrationIntegrationTest { } // SCRAM and Quota are intermixed. Test both here - @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @@ -730,7 +730,7 @@ class ZkMigrationIntegrationTest { } } - @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @@ -809,7 +809,7 @@ class ZkMigrationIntegrationTest { } } - @ClusterTest(clusterType = Type.ZK, brokers = 4, metadataVersion = MetadataVersion.IBP_3_7_IV0, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 4, metadataVersion = MetadataVersion.IBP_3_7_IV0, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), @@ -893,7 +893,7 @@ class ZkMigrationIntegrationTest { } } - @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala index 5cb59573d1ad..94d6c6c9836f 100644 --- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala @@ -32,7 +32,7 @@ import org.junit.jupiter.api.{Tag, Timeout} @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT)) @Tag("integration") class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index de5720f3b49a..2922685e4381 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -34,7 +34,7 @@ import org.junit.jupiter.api.extension.ExtendWith class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { @ClusterTests(Array( - new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( + new ClusterTest(clusterTypes = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), // Configure control plane listener to make sure we have separate listeners for testing. @@ -43,11 +43,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), )), - new ClusterTest(clusterType = Type.CO_KRAFT, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( - new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), - new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), - )), - new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( + new ClusterTest(clusterTypes = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), )), @@ -59,7 +55,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio } @ClusterTests(Array( - new ClusterTest(clusterType = Type.ZK, serverProperties = Array( + new ClusterTest(clusterTypes = Array(Type.ZK), serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), // Configure control plane listener to make sure we have separate listeners for testing. @@ -68,11 +64,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), )), - new ClusterTest(clusterType = Type.CO_KRAFT, serverProperties = Array( - new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), - new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), - )), - new ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( + new ClusterTest(clusterTypes = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), )), @@ -83,7 +75,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true) } - @ClusterTest(clusterType = Type.ZK, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), serverProperties = Array( // Configure control plane listener to make sure we have separate listeners for testing. new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"), new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"), @@ -96,7 +88,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get()) } - @ClusterTest(clusterType = Type.KRAFT) + @ClusterTest(clusterTypes = Array(Type.KRAFT)) def testApiVersionsRequestThroughControllerListener(): Unit = { val request = new ApiVersionsRequest.Builder().build() val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controllerListenerName.get()) @@ -104,15 +96,14 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio } @ClusterTests(Array( - new ClusterTest(clusterType = Type.ZK, serverProperties = Array( + new ClusterTest(clusterTypes = Array(Type.ZK), serverProperties = Array( // Configure control plane listener to make sure we have separate listeners for testing. new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"), new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), )), - new ClusterTest(clusterType = Type.CO_KRAFT), - new ClusterTest(clusterType = Type.KRAFT), + new ClusterTest(clusterTypes = Array(Type.KRAFT, Type.CO_KRAFT)), )) def testApiVersionsRequestWithUnsupportedVersion(): Unit = { val apiVersionsRequest = new ApiVersionsRequest.Builder().build() @@ -126,7 +117,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio } @ClusterTests(Array( - new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( + new ClusterTest(clusterTypes = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"), // Configure control plane listener to make sure we have separate listeners for testing. @@ -135,11 +126,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), )), - new ClusterTest(clusterType = Type.CO_KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( - new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), - new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"), - )), - new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( + new ClusterTest(clusterTypes = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"), )), @@ -150,7 +137,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0) } - @ClusterTest(clusterType = Type.ZK, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK), serverProperties = Array( // Configure control plane listener to make sure we have separate listeners for testing. new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"), new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"), @@ -163,7 +150,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get()) } - @ClusterTest(clusterType = Type.KRAFT) + @ClusterTest(clusterTypes = Array(Type.KRAFT)) def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = { val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short]) val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName.get()) @@ -171,15 +158,14 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio } @ClusterTests(Array( - new ClusterTest(clusterType = Type.ZK, serverProperties = Array( + new ClusterTest(clusterTypes = Array(Type.ZK), serverProperties = Array( // Configure control plane listener to make sure we have separate listeners for testing. new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"), new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"), new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"), )), - new ClusterTest(clusterType = Type.CO_KRAFT), - new ClusterTest(clusterType = Type.KRAFT), + new ClusterTest(clusterTypes = Array(Type.KRAFT, Type.CO_KRAFT)), )) def testApiVersionsRequestValidationV3(): Unit = { // Invalid request because Name and Version are empty by default diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala index 1e3adc30ee23..939b474615c5 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala @@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(clusterType = Type.ALL) +@ClusterTestDefaults(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT)) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class BrokerMetricNamesTest(cluster: ClusterInstance) { @AfterEach diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala index 6c6199f71da9..f72e11e38ec5 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala @@ -134,7 +134,7 @@ class BrokerRegistrationRequestTest { Errors.forCode(resp.topics().find(topicName).errorCode()) } - @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0, + @ClusterTest(clusterTypes = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false"))) def testRegisterZkWithKRaftMigrationDisabled(clusterInstance: ClusterInstance): Unit = { val clusterId = clusterInstance.clusterId() @@ -162,7 +162,7 @@ class BrokerRegistrationRequestTest { } } - @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3, + @ClusterTest(clusterTypes = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3, serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false"))) def testRegisterZkWith33Controller(clusterInstance: ClusterInstance): Unit = { // Verify that a controller running an old metadata.version cannot register a ZK broker @@ -195,7 +195,7 @@ class BrokerRegistrationRequestTest { } @ClusterTest( - clusterType = Type.KRAFT, + clusterTypes = Array(Type.KRAFT), brokers = 1, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0, @@ -235,7 +235,7 @@ class BrokerRegistrationRequestTest { * through the RPCs. The migration never proceeds past pre-migration since no ZK brokers are registered. */ @ClusterTests(Array( - new ClusterTest(clusterType = Type.KRAFT, autoStart = AutoStart.NO, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0, + new ClusterTest(clusterTypes = Array(Type.KRAFT), autoStart = AutoStart.NO, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true"))) )) def testNoMetadataChangesInPreMigrationMode(clusterInstance: ClusterInstance): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala index ed80f80b0a37..6f02fcbe3d34 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala @@ -36,7 +36,7 @@ import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(clusterType = Type.ALL) +@ClusterTestDefaults(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT)) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @Tag("integration") class ClientQuotasRequestTest(cluster: ClusterInstance) { @@ -168,7 +168,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) { )) } - @ClusterTest(clusterType = Type.ZK) // No SCRAM for Raft yet + @ClusterTest(clusterTypes = Array(Type.ZK)) // No SCRAM for Raft yet def testClientQuotasForScramUsers(): Unit = { val userName = "user" diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 250cc83aa636..2e3c589714fd 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -34,7 +34,7 @@ import scala.jdk.CollectionConverters._ @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), brokers = 1) @Tag("integration") class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @@ -49,7 +49,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data) } - @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") @@ -137,7 +137,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch) } - @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") @@ -251,7 +251,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId) } - @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala index 03eddbe0ba44..c5090a7d55c6 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala @@ -31,7 +31,7 @@ import org.junit.jupiter.api.extension.ExtendWith @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala index 92cea4c8d3d0..3284273ef4c0 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala @@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( @@ -51,7 +51,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator testDeleteGroups(false) } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala index 45f967ffc91c..386b5f82c135 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala @@ -30,7 +30,7 @@ import scala.jdk.CollectionConverters._ @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( @@ -42,7 +42,7 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat testDescribeGroups() } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala index 2a946550b299..8b74043f3bce 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala @@ -33,11 +33,11 @@ import scala.reflect.ClassTag @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT)) @Tag("integration") class DescribeQuorumRequestTest(cluster: ClusterInstance) { - @ClusterTest(clusterType = Type.ZK) + @ClusterTest(clusterTypes = Array(Type.ZK)) def testDescribeQuorumNotSupportedByZkBrokers(): Unit = { val apiRequest = new ApiVersionsRequest.Builder().build() val apiResponse = connectAndReceive[ApiVersionsResponse](apiRequest) diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala index 80a308ef9b71..2c67d4b83385 100644 --- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala @@ -34,7 +34,7 @@ import scala.concurrent.Future @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( @@ -46,7 +46,7 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas testHeartbeat() } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala index 21c0de181ed2..45516e6253ca 100644 --- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala @@ -38,7 +38,7 @@ import scala.jdk.CollectionConverters._ @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( @@ -50,7 +50,7 @@ class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas testJoinGroup() } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala index 3eaff93d02a5..87a6c9b77cad 100644 --- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala @@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( @@ -40,7 +40,7 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa testLeaveGroup() } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala index c03dffc16d37..1f4a4ca8a93c 100644 --- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala @@ -30,7 +30,7 @@ import org.junit.jupiter.api.extension.ExtendWith @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( @@ -53,7 +53,7 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa testListGroups(false) } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala index 346b17028fb9..eb525c17fb6b 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala @@ -26,7 +26,7 @@ import org.junit.jupiter.api.extension.ExtendWith @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @@ -50,7 +50,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator testOffsetCommit(false) } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala index 8b4561d8a1c4..bed541c49da3 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala @@ -26,7 +26,7 @@ import org.junit.jupiter.api.extension.ExtendWith @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( @@ -49,7 +49,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator testOffsetDelete(false) } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index 79ec4029c9a7..5c1b287f4991 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -34,7 +34,7 @@ import scala.jdk.CollectionConverters._ @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @@ -60,7 +60,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false) } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), @@ -93,7 +93,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = false) } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), @@ -126,7 +126,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = false) } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 4b7e129fdb47..adb528ee83b7 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -61,7 +61,7 @@ object SaslApiVersionsRequestTest { clusterGenerator.accept(ClusterConfig.defaultBuilder .setSecurityProtocol(securityProtocol) - .setType(Type.ZK) + .setTypes(Set(Type.ZK).asJava) .setSaslServerProperties(saslServerProperties) .setSaslClientProperties(saslClientProperties) .setServerProperties(serverProperties) diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala index ac0d068b6aa9..f04de008fc63 100644 --- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala @@ -35,7 +35,7 @@ import scala.concurrent.{Await, Future} @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) @Tag("integration") class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @ClusterTest(serverProperties = Array( @@ -47,7 +47,7 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas testSyncGroup() } - @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + @ClusterTest(clusterTypes = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array( new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"), new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") diff --git a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java index 104f21e327d8..a4d65fa4c2bf 100644 --- a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java @@ -49,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ALL) +@ClusterTestDefaults(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}) @Tag("integration") public class DeleteRecordsCommandTest { diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index 867afc3e9557..a3ed98af75f4 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -44,7 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.KRAFT) +@ClusterTestDefaults(clusterTypes = {Type.KRAFT}) @Tag("integration") public class FeatureCommandTest { @@ -53,7 +53,7 @@ public FeatureCommandTest(ClusterInstance cluster) { this.cluster = cluster; } - @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) + @ClusterTest(clusterTypes = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1) public void testDescribeWithZK() { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe")) @@ -61,7 +61,7 @@ public void testDescribeWithZK() { assertEquals("", commandOutput); } - @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1) + @ClusterTest(clusterTypes = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1) public void testDescribeWithKRaft() { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe")) @@ -71,7 +71,7 @@ public void testDescribeWithKRaft() { "SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); } - @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4) + @ClusterTest(clusterTypes = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_7_IV4) public void testDescribeWithKRaftAndBootstrapControllers() { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe")) @@ -81,7 +81,7 @@ public void testDescribeWithKRaftAndBootstrapControllers() { "SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(commandOutput)); } - @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) + @ClusterTest(clusterTypes = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1) public void testUpgradeMetadataVersionWithZk() { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), @@ -91,7 +91,7 @@ public void testUpgradeMetadataVersionWithZk() { "update because the provided feature is not supported.", commandOutput); } - @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1) + @ClusterTest(clusterTypes = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1) public void testUpgradeMetadataVersionWithKraft() { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), @@ -106,7 +106,7 @@ public void testUpgradeMetadataVersionWithKraft() { assertEquals("metadata.version was upgraded to 6.", commandOutput); } - @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) + @ClusterTest(clusterTypes = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1) public void testDowngradeMetadataVersionWithZk() { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), @@ -129,7 +129,7 @@ public void testDowngradeMetadataVersionWithZk() { "update because the provided feature is not supported.", commandOutput); } - @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1) + @ClusterTest(clusterTypes = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1) public void testDowngradeMetadataVersionWithKRaft() { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java index 231222ec6a35..eafe2a446378 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java @@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterTestDefaults(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4") diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java index e2267e56622c..681f6a1db74d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java @@ -63,7 +63,7 @@ @SuppressWarnings("deprecation") @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = { +@ClusterTestDefaults(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, brokers = 3, serverProperties = { @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"), @ClusterConfigProperty(key = "auto.leader.rebalance.enable", value = "false"), @ClusterConfigProperty(key = "controlled.shutdown.enable", value = "true"), diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java index aa858f468ac0..4915bc83bf26 100644 --- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java @@ -44,7 +44,7 @@ import static java.util.Arrays.stream; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.KRAFT) +@ClusterTestDefaults(clusterTypes = {Type.KRAFT}) @Tag("integration") class MetadataQuorumCommandTest { @@ -59,12 +59,12 @@ public MetadataQuorumCommandTest(ClusterInstance cluster) { * 3. Fewer brokers than controllers */ @ClusterTests({ - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 2), - @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 2), - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 1), - @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1), - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 2), - @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 2) + @ClusterTest(clusterTypes = {Type.CO_KRAFT}, brokers = 2, controllers = 2), + @ClusterTest(clusterTypes = {Type.KRAFT}, brokers = 2, controllers = 2), + @ClusterTest(clusterTypes = {Type.CO_KRAFT}, brokers = 2, controllers = 1), + @ClusterTest(clusterTypes = {Type.KRAFT}, brokers = 2, controllers = 1), + @ClusterTest(clusterTypes = {Type.CO_KRAFT}, brokers = 1, controllers = 2), + @ClusterTest(clusterTypes = {Type.KRAFT}, brokers = 1, controllers = 2) }) public void testDescribeQuorumReplicationSuccessful() throws InterruptedException { cluster.waitForReadyBrokers(); @@ -73,7 +73,7 @@ public void testDescribeQuorumReplicationSuccessful() throws InterruptedExceptio ); List outputs = stream(describeOutput.split("\n")).skip(1).collect(Collectors.toList()); - if (cluster.config().clusterType() == Type.CO_KRAFT) + if (cluster.config().clusterTypes().contains(Type.CO_KRAFT)) assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.size()); else assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.size()); @@ -86,7 +86,7 @@ public void testDescribeQuorumReplicationSuccessful() throws InterruptedExceptio assertEquals(cluster.config().numControllers() - 1, outputs.stream().filter(o -> followerPattern.matcher(o).find()).count()); Pattern observerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+[\\dmsago\\s]+-?[\\dmsago\\s]+Observer\\s*"); - if (cluster.config().clusterType() == Type.CO_KRAFT) + if (cluster.config().clusterTypes().contains(Type.CO_KRAFT)) assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.stream().filter(o -> observerPattern.matcher(o).find()).count()); else @@ -100,12 +100,12 @@ public void testDescribeQuorumReplicationSuccessful() throws InterruptedExceptio * 3. Fewer brokers than controllers */ @ClusterTests({ - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 2), - @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 2), - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 1), - @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1), - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 2), - @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 2) + @ClusterTest(clusterTypes = {Type.CO_KRAFT}, brokers = 2, controllers = 2), + @ClusterTest(clusterTypes = {Type.KRAFT}, brokers = 2, controllers = 2), + @ClusterTest(clusterTypes = {Type.CO_KRAFT}, brokers = 2, controllers = 1), + @ClusterTest(clusterTypes = {Type.KRAFT}, brokers = 2, controllers = 1), + @ClusterTest(clusterTypes = {Type.CO_KRAFT}, brokers = 1, controllers = 2), + @ClusterTest(clusterTypes = {Type.KRAFT}, brokers = 1, controllers = 2) }) public void testDescribeQuorumStatusSuccessful() throws InterruptedException { cluster.waitForReadyBrokers(); @@ -124,15 +124,14 @@ public void testDescribeQuorumStatusSuccessful() throws InterruptedException { assertTrue(outputs[6].matches("CurrentVoters:\\s+\\[\\d+(,\\d+)*]")); // There are no observers if we have fewer brokers than controllers - if (cluster.config().clusterType() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers()) + if (cluster.config().clusterTypes().contains(Type.CO_KRAFT) && cluster.config().numBrokers() <= cluster.config().numControllers()) assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[]")); else assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[\\d+(,\\d+)*]")); } @ClusterTests({ - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1), - @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1) + @ClusterTest(clusterTypes = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 1), }) public void testOnlyOneBrokerAndOneController() { String statusOutput = ToolsTestUtils.captureStandardOut(() -> @@ -148,7 +147,7 @@ public void testOnlyOneBrokerAndOneController() { } @ClusterTests({ - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1) + @ClusterTest(clusterTypes = {Type.CO_KRAFT}, brokers = 1, controllers = 1) }) public void testCommandConfig() throws IOException { // specifying a --command-config containing properties that would prevent login must fail @@ -157,7 +156,7 @@ public void testCommandConfig() throws IOException { "--command-config", tmpfile.getAbsolutePath(), "describe", "--status")); } - @ClusterTest(clusterType = Type.ZK, brokers = 1) + @ClusterTest(clusterTypes = {Type.ZK}, brokers = 1) public void testDescribeQuorumInZkMode() { assertInstanceOf(UnsupportedVersionException.class, assertThrows( ExecutionException.class, @@ -171,7 +170,7 @@ public void testDescribeQuorumInZkMode() { } - @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1) + @ClusterTest(clusterTypes = {Type.CO_KRAFT}, brokers = 1, controllers = 1) public void testHumanReadableOutput() { assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--human-readable")); assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status", "--human-readable")); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java index 1ffad2b16c1e..36dfab500147 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.singleton; import static kafka.test.annotation.Type.CO_KRAFT; @@ -53,40 +55,21 @@ static void generator(ClusterGenerator clusterGenerator) { serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false"); - ClusterConfig zk = ClusterConfig.defaultBuilder() - .setType(ZK) + ClusterConfig classicGroupCoordinator = ClusterConfig.defaultBuilder() + .setTypes(Stream.of(ZK, KRAFT, CO_KRAFT).collect(Collectors.toSet())) .setServerProperties(serverProperties) .build(); - clusterGenerator.accept(zk); - - ClusterConfig raftWithLegacyCoordinator = ClusterConfig.defaultBuilder() - .setType(KRAFT) - .setServerProperties(serverProperties) - .build(); - clusterGenerator.accept(raftWithLegacyCoordinator); - - ClusterConfig combinedKRaftWithLegacyCoordinator = ClusterConfig.defaultBuilder() - .setType(CO_KRAFT) - .setServerProperties(serverProperties) - .build(); - clusterGenerator.accept(combinedKRaftWithLegacyCoordinator); + clusterGenerator.accept(classicGroupCoordinator); // Following are test case config with new group coordinator serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); - ClusterConfig raftWithNewGroupCoordinator = ClusterConfig.defaultBuilder() - .setType(KRAFT) - .setName("newGroupCoordinator") - .setServerProperties(serverProperties) - .build(); - clusterGenerator.accept(raftWithNewGroupCoordinator); - - ClusterConfig combinedKRaftWithNewGroupCoordinator = ClusterConfig.defaultBuilder() - .setType(CO_KRAFT) - .setName("newGroupCoordinator") + ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder() + .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet())) + .setName("consumerGroupCoordinator") .setServerProperties(serverProperties) .build(); - clusterGenerator.accept(combinedKRaftWithNewGroupCoordinator); + clusterGenerator.accept(consumerGroupCoordinator); } static AutoCloseable buildConsumers(int numberOfConsumers, diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java index d2f102a7eb56..42fb8da04350 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java @@ -57,7 +57,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; @Tag("integration") -@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterTestDefaults(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true")