Skip to content

Commit

Permalink
[1.13] Fix Kafka consumer cancellation (#3347)
Browse files Browse the repository at this point in the history
Signed-off-by: yaron2 <schneider.yaron@live.com>
  • Loading branch information
yaron2 committed Feb 9, 2024
1 parent a83ed92 commit dd4db99
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 3 deletions.
6 changes: 4 additions & 2 deletions common/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (k *Kafka) Subscribe(ctx context.Context) error {
for {
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
// us out of the consume loop
if ctx.Err() != nil {
if k.consumer.consumeCtx.Err() != nil {
k.logger.Info("Consume context cancelled")
break
}
Expand Down Expand Up @@ -377,12 +377,14 @@ func (k *Kafka) Subscribe(ctx context.Context) error {
}

// Close down consumer group resources, refresh once.
func (k *Kafka) closeSubscriptionResources() {
func (k *Kafka) CloseSubscriptionResources() {
if k.cg != nil {
k.consumer.consumeCancel()
err := k.cg.Close()
if err != nil {
k.logger.Errorf("Error closing consumer group: %v", err)
}
k.cg = nil

k.consumer.once.Do(func() {
// Wait for shutdown to be complete
Expand Down
2 changes: 1 addition & 1 deletion common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
}

func (k *Kafka) Close() (err error) {
k.closeSubscriptionResources()
k.CloseSubscriptionResources()

if k.producer != nil {
err = k.producer.Close()
Expand Down
1 change: 1 addition & 0 deletions pubsub/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (p *PubSub) subscribeUtil(ctx context.Context, req pubsub.SubscribeRequest,
// Wait for context cancelation
select {
case <-ctx.Done():
p.kafka.CloseSubscriptionResources()
case <-p.closeCh:
}

Expand Down

0 comments on commit dd4db99

Please sign in to comment.