Fix indefinitely stuck consumer on fetch DeadlineExceeded #720
Conversation
kafka/consumer.go (Outdated)
    exponentialBackoff := ExponentialBackoff{
        base: 1 * time.Second,
        max:  1 * time.Minute,
    }
Run's main ctx is ignored. If it is canceled, we should exit.
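For illustration, one way to honor Run's ctx during the backoff wait is to select on it alongside a timer. A minimal sketch, assuming a hypothetical waitBackoff helper (not part of the PR):

    package kafka

    import (
        "context"
        "time"
    )

    // waitBackoff sleeps for the given backoff but returns early if ctx is
    // canceled, so the retry loop exits as soon as Run's main ctx does.
    func waitBackoff(ctx context.Context, backoff time.Duration) error {
        timer := time.NewTimer(backoff)
        defer timer.Stop()
        select {
        case <-ctx.Done():
            return ctx.Err() // stop retrying: Run's ctx was canceled
        case <-timer.C:
            return nil // backoff elapsed; safe to retry the fetch
        }
    }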
kafka/consumer.go (Outdated)
    c.mu.Unlock()
    var attempt int
    for {
        exp := exponentialBackoff{
Do we really need exponential backoff? Doesn't polling already have a built-in delay, since it depends on accumulating fetches until maxPollRecords is available?
Do you imply we can retry immediately? Do you expect the next fetch call to block?
> Do you imply we can retry immediately?
Yes, PollRecords doesn't seem to actually issue requests, so there is no risk of overloading Kafka. As per the godocs:
// PollRecords waits for fetches to be available, returning as soon as any
// broker returns a fetch. If the context is nil, this function will return
// immediately with any currently buffered fetches.
This to me means there is no point in doing a backoff.
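Read that way, the godoc suggests simply polling again on error. A sketch of the no-backoff reading, assuming franz-go's kgo client (pollLoop, process, and maxPollRecords are illustrative names):

    package kafka

    import (
        "context"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // pollLoop retries immediately after a fetch error: PollRecords itself
    // blocks until some broker returns a fetch, so the tight loop does not
    // hammer Kafka with extra requests.
    func pollLoop(ctx context.Context, client *kgo.Client, maxPollRecords int, process func(kgo.Fetches)) {
        for ctx.Err() == nil {
            fetches := client.PollRecords(ctx, maxPollRecords)
            if err := fetches.Err0(); err != nil {
                continue // retry right away; the blocking poll is the delay
            }
            process(fetches)
        }
    }

Note that, per the quoted godoc, passing a nil context would instead return immediately with whatever fetches are already buffered.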
done
But I wonder what happens if there's a network disruption that causes a client connection error. Will it keep trying to establish a new connection?
> Will it keep trying to establish a new connection?
Connection to what? It doesn't seem like PollRecords issues any requests; it just waits for fetches from Kafka to accumulate the required number of records.
kafka/consumer.go (Outdated)
    if errors.Is(clientCtx.Err(), context.Canceled) {
        return nil // Return no error if client context is canceled.
    }
    backoff := exp.Backoff(attempt)
All errors will be retried. I don't think this is a good idea, given there can be unrecoverable errors too, for example failing to commit.
What do we want to do in that case? Propagate it upwards to crash?
maybe only retry DeadlineExceeded?
I wonder if it is possible for the context to be canceled while the request context (clientCtx) isn't, and how we want to handle those cases.
Retrying both context.Canceled and context.DeadlineExceeded now, as long as the client context is not canceled.
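The policy this thread converged on can be sketched as follows (shape only, with a hypothetical fetchLoop name; the real loop lives in kafka/consumer.go):

    package kafka

    import (
        "context"
        "errors"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // fetchLoop retries context.Canceled and context.DeadlineExceeded from a
    // fetch as long as the client context itself is still alive; anything
    // else is treated as unrecoverable and propagated.
    func fetchLoop(clientCtx context.Context, client *kgo.Client, maxPollRecords int) error {
        for {
            fetches := client.PollRecords(clientCtx, maxPollRecords)
            err := fetches.Err0()
            switch {
            case err == nil:
                return nil
            case errors.Is(clientCtx.Err(), context.Canceled):
                return nil // clean shutdown: the client context was canceled
            case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
                continue // transient, e.g. an unstable network: poll again
            default:
                return fmt.Errorf("consumer fetch error: %w", err) // unrecoverable
            }
        }
    }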
lahsivjar left a comment:
LGTM! Let's wait for @marclop's review too.
lahsivjar left a comment:
LGTM!
    if fetchErr := fetches.Err0(); errors.Is(fetchErr, kgo.ErrClientClosed) ||
        errors.Is(fetchErr, context.Canceled) ||
        errors.Is(fetchErr, context.DeadlineExceeded) {
        return fetchErr
Thanks for this fix; we were masking the underlying error here.
        continue
    }

    return fmt.Errorf("consumer fetch error: %w", err)
Can we please add some follow-up comments on this code block? I find it pretty confusing to read.
Add comment for change in #720
Handle DeadlineExceeded properly without stopping the consumer, as these errors can happen when the network is unstable.
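In caller terms, the change means a DeadlineExceeded from a fetch is a signal to poll again, not to stop. A final illustrative sketch of such an outer loop (run and consume are hypothetical names, not the file's actual API):

    package kafka

    import (
        "context"
        "errors"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // run keeps the consumer alive across transient deadline errors: only a
    // closed client, a canceled context, or an unrecoverable error ends it.
    func run(ctx context.Context, consume func(context.Context) error) error {
        for {
            err := consume(ctx)
            switch {
            case err == nil, errors.Is(err, context.DeadlineExceeded):
                // Transient (e.g. unstable network): fetch again rather
                // than stopping the consumer. This is the fix in #720.
                continue
            case errors.Is(err, kgo.ErrClientClosed), errors.Is(err, context.Canceled):
                return nil // clean shutdown
            default:
                return err // unrecoverable: propagate to the caller
            }
        }
    }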