From dd4db996c6a449ef513fec9ca78dc5cf91afd718 Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Fri, 9 Feb 2024 15:40:46 -0800 Subject: [PATCH] [1.13] Fix Kafka consumer cancellation (#3347) Signed-off-by: yaron2 --- common/component/kafka/consumer.go | 6 ++++-- common/component/kafka/kafka.go | 2 +- pubsub/kafka/kafka.go | 1 + 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index 21d2c65b9d..83604a6fdf 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -324,7 +324,7 @@ func (k *Kafka) Subscribe(ctx context.Context) error { for { // If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops // us out of the consume loop - if ctx.Err() != nil { + if k.consumer.consumeCtx.Err() != nil { k.logger.Info("Consume context cancelled") break } @@ -377,12 +377,14 @@ func (k *Kafka) Subscribe(ctx context.Context) error { } // Close down consumer group resources, refresh once. -func (k *Kafka) closeSubscriptionResources() { +func (k *Kafka) CloseSubscriptionResources() { if k.cg != nil { + k.consumer.consumeCancel() err := k.cg.Close() if err != nil { k.logger.Errorf("Error closing consumer group: %v", err) } + k.cg = nil k.consumer.once.Do(func() { // Wait for shutdown to be complete diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index d9cb1d16c9..bdf4e93de1 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -211,7 +211,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { } func (k *Kafka) Close() (err error) { - k.closeSubscriptionResources() + k.CloseSubscriptionResources() if k.producer != nil { err = k.producer.Close() diff --git a/pubsub/kafka/kafka.go b/pubsub/kafka/kafka.go index 6a81184a73..36ea750dbf 100644 --- a/pubsub/kafka/kafka.go +++ b/pubsub/kafka/kafka.go @@ -90,6 +90,7 @@ func (p *PubSub) subscribeUtil(ctx context.Context, req pubsub.SubscribeRequest, // Wait for context cancelation select { case <-ctx.Done(): + p.kafka.CloseSubscriptionResources() case <-p.closeCh: }