From dafd9bd0d6e2494e20dca62eb4e98439e4da1ab2 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 1 May 2020 10:52:27 -0400 Subject: [PATCH] Manage log.dir in the EmbeddedKafkaBroker Resolves https://github.com/spring-projects/spring-kafka/issues/194 Create the temporary directory in EKB instead of the broker to avoid `NoSuchFileException`s during shutdown. **cherry-pick to 2.4.x, 2.3.x, 2.2.x** --- .../kafka/test/EmbeddedKafkaBroker.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index 9f52d68c60..c49c2b5a6c 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -18,7 +18,10 @@ import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -136,6 +139,8 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean { private volatile ZooKeeperClient zooKeeperClient; + private String logDir; + public EmbeddedKafkaBroker(int count) { this(count, false); } @@ -279,6 +284,7 @@ public synchronized EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout) { @Override public void afterPropertiesSet() { + logDir(); overrideExitMethods(); try { this.zookeeper = new EmbeddedZookeeper(this.zkPort); @@ -316,6 +322,19 @@ public void afterPropertiesSet() { System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString()); } + private void logDir() { + if (this.brokerProperties.get(KafkaConfig.LogDirProp()) == null) { + try { + Path dir = Files.createTempDirectory("spring.kafka" + System.currentTimeMillis()); + this.logDir = dir.toString(); + this.brokerProperties.put(KafkaConfig.LogDirProp(), this.logDir); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + private void overrideExitMethods() { String exitMsg = "Exit.%s(%d, %s) called"; Exit.setExitProcedure((statusCode, message) -> {