Skip to content

Commit

Permalink
Manage log.dir in the EmbeddedKafkaBroker
Browse files Browse the repository at this point in the history
Resolves spring-projects#194

Create the temporary directory in EKB instead of the broker to avoid
`NoSuchFileException`s during shutdown.

**cherry-pick to 2.4.x, 2.3.x, 2.2.x**
  • Loading branch information
garyrussell committed May 1, 2020
1 parent 9a961aa commit dafd9bd
Showing 1 changed file with 19 additions and 0 deletions.
Expand Up @@ -18,7 +18,10 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -136,6 +139,8 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {

private volatile ZooKeeperClient zooKeeperClient;

private String logDir;

public EmbeddedKafkaBroker(int count) {
this(count, false);
}
Expand Down Expand Up @@ -279,6 +284,7 @@ public synchronized EmbeddedKafkaBroker zkSessionTimeout(int zkSessionTimeout) {

@Override
public void afterPropertiesSet() {
logDir();
overrideExitMethods();
try {
this.zookeeper = new EmbeddedZookeeper(this.zkPort);
Expand Down Expand Up @@ -316,6 +322,19 @@ public void afterPropertiesSet() {
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
}

private void logDir() {
if (this.brokerProperties.get(KafkaConfig.LogDirProp()) == null) {
try {
Path dir = Files.createTempDirectory("spring.kafka" + System.currentTimeMillis());
this.logDir = dir.toString();
this.brokerProperties.put(KafkaConfig.LogDirProp(), this.logDir);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

private void overrideExitMethods() {
String exitMsg = "Exit.%s(%d, %s) called";
Exit.setExitProcedure((statusCode, message) -> {
Expand Down

0 comments on commit dafd9bd

Please sign in to comment.