diff --git a/.travis.yml b/.travis.yml
index c8d3a3b..6d032f3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,3 +14,6 @@ cache:
   directories:
     - $HOME/.gradle/caches/
     - $HOME/.gradle/wrapper/
+
+script:
+  - ./gradlew check --info
diff --git a/build.gradle b/build.gradle
index e5babcb..c44edcc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -23,6 +23,9 @@ plugins {
 
     // https://docs.gradle.org/current/userguide/checkstyle_plugin.html
     id 'checkstyle'
+
+    // https://docs.gradle.org/current/userguide/idea_plugin.html
+    id 'idea'
 }
 
 repositories {
@@ -34,6 +37,8 @@ targetCompatibility = JavaVersion.VERSION_1_8
 
 ext {
     kafkaVersion = "2.0.1"
+
+    testcontainersVersion = "1.12.1"
 }
 
 distributions {
@@ -45,6 +50,27 @@ distributions {
     }
 }
 
+sourceSets {
+    integrationTest {
+        java.srcDir file('src/integration-test/java')
+        resources.srcDir file('src/integration-test/resources')
+        compileClasspath += sourceSets.main.output + configurations.testRuntime
+        runtimeClasspath += output + compileClasspath
+    }
+}
+
+idea {
+    module {
+        testSourceDirs += project.sourceSets.integrationTest.java.srcDirs
+        testSourceDirs += project.sourceSets.integrationTest.resources.srcDirs
+    }
+}
+
+configurations {
+    integrationTestImplementation.extendsFrom testImplementation
+    integrationTestRuntime.extendsFrom testRuntime
+}
+
 dependencies {
     compileOnly "org.apache.kafka:connect-api:$kafkaVersion"
 
@@ -53,13 +79,48 @@ dependencies {
     testImplementation "org.junit.jupiter:junit-jupiter:5.5.1"
     testImplementation "org.hamcrest:hamcrest:2.1"
     testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
+    testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
+    testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:2.12.1"
+    testRuntime "org.apache.logging.log4j:log4j-api:2.12.1"
+    testRuntime "org.apache.logging.log4j:log4j-core:2.12.1"
+
+    integrationTestImplementation "org.apache.kafka:connect-api:$kafkaVersion"
+    integrationTestImplementation("org.apache.kafka:connect-runtime:$kafkaVersion") {
+        exclude group: "org.slf4j", module: "slf4j-log4j12"
+    }
+    integrationTestImplementation "org.apache.kafka:connect-json:$kafkaVersion"
+    integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"
+
+    integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
+    integrationTestImplementation "org.testcontainers:kafka:$testcontainersVersion" // the Testcontainers module version, not a Kafka version
+    // Make test utils from 'test' available in 'integration-test'.
+    integrationTestImplementation sourceSets.test.output
 }
 
 checkstyle {
     toolVersion "8.21"
 }
 
+task integrationTest(type: Test) {
+    description = 'Runs the integration tests.'
+    group = 'verification'
+    testClassesDirs = sourceSets.integrationTest.output.classesDirs
+    classpath = sourceSets.integrationTest.runtimeClasspath
+
+    dependsOn test, distTar
+
+    useJUnitPlatform()
+
+    // Run always.
+    outputs.upToDateWhen { false }
+
+    // Pass the distribution file and test class paths to the tests.
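+    // (The tests unpack the distribution archive into the directory that becomes the Connect worker's plugin.path.)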
+ systemProperty("integration-test.distribution.file.path", distTar.archiveFile.get().asFile.path) + systemProperty("integration-test.classes.path", sourceSets.integrationTest.output.classesDirs.getAsPath()) +} +check.dependsOn integrationTest + test { useJUnitPlatform { includeEngines 'junit-jupiter' diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml index ce2be8a..5386784 100644 --- a/config/checkstyle/checkstyle.xml +++ b/config/checkstyle/checkstyle.xml @@ -27,6 +27,11 @@ + + + + + diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml new file mode 100644 index 0000000..9e34f8d --- /dev/null +++ b/config/checkstyle/suppressions.xml @@ -0,0 +1,27 @@ + + + + + + + + diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/ConnectRunner.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/ConnectRunner.java new file mode 100644 index 0000000..f923222 --- /dev/null +++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/ConnectRunner.java @@ -0,0 +1,122 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.transforms; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.Connect; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; +import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.FutureCallback; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ConnectRunner { + private static final Logger log = LoggerFactory.getLogger(ConnectRunner.class); + + private final File pluginDir; + private final String bootstrapServers; + + private Herder herder; + private Connect connect; + + public ConnectRunner(final File pluginDir, + final String bootstrapServers) { + this.pluginDir = pluginDir; + this.bootstrapServers = bootstrapServers; + } + + void start() { + final Map workerProps = new HashMap<>(); + workerProps.put("bootstrap.servers", bootstrapServers); + + workerProps.put("offset.flush.interval.ms", "5000"); + + // These don't matter much (each connector sets its own converters), but need to be filled with valid classes. 
+ workerProps.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); + workerProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.value.converter.schemas.enable", "false"); + + // Don't need it since we'll memory MemoryOffsetBackingStore. + workerProps.put("offset.storage.file.filename", ""); + + workerProps.put("plugin.path", pluginDir.getPath()); + + final Time time = Time.SYSTEM; + final String workerId = "test-worker"; + + final Plugins plugins = new Plugins(workerProps); + final StandaloneConfig config = new StandaloneConfig(workerProps); + + final Worker worker = new Worker( + workerId, time, plugins, config, new MemoryOffsetBackingStore()); + herder = new StandaloneHerder(worker, "cluster-id"); + + final RestServer rest = new RestServer(config); + + connect = new Connect(herder, rest); + + connect.start(); + } + + void createConnector(final Map config) throws ExecutionException, InterruptedException { + assert herder != null; + + final FutureCallback> cb = new FutureCallback<>( + new Callback>() { + @Override + public void onCompletion(final Throwable error, final Herder.Created info) { + if (error != null) { + log.error("Failed to create job"); + } else { + log.info("Created connector {}", info.result().name()); + } + } + }); + herder.putConnectorConfig( + config.get(ConnectorConfig.NAME_CONFIG), + config, false, cb + ); + + final Herder.Created connectorInfoCreated = cb.get(); + assert connectorInfoCreated.created(); + } + + void stop() { + connect.stop(); + } + + void awaitStop() { + connect.awaitStop(); + } +} diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java new file mode 100644 index 0000000..f613732 --- /dev/null +++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java @@ -0,0 +1,190 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Testcontainers
+final class IntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(IntegrationTest.class);
+
+    private final TopicPartition originalTopicPartition0 =
+        new TopicPartition(TestSourceConnector.ORIGINAL_TOPIC, 0);
+    private final TopicPartition newTopicPartition0 =
+        new TopicPartition(TestSourceConnector.NEW_TOPIC, 0);
+
+    private static File pluginsDir;
+
+    @Container
+    private final KafkaContainer kafka = new KafkaContainer()
+        .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
+
+    private AdminClient adminClient;
+    private KafkaConsumer<byte[], byte[]> consumer;
+
+    private ConnectRunner connectRunner;
+
+    @BeforeAll
+    static void setUpAll() throws IOException, InterruptedException {
+        final File testDir = Files.createTempDirectory("aiven-kafka-connect-transforms-test-").toFile();
+        testDir.deleteOnExit();
+
+        pluginsDir = new File(testDir, "plugins/");
+        assert pluginsDir.mkdirs();
+
+        // Unpack the library distribution.
+        final File transformDir = new File(pluginsDir, "aiven-kafka-connect-transforms/");
+        assert transformDir.mkdirs();
+        final File distFile = new File(System.getProperty("integration-test.distribution.file.path"));
+        assert distFile.exists();
+        final String cmd = String.format("tar -xf %s --strip-components=1 -C %s",
+            distFile.toString(), transformDir.toString());
+        final Process p = Runtime.getRuntime().exec(cmd);
+        assert p.waitFor() == 0;
+
+        // Copy the test connector classes.
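+        // (They go into their own plugin directory so the Connect worker can load the test connector from plugin.path.)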
+        final File testConnectorPluginDir = new File(pluginsDir, "test-connector/");
+        assert testConnectorPluginDir.mkdirs();
+        final File integrationTestClassesPath = new File(System.getProperty("integration-test.classes.path"));
+        assert integrationTestClassesPath.exists();
+
+        final Class<?>[] testConnectorClasses = new Class<?>[]{
+            TestSourceConnector.class, TestSourceConnector.TestSourceConnectorTask.class
+        };
+        for (final Class<?> clazz : testConnectorClasses) {
+            final String packageName = clazz.getPackage().getName();
+            final String packagePrefix = packageName + ".";
+            assert clazz.getCanonicalName().startsWith(packagePrefix);
+
+            final String packageSubpath = packageName.replace('.', '/');
+            final String classNameWithoutPackage = clazz.getCanonicalName().substring(packagePrefix.length());
+            final String classFileName = classNameWithoutPackage.replace('.', '$') + ".class";
+            final File classFileSrc = new File(
+                new File(integrationTestClassesPath, packageSubpath), classFileName
+            );
+            assert classFileSrc.exists();
+            final File classFileDest = new File(testConnectorPluginDir, classFileName);
+            Files.copy(classFileSrc.toPath(), classFileDest.toPath());
+        }
+    }
+
+    @BeforeEach
+    void setUp() throws ExecutionException, InterruptedException {
+        final Properties adminClientConfig = new Properties();
+        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        adminClient = AdminClient.create(adminClientConfig);
+
+        final Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumer = new KafkaConsumer<>(consumerProps);
+
+        final NewTopic originalTopic = new NewTopic(TestSourceConnector.ORIGINAL_TOPIC, 1, (short) 1);
+        final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
+        adminClient.createTopics(Arrays.asList(originalTopic, newTopic)).all().get();
+
+        connectRunner = new ConnectRunner(pluginsDir, kafka.getBootstrapServers());
+        connectRunner.start();
+    }
+
+    @AfterEach
+    final void tearDown() {
+        connectRunner.stop();
+        adminClient.close();
+        consumer.close();
+
+        connectRunner.awaitStop();
+    }
+
+    @Test
+    @Timeout(10)
+    final void testExtractTopic() throws ExecutionException, InterruptedException, IOException {
+        final Map<String, String> connectorConfig = new HashMap<>();
+        connectorConfig.put("name", "test-source-connector");
+        connectorConfig.put("connector.class", TestSourceConnector.class.getName());
+        connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("transforms",
+            "ExtractTopicFromValueField");
+        connectorConfig.put("transforms.ExtractTopicFromValueField.type",
+            "io.aiven.kafka.connect.transforms.ExtractTopic$Value");
+        connectorConfig.put("transforms.ExtractTopicFromValueField.field.name",
+            TestSourceConnector.ROUTING_FIELD);
+        connectorConfig.put("tasks.max", "1");
+        connectRunner.createConnector(connectorConfig);
+
+        waitForCondition(
+            () -> consumer
+                .endOffsets(Arrays.asList(originalTopicPartition0, newTopicPartition0))
+                .values().stream().reduce(Long::sum).map(s -> s >= TestSourceConnector.MESSAGES_TO_PRODUCE)
+                .orElse(false),
+            5000,
"Messages appear in any topic" + ); + final Map endOffsets = consumer.endOffsets( + Arrays.asList(originalTopicPartition0, newTopicPartition0)); + // The original topic should be empty. + assertEquals(0, endOffsets.get(originalTopicPartition0)); + // The new topic should be non-empty. + assertEquals(TestSourceConnector.MESSAGES_TO_PRODUCE, endOffsets.get(newTopicPartition0)); + } + + private void waitForCondition(final Supplier conditionChecker, + final long maxWaitMs, + final String condition) throws InterruptedException { + final long startTime = System.currentTimeMillis(); + + boolean testConditionMet; + while (!(testConditionMet = conditionChecker.get()) && ((System.currentTimeMillis() - startTime) < maxWaitMs)) { + Thread.sleep(Math.min(maxWaitMs, 100L)); + } + + if (!testConditionMet) { + throw new AssertionError("Condition not met within timeout " + maxWaitMs + ": " + condition); + } + } +} diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java new file mode 100644 index 0000000..96780ae --- /dev/null +++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java @@ -0,0 +1,114 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.transforms; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; + +/** + * A connector needed for testing of ExtractTopic. + * + *

It just produces a fixed number of struct records. + */ +public class TestSourceConnector extends SourceConnector { + static final int MESSAGES_TO_PRODUCE = 10; + + static final String ORIGINAL_TOPIC = "original-topic"; + static final String NEW_TOPIC = "new-topic"; + static final String ROUTING_FIELD = "field-0"; + + @Override + public void start(final Map props) { + } + + @Override + public Class taskClass() { + return TestSourceConnectorTask.class; + } + + @Override + public List> taskConfigs(final int maxTasks) { + return Collections.singletonList(Collections.EMPTY_MAP); + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public String version() { + return null; + } + + public static class TestSourceConnectorTask extends SourceTask { + private int counter = 0; + + private final Schema valueSchema = SchemaBuilder.struct() + .field(ROUTING_FIELD, SchemaBuilder.STRING_SCHEMA) + .schema(); + private final Struct value = new Struct(valueSchema).put(ROUTING_FIELD, NEW_TOPIC); + + @Override + public void start(final Map props) { + } + + @Override + public List poll() throws InterruptedException { + if (counter >= MESSAGES_TO_PRODUCE) { + return null; // indicate pause + } + + final Map sourcePartition = new HashMap<>(); + sourcePartition.put("partition", "0"); + final Map sourceOffset = new HashMap<>(); + sourceOffset.put("offset", Integer.toString(counter)); + + counter += 1; + + return Collections.singletonList( + new SourceRecord(sourcePartition, sourceOffset, + ORIGINAL_TOPIC, + valueSchema, value) + ); + } + + @Override + public void stop() { + } + + @Override + public String version() { + return null; + } + } +} diff --git a/src/integration-test/resources/log4j2-test.properties b/src/integration-test/resources/log4j2-test.properties new file mode 100644 index 0000000..5c8ca61 --- /dev/null +++ b/src/integration-test/resources/log4j2-test.properties @@ -0,0 +1,21 @@ +status = error +dest = err +name = PropertiesConfig + +filter.threshold.type = ThresholdFilter +filter.threshold.level = debug + +appender.console.type = Console +appender.console.name = STDOUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = [%d] %p %m (%c:%L)%n +appender.console.filter.threshold.type = ThresholdFilter +appender.console.filter.threshold.level = info + +rootLogger.level = info +rootLogger.appenderRef.stdout.ref = STDOUT + +logger.org_apache_zookeeper_level.name = org.apache.zookeeper +logger.org_apache_zookeeper_level.level = error +logger.org_reflections.name = org.reflections +logger.org_reflections.level = error