Skip to content

Commit

Permalink
breaking out query cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Jul 15, 2021
1 parent 536827f commit bcbf215
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 131 deletions.
Expand Up @@ -117,7 +117,8 @@ public KsqlEngine(
final List<QueryEventListener> queryEventListeners
) {
this.cleanupService = new QueryCleanupService();
this.orphanedTransientQueryCleaner = new OrphanedTransientQueryCleaner(this.cleanupService, ksqlConfig);
this.orphanedTransientQueryCleaner =
new OrphanedTransientQueryCleaner(this.cleanupService, ksqlConfig);
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.engineMetrics = engineMetricsFactory.apply(this);
this.primaryContext = EngineContext.create(
Expand Down Expand Up @@ -396,7 +397,9 @@ public void onClose(
ksqlConfig.getKsqlStreamConfigProps()
.getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
StreamsConfig.configDef()
.defaultValues()
.get(StreamsConfig.STATE_DIR_CONFIG))
.toString()
));
}
Expand Down
Expand Up @@ -20,11 +20,11 @@
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,7 +36,8 @@ public class OrphanedTransientQueryCleaner {
private final QueryCleanupService cleanupService;
private final KsqlConfig ksqlConfig;

public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService, final KsqlConfig ksqlConfig) {
public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService,
final KsqlConfig ksqlConfig) {
this.cleanupService = requireNonNull(cleanupService);
this.ksqlConfig = ksqlConfig;
}
Expand Down
Expand Up @@ -22,7 +22,6 @@
import io.confluent.ksql.services.ServiceContext;

