Skip to content

Commit

Permalink
KAFKA-16654 Refactor kafka.test.annotation.Type and ClusterTestExtens…
Browse files Browse the repository at this point in the history
…ions (apache#15916)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
TaiJuWu authored and gongxuanzhang committed Jun 12, 2024
1 parent e4210e0 commit a519dab
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 103 deletions.
13 changes: 7 additions & 6 deletions core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;

import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
Expand All @@ -62,10 +63,10 @@ public class ClusterTestExtensionsTest {
}

// Static methods can generate cluster configurations
static void generate1(ClusterGenerator clusterGenerator) {
static List<ClusterConfig> generate1() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("foo", "bar");
clusterGenerator.accept(ClusterConfig.defaultBuilder()
return Collections.singletonList(ClusterConfig.defaultBuilder()
.setTypes(Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("Generated Test"))
Expand Down
15 changes: 7 additions & 8 deletions core/src/test/java/kafka/test/annotation/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,29 @@
import kafka.test.junit.ZkClusterInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;

import java.util.function.Consumer;

/**
* The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations.
*/
public enum Type {
KRAFT {
@Override
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, false));
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
return new RaftClusterInvocationContext(baseDisplayName, config, false);
}
},
CO_KRAFT {
@Override
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, true));
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
return new RaftClusterInvocationContext(baseDisplayName, config, true);
}
},
ZK {
@Override
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
return new ZkClusterInvocationContext(baseDisplayName, config);
}
};

public abstract void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer);
public abstract TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config);
}
72 changes: 44 additions & 28 deletions core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.test.junit;

import kafka.test.ClusterConfig;
import kafka.test.ClusterGenerator;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
Expand All @@ -32,7 +31,6 @@
import org.junit.platform.commons.util.ReflectionUtils;

import java.lang.reflect.Method;
import java.util.function.Consumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -94,24 +92,19 @@ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContex
// Process the @ClusterTemplate annotation
ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
if (clusterTemplateAnnot != null) {
processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add);
if (generatedContexts.isEmpty()) {
throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
}
generatedContexts.addAll(processClusterTemplate(context, clusterTemplateAnnot));
}

// Process single @ClusterTest annotation
ClusterTest clusterTestAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTest.class);
if (clusterTestAnnot != null) {
processClusterTest(context, clusterTestAnnot, defaults, generatedContexts::add);
generatedContexts.addAll(processClusterTest(context, clusterTestAnnot, defaults));
}

// Process multiple @ClusterTest annotation within @ClusterTests
ClusterTests clusterTestsAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTests.class);
if (clusterTestsAnnot != null) {
for (ClusterTest annot : clusterTestsAnnot.value()) {
processClusterTest(context, annot, defaults, generatedContexts::add);
}
generatedContexts.addAll(processClusterTests(context, clusterTestsAnnot, defaults));
}

if (generatedContexts.isEmpty()) {
Expand All @@ -122,31 +115,54 @@ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContex
return generatedContexts.stream();
}

void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
Consumer<TestTemplateInvocationContext> testInvocations) {
// If specified, call cluster config generated method (must be static)
List<ClusterConfig> generatedClusterConfigs = new ArrayList<>();


List<TestTemplateInvocationContext> processClusterTemplate(ExtensionContext context, ClusterTemplate annot) {
if (annot.value().trim().isEmpty()) {
throw new IllegalStateException("ClusterTemplate value can't be empty string.");
}
generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);

String baseDisplayName = context.getRequiredTestMethod().getName();
generatedClusterConfigs.forEach(config -> {
for (Type type: config.clusterTypes()) {
type.invocationContexts(baseDisplayName, config, testInvocations);
}
});
List<TestTemplateInvocationContext> contexts = generateClusterConfigurations(context, annot.value())
.stream().flatMap(config -> config.clusterTypes().stream()
.map(type -> type.invocationContexts(baseDisplayName, config))).collect(Collectors.toList());

if (contexts.isEmpty()) {
throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
}

return contexts;
}

private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) {
@SuppressWarnings("unchecked")
private List<ClusterConfig> generateClusterConfigurations(ExtensionContext context, String generateClustersMethods) {
Object testInstance = context.getTestInstance().orElse(null);
Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods, ClusterGenerator.class);
ReflectionUtils.invokeMethod(method, testInstance, generator);
Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods);
return (List<ClusterConfig>) ReflectionUtils.invokeMethod(method, testInstance);
}

private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults,
Consumer<TestTemplateInvocationContext> testInvocations) {
private List<TestTemplateInvocationContext> processClusterTests(ExtensionContext context, ClusterTests annots, ClusterTestDefaults defaults) {

List<TestTemplateInvocationContext> ret = Arrays.stream(annots.value())
.flatMap(annot -> processClusterTestInternal(context, annot, defaults).stream()).collect(Collectors.toList());

if (ret.isEmpty()) {
throw new IllegalStateException("processClusterTests method should provide at least one config");
}

return ret;
}

private List<TestTemplateInvocationContext> processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) {
List<TestTemplateInvocationContext> ret = processClusterTestInternal(context, annot, defaults);

if (ret.isEmpty()) {
throw new IllegalStateException("processClusterTest method should provide at least one config");
}

return ret;
}
private List<TestTemplateInvocationContext> processClusterTestInternal(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults) {
Type[] types = annot.types().length == 0 ? defaults.types() : annot.types();
Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
.filter(e -> e.id() == -1)
Expand All @@ -169,9 +185,9 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
.setMetadataVersion(annot.metadataVersion())
.setTags(Arrays.asList(annot.tags()))
.build();
for (Type type : types) {
type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
}

return Arrays.stream(types).map(type -> type.invocationContexts(context.getRequiredTestMethod().getName(), config))
.collect(Collectors.toList());
}

