Skip to content

Commit

Permalink
PubSub Kafka: Respect Subscribe context (#3363)
Browse files Browse the repository at this point in the history
Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Feb 28, 2024
1 parent fff4d41 commit 6413239
Show file tree
Hide file tree
Showing 10 changed files with 817 additions and 209 deletions.
27 changes: 12 additions & 15 deletions bindings/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,29 +100,26 @@ func (b *Binding) Read(ctx context.Context, handler bindings.Handler) error {
return nil
}

handlerConfig := kafka.SubscriptionHandlerConfig{
IsBulkSubscribe: false,
Handler: adaptHandler(handler),
}
for _, t := range b.topics {
b.kafka.AddTopicHandler(t, handlerConfig)
}
ctx, cancel := context.WithCancel(ctx)

b.wg.Add(1)
go func() {
defer b.wg.Done()
// Wait for context cancelation or closure.
select {
case <-ctx.Done():
case <-b.closeCh:
}

// Remove the topic handlers.
for _, t := range b.topics {
b.kafka.RemoveTopicHandler(t)
}
cancel()
b.wg.Done()
}()

return b.kafka.Subscribe(ctx)
handlerConfig := kafka.SubscriptionHandlerConfig{
IsBulkSubscribe: false,
Handler: adaptHandler(handler),
}

b.kafka.Subscribe(ctx, handlerConfig, b.topics...)

return nil
}

