Skip to content

Commit

Permalink
cleanup -> interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Jul 20, 2021
1 parent bcbf215 commit 6d77b88
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 92 deletions.
Expand Up @@ -22,6 +22,8 @@
import io.confluent.ksql.services.ServiceContext;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -113,7 +115,8 @@ public String getAppId() {
@Override
public void run() {
try {
FileUtils.deleteDirectory(new File(stateDir + appId));
final Path pathName = Paths.get(stateDir + "/" + appId);
FileUtils.deleteDirectory(new File(String.valueOf(pathName.normalize())));
LOG.warn("Deleted local state store for non-existing query {}. "
+ "This is not expected and was likely due to a "
+ "race condition when the query was dropped before.",
Expand Down
Expand Up @@ -88,7 +88,7 @@
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.PersistentQueryCleanup;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
Expand Down Expand Up @@ -474,7 +474,7 @@ private void initialize(final KsqlConfig configWithApplicationServer) {
processingLogContext.getConfig(),
ksqlConfigNoPort
);
commandRunner.processPriorCommands(new PersistentQueryCleanup(
commandRunner.processPriorCommands(new PersistentQueryCleanupImpl(
configWithApplicationServer
.getKsqlStreamConfigProps()
.getOrDefault(
Expand Down
Expand Up @@ -21,7 +21,7 @@
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanup;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.Pair;
Expand Down Expand Up @@ -251,7 +251,7 @@ private void closeEarly() {
/**
* Read and execute all commands on the command topic, starting at the earliest offset.
*/
public void processPriorCommands(final PersistentQueryCleanup queryCleanup) {
public void processPriorCommands(final PersistentQueryCleanupImpl queryCleanup) {
try {
final List<QueuedCommand> restoreCommands = commandStore.getRestoreCommands();
final List<QueuedCommand> compatibleCommands = checkForIncompatibleCommands(restoreCommands);
Expand Down
Expand Up @@ -16,52 +16,13 @@
package io.confluent.ksql.rest.util;

import io.confluent.ksql.engine.QueryCleanupService;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentQueryCleanup {
private static final Logger LOG = LoggerFactory.getLogger(PersistentQueryCleanup.class);

private final String stateDir;
private final ServiceContext serviceContext;
private final QueryCleanupService queryCleanupService;
import java.util.List;

public PersistentQueryCleanup(final String stateDir, final ServiceContext serviceContext) {
this.stateDir = stateDir;
this.serviceContext = serviceContext;
queryCleanupService = new QueryCleanupService();
queryCleanupService.startAsync();
}
public interface PersistentQueryCleanup {

public void cleanupLeakedQueries(final List<PersistentQueryMetadata> persistentQueries) {
final Set<String> stateStoreNames =
persistentQueries
.stream()
.map(PersistentQueryMetadata::getQueryApplicationId)
.collect(Collectors.toSet());
try {
final Set<String> allStateStores = new HashSet<>(Arrays.asList(new File(stateDir).list()));
allStateStores.removeAll(stateStoreNames);
allStateStores.forEach((appId) -> queryCleanupService.addCleanupTask(
new QueryCleanupService.QueryCleanupTask(
serviceContext,
appId,
false,
stateDir)));
} catch (NullPointerException e) {
LOG.info("No state stores to clean up");
}
}
void cleanupLeakedQueries(List<PersistentQueryMetadata> persistentQueries);

public QueryCleanupService getQueryCleanupService() {
return queryCleanupService;
}
QueryCleanupService getQueryCleanupService();
}
@@ -0,0 +1,67 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.util;

import io.confluent.ksql.engine.QueryCleanupService;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentQueryCleanupImpl implements PersistentQueryCleanup {
private static final Logger LOG = LoggerFactory.getLogger(PersistentQueryCleanupImpl.class);

private final String stateDir;
private final ServiceContext serviceContext;
private final QueryCleanupService queryCleanupService;

public PersistentQueryCleanupImpl(final String stateDir, final ServiceContext serviceContext) {
this.stateDir = stateDir;
this.serviceContext = serviceContext;
queryCleanupService = new QueryCleanupService();
queryCleanupService.startAsync();
}

public void cleanupLeakedQueries(final List<PersistentQueryMetadata> persistentQueries) {
final Set<String> stateStoreNames =
persistentQueries
.stream()
.map(PersistentQueryMetadata::getQueryApplicationId)
.collect(Collectors.toSet());
try {
final Set<String> allStateStores = new HashSet<>(Arrays.asList(new File(stateDir).list()));
allStateStores.removeAll(stateStoreNames);
allStateStores.forEach((appId) -> queryCleanupService.addCleanupTask(
new QueryCleanupService.QueryCleanupTask(
serviceContext,
appId,
false,
stateDir)));
} catch (NullPointerException e) {
LOG.info("No state stores to clean up");
}
}

public QueryCleanupService getQueryCleanupService() {
return queryCleanupService;
}
}
Expand Up @@ -18,6 +18,7 @@
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static java.lang.Thread.sleep;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -94,7 +95,7 @@ public static void classSetUp() throws IOException {
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_METASTORE_BACKUP_LOCATION, BACKUP_LOCATION.getPath())
.withProperty(StreamsConfig.STATE_DIR_CONFIG, "/tmp/cat")
.withProperty(StreamsConfig.STATE_DIR_CONFIG, "/tmp/cat/")
.build();
}

Expand Down Expand Up @@ -215,20 +216,20 @@ public void shouldSkipIncompatibleCommands() throws Exception {

@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
@Test
public void shouldCleanUpLeftoverStateStores() throws SecurityException {
public void shouldCleanUpLeftoverStateStores() throws InterruptedException {
// Given:
File tempDir = new File("/tmp/cat");
File tempDir = new File("/tmp/cat/");
if (!tempDir.exists()){
tempDir.mkdirs();
}
File fakeStateStore = new File(tempDir.getAbsolutePath() + "/fakeStateStore");
if (!fakeStateStore.exists()){
fakeStateStore.mkdirs();
}
makeKsqlRequest("CREATE STREAM TOPIC3 (ID INT, price int) "
makeKsqlRequest("CREATE STREAM new_stream (ID INT, price int) "
+ "WITH (KAFKA_TOPIC='temp_top', partitions=3, VALUE_FORMAT='JSON');");
makeKsqlRequest("CREATE TABLE stream3 AS SELECT id, sum(price) FROM topic3 group by ID;");
File realStateStore = new File(tempDir.getAbsolutePath() + "/_confluent-ksql-default_query_CTAS_STREAM3_1");
makeKsqlRequest("CREATE TABLE new_stream_3 AS SELECT id, sum(price) FROM new_stream group by ID;");
File realStateStore = new File(tempDir.getAbsolutePath() + "/_confluent-ksql-default_query_CTAS_NEW_STREAM_3_1");

assertTrue(tempDir.exists());
assertTrue(fakeStateStore.exists());
Expand All @@ -237,6 +238,7 @@ public void shouldCleanUpLeftoverStateStores() throws SecurityException {
// When:
REST_APP.stop();
REST_APP.start();
sleep(3000);

// Then:
assertFalse(fakeStateStore.exists());
Expand Down
Expand Up @@ -35,21 +35,18 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.api.server.SlidingWindowRateLimiter;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogServerUtils;
import io.confluent.ksql.logging.query.TestAppender;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.scalablepush.PushRouting;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand All @@ -63,16 +60,15 @@
import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.PersistentQueryCleanup;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.version.metrics.VersionCheckerAgent;
import io.vertx.core.Vertx;
import java.io.File;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -82,9 +78,6 @@
import java.util.function.Consumer;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -171,8 +164,8 @@ public class KsqlRestApplicationTest {

private final ArgumentCaptor<KsqlSecurityContext> securityContextArgumentCaptor =
ArgumentCaptor.forClass(KsqlSecurityContext.class);
private final ArgumentCaptor<PersistentQueryCleanup> queryCleanupArgumentCaptor =
ArgumentCaptor.forClass(PersistentQueryCleanup.class);
private final ArgumentCaptor<PersistentQueryCleanupImpl> queryCleanupArgumentCaptor =
ArgumentCaptor.forClass(PersistentQueryCleanupImpl.class);

@SuppressWarnings({"unchecked", "rawtypes"})
@Before
Expand Down

0 comments on commit 6d77b88

Please sign in to comment.