diff --git a/build.gradle b/build.gradle
index 725cf0b8bb201..28dbae86f5106 100644
--- a/build.gradle
+++ b/build.gradle
@@ -513,7 +513,7 @@ for ( sv in availableScalaVersions ) {
}
def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'testkit', 'streams', 'streams:examples'] + connectPkgs
/** Create one task per default Scala version */
def withDefScalaVersions(taskName) {
@@ -855,6 +855,48 @@ project(':clients') {
}
}
+project(':testkit') {
+ archivesBaseName = "kafka-testkit"
+
+ dependencies {
+ compile project(':core')
+ compile project(':clients')
+ compile project(':log4j-appender')
+ compile libs.slf4jApi
+
+ testCompile project(':clients')
+ testCompile libs.junit
+ testCompile project(':clients').sourceSets.test.output
+ testCompile libs.easymock
+ testCompile libs.powermockJunit4
+ testCompile libs.powermockEasymock
+
+ testRuntime libs.slf4jlog4j
+
+ compile(libs.zkclient) {
+ exclude module: 'zookeeper'
+ }
+ compile(libs.zookeeper) {
+ exclude module: 'slf4j-log4j12'
+ exclude module: 'log4j'
+ exclude module: 'netty'
+ }
+ }
+
+ tasks.create(name: "copyDependantLibs", type: Copy) {
+ from (configurations.testRuntime) {
+ include('slf4j-log4j12*')
+ include('log4j*jar')
+ }
+ into "$buildDir/dependant-libs"
+ duplicatesStrategy 'exclude'
+ }
+
+ jar {
+ dependsOn 'copyDependantLibs'
+ }
+}
+
project(':tools') {
archivesBaseName = "kafka-tools"
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 18e76e776b788..245b275a091fc 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -172,6 +172,14 @@
+
+
+
+
+
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 61a5798eeb4a0..9538665a16c0c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -86,7 +86,7 @@ protected Map postProcessParsedConfig(Map parsed
return Collections.emptyMap();
}
- protected Object get(String key) {
+ public Object get(String key) {
if (!values.containsKey(key))
throw new ConfigException(String.format("Unknown configuration '%s'", key));
used.add(key);
diff --git a/settings.gradle b/settings.gradle
index f0fdf07128c43..c2edb57c450cd 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
+include 'core', 'examples', 'clients', 'testkit', 'tools', 'streams', 'streams:examples', 'log4j-appender',
'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaCluster.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaCluster.java
new file mode 100644
index 0000000000000..51211dfa1f5e9
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaCluster.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.testkit;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Implements a test cluster.
+ */
+public class MiniKafkaCluster implements AutoCloseable {
+ private final Logger log;
+ private Map kafkas = new TreeMap<>();
+ private Map zookeepers = new HashMap<>();
+
+ MiniKafkaCluster(MiniKafkaClusterBuilder clusterBld) {
+ this.log = clusterBld.logContext.logger(MiniKafkaCluster.class);
+ boolean successful = false, interrupted = false;
+ ExecutorService executorService = Executors.newFixedThreadPool(clusterBld.kafkaBlds.size());
+ try {
+ // Initialize ZK nodes
+ int zkIndex = 0;
+ log.trace("Starting {} zookeeper(s).", clusterBld.zkBlds.size());
+ for (MiniZookeeperNodeBuilder zkBld : clusterBld.zkBlds) {
+ MiniZookeeperNode zookeeper = zkBld.build(clusterBld, String.format("zk%d", zkIndex++));
+ zookeeper.start();
+ this.zookeepers.put(zookeeper.hostPort(), zookeeper);
+ }
+ log.trace("Finished starting {} zookeeper(s). Starting {} kafka(s).",
+ clusterBld.zkBlds.size(), clusterBld.kafkaBlds.size());
+
+ // Initialize Kafka nodes
+ int autoAssignedIdx = 0;
+ for (MiniKafkaNodeBuilder kafkaBld : clusterBld.kafkaBlds) {
+ final String brokerName;
+ if (kafkaBld.id == MiniKafkaNodeBuilder.INVALID_NODE_ID) {
+ brokerName = generateNameForAutoAssignedBroker(autoAssignedIdx++);
+ } else {
+ if (kafkas.containsKey(kafkaBld.id)) {
+ throw new RuntimeException("Can't have two brokers both assigned node id " +
+ kafkaBld.id);
+ }
+ brokerName = String.format("broker%d", kafkaBld.id);
+ }
+ final MiniKafkaNode kafka = kafkaBld.build(clusterBld, brokerName, zkString());
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ kafka.start();
+ kafkas.put(kafka.id(), kafka);
+ } catch (Throwable e) {
+ log.error("Unable to start {}", brokerName, e);
+ kafka.shutdown();
+ }
+ }
+ });
+ }
+ executorService.shutdown();
+ interrupted = TestKitUtil.awaitTerminationUninterruptibly(executorService);
+ if (kafkas.size() != clusterBld.kafkaBlds.size()) {
+ throw new RuntimeException("Broker(s) failed to start.");
+ }
+ successful = true;
+ log.trace("Finished starting {} kafka(s). Cluster setup is complete.",
+ clusterBld.kafkaBlds.size());
+ } finally {
+ if (!successful) {
+ executorService.shutdownNow();
+ close();
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public String zkString() {
+ return Utils.join(zookeepers.keySet(), ",");
+ }
+
+ /**
+ * Generate a name for an auto-assigned broker.
+ */
+ private final static String generateNameForAutoAssignedBroker(int index) {
+ if (index < ('Z' - 'A')) {
+ return String.format("broker%c", 'A' + index);
+ }
+ int count = index / ('Z' - 'A');
+ index %= 'Z' - 'A';
+ return String.format("broker%d%c", count, 'A' + index);
+ }
+
+ public Map kafkas() {
+ return Collections.unmodifiableMap(this.kafkas);
+ }
+
+ public Map zookeepers() {
+ return Collections.unmodifiableMap(this.zookeepers);
+ }
+
+ @Override
+ public void close() {
+ boolean interrupted = false;
+ log.trace("Closing cluster. Shutting down {} kafka(s).", kafkas.size());
+ if (!kafkas.isEmpty()) {
+ ExecutorService executorService = Executors.newFixedThreadPool(this.kafkas.size());
+ for (final MiniKafkaNode kafka : this.kafkas.values()) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ kafka.shutdown();
+ }
+ });
+ }
+ executorService.shutdown();
+ interrupted |= TestKitUtil.awaitTerminationUninterruptibly(executorService);
+ }
+
+ log.trace("Finished shutting down {} kafka server(s). Shutting down {} zookeeper(s).",
+ kafkas.size(), zookeepers.size());
+ for (MiniZookeeperNode zookeeper : zookeepers.values()) {
+ zookeeper.shutdown();
+ }
+ log.trace("Finished shutting down {} zookeeper server(s). Starting {} kafka(s).");
+ for (MiniKafkaNode kafka : kafkas.values()) {
+ kafka.close();
+ }
+ log.trace("Finished closing {} kafka(s). Closing {} zookeeper(s).", kafkas.size());
+ kafkas.clear();
+ for (MiniZookeeperNode zookeeper : zookeepers.values()) {
+ zookeeper.close();
+ }
+ log.trace("Finished closing {} zookeeper(s). Finished closing cluster.", zookeepers.size());
+ zookeepers.clear();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaClusterBuilder.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaClusterBuilder.java
new file mode 100644
index 0000000000000..8d4df708e83bd
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaClusterBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.testkit;
+
+import org.apache.kafka.common.utils.LogContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds a new test cluster.
+ */
+public class MiniKafkaClusterBuilder {
+ LogContext logContext = new LogContext("");
+
+ Map configs = new HashMap<>();
+
+ Collection kafkaBlds = new ArrayList<>();
+
+ Collection zkBlds = new ArrayList<>(3);
+
+ public MiniKafkaClusterBuilder logContext(LogContext logContext) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ public MiniKafkaClusterBuilder config(String key, String value) {
+ this.configs.put(key, value);
+ return this;
+ }
+
+ public MiniKafkaClusterBuilder configs(Map configs) {
+ this.configs.putAll(configs);
+ return this;
+ }
+
+ public MiniKafkaClusterBuilder addNode(MiniKafkaNodeBuilder kafkaBld) {
+ kafkaBlds.add(kafkaBld);
+ return this;
+ }
+
+ public MiniKafkaClusterBuilder addZookeeperNode(MiniZookeeperNodeBuilder zkBld) {
+ zkBlds.add(zkBld);
+ return this;
+ }
+
+ public MiniKafkaCluster build() {
+ return new MiniKafkaCluster(this);
+ }
+}
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDir.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDir.java
new file mode 100644
index 0000000000000..96e800820415c
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDir.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.testkit;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Implements a Kafka log directory.
+ */
+public class MiniKafkaLogDir implements AutoCloseable {
+ private final Logger log;
+ File dir;
+
+ MiniKafkaLogDir(Logger log, File dir) {
+ this.log = log;
+ this.dir = dir;
+ }
+
+ /**
+ * Stops Zookeeper.
+ * Waits for all threads to be stopped.
+ * Does not throw exceptions.
+ */
+ @Override
+ public void close() {
+ if (dir != null) {
+ try {
+ Utils.delete(dir);
+ } catch (IOException e) {
+ log.error("Error deleting logDir", e);
+ } finally {
+ dir = null;
+ }
+ }
+ }
+}
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDirBuilder.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDirBuilder.java
new file mode 100644
index 0000000000000..aa856ec331a32
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDirBuilder.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.testkit;
+
+import org.slf4j.Logger;
+
+import java.io.File;
+
+/**
+ * Builds a MiniKafkaLogDir.
+ */
+public class MiniKafkaLogDirBuilder {
+ MiniKafkaLogDir build(Logger log, File dir) {
+ return new MiniKafkaLogDir(log, dir);
+ }
+}
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNode.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNode.java
new file mode 100644
index 0000000000000..49a0fd2de75a5
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNode.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.testkit;
+
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import scala.Some;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MiniKafkaNode implements AutoCloseable {
+ private final String name;
+ private final Map fixedConfigs;
+ private final LogContext logContext;
+ private final Logger log;
+ private final Collection logDirBlds;
+ private File rootDir;
+ private Collection logDirs = null;
+ private KafkaServer kafkaServer = null;
+
+ MiniKafkaNode(MiniKafkaClusterBuilder clusterBld, MiniKafkaNodeBuilder nodeBld,
+ String name, String zkString) {
+ this.name = name;
+ Map config = new HashMap<>();
+ setupNodeId(nodeBld, config);
+ config.put("zookeeper.connect", zkString); //KafkaConfig$.MODULE$.ZkConnectProp
+ this.logContext = new LogContext(clusterBld.logContext.logPrefix() + ": " + name + ": ");
+ this.log = logContext.logger(MiniKafkaNode.class);
+ if (nodeBld.logDirBlds.isEmpty()) {
+ this.logDirBlds = new ArrayList(1);
+ this.logDirBlds.add(new MiniKafkaLogDirBuilder());
+ } else {
+ this.logDirBlds = new ArrayList<>(nodeBld.logDirBlds);
+ }
+ this.fixedConfigs = Collections.unmodifiableMap(
+ TestKitUtil.mergeConfigs(clusterBld.configs, nodeBld.configs, config));
+ }
+
+ private static void setupNodeId(MiniKafkaNodeBuilder nodeBld, Map config) {
+ if (nodeBld.id == MiniKafkaNodeBuilder.INVALID_NODE_ID) {
+ config.put("broker.id.generation.enable", "true"); //KafkaConfig$.MODULE$.BrokerIdGenerationEnableProp
+ } else {
+ config.put("broker.id.generation.enable", "false"); //KafkaConfig$.MODULE$.BrokerIdGenerationEnableProp
+ config.put("broker.id", String.format("%d", nodeBld.id)); //KafkaConfig$.MODULE$.BrokerIdProp
+ }
+ }
+
+ public void start() {
+ close();
+ boolean success = false;
+ try {
+ HashMap effectiveConfigs = new HashMap<>(fixedConfigs);
+ this.rootDir = TestKitUtil.createTempDir(name + "-");
+ int logDirIdx = 0;
+ ArrayList logPaths = new ArrayList<>();
+ this.logDirs = new ArrayList<>(logDirBlds.size());
+ for (MiniKafkaLogDirBuilder logDirBld : logDirBlds) {
+ MiniKafkaLogDir logDir = logDirBld.build(log,
+ new File(rootDir, String.format("oplog%d", logDirIdx++)));
+ logDirs.add(logDir);
+ logPaths.add(logDir.dir.getAbsolutePath());
+ }
+ effectiveConfigs.put("log.dirs", Utils.join(logPaths, ",")); // KafkaConfig.LogDirsProp
+ effectiveConfigs.put("listeners", "PLAINTEXT://localhost:0"); // KakfaConfig.ListenersProp
+ KafkaConfig config = new KafkaConfig(effectiveConfigs, false);
+ this.kafkaServer = new KafkaServer(config, Time.SYSTEM, new Some<>(name),
+ scala.collection.JavaConversions.asScalaBuffer(
+ new ArrayList()).seq());
+ this.kafkaServer.startup();
+ success = true;
+ } finally {
+ if (!success) {
+ close();
+ }
+ }
+ }
+
+ public void shutdown() {
+ if (this.kafkaServer != null) {
+ try {
+ this.kafkaServer.shutdown();
+ } catch (Throwable e) {
+ log.error("{}: Unable to shutdown KafkaServer", name, e);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ shutdown();
+ if (this.kafkaServer != null) {
+ try {
+ this.kafkaServer.awaitShutdown();
+ } catch (Throwable e) {
+ log.error("{} awaitShutdown error", name, e);
+ } finally {
+ this.kafkaServer = null;
+ }
+ }
+ if (this.logDirs != null) {
+ for (MiniKafkaLogDir logDir : this.logDirs) {
+ logDir.close();
+ }
+ this.logDirs = null;
+ }
+ if (this.rootDir != null) {
+ try {
+ Utils.delete(rootDir);
+ } catch (Throwable e) {
+ log.error("{} error deleting {}", name, rootDir.getAbsolutePath());
+ } finally {
+ this.rootDir = null;
+ }
+ }
+ }
+
+ /**
+ * Get the current value of a configuration key from the server.
+ *
+ * @param key The configuration key.
+ *
+ * @returns null if the server is not running;
+ * null if no such configuration key was found in the server's config;
+ * The associated configuration value otherwise.
+ */
+ public String config(String key) {
+ if (kafkaServer == null) {
+ log.trace("Unable to retrieve configuration {}: server is not running.", key);
+ return null;
+ }
+ Object val;
+ try {
+ val = kafkaServer.config().get(key);
+ } catch (ConfigException e) {
+ log.trace("Unable to retrieve configuration {}: {}", key, e.getMessage());
+ return null;
+ }
+ return val.toString();
+ }
+
+ /**
+ * Returns the broker ID, or null if the broker is not running.
+ */
+ public Integer id() {
+ return kafkaServer == null ? null : kafkaServer.config().brokerId();
+ }
+}
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNodeBuilder.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNodeBuilder.java
new file mode 100644
index 0000000000000..b4e0270073b7a
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNodeBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.testkit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds a MiniKafkaNode.
+ */
+public class MiniKafkaNodeBuilder {
+ static final int INVALID_NODE_ID = -1;
+
+ int id = INVALID_NODE_ID;
+
+ Map configs = new HashMap<>();
+
+ Collection logDirBlds = new ArrayList<>();
+
+ public MiniKafkaNodeBuilder() {
+ configs.put("controlled.shutdown.enable", "false"); //KafkaConfig$.MODULE$.ControlledShutdownEnableProp
+ }
+
+ public MiniKafkaNodeBuilder config(String key, String value) {
+ this.configs.put(key, value);
+ return this;
+ }
+
+ public MiniKafkaNodeBuilder configs(Map configs) {
+ this.configs.putAll(configs);
+ return this;
+ }
+
+ public MiniKafkaNodeBuilder id(int id) {
+ this.id = id;
+ return this;
+ }
+
+ public MiniKafkaNodeBuilder rack(String rack) {
+ if (rack == null) {
+ configs.remove("broker.rack"); //KafkaConfig$.MODULE$.RackProp
+ } else {
+ configs.put("broker.rack", rack);
+ }
+ return this;
+ }
+
+ public MiniKafkaNodeBuilder enableControlledShutdown(boolean enabled) {
+ configs.put("controlled.shutdown.enable", Boolean.toString(enabled)); //KafkaConfig$.MODULE$.ControlledShutdownEnableProp
+ return this;
+ }
+
+ public MiniKafkaNodeBuilder addLogDir(MiniKafkaLogDirBuilder logDirBld) {
+ this.logDirBlds.add(logDirBld);
+ return this;
+ }
+
+ MiniKafkaNode build(MiniKafkaClusterBuilder clusterBld, String name, String zkString) {
+ return new MiniKafkaNode(clusterBld, this, name, zkString);
+ }
+}
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNode.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNode.java
new file mode 100644
index 0000000000000..e786eca62b83b
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNode.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.testkit;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+public class MiniZookeeperNode implements AutoCloseable {
+ private final static int TICK_TIME = 500;
+ private final Logger log;
+ private final String name;
+ private File dir = null;
+ private File logDir = null;
+ private File snapshotDir = null;
+ private ZooKeeperServer zkServer = null;
+ private NIOServerCnxnFactory factory = null;
+ private int port = -1;
+
+ MiniZookeeperNode(MiniKafkaClusterBuilder clusterBld, String name) {
+ this.log = new LogContext(clusterBld.logContext.logPrefix() + ": " + name + ": ").
+ logger(MiniZookeeperNode.class);
+ this.name = name;
+ }
+
+ /**
+ * Returns the host:port combination in use, or null if ZK is not running.
+ */
+ public String hostPort() {
+ if (port == -1) {
+ return null;
+ } else {
+ return "localhost:" + port;
+ }
+ }
+
+ /**
+ * Start Zookeeper. Throws an exception on failure.
+ */
+ public void start() {
+ close();
+ boolean success = false;
+ try {
+ this.dir = TestKitUtil.createTempDir(name + "-");
+ this.snapshotDir = Files.createDirectory(new File(dir, "snapshot").toPath()).toFile();
+ this.logDir = Files.createDirectory(new File(dir, "logs").toPath()).toFile();
+ this.zkServer = new ZooKeeperServer(snapshotDir, logDir, TICK_TIME);
+ this.factory = new NIOServerCnxnFactory();
+ InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
+ factory.configure(addr, 0);
+ factory.startup(zkServer);
+ this.port = zkServer.getClientPort();
+ success = true;
+ } catch (InterruptedException | IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (!success) {
+ close();
+ }
+ }
+ }
+
+ /**
+ * Stops Zookeeper.
+ * Does not wait for shutdown to complete.
+ * Does not throw exceptions.
+ */
+ public void shutdown() {
+ if (factory != null) {
+ try {
+ factory.shutdown();
+ } catch (Throwable e) {
+ log.error("Error shutting down factory", e);
+ } finally {
+ factory = null;
+ }
+ }
+ if (zkServer != null) {
+ try {
+ zkServer.shutdown();
+ } catch (Throwable e) {
+ log.error("Error shutting down zkServer", e);
+ } finally {
+ zkServer = null;
+ }
+ }
+ }
+
+ /**
+ * Stops Zookeeper.
+ * Waits for all threads to be stopped.
+ * Does not throw exceptions.
+ */
+ @Override
+ public void close() {
+ if (this.port != -1) {
+ while (true) {
+ try {
+ sendFourLetterWord("stat", 500);
+ } catch (IOException e) {
+ break;
+ }
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ shutdown();
+ if (dir != null) {
+ try {
+ Utils.delete(dir);
+ } catch (Throwable e) {
+ log.error("Error deleting logDir", e);
+ } finally {
+ dir = null;
+ }
+ }
+ }
+
+ private void sendFourLetterWord(String word, int timeout) throws IOException {
+ InetSocketAddress addr = new InetSocketAddress("localhost", port);
+ try (Socket sock = new Socket()) {
+ sock.connect(addr, timeout);
+ OutputStream out = sock.getOutputStream();
+ out.write(word.getBytes(StandardCharsets.UTF_8));
+ out.flush();
+ }
+ }
+}
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNodeBuilder.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNodeBuilder.java
new file mode 100644
index 0000000000000..4165f8018a3d7
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNodeBuilder.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.testkit;
+
+/**
+ * Builds a MiniZookeeperNode.
+ */
+public class MiniZookeeperNodeBuilder {
+ MiniZookeeperNode build(MiniKafkaClusterBuilder bld, String name) {
+ return new MiniZookeeperNode(bld, name);
+ }
+}
diff --git a/testkit/src/main/java/org/apache/kafka/testkit/TestKitUtil.java b/testkit/src/main/java/org/apache/kafka/testkit/TestKitUtil.java
new file mode 100644
index 0000000000000..de34b8879fb46
--- /dev/null
+++ b/testkit/src/main/java/org/apache/kafka/testkit/TestKitUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.testkit;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestKitUtil {
+ private final static Logger log = LoggerFactory.getLogger(TestKitUtil.class);
+
+ //static void closeAll(final T... vals) {
+ //
+ //}
+
+ static T firstNonNull(final T... vals) {
+ for (T val : vals) {
+ if (val != null) {
+ return val;
+ }
+ }
+ throw new RuntimeException("All values were null.");
+ }
+
+ public static boolean awaitTerminationUninterruptibly(ExecutorService executorService) {
+ boolean interrupted = false;
+ while (true) {
+ try {
+ executorService.awaitTermination(100, TimeUnit.DAYS);
+ return interrupted;
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ }
+
+ static File createTempDir(final String prefix) {
+ final File file;
+ try {
+ file = Files.createTempDirectory(prefix).toFile();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ Utils.delete(file);
+ } catch (IOException e) {
+ log.error("Error deleting {}", file.getAbsolutePath(), e);
+ }
+ }
+ });
+ return file;
+ }
+
+ /**
+ * Merge many configuration maps into one.
+ * The later configuration maps override the earlier ones.
+ */
+ public static Map mergeConfigs(Map... configsArr) {
+ HashMap map = new HashMap<>();
+ for (Map configs : configsArr) {
+ map.putAll(configs);
+ }
+ return map;
+ }
+}
diff --git a/testkit/src/test/java/org/apache/kafka/testkit/MiniKafkaClusterTest.java b/testkit/src/test/java/org/apache/kafka/testkit/MiniKafkaClusterTest.java
new file mode 100644
index 0000000000000..dcb1501ca3bf4
--- /dev/null
+++ b/testkit/src/test/java/org/apache/kafka/testkit/MiniKafkaClusterTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.testkit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MiniKafkaClusterTest {
+ private static final Logger log = LoggerFactory.getLogger(MiniKafkaClusterTest.class);
+
+ @Rule
+ final public Timeout globalTimeout = Timeout.millis(120000);
+
+ @Test
+ public void testCreateDestroy() throws Exception {
+ try (MiniKafkaCluster cluster = new MiniKafkaClusterBuilder()
+ .addZookeeperNode(new MiniZookeeperNodeBuilder())
+ .addNode(new MiniKafkaNodeBuilder())
+ .build()) {
+ assertEquals(1, cluster.kafkas().values().size());
+ MiniKafkaNode kafka = cluster.kafkas().values().iterator().next();
+ int maxReservedBrokerId = Integer.valueOf(kafka.config("reserved.broker.max.id"));
+ assertTrue(maxReservedBrokerId <= kafka.id());
+ }
+ }
+
+ @Test
+ public void testThreeNodeCluster() throws Exception {
+ try (MiniKafkaCluster cluster = new MiniKafkaClusterBuilder()
+ .addZookeeperNode(new MiniZookeeperNodeBuilder())
+ .addNode(new MiniKafkaNodeBuilder().id(1))
+ .addNode(new MiniKafkaNodeBuilder().id(2))
+ .addNode(new MiniKafkaNodeBuilder().id(3))
+ .build()) {
+ assertTrue(cluster.zkString().startsWith("localhost:"));
+ assertEquals(1, cluster.kafkas().get(1).id().intValue());
+ assertEquals(2, cluster.kafkas().get(2).id().intValue());
+ assertEquals(3, cluster.kafkas().get(3).id().intValue());
+ }
+ }
+}
diff --git a/testkit/src/test/resources/log4j.properties b/testkit/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000..e30e2860094a6
--- /dev/null
+++ b/testkit/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=TRACE, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=TRACE