Skip to content

Commit

Permalink
KAFKA-16677 Replace ClusterType#ALL and ClusterType#DEFAULT by Array (#…
Browse files Browse the repository at this point in the history
…15897)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
FrankYang0529 committed May 13, 2024
1 parent 643db43 commit 334d5d5
Show file tree
Hide file tree
Showing 42 changed files with 176 additions and 250 deletions.
15 changes: 4 additions & 11 deletions core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import kafka.cluster.EndPoint;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.ZkClusterInvocationContext;
Expand Down Expand Up @@ -65,7 +63,6 @@

@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults
@Tag("integration")
public class ConfigCommandIntegrationTest {
AdminZkClient adminZkClient;
Expand All @@ -77,10 +74,7 @@ public ConfigCommandIntegrationTest(ClusterInstance cluster) {
this.cluster = cluster;
}

@ClusterTests({
@ClusterTest(clusterType = Type.ZK),
@ClusterTest(clusterType = Type.KRAFT)
})
@ClusterTest(types = {Type.ZK, Type.KRAFT})
public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-name", cluster.isKRaftTest() ? "0" : "1",
Expand All @@ -91,9 +85,8 @@ public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut));
}

@ClusterTests({
@ClusterTest(clusterType = Type.ZK)
})

@ClusterTest(types = {Type.ZK})
public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "users",
Expand Down Expand Up @@ -164,7 +157,7 @@ void deleteAndVerifyConfig(KafkaZkClient zkClient, Set<String> configNames, Opti
verifyConfig(zkClient, Collections.emptyMap(), brokerId);
}

@ClusterTest(clusterType = Type.ZK)
@ClusterTest(types = {Type.ZK})
public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.Exit;
import org.apache.kafka.test.NoRetryException;
Expand All @@ -44,7 +42,6 @@

