Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KCI-612] Clean up a terminated query's state stores #7729

Merged
merged 9 commits into from Jul 20, 2021
Merged
Expand Up @@ -64,6 +64,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -115,7 +117,8 @@ public KsqlEngine(
final List<QueryEventListener> queryEventListeners
) {
this.cleanupService = new QueryCleanupService();
this.orphanedTransientQueryCleaner = new OrphanedTransientQueryCleaner(this.cleanupService);
this.orphanedTransientQueryCleaner =
new OrphanedTransientQueryCleaner(this.cleanupService, ksqlConfig);
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.engineMetrics = engineMetricsFactory.apply(this);
this.primaryContext = EngineContext.create(
Expand All @@ -128,7 +131,7 @@ public KsqlEngine(
ImmutableList.<QueryEventListener>builder()
.addAll(queryEventListeners)
.add(engineMetrics.getQueryEventListener())
.add(new CleanupListener(cleanupService, serviceContext))
.add(new CleanupListener(cleanupService, serviceContext, ksqlConfig))
.build()
);
this.aggregateMetricsCollector = Executors.newSingleThreadScheduledExecutor();
Expand Down Expand Up @@ -369,12 +372,15 @@ public static boolean isExecutableStatement(final Statement statement) {
private static final class CleanupListener implements QueryEventListener {
final QueryCleanupService cleanupService;
final ServiceContext serviceContext;
final KsqlConfig ksqlConfig;

private CleanupListener(
final QueryCleanupService cleanupService,
final ServiceContext serviceContext) {
final ServiceContext serviceContext,
final KsqlConfig ksqlConfig) {
this.cleanupService = cleanupService;
this.serviceContext = serviceContext;
this.ksqlConfig = ksqlConfig;
}

@Override
Expand All @@ -387,7 +393,14 @@ public void onClose(
new QueryCleanupService.QueryCleanupTask(
serviceContext,
applicationId,
query instanceof TransientQueryMetadata
query instanceof TransientQueryMetadata,
ksqlConfig.getKsqlStreamConfigProps()
.getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef()
.defaultValues()
.get(StreamsConfig.STATE_DIR_CONFIG))
.toString()
));
}

Expand Down
Expand Up @@ -20,9 +20,12 @@
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,9 +34,12 @@ public class OrphanedTransientQueryCleaner {
private static final Logger LOG = LoggerFactory.getLogger(OrphanedTransientQueryCleaner.class);

private final QueryCleanupService cleanupService;
private final KsqlConfig ksqlConfig;

public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService) {
public OrphanedTransientQueryCleaner(final QueryCleanupService cleanupService,
final KsqlConfig ksqlConfig) {
this.cleanupService = requireNonNull(cleanupService);
this.ksqlConfig = ksqlConfig;
}

/**
Expand Down Expand Up @@ -65,7 +71,12 @@ public void cleanupOrphanedInternalTopics(
new QueryCleanupService.QueryCleanupTask(
serviceContext,
queryApplicationId,
true
true,
ksqlConfig.getKsqlStreamConfigProps()
.getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString()
));
}
}
Expand Down
Expand Up @@ -17,8 +17,13 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.spun.util.io.FileUtils;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.ServiceContext;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
Expand All @@ -36,14 +41,14 @@
* real/sandboxed engines.</p>
*/
@SuppressWarnings("UnstableApiUsage")
class QueryCleanupService extends AbstractExecutionThreadService {
public class QueryCleanupService extends AbstractExecutionThreadService {

private static final Logger LOG = LoggerFactory.getLogger(QueryCleanupService.class);
private static final Runnable SHUTDOWN_SENTINEL = () -> { };

private final BlockingQueue<Runnable> cleanupTasks;

QueryCleanupService() {
public QueryCleanupService() {
cleanupTasks = new LinkedBlockingDeque<>();
}

Expand Down Expand Up @@ -85,19 +90,22 @@ public void addCleanupTask(final QueryCleanupTask task) {
cleanupTasks.add(task);
}

static class QueryCleanupTask implements Runnable {
public static class QueryCleanupTask implements Runnable {
private final String appId;
private final boolean isTransient;
private final ServiceContext serviceContext;
private String stateDir;

QueryCleanupTask(
public QueryCleanupTask(
final ServiceContext serviceContext,
final String appId,
final boolean isTransient
final boolean isTransient,
final String stateDir
) {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.appId = Objects.requireNonNull(appId, "appId");
this.isTransient = isTransient;
this.stateDir = stateDir;
}

public String getAppId() {
Expand All @@ -106,11 +114,21 @@ public String getAppId() {

@Override
public void run() {
try {
final Path pathName = Paths.get(stateDir + "/" + appId);
FileUtils.deleteDirectory(new File(String.valueOf(pathName.normalize())));
LOG.warn("Deleted local state store for non-existing query {}. "
+ "This is not expected and was likely due to a "
+ "race condition when the query was dropped before.",
appId);
} catch (Exception e) {
LOG.error("Error cleaning up state directory {}\n. {}", appId, e);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After running through testing I actually think this is kind of messy. We don't really want to try anything else in this run() call, right? With the tests, it spits out a bunch of errors because it can't clean up the schema registry etc. Repurposing this feels a little weird unless we really do want those checks for any leftover persistent queries. Thoughts @rodesai ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow what the problem is. Which checks are you referring to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below this we check if we can cleanup internal topic schemas and delete internal topics. This is what the logs end up looking like:
[2021-07-15 08:08:07,100] WARN Could not clean up the schema registry for query: fakeStateStore (io.confluent.ksql.schema.registry.SchemaRegistryUtil:72)
java.lang.NullPointerException
at io.confluent.ksql.schema.registry.SchemaRegistryUtil.getSubjectNames(SchemaRegistryUtil.java:70)
at io.confluent.ksql.schema.registry.SchemaRegistryUtil.getInternalSubjectNames(SchemaRegistryUtil.java:194)
at io.confluent.ksql.schema.registry.SchemaRegistryUtil.cleanupInternalTopicSchemas(SchemaRegistryUtil.java:53)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.lambda$run$0(QueryCleanupService.java:127)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.tryRun(QueryCleanupService.java:144)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.run(QueryCleanupService.java:126)
at io.confluent.ksql.engine.QueryCleanupService.run(QueryCleanupService.java:64)
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:66)
at com.google.common.util.concurrent.Callables$4.run(Callables.java:117)
at java.lang.Thread.run(Thread.java:748)
[2021-07-15 08:08:07,105] WARN Failed to cleanup internal topics for fakeStateStore (io.confluent.ksql.engine.QueryCleanupService:146)
java.lang.NullPointerException
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.lambda$run$1(QueryCleanupService.java:134)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.tryRun(QueryCleanupService.java:144)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.run(QueryCleanupService.java:134)
at io.confluent.ksql.engine.QueryCleanupService.run(QueryCleanupService.java:64)
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:66)
at com.google.common.util.concurrent.Callables$4.run(Callables.java:117)
at java.lang.Thread.run(Thread.java:748)
[2021-07-15 08:08:07,108] WARN Failed to cleanup internal consumer groups for fakeStateStore (io.confluent.ksql.engine.QueryCleanupService:146)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup task should do its best to clean up everything that could possibly be left behind locally and in kafka/sr for the query. Are you worried about the case where the schema/group/topic doesn't exist? The cleanup code should be able to handle the case where something it's supposed to clean up doesn't exist by just logging-and-continuing. If the resource doesn't exist anymore there's nothing to clean up - so the state is what we want it to be.

In this case I'd guess that the test setup has some problem which causes us to throw an NPE where we would never actually throw an NPE when actually running ksql. Do we know what value is getting set to null?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I think it's the context you pass to PersistentQueryCleanup from PersistentQueryCleanupTest - you'd need to setup that context to return a mock schema registry client and admin client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhhhhhh okay that makes more sense, this cleanup order should be good then

tryRun(
() -> SchemaRegistryUtil.cleanupInternalTopicSchemas(
appId,
serviceContext.getSchemaRegistryClient(),
isTransient),
appId,
serviceContext.getSchemaRegistryClient(),
isTransient),
"internal topic schemas"
);

Expand All @@ -129,6 +147,10 @@ private void tryRun(final Runnable runnable, final String resource) {
LOG.warn("Failed to cleanup {} for {}", resource, appId, e);
}
}

/**
 * Overrides the state directory this task will purge for its application id.
 *
 * @param newStateDir absolute path of the Kafka Streams state directory
 */
public void setStateDir(final String newStateDir) {
  this.stateDir = newStateDir;
}
}

}
Expand Up @@ -1906,7 +1906,7 @@ private PreparedStatement<?> prepare(final ParsedStatement stmt) {
private void awaitCleanupComplete() {
// add a task to the end of the queue to make sure that
// we've finished processing everything up until this point
ksqlEngine.getCleanupService().addCleanupTask(new QueryCleanupTask(serviceContext, "", false) {
ksqlEngine.getCleanupService().addCleanupTask(new QueryCleanupTask(serviceContext, "", false, "") {
@Override
public void run() {
// do nothing
Expand Down
Expand Up @@ -23,11 +23,13 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.engine.QueryCleanupService.QueryCleanupTask;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -62,6 +64,8 @@ public class OrphanedTransientQueryCleanerTest {
private ServiceContext serviceContext;
@Mock
private KafkaTopicClient topicClient;
@Mock
private KsqlConfig ksqlConfig;
@Captor
private ArgumentCaptor<QueryCleanupTask> taskCaptor;

Expand All @@ -70,7 +74,8 @@ public class OrphanedTransientQueryCleanerTest {
@Before
public void setUp() {
when(serviceContext.getTopicClient()).thenReturn(topicClient);
cleaner = new OrphanedTransientQueryCleaner(queryCleanupService);
when(ksqlConfig.getKsqlStreamConfigProps()).thenReturn(ImmutableMap.of("state.dir", "tmp/cat/"));
cleaner = new OrphanedTransientQueryCleaner(queryCleanupService, ksqlConfig);
}

@Test
Expand Down
Expand Up @@ -313,7 +313,7 @@ private <T extends QueryMetadata> T executeStatement(
final String statement,
final String... args
) {
final String formatted = String.format(statement, (Object[])args);
final String formatted = format(statement, (Object[])args);
log.debug("Sending statement: {}", formatted);

final List<QueryMetadata> queries = ksqlContext.sql(formatted);
Expand Down
Expand Up @@ -5,7 +5,7 @@
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;

class TestAppender extends AppenderSkeleton {
public class TestAppender extends AppenderSkeleton {
private final List<LoggingEvent> log = new ArrayList<>();
@Override
public boolean requiresLayout() {
Expand Down
Expand Up @@ -88,6 +88,7 @@
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
Expand Down Expand Up @@ -473,7 +474,16 @@ private void initialize(final KsqlConfig configWithApplicationServer) {
processingLogContext.getConfig(),
ksqlConfigNoPort
);
commandRunner.processPriorCommands();
commandRunner.processPriorCommands(new PersistentQueryCleanupImpl(
configWithApplicationServer
.getKsqlStreamConfigProps()
.getOrDefault(
StreamsConfig.STATE_DIR_CONFIG,
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG))
.toString(),
serviceContext)
);

commandRunner.start();
maybeCreateProcessingLogStream(
processingLogContext.getConfig(),
Expand Down
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.Pair;
Expand All @@ -42,6 +43,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
Expand Down Expand Up @@ -249,7 +251,7 @@ private void closeEarly() {
/**
* Read and execute all commands on the command topic, starting at the earliest offset.
*/
public void processPriorCommands() {
public void processPriorCommands(final PersistentQueryCleanupImpl queryCleanup) {
try {
final List<QueuedCommand> restoreCommands = commandStore.getRestoreCommands();
final List<QueuedCommand> compatibleCommands = checkForIncompatibleCommands(restoreCommands);
Expand Down Expand Up @@ -284,6 +286,7 @@ public void processPriorCommands() {
.getKsqlEngine()
.getPersistentQueries();

queryCleanup.cleanupLeakedQueries(queries);
LOG.info("Restarting {} queries.", queries.size());

queries.forEach(PersistentQueryMetadata::start);
Expand Down
@@ -0,0 +1,28 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.util;

import io.confluent.ksql.engine.QueryCleanupService;
import io.confluent.ksql.util.PersistentQueryMetadata;

import java.util.List;

/**
 * Cleans up resources left behind by persistent queries that are no longer
 * running — e.g. local state stores leaked by queries that were terminated
 * while the server was down.
 */
public interface PersistentQueryCleanup {

  /**
   * Triggers cleanup for leaked queries, given the persistent queries
   * currently known to the engine (invoked after prior commands are
   * restored; presumably anything not in this list is a cleanup candidate
   * — confirm against the implementation).
   *
   * @param persistentQueries the persistent queries that are still live
   */
  void cleanupLeakedQueries(List<PersistentQueryMetadata> persistentQueries);

  /**
   * @return the {@link QueryCleanupService} used to execute cleanup tasks
   */
  QueryCleanupService getQueryCleanupService();
}