3 changes: 3 additions & 0 deletions .travis.yml
@@ -14,3 +14,6 @@ cache:
  directories:
    - $HOME/.gradle/caches/
    - $HOME/.gradle/wrapper/

script:
  - ./gradlew check --info
61 changes: 61 additions & 0 deletions build.gradle
@@ -23,6 +23,9 @@ plugins {

    // https://docs.gradle.org/current/userguide/checkstyle_plugin.html
    id 'checkstyle'

    // https://docs.gradle.org/current/userguide/idea_plugin.html
    id 'idea'
}

repositories {
@@ -34,6 +37,8 @@ targetCompatibility = JavaVersion.VERSION_1_8

ext {
    kafkaVersion = "2.0.1"

    testcontainersVersion = "1.12.1"
}

distributions {
@@ -45,6 +50,27 @@ distributions {
    }
}

sourceSets {
    integrationTest {
        java.srcDir file('src/integration-test/java')
        resources.srcDir file('src/integration-test/resources')
        compileClasspath += sourceSets.main.output + configurations.testRuntime
        runtimeClasspath += output + compileClasspath
    }
}
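
With this source set in place, integration tests are compiled and run separately from the unit tests under src/test. As a minimal sketch (the class is hypothetical, not part of this change), a test added under this layout would sit at src/integration-test/java/io/aiven/kafka/connect/transforms/ExampleIntegrationTest.java:

    package io.aiven.kafka.connect.transforms;

    import org.junit.jupiter.api.Test;

    import static org.junit.jupiter.api.Assertions.assertTrue;

    final class ExampleIntegrationTest {
        // Runs under the 'integrationTest' task, not 'test', because of the source set above.
        @Test
        void smoke() {
            assertTrue(true);
        }
    }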

idea {
    module {
        testSourceDirs += project.sourceSets.integrationTest.java.srcDirs
        testSourceDirs += project.sourceSets.integrationTest.resources.srcDirs
    }
}

configurations {
    integrationTestImplementation.extendsFrom testImplementation
    integrationTestRuntime.extendsFrom testRuntime
}

dependencies {
    compileOnly "org.apache.kafka:connect-api:$kafkaVersion"

@@ -53,13 +79,48 @@ dependencies {
    testImplementation "org.junit.jupiter:junit-jupiter:5.5.1"
    testImplementation "org.hamcrest:hamcrest:2.1"
    testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
    testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"

    testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:2.12.1"
    testRuntime "org.apache.logging.log4j:log4j-api:2.12.1"
    testRuntime "org.apache.logging.log4j:log4j-core:2.12.1"

    integrationTestImplementation "org.apache.kafka:connect-api:$kafkaVersion"
    integrationTestImplementation("org.apache.kafka:connect-runtime:$kafkaVersion") {
        exclude group: "org.slf4j", module: "slf4j-log4j12"
    }
    integrationTestImplementation "org.apache.kafka:connect-json:$kafkaVersion"
    integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"

    integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
    integrationTestImplementation "org.testcontainers:kafka:$testcontainersVersion" // the Testcontainers version, not the Kafka version
    // Make test utils from 'test' available in 'integration-test'
    integrationTestImplementation sourceSets.test.output
}
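
A note on the two Testcontainers artifacts: junit-jupiter supplies the @Testcontainers/@Container JUnit 5 extension, and kafka supplies a Dockerized single-node Kafka broker. A minimal sketch of how they combine (class name hypothetical; assumes a local Docker daemon is available):

    package io.aiven.kafka.connect.transforms;

    import org.junit.jupiter.api.Test;

    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;

    @Testcontainers
    final class KafkaContainerSketch {
        // Starts a single-node Kafka broker in Docker for each test.
        @Container
        private final KafkaContainer kafka = new KafkaContainer();

        @Test
        void brokerIsReachable() {
            // Yields an address like PLAINTEXT://localhost:32768,
            // which tests pass on to Connect as bootstrap.servers.
            System.out.println(kafka.getBootstrapServers());
        }
    }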

checkstyle {
    toolVersion "8.21"
}

task integrationTest(type: Test) {
    description = 'Runs the integration tests.'
    group = 'verification'
    testClassesDirs = sourceSets.integrationTest.output.classesDirs
    classpath = sourceSets.integrationTest.runtimeClasspath

    dependsOn test, distTar

    useJUnitPlatform()

    // Always rerun: never treat this task as up to date.
    outputs.upToDateWhen { false }

    // Pass the distribution file path to the tests.
    systemProperty("integration-test.distribution.file.path", distTar.archiveFile.get().asFile.path)
    systemProperty("integration-test.classes.path", sourceSets.integrationTest.output.classesDirs.getAsPath())
}
check.dependsOn integrationTest
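
On the test side, these system properties are the hand-off point: reading them is how a test locates the freshly built distribution to install as a Connect plugin. For example (variable names illustrative):

    // In an integration test: paths passed in by the Gradle task above.
    final String distributionFilePath =
        System.getProperty("integration-test.distribution.file.path");
    final String testClassesPath =
        System.getProperty("integration-test.classes.path");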

test {
    useJUnitPlatform {
        includeEngines 'junit-jupiter'
5 changes: 5 additions & 0 deletions config/checkstyle/checkstyle.xml
@@ -27,6 +27,11 @@

<!-- See http://checkstyle.sourceforge.net/config.html#Checker -->
<module name="Checker">
    <!-- See http://checkstyle.sourceforge.net/config_filters.html#SuppressionFilter -->
    <module name="SuppressionFilter">
        <property name="file" value="${config_loc}/suppressions.xml" default="checkstyle/suppressions.xml"/>
    </module>

    <property name="charset" value="UTF-8"/>
    <!-- <property name="severity" value="warning"/>-->
    <property name="localeLanguage" value="en"/>
27 changes: 27 additions & 0 deletions config/checkstyle/suppressions.xml
@@ -0,0 +1,27 @@
<?xml version="1.0"?>
<!--
// Copyright 2019 Aiven Oy
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<!DOCTYPE suppressions PUBLIC
"-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
"https://checkstyle.org/dtds/suppressions_1_2.dtd">
<suppressions>
    <!-- Disable these complexity checks for integration tests,
         which must instantiate many classes explicitly. -->
    <suppress checks="ClassFanOutComplexity"
              files="(IntegrationTest|ConnectRunner).java"/>
    <suppress checks="ClassDataAbstractionCoupling"
              files="(IntegrationTest|ConnectRunner).java"/>
</suppressions>
122 changes: 122 additions & 0 deletions src/integration-test/java/io/aiven/kafka/connect/transforms/ConnectRunner.java
@@ -0,0 +1,122 @@
/*
* Copyright 2019 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.FutureCallback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ConnectRunner {
    private static final Logger log = LoggerFactory.getLogger(ConnectRunner.class);

    private final File pluginDir;
    private final String bootstrapServers;

    private Herder herder;
    private Connect connect;

    public ConnectRunner(final File pluginDir,
                         final String bootstrapServers) {
        this.pluginDir = pluginDir;
        this.bootstrapServers = bootstrapServers;
    }

    void start() {
        final Map<String, String> workerProps = new HashMap<>();
        workerProps.put("bootstrap.servers", bootstrapServers);

        workerProps.put("offset.flush.interval.ms", "5000");

        // These don't matter much (each connector sets its own converters),
        // but they need to be set to valid classes.
        workerProps.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
        workerProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("internal.key.converter.schemas.enable", "false");
        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerProps.put("internal.value.converter.schemas.enable", "false");

        // Not needed, since we use MemoryOffsetBackingStore.
        workerProps.put("offset.storage.file.filename", "");

        workerProps.put("plugin.path", pluginDir.getPath());

        final Time time = Time.SYSTEM;
        final String workerId = "test-worker";

        final Plugins plugins = new Plugins(workerProps);
        final StandaloneConfig config = new StandaloneConfig(workerProps);

        final Worker worker = new Worker(
            workerId, time, plugins, config, new MemoryOffsetBackingStore());
        herder = new StandaloneHerder(worker, "cluster-id");

        final RestServer rest = new RestServer(config);

        connect = new Connect(herder, rest);

        connect.start();
    }

    void createConnector(final Map<String, String> config) throws ExecutionException, InterruptedException {
        assert herder != null;

        final FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(
            new Callback<Herder.Created<ConnectorInfo>>() {
                @Override
                public void onCompletion(final Throwable error, final Herder.Created<ConnectorInfo> info) {
                    if (error != null) {
                        log.error("Failed to create connector", error);
                    } else {
                        log.info("Created connector {}", info.result().name());
                    }
                }
            });
        herder.putConnectorConfig(
            config.get(ConnectorConfig.NAME_CONFIG),
            config, false, cb
        );

        final Herder.Created<ConnectorInfo> connectorInfoCreated = cb.get();
        assert connectorInfoCreated.created();
    }

    void stop() {
        connect.stop();
    }

    void awaitStop() {
        connect.awaitStop();
    }
}
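
For context, an integration test would drive this runner roughly as follows; the plugin directory, connector class, and configuration values below are illustrative, not taken from this change:

    // Hypothetical usage from an integration test.
    final File pluginDir = new File("/tmp/test-plugins");   // unpacked distTar contents
    final ConnectRunner connectRunner = new ConnectRunner(pluginDir, bootstrapServers);
    connectRunner.start();

    final Map<String, String> connectorConfig = new HashMap<>();
    connectorConfig.put("name", "test-connector");          // ConnectorConfig.NAME_CONFIG
    connectorConfig.put("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
    connectorConfig.put("tasks.max", "1");
    connectorConfig.put("topic", "test-topic");
    connectorConfig.put("file", "/tmp/input.txt");
    connectRunner.createConnector(connectorConfig);

    // ... produce/consume and assert on the results ...

    connectRunner.stop();
    connectRunner.awaitStop();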