diff --git a/kafka/consumer.go b/kafka/consumer.go index 23f44088..ba51e08e 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -352,12 +352,22 @@ func (c *Consumer) Run(ctx context.Context) error { var clientCtx context.Context clientCtx, c.stopPoll = context.WithCancel(ctx) c.mu.Unlock() + for { if err := c.fetch(clientCtx); err != nil { - if errors.Is(err, context.Canceled) { - return nil // Return no error if err == context.Canceled. + if errors.Is(clientCtx.Err(), context.Canceled) { + if !errors.Is(err, context.Canceled) { + c.cfg.Logger.Error("consumer: fetch error on context canceled", zap.Error(err)) + } + return nil // Return no error if client context is canceled. } - return fmt.Errorf("cannot fetch records: %w", err) + + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + c.cfg.Logger.Error("consumer: fetch error; retrying", zap.Error(err)) + continue + } + + return fmt.Errorf("consumer fetch error: %w", err) } } } @@ -368,10 +378,10 @@ func (c *Consumer) fetch(ctx context.Context) error { fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords) defer c.client.AllowRebalance() - if fetches.IsClientClosed() || - errors.Is(fetches.Err0(), context.Canceled) || - errors.Is(fetches.Err0(), context.DeadlineExceeded) { - return context.Canceled + if fetchErr := fetches.Err0(); errors.Is(fetchErr, kgo.ErrClientClosed) || + errors.Is(fetchErr, context.Canceled) || + errors.Is(fetchErr, context.DeadlineExceeded) { + return fetchErr } c.mu.RLock() defer c.mu.RUnlock()