import java.io.File;
import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
Expand All @@ -40,8 +39,7 @@
* real/sandboxed engines.</p>
*/
@SuppressWarnings("UnstableApiUsage")
public
class QueryCleanupService extends AbstractExecutionThreadService {
public class QueryCleanupService extends AbstractExecutionThreadService {

private static final Logger LOG = LoggerFactory.getLogger(QueryCleanupService.class);
private static final Runnable SHUTDOWN_SENTINEL = () -> { };
Expand Down Expand Up @@ -116,19 +114,18 @@ public String getAppId() {
public void run() {
try {
FileUtils.deleteDirectory(new File(stateDir + appId));
LOG.warn(
"Deleted local state store for non-existing query {}. "
+ "This is not expected and was likely due to a "
+ "race condition when the query was dropped before.",
LOG.warn("Deleted local state store for non-existing query {}. "
+ "This is not expected and was likely due to a "
+ "race condition when the query was dropped before.",
appId);
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Error cleaning up state directory {}\n. {}", appId, e);
}
tryRun(
() -> SchemaRegistryUtil.cleanupInternalTopicSchemas(
appId,
serviceContext.getSchemaRegistryClient(),
isTransient),
appId,
serviceContext.getSchemaRegistryClient(),
isTransient),
"internal topic schemas"
);

Expand All @@ -147,6 +144,7 @@ private void tryRun(final Runnable runnable, final String resource) {
LOG.warn("Failed to cleanup {} for {}", resource, appId, e);
}
}

public void setStateDir(final String newStateDir) {
stateDir = newStateDir;
}
Expand Down
Expand Up @@ -88,6 +88,7 @@
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.PersistentQueryCleanup;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
Expand All @@ -104,7 +105,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.RetryUtil;
import io.confluent.ksql.util.WelcomeMsgUtils;
Expand All @@ -118,27 +118,19 @@
import io.vertx.ext.dropwizard.Match;
import java.io.Console;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -482,14 +474,15 @@ private void initialize(final KsqlConfig configWithApplicationServer) {
processingLogContext.getConfig(),
ksqlConfigNoPort
);
commandRunner.processPriorCommands(
commandRunner.processPriorCommands(new PersistentQueryCleanup(
configWithApplicationServer
.getKsqlStreamConfigProps()
.getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.getKsqlStreamConfigProps()
.getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString(),
serviceContext);
serviceContext)
);

commandRunner.start();
maybeCreateProcessingLogStream(
Expand Down
Expand Up @@ -33,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KsqlServerMain {

private static final Logger log = LoggerFactory.getLogger(KsqlServerMain.class);
Expand Down
Expand Up @@ -16,42 +16,34 @@
package io.confluent.ksql.rest.server.computation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.engine.QueryCleanupService;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanup;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
import java.io.Closeable;
import java.io.File;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.avro.JsonProperties;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
Expand Down Expand Up @@ -259,7 +251,7 @@ private void closeEarly() {
/**
* Read and execute all commands on the command topic, starting at the earliest offset.
*/
public void processPriorCommands(final String stateDir, final ServiceContext serviceContext) {
public void processPriorCommands(final PersistentQueryCleanup queryCleanup) {
try {
final List<QueuedCommand> restoreCommands = commandStore.getRestoreCommands();
final List<QueuedCommand> compatibleCommands = checkForIncompatibleCommands(restoreCommands);
Expand Down Expand Up @@ -294,19 +286,7 @@ public void processPriorCommands(final String stateDir, final ServiceContext ser
.getKsqlEngine()
.getPersistentQueries();

final Set<String> stateStoreNames =
queries
.stream()
.map(PersistentQueryMetadata::getQueryApplicationId)
.collect(Collectors.toSet());
try {
final Set<String> allStateStores = new HashSet<>(Arrays.asList(new File(stateDir).list()));
allStateStores.removeAll(stateStoreNames);
QueryCleanupService queryCleanupService = new QueryCleanupService();
allStateStores.forEach((appId) -> queryCleanupService.addCleanupTask(new QueryCleanupService.QueryCleanupTask(serviceContext, appId, false, stateDir)));
} catch (NullPointerException e) {
LOG.info("No state stores to clean up");
}
queryCleanup.cleanupLeakedQueries(queries);
LOG.info("Restarting {} queries.", queries.size());

queries.forEach(PersistentQueryMetadata::start);
Expand Down
@@ -0,0 +1,67 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.util;

import io.confluent.ksql.engine.QueryCleanupService;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentQueryCleanup {
private static final Logger LOG = LoggerFactory.getLogger(PersistentQueryCleanup.class);

private final String stateDir;
private final ServiceContext serviceContext;
private final QueryCleanupService queryCleanupService;

public PersistentQueryCleanup(final String stateDir, final ServiceContext serviceContext) {
this.stateDir = stateDir;
this.serviceContext = serviceContext;
queryCleanupService = new QueryCleanupService();
queryCleanupService.startAsync();
}

public void cleanupLeakedQueries(final List<PersistentQueryMetadata> persistentQueries) {
final Set<String> stateStoreNames =
persistentQueries
.stream()
.map(PersistentQueryMetadata::getQueryApplicationId)
.collect(Collectors.toSet());
try {
final Set<String> allStateStores = new HashSet<>(Arrays.asList(new File(stateDir).list()));
allStateStores.removeAll(stateStoreNames);
allStateStores.forEach((appId) -> queryCleanupService.addCleanupTask(
new QueryCleanupService.QueryCleanupTask(
serviceContext,
appId,
false,
stateDir)));
} catch (NullPointerException e) {
LOG.info("No state stores to clean up");
}
}

public QueryCleanupService getQueryCleanupService() {
return queryCleanupService;
}
}
Expand Up @@ -120,7 +120,7 @@ public class QueryDescriptionFactoryTest {
public void setUp() {
when(topology.describe()).thenReturn(topologyDescription);
when(kafkaStreamsBuilder.build(any(), any())).thenReturn(queryStreams);
when(queryStreams.localThreadsMetadata()).thenReturn(Collections.emptySet());
when(queryStreams.metadataForLocalThreads()).thenReturn(Collections.emptySet());

when(sinkTopic.getKeyFormat()).thenReturn(
KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()), SerdeFeatures.of()));
Expand Down

0 comments on commit bcbf215

Please sign in to comment.