Skip to content

Commit

Permalink
KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <payang@apache.org>
  • Loading branch information
FrankYang0529 committed May 9, 2024
1 parent 29f3260 commit 4009c0d
Show file tree
Hide file tree
Showing 39 changed files with 164 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public ConfigCommandIntegrationTest(ClusterInstance cluster) {
}

@ClusterTests({
@ClusterTest(clusterType = Type.ZK),
@ClusterTest(clusterType = Type.KRAFT)
@ClusterTest(clusterTypes = {Type.ZK, Type.KRAFT}),
})
public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
Expand All @@ -92,7 +91,7 @@ public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
}

@ClusterTests({
@ClusterTest(clusterType = Type.ZK)
@ClusterTest(clusterTypes = {Type.ZK})
})
public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
Expand Down Expand Up @@ -164,7 +163,7 @@ void deleteAndVerifyConfig(KafkaZkClient zkClient, Set<String> configNames, Opti
verifyConfig(zkClient, Collections.emptyMap(), brokerId);
}

@ClusterTest(clusterType = Type.ZK)
@ClusterTest(clusterTypes = {Type.ZK})
public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

@SuppressWarnings("dontUseSystemExit")
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL)
@ClusterTestDefaults(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
public class UserScramCredentialsCommandTest {
private static final String USER1 = "user1";
private static final String USER2 = "user2";
Expand Down
24 changes: 13 additions & 11 deletions core/src/test/java/kafka/test/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,20 @@
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Represents an immutable requested configuration of a Kafka cluster for integration testing.
*/
public class ClusterConfig {

private final Type type;
private final Set<Type> types;
private final int brokers;
private final int controllers;
private final int disksPerBroker;
Expand All @@ -56,7 +58,7 @@ public class ClusterConfig {
private final Map<Integer, Map<String, String>> perBrokerOverrideProperties;

@SuppressWarnings("checkstyle:ParameterNumber")
private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
Expand All @@ -66,7 +68,7 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke
if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
if (disksPerBroker <= 0) throw new IllegalArgumentException("Number of disks must be greater than zero.");

this.type = Objects.requireNonNull(type);
this.types = Objects.requireNonNull(types);
this.brokers = brokers;
this.controllers = controllers;
this.disksPerBroker = disksPerBroker;
Expand All @@ -85,8 +87,8 @@ private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroke
this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties);
}

public Type clusterType() {
return type;
public Set<Type> clusterTypes() {
return types;
}

public int numBrokers() {
Expand Down Expand Up @@ -164,7 +166,7 @@ public Map<String, String> nameTags() {

public static Builder defaultBuilder() {
return new Builder()
.setType(Type.ZK)
.setTypes(Collections.singleton(Type.ZK))
.setBrokers(1)
.setControllers(1)
.setDisksPerBroker(1)
Expand All @@ -179,7 +181,7 @@ public static Builder builder() {

public static Builder builder(ClusterConfig clusterConfig) {
return new Builder()
.setType(clusterConfig.type)
.setTypes(clusterConfig.types)
.setBrokers(clusterConfig.brokers)
.setControllers(clusterConfig.controllers)
.setDisksPerBroker(clusterConfig.disksPerBroker)
Expand All @@ -199,7 +201,7 @@ public static Builder builder(ClusterConfig clusterConfig) {
}

public static class Builder {
private Type type;
private Set<Type> types;
private int brokers;
private int controllers;
private int disksPerBroker;
Expand All @@ -219,8 +221,8 @@ public static class Builder {

private Builder() {}

public Builder setType(Type type) {
this.type = type;
public Builder setTypes(Set<Type> types) {
this.types = Collections.unmodifiableSet(new HashSet<>(types));
return this;
}

Expand Down Expand Up @@ -307,7 +309,7 @@ public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBroke
}

public ClusterConfig build() {
return new ClusterConfig(type, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
return new ClusterConfig(types, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties,
perBrokerOverrideProperties);
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/ClusterConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testCopy() throws IOException {
File trustStoreFile = TestUtils.tempFile();

ClusterConfig clusterConfig = ClusterConfig.builder()
.setType(Type.KRAFT)
.setTypes(Collections.singleton(Type.KRAFT))
.setBrokers(3)
.setControllers(2)
.setDisksPerBroker(1)
Expand Down
35 changes: 13 additions & 22 deletions core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;


@ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
@ClusterTestDefaults(clusterTypes = {Type.ZK}, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
}) // Set defaults for a few params in @ClusterTest(s)
@ExtendWith(ClusterTestExtensions.class)
Expand Down Expand Up @@ -87,20 +87,15 @@ public void testClusterTemplate() {

// Multiple @ClusterTest can be used with @ClusterTests
@ClusterTests({
@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
@ClusterTest(name = "cluster-tests-1", clusterTypes = {Type.ZK}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
@ClusterConfigProperty(key = "spam", value = "eggs")
}),
@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
@ClusterTest(name = "cluster-tests-2", clusterTypes = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value")
}),
@ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value")
})
})
public void testClusterTests() {
if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
Expand All @@ -117,12 +112,8 @@ public void testClusterTests() {
}

@ClusterTests({
@ClusterTest(clusterType = Type.ZK),
@ClusterTest(clusterType = Type.ZK, disksPerBroker = 2),
@ClusterTest(clusterType = Type.KRAFT),
@ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2),
@ClusterTest(clusterType = Type.CO_KRAFT),
@ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2)
@ClusterTest(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}),
@ClusterTest(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
})
public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException {
Admin admin = clusterInstance.createAdminClient();
Expand All @@ -146,21 +137,21 @@ public void testDefaults(ClusterInstance clusterInstance) {
}

@ClusterTests({
@ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "enable-new-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
}),
@ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
Expand All @@ -174,13 +165,13 @@ public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
}

@ClusterTests({
@ClusterTest(name = "disable-new-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "disable-new-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
}),
@ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
@ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/annotation/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@Retention(RUNTIME)
@TestTemplate
public @interface ClusterTest {
Type clusterType() default Type.DEFAULT;
Type[] clusterTypes() default {};
int brokers() default 0;
int controllers() default 0;
int disksPerBroker() default 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@Target({TYPE})
@Retention(RUNTIME)
public @interface ClusterTestDefaults {
Type clusterType() default Type.ZK;
Type[] clusterTypes() default {Type.ZK};
int brokers() default 1;
int controllers() default 1;
int disksPerBroker() default 1;
Expand Down
14 changes: 0 additions & 14 deletions core/src/test/java/kafka/test/annotation/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,6 @@ public void invocationContexts(String baseDisplayName, ClusterConfig config, Con
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
}
},
ALL {
@Override
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, false));
invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, true));
invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
}
},
DEFAULT {
@Override
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
throw new UnsupportedOperationException("Cannot create invocation contexts for DEFAULT type");
}
};

public abstract void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer);
Expand Down
16 changes: 12 additions & 4 deletions core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -129,7 +131,11 @@ void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);

String baseDisplayName = context.getRequiredTestMethod().getName();
generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testInvocations));
generatedClusterConfigs.forEach(config -> {
for (Type type: config.clusterTypes()) {
type.invocationContexts(baseDisplayName, config, testInvocations);
}
});
}

