From 105ae85b143de4a97b17b5f66f224b5d2dae2d77 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 17:48:53 +0000 Subject: [PATCH 01/16] Log fetch error with exponential backoff --- kafka/consumer.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 23f44088..0e804764 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "math" + "math/rand/v2" "strings" "sync" "time" @@ -353,12 +354,21 @@ func (c *Consumer) Run(ctx context.Context) error { clientCtx, c.stopPoll = context.WithCancel(ctx) c.mu.Unlock() for { + exponentialBackoff := ExponentialBackoff{ + base: 1 * time.Second, + max: 1 * time.Minute, + } + var attempt int if err := c.fetch(clientCtx); err != nil { if errors.Is(err, context.Canceled) { return nil // Return no error if err == context.Canceled. } - return fmt.Errorf("cannot fetch records: %w", err) + backoff := exponentialBackoff.Backoff(attempt) + c.cfg.Logger.Error("kafka: failed to fetch kafka message: %w", zap.Int64("backoff", int64(backoff))) + attempt++ + continue } + attempt = 0 } } @@ -652,3 +662,12 @@ func (c *pc) consumeRecords(ftp kgo.FetchTopicPartition) { // wait blocks until all the records have been processed. func (c *pc) wait() error { return c.g.Wait() } + +type ExponentialBackoff struct { + base, max time.Duration +} + +func (e ExponentialBackoff) Backoff(attempt int) time.Duration { + temp := int64(min(e.max, e.base*(1< Date: Mon, 3 Nov 2025 17:50:12 +0000 Subject: [PATCH 02/16] Check client context --- kafka/consumer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 0e804764..47da7fad 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -360,8 +360,8 @@ func (c *Consumer) Run(ctx context.Context) error { } var attempt int if err := c.fetch(clientCtx); err != nil { - if errors.Is(err, context.Canceled) { - return nil // Return no error if err == context.Canceled. + if errors.Is(clientCtx.Err(), context.Canceled) { + return nil // Return no error if client context is canceled. } backoff := exponentialBackoff.Backoff(attempt) c.cfg.Logger.Error("kafka: failed to fetch kafka message: %w", zap.Int64("backoff", int64(backoff))) From f61db6dfb85188332615dc1da294dcd0aae7fd50 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 17:50:59 +0000 Subject: [PATCH 03/16] Fix log --- kafka/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 47da7fad..cbbc5490 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -364,7 +364,7 @@ func (c *Consumer) Run(ctx context.Context) error { return nil // Return no error if client context is canceled. } backoff := exponentialBackoff.Backoff(attempt) - c.cfg.Logger.Error("kafka: failed to fetch kafka message: %w", zap.Int64("backoff", int64(backoff))) + c.cfg.Logger.Error("kafka: failed to fetch kafka message", zap.Int64("backoff", int64(backoff)), zap.Error(err)) attempt++ continue } From ea9686815b1d2924d28fe75215208ffb7cbc5487 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 17:52:50 +0000 Subject: [PATCH 04/16] Propagate a specific error --- kafka/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index cbbc5490..dec0382f 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -381,7 +381,7 @@ func (c *Consumer) fetch(ctx context.Context) error { if fetches.IsClientClosed() || errors.Is(fetches.Err0(), context.Canceled) || errors.Is(fetches.Err0(), context.DeadlineExceeded) { - return context.Canceled + return fetches.Err0() } c.mu.RLock() defer c.mu.RUnlock() From 7205d92329191662bae865ab69eedfc0800b4647 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 17:54:50 +0000 Subject: [PATCH 05/16] Actually sleep --- kafka/consumer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/consumer.go b/kafka/consumer.go index dec0382f..83c24009 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -366,6 +366,7 @@ func (c *Consumer) Run(ctx context.Context) error { backoff := exponentialBackoff.Backoff(attempt) c.cfg.Logger.Error("kafka: failed to fetch kafka message", zap.Int64("backoff", int64(backoff)), zap.Error(err)) attempt++ + time.Sleep(backoff) continue } attempt = 0 From 9e5e8b93e0d394659df3471da80510605dbbd76f Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 17:56:00 +0000 Subject: [PATCH 06/16] Unexport --- kafka/consumer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 83c24009..12261e54 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -354,7 +354,7 @@ func (c *Consumer) Run(ctx context.Context) error { clientCtx, c.stopPoll = context.WithCancel(ctx) c.mu.Unlock() for { - exponentialBackoff := ExponentialBackoff{ + exp := exponentialBackoff{ base: 1 * time.Second, max: 1 * time.Minute, } @@ -363,7 +363,7 @@ func (c *Consumer) Run(ctx context.Context) error { if errors.Is(clientCtx.Err(), context.Canceled) { return nil // Return no error if client context is canceled. } - backoff := exponentialBackoff.Backoff(attempt) + backoff := exp.Backoff(attempt) c.cfg.Logger.Error("kafka: failed to fetch kafka message", zap.Int64("backoff", int64(backoff)), zap.Error(err)) attempt++ time.Sleep(backoff) @@ -664,11 +664,11 @@ func (c *pc) consumeRecords(ftp kgo.FetchTopicPartition) { // wait blocks until all the records have been processed. func (c *pc) wait() error { return c.g.Wait() } -type ExponentialBackoff struct { +type exponentialBackoff struct { base, max time.Duration } -func (e ExponentialBackoff) Backoff(attempt int) time.Duration { +func (e exponentialBackoff) Backoff(attempt int) time.Duration { temp := int64(min(e.max, e.base*(1< Date: Mon, 3 Nov 2025 17:56:34 +0000 Subject: [PATCH 07/16] Fix var --- kafka/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 12261e54..1f424cc3 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -353,12 +353,12 @@ func (c *Consumer) Run(ctx context.Context) error { var clientCtx context.Context clientCtx, c.stopPoll = context.WithCancel(ctx) c.mu.Unlock() + var attempt int for { exp := exponentialBackoff{ base: 1 * time.Second, max: 1 * time.Minute, } - var attempt int if err := c.fetch(clientCtx); err != nil { if errors.Is(clientCtx.Err(), context.Canceled) { return nil // Return no error if client context is canceled. From 7d68ddea033afb54ade296ccc2035f9c687c515a Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 17:58:01 +0000 Subject: [PATCH 08/16] Better log --- kafka/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 1f424cc3..010f5b5d 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -364,7 +364,7 @@ func (c *Consumer) Run(ctx context.Context) error { return nil // Return no error if client context is canceled. } backoff := exp.Backoff(attempt) - c.cfg.Logger.Error("kafka: failed to fetch kafka message", zap.Int64("backoff", int64(backoff)), zap.Error(err)) + c.cfg.Logger.Error("consumer: fetch error; retrying after backoff", zap.Int64("backoff", int64(backoff)), zap.Error(err)) attempt++ time.Sleep(backoff) continue From c5dd68c59564167dccb85b39f848df0ac1841c76 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 18:02:05 +0000 Subject: [PATCH 09/16] Respect client ctx during backoff --- kafka/consumer.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 010f5b5d..bdcf74c8 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -354,11 +354,11 @@ func (c *Consumer) Run(ctx context.Context) error { clientCtx, c.stopPoll = context.WithCancel(ctx) c.mu.Unlock() var attempt int + exp := exponentialBackoff{ + base: 1 * time.Second, + max: 1 * time.Minute, + } for { - exp := exponentialBackoff{ - base: 1 * time.Second, - max: 1 * time.Minute, - } if err := c.fetch(clientCtx); err != nil { if errors.Is(clientCtx.Err(), context.Canceled) { return nil // Return no error if client context is canceled. @@ -366,7 +366,10 @@ func (c *Consumer) Run(ctx context.Context) error { backoff := exp.Backoff(attempt) c.cfg.Logger.Error("consumer: fetch error; retrying after backoff", zap.Int64("backoff", int64(backoff)), zap.Error(err)) attempt++ - time.Sleep(backoff) + select { + case <-clientCtx.Done(): + case <-time.After(backoff): + } continue } attempt = 0 From 3f92fbededb857fee62d5f18200a8fdaf0ff5373 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 18:07:41 +0000 Subject: [PATCH 10/16] Use fetchErr --- kafka/consumer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index bdcf74c8..ac24a463 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -382,10 +382,10 @@ func (c *Consumer) fetch(ctx context.Context) error { fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords) defer c.client.AllowRebalance() - if fetches.IsClientClosed() || - errors.Is(fetches.Err0(), context.Canceled) || - errors.Is(fetches.Err0(), context.DeadlineExceeded) { - return fetches.Err0() + if fetchErr := fetches.Err0(); errors.Is(fetchErr, kgo.ErrClientClosed) || + errors.Is(fetchErr, context.Canceled) || + errors.Is(fetchErr, context.DeadlineExceeded) { + return fetchErr } c.mu.RLock() defer c.mu.RUnlock() From 3828da862eec10fd16b28e11a859d16941b2a9f2 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 18:10:00 +0000 Subject: [PATCH 11/16] Remove backoff --- kafka/consumer.go | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index ac24a463..79b98926 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -353,26 +353,14 @@ func (c *Consumer) Run(ctx context.Context) error { var clientCtx context.Context clientCtx, c.stopPoll = context.WithCancel(ctx) c.mu.Unlock() - var attempt int - exp := exponentialBackoff{ - base: 1 * time.Second, - max: 1 * time.Minute, - } + for { if err := c.fetch(clientCtx); err != nil { if errors.Is(clientCtx.Err(), context.Canceled) { return nil // Return no error if client context is canceled. } - backoff := exp.Backoff(attempt) - c.cfg.Logger.Error("consumer: fetch error; retrying after backoff", zap.Int64("backoff", int64(backoff)), zap.Error(err)) - attempt++ - select { - case <-clientCtx.Done(): - case <-time.After(backoff): - } - continue + c.cfg.Logger.Error("consumer: fetch error; retrying", zap.Error(err)) } - attempt = 0 } } @@ -666,12 +654,3 @@ func (c *pc) consumeRecords(ftp kgo.FetchTopicPartition) { // wait blocks until all the records have been processed. func (c *pc) wait() error { return c.g.Wait() } - -type exponentialBackoff struct { - base, max time.Duration -} - -func (e exponentialBackoff) Backoff(attempt int) time.Duration { - temp := int64(min(e.max, e.base*(1< Date: Mon, 3 Nov 2025 18:14:33 +0000 Subject: [PATCH 12/16] Only retry deadline exceeded --- kafka/consumer.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 79b98926..6e7b6d7c 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "math" - "math/rand/v2" "strings" "sync" "time" @@ -359,7 +358,11 @@ func (c *Consumer) Run(ctx context.Context) error { if errors.Is(clientCtx.Err(), context.Canceled) { return nil // Return no error if client context is canceled. } - c.cfg.Logger.Error("consumer: fetch error; retrying", zap.Error(err)) + if errors.Is(err, context.DeadlineExceeded) { + c.cfg.Logger.Error("consumer: fetch error; retrying", zap.Error(err)) + } else { + return err + } } } } From 1310ad0dcdb83ee5a759fc61fbe2459c9fd9d115 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 18:17:28 +0000 Subject: [PATCH 13/16] Retry context canceled as well --- kafka/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 6e7b6d7c..a7606972 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -358,7 +358,7 @@ func (c *Consumer) Run(ctx context.Context) error { if errors.Is(clientCtx.Err(), context.Canceled) { return nil // Return no error if client context is canceled. } - if errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { c.cfg.Logger.Error("consumer: fetch error; retrying", zap.Error(err)) } else { return err From c29f219bc6866fc11664d047f02bcde8687975cb Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 18:18:44 +0000 Subject: [PATCH 14/16] Clearer control flow --- kafka/consumer.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index a7606972..3612d57f 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -358,11 +358,13 @@ func (c *Consumer) Run(ctx context.Context) error { if errors.Is(clientCtx.Err(), context.Canceled) { return nil // Return no error if client context is canceled. } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { c.cfg.Logger.Error("consumer: fetch error; retrying", zap.Error(err)) - } else { - return err + continue } + + return err } } } From bf3217c7f9f1fa0dceef66780b50d4d205cca7ac Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 18:21:03 +0000 Subject: [PATCH 15/16] Log non-canceled error on canceled --- kafka/consumer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/consumer.go b/kafka/consumer.go index 3612d57f..82bc0649 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -356,6 +356,9 @@ func (c *Consumer) Run(ctx context.Context) error { for { if err := c.fetch(clientCtx); err != nil { if errors.Is(clientCtx.Err(), context.Canceled) { + if !errors.Is(err, context.Canceled) { + c.cfg.Logger.Error("consumer: fetch error on context canceled", zap.Error(err)) + } return nil // Return no error if client context is canceled. } From b5afff60e8b7ac5d25b41312dda42f52742a249c Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 3 Nov 2025 18:31:54 +0000 Subject: [PATCH 16/16] Wrap error --- kafka/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 82bc0649..ba51e08e 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -367,7 +367,7 @@ func (c *Consumer) Run(ctx context.Context) error { continue } - return err + return fmt.Errorf("consumer fetch error: %w", err) } } }