Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ for ( sv in availableScalaVersions ) {
}

def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs
def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'testkit', 'streams', 'streams:examples'] + connectPkgs

/** Create one task per default Scala version */
def withDefScalaVersions(taskName) {
Expand Down Expand Up @@ -855,6 +855,48 @@ project(':clients') {
}
}

project(':testkit') {
archivesBaseName = "kafka-testkit"

dependencies {
compile project(':core')
compile project(':clients')
compile project(':log4j-appender')
compile libs.slf4jApi

testCompile project(':clients')
testCompile libs.junit
testCompile project(':clients').sourceSets.test.output
testCompile libs.easymock
testCompile libs.powermockJunit4
testCompile libs.powermockEasymock

testRuntime libs.slf4jlog4j

compile(libs.zkclient) {
exclude module: 'zookeeper'
}
compile(libs.zookeeper) {
exclude module: 'slf4j-log4j12'
exclude module: 'log4j'
exclude module: 'netty'
}
}

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
include('log4j*jar')
}
into "$buildDir/dependant-libs"
duplicatesStrategy 'exclude'
}

jar {
dependsOn 'copyDependantLibs'
}
}

project(':tools') {
archivesBaseName = "kafka-tools"

Expand Down
8 changes: 8 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@
<allow pkg="org.apache.kafka.test" />
</subpackage>

<subpackage name="testkit">
<allow pkg="kafka.metrics" />
<allow pkg="kafka.server" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.zookeeper" />
<allow pkg="scala" />
</subpackage>

<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.clients.admin" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected Map<String, Object> postProcessParsedConfig(Map<String, Object> parsed
return Collections.emptyMap();
}

