diff --git a/protocol/kafka_confluent/v2/option.go b/protocol/kafka_confluent/v2/option.go index e3b0b566f..7eef74007 100644 --- a/protocol/kafka_confluent/v2/option.go +++ b/protocol/kafka_confluent/v2/option.go @@ -15,7 +15,7 @@ import ( // Option is the function signature required to be considered an kafka_confluent.Option. type Option func(*Protocol) error -// WithConfigMap sets the configMap to init the kafka client. This option is not required. +// WithConfigMap sets the configMap to init the kafka client. func WithConfigMap(config *kafka.ConfigMap) Option { return func(p *Protocol) error { if config == nil { @@ -26,7 +26,7 @@ func WithConfigMap(config *kafka.ConfigMap) Option { } } -// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required. +// WithSenderTopic sets the defaultTopic for the kafka.Producer. func WithSenderTopic(defaultTopic string) Option { return func(p *Protocol) error { if defaultTopic == "" { @@ -37,7 +37,7 @@ func WithSenderTopic(defaultTopic string) Option { } } -// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required. +// WithReceiverTopics sets the topics for the kafka.Consumer. func WithReceiverTopics(topics []string) Option { return func(p *Protocol) error { if topics == nil { @@ -48,7 +48,7 @@ func WithReceiverTopics(topics []string) Option { } } -// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required. +// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option { return func(p *Protocol) error { if rebalanceCb == nil { @@ -59,7 +59,7 @@ func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option { } } -// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required. +// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. func WithPollTimeout(timeoutMs int) Option { return func(p *Protocol) error { p.consumerPollTimeout = timeoutMs @@ -67,7 +67,7 @@ func WithPollTimeout(timeoutMs int) Option { } } -// WithSender set a kafka.Producer instance to init the client directly. This option is not required. +// WithSender set a kafka.Producer instance to init the client directly. func WithSender(producer *kafka.Producer) Option { return func(p *Protocol) error { if producer == nil { @@ -78,7 +78,7 @@ func WithSender(producer *kafka.Producer) Option { } } -// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required. +// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option { return func(p *Protocol) error { p.consumerErrorHandler = handler @@ -86,7 +86,7 @@ func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option } } -// WithSender set a kafka.Consumer instance to init the client directly. This option is not required. +// WithReceiver set a kafka.Consumer instance to init the client directly. func WithReceiver(consumer *kafka.Consumer) Option { return func(p *Protocol) error { if consumer == nil { @@ -97,12 +97,12 @@ func WithReceiver(consumer *kafka.Consumer) Option { } } -// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required. +// Opaque key type used to store topicPartitionOffsets: assign them from ctx. type topicPartitionOffsetsType struct{} var offsetKey = topicPartitionOffsetsType{} -// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required. +// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context { if len(topicPartitionOffsets) == 0 { panic("the topicPartitionOffsets cannot be empty") diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go index 8aa906853..c527f1061 100644 --- a/protocol/kafka_confluent/v2/protocol.go +++ b/protocol/kafka_confluent/v2/protocol.go @@ -33,15 +33,14 @@ type Protocol struct { consumerTopics []string consumerRebalanceCb kafka.RebalanceCb // optional consumerPollTimeout int // optional - consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional + consumerErrorHandler func(ctx context.Context, err kafka.Error) // optional consumerMux sync.Mutex consumerIncoming chan *kafka.Message consumerCtx context.Context consumerCancel context.CancelFunc producer *kafka.Producer - producerDeliveryChan chan kafka.Event // optional - producerDefaultTopic string // optional + producerDefaultTopic string // optional closerMux sync.Mutex } @@ -85,12 +84,18 @@ func New(opts ...Option) (*Protocol, error) { if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil { return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer") } - if p.producer != nil { - p.producerDeliveryChan = make(chan kafka.Event) - } return p, nil } +// Events returns the events channel used by Confluent Kafka to deliver the result from a produce, i.e., send, operation. +// When using this SDK to produce (send) messages, this channel must be monitored to avoid resource leaks and this channel becoming full. See Confluent SDK for Go for details on the implementation. +func (p *Protocol) Events() (chan kafka.Event, error) { + if p.producer == nil { + return nil, errors.New("producer not set") + } + return p.producer.Events(), nil +} + func (p *Protocol) applyOptions(opts ...Option) error { for _, fn := range opts { if err := fn(p); err != nil { @@ -100,6 +105,7 @@ func (p *Protocol) applyOptions(opts ...Option) error { return nil } +// Send message by kafka.Producer. You must monitor the Events() channel when using this function. func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) { if p.producer == nil { return errors.New("producer client must be set") @@ -128,19 +134,12 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers .. kafkaMsg.Key = []byte(messageKey) } - err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...) - if err != nil { - return err + if err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...); err != nil { + return fmt.Errorf("create producer message: %w", err) } - err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan) - if err != nil { - return err - } - e := <-p.producerDeliveryChan - m := e.(*kafka.Message) - if m.TopicPartition.Error != nil { - return m.TopicPartition.Error + if err = p.producer.Produce(kafkaMsg, nil); err != nil { + return fmt.Errorf("produce message: %w", err) } return nil } @@ -231,15 +230,18 @@ func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { func (p *Protocol) Close(ctx context.Context) error { p.closerMux.Lock() defer p.closerMux.Unlock() + logger := cecontext.LoggerFrom(ctx) if p.consumerCancel != nil { p.consumerCancel() } if p.producer != nil && !p.producer.IsClosed() { + // Flush and close the producer with a 10 seconds timeout (closes Events channel) + for p.producer.Flush(10000) > 0 { + logger.Info("Flushing outstanding messages") + } p.producer.Close() - close(p.producerDeliveryChan) } - return nil } diff --git a/samples/kafka_confluent/receiver/main.go b/samples/kafka_confluent/receiver/main.go index 1817dd2f1..3b9efab6c 100644 --- a/samples/kafka_confluent/receiver/main.go +++ b/samples/kafka_confluent/receiver/main.go @@ -33,8 +33,9 @@ func main() { } defer receiver.Close(ctx) - // Setting the 'client.WithPollGoroutines(1)' to make sure the events from kafka partition are processed in order - c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1)) + // The 'WithBlockingCallback()' is to make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1). + // These two options make sure the events from kafka partition are processed in order + c, err := cloudevents.NewClient(receiver, client.WithBlockingCallback(), client.WithPollGoroutines(1)) if err != nil { log.Fatalf("failed to create client, %v", err) } diff --git a/samples/kafka_confluent/sender/main.go b/samples/kafka_confluent/sender/main.go index bfebf816c..777509224 100644 --- a/samples/kafka_confluent/sender/main.go +++ b/samples/kafka_confluent/sender/main.go @@ -8,6 +8,7 @@ package main import ( "context" "log" + "sync" confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -26,8 +27,49 @@ func main() { sender, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ "bootstrap.servers": "127.0.0.1:9092", }), confluent.WithSenderTopic(topic)) + if err != nil { + log.Fatalf("failed to create protocol, %v", err) + } + + var wg sync.WaitGroup + wg.Add(1) - defer sender.Close(ctx) + // Listen to all the events on the default events channel + // It's important to read these events otherwise the events channel will eventually fill up + go func() { + defer wg.Done() + eventChan, err := sender.Events() + if err != nil { + log.Fatalf("failed to get events channel for sender, %v", err) + } + for e := range eventChan { + switch ev := e.(type) { + case *kafka.Message: + // The message delivery report, indicating success or + // permanent failure after retries have been exhausted. + // Application level retries won't help since the client + // is already configured to do that. + m := ev + if m.TopicPartition.Error != nil { + log.Printf("Delivery failed: %v\n", m.TopicPartition.Error) + } else { + log.Printf("Delivered message to topic %s [%d] at offset %v\n", + *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) + } + case kafka.Error: + // Generic client instance-level errors, such as + // broker connection failures, authentication issues, etc. + // + // These errors should generally be considered informational + // as the underlying client will automatically try to + // recover from any errors encountered, the application + // does not need to take action on them. + log.Printf("Error: %v\n", ev) + default: + log.Printf("Ignored event: %v\n", ev) + } + } + }() c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { @@ -43,14 +85,13 @@ func main() { "message": "Hello, World!", }) - if result := c.Send( - // Set the producer message key - confluent.WithMessageKey(ctx, e.ID()), - e, - ); cloudevents.IsUndelivered(result) { + if result := c.Send(confluent.WithMessageKey(ctx, e.ID()), e); cloudevents.IsUndelivered(result) { log.Printf("failed to send: %v", result) } else { log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result)) } } + + sender.Close(ctx) + wg.Wait() } diff --git a/test/integration/kafka_confluent/kafka_test.go b/test/integration/kafka_confluent/kafka_test.go index ee3d77130..5ae429b2e 100644 --- a/test/integration/kafka_confluent/kafka_test.go +++ b/test/integration/kafka_confluent/kafka_test.go @@ -131,7 +131,8 @@ func protocolFactory(sendTopic string, receiveTopic []string, } if sendTopic != "" { p, err = confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{ - "bootstrap.servers": BOOTSTRAP_SERVER, + "bootstrap.servers": BOOTSTRAP_SERVER, + "go.delivery.reports": false, }), confluent.WithSenderTopic(sendTopic)) } return p, err