From c3e9b208a309d6fabe0231a47c6114e053a84000 Mon Sep 17 00:00:00 2001
From: JACQUES Francois
Date: Thu, 2 Nov 2017 22:33:44 +0100
Subject: [PATCH 1/2] Update kafka to 1.0.0. Update guava to 23.3-jre.

---
 README.md | 1 +
 pom.xml   | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 442b6f0..c035329 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@ Version | Kafka Version
 3.0.4   | 0.10.2.1
 3.1.0   | 0.11.0.0
 3.1.1   | 0.11.0.1
+4.0.0   | 1.0.0
 
 Installation
 -------------
diff --git a/pom.xml b/pom.xml
index fba5314..c511cf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,7 @@
         <java.version>1.8</java.version>
-        <kafka.version>0.11.0.1</kafka.version>
+        <kafka.version>1.0.0</kafka.version>
@@ -101,7 +101,7 @@
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <version>23.0</version>
+            <version>23.3-jre</version>
         </dependency>
         <dependency>
             <groupId>junit</groupId>

From 3cdeb99b4e68f0ceb9caff8fb00f361a6d5467c1 Mon Sep 17 00:00:00 2001
From: JACQUES Francois
Date: Sat, 11 Nov 2017 16:16:25 +0100
Subject: [PATCH 2/2] Add support for kafka clustering

---
 .../charithe/kafka/EphemeralKafkaBroker.java  |  93 ++++++----
 .../charithe/kafka/EphemeralKafkaCluster.java | 174 ++++++++++++++++++
 .../github/charithe/kafka/KafkaJunitRule.java |   7 +-
 .../kafka/EphemeralKafkaClusterTest.java      | 110 +++++++++++
 .../charithe/kafka/KafkaHelperTest.java       |   2 +-
 src/test/resources/logback-test.xml           |   2 +-
 6 files changed, 351 insertions(+), 37 deletions(-)
 create mode 100644 src/main/java/com/github/charithe/kafka/EphemeralKafkaCluster.java
 create mode 100644 src/test/java/com/github/charithe/kafka/EphemeralKafkaClusterTest.java

diff --git a/src/main/java/com/github/charithe/kafka/EphemeralKafkaBroker.java b/src/main/java/com/github/charithe/kafka/EphemeralKafkaBroker.java
index 0267826..8e2a037 100644
--- a/src/main/java/com/github/charithe/kafka/EphemeralKafkaBroker.java
+++ b/src/main/java/com/github/charithe/kafka/EphemeralKafkaBroker.java
@@ -28,6 +28,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -36,6 +37,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
@@ -50,6 +52,7 @@ public class EphemeralKafkaBroker {
 
     private Properties overrideBrokerProperties;
     private TestingServer zookeeper;
+    private boolean managedZk = false;
     private KafkaServerStartable kafkaServer;
     private Path kafkaLogDir;
@@ -104,6 +107,13 @@ public static EphemeralKafkaBroker create(int kafkaPort, int zookeeperPort, Prop
         this.overrideBrokerProperties = overrideBrokerProperties;
     }
 
+    EphemeralKafkaBroker(TestingServer zookeeper, int kafkaPort, Properties overrideBrokerProperties) {
+        this.zookeeper = zookeeper;
+        this.kafkaPort = kafkaPort;
+        this.overrideBrokerProperties = overrideBrokerProperties;
+        this.managedZk = true;
+    }
+
     /**
      * Start the Kafka broker
      */
@@ -120,11 +130,13 @@ public CompletableFuture<Void> start() throws Exception {
     }
 
     private CompletableFuture<Void> startBroker() throws Exception {
-        if (zookeeperPort == ALLOCATE_RANDOM_PORT) {
-            zookeeper = new TestingServer(true);
-            zookeeperPort = zookeeper.getPort();
-        } else {
-            zookeeper = new TestingServer(zookeeperPort, true);
+        if (!this.managedZk) {
+            if (zookeeperPort == ALLOCATE_RANDOM_PORT) {
+                zookeeper = new TestingServer(true);
+                zookeeperPort = zookeeper.getPort();
+            } else {
+                zookeeper = new TestingServer(zookeeperPort, true);
+            }
         }
 
         kafkaPort = kafkaPort == ALLOCATE_RANDOM_PORT ? InstanceSpec.getRandomPort() : kafkaPort;
@@ -134,13 +146,18 @@ private CompletableFuture<Void> startBroker() throws Exception {
 
         LOGGER.info("Starting Kafka server with config: {}", kafkaConfig.props());
         kafkaServer = new KafkaServerStartable(kafkaConfig);
         brokerStarted = true;
+        final Integer brokerId = kafkaServer.serverConfig().getInt(KafkaConfig.BrokerIdProp());
+        if (brokerId != null) {
+            /* Avoid warning for missing meta.properties */
+            Files.write(kafkaLogDir.resolve("meta.properties"), ("version=0\nbroker.id=" + brokerId).getBytes(StandardCharsets.UTF_8));
+        }
         return CompletableFuture.runAsync(() -> kafkaServer.startup());
     }
 
     /**
      * Stop the Kafka broker
      */
-    public void stop() {
+    public void stop() throws ExecutionException, InterruptedException {
         if (brokerStarted) {
             synchronized (this) {
                 if (brokerStarted) {
@@ -151,37 +168,45 @@ public void stop() {
         }
     }
 
-    private void stopBroker() {
-        try {
-            if (kafkaServer != null) {
-                LOGGER.info("Shutting down Kafka Server");
-                kafkaServer.shutdown();
-            }
+    private void stopBroker() throws ExecutionException, InterruptedException {
+        stopBrokerAsync().get();
+    }
 
-            if (zookeeper != null) {
-                LOGGER.info("Shutting down Zookeeper");
-                zookeeper.close();
-            }
+    CompletableFuture<Void> stopBrokerAsync() {
+        return CompletableFuture.runAsync(() -> {
+            try {
+                if (kafkaServer != null) {
+                    LOGGER.info("Shutting down Kafka Server");
+                    kafkaServer.shutdown();
+                    kafkaServer.awaitShutdown();
+                }
+
+                if (zookeeper != null && !this.managedZk) {
+                    LOGGER.info("Shutting down Zookeeper");
+                    zookeeper.close();
+                }
 
-            if (Files.exists(kafkaLogDir)) {
-                LOGGER.info("Deleting the log dir: {}", kafkaLogDir);
-                Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor<Path>() {
-                    @Override
-                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                        Files.deleteIfExists(file);
-                        return FileVisitResult.CONTINUE;
-                    }
-
-                    @Override
-                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-                        Files.deleteIfExists(dir);
-                        return FileVisitResult.CONTINUE;
-                    }
-                });
+                if (Files.exists(kafkaLogDir)) {
+                    LOGGER.info("Deleting the log dir: {}", kafkaLogDir);
+                    Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor<Path>() {
+                        @Override
+                        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                            Files.deleteIfExists(file);
+                            return FileVisitResult.CONTINUE;
+                        }
+
+                        @Override
+                        public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                            Files.deleteIfExists(dir);
+                            return FileVisitResult.CONTINUE;
+                        }
+                    });
+                }
+            } catch (Exception e) {
+                LOGGER.error("Failed to clean-up Kafka", e);
             }
-        } catch (Exception e) {
-            LOGGER.error("Failed to clean-up Kafka", e);
-        }
+        });
     }
 
     private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException {
diff --git a/src/main/java/com/github/charithe/kafka/EphemeralKafkaCluster.java b/src/main/java/com/github/charithe/kafka/EphemeralKafkaCluster.java
new file mode 100644
index 0000000..35094d3
--- /dev/null
+++ b/src/main/java/com/github/charithe/kafka/EphemeralKafkaCluster.java
@@ -0,0 +1,174 @@
+package com.github.charithe.kafka;
+
+import kafka.server.KafkaConfig;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class EphemeralKafkaCluster implements AutoCloseable {
+    private static final int ALLOCATE_RANDOM_PORT = -1;
+
+    private int numBroker;
+    private TestingServer zookeeper;
+    private final List<EphemeralKafkaBroker> brokers = new ArrayList<>();
+
+    private EphemeralKafkaCluster(int numBroker, int zookeeperPort) throws Exception {
+        this.zookeeper = new TestingServer(zookeeperPort);
+        this.numBroker = numBroker;
+        for (int i = 0; i < numBroker; ++i) {
+            this.addBroker();
+        }
+    }
+
+    /**
+     * Create a new ephemeral Kafka cluster
+     *
+     * @param numBroker Number of brokers in the cluster
+     * @return EphemeralKafkaCluster
+     */
+    public static EphemeralKafkaCluster create(int numBroker) throws Exception {
+        return create(numBroker, ALLOCATE_RANDOM_PORT);
+    }
+
+    /**
+     * Create a new ephemeral Kafka cluster with the specified Zookeeper port
+     *
+     * @param numBroker     Number of brokers in the cluster
+     * @param zookeeperPort Port the Zookeeper should listen on
+     * @return EphemeralKafkaCluster
+     */
+    public static EphemeralKafkaCluster create(int numBroker, int zookeeperPort) throws Exception {
+        return new EphemeralKafkaCluster(numBroker, zookeeperPort);
+    }
+
+    public boolean isHealthy() {
+        return brokers.stream().allMatch(EphemeralKafkaBroker::isRunning);
+    }
+
+    public boolean isRunning() {
+        return brokers.stream().anyMatch(EphemeralKafkaBroker::isRunning);
+    }
+
+    public void stop() throws IOException, ExecutionException, InterruptedException {
+        CompletableFuture.allOf(brokers.stream().map(EphemeralKafkaBroker::stopBrokerAsync).toArray(CompletableFuture[]::new)).get();
+        brokers.clear();
+        zookeeper.stop();
+    }
+
+    private EphemeralKafkaBroker addBroker() throws Exception {
+        final int brokerPort = InstanceSpec.getRandomPort();
+        Properties brokerConfigProperties = new Properties();
+        brokerConfigProperties.setProperty(KafkaConfig.BrokerIdProp(), brokers.size() + "");
+        brokerConfigProperties.setProperty(KafkaConfig.ZkConnectProp(), zookeeper.getConnectString());
+        brokerConfigProperties.setProperty(KafkaConfig.ControlledShutdownEnableProp(), false + "");
+        brokerConfigProperties.setProperty(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
+        brokerConfigProperties.setProperty(KafkaConfig.DeleteTopicEnableProp(), true + "");
+        brokerConfigProperties.setProperty(KafkaConfig.PortProp(), "" + brokerPort);
+        brokerConfigProperties.setProperty(KafkaConfig.SslEnabledProtocolsProp(), false + "");
+        brokerConfigProperties.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), true + "");
+        brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "300");
+        brokerConfigProperties.setProperty(KafkaConfig.ReplicaFetchWaitMaxMsProp(), "100");
+        brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "10");
+
+        brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), numBroker + "");
+        brokerConfigProperties.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), 1 + "");
+        brokerConfigProperties.setProperty(KafkaConfig.ZkSessionTimeoutMsProp(), 200 + "");
+        brokerConfigProperties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp(), 200 + "");
+        brokerConfigProperties.setProperty(KafkaConfig.AdvertisedHostNameProp(), "localhost");
+        brokerConfigProperties.setProperty(KafkaConfig.AdvertisedPortProp(), brokerPort + "");
+        brokerConfigProperties.setProperty(KafkaConfig.AdvertisedListenersProp(), "PLAINTEXT://localhost:" + brokerPort);
+        brokerConfigProperties.setProperty(KafkaConfig.HostNameProp(), "localhost");
+        brokerConfigProperties.setProperty(KafkaConfig.MinInSyncReplicasProp(), Math.max(1, numBroker - 1) + "");
+        final EphemeralKafkaBroker broker = new EphemeralKafkaBroker(zookeeper, brokerPort, brokerConfigProperties);
+        broker.start().get();
+        brokers.add(broker);
+        return broker;
+    }
+
+    public List<EphemeralKafkaBroker> getBrokers() {
+        return Collections.unmodifiableList(brokers);
+    }
+
+    public String connectionString() {
+        return brokers.stream()
+                      .filter(EphemeralKafkaBroker::isRunning)
+                      .map(EphemeralKafkaBroker::getBrokerList)
+                      .map(Optional::get)
+                      .collect(Collectors.joining(","));
+    }
+
+    /**
+     * Create a minimal producer configuration that can be used to produce to this cluster
+     *
+     * @return Properties
+     */
+    public Properties producerConfig() {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", connectionString());
+        props.put("acks", "all");
+        props.put("batch.size", "100");
+        props.put("client.id", "kafka-junit");
+        props.put("request.timeout.ms", "5000");
+        props.put("max.in.flight.requests.per.connection", "1");
+        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
+
+        return props;
+    }
+
+    /**
+     * Create a minimal consumer configuration with auto commit enabled. Offset is set to "earliest".
+     *
+     * @return Properties
+     */
+    public Properties consumerConfig() {
+        return consumerConfig(true);
+    }
+
+    /**
+     * Create a minimal consumer configuration. Offset is set to "earliest".
+     *
+     * @param enableAutoCommit Whether the consumer should auto-commit offsets
+     * @return Properties
+     */
+    public Properties consumerConfig(boolean enableAutoCommit) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", connectionString());
+        props.put("group.id", "kafka-junit-consumer");
+        props.put("enable.auto.commit", String.valueOf(enableAutoCommit));
+        props.put("auto.commit.interval.ms", "100");
+        props.put("auto.offset.reset", "earliest");
+        props.put("heartbeat.interval.ms", "100");
+        props.put("session.timeout.ms", "200");
+        props.put("fetch.max.wait.ms", "500");
+        props.put("metadata.max.age.ms", "100");
+        return props;
+    }
+
+    public void createTopics(String... topics) throws ExecutionException, InterruptedException {
+        Map<String, Object> adminConfigs = new HashMap<>();
+        adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, connectionString());
+        try (AdminClient admin = AdminClient.create(adminConfigs)) {
+            List<NewTopic> newTopics = Stream.of(topics)
+                                             .map(t -> new NewTopic(t, numBroker, (short) numBroker))
+                                             .collect(Collectors.toList());
+            CreateTopicsResult createTopics = admin.createTopics(newTopics);
+            createTopics.all().get();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        stop();
+    }
+}
diff --git a/src/main/java/com/github/charithe/kafka/KafkaJunitRule.java b/src/main/java/com/github/charithe/kafka/KafkaJunitRule.java
index 13be81c..bf2da83 100644
--- a/src/main/java/com/github/charithe/kafka/KafkaJunitRule.java
+++ b/src/main/java/com/github/charithe/kafka/KafkaJunitRule.java
@@ -20,6 +20,7 @@
 import org.junit.rules.ExternalResource;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static java.util.Objects.requireNonNull;
@@ -47,7 +48,11 @@ protected void before() throws Throwable {
 
     @Override
     protected void after() {
-        broker.stop();
+        try {
+            broker.stop();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new AssertionError(e);
+        }
     }
 
     /**
diff --git a/src/test/java/com/github/charithe/kafka/EphemeralKafkaClusterTest.java b/src/test/java/com/github/charithe/kafka/EphemeralKafkaClusterTest.java
new file mode 100644
index 0000000..592d0de
--- /dev/null
+++ b/src/test/java/com/github/charithe/kafka/EphemeralKafkaClusterTest.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2016 Charith Ellawala
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.charithe.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class EphemeralKafkaClusterTest {
+
+    private static final String TEST_TOPIC = "test-topic";
+    private static final String TEST_TOPIC2 = "test-topic2";
+    private static EphemeralKafkaCluster cluster;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        cluster = EphemeralKafkaCluster.create(3);
+    }
+
+    @AfterClass
+    public static void afterClass() throws InterruptedException, ExecutionException, IOException {
+        cluster.stop();
+    }
+
+    @Test
+    public void testStartAndStop() throws Exception {
+        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(cluster.consumerConfig(false), new IntegerDeserializer(), new StringDeserializer());
+             KafkaProducer<Integer, String> producer = new KafkaProducer<>(cluster.producerConfig(), new IntegerSerializer(), new StringSerializer())) {
+            cluster.createTopics(TEST_TOPIC);
+
+            producer.send(new ProducerRecord<>(TEST_TOPIC, "value"));
+            producer.flush();
+
+            consumer.subscribe(Collections.singleton(TEST_TOPIC));
+            ConsumerRecords<Integer, String> poll = consumer.poll(10000);
+            assertThat(poll.count()).isEqualTo(1);
+            assertThat(poll.iterator().next().value()).isEqualTo("value");
+        }
+    }
+
+    @Test
+    public void testBrokerFailure() throws Exception {
+        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(cluster.consumerConfig(false), new IntegerDeserializer(), new StringDeserializer());
+             KafkaProducer<Integer, String> producer = new KafkaProducer<>(cluster.producerConfig(), new IntegerSerializer(), new StringSerializer())) {
+            cluster.createTopics(TEST_TOPIC2);
+
+            producer.send(new ProducerRecord<>(TEST_TOPIC2, 1, "value"));
+            producer.send(new ProducerRecord<>(TEST_TOPIC2, 2, "value"));
+            producer.send(new ProducerRecord<>(TEST_TOPIC2, 3, "value"));
+            producer.flush();
+
+            consumer.subscribe(Collections.singleton(TEST_TOPIC2));
+            int count = 0;
+            for (int i = 0; i < 10; ++i) {
+                ConsumerRecords<Integer, String> poll = consumer.poll(1000);
+                count += poll.count();
+                if (count == 3) {
+                    break;
+                }
+            }
+            assertThat(count).isEqualTo(3);
+        }
+        cluster.getBrokers().get(1).stop();
+        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(cluster.consumerConfig(false), new IntegerDeserializer(), new StringDeserializer());
+             KafkaProducer<Integer, String> producer = new KafkaProducer<>(cluster.producerConfig(), new IntegerSerializer(), new StringSerializer())) {
+            producer.send(new ProducerRecord<>(TEST_TOPIC2, 1, "value"));
+            producer.send(new ProducerRecord<>(TEST_TOPIC2, 2, "value"));
+            producer.send(new ProducerRecord<>(TEST_TOPIC2, 3, "value"));
+            producer.flush();
+
+            consumer.subscribe(Collections.singleton(TEST_TOPIC2));
+            int count = 0;
+            for (int i = 0; i < 10; ++i) {
+                ConsumerRecords<Integer, String> poll = consumer.poll(1000);
+                count += poll.count();
+                if (count == 6) {
+                    break;
+                }
+            }
+            assertThat(count).isEqualTo(6);
+        }
+        cluster.getBrokers().get(1).start().get();
+    }
+}
diff --git a/src/test/java/com/github/charithe/kafka/KafkaHelperTest.java b/src/test/java/com/github/charithe/kafka/KafkaHelperTest.java
index dddf3b4..95aa19d 100644
--- a/src/test/java/com/github/charithe/kafka/KafkaHelperTest.java
+++ b/src/test/java/com/github/charithe/kafka/KafkaHelperTest.java
@@ -43,7 +43,7 @@ public static void setup() throws Exception {
     }
 
     @AfterClass
-    public static void teardown() {
+    public static void teardown() throws ExecutionException, InterruptedException {
         broker.stop();
         broker = null;
         helper = null;
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
index d9e3181..890bcd5 100644
--- a/src/test/resources/logback-test.xml
+++ b/src/test/resources/logback-test.xml
@@ -31,7 +31,7 @@
-
+
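
Usage sketch: the new cluster API is already exercised by EphemeralKafkaClusterTest in
PATCH 2/2; below is a minimal, self-contained variant of the same flow, assuming both
patches apply. The class name (ClusterUsageSketch), the topic name, and the
Integer/String key/value types are illustrative only, mirroring the test above.

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import com.github.charithe.kafka.EphemeralKafkaCluster;

    public class ClusterUsageSketch {
        public static void main(String[] args) throws Exception {
            // Three brokers sharing one embedded Zookeeper. EphemeralKafkaCluster
            // implements AutoCloseable, so try-with-resources stops the brokers on exit.
            try (EphemeralKafkaCluster cluster = EphemeralKafkaCluster.create(3)) {
                // createTopics uses numBroker for both partition count and
                // replication factor, so this topic spans all three brokers.
                cluster.createTopics("demo-topic");

                try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(
                             cluster.producerConfig(), new IntegerSerializer(), new StringSerializer());
                     KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(
                             cluster.consumerConfig(), new IntegerDeserializer(), new StringDeserializer())) {
                    producer.send(new ProducerRecord<>("demo-topic", 1, "hello"));
                    producer.flush();

                    consumer.subscribe(Collections.singleton("demo-topic"));
                    ConsumerRecords<Integer, String> records = consumer.poll(10000);
                    System.out.println("received " + records.count() + " record(s)");
                }
            }
        }
    }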
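As the diff reads, the shutdown redesign is what makes the cluster possible: stop() and
stopBroker() now declare ExecutionException/InterruptedException because shutdown is
delegated to stopBrokerAsync(), whose CompletableFuture lets EphemeralKafkaCluster.stop()
bring all brokers down in parallel via CompletableFuture.allOf, while the added
kafkaServer.awaitShutdown() ensures the broker has fully stopped before its log
directory is deleted.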