protected Object get(String key) {
public Object get(String key) {
if (!values.containsKey(key))
throw new ConfigException(String.format("Unknown configuration '%s'", key));
used.add(key);
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
include 'core', 'examples', 'clients', 'testkit', 'tools', 'streams', 'streams:examples', 'log4j-appender',
'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
161 changes: 161 additions & 0 deletions testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaCluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.testkit;

import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Implements a test cluster.
*/
public class MiniKafkaCluster implements AutoCloseable {
private final Logger log;
private Map<Integer, MiniKafkaNode> kafkas = new TreeMap<>();
private Map<String, MiniZookeeperNode> zookeepers = new HashMap<>();

MiniKafkaCluster(MiniKafkaClusterBuilder clusterBld) {
this.log = clusterBld.logContext.logger(MiniKafkaCluster.class);
boolean successful = false, interrupted = false;
ExecutorService executorService = Executors.newFixedThreadPool(clusterBld.kafkaBlds.size());
try {
// Initialize ZK nodes
int zkIndex = 0;
log.trace("Starting {} zookeeper(s).", clusterBld.zkBlds.size());
for (MiniZookeeperNodeBuilder zkBld : clusterBld.zkBlds) {
MiniZookeeperNode zookeeper = zkBld.build(clusterBld, String.format("zk%d", zkIndex++));
zookeeper.start();
this.zookeepers.put(zookeeper.hostPort(), zookeeper);
}
log.trace("Finished starting {} zookeeper(s). Starting {} kafka(s).",
clusterBld.zkBlds.size(), clusterBld.kafkaBlds.size());

// Initialize Kafka nodes
int autoAssignedIdx = 0;
for (MiniKafkaNodeBuilder kafkaBld : clusterBld.kafkaBlds) {
final String brokerName;
if (kafkaBld.id == MiniKafkaNodeBuilder.INVALID_NODE_ID) {
brokerName = generateNameForAutoAssignedBroker(autoAssignedIdx++);
} else {
if (kafkas.containsKey(kafkaBld.id)) {
throw new RuntimeException("Can't have two brokers both assigned node id " +
kafkaBld.id);
}
brokerName = String.format("broker%d", kafkaBld.id);
}
final MiniKafkaNode kafka = kafkaBld.build(clusterBld, brokerName, zkString());
executorService.submit(new Runnable() {
@Override
public void run() {
try {
kafka.start();
kafkas.put(kafka.id(), kafka);
} catch (Throwable e) {
log.error("Unable to start {}", brokerName, e);
kafka.shutdown();
}
}
});
}
executorService.shutdown();
interrupted = TestKitUtil.awaitTerminationUninterruptibly(executorService);
if (kafkas.size() != clusterBld.kafkaBlds.size()) {
throw new RuntimeException("Broker(s) failed to start.");
}
successful = true;
log.trace("Finished starting {} kafka(s). Cluster setup is complete.",
clusterBld.kafkaBlds.size());
} finally {
if (!successful) {
executorService.shutdownNow();
close();
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

public String zkString() {
return Utils.join(zookeepers.keySet(), ",");
}

/**
* Generate a name for an auto-assigned broker.
*/
private final static String generateNameForAutoAssignedBroker(int index) {
if (index < ('Z' - 'A')) {
return String.format("broker%c", 'A' + index);
}
int count = index / ('Z' - 'A');
index %= 'Z' - 'A';
return String.format("broker%d%c", count, 'A' + index);
}

public Map<Integer, MiniKafkaNode> kafkas() {
return Collections.unmodifiableMap(this.kafkas);
}

public Map<String, MiniZookeeperNode> zookeepers() {
return Collections.unmodifiableMap(this.zookeepers);
}

@Override
public void close() {
boolean interrupted = false;
log.trace("Closing cluster. Shutting down {} kafka(s).", kafkas.size());
if (!kafkas.isEmpty()) {
ExecutorService executorService = Executors.newFixedThreadPool(this.kafkas.size());
for (final MiniKafkaNode kafka : this.kafkas.values()) {
executorService.submit(new Runnable() {
@Override
public void run() {
kafka.shutdown();
}
});
}
executorService.shutdown();
interrupted |= TestKitUtil.awaitTerminationUninterruptibly(executorService);
}

log.trace("Finished shutting down {} kafka server(s). Shutting down {} zookeeper(s).",
kafkas.size(), zookeepers.size());
for (MiniZookeeperNode zookeeper : zookeepers.values()) {
zookeeper.shutdown();
}
log.trace("Finished shutting down {} zookeeper server(s). Starting {} kafka(s).");
for (MiniKafkaNode kafka : kafkas.values()) {
kafka.close();
}
log.trace("Finished closing {} kafka(s). Closing {} zookeeper(s).", kafkas.size());
kafkas.clear();
for (MiniZookeeperNode zookeeper : zookeepers.values()) {
zookeeper.close();
}
log.trace("Finished closing {} zookeeper(s). Finished closing cluster.", zookeepers.size());
zookeepers.clear();
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.testkit;

import org.apache.kafka.common.utils.LogContext;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* Builds a new test cluster.
*/
public class MiniKafkaClusterBuilder {
LogContext logContext = new LogContext("");

Map<String, String> configs = new HashMap<>();

Collection<MiniKafkaNodeBuilder> kafkaBlds = new ArrayList<>();

Collection<MiniZookeeperNodeBuilder> zkBlds = new ArrayList<>(3);

public MiniKafkaClusterBuilder logContext(LogContext logContext) {
this.logContext = logContext;
return this;
}

public MiniKafkaClusterBuilder config(String key, String value) {
this.configs.put(key, value);
return this;
}

public MiniKafkaClusterBuilder configs(Map<String, String> configs) {
this.configs.putAll(configs);
return this;
}

public MiniKafkaClusterBuilder addNode(MiniKafkaNodeBuilder kafkaBld) {
kafkaBlds.add(kafkaBld);
return this;
}

public MiniKafkaClusterBuilder addZookeeperNode(MiniZookeeperNodeBuilder zkBld) {
zkBlds.add(zkBld);
return this;
}

public MiniKafkaCluster build() {
return new MiniKafkaCluster(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.testkit;

import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;

/**
* Implements a Kafka log directory.
*/
public class MiniKafkaLogDir implements AutoCloseable {
private final Logger log;
File dir;

MiniKafkaLogDir(Logger log, File dir) {
this.log = log;
this.dir = dir;
}

/**
* Stops Zookeeper.
* Waits for all threads to be stopped.
* Does not throw exceptions.
*/
@Override
public void close() {
if (dir != null) {
try {
Utils.delete(dir);
} catch (IOException e) {
log.error("Error deleting logDir", e);
} finally {
dir = null;
}
}
}
}
Loading