retry committing for transient exceptions #1584
Hi @omeraha, Thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement: |
@@ -588,7 +588,7 @@ import scala.util.control.NonFatal
           progressTracker.committed(offsets)
           replyTo.foreach(_ ! Done)

-        case e: RebalanceInProgressException => retryCommits(duration, e)
+        case e @ (_: RebalanceInProgressException | _: TimeoutException) => retryCommits(duration, e)
I agree with your comment in #266 (comment) that this could handle all kinds of RetriableException here instead. That would even catch RetriableCommitFailedException below.
I assume most of the subclasses of RetriableException cannot happen here.
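To make the trade-off concrete, here is a minimal sketch comparing the explicit match from the diff above with a match on the RetriableException supertype. The exception classes are hypothetical local stand-ins that only mirror the relevant part of the Kafka hierarchy so the snippet runs without kafka-clients; `CommitFailureHandling`, `Action`, `explicit` and `bySupertype` are illustrative names, not part of Alpakka Kafka.

```scala
// Hypothetical stand-ins for the Kafka exception hierarchy (assumption:
// simplified so the sketch runs without kafka-clients on the classpath).
class ApiException(msg: String) extends RuntimeException(msg)
class RetriableException(msg: String) extends ApiException(msg)
class TimeoutException(msg: String) extends RetriableException(msg)
class RetriableCommitFailedException(msg: String) extends RetriableException(msg)
class UnknownTopicOrPartitionException(msg: String) extends RetriableException(msg)
class RebalanceInProgressException(msg: String) extends ApiException(msg)

object CommitFailureHandling {
  sealed trait Action
  case object Retry extends Action
  case object Fail extends Action

  // Explicit allow-list: only the listed exception types are retried.
  def explicit(e: Throwable): Action = e match {
    case _: RebalanceInProgressException | _: TimeoutException => Retry
    case _ => Fail
  }

  // Supertype match: every subclass of RetriableException is retried,
  // including ones that may never resolve on their own.
  def bySupertype(e: Throwable): Action = e match {
    case _: RebalanceInProgressException | _: RetriableException => Retry
    case _ => Fail
  }
}
```

With the supertype match, a RetriableCommitFailedException or an UnknownTopicOrPartitionException would be retried as well, whereas the explicit list retries only what it names.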
@ennru I'll fix it and commit. I'll also take a look at the classes that (currently) extend RetriableException. But it should be safe to retry all of them inside a callback on the commit result (as this means they must have been thrown during the commit itself).
@ennru The following (specific) exceptions will be handled if we decide to go for RetriableException:
- CoordinatorLoadInProgressException
- UnknownTopicOrPartitionException
- CoordinatorNotAvailableException
- NotCoordinatorException
- TimeoutException
While TimeoutException and CoordinatorLoadInProgressException are clearly safe to retry (we've seen CoordinatorLoadInProgressException in our services a couple of times), the other three are a bit more tricky. For instance, the UnknownTopicOrPartitionException comment states that
This exception is used in contexts where a topic doesn't seem to exist based on possibly stale metadata.
This exception is retriable because the topic or partition might subsequently be created.
Which in our case might lead to endless retries (as retryCommits() currently doesn't have a limit on the number of retries, if I understand the code correctly).
I've added CoordinatorLoadInProgressException to the code, and I think we should consider adding further exceptions upon request from the community.
Thank you for looking closely at those sub-exceptions. It was the endless retrying I was afraid of. Stale metadata sounds like a reason to retry, but if the topic or partition is never created it will loop forever.
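For illustration only, one way to rule out the endless loop discussed here would be to cap the number of attempts. The sketch below is not how retryCommits() works in Alpakka Kafka; `BoundedRetry`, `commitWithLimit`, `maxAttempts` and `TransientCommitError` are hypothetical names, and the commit is modelled as a plain synchronous function to keep the example self-contained.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object BoundedRetry {
  // Hypothetical stand-in for a retriable Kafka commit error.
  final class TransientCommitError(msg: String) extends RuntimeException(msg)

  // Runs `commit` until it succeeds, fails with a non-transient error,
  // or has been attempted `maxAttempts` times.
  // Returns the number of attempts used on success, or the last failure.
  @tailrec
  def commitWithLimit(maxAttempts: Int, attempt: Int = 1)(commit: () => Unit): Try[Int] =
    Try(commit()) match {
      case Success(_) => Success(attempt)
      case Failure(_: TransientCommitError) if attempt < maxAttempts =>
        commitWithLimit(maxAttempts, attempt + 1)(commit)
      case Failure(e) => Failure(e)
    }
}
```

A commit that keeps failing with a transient error, such as an unknown topic that is never created, then surfaces as a failure after `maxAttempts` tries instead of looping.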
@ennru Thanks. Anything else on my side?
LGTM.
Thank you for improving Alpakka Kafka!
Background
Offset commits might fail due to a timeout on the Kafka broker, in which case the broker returns an error stating that the operation timed out. Timeouts are transient in nature, so it might be worth retrying the commit.
Changes
Retry the commit when it fails with TimeoutException.
References #1111