New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KCI-612] Clean up a terminated query's state stores #7729
Conversation
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made a quick pass on the non-testing code.
f -> { | ||
if (stateStoreNames.stream().noneMatch((name) -> f.getName().endsWith(name))) { | ||
try { | ||
Files.walk(f.toPath()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I personally like the Files.walkFileTree
since it allows us to print a more detailed error message, e.g. which file delete encounters the error, with the visitor pattern.
An example of its usage: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L812
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Plus, since we would not expect to be finding any existing state store dirs that do not match existing persistent stores, we'd also log a warning if we indeed successfully delete something here.. e.g. "WARN: Deleted local state store for non-existing query {}, this is not expected and probably due to a race condition when query {} was dropped before".
@@ -466,6 +509,8 @@ private void initialize(final KsqlConfig configWithApplicationServer) { | |||
ksqlConfigNoPort | |||
); | |||
commandRunner.processPriorCommands(); | |||
cleanupOldStateDirectories(configWithApplicationServer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we indeed have large amount of dangling state store dirs here there's a small risk that the restoring of existing queries -- i.e. starting the execution of them -- may already failed. Hence I'm wondering if we can refactor the timing of triggering this function a bit, i.e. after we've consumed through all the commands from the cmd topic, but before we start executing the restoration of queries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense to me, probably right here: https://github.com/confluentinc/ksql/blob/master/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java#L286. WDYT? I can make that shift
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that seems a better ordering to me.
does this fix #7720? cc @AlanConfluent |
Also it makes me thinking: since we only tries to do the cleanup upon (re-)starting the app, we would still accumulate leaked state stores as we continue to run until we exhaust 500GB --- in the future, it may be 125GB --- in which case we would error out and trying to replace the thread only, and hence would fall into the loop still. I.e. unless the pod is bounced either due to upgrade/scaling or pro-actively by ourselves, we would still have this issue. Is that right? |
public void cleanupOldStateDirectories(final KsqlConfig configWithApplicationServer) { | ||
final String stateDir = configWithApplicationServer.getKsqlStreamConfigProps().getOrDefault( | ||
StreamsConfig.STATE_DIR_CONFIG, | ||
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)).toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This default is a little confusing to me... We confirm that the state directory is available here, in ksql server main, but we don't set the config if we end up using the default. This means that if we're using a state store from the default, we can't use the config to access the store name anywhere...
I assume if we don't set the config there (if it's not already set ofc), it's so users can set the config later ..? But it does make it a little tricky to remember to use the default instead of the config. cc @swist
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. I agree this is indeed a bit weird, maybe overlooked rather than intentional?
.map(PersistentQueryMetadata::getQueryApplicationId) | ||
.collect(Collectors.toSet()); | ||
try { | ||
Files.walkFileTree(Paths.get(stateDir), new SimpleFileVisitor<Path>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guozhangwang updated this, lmk if it looks good to you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few more comments.
public void cleanupOldStateDirectories(final KsqlConfig configWithApplicationServer) { | ||
final String stateDir = configWithApplicationServer.getKsqlStreamConfigProps().getOrDefault( | ||
StreamsConfig.STATE_DIR_CONFIG, | ||
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)).toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. I agree this is indeed a bit weird, maybe overlooked rather than intentional?
StreamsConfig.STATE_DIR_CONFIG, | ||
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)).toString(); | ||
|
||
final Set<String> stateStoreNames = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is not stateStoreNames, but queryNames?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a valid comment? Please correct me if I'm wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be both, right? I guess semantically we can call it queryNames, although it's really queryApplicationId's since those have the formatting that matches the state store names. But this list should match the directories within the top level state store folder (aside from queries that didn't get cleaned up)
try { | ||
Files.delete(path); | ||
log.warn("Deleted local state store for non-existing query {}. This is not expected and was likely due to a " + | ||
"race condition when the query was dropped before.", path.getFileName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to be forward-looking a bit since as we binpack queries into KS, the local state dir names may not be the query ids but named topology ids (cc @ableegoldman @wcarlson5 could you clarify)?
.map(PersistentQueryMetadata::getQueryApplicationId) | ||
.collect(Collectors.toSet()); | ||
try { | ||
Files.walkFileTree(Paths.get(stateDir), new SimpleFileVisitor<Path>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few more comments.
|
||
@Override | ||
public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) { | ||
if (!stateStoreNames.contains(path.getFileName())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. are we deleting at the per-query granularity (i.e. if we check the top-level query is not in the persistent query metadata, we would just delete all its sub-folders/files) or per-state-store granularity here? I think the stateStoreNames
here is really storing query ids right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed to queryApplication Ids
@@ -466,6 +509,8 @@ private void initialize(final KsqlConfig configWithApplicationServer) { | |||
ksqlConfigNoPort | |||
); | |||
commandRunner.processPriorCommands(); | |||
cleanupOldStateDirectories(configWithApplicationServer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that seems a better ordering to me.
StreamsConfig.STATE_DIR_CONFIG, | ||
StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)).toString(); | ||
|
||
final Set<String> stateStoreNames = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a valid comment? Please correct me if I'm wrong.
try { | ||
Files.delete(path); | ||
|
||
log.warn( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is path.getFileName()
gonna be query names? Or would it be possible to be at the state store or even deeper level? Ditto below for postVisitDirectory
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know how this file visting system works like the first one did, but everything within the first level of the state store should match query names. Anything within those directories that don't match current query names should be deleted.
So, if we hit visitFile
for the first layer within the state store it should match the query application id, which is the list we've pulled above. Anything deeper I don't know -> makes me think we need to rethink this algo a bit. We want to delete recursively those directories in the first layer without checking to see if the files match
I think it would resolve #7720 @vvcephei @agavra if I'm reading that correctly. It sounds like the issue @AlanConfluent reported is the same one @guozhangwang reported in #7729 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late feedback 😬 - this kept flying under my radar. I think we can simplify the logic here, and reuse some of the existing interfaces for cleaning up leaked state that @AlanConfluent implemented in #6714
At a high level, this patch needs to do the following:
- build up a list of IDs for queries that were terminated which we may have missed cleaning up.
- clean up the resources associated with the list of IDs we built up in (1)
For (1) I think we can use the local state dirs like you've done here. However we just need to list the contents of the state dir and compare it to the IDs in ksqlEngine.getPersistentQueries()
- we don't need to walk the whole tree.
To do the actual clean up I think we should reuse QueryCleanupService.QueryCleanupTask
. This means we add the state directory cleanup to that class's run method. This will also have us cleanup leaked stores for transient queries.
@rodesai made some of the changes you suggested but I'm not totally sure I got everything you were suggesting so if you could glance over it that would be great. @guozhangwang I moved the obsolete state directory list into the command runner but had to pass in new parameters to get everything to work. LMK if this looks good and I'll update the tests that I broke |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good. Just a couple minor points inline.
@@ -106,6 +114,16 @@ public String getAppId() { | |||
|
|||
@Override | |||
public void run() { | |||
try { | |||
FileUtils.deleteDirectory(new File(stateDir + appId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any way to get this path out of streams instead of computing it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, and I'm not sure it would make sense. We get the list of directories to cleanup as strings from the top-level state directory. To get the corresponding path, if it exists, we'd have to take the string of the directory, map it back to the metadata or streams instance, and then get the path name.
We could compute the path name in the command runner
and pass in the full path here as appId, but that would make the next few checks somewhat messy since we wouldn't really have an appId in that case. That would also still require the new File
. WDYT @rodesai ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could compute the path name in the command runner and pass in the full path here as appId, but that would make the next few checks somewhat messy since we wouldn't really have an appId in that case. That would also still require the new File. WDYT @rodesai ?
Yeah I agree what's already implemented is preferable. No worries I think it's ok as is - it would be an incompatible change for streams to change the way the state dir is computed anyway.
@@ -284,6 +294,19 @@ public void processPriorCommands() { | |||
.getKsqlEngine() | |||
.getPersistentQueries(); | |||
|
|||
final Set<String> stateStoreNames = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this feels a little out of place at the top level of this method, and also leaks some internal details about the underlying stream processing runtime into this class. Could we break it into it's own interface like this that we pass into this class and call here?
class PersistentQueryCleanup {
void cleanupLeakedQueries(List<PersistentQueryMetadata> activeQueryIds);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This interface would get initialized from ksqlServerRestApplication
, right? So we'd get the stateDir
and serviceContext
from there? What's the benefit of doing this as an interface instead of a class, and if we do an interface is the idea that it's implemented in ksqlServerRestApplication
or commandRunner
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd implement the interface from KsqlServerRestApplication
, which is the benefit. Ideally we should avoid writing code in this class that assumes the underlying runtime is KafkaStreams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I follow - what's the benefit of doing PersistentQueryCleanup
as an interface in KsqlServerRestApplication
instead of a stand-alone class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a good goal to try to keep the kafka-streams specific code abstracted away from the rest of the server/engine. It keeps things decoupled which makes the code easier to understand and reason about. Putting an interface between the command runner and the streams code that does clean up makes it easier to potentially try out different runtimes in the future. We'll probably never do this (might be a fun hack day project 😄) but it's a good principle to try to structure the code this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a pass at doing an interface - I can't tell if it's what you want. I get that we want to keep streams code abstracted, but it's odd to me that we'd want to implement the PersistentQueryCleanup
inside of the rest app if we're trying to keep it separate, it makes more sense to me to create a standalone class for PersistenteQueryCleanup
that the rest app instantiates and then passes to the command runner, who then calls a method in the standalone class, rather than doing a callback to a method inside of the rest app (which is what called the command runner in the first place).
The other option for interface would be to have KsqlRestApplication
as a whole implement PersistentQueryCleanup
but this seemed like it would make testing harder since there wouldn't be a simple constructor for a PersistentQueryCleanup
object. LMK your thoughts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but it's odd to me that we'd want to implement the PersistentQueryCleanup inside of the rest app if we're trying to keep it separate, it makes more sense to me to create a standalone class for PersistenteQueryCleanup that the rest app instantiates and then passes to the command runner
Either way is fine by me - putting it in a standalone class is probably cleaner. It would be good to have an interface the standalone class implements to keep the implementation completely separate.
appId); | ||
} catch (IOException e) { | ||
LOG.error("Error cleaning up state directory {}\n. {}", appId, e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After running through testing I actually think this is kind of messy. We don't really want to try anything else in this run()
call, right? With the tests, it spits out a bunch of errors because it can't clean up the schema registry etc. Repurposing this feels a little weird unless we really do want those checks for any leftover persistent queries. Thoughts @rodesai ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I follow what the problem is. Which checks are you referring to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Below this we check if we can cleanup internal topic schemas and delete internal topics. This is what the logs end up looking like:
[2021-07-15 08:08:07,100] WARN Could not clean up the schema registry for query: fakeStateStore (io.confluent.ksql.schema.registry.SchemaRegistryUtil:72)
java.lang.NullPointerException
at io.confluent.ksql.schema.registry.SchemaRegistryUtil.getSubjectNames(SchemaRegistryUtil.java:70)
at io.confluent.ksql.schema.registry.SchemaRegistryUtil.getInternalSubjectNames(SchemaRegistryUtil.java:194)
at io.confluent.ksql.schema.registry.SchemaRegistryUtil.cleanupInternalTopicSchemas(SchemaRegistryUtil.java:53)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.lambda$run$0(QueryCleanupService.java:127)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.tryRun(QueryCleanupService.java:144)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.run(QueryCleanupService.java:126)
at io.confluent.ksql.engine.QueryCleanupService.run(QueryCleanupService.java:64)
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:66)
at com.google.common.util.concurrent.Callables$4.run(Callables.java:117)
at java.lang.Thread.run(Thread.java:748)
[2021-07-15 08:08:07,105] WARN Failed to cleanup internal topics for fakeStateStore (io.confluent.ksql.engine.QueryCleanupService:146)
java.lang.NullPointerException
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.lambda$run$1(QueryCleanupService.java:134)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.tryRun(QueryCleanupService.java:144)
at io.confluent.ksql.engine.QueryCleanupService$QueryCleanupTask.run(QueryCleanupService.java:134)
at io.confluent.ksql.engine.QueryCleanupService.run(QueryCleanupService.java:64)
at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:66)
at com.google.common.util.concurrent.Callables$4.run(Callables.java:117)
at java.lang.Thread.run(Thread.java:748)
[2021-07-15 08:08:07,108] WARN Failed to cleanup internal consumer groups for fakeStateStore (io.confluent.ksql.engine.QueryCleanupService:146)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup task should do it's best to clean up everything that could possibly be left behind locally and in kafka/sr for the query. Are you worried about the case where the schema/group/topic doesn't exist? The cleanup code should be able to handle the case where something it's supposed to clean up doesn't exist by just logging-and-continuing. If the resource doesn't exist anymore there's nothing to clean up - so the state is what we want it to be.
In this case I'd guess that the test setup has some problem which causes us to throw an NPE where we would never actually throw an NPE when actually running ksql. Do we know what value is getting set to null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case I think it's the context
you pass to PersistentQueryCleanup
from PersistentQueryCleanupTest
- you'd need to setup that context
to return a mock schema registry client and admin client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahhhhhhh okay that makes more sense, this cleanup order should be good then
bdfa5e4
to
bcbf215
Compare
755773b
to
6d77b88
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Sorry for getting late on the final pass of this, I also like the new approach better -- doing that in the background task than tying up the starting app thread. LGTM! |
Description
In a recent disk out of space incident, a few terminated queries had state stores that didn't get cleaned up. This took up a lot of disk space and ultimately had to be cleaned up manually. It seems like because the query was terminated and quickly after the pod was rolled, the query wasn't restarted but the stores didn't get cleaned up before we rolled the pod.
Now we'll check state stores when we initialize the app and remove and that don't match a currently running query.
Testing done
Unit + integration tests included
Reviewer checklist