Add produce consume check to KafkaContainerCluster #1131

Merged 5 commits on Jun 22, 2020
Changes from 4 commits
@@ -21,7 +21,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** This container wraps Confluent Kafka and Zookeeper (optionally) */
/**
* This container wraps Confluent Kafka and Zookeeper (optionally)
*
* <p>This is a copy of KafkaContainer from testcontainers/testcontainers-java that we can tweak as
* needed
*/
@InternalApi
public class AlpakkaKafkaContainer extends GenericContainer<AlpakkaKafkaContainer> {

@@ -9,13 +9,17 @@
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.core.command.ExecStartResultCallback;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -32,6 +36,12 @@ public class KafkaContainerCluster implements Startable {
AlpakkaKafkaContainer.DEFAULT_CP_PLATFORM_VERSION;
public static final int START_TIMEOUT_SECONDS = 120;

private static final String READINESS_CHECK_SCRIPT = "/testcontainers_readiness_check.sh";
private static final String READINESS_CHECK_TOPIC = "ready-kafka-container-cluster";
private static final Version BOOTSTRAP_PARAM_MIN_VERSION = new Version("5.2.0");

private final Logger log = LoggerFactory.getLogger(getClass());
private final Version confluentPlatformVersion;
private final int brokersNum;
private final Network network;
private final GenericContainer zookeeper;
Expand All @@ -54,6 +64,7 @@ public KafkaContainerCluster(
+ "' must be less than brokersNum and greater than 0");
}

this.confluentPlatformVersion = new Version(confluentPlatformVersion);
this.brokersNum = brokersNum;
this.network = Network.newNetwork();

@@ -101,24 +112,40 @@ public String getBootstrapServers() {
}

private Stream<GenericContainer> allContainers() {
Stream<GenericContainer> genericBrokers = this.brokers.stream().map(b -> (GenericContainer) b);
Stream<GenericContainer> genericBrokers = this.brokers.stream().map(b -> b);
Stream<GenericContainer> zookeeper = Stream.of(this.zookeeper);
return Stream.concat(genericBrokers, zookeeper);
}

@Override
public void start() {
try {
Stream<Startable> startables = this.brokers.stream().map(b -> (Startable) b);
Stream<Startable> startables = this.brokers.stream().map(b -> b);
Startables.deepStart(startables).get(START_TIMEOUT_SECONDS, SECONDS);

// assert that cluster has formed
Unreliables.retryUntilTrue(
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() ->
Stream.of(this.zookeeper)
.map(this::clusterBrokers)
.anyMatch(brokers -> brokers.split(",").length == this.brokersNum));

this.brokers.stream()
.findFirst()
.ifPresent(
broker -> {
broker.copyFileToContainer(
Transferable.of(readinessCheckScript().getBytes(StandardCharsets.UTF_8), 700),
READINESS_CHECK_SCRIPT);
});

// test produce & consume message with full cluster involvement
Unreliables.retryUntilTrue(
START_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
() -> this.brokers.stream().findFirst().map(this::runReadinessCheck).orElse(false));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -148,8 +175,114 @@ private String clusterBrokers(GenericContainer c) {
}
}

private String readinessCheckScript() {
String connect = kafkaTopicConnectParam();
String command = "#!/bin/bash \n";
command += "set -e \n";
command +=
"kafka-topics "
+ connect
+ " --delete --topic "
+ READINESS_CHECK_TOPIC
+ " || echo \"topic does not exist\" \n";
command +=
"kafka-topics "
+ connect
+ " --topic "
+ READINESS_CHECK_TOPIC
+ " --create --partitions "
+ this.brokersNum
+ " --replication-factor "
+ this.brokersNum
+ " --config min.insync.replicas="
+ this.brokersNum
+ " \n";
command += "MESSAGE=\"`date -u`\" \n";
command +=
"echo \"$MESSAGE\" | kafka-console-producer --broker-list localhost:9092 --topic "
+ READINESS_CHECK_TOPIC
+ " --producer-property acks=all \n";
command +=
"kafka-console-consumer --bootstrap-server localhost:9092 --topic "
+ READINESS_CHECK_TOPIC
+ " --from-beginning --timeout-ms 2000 --max-messages 1 | grep \"$MESSAGE\" \n";
command += "kafka-topics " + connect + " --delete --topic " + READINESS_CHECK_TOPIC + " \n";
command += "echo \"test succeeded\" \n";
return command;
}
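
For reference, a sketch of what this generated script expands to once concatenated, assuming a three-broker cluster on Confluent Platform 5.2.0 or later (so the --bootstrap-server form is chosen):

    #!/bin/bash
    set -e
    kafka-topics --bootstrap-server localhost:9092 --delete --topic ready-kafka-container-cluster || echo "topic does not exist"
    kafka-topics --bootstrap-server localhost:9092 --topic ready-kafka-container-cluster --create --partitions 3 --replication-factor 3 --config min.insync.replicas=3
    MESSAGE="`date -u`"
    echo "$MESSAGE" | kafka-console-producer --broker-list localhost:9092 --topic ready-kafka-container-cluster --producer-property acks=all
    kafka-console-consumer --bootstrap-server localhost:9092 --topic ready-kafka-container-cluster --from-beginning --timeout-ms 2000 --max-messages 1 | grep "$MESSAGE"
    kafka-topics --bootstrap-server localhost:9092 --delete --topic ready-kafka-container-cluster
    echo "test succeeded"

Because the readiness topic is created with replication factor and min.insync.replicas equal to the broker count, and the producer uses acks=all, the round trip only succeeds once every broker is participating in the cluster.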

private String kafkaTopicConnectParam() {
if (this.confluentPlatformVersion.compareTo(BOOTSTRAP_PARAM_MIN_VERSION) >= 0) {
return "--bootstrap-server localhost:9092";
} else {
return "--zookeeper zookeeper:" + AlpakkaKafkaContainer.ZOOKEEPER_PORT;
}
}

private Boolean runReadinessCheck(GenericContainer c) {
try {
ByteArrayOutputStream stdoutStream = new ByteArrayOutputStream();
ByteArrayOutputStream stderrStream = new ByteArrayOutputStream();
dockerClient
.execStartCmd(
dockerClient
.execCreateCmd(c.getContainerId())
.withAttachStdout(true)
.withAttachStderr(true)
.withCmd("sh", "-c", READINESS_CHECK_SCRIPT)
.exec()
.getId())
.exec(new ExecStartResultCallback(stdoutStream, stderrStream))
.awaitCompletion();
log.debug("Readiness check returned errors:\n{}", stderrStream.toString());
return stdoutStream.toString().contains("test succeeded");
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

@Override
public void stop() {
allContainers().parallel().forEach(GenericContainer::stop);
}
}
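
For reviewers who want to exercise the new check locally, a minimal usage sketch; the constructor argument order (Confluent Platform version, broker count, internal-topics replication factor) is assumed from the fields referenced in this diff rather than shown in it:

    // Sketch only: start a 3-broker cluster and read back the bootstrap servers.
    // Constructor parameter order is an assumption, not confirmed by this diff.
    KafkaContainerCluster cluster = new KafkaContainerCluster("5.2.1", 3, 3);
    cluster.start(); // now blocks until the produce/consume readiness check passes
    String bootstrapServers = cluster.getBootstrapServers();
    // ... run tests against bootstrapServers ...
    cluster.stop();

With this PR, start() returns only after a message has been produced and consumed through the readiness topic, so tests no longer race against a cluster that is still forming.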

@InternalApi
class Version implements Comparable<Version> {

private String version;

public final String get() {
return this.version;
}

public Version(String version) {
if (version == null) throw new IllegalArgumentException("Version can not be null");
if (!version.matches("[0-9]+(\\.[0-9]+)*"))
throw new IllegalArgumentException("Invalid version format");
this.version = version;
}

@Override
public int compareTo(Version that) {
if (that == null) return 1;
String[] thisParts = this.get().split("\\.");
String[] thatParts = that.get().split("\\.");
int length = Math.max(thisParts.length, thatParts.length);
for (int i = 0; i < length; i++) {
int thisPart = i < thisParts.length ? Integer.parseInt(thisParts[i]) : 0;
int thatPart = i < thatParts.length ? Integer.parseInt(thatParts[i]) : 0;
if (thisPart < thatPart) return -1;
if (thisPart > thatPart) return 1;
}
return 0;
}

@Override
public boolean equals(Object that) {
if (this == that) return true;
if (that == null) return false;
if (this.getClass() != that.getClass()) return false;
return this.compareTo((Version) that) == 0;
}
}
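
A few illustrative comparisons with the Version class above, matching the gate used in kafkaTopicConnectParam() (values are examples only):

    // Illustrative comparisons using the Version class defined above.
    assert new Version("5.2.0").compareTo(new Version("5.2.0")) == 0; // equal versions
    assert new Version("5.1.2").compareTo(new Version("5.2.0")) < 0;  // older CP: kafka-topics falls back to --zookeeper
    assert new Version("5.3.1").compareTo(new Version("5.2.0")) > 0;  // newer CP: kafka-topics can use --bootstrap-server
    assert new Version("5.2").compareTo(new Version("5.2.0")) == 0;   // missing parts are treated as 0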