New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6259: Make KafkaStreams.cleanup() clean global state directory #4255
Conversation
Call for review @bbejeck @guozhangwang @dguy Guess, we can merge this to |
retest this please |
1 similar comment
retest this please |
|
Updated this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments, otherwise LGTM.
cleanRemovedTasks(cleanupDelayMs, true); | ||
} catch (final IOException cannotHappen) { | ||
// dummy log to make findbugs happy | ||
log.error("{} Failed to delete the state directory due to an unexpected exception.", logPrefix(), cannotHappen); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we throw an IllegalStateException instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We set swallowException=true
-- wouldn't throwing an exception contradict this? Note, this code path is used for the task cleaner back ground thread -- an exception would kill this thread, what is the reason why I we want to swallow. Or do I miss anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwing IllegalStateException
is served as the purpose that "this should never happen, and if it does it is a bug and hence it is ok to fail and stop the world".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Fine with me.
// this is already logged within cleanRemovedTasks | ||
firstException = new StreamsException(e); | ||
} | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we still continue deleting the global state directory if the local task directories cleanup failed? I'm thinking if the local stores deletion fails while global store succeeded, we will throw an exception, and user may retry in which case globalStateDir()
will be called and trying to re-create the folder and delete it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also "keep going" of cleaning a task directory fails (ie, don't stop on first error). Thus, it's a consistent pattern. We could of course change this via a "failFast" flag for cleanRemovedTasks
. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difference is that in cleanRemovedTasks()
(originally we do not throw exception) we do not return a boolean or throwing exception indicating if it completely succeeds or not. So this call itself is effectively "best effort" but we do not comment it probably since it is an internal function. Now as we change it to throw exception if parameterized so I'm just wondering if we should still make this behavior.
Anyways, it is just a wild thought but not really a suggested comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just about code sharing. If we call it internally, we can keep "best effort" approach. If users calls it, we can apply fail-fast IMHO. Will update the code.
Updated this. |
retest this please |
checkstyle error:
|
Updates this. Jenkins did not startup properly. Retest this please. |
retest this please |
1 similar comment
retest this please |
final long now = time.milliseconds(); | ||
final long lastModifiedMs = taskDir.lastModified(); | ||
if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) { | ||
if (!manualUserCall) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe always log the deletion of the state directory, but modify the message if it's from time expiring vs. a manualUserCall
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated this when merging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx @guozhangwang
@mjsax a little late in reviewing, just one minor comment but otherwise LGTM |
LGTM. Merged to trunk. |
Committer Checklist (excluded from commit message)