Skip to content

Commit

Permalink
Merge pull request #32 from hypnoce/update_kafka_1
Browse files Browse the repository at this point in the history
Update kafka to 1.0.0. Update guava to 23.3-jre.
  • Loading branch information
charithe authored Nov 12, 2017
2 parents 234d988 + 3cdeb99 commit b3fe0f4
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 39 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Version | Kafka Version
3.0.4 | 0.10.2.1
3.1.0 | 0.11.0.0
3.1.1 | 0.11.0.1
4.0.0 | 1.0.0

Installation
-------------
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

<properties>
<java.version>1.8</java.version>
<kafka.version>0.11.0.1</kafka.version>
<kafka.version>1.0.0</kafka.version>
</properties>

<scm>
Expand Down Expand Up @@ -101,7 +101,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
<version>23.3-jre</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
93 changes: 59 additions & 34 deletions src/main/java/com/github/charithe/kafka/EphemeralKafkaBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -36,6 +37,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
Expand All @@ -50,6 +52,7 @@ public class EphemeralKafkaBroker {
private Properties overrideBrokerProperties;

private TestingServer zookeeper;
private boolean managedZk = false;
private KafkaServerStartable kafkaServer;
private Path kafkaLogDir;

Expand Down Expand Up @@ -104,6 +107,13 @@ public static EphemeralKafkaBroker create(int kafkaPort, int zookeeperPort, Prop
this.overrideBrokerProperties = overrideBrokerProperties;
}

/**
 * Package-private constructor used when the Zookeeper instance is owned and managed
 * externally (e.g. shared by all brokers of an EphemeralKafkaCluster).
 *
 * <p>Setting {@code managedZk = true} tells this broker that it must NOT start or
 * stop the Zookeeper server itself — the caller is responsible for its lifecycle.
 *
 * @param zookeeper                already-running Zookeeper test server to connect to
 * @param kafkaPort                port the broker should listen on
 * @param overrideBrokerProperties extra broker config entries layered over the defaults
 */
EphemeralKafkaBroker(TestingServer zookeeper, int kafkaPort, Properties overrideBrokerProperties) {
    this.zookeeper = zookeeper;
    this.kafkaPort = kafkaPort;
    this.overrideBrokerProperties = overrideBrokerProperties;
    this.managedZk = true;
}

/**
* Start the Kafka broker
*/
Expand All @@ -120,11 +130,13 @@ public CompletableFuture<Void> start() throws Exception {
}

private CompletableFuture<Void> startBroker() throws Exception {
if (zookeeperPort == ALLOCATE_RANDOM_PORT) {
zookeeper = new TestingServer(true);
zookeeperPort = zookeeper.getPort();
} else {
zookeeper = new TestingServer(zookeeperPort, true);
if(!this.managedZk) {
if (zookeeperPort == ALLOCATE_RANDOM_PORT) {
zookeeper = new TestingServer(true);
zookeeperPort = zookeeper.getPort();
} else {
zookeeper = new TestingServer(zookeeperPort, true);
}
}

kafkaPort = kafkaPort == ALLOCATE_RANDOM_PORT ? InstanceSpec.getRandomPort() : kafkaPort;
Expand All @@ -134,13 +146,18 @@ private CompletableFuture<Void> startBroker() throws Exception {
LOGGER.info("Starting Kafka server with config: {}", kafkaConfig.props());
kafkaServer = new KafkaServerStartable(kafkaConfig);
brokerStarted = true;
final Integer brokerId = kafkaServer.serverConfig().getInt(KafkaConfig.BrokerIdProp());
if(brokerId != null) {
/* Avoid warning for missing meta.properties */
Files.write(kafkaLogDir.resolve("meta.properties"), ("version=0\nbroker.id=" + brokerId).getBytes(StandardCharsets.UTF_8));
}
return CompletableFuture.runAsync(() -> kafkaServer.startup());
}

/**
* Stop the Kafka broker
*/
public void stop() {
public void stop() throws ExecutionException, InterruptedException {
if (brokerStarted) {
synchronized (this) {
if (brokerStarted) {
Expand All @@ -151,37 +168,45 @@ public void stop() {
}
}

private void stopBroker() {
try {
if (kafkaServer != null) {
LOGGER.info("Shutting down Kafka Server");
kafkaServer.shutdown();
}
/**
 * Synchronously stops the broker by blocking on the asynchronous shutdown future.
 *
 * @throws ExecutionException   if the async shutdown completed exceptionally
 * @throws InterruptedException if the current thread is interrupted while waiting
 */
private void stopBroker() throws ExecutionException, InterruptedException {
    stopBrokerAsync().get();
}

if (zookeeper != null) {
LOGGER.info("Shutting down Zookeeper");
zookeeper.close();
}
CompletableFuture<Void> stopBrokerAsync() {
return CompletableFuture.runAsync(() -> {
try {
if (kafkaServer != null) {
LOGGER.info("Shutting down Kafka Server");
kafkaServer.shutdown();
kafkaServer.awaitShutdown();
}

if (zookeeper != null && !this.managedZk) {
LOGGER.info("Shutting down Zookeeper");
zookeeper.close();
}

if (Files.exists(kafkaLogDir)) {
LOGGER.info("Deleting the log dir: {}", kafkaLogDir);
Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.deleteIfExists(file);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.deleteIfExists(dir);
return FileVisitResult.CONTINUE;
}
});
if (Files.exists(kafkaLogDir)) {
LOGGER.info("Deleting the log dir: {}", kafkaLogDir);
Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.deleteIfExists(file);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.deleteIfExists(dir);
return FileVisitResult.CONTINUE;
}
});
}
} catch (Exception e) {
LOGGER.error("Failed to clean-up Kafka", e);
}
} catch (Exception e) {
LOGGER.error("Failed to clean-up Kafka", e);
}
});

}

private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException {
Expand Down
174 changes: 174 additions & 0 deletions src/main/java/com/github/charithe/kafka/EphemeralKafkaCluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package com.github.charithe.kafka;

import kafka.server.KafkaConfig;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * A multi-broker, in-process Kafka cluster backed by a single managed Zookeeper
 * {@link TestingServer}, intended for integration tests. The cluster owns the
 * Zookeeper lifecycle; individual brokers are told not to stop it themselves.
 *
 * <p>Implements {@link AutoCloseable} so it can be used in try-with-resources;
 * {@link #close()} delegates to {@link #stop()}.
 */
public class EphemeralKafkaCluster implements AutoCloseable {
    private static final int ALLOCATE_RANDOM_PORT = -1;

    private final int numBroker;
    private final TestingServer zookeeper;
    private final List<EphemeralKafkaBroker> brokers = new ArrayList<>();

    private EphemeralKafkaCluster(int numBroker, int zookeeperPort) throws Exception {
        // TestingServer starts eagerly; all brokers share this single ZK instance.
        this.zookeeper = new TestingServer(zookeeperPort);
        this.numBroker = numBroker;
        for (int i = 0; i < numBroker; ++i) {
            this.addBroker();
        }
    }

    /**
     * Create a new ephemeral Kafka cluster on a randomly allocated Zookeeper port.
     *
     * @param numBroker number of brokers to start
     * @return EphemeralKafkaCluster
     * @throws Exception if Zookeeper or any broker fails to start
     */
    public static EphemeralKafkaCluster create(int numBroker) throws Exception {
        return create(numBroker, ALLOCATE_RANDOM_PORT);
    }

    /**
     * Create a new ephemeral Kafka cluster with the specified Zookeeper port.
     *
     * @param numBroker     number of brokers to start
     * @param zookeeperPort port the Zookeeper should listen on (-1 for a random port)
     * @return EphemeralKafkaCluster
     * @throws Exception if Zookeeper or any broker fails to start
     */
    public static EphemeralKafkaCluster create(int numBroker, int zookeeperPort) throws Exception {
        return new EphemeralKafkaCluster(numBroker, zookeeperPort);
    }

    /**
     * @return true when every broker in the cluster is running (vacuously true for an
     *         empty broker list, matching {@code allMatch} semantics)
     */
    public boolean isHealthy() {
        return brokers.stream().allMatch(EphemeralKafkaBroker::isRunning);
    }

    /**
     * @return true when at least one broker in the cluster is running
     */
    public boolean isRunning() {
        return brokers.stream().anyMatch(EphemeralKafkaBroker::isRunning);
    }

    /**
     * Stop all brokers concurrently, then shut down the shared Zookeeper server.
     *
     * @throws IOException          if Zookeeper fails to stop cleanly
     * @throws ExecutionException   if any broker shutdown completed exceptionally
     * @throws InterruptedException if interrupted while waiting for broker shutdown
     */
    public void stop() throws IOException, ExecutionException, InterruptedException {
        CompletableFuture.allOf(
                brokers.stream()
                       .map(EphemeralKafkaBroker::stopBrokerAsync)
                       .toArray(CompletableFuture[]::new))
                .get();
        brokers.clear();
        zookeeper.stop();
    }

    /**
     * Start an additional broker on a random port, register it with the shared
     * Zookeeper instance and add it to the cluster.
     *
     * @return the started broker
     * @throws Exception if the broker fails to start
     */
    private EphemeralKafkaBroker addBroker() throws Exception {
        final int brokerPort = InstanceSpec.getRandomPort();
        Properties brokerConfigProperties = new Properties();
        brokerConfigProperties.setProperty(KafkaConfig.BrokerIdProp(), brokers.size() + "");
        brokerConfigProperties.setProperty(KafkaConfig.ZkConnectProp(), zookeeper.getConnectString());
        brokerConfigProperties.setProperty(KafkaConfig.ControlledShutdownEnableProp(), false + "");
        brokerConfigProperties.setProperty(KafkaConfig.ControlledShutdownMaxRetriesProp(), "1");
        brokerConfigProperties.setProperty(KafkaConfig.DeleteTopicEnableProp(), true + "");
        brokerConfigProperties.setProperty(KafkaConfig.PortProp(), "" + brokerPort);
        // NOTE(review): "false" is not a valid protocol list for ssl.enabled.protocols —
        // presumably intended to disable SSL; left unchanged to preserve behavior.
        brokerConfigProperties.setProperty(KafkaConfig.SslEnabledProtocolsProp(), false + "");
        brokerConfigProperties.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), true + "");
        // Aggressively short timeouts to make broker failure detection fast in tests.
        brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "300");
        brokerConfigProperties.setProperty(KafkaConfig.ReplicaFetchWaitMaxMsProp(), "100");
        brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "10");

        brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), numBroker + "");
        brokerConfigProperties.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), 1 + "");
        brokerConfigProperties.setProperty(KafkaConfig.ZkSessionTimeoutMsProp(), 200 + "");
        // BUGFIX: was GroupInitialRebalanceDelayMsDoc(), which returns the config's
        // documentation sentence, not its key — the 200ms delay was never applied.
        brokerConfigProperties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp(), 200 + "");
        brokerConfigProperties.setProperty(KafkaConfig.AdvertisedHostNameProp(), "localhost");
        brokerConfigProperties.setProperty(KafkaConfig.AdvertisedPortProp(), brokerPort + "");
        brokerConfigProperties.setProperty(KafkaConfig.AdvertisedListenersProp(), "PLAINTEXT://localhost:" + brokerPort);
        brokerConfigProperties.setProperty(KafkaConfig.HostNameProp(), "localhost");
        brokerConfigProperties.setProperty(KafkaConfig.MinInSyncReplicasProp(), Math.max(1, numBroker - 1) + "");
        final EphemeralKafkaBroker broker = new EphemeralKafkaBroker(zookeeper, brokerPort, brokerConfigProperties);
        broker.start().get();
        brokers.add(broker);
        return broker;
    }

    /**
     * @return an unmodifiable view of the cluster's brokers
     */
    public List<EphemeralKafkaBroker> getBrokers() {
        return Collections.unmodifiableList(brokers);
    }

    /**
     * @return comma-separated broker list of all currently running brokers,
     *         suitable for the {@code bootstrap.servers} client property
     */
    public String connectionString() {
        return brokers.stream()
                      .filter(EphemeralKafkaBroker::isRunning)
                      .map(EphemeralKafkaBroker::getBrokerList)
                      .map(Optional::get)
                      .collect(Collectors.joining(","));
    }

    /**
     * Create a minimal producer configuration that can be used to produce to this cluster.
     *
     * @return Properties
     */
    public Properties producerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", connectionString());
        props.put("acks", "all");
        props.put("batch.size", "100");
        props.put("client.id", "kafka-junit");
        props.put("request.timeout.ms", "5000");
        // Single in-flight request preserves ordering even with retries enabled.
        props.put("max.in.flight.requests.per.connection", "1");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);

        return props;
    }

    /**
     * Create a minimal consumer configuration with auto commit enabled. Offset is set to "earliest".
     *
     * @return Properties
     */
    public Properties consumerConfig() {
        return consumerConfig(true);
    }

    /**
     * Create a minimal consumer configuration. Offset is set to "earliest".
     *
     * @param enableAutoCommit whether the consumer auto-commits offsets
     * @return Properties
     */
    public Properties consumerConfig(boolean enableAutoCommit) {
        Properties props = new Properties();
        props.put("bootstrap.servers", connectionString());
        props.put("group.id", "kafka-junit-consumer");
        props.put("enable.auto.commit", String.valueOf(enableAutoCommit));
        props.put("auto.commit.interval.ms", "100");
        props.put("auto.offset.reset", "earliest");
        // Short heartbeat/session/metadata intervals so tests react quickly.
        props.put("heartbeat.interval.ms", "100");
        props.put("session.timeout.ms", "200");
        props.put("fetch.max.wait.ms", "500");
        props.put("metadata.max.age.ms", "100");
        return props;
    }

    /**
     * Create the given topics, each with {@code numBroker} partitions and a
     * replication factor of {@code numBroker}, and block until creation completes.
     *
     * @param topics topic names to create
     * @throws ExecutionException   if topic creation fails on the cluster
     * @throws InterruptedException if interrupted while waiting for creation
     */
    public void createTopics(String... topics) throws ExecutionException, InterruptedException {
        Map<String, Object> adminConfigs = new HashMap<>();
        adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, connectionString());
        try (AdminClient admin = AdminClient.create(adminConfigs)) {
            List<NewTopic> newTopics = Stream.of(topics)
                    .map(t -> new NewTopic(t, numBroker, (short) numBroker))
                    .collect(Collectors.toList());
            CreateTopicsResult createTopics = admin.createTopics(newTopics);
            createTopics.all().get();
        }
    }

    @Override
    public void close() throws Exception {
        stop();
    }
}
7 changes: 6 additions & 1 deletion src/main/java/com/github/charithe/kafka/KafkaJunitRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.rules.ExternalResource;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -47,7 +48,11 @@ protected void before() throws Throwable {

@Override
protected void after() {
broker.stop();
try {
broker.stop();
} catch (ExecutionException | InterruptedException e) {
throw new AssertionError(e);
}
}

/**
Expand Down
Loading

0 comments on commit b3fe0f4

Please sign in to comment.