diff --git a/kafka/consumer.go b/kafka/consumer.go index ba51e08e..3928b67b 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -357,16 +357,24 @@ func (c *Consumer) Run(ctx context.Context) error { if err := c.fetch(clientCtx); err != nil { if errors.Is(clientCtx.Err(), context.Canceled) { if !errors.Is(err, context.Canceled) { + // It is unexpected for the fetch to fail for a different reason if the client context is canceled. + // Log it as it is not fatal. c.cfg.Logger.Error("consumer: fetch error on context canceled", zap.Error(err)) } return nil // Return no error if client context is canceled. } + // Retry immediately on known retryable errors. + // It is expected for c.fetch to wait. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { c.cfg.Logger.Error("consumer: fetch error; retrying", zap.Error(err)) continue } + // Propagate the error to caller if error is not a known retryable error. + // + // If it is later determined that a class of errors is safe to retry, + // it should be added to the list above. return fmt.Errorf("consumer fetch error: %w", err) } }