Skip to content

Commit

Permalink
feat: cleanup transient query resources (#8694)
Browse files Browse the repository at this point in the history
* feat: cleanup transient query resources

* checkpoint feature flagged

* checkstyle

* compliation fix

* add configs

* integration test

* int test

* add negative integration test

* fix tests

* more cleanup

* await till condition true int test

* await till condition true int test 2

* await till condition true int test 3

* await till condition true int test 4

* refactor

* tweak logic

* read LocalCommands

* refactor

* phew

* more int test

* fix int test compilation issue

* pass int test refactor

* tweak regex

* address comments

* majrr refactrr

* nit

* unit tests

* cleanup

* cleanup 2

* nit

* unit tests fix

* nit

* nit

* nit
  • Loading branch information
cprasad1 committed Feb 15, 2022
1 parent 742a7c4 commit 24b2a7a
Show file tree
Hide file tree
Showing 7 changed files with 959 additions and 5 deletions.
41 changes: 41 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,26 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_QUERY_CLEANUP_SHUTDOWN_TIMEOUT_MS_DOC
= "The total time that the query cleanup spends trying to clean things up on shutdown.";

public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE
= "ksql.transient.query.cleanup.service.enable";
public static final boolean KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE_DEFAULT = true;
public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE_DOC
= "Enable transient query cleanup service.";

public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS
= "ksql.transient.query.cleanup.service.initial.delay.seconds";
public static final Integer KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS_DEFAULT
= 600;
public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS_DOC
= "The time to delay the first execution of the transient query cleanup service in seconds.";

public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS
= "ksql.transient.query.cleanup.service.period.seconds";
public static final Integer KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DEFAULT
= 600;
public static final String KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC
= "the period between successive executions of the transient query cleanup service.";

public static final String KSQL_ENDPOINT_MIGRATE_QUERY_CONFIG
= "ksql.endpoint.migrate.query";
private static final boolean KSQL_ENDPOINT_MIGRATE_QUERY_DEFAULT = true;
Expand Down Expand Up @@ -1388,6 +1408,27 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_ENDPOINT_MIGRATE_QUERY_DOC
)
.define(
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE,
Type.BOOLEAN,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE_DEFAULT,
Importance.LOW,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE_DOC
)
.define(
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS,
Type.INT,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS_DEFAULT,
Importance.LOW,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS_DOC
)
.define(
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS,
Type.INT,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DEFAULT,
Importance.LOW,
KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.KsqlExecutionContext;
Expand All @@ -33,6 +34,7 @@
import io.confluent.ksql.internal.KsqlEngineMetrics;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.internal.TransientQueryCleanupListener;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.metastore.MetaStore;
Expand Down Expand Up @@ -109,6 +111,7 @@ public class KsqlEngine implements KsqlExecutionContext, Closeable {
private final QueryCleanupService cleanupService;
private final OrphanedTransientQueryCleaner orphanedTransientQueryCleaner;
private final MetricCollectors metricCollectors;
private TransientQueryCleanupService transientQueryCleanupService;

public KsqlEngine(
final ServiceContext serviceContext,
Expand Down Expand Up @@ -152,22 +155,32 @@ public KsqlEngine(
final MetricCollectors metricCollectors
) {
this.cleanupService = new QueryCleanupService();

this.orphanedTransientQueryCleaner =
new OrphanedTransientQueryCleaner(this.cleanupService, ksqlConfig);
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.engineMetrics = engineMetricsFactory.apply(this);
final Builder<QueryEventListener> registrationListeners =
ImmutableList.<QueryEventListener>builder()
.addAll(queryEventListeners)
.add(engineMetrics.getQueryEventListener())
.add(new CleanupListener(cleanupService, serviceContext, ksqlConfig));

if (getTransientQueryCleanupServiceEnabled(ksqlConfig)) {
this.transientQueryCleanupService = new TransientQueryCleanupService(
serviceContext,
ksqlConfig);
registrationListeners.add(new TransientQueryCleanupListener(transientQueryCleanupService));
}

this.primaryContext = EngineContext.create(
serviceContext,
processingLogContext,
metaStore,
queryIdGenerator,
cleanupService,
ksqlConfig,
ImmutableList.<QueryEventListener>builder()
.addAll(queryEventListeners)
.add(engineMetrics.getQueryEventListener())
.add(new CleanupListener(cleanupService, serviceContext, ksqlConfig))
.build(),
registrationListeners.build(),
metricCollectors
);
this.aggregateMetricsCollector = Executors.newSingleThreadScheduledExecutor();
Expand All @@ -186,6 +199,11 @@ public KsqlEngine(
this.metricCollectors = metricCollectors;

cleanupService.startAsync();
if (getTransientQueryCleanupServiceEnabled(ksqlConfig)) {
this.transientQueryCleanupService
.setQueryRegistry(this.primaryContext.getQueryRegistry());
this.transientQueryCleanupService.startAsync();
}
}

public int numberOfLiveQueries() {
Expand Down Expand Up @@ -601,6 +619,9 @@ public void close(final boolean closeQueries) {

@Override
public void close() {
if (getTransientQueryCleanupServiceEnabled(getKsqlConfig())) {
transientQueryCleanupService.stopAsync();
}
close(false);
}

Expand All @@ -616,6 +637,12 @@ public void cleanupOrphanedInternalTopics(
.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
}

public void populateTransientQueryCleanupServiceWithOldCommands(
final Set<String> queryApplicationIds
) {
this.transientQueryCleanupService.setLocalCommandsQueryAppIds(queryApplicationIds);
}

/**
* Determines if a statement is executable by the engine.
*
Expand Down Expand Up @@ -722,4 +749,8 @@ private static boolean getRowpartitionRowoffsetEnabled(

return ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED);
}

private boolean getTransientQueryCleanupServiceEnabled(final KsqlConfig ksqlConfig) {
return ksqlConfig.getBoolean(KsqlConfig.KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_ENABLE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.engine;

import static io.confluent.ksql.util.QueryApplicationId.buildInternalTopicPrefix;
import static java.nio.file.Files.deleteIfExists;

import com.google.common.util.concurrent.AbstractScheduledService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.query.QueryRegistry;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransientQueryCleanupService extends AbstractScheduledService {
private static final Logger LOG = LoggerFactory.getLogger(TransientQueryCleanupService.class);
private final Pattern internalTopicPrefixPattern;
private final Pattern transientAppIdPattern;
private final Set<String> queriesGuaranteedToBeRunningAtSomePoint;
private final String stateDir;
private final KafkaTopicClient topicClient;
private final int initialDelay;
private final int intervalPeriod;
private Optional<Set<String>> localCommandsQueryAppIds;
private QueryRegistry queryRegistry;
private int numLeakedTopics;
private int numLeakedStateFiles;

public TransientQueryCleanupService(final ServiceContext serviceContext,
final KsqlConfig ksqlConfig) {
final String internalTopicPrefix = buildInternalTopicPrefix(ksqlConfig, false);
this.internalTopicPrefixPattern = Pattern.compile(internalTopicPrefix);
this.transientAppIdPattern = Pattern.compile(internalTopicPrefix + ".*_[0-9]\\d*_[0-9]\\d*");

this.initialDelay = ksqlConfig.getInt(
KsqlConfig.KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_INITIAL_DELAY_SECONDS);

this.intervalPeriod = ksqlConfig.getInt(
KsqlConfig.KSQL_TRANSIENT_QUERY_CLEANUP_SERVICE_PERIOD_SECONDS);

this.stateDir = ksqlConfig.getKsqlStreamConfigProps()
.getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef()
.defaultValues()
.get(StreamsConfig.STATE_DIR_CONFIG))
.toString();

this.topicClient = serviceContext.getTopicClient();
this.localCommandsQueryAppIds = Optional.empty();
this.queriesGuaranteedToBeRunningAtSomePoint = new HashSet<>();
this.numLeakedTopics = 0;
this.numLeakedStateFiles = 0;
}

@Override
protected void runOneIteration() {
try {
final List<String> leakedTopics = findLeakedTopics();
this.numLeakedTopics = leakedTopics.size();
LOG.info("Cleaning up {} leaked topics: {}", numLeakedTopics, leakedTopics);
getTopicClient().deleteTopics(leakedTopics);
} catch (Throwable t) {
LOG.error(
"Failed to clean up topics with exception: " + t.getMessage(), t);
}
try {
final List<String> leakedStateDirs = findLeakedStateDirs();
this.numLeakedStateFiles = leakedStateDirs.size();
LOG.info("Cleaning up {} leaked state directories: {}",
numLeakedStateFiles,
leakedStateDirs.stream().map(file -> stateDir + "/" + file)
.collect(Collectors.toList()));
leakedStateDirs.forEach(this::deleteLeakedStateDir);
} catch (Throwable t) {
LOG.error(
"Failed to clean up state directories with exception: " + t.getMessage(), t);
}
}

@Override
public Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(initialDelay, intervalPeriod, TimeUnit.SECONDS);
}

public void setQueryRegistry(final QueryRegistry queryRegistry) {
this.queryRegistry = queryRegistry;
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP2")
public void setLocalCommandsQueryAppIds(final Set<String> ids) {
this.localCommandsQueryAppIds = Optional.of(ids);
}

private void deleteLeakedStateDir(final String filename) {
final String path = stateDir + "/" + filename;
final Path pathName = Paths.get(path);
try {
deleteIfExists(pathName);
} catch (IOException e) {
LOG.info("Transient Query Cleanup Service failed "
+ "to delete leaked state directory: " + path, e);
}
}

List<String> findLeakedTopics() {
return getTopicClient()
.listTopicNames()
.stream()
.filter(this::isLeaked)
.collect(Collectors.toList());
}

List<String> findLeakedStateDirs() {
return listAllStateFiles()
.stream()
.filter(this::isLeaked)
.collect(Collectors.toList());
}

List<String> listAllStateFiles() {
final File folder = new File(stateDir);
final File[] listOfFiles = folder.listFiles();

if (listOfFiles == null) {
return Collections.emptyList();
}
return Arrays.stream(listOfFiles)
.map(File::getName)
.collect(Collectors.toList());
}

boolean isLeaked(final String resource) {
if (foundInLocalCommands(resource)) {
return true;
}
if (!internalTopicPrefixPattern.matcher(resource).find()) {
return false;
}
if (!isCorrespondingQueryTerminated(resource)) {
return false;
}
final Matcher appIdMatcher = transientAppIdPattern.matcher(resource);
if (appIdMatcher.find()) {
return wasQueryGuaranteedToBeRunningAtSomePoint(appIdMatcher.group());
}

return false;
}

boolean isCorrespondingQueryTerminated(final String appId) {
return this.queryRegistry
.getAllLiveQueries()
.stream()
.map(qm -> qm.getQueryId().toString())
.noneMatch(appId::contains);
}

public void registerRunningQuery(final String appId) {
queriesGuaranteedToBeRunningAtSomePoint.add(appId);
}

boolean wasQueryGuaranteedToBeRunningAtSomePoint(final String appId) {
return queriesGuaranteedToBeRunningAtSomePoint.contains(appId);
}

boolean foundInLocalCommands(final String resourceName) {
return localCommandsQueryAppIds
.map(strings -> strings.stream().anyMatch(resourceName::contains))
.orElse(false);
}

KafkaTopicClient getTopicClient() {
return topicClient;
}

String getStateDir() {
return stateDir;
}
}
Loading

0 comments on commit 24b2a7a

Please sign in to comment.