From 6d02ae1de59ca4b9958a839576f765371d090076 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Thu, 18 Jan 2018 14:07:00 -0800 Subject: [PATCH] Add TestKit --- build.gradle | 44 ++++- checkstyle/import-control.xml | 8 + .../kafka/common/config/AbstractConfig.java | 2 +- settings.gradle | 2 +- .../kafka/testkit/MiniKafkaCluster.java | 161 ++++++++++++++++ .../testkit/MiniKafkaClusterBuilder.java | 66 +++++++ .../apache/kafka/testkit/MiniKafkaLogDir.java | 55 ++++++ .../kafka/testkit/MiniKafkaLogDirBuilder.java | 31 ++++ .../apache/kafka/testkit/MiniKafkaNode.java | 172 ++++++++++++++++++ .../kafka/testkit/MiniKafkaNodeBuilder.java | 78 ++++++++ .../kafka/testkit/MiniZookeeperNode.java | 156 ++++++++++++++++ .../testkit/MiniZookeeperNodeBuilder.java | 27 +++ .../org/apache/kafka/testkit/TestKitUtil.java | 90 +++++++++ .../kafka/testkit/MiniKafkaClusterTest.java | 62 +++++++ testkit/src/test/resources/log4j.properties | 21 +++ 15 files changed, 972 insertions(+), 3 deletions(-) create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaCluster.java create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaClusterBuilder.java create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDir.java create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDirBuilder.java create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNode.java create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNodeBuilder.java create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNode.java create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNodeBuilder.java create mode 100644 testkit/src/main/java/org/apache/kafka/testkit/TestKitUtil.java create mode 100644 testkit/src/test/java/org/apache/kafka/testkit/MiniKafkaClusterTest.java create mode 100644 testkit/src/test/resources/log4j.properties diff --git a/build.gradle b/build.gradle index 725cf0b8bb201..28dbae86f5106 100644 --- a/build.gradle +++ b/build.gradle @@ -513,7 +513,7 @@ for ( sv in availableScalaVersions ) { } def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file'] -def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs +def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'testkit', 'streams', 'streams:examples'] + connectPkgs /** Create one task per default Scala version */ def withDefScalaVersions(taskName) { @@ -855,6 +855,48 @@ project(':clients') { } } +project(':testkit') { + archivesBaseName = "kafka-testkit" + + dependencies { + compile project(':core') + compile project(':clients') + compile project(':log4j-appender') + compile libs.slf4jApi + + testCompile project(':clients') + testCompile libs.junit + testCompile project(':clients').sourceSets.test.output + testCompile libs.easymock + testCompile libs.powermockJunit4 + testCompile libs.powermockEasymock + + testRuntime libs.slf4jlog4j + + compile(libs.zkclient) { + exclude module: 'zookeeper' + } + compile(libs.zookeeper) { + exclude module: 'slf4j-log4j12' + exclude module: 'log4j' + exclude module: 'netty' + } + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + from (configurations.testRuntime) { + include('slf4j-log4j12*') + include('log4j*jar') + } + into "$buildDir/dependant-libs" + duplicatesStrategy 'exclude' + } + + jar { + dependsOn 'copyDependantLibs' + } +} + project(':tools') { archivesBaseName = "kafka-tools" diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 18e76e776b788..245b275a091fc 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -172,6 +172,14 @@ + + + + + + + + diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 61a5798eeb4a0..9538665a16c0c 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -86,7 +86,7 @@ protected Map postProcessParsedConfig(Map parsed return Collections.emptyMap(); } - protected Object get(String key) { + public Object get(String key) { if (!values.containsKey(key)) throw new ConfigException(String.format("Unknown configuration '%s'", key)); used.add(key); diff --git a/settings.gradle b/settings.gradle index f0fdf07128c43..c2edb57c450cd 100644 --- a/settings.gradle +++ b/settings.gradle @@ -13,5 +13,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender', +include 'core', 'examples', 'clients', 'testkit', 'tools', 'streams', 'streams:examples', 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks' diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaCluster.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaCluster.java new file mode 100644 index 0000000000000..51211dfa1f5e9 --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaCluster.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.testkit; + +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Implements a test cluster. + */ +public class MiniKafkaCluster implements AutoCloseable { + private final Logger log; + private Map kafkas = new TreeMap<>(); + private Map zookeepers = new HashMap<>(); + + MiniKafkaCluster(MiniKafkaClusterBuilder clusterBld) { + this.log = clusterBld.logContext.logger(MiniKafkaCluster.class); + boolean successful = false, interrupted = false; + ExecutorService executorService = Executors.newFixedThreadPool(clusterBld.kafkaBlds.size()); + try { + // Initialize ZK nodes + int zkIndex = 0; + log.trace("Starting {} zookeeper(s).", clusterBld.zkBlds.size()); + for (MiniZookeeperNodeBuilder zkBld : clusterBld.zkBlds) { + MiniZookeeperNode zookeeper = zkBld.build(clusterBld, String.format("zk%d", zkIndex++)); + zookeeper.start(); + this.zookeepers.put(zookeeper.hostPort(), zookeeper); + } + log.trace("Finished starting {} zookeeper(s). Starting {} kafka(s).", + clusterBld.zkBlds.size(), clusterBld.kafkaBlds.size()); + + // Initialize Kafka nodes + int autoAssignedIdx = 0; + for (MiniKafkaNodeBuilder kafkaBld : clusterBld.kafkaBlds) { + final String brokerName; + if (kafkaBld.id == MiniKafkaNodeBuilder.INVALID_NODE_ID) { + brokerName = generateNameForAutoAssignedBroker(autoAssignedIdx++); + } else { + if (kafkas.containsKey(kafkaBld.id)) { + throw new RuntimeException("Can't have two brokers both assigned node id " + + kafkaBld.id); + } + brokerName = String.format("broker%d", kafkaBld.id); + } + final MiniKafkaNode kafka = kafkaBld.build(clusterBld, brokerName, zkString()); + executorService.submit(new Runnable() { + @Override + public void run() { + try { + kafka.start(); + kafkas.put(kafka.id(), kafka); + } catch (Throwable e) { + log.error("Unable to start {}", brokerName, e); + kafka.shutdown(); + } + } + }); + } + executorService.shutdown(); + interrupted = TestKitUtil.awaitTerminationUninterruptibly(executorService); + if (kafkas.size() != clusterBld.kafkaBlds.size()) { + throw new RuntimeException("Broker(s) failed to start."); + } + successful = true; + log.trace("Finished starting {} kafka(s). Cluster setup is complete.", + clusterBld.kafkaBlds.size()); + } finally { + if (!successful) { + executorService.shutdownNow(); + close(); + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + public String zkString() { + return Utils.join(zookeepers.keySet(), ","); + } + + /** + * Generate a name for an auto-assigned broker. + */ + private final static String generateNameForAutoAssignedBroker(int index) { + if (index < ('Z' - 'A')) { + return String.format("broker%c", 'A' + index); + } + int count = index / ('Z' - 'A'); + index %= 'Z' - 'A'; + return String.format("broker%d%c", count, 'A' + index); + } + + public Map kafkas() { + return Collections.unmodifiableMap(this.kafkas); + } + + public Map zookeepers() { + return Collections.unmodifiableMap(this.zookeepers); + } + + @Override + public void close() { + boolean interrupted = false; + log.trace("Closing cluster. Shutting down {} kafka(s).", kafkas.size()); + if (!kafkas.isEmpty()) { + ExecutorService executorService = Executors.newFixedThreadPool(this.kafkas.size()); + for (final MiniKafkaNode kafka : this.kafkas.values()) { + executorService.submit(new Runnable() { + @Override + public void run() { + kafka.shutdown(); + } + }); + } + executorService.shutdown(); + interrupted |= TestKitUtil.awaitTerminationUninterruptibly(executorService); + } + + log.trace("Finished shutting down {} kafka server(s). Shutting down {} zookeeper(s).", + kafkas.size(), zookeepers.size()); + for (MiniZookeeperNode zookeeper : zookeepers.values()) { + zookeeper.shutdown(); + } + log.trace("Finished shutting down {} zookeeper server(s). Starting {} kafka(s)."); + for (MiniKafkaNode kafka : kafkas.values()) { + kafka.close(); + } + log.trace("Finished closing {} kafka(s). Closing {} zookeeper(s).", kafkas.size()); + kafkas.clear(); + for (MiniZookeeperNode zookeeper : zookeepers.values()) { + zookeeper.close(); + } + log.trace("Finished closing {} zookeeper(s). Finished closing cluster.", zookeepers.size()); + zookeepers.clear(); + if (interrupted) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaClusterBuilder.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaClusterBuilder.java new file mode 100644 index 0000000000000..8d4df708e83bd --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaClusterBuilder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.testkit; + +import org.apache.kafka.common.utils.LogContext; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Builds a new test cluster. + */ +public class MiniKafkaClusterBuilder { + LogContext logContext = new LogContext(""); + + Map configs = new HashMap<>(); + + Collection kafkaBlds = new ArrayList<>(); + + Collection zkBlds = new ArrayList<>(3); + + public MiniKafkaClusterBuilder logContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + public MiniKafkaClusterBuilder config(String key, String value) { + this.configs.put(key, value); + return this; + } + + public MiniKafkaClusterBuilder configs(Map configs) { + this.configs.putAll(configs); + return this; + } + + public MiniKafkaClusterBuilder addNode(MiniKafkaNodeBuilder kafkaBld) { + kafkaBlds.add(kafkaBld); + return this; + } + + public MiniKafkaClusterBuilder addZookeeperNode(MiniZookeeperNodeBuilder zkBld) { + zkBlds.add(zkBld); + return this; + } + + public MiniKafkaCluster build() { + return new MiniKafkaCluster(this); + } +} diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDir.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDir.java new file mode 100644 index 0000000000000..96e800820415c --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDir.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.testkit; + +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; + +/** + * Implements a Kafka log directory. + */ +public class MiniKafkaLogDir implements AutoCloseable { + private final Logger log; + File dir; + + MiniKafkaLogDir(Logger log, File dir) { + this.log = log; + this.dir = dir; + } + + /** + * Stops Zookeeper. + * Waits for all threads to be stopped. + * Does not throw exceptions. + */ + @Override + public void close() { + if (dir != null) { + try { + Utils.delete(dir); + } catch (IOException e) { + log.error("Error deleting logDir", e); + } finally { + dir = null; + } + } + } +} diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDirBuilder.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDirBuilder.java new file mode 100644 index 0000000000000..aa856ec331a32 --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaLogDirBuilder.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.testkit; + +import org.slf4j.Logger; + +import java.io.File; + +/** + * Builds a MiniKafkaLogDir. + */ +public class MiniKafkaLogDirBuilder { + MiniKafkaLogDir build(Logger log, File dir) { + return new MiniKafkaLogDir(log, dir); + } +} diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNode.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNode.java new file mode 100644 index 0000000000000..49a0fd2de75a5 --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNode.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.testkit; + +import kafka.metrics.KafkaMetricsReporter; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import scala.Some; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class MiniKafkaNode implements AutoCloseable { + private final String name; + private final Map fixedConfigs; + private final LogContext logContext; + private final Logger log; + private final Collection logDirBlds; + private File rootDir; + private Collection logDirs = null; + private KafkaServer kafkaServer = null; + + MiniKafkaNode(MiniKafkaClusterBuilder clusterBld, MiniKafkaNodeBuilder nodeBld, + String name, String zkString) { + this.name = name; + Map config = new HashMap<>(); + setupNodeId(nodeBld, config); + config.put("zookeeper.connect", zkString); //KafkaConfig$.MODULE$.ZkConnectProp + this.logContext = new LogContext(clusterBld.logContext.logPrefix() + ": " + name + ": "); + this.log = logContext.logger(MiniKafkaNode.class); + if (nodeBld.logDirBlds.isEmpty()) { + this.logDirBlds = new ArrayList(1); + this.logDirBlds.add(new MiniKafkaLogDirBuilder()); + } else { + this.logDirBlds = new ArrayList<>(nodeBld.logDirBlds); + } + this.fixedConfigs = Collections.unmodifiableMap( + TestKitUtil.mergeConfigs(clusterBld.configs, nodeBld.configs, config)); + } + + private static void setupNodeId(MiniKafkaNodeBuilder nodeBld, Map config) { + if (nodeBld.id == MiniKafkaNodeBuilder.INVALID_NODE_ID) { + config.put("broker.id.generation.enable", "true"); //KafkaConfig$.MODULE$.BrokerIdGenerationEnableProp + } else { + config.put("broker.id.generation.enable", "false"); //KafkaConfig$.MODULE$.BrokerIdGenerationEnableProp + config.put("broker.id", String.format("%d", nodeBld.id)); //KafkaConfig$.MODULE$.BrokerIdProp + } + } + + public void start() { + close(); + boolean success = false; + try { + HashMap effectiveConfigs = new HashMap<>(fixedConfigs); + this.rootDir = TestKitUtil.createTempDir(name + "-"); + int logDirIdx = 0; + ArrayList logPaths = new ArrayList<>(); + this.logDirs = new ArrayList<>(logDirBlds.size()); + for (MiniKafkaLogDirBuilder logDirBld : logDirBlds) { + MiniKafkaLogDir logDir = logDirBld.build(log, + new File(rootDir, String.format("oplog%d", logDirIdx++))); + logDirs.add(logDir); + logPaths.add(logDir.dir.getAbsolutePath()); + } + effectiveConfigs.put("log.dirs", Utils.join(logPaths, ",")); // KafkaConfig.LogDirsProp + effectiveConfigs.put("listeners", "PLAINTEXT://localhost:0"); // KakfaConfig.ListenersProp + KafkaConfig config = new KafkaConfig(effectiveConfigs, false); + this.kafkaServer = new KafkaServer(config, Time.SYSTEM, new Some<>(name), + scala.collection.JavaConversions.asScalaBuffer( + new ArrayList()).seq()); + this.kafkaServer.startup(); + success = true; + } finally { + if (!success) { + close(); + } + } + } + + public void shutdown() { + if (this.kafkaServer != null) { + try { + this.kafkaServer.shutdown(); + } catch (Throwable e) { + log.error("{}: Unable to shutdown KafkaServer", name, e); + } + } + } + + @Override + public void close() { + shutdown(); + if (this.kafkaServer != null) { + try { + this.kafkaServer.awaitShutdown(); + } catch (Throwable e) { + log.error("{} awaitShutdown error", name, e); + } finally { + this.kafkaServer = null; + } + } + if (this.logDirs != null) { + for (MiniKafkaLogDir logDir : this.logDirs) { + logDir.close(); + } + this.logDirs = null; + } + if (this.rootDir != null) { + try { + Utils.delete(rootDir); + } catch (Throwable e) { + log.error("{} error deleting {}", name, rootDir.getAbsolutePath()); + } finally { + this.rootDir = null; + } + } + } + + /** + * Get the current value of a configuration key from the server. + * + * @param key The configuration key. + * + * @returns null if the server is not running; + * null if no such configuration key was found in the server's config; + * The associated configuration value otherwise. + */ + public String config(String key) { + if (kafkaServer == null) { + log.trace("Unable to retrieve configuration {}: server is not running.", key); + return null; + } + Object val; + try { + val = kafkaServer.config().get(key); + } catch (ConfigException e) { + log.trace("Unable to retrieve configuration {}: {}", key, e.getMessage()); + return null; + } + return val.toString(); + } + + /** + * Returns the broker ID, or null if the broker is not running. + */ + public Integer id() { + return kafkaServer == null ? null : kafkaServer.config().brokerId(); + } +} diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNodeBuilder.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNodeBuilder.java new file mode 100644 index 0000000000000..b4e0270073b7a --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniKafkaNodeBuilder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.testkit; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Builds a MiniKafkaNode. + */ +public class MiniKafkaNodeBuilder { + static final int INVALID_NODE_ID = -1; + + int id = INVALID_NODE_ID; + + Map configs = new HashMap<>(); + + Collection logDirBlds = new ArrayList<>(); + + public MiniKafkaNodeBuilder() { + configs.put("controlled.shutdown.enable", "false"); //KafkaConfig$.MODULE$.ControlledShutdownEnableProp + } + + public MiniKafkaNodeBuilder config(String key, String value) { + this.configs.put(key, value); + return this; + } + + public MiniKafkaNodeBuilder configs(Map configs) { + this.configs.putAll(configs); + return this; + } + + public MiniKafkaNodeBuilder id(int id) { + this.id = id; + return this; + } + + public MiniKafkaNodeBuilder rack(String rack) { + if (rack == null) { + configs.remove("broker.rack"); //KafkaConfig$.MODULE$.RackProp + } else { + configs.put("broker.rack", rack); + } + return this; + } + + public MiniKafkaNodeBuilder enableControlledShutdown(boolean enabled) { + configs.put("controlled.shutdown.enable", Boolean.toString(enabled)); //KafkaConfig$.MODULE$.ControlledShutdownEnableProp + return this; + } + + public MiniKafkaNodeBuilder addLogDir(MiniKafkaLogDirBuilder logDirBld) { + this.logDirBlds.add(logDirBld); + return this; + } + + MiniKafkaNode build(MiniKafkaClusterBuilder clusterBld, String name, String zkString) { + return new MiniKafkaNode(clusterBld, this, name, zkString); + } +} diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNode.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNode.java new file mode 100644 index 0000000000000..e786eca62b83b --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNode.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.testkit; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; + +public class MiniZookeeperNode implements AutoCloseable { + private final static int TICK_TIME = 500; + private final Logger log; + private final String name; + private File dir = null; + private File logDir = null; + private File snapshotDir = null; + private ZooKeeperServer zkServer = null; + private NIOServerCnxnFactory factory = null; + private int port = -1; + + MiniZookeeperNode(MiniKafkaClusterBuilder clusterBld, String name) { + this.log = new LogContext(clusterBld.logContext.logPrefix() + ": " + name + ": "). + logger(MiniZookeeperNode.class); + this.name = name; + } + + /** + * Returns the host:port combination in use, or null if ZK is not running. + */ + public String hostPort() { + if (port == -1) { + return null; + } else { + return "localhost:" + port; + } + } + + /** + * Start Zookeeper. Throws an exception on failure. + */ + public void start() { + close(); + boolean success = false; + try { + this.dir = TestKitUtil.createTempDir(name + "-"); + this.snapshotDir = Files.createDirectory(new File(dir, "snapshot").toPath()).toFile(); + this.logDir = Files.createDirectory(new File(dir, "logs").toPath()).toFile(); + this.zkServer = new ZooKeeperServer(snapshotDir, logDir, TICK_TIME); + this.factory = new NIOServerCnxnFactory(); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0); + factory.configure(addr, 0); + factory.startup(zkServer); + this.port = zkServer.getClientPort(); + success = true; + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } finally { + if (!success) { + close(); + } + } + } + + /** + * Stops Zookeeper. + * Does not wait for shutdown to complete. + * Does not throw exceptions. + */ + public void shutdown() { + if (factory != null) { + try { + factory.shutdown(); + } catch (Throwable e) { + log.error("Error shutting down factory", e); + } finally { + factory = null; + } + } + if (zkServer != null) { + try { + zkServer.shutdown(); + } catch (Throwable e) { + log.error("Error shutting down zkServer", e); + } finally { + zkServer = null; + } + } + } + + /** + * Stops Zookeeper. + * Waits for all threads to be stopped. + * Does not throw exceptions. + */ + @Override + public void close() { + if (this.port != -1) { + while (true) { + try { + sendFourLetterWord("stat", 500); + } catch (IOException e) { + break; + } + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + shutdown(); + if (dir != null) { + try { + Utils.delete(dir); + } catch (Throwable e) { + log.error("Error deleting logDir", e); + } finally { + dir = null; + } + } + } + + private void sendFourLetterWord(String word, int timeout) throws IOException { + InetSocketAddress addr = new InetSocketAddress("localhost", port); + try (Socket sock = new Socket()) { + sock.connect(addr, timeout); + OutputStream out = sock.getOutputStream(); + out.write(word.getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + } +} diff --git a/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNodeBuilder.java b/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNodeBuilder.java new file mode 100644 index 0000000000000..4165f8018a3d7 --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/MiniZookeeperNodeBuilder.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.testkit; + +/** + * Builds a MiniZookeeperNode. + */ +public class MiniZookeeperNodeBuilder { + MiniZookeeperNode build(MiniKafkaClusterBuilder bld, String name) { + return new MiniZookeeperNode(bld, name); + } +} diff --git a/testkit/src/main/java/org/apache/kafka/testkit/TestKitUtil.java b/testkit/src/main/java/org/apache/kafka/testkit/TestKitUtil.java new file mode 100644 index 0000000000000..de34b8879fb46 --- /dev/null +++ b/testkit/src/main/java/org/apache/kafka/testkit/TestKitUtil.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.testkit; + +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class TestKitUtil { + private final static Logger log = LoggerFactory.getLogger(TestKitUtil.class); + + //static void closeAll(final T... vals) { + // + //} + + static T firstNonNull(final T... vals) { + for (T val : vals) { + if (val != null) { + return val; + } + } + throw new RuntimeException("All values were null."); + } + + public static boolean awaitTerminationUninterruptibly(ExecutorService executorService) { + boolean interrupted = false; + while (true) { + try { + executorService.awaitTermination(100, TimeUnit.DAYS); + return interrupted; + } catch (InterruptedException e) { + interrupted = true; + } + } + } + + static File createTempDir(final String prefix) { + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (IOException e) { + throw new RuntimeException(e); + } + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + Utils.delete(file); + } catch (IOException e) { + log.error("Error deleting {}", file.getAbsolutePath(), e); + } + } + }); + return file; + } + + /** + * Merge many configuration maps into one. + * The later configuration maps override the earlier ones. + */ + public static Map mergeConfigs(Map... configsArr) { + HashMap map = new HashMap<>(); + for (Map configs : configsArr) { + map.putAll(configs); + } + return map; + } +} diff --git a/testkit/src/test/java/org/apache/kafka/testkit/MiniKafkaClusterTest.java b/testkit/src/test/java/org/apache/kafka/testkit/MiniKafkaClusterTest.java new file mode 100644 index 0000000000000..dcb1501ca3bf4 --- /dev/null +++ b/testkit/src/test/java/org/apache/kafka/testkit/MiniKafkaClusterTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.testkit; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MiniKafkaClusterTest { + private static final Logger log = LoggerFactory.getLogger(MiniKafkaClusterTest.class); + + @Rule + final public Timeout globalTimeout = Timeout.millis(120000); + + @Test + public void testCreateDestroy() throws Exception { + try (MiniKafkaCluster cluster = new MiniKafkaClusterBuilder() + .addZookeeperNode(new MiniZookeeperNodeBuilder()) + .addNode(new MiniKafkaNodeBuilder()) + .build()) { + assertEquals(1, cluster.kafkas().values().size()); + MiniKafkaNode kafka = cluster.kafkas().values().iterator().next(); + int maxReservedBrokerId = Integer.valueOf(kafka.config("reserved.broker.max.id")); + assertTrue(maxReservedBrokerId <= kafka.id()); + } + } + + @Test + public void testThreeNodeCluster() throws Exception { + try (MiniKafkaCluster cluster = new MiniKafkaClusterBuilder() + .addZookeeperNode(new MiniZookeeperNodeBuilder()) + .addNode(new MiniKafkaNodeBuilder().id(1)) + .addNode(new MiniKafkaNodeBuilder().id(2)) + .addNode(new MiniKafkaNodeBuilder().id(3)) + .build()) { + assertTrue(cluster.zkString().startsWith("localhost:")); + assertEquals(1, cluster.kafkas().get(1).id().intValue()); + assertEquals(2, cluster.kafkas().get(2).id().intValue()); + assertEquals(3, cluster.kafkas().get(3).id().intValue()); + } + } +} diff --git a/testkit/src/test/resources/log4j.properties b/testkit/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..e30e2860094a6 --- /dev/null +++ b/testkit/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=TRACE, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n + +log4j.logger.org.apache.kafka=TRACE