Skip to content

Commit

Permalink
Merge pull request #16 from GoodforGod/dev
Browse files Browse the repository at this point in the history
[0.9.3]
  • Loading branch information
GoodforGod committed Nov 18, 2023
2 parents a086f71 + 2278656 commit 4a9a351
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '11'
java-version: '17'
distribution: 'adopt'

- name: Build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-snapshot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '11'
java-version: '17'
distribution: 'adopt'

- name: Code Style
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
groupId=io.goodforgod
artifactRootId=testcontainers-extensions
artifactVersion=0.9.2-SNAPSHOT
artifactVersion=0.9.3-SNAPSHOT


##### GRADLE #####
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
Expand Down Expand Up @@ -117,8 +118,14 @@ static final class ConsumerImpl implements Consumer {
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.pollInterval(Duration.ofMillis(50))
.until(() -> this.consumer.listTopics(Duration.ofMinutes(1)),
result -> this.topics.containsAll(result.keySet()));
.until(() -> {
try {
return this.consumer.listTopics(Duration.ofSeconds(10));
} catch (Exception e) {
return Collections.<String, List<PartitionInfo>>emptyMap();
}
},
result -> new HashSet<>(result.keySet()).containsAll(this.topics));
logger.debug("KafkaConsumer topics {} assigned", this.topics);

logger.trace("KafkaConsumer topics {} poll starting", this.topics);
Expand Down Expand Up @@ -395,7 +402,7 @@ public void send(@NotNull String topic, @NotNull List<Event> events) {
try {
logger.trace("KafkaProducer sending event: {}", event);
var result = producer.send(new ProducerRecord<>(topic, null, key, event.value().asBytes(), headers))
.get(5, TimeUnit.SECONDS);
.get(10, TimeUnit.SECONDS);
logger.info("KafkaProducer sent event with offset '{}' with partition '{}' with timestamp '{}' event: {}",
result.offset(), result.partition(), result.timestamp(), event);
} catch (Exception e) {
Expand Down Expand Up @@ -486,7 +493,7 @@ void createTopicsIfNeeded(@NotNull Set<String> topics, boolean reset) {
static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set<String> topics, boolean reset) {
try {
logger.trace("Looking for existing topics...");
var existingTopics = admin.listTopics().names().get(2, TimeUnit.MINUTES);
var existingTopics = admin.listTopics().names().get(1, TimeUnit.MINUTES);
logger.debug("Found existing topics: {}", existingTopics);

var topicsToCreate = topics.stream()
Expand All @@ -505,7 +512,7 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set<String> topi
logger.info("Required topics {} created", topics);
} else if (reset && !topicsToReset.isEmpty()) {
logger.trace("Required topics {} already exist, but require reset, resetting...", topicsToReset);
admin.deleteTopics(topicsToReset).all().get(2, TimeUnit.MINUTES);
admin.deleteTopics(topicsToReset).all().get(1, TimeUnit.MINUTES);
logger.debug("Topics {} reset success", topicsToReset);

var topicsToCreateAfterReset = topicsToReset.stream()
Expand Down Expand Up @@ -565,7 +572,7 @@ public void createTopics(@NotNull Set<String> topics) {
public void dropTopics(@NotNull Set<String> topics) {
try (var admin = admin()) {
logger.trace("Looking for existing topics...");
var existingTopics = admin.listTopics().names().get(2, TimeUnit.MINUTES);
var existingTopics = admin.listTopics().names().get(1, TimeUnit.MINUTES);
logger.debug("Found existing topics: {}", existingTopics);

var topicsToDrop = existingTopics.stream()
Expand All @@ -576,7 +583,7 @@ public void dropTopics(@NotNull Set<String> topics) {
logger.trace("Topics {} dropping...", topicsToDrop);
var deleteTopicsResult = admin.deleteTopics(topics);
var deleteFutures = deleteTopicsResult.topicNameValues().values().toArray(KafkaFuture[]::new);
KafkaFuture.allOf(deleteFutures).get(2, TimeUnit.MINUTES);
KafkaFuture.allOf(deleteFutures).get(1, TimeUnit.MINUTES);
logger.info("Required topics {} dropped", topicsToDrop);
} else {
logger.debug("Required topics already dropped: {}", topics);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package io.goodforgod.testcontainers.extensions.kafka;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.jetbrains.annotations.NotNull;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;

public class KafkaContainerExtra extends KafkaContainer {

// https://docs.confluent.io/platform/7.0.0/release-notes/index.html#ak-raft-kraft
private static final String MIN_KRAFT_TAG = "7.0.0";

private static final String EXTERNAL_TEST_KAFKA_BOOTSTRAP = "EXTERNAL_TEST_KAFKA_BOOTSTRAP_SERVERS";
private static final String EXTERNAL_TEST_KAFKA_PREFIX = "EXTERNAL_TEST_KAFKA_";

Expand All @@ -38,11 +41,29 @@ public KafkaContainerExtra(DockerImageName dockerImageName) {
"org.apache.zookeeper=ERROR,org.kafka.zookeeper=ERROR,kafka.zookeeper=ERROR,org.apache.kafka=ERROR,kafka=ERROR,kafka.network=ERROR,kafka.cluster=ERROR,kafka.controller=ERROR,kafka.coordinator=INFO,kafka.log=ERROR,kafka.server=ERROR,state.change.logger=ERROR");
this.withEnv("ZOOKEEPER_LOG4J_LOGGERS",
"org.apache.zookeeper=ERROR,org.kafka.zookeeper=ERROR,org.kafka.zookeeper.server=ERROR,kafka.zookeeper=ERROR,org.apache.kafka=ERROR");
this.withEmbeddedZookeeper();
this.withExposedPorts(9092, KafkaContainer.KAFKA_PORT);
this.waitingFor(Wait.forListeningPort());
this.withStartupTimeout(Duration.ofMinutes(5));

var actualVersion = new ComparableVersion(DockerImageName.parse(getDockerImageName()).getVersionPart());
if (!actualVersion.isLessThan(MIN_KRAFT_TAG)) {
final Optional<Method> withKraft = Arrays.stream(KafkaContainer.class.getDeclaredMethods())
.filter(m -> m.getName().equals("withKraft"))
.findFirst();

if (withKraft.isPresent()) {
withKraft.get().setAccessible(true);
try {
withKraft.get().invoke(this);
logger().info("Kraft is enabled");
} catch (IllegalAccessException | InvocationTargetException e) {
this.withEmbeddedZookeeper();
}
} else {
this.withEmbeddedZookeeper();
}
}

this.setNetworkAliases(new ArrayList<>(List.of(alias)));
}

Expand Down

0 comments on commit 4a9a351

Please sign in to comment.