private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,24 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import java.util.function.Consumer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ClusterTestExtensionsUnitTest {
@Test
@SuppressWarnings("unchecked")
void testProcessClusterTemplate() {
ClusterTestExtensions ext = new ClusterTestExtensions();
ExtensionContext context = mock(ExtensionContext.class);
Consumer<TestTemplateInvocationContext> testInvocations = mock(Consumer.class);

ClusterTemplate annot = mock(ClusterTemplate.class);
when(annot.value()).thenReturn("").thenReturn(" ");

Assertions.assertThrows(IllegalStateException.class, () ->
ext.processClusterTemplate(context, annot, testInvocations)
ext.processClusterTemplate(context, annot)
);

Assertions.assertThrows(IllegalStateException.class, () ->
ext.processClusterTemplate(context, annot, testInvocations)
ext.processClusterTemplate(context, annot)
);
}
}
37 changes: 20 additions & 17 deletions core/src/test/java/kafka/test/junit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,27 @@ annotation takes a single string value which references a static method on the t
produce any number of test configurations using a fluent builder style API.

```java
@ClusterTemplate("generateConfigs")
void testSomething() { ... }
import java.util.Arrays;

static void generateConfigs(ClusterGenerator clusterGenerator) {
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
.name("Generated Test 1")
.serverProperties(props1)
.ibp("2.7-IV1")
.build());
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
.name("Generated Test 2")
.serverProperties(props2)
.ibp("2.7-IV2")
.build());
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
.name("Generated Test 3")
.serverProperties(props3)
.build());
@ClusterTemplate("generateConfigs")
void testSomething() { ...}

static List<ClusterConfig> generateConfigs() {
ClusterConfig config1 = ClusterConfig.defaultClusterBuilder()
.name("Generated Test 1")
.serverProperties(props1)
.ibp("2.7-IV1")
.build();
ClusterConfig config2 = ClusterConfig.defaultClusterBuilder()
.name("Generated Test 2")
.serverProperties(props2)
.ibp("2.7-IV2")
.build();
ClusterConfig config3 = ClusterConfig.defaultClusterBuilder()
.name("Generated Test 3")
.serverProperties(props3)
.build();
return Arrays.asList(config1, config2, config3);
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.coordinator.transaction

import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.message.InitProducerIdRequestData
Expand All @@ -38,19 +38,19 @@ import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters._

object ProducerIdsIntegrationTest {
def uniqueProducerIdsBumpIBP(clusterGenerator: ClusterGenerator): Unit = {
def uniqueProducerIdsBumpIBP(): java.util.List[ClusterConfig] = {
val serverProperties = java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
val perBrokerProperties: java.util.Map[Integer, java.util.Map[String, String]] =
java.util.Collections.singletonMap(0,
java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0"))

clusterGenerator.accept(ClusterConfig.defaultBuilder()
List(ClusterConfig.defaultBuilder()
.setTypes(Set(Type.ZK).asJava)
.setBrokers(3)
.setAutoStart(false)
.setServerProperties(serverProperties)
.setPerServerProperties(perBrokerProperties)
.build())
.build()).asJava
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package kafka.zk

import kafka.server.KRaftCachedControllerId
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
Expand Down Expand Up @@ -63,8 +63,7 @@ import scala.collection.Seq
import scala.jdk.CollectionConverters._

object ZkMigrationIntegrationTest {

def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): Unit = {
def zkClustersForAllMigrationVersions(): java.util.List[ClusterConfig] = {
Seq(
MetadataVersion.IBP_3_4_IV0,
MetadataVersion.IBP_3_5_IV2,
Expand All @@ -74,19 +73,19 @@ object ZkMigrationIntegrationTest {
MetadataVersion.IBP_3_7_IV2,
MetadataVersion.IBP_3_7_IV4,
MetadataVersion.IBP_3_8_IV0
).foreach { mv =>
).map { mv =>
val serverProperties = new util.HashMap[String, String]()
serverProperties.put("inter.broker.listener.name", "EXTERNAL")
serverProperties.put("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
serverProperties.put("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
serverProperties.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
clusterGenerator.accept(ClusterConfig.defaultBuilder()
ClusterConfig.defaultBuilder()
.setMetadataVersion(mv)
.setBrokers(3)
.setServerProperties(serverProperties)
.setTypes(Set(Type.ZK).asJava)
.build())
}
.build()
}.asJava
}
}

Expand Down
Loading

0 comments on commit a519dab

Please sign in to comment.