func adaptHandler(handler bindings.Handler) kafka.EventHandler {
Expand Down
148 changes: 2 additions & 146 deletions common/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ limitations under the License.
package kafka

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/IBM/sarama"
Expand All @@ -29,15 +27,8 @@ import (
)

type consumer struct {
k *Kafka
ready chan bool
running chan struct{}
stopped atomic.Bool
once sync.Once
mutex sync.Mutex
skipConsume bool
consumeCtx context.Context
consumeCancel context.CancelFunc
k *Kafka
mutex sync.Mutex
}

func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
Expand Down Expand Up @@ -233,27 +224,9 @@ func (consumer *consumer) Cleanup(sarama.ConsumerGroupSession) error {
}

func (consumer *consumer) Setup(sarama.ConsumerGroupSession) error {
consumer.once.Do(func() {
close(consumer.ready)
})

return nil
}

// AddTopicHandler adds a handler and configuration for a topic
func (k *Kafka) AddTopicHandler(topic string, handlerConfig SubscriptionHandlerConfig) {
k.subscribeLock.Lock()
k.subscribeTopics[topic] = handlerConfig
k.subscribeLock.Unlock()
}

// RemoveTopicHandler removes a topic handler
func (k *Kafka) RemoveTopicHandler(topic string) {
k.subscribeLock.Lock()
delete(k.subscribeTopics, topic)
k.subscribeLock.Unlock()
}

// checkBulkSubscribe checks if a bulk handler and config are correctly registered for provided topic
func (k *Kafka) checkBulkSubscribe(topic string) bool {
if bulkHandlerConfig, ok := k.subscribeTopics[topic]; ok &&
Expand All @@ -275,120 +248,3 @@ func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig,
return SubscriptionHandlerConfig{},
fmt.Errorf("any handler for messages of topic %s not found", topic)
}

// Subscribe to topic in the Kafka cluster, in a background goroutine
func (k *Kafka) Subscribe(ctx context.Context) error {
if k.consumerGroup == "" {
return errors.New("kafka: consumerGroup must be set to subscribe")
}

k.subscribeLock.Lock()
defer k.subscribeLock.Unlock()

topics := k.subscribeTopics.TopicList()
if len(topics) == 0 {
// Nothing to subscribe to
return nil
}
k.consumer.skipConsume = true

ctxCreateFn := func() {
consumeCtx, cancel := context.WithCancel(context.Background())

k.consumer.consumeCtx = consumeCtx
k.consumer.consumeCancel = cancel

k.consumer.skipConsume = false
}

if k.cg == nil {
cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
if err != nil {
return err
}

k.cg = cg

ready := make(chan bool)
k.consumer = consumer{
k: k,
ready: ready,
running: make(chan struct{}),
}

ctxCreateFn()

go func() {
k.logger.Debugf("Subscribed and listening to topics: %s", topics)

for {
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
// us out of the consume loop
if ctx.Err() != nil {
k.logger.Info("Consume context cancelled")
break
}

k.logger.Debugf("Starting loop to consume.")

if k.consumer.skipConsume {
continue
}

topics = k.subscribeTopics.TopicList()

// Consume the requested topics
bo := backoff.WithContext(backoff.NewConstantBackOff(k.consumeRetryInterval), ctx)
innerErr := retry.NotifyRecover(func() error {
if ctxErr := ctx.Err(); ctxErr != nil {
return backoff.Permanent(ctxErr)
}
return k.cg.Consume(k.consumer.consumeCtx, topics, &(k.consumer))
}, bo, func(err error, t time.Duration) {
k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
}, func() {
k.logger.Infof("Recovered consuming %v", topics)
})
if innerErr != nil && !errors.Is(innerErr, context.Canceled) {
k.logger.Errorf("Permanent error consuming %v: %v", topics, innerErr)
}
}

k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
err := k.cg.Close()
if err != nil {
k.logger.Errorf("Error closing consumer group: %v", err)
}

// Ensure running channel is only closed once.
if k.consumer.stopped.CompareAndSwap(false, true) {
close(k.consumer.running)
}
}()

<-ready
} else {
// The consumer group is already created and consuming topics. This means a new subscription is being added
k.consumer.consumeCancel()
ctxCreateFn()
}

return nil
}

// Close down consumer group resources, refresh once.
func (k *Kafka) closeSubscriptionResources() {
if k.cg != nil {
err := k.cg.Close()
if err != nil {
k.logger.Errorf("Error closing consumer group: %v", err)
}

k.consumer.once.Do(func() {
// Wait for shutdown to be complete
<-k.consumer.running
close(k.consumer.ready)
k.consumer.once = sync.Once{}
})
}
}
71 changes: 48 additions & 23 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/IBM/sarama"
Expand All @@ -34,19 +35,24 @@ import (

// Kafka allows reading/writing to a Kafka consumer group.
type Kafka struct {
producer sarama.SyncProducer
consumerGroup string
brokers []string
logger logger.Logger
authType string
saslUsername string
saslPassword string
initialOffset int64
producer sarama.SyncProducer
consumerGroup string
brokers []string
logger logger.Logger
authType string
saslUsername string
saslPassword string
initialOffset int64
config *sarama.Config

cg sarama.ConsumerGroup
consumer consumer
config *sarama.Config
subscribeTopics TopicHandlerConfig
subscribeLock sync.Mutex
consumerCancel context.CancelFunc
consumerWG sync.WaitGroup
closeCh chan struct{}
closed atomic.Bool
wg sync.WaitGroup

// schema registry settings
srClient srclient.ISchemaRegistryClient
Expand Down Expand Up @@ -106,7 +112,7 @@ func NewKafka(logger logger.Logger) *Kafka {
return &Kafka{
logger: logger,
subscribeTopics: make(TopicHandlerConfig),
subscribeLock: sync.Mutex{},
closeCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -184,11 +190,11 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {

// Default retry configuration is used if no
// backOff properties are set.
if err := retry.DecodeConfigWithPrefix(
if rerr := retry.DecodeConfigWithPrefix(
&k.backOffConfig,
metadata,
"backOff"); err != nil {
return err
"backOff"); rerr != nil {
return rerr
}
k.consumeRetryEnabled = meta.ConsumeRetryEnabled
k.consumeRetryInterval = meta.ConsumeRetryInterval
Expand All @@ -207,22 +213,41 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
}
k.logger.Debug("Kafka message bus initialization complete")

k.cg, err = sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
if err != nil {
return err
}

return nil
}

func (k *Kafka) Close() (err error) {
k.closeSubscriptionResources()
func (k *Kafka) Close() error {
defer k.wg.Wait()
defer k.consumerWG.Wait()

if k.producer != nil {
err = k.producer.Close()
k.producer = nil
}
errs := make([]error, 2)
if k.closed.CompareAndSwap(false, true) {
close(k.closeCh)

if k.producer != nil {
errs[0] = k.producer.Close()
k.producer = nil
}

if k.internalContext != nil {
k.internalContextCancel()
}

k.subscribeLock.Lock()
if k.consumerCancel != nil {
k.consumerCancel()
}
k.subscribeLock.Unlock()

if k.internalContext != nil {
k.internalContextCancel()
errs[1] = k.cg.Close()
}

return err
return errors.Join(errs...)
}

func getSchemaSubject(topic string) string {
Expand Down
Loading

0 comments on commit 6413239

Please sign in to comment.