Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide report channel Events() for confluent kafka producer #1031

Merged
merged 1 commit into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions protocol/kafka_confluent/v2/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// Option is the function signature required to be considered an kafka_confluent.Option.
type Option func(*Protocol) error

// WithConfigMap sets the configMap to init the kafka client. This option is not required.
// WithConfigMap sets the configMap to init the kafka client.
func WithConfigMap(config *kafka.ConfigMap) Option {
return func(p *Protocol) error {
if config == nil {
Expand All @@ -26,7 +26,7 @@ func WithConfigMap(config *kafka.ConfigMap) Option {
}
}

// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required.
// WithSenderTopic sets the defaultTopic for the kafka.Producer.
func WithSenderTopic(defaultTopic string) Option {
return func(p *Protocol) error {
if defaultTopic == "" {
Expand All @@ -37,7 +37,7 @@ func WithSenderTopic(defaultTopic string) Option {
}
}

// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required.
// WithReceiverTopics sets the topics for the kafka.Consumer.
func WithReceiverTopics(topics []string) Option {
return func(p *Protocol) error {
if topics == nil {
Expand All @@ -48,7 +48,7 @@ func WithReceiverTopics(topics []string) Option {
}
}

// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required.
// WithRebalanceCallBack sets the callback for rebalancing of the consumer group.
func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
return func(p *Protocol) error {
if rebalanceCb == nil {
Expand All @@ -59,15 +59,15 @@ func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
}
}

// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required.
// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout.
func WithPollTimeout(timeoutMs int) Option {
return func(p *Protocol) error {
p.consumerPollTimeout = timeoutMs
return nil
}
}

// WithSender set a kafka.Producer instance to init the client directly. This option is not required.
// WithSender set a kafka.Producer instance to init the client directly.
func WithSender(producer *kafka.Producer) Option {
return func(p *Protocol) error {
if producer == nil {
Expand All @@ -78,15 +78,15 @@ func WithSender(producer *kafka.Producer) Option {
}
}

// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required.
// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled.
func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option {
return func(p *Protocol) error {
p.consumerErrorHandler = handler
return nil
}
}

// WithSender set a kafka.Consumer instance to init the client directly. This option is not required.
// WithReceiver set a kafka.Consumer instance to init the client directly.
func WithReceiver(consumer *kafka.Consumer) Option {
return func(p *Protocol) error {
if consumer == nil {
Expand All @@ -97,12 +97,12 @@ func WithReceiver(consumer *kafka.Consumer) Option {
}
}

// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required.
// Opaque key type used to store topicPartitionOffsets: assign them from ctx.
type topicPartitionOffsetsType struct{}

var offsetKey = topicPartitionOffsetsType{}

// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required.
// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from.
func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context {
if len(topicPartitionOffsets) == 0 {
panic("the topicPartitionOffsets cannot be empty")
Expand Down
40 changes: 21 additions & 19 deletions protocol/kafka_confluent/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ type Protocol struct {
consumerTopics []string
consumerRebalanceCb kafka.RebalanceCb // optional
consumerPollTimeout int // optional
consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional
consumerErrorHandler func(ctx context.Context, err kafka.Error) // optional
consumerMux sync.Mutex
consumerIncoming chan *kafka.Message
consumerCtx context.Context
consumerCancel context.CancelFunc

producer *kafka.Producer
producerDeliveryChan chan kafka.Event // optional
producerDefaultTopic string // optional
producerDefaultTopic string // optional

closerMux sync.Mutex
}
Expand Down Expand Up @@ -85,12 +84,18 @@ func New(opts ...Option) (*Protocol, error) {
if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer")
}
if p.producer != nil {
p.producerDeliveryChan = make(chan kafka.Event)
}
return p, nil
}

// Events returns the events channel used by Confluent Kafka to deliver the result from a produce, i.e., send, operation.
// When using this SDK to produce (send) messages, this channel must be monitored to avoid resource leaks and this channel becoming full. See Confluent SDK for Go for details on the implementation.
func (p *Protocol) Events() (chan kafka.Event, error) {
if p.producer == nil {
return nil, errors.New("producer not set")
}
return p.producer.Events(), nil
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add a comment on Send to reference/explain to also use the Events channel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func (p *Protocol) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(p); err != nil {
Expand All @@ -100,6 +105,7 @@ func (p *Protocol) applyOptions(opts ...Option) error {
return nil
}

// Send message by kafka.Producer. You must monitor the Events() channel when using this function.
func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) {
if p.producer == nil {
return errors.New("producer client must be set")
Expand Down Expand Up @@ -128,19 +134,12 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
kafkaMsg.Key = []byte(messageKey)
}

err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...)
if err != nil {
return err
if err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...); err != nil {
return fmt.Errorf("create producer message: %w", err)
}

err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
if err != nil {
return err
}
e := <-p.producerDeliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
return m.TopicPartition.Error
if err = p.producer.Produce(kafkaMsg, nil); err != nil {
return fmt.Errorf("produce message: %w", err)
}
return nil
}
Expand Down Expand Up @@ -231,15 +230,18 @@ func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) {
func (p *Protocol) Close(ctx context.Context) error {
p.closerMux.Lock()
defer p.closerMux.Unlock()
logger := cecontext.LoggerFrom(ctx)

if p.consumerCancel != nil {
p.consumerCancel()
}

if p.producer != nil && !p.producer.IsClosed() {
// Flush and close the producer with a 10 seconds timeout (closes Events channel)
for p.producer.Flush(10000) > 0 {
logger.Info("Flushing outstanding messages")
}
p.producer.Close()
close(p.producerDeliveryChan)
}

return nil
}
5 changes: 3 additions & 2 deletions samples/kafka_confluent/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func main() {
}
defer receiver.Close(ctx)

// Setting the 'client.WithPollGoroutines(1)' to make sure the events from kafka partition are processed in order
c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1))
// The 'WithBlockingCallback()' is to make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1).
// These two options make sure the events from kafka partition are processed in order
c, err := cloudevents.NewClient(receiver, client.WithBlockingCallback(), client.WithPollGoroutines(1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wow, man...this is really not intuitive to use

if err != nil {
log.Fatalf("failed to create client, %v", err)
}
Expand Down
53 changes: 47 additions & 6 deletions samples/kafka_confluent/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package main
import (
"context"
"log"
"sync"

confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -26,8 +27,49 @@ func main() {
sender, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
"bootstrap.servers": "127.0.0.1:9092",
}), confluent.WithSenderTopic(topic))
if err != nil {
log.Fatalf("failed to create protocol, %v", err)
}

var wg sync.WaitGroup
wg.Add(1)

defer sender.Close(ctx)
// Listen to all the events on the default events channel
// It's important to read these events otherwise the events channel will eventually fill up
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you're not waiting for this goroutine to finish before returning from main- which could prevent your goroutine from printing anything. typically use https://pkg.go.dev/golang.org/x/sync/errgroup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

defer wg.Done()
eventChan, err := sender.Events()
if err != nil {
log.Fatalf("failed to get events channel for sender, %v", err)
}
for e := range eventChan {
switch ev := e.(type) {
case *kafka.Message:
// The message delivery report, indicating success or
// permanent failure after retries have been exhausted.
// Application level retries won't help since the client
// is already configured to do that.
m := ev
if m.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
log.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
case kafka.Error:
// Generic client instance-level errors, such as
// broker connection failures, authentication issues, etc.
//
// These errors should generally be considered informational
// as the underlying client will automatically try to
// recover from any errors encountered, the application
// does not need to take action on them.
log.Printf("Error: %v\n", ev)
default:
log.Printf("Ignored event: %v\n", ev)
}
}
}()

c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
Expand All @@ -43,14 +85,13 @@ func main() {
"message": "Hello, World!",
})

if result := c.Send(
// Set the producer message key
confluent.WithMessageKey(ctx, e.ID()),
e,
); cloudevents.IsUndelivered(result) {
if result := c.Send(confluent.WithMessageKey(ctx, e.ID()), e); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", result)
} else {
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
}

sender.Close(ctx)
wg.Wait()
}
3 changes: 2 additions & 1 deletion test/integration/kafka_confluent/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func protocolFactory(sendTopic string, receiveTopic []string,
}
if sendTopic != "" {
p, err = confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
"bootstrap.servers": BOOTSTRAP_SERVER,
"bootstrap.servers": BOOTSTRAP_SERVER,
"go.delivery.reports": false,
}), confluent.WithSenderTopic(sendTopic))
}
return p, err
Expand Down