@SuppressWarnings("dontUseSystemExit")
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL)
public class UserScramCredentialsCommandTest {
private static final String USER1 = "user1";
private static final String USER2 = "user2";
Expand Down
25 changes: 14 additions & 11 deletions core/src/test/java/kafka/test/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Represents an immutable requested configuration of a Kafka cluster for integration testing.
*/
public class ClusterConfig {

private final Type type;
private final Set<Type> types;
private final int brokers;
private final int controllers;
private final int disksPerBroker;
Expand All @@ -56,7 +59,7 @@ public class ClusterConfig {
private final Map<Integer, Map<String, String>> perServerProperties;

@SuppressWarnings("checkstyle:ParameterNumber")
private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
Expand All @@ -66,7 +69,7 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke
if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
if (disksPerBroker <= 0) throw new IllegalArgumentException("Number of disks must be greater than zero.");

this.type = Objects.requireNonNull(type);
this.types = Objects.requireNonNull(types);
this.brokers = brokers;
this.controllers = controllers;
this.disksPerBroker = disksPerBroker;
Expand All @@ -85,8 +88,8 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke
this.perServerProperties = Objects.requireNonNull(perServerProperties);
}

public Type clusterType() {
return type;
public Set<Type> clusterTypes() {
return types;
}

public int numBrokers() {
Expand Down Expand Up @@ -164,7 +167,7 @@ public Map<String, String> nameTags() {

public static Builder defaultBuilder() {
return new Builder()
.setType(Type.ZK)
.setTypes(Stream.of(Type.ZK, Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet()))
.setBrokers(1)
.setControllers(1)
.setDisksPerBroker(1)
Expand All @@ -179,7 +182,7 @@ public static Builder builder() {

public static Builder builder(ClusterConfig clusterConfig) {
return new Builder()
.setType(clusterConfig.type)
.setTypes(clusterConfig.types)
.setBrokers(clusterConfig.brokers)
.setControllers(clusterConfig.controllers)
.setDisksPerBroker(clusterConfig.disksPerBroker)
Expand All @@ -199,7 +202,7 @@ public static Builder builder(ClusterConfig clusterConfig) {
}

public static class Builder {
private Type type;
private Set<Type> types;
private int brokers;
private int controllers;
private int disksPerBroker;
Expand All @@ -219,8 +222,8 @@ public static class Builder {

private Builder() {}

public Builder setType(Type type) {
this.type = type;
public Builder setTypes(Set<Type> types) {
this.types = Collections.unmodifiableSet(new HashSet<>(types));
return this;
}

Expand Down Expand Up @@ -307,7 +310,7 @@ public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServe
}

public ClusterConfig build() {
return new ClusterConfig(type, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
return new ClusterConfig(types, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties, perServerProperties);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/ClusterConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testCopy() throws IOException {
File trustStoreFile = TestUtils.tempFile();

ClusterConfig clusterConfig = ClusterConfig.builder()
.setType(Type.KRAFT)
.setTypes(Collections.singleton(Type.KRAFT))
.setBrokers(3)
.setControllers(2)
.setDisksPerBroker(1)
Expand Down
13 changes: 3 additions & 10 deletions core/src/test/java/kafka/test/ClusterInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.network.ListenerName;
Expand All @@ -39,18 +40,10 @@

public interface ClusterInstance {

enum ClusterType {
ZK,
RAFT
}

/**
* Cluster type. For now, only ZK is supported.
*/
ClusterType clusterType();
Type type();

default boolean isKRaftTest() {
return clusterType() == ClusterType.RAFT;
return type() == Type.KRAFT || type() == Type.CO_KRAFT;
}

/**
Expand Down
39 changes: 18 additions & 21 deletions core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;

@ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
@ClusterTestDefaults(types = {Type.ZK}, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
}) // Set defaults for a few params in @ClusterTest(s)
Expand All @@ -65,6 +65,7 @@ static void generate1(ClusterGenerator clusterGenerator) {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("foo", "bar");
clusterGenerator.accept(ClusterConfig.defaultBuilder()
.setTypes(Collections.singleton(Type.ZK))
.setName("Generated Test")
.setServerProperties(serverProperties)
.build());
Expand All @@ -74,14 +75,14 @@ static void generate1(ClusterGenerator clusterGenerator) {
@ClusterTest
public void testClusterTest(ClusterInstance clusterInstance) {
Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType()); // From the class level default
Assertions.assertEquals(Type.ZK, clusterInstance.type()); // From the class level default
Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
}

// generate1 is a template method which generates any number of cluster configs
@ClusterTemplate("generate1")
public void testClusterTemplate() {
Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType(),
Assertions.assertEquals(Type.ZK, clusterInstance.type(),
"generate1 provided a Zk cluster, so we should see that here");
Assertions.assertEquals("Generated Test", clusterInstance.config().name().orElse(""),
"generate1 named this cluster config, so we should see that here");
Expand All @@ -90,19 +91,19 @@ public void testClusterTemplate() {

// Multiple @ClusterTest can be used with @ClusterTests
@ClusterTests({
@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
@ClusterTest(name = "cluster-tests-1", types = {Type.ZK}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
@ClusterConfigProperty(key = "spam", value = "eggs"),
@ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400
}),
@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
@ClusterTest(name = "cluster-tests-2", types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"),
@ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300")
}),
@ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
@ClusterTest(name = "cluster-tests-3", types = {Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
Expand Down Expand Up @@ -136,7 +137,7 @@ public void testClusterTests() throws ExecutionException, InterruptedException {
Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
}
// In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
if (clusterInstance.config().clusterType() == Type.KRAFT) {
if (clusterInstance.type() == Type.KRAFT) {
try (Admin admin = Admin.create(Collections.singletonMap(
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
Expand All @@ -149,12 +150,8 @@ public void testClusterTests() throws ExecutionException, InterruptedException {
}

@ClusterTests({
@ClusterTest(clusterType = Type.ZK),
@ClusterTest(clusterType = Type.ZK, disksPerBroker = 2),
@ClusterTest(clusterType = Type.KRAFT),
@ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2),
@ClusterTest(clusterType = Type.CO_KRAFT),
@ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2)
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
})
public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException {
Admin admin = clusterInstance.createAdminClient();
Expand All @@ -178,21 +175,21 @@ public void testDefaults(ClusterInstance clusterInstance) {
}

@ClusterTests({
@ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "enable-new-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
}),
@ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "enable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
Expand All @@ -206,13 +203,13 @@ public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
}

@ClusterTests({
@ClusterTest(name = "disable-new-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "disable-new-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
}),
@ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/annotation/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@Retention(RUNTIME)
@TestTemplate
public @interface ClusterTest {
Type clusterType() default Type.DEFAULT;
Type[] types() default {};
int brokers() default 0;
int controllers() default 0;
int disksPerBroker() default 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@Target({TYPE})
@Retention(RUNTIME)
public @interface ClusterTestDefaults {
Type clusterType() default Type.ZK;
Type[] types() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
int brokers() default 1;
int controllers() default 1;
int disksPerBroker() default 1;
Expand Down
14 changes: 0 additions & 14 deletions core/src/test/java/kafka/test/annotation/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,6 @@ public void invocationContexts(String baseDisplayName, ClusterConfig config, Con
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
}
},
ALL {
@Override
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, false));
invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, true));
invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
}
},
DEFAULT {
@Override
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
throw new UnsupportedOperationException("Cannot create invocation contexts for DEFAULT type");
}
};

public abstract void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer);
Expand Down

0 comments on commit 334d5d5

Please sign in to comment.