Skip to content

Commit

Permalink
Fix kafka consumer (#3299)
Browse files Browse the repository at this point in the history
Signed-off-by: yaron2 <schneider.yaron@live.com>
  • Loading branch information
yaron2 committed Jan 2, 2024
1 parent 5f69fcf commit 75391eb
Showing 1 changed file with 78 additions and 53 deletions.
131 changes: 78 additions & 53 deletions internal/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ import (
)

type consumer struct {
k *Kafka
ready chan bool
running chan struct{}
stopped atomic.Bool
once sync.Once
mutex sync.Mutex
k *Kafka
ready chan bool
running chan struct{}
stopped atomic.Bool
once sync.Once
mutex sync.Mutex
skipConsume bool
consumeCtx context.Context
consumeCancel context.CancelFunc
}

func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
Expand Down Expand Up @@ -261,71 +264,93 @@ func (k *Kafka) Subscribe(ctx context.Context) error {
k.subscribeLock.Lock()
defer k.subscribeLock.Unlock()

// Close resources and reset synchronization primitives
k.closeSubscriptionResources()

topics := k.subscribeTopics.TopicList()
if len(topics) == 0 {
// Nothing to subscribe to
return nil
}
k.consumer.skipConsume = true

cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
if err != nil {
return err
}
ctxCreateFn := func() {
consumeCtx, cancel := context.WithCancel(context.Background())

k.cg = cg
k.consumer.consumeCtx = consumeCtx
k.consumer.consumeCancel = cancel

ready := make(chan bool)
k.consumer = consumer{
k: k,
ready: ready,
running: make(chan struct{}),
k.consumer.skipConsume = false
}

go func() {
k.logger.Debugf("Subscribed and listening to topics: %s", topics)
if k.cg == nil {
cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
if err != nil {
return err
}

for {
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
// us out of the consume loop
if ctx.Err() != nil {
break
}
k.cg = cg

ready := make(chan bool)
k.consumer = consumer{
k: k,
ready: ready,
running: make(chan struct{}),
}

k.logger.Debugf("Starting loop to consume.")
ctxCreateFn()

// Consume the requested topics
bo := backoff.WithContext(backoff.NewConstantBackOff(k.consumeRetryInterval), ctx)
innerErr := retry.NotifyRecover(func() error {
if ctxErr := ctx.Err(); ctxErr != nil {
return backoff.Permanent(ctxErr)
go func() {
k.logger.Debugf("Subscribed and listening to topics: %s", topics)

for {
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
// us out of the consume loop
if ctx.Err() != nil {
k.logger.Info("Consume context cancelled")
break
}

k.logger.Debugf("Starting loop to consume.")

if k.consumer.skipConsume {
continue
}

topics = k.subscribeTopics.TopicList()

// Consume the requested topics
bo := backoff.WithContext(backoff.NewConstantBackOff(k.consumeRetryInterval), ctx)
innerErr := retry.NotifyRecover(func() error {
if ctxErr := ctx.Err(); ctxErr != nil {
return backoff.Permanent(ctxErr)
}
return k.cg.Consume(k.consumer.consumeCtx, topics, &(k.consumer))
}, bo, func(err error, t time.Duration) {
k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
}, func() {
k.logger.Infof("Recovered consuming %v", topics)
})
if innerErr != nil && !errors.Is(innerErr, context.Canceled) {
k.logger.Errorf("Permanent error consuming %v: %v", topics, innerErr)
}
return k.cg.Consume(ctx, topics, &(k.consumer))
}, bo, func(err error, t time.Duration) {
k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
}, func() {
k.logger.Infof("Recovered consuming %v", topics)
})
if innerErr != nil && !errors.Is(innerErr, context.Canceled) {
k.logger.Errorf("Permanent error consuming %v: %v", topics, innerErr)
}
}

k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
err := k.cg.Close()
if err != nil {
k.logger.Errorf("Error closing consumer group: %v", err)
}
k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
err := k.cg.Close()
if err != nil {
k.logger.Errorf("Error closing consumer group: %v", err)
}

// Ensure running channel is only closed once.
if k.consumer.stopped.CompareAndSwap(false, true) {
close(k.consumer.running)
}
}()
// Ensure running channel is only closed once.
if k.consumer.stopped.CompareAndSwap(false, true) {
close(k.consumer.running)
}
}()

<-ready
<-ready
} else {
// The consumer group is already created and consuming topics. This means a new subscription is being added
k.consumer.consumeCancel()
ctxCreateFn()
}

return nil
}
Expand Down

0 comments on commit 75391eb

Please sign in to comment.