Skip to content
24 changes: 17 additions & 7 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,22 @@ func (c *Consumer) Run(ctx context.Context) error {
var clientCtx context.Context
clientCtx, c.stopPoll = context.WithCancel(ctx)
c.mu.Unlock()

for {
if err := c.fetch(clientCtx); err != nil {
if errors.Is(err, context.Canceled) {
return nil // Return no error if err == context.Canceled.
if errors.Is(clientCtx.Err(), context.Canceled) {
if !errors.Is(err, context.Canceled) {
c.cfg.Logger.Error("consumer: fetch error on context canceled", zap.Error(err))
}
return nil // Return no error if client context is canceled.
}
return fmt.Errorf("cannot fetch records: %w", err)

if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
c.cfg.Logger.Error("consumer: fetch error; retrying", zap.Error(err))
continue
}

return fmt.Errorf("consumer fetch error: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please add some follow-up comments on this code block? I find it pretty confusing to read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
}
Expand All @@ -368,10 +378,10 @@ func (c *Consumer) fetch(ctx context.Context) error {
fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords)
defer c.client.AllowRebalance()

if fetches.IsClientClosed() ||
errors.Is(fetches.Err0(), context.Canceled) ||
errors.Is(fetches.Err0(), context.DeadlineExceeded) {
return context.Canceled
if fetchErr := fetches.Err0(); errors.Is(fetchErr, kgo.ErrClientClosed) ||
errors.Is(fetchErr, context.Canceled) ||
errors.Is(fetchErr, context.DeadlineExceeded) {
return fetchErr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this fix, we were masking the underlying error here....

}
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down