Skip to content

Commit

Permalink
review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Jun 29, 2021
1 parent 13c6b75 commit d50e5af
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 50 deletions.
Expand Up @@ -50,7 +50,6 @@
import io.confluent.ksql.physical.scalablepush.PushRouting;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
Expand Down Expand Up @@ -129,12 +128,14 @@
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -463,36 +464,52 @@ private void cleanupOldState() {
}

public void cleanupOldStateDirectories(final KsqlConfig configWithApplicationServer) {
final String stateDir =
configWithApplicationServer
.getKsqlStreamConfigProps()
.get(StreamsConfig.STATE_DIR_CONFIG)
.toString();
final String stateDir = configWithApplicationServer.getKsqlStreamConfigProps().getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)).toString();

final Set<String> stateStoreNames =
ksqlEngine.getPersistentQueries()
.stream()
.map(PersistentQueryMetadata::getQueryId)
.map(QueryId::toString)
.map(PersistentQueryMetadata::getQueryApplicationId)
.collect(Collectors.toSet());
try {
Files.list(Paths.get(stateDir))
.map(Path::toFile)
.forEach(
f -> {
if (stateStoreNames.stream().noneMatch((name) -> f.getName().endsWith(name))) {
try {
Files.walk(f.toPath())
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (IOException e) {
log.error("Error cleaning up obsolete {} state directory", f.getName());
}
}
});
Files.walkFileTree(Paths.get(stateDir), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFileFailed(Path path, IOException exc) {
log.error("Error cleaning up obsolete state directories \n", exc);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
if (!stateStoreNames.contains(path.getFileName())) {
try {
Files.delete(path);
log.warn("Deleted local state store for non-existing query {}. This is not expected and was likely due to a " +
"race condition when the query was dropped before.", path.getFileName());
} catch (IOException e) {
log.error("Error cleaning up state directory {}\n. {}", path.getFileName(), e);
}
}
return FileVisitResult.CONTINUE;
}

public FileVisitResult postVisitDirectory(Path path, IOException exc) {
if (!path.toString().equals(stateDir)) {
try {
Files.delete(path);
log.warn("Deleted local state store for non-existing query {}. This is not expected and was likely due to a " +
"race condition when the query was dropped before.", path.getFileName());
} catch (IOException e) {
log.error("Error cleaning up state directory {}\n. {}", path.getFileName(), e);
}
}
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {
log.error("Failed to clean a state directory {}", stateDir);
log.error("Failed to clean state directory {}\n {}", stateDir, e);
}
}

Expand Down
Expand Up @@ -22,16 +22,17 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KsqlServerMain {

Expand Down
Expand Up @@ -190,7 +190,8 @@ public void setUp() {
when(topicClient.isTopicExists(CMD_TOPIC_NAME)).thenReturn(false);

when(ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)).thenReturn("ksql-id");

when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of("state.dir", "/tmp/cat"));

when(precondition1.checkPrecondition(any(), any())).thenReturn(Optional.empty());
when(precondition2.checkPrecondition(any(), any())).thenReturn(Optional.empty());

Expand Down Expand Up @@ -477,6 +478,10 @@ public void shouldDeleteExtraStateStores() {
ImmutableMap.of(
KsqlRestConfig.LISTENERS_CONFIG, "http://some.host:1244,https://some.other.host:1258"));

final TestAppender appender = new TestAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);

File tempFile = new File(ksqlConfig.getKsqlStreamConfigProps().get(StreamsConfig.STATE_DIR_CONFIG).toString());
if (!tempFile.exists()){
tempFile.mkdirs();
Expand All @@ -491,7 +496,15 @@ public void shouldDeleteExtraStateStores() {
app.startKsql(ksqlConfig);

// Then:
final List<LoggingEvent> log = appender.getLog();
final LoggingEvent firstLogEntry = log.get(1);

assertThat(firstLogEntry.getLevel(), is(Level.WARN));
assertThat((String) firstLogEntry.getMessage(), is(
"Deleted local state store for non-existing query fakeStateStore. " +
"This is not expected and was likely due to a race condition when the query was dropped before."));
assertFalse(fakeStateStore.exists());
assertTrue(tempFile.exists());
}

@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
Expand All @@ -515,29 +528,10 @@ public void shouldKeepStateStoresBelongingToRunningQueries() {
}
// When:
app.startKsql(ksqlConfig);
// Then:
assertTrue(fakeStateStore.exists());
}

@Test
public void shouldThrowExceptionIfNoStateStoreDir() {
// Given:
final TestAppender appender = new TestAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
givenAppWithRestConfig(
ImmutableMap.of(
KsqlRestConfig.LISTENERS_CONFIG, "http://some.host:1244,https://some.other.host:1258"));

// When:
app.startKsql(ksqlConfig);

// Then:
final List<LoggingEvent> log = appender.getLog();
// will probably be 2 instead of 0
final LoggingEvent firstLogEntry = log.get(0);
assertThat(firstLogEntry.getLevel(), is(Level.ERROR));
assertThat((String) firstLogEntry.getMessage(), is("Failed to clean a state directory /tmp/cat"));
assertTrue(fakeStateStore.exists());
assertTrue(tempFile.exists());
}

private void givenAppWithRestConfig(final Map<String, Object> restConfigMap) {
Expand Down

0 comments on commit d50e5af

Please sign in to comment.