private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) {
Expand All @@ -140,7 +146,7 @@ private void generateClusterConfigurations(ExtensionContext context, String gene

private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults,
Consumer<TestTemplateInvocationContext> testInvocations) {
Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType();
Type[] types = annot.clusterTypes().length == 0 ? defaults.clusterTypes() : annot.clusterTypes();

Map<String, String> serverProperties = new HashMap<>();
for (ClusterConfigProperty property : defaults.serverProperties()) {
Expand All @@ -150,7 +156,7 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
serverProperties.put(property.key(), property.value());
}
ClusterConfig config = ClusterConfig.builder()
.setType(type)
.setTypes(new HashSet<>(Arrays.asList(types)))
.setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
.setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
.setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
Expand All @@ -161,7 +167,9 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
.setSecurityProtocol(annot.securityProtocol())
.setMetadataVersion(annot.metadataVersion())
.build();
type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
for (Type type : types) {
type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
}
}

private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object ProducerIdsIntegrationTest {
java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0"))

clusterGenerator.accept(ClusterConfig.defaultBuilder()
.setType(Type.ZK)
.setTypes(Set(Type.ZK).asJava)
.setBrokers(3)
.setAutoStart(false)
.setServerProperties(serverProperties)
Expand All @@ -61,9 +61,9 @@ object ProducerIdsIntegrationTest {
class ProducerIdsIntegrationTest {

@ClusterTests(Array(
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
new ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
new ClusterTest(clusterTypes = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
new ClusterTest(clusterTypes = Array(Type.KRAFT), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
))
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
verifyUniqueIds(clusterInstance)
Expand All @@ -76,7 +76,7 @@ class ProducerIdsIntegrationTest {
clusterInstance.stop()
}

@ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
@ClusterTest(clusterTypes = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
new ClusterConfigProperty(key = "num.io.threads", value = "1")
))
@Timeout(20)
Expand All @@ -87,7 +87,7 @@ class ProducerIdsIntegrationTest {
}

@Disabled // TODO: Enable once producer id block size is configurable (KAFKA-15029)
@ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
@ClusterTest(clusterTypes = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
new ClusterConfigProperty(key = "num.io.threads", value = "2")
))
def testMultipleAllocateProducerIdsRequest(clusterInstance: ClusterInstance): Unit = {
Expand Down

0 comments on commit 4009c0d

Please sign in to comment.