KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning #11868
Conversation
`KafkaFuture.allOf` is a nice find.
LGTM when you get a green build
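The "all of" semantics being praised here can be sketched with the JDK's `CompletableFuture` as a stand-in for Kafka's `KafkaFuture` (`KafkaFuture.allOf` behaves the same way): the combined future completes only once every per-thread removal future has completed, never when just the first one does.

```java
import java.util.concurrent.CompletableFuture;

// Minimal sketch: the combined removal future must only complete once
// EVERY StreamThread's per-thread future has completed.
public class AllOfSketch {
    public static void main(String[] args) {
        CompletableFuture<Void> threadA = new CompletableFuture<>();
        CompletableFuture<Void> threadB = new CompletableFuture<>();
        CompletableFuture<Void> removeTopologyFuture = CompletableFuture.allOf(threadA, threadB);

        threadA.complete(null);
        System.out.println("after one thread:  " + removeTopologyFuture.isDone()); // false

        threadB.complete(null);
        System.out.println("after all threads: " + removeTopologyFuture.isDone()); // true
    }
}
```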
Just one comment about the fix itself.
        ex.printStackTrace();
    final KafkaFutureImpl<Void> resetOffsetsFuture = new KafkaFutureImpl<>();
    try {
        removeTopologyFuture.get();
Why do we have to wait on the first future before moving forward to construct the second future now? I thought the main fix was only in https://github.com/apache/kafka/pull/11868/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR1132 above, and with that we would not need to change the behavior to wait for the topology removal to complete?
Yeah, that is the main fix. However, I realized that we are currently in this awkward state of pseudo-async-ness, and I think we might ultimately want to scrap this whole `RemoveNamedTopologyResult` and just make it fully blocking. Though I didn't want to go ahead and change the method signatures just yet, so I just have it block on the named topology future and then perform the offset reset.
The actual advantage here is that before this, we were making the StreamThread that completed the future perform the offset reset, which of course means it gets stuck for a bit and can't continue processing until basically the whole group has dropped this named topology. Better to have the caller thread do the offset reset so the StreamThreads can keep processing the other topologies.
(When we finally get to doing a KIP, maybe we can discuss having a blocking and a non-blocking option for these, but my feeling is let's not complicate things unnecessarily, and it may be that we only really need a blocking version.)
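The control flow described above can be sketched as follows, with `CompletableFuture` standing in for `KafkaFutureImpl` and a hypothetical `doResetOffsets()` in place of the real admin-client reset (both names are illustrative, not the actual Streams internals). The caller thread blocks until every StreamThread has dropped the named topology, then performs the offset reset itself, so the StreamThreads are free to keep processing the remaining topologies.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hedged sketch of "block on the removal, then reset offsets on the caller
// thread" -- not the real KafkaStreams implementation.
public class CallerThreadReset {
    static CompletableFuture<Void> removeThenReset(final CompletableFuture<Void> removeTopologyFuture,
                                                   final Set<String> partitionsToReset)
            throws ExecutionException, InterruptedException {
        final CompletableFuture<Void> resetOffsetsFuture = new CompletableFuture<>();
        removeTopologyFuture.get();        // wait for the whole group to drop the topology
        doResetOffsets(partitionsToReset); // then reset offsets on the caller thread
        resetOffsetsFuture.complete(null);
        return resetOffsetsFuture;
    }

    // Hypothetical placeholder for the real offset-reset logic.
    static void doResetOffsets(final Set<String> partitions) {
        System.out.println("reset offsets for " + partitions);
    }

    public static void main(String[] args) throws Exception {
        final CompletableFuture<Void> removal = CompletableFuture.completedFuture(null);
        removeThenReset(removal, Set.of("topic-1")).get();
        System.out.println("done");
    }
}
```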
Got it, that makes a lot of sense, thanks!
Thanks @ableegoldman, lgtm!
Ah, good catch @wcarlson5, I need to move the offset reset logic out of the actual … Anyways, I'll push a fix for this, and then we should be good to go 👍
Thanks @wcarlson5 for the report! @ableegoldman please feel free to move on afterwards.
@@ -239,12 +238,16 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove
     final boolean skipResetForUnstartedApplication =
         maybeCompleteFutureIfStillInCREATED(removeTopologyFuture, "removing topology " + topologyToRemove);

-    if (resetOffsets && !skipResetForUnstartedApplication) {
+    if (resetOffsets && !skipResetForUnstartedApplication && !partitionsToReset.isEmpty()) {
Moved the `!partitionsToReset.isEmpty()` check here to make sure we don't log the line about resetting offsets if we don't actually have any offsets to reset.
        Thread.sleep(100);
    } catch (final InterruptedException ex) {
        ex.printStackTrace();
private void resetOffsets(final Set<TopicPartition> partitionsToReset) throws StreamsException {
Sorry for the large diff -- it's mainly due to spacing from having moved the `!partitionsToReset.isEmpty()` check, plus one small stylistic change to use a `while (true)` loop with `break`s, because following the null status of the `deleteOffsetsResult` was a bit confusing.
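The `while (true)`/`break` shape being described can be sketched like this; the `deleteOffsets()` call and the retriable-vs-fatal classification are simplified placeholders for the real admin-client reset, not Kafka's actual API.

```java
// Hedged sketch: retry the reset until it succeeds, breaking out of the loop
// on success and rethrowing on a fatal error, instead of tracking a nullable
// deleteOffsetsResult between iterations.
public class ResetRetryLoop {
    static int attempts = 0;
    static final int FAILURES_BEFORE_SUCCESS = 2; // simulate two retriable failures

    static void resetOffsets() {
        while (true) {
            try {
                deleteOffsets();
                break;                    // success: exit the loop
            } catch (final RuntimeException e) {
                if (!isRetriable(e)) {
                    throw e;              // fatal: surface to the caller
                }
                // retriable: fall through and try again
            }
        }
    }

    // Placeholder for the real offset-delete call.
    static void deleteOffsets() {
        attempts++;
        if (attempts <= FAILURES_BEFORE_SUCCESS) {
            throw new IllegalStateException("retriable: group not empty yet");
        }
    }

    static boolean isRetriable(final RuntimeException e) {
        return e.getMessage() != null && e.getMessage().startsWith("retriable");
    }

    public static void main(String[] args) {
        resetOffsets();
        System.out.println("succeeded after " + attempts + " attempt(s)"); // 3
    }
}
```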
The real change, though, is that this method now just performs the offset resets directly, rather than directing whoever completes the `removeNamedTopology` future to perform the offset reset (which is non-trivial and thus not appropriate for the StreamThreads to do). We now invoke this directly when the user calls `get()` on the future returned from the `RemoveNamedTopologyResult`. This is the main change since being approved @wcarlson5 @guozhangwang
There's also the:

    if (resetOffsetsFuture == null) {
        return removeTopologyFuture;
    } else {
        return resetOffsetsFuture;
Basically we now have the caller thread perform the offset reset and block on it when it goes to call `get()` on the future returned by `RemoveNamedTopologyResult#all` (or `#resetOffsetsFuture`).
These new changes make sense to me
Test failures are unrelated. Merging
Merged to trunk
@wcarlson5 @ableegoldman I'm wondering if we could still have livelock scenarios like this: say we have two topologies A and B, and two threads a and b, each on a different KS instance. Each thread tries to remove one topology at a time, getting the future to make sure it is cleaned up, BUT they do it in a different order.
Would that be a concern?
@guozhangwang yes, that would be a concern. I think we need to document that these calls need to be made in the same order with the same topologies for each client. That is what KSQL does to make sure it works.
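The convention being suggested here can be sketched as a tiny helper: every client removes topologies in one canonical order (here, simply sorted by name), so two instances can never each sit blocked on a removal future that the other has yet to join. `removalOrder` is a hypothetical illustration, not part of the Streams API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

// Sketch: derive one canonical removal order so all clients agree,
// avoiding the cross-instance blocking scenario described above.
public class OrderedRemoval {
    static List<String> removalOrder(final Collection<String> topologiesToRemove) {
        // TreeSet sorts and de-duplicates; every client gets the same order.
        return new ArrayList<>(new TreeSet<>(topologiesToRemove));
    }

    public static void main(String[] args) {
        // Both clients end up removing in the order [A, B], regardless of
        // the order in which they were asked to remove them.
        System.out.println(removalOrder(List.of("B", "A"))); // [A, B]
        System.out.println(removalOrder(List.of("A", "B"))); // [A, B]
    }
}
```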
Sounds good, thanks for the clarification!
This test has started to become flaky at a relatively low, but consistently reproducible, rate. Upon inspection, we find this is due to IOExceptions during the `#cleanUpNamedTopology` call -- specifically, most often a `DirectoryNotEmptyException`, with an occasional `FileNotFoundException`.
Basically, signs pointed to having returned from/completed the `#removeNamedTopology` future prematurely, and moving on to try and clear out the topology's state directory while there was a StreamThread somewhere that was still processing/closing its tasks.

I believe this is due to updating the thread's topology version before we perform the actual topology update -- in this case, specifically the act of e.g. clearing out a directory. If one thread updates its version and then goes to perform the topology removal/cleanup, then when the second thread finishes its own topology removal, this other thread will check whether all threads are on the latest version and complete any waiting futures if so -- which means it can complete the future before the first thread has actually completed the corresponding action.
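The race described above comes down to the ordering of two steps, which can be sketched with an `AtomicInteger` standing in for the thread's topology version (all names here are illustrative, not the real Streams internals). If a thread advertises the new version before finishing its cleanup, another thread that sees "everyone is on the latest version" can complete the waiting future while the state directory is still being deleted; performing the cleanup first and bumping the version last closes that window.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hedged sketch of the version-vs-cleanup ordering bug and its fix.
public class VersionOrdering {
    final AtomicInteger topologyVersion = new AtomicInteger(0);
    volatile boolean cleanupDone = false;

    // Buggy order: observers may see the new version mid-cleanup, so a
    // waiting future can be completed before the cleanup has finished.
    void buggyUpdate(final int newVersion) {
        topologyVersion.set(newVersion); // future can complete here...
        performCleanup();                // ...while this is still running
    }

    // Fixed order: the new version only becomes visible once the work is done.
    void fixedUpdate(final int newVersion) {
        performCleanup();
        topologyVersion.set(newVersion);
    }

    void performCleanup() {
        cleanupDone = true; // stands in for deleting the state directory
    }

    public static void main(String[] args) {
        final VersionOrdering thread = new VersionOrdering();
        thread.fixedUpdate(1);
        // Once the new version is visible, the cleanup is guaranteed done.
        System.out.println(thread.topologyVersion.get() == 1 && thread.cleanupDone); // true
    }
}
```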