Skip to content

Commit

Permalink
Merge pull request #1031 from yanmxa/br_confluent_bug
Browse files Browse the repository at this point in the history
  • Loading branch information
embano1 committed Apr 13, 2024
2 parents e6a74ef + 7fef294 commit f765e03
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 38 deletions.
20 changes: 10 additions & 10 deletions protocol/kafka_confluent/v2/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// Option is the function signature required to be considered an kafka_confluent.Option.
type Option func(*Protocol) error

// WithConfigMap sets the configMap to init the kafka client. This option is not required.
// WithConfigMap sets the configMap to init the kafka client.
func WithConfigMap(config *kafka.ConfigMap) Option {
return func(p *Protocol) error {
if config == nil {
Expand All @@ -26,7 +26,7 @@ func WithConfigMap(config *kafka.ConfigMap) Option {
}
}

// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required.
// WithSenderTopic sets the defaultTopic for the kafka.Producer.
func WithSenderTopic(defaultTopic string) Option {
return func(p *Protocol) error {
if defaultTopic == "" {
Expand All @@ -37,7 +37,7 @@ func WithSenderTopic(defaultTopic string) Option {
}
}

// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required.
// WithReceiverTopics sets the topics for the kafka.Consumer.
func WithReceiverTopics(topics []string) Option {
return func(p *Protocol) error {
if topics == nil {
Expand All @@ -48,7 +48,7 @@ func WithReceiverTopics(topics []string) Option {
}
}

// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required.
// WithRebalanceCallBack sets the callback for rebalancing of the consumer group.
func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
return func(p *Protocol) error {
if rebalanceCb == nil {
Expand All @@ -59,15 +59,15 @@ func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
}
}

// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required.
// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout.
func WithPollTimeout(timeoutMs int) Option {
return func(p *Protocol) error {
p.consumerPollTimeout = timeoutMs
return nil
}
}

// WithSender set a kafka.Producer instance to init the client directly. This option is not required.
// WithSender set a kafka.Producer instance to init the client directly.
func WithSender(producer *kafka.Producer) Option {
return func(p *Protocol) error {
if producer == nil {
Expand All @@ -78,15 +78,15 @@ func WithSender(producer *kafka.Producer) Option {
}
}

// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required.
// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled.
func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option {
return func(p *Protocol) error {
p.consumerErrorHandler = handler
return nil
}
}

// WithSender set a kafka.Consumer instance to init the client directly. This option is not required.
// WithReceiver set a kafka.Consumer instance to init the client directly.
func WithReceiver(consumer *kafka.Consumer) Option {
return func(p *Protocol) error {
if consumer == nil {
Expand All @@ -97,12 +97,12 @@ func WithReceiver(consumer *kafka.Consumer) Option {
}
}

// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required.
// Opaque key type used to store topicPartitionOffsets: assign them from ctx.
type topicPartitionOffsetsType struct{}

var offsetKey = topicPartitionOffsetsType{}

// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required.
// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from.
func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context {
if len(topicPartitionOffsets) == 0 {
panic("the topicPartitionOffsets cannot be empty")
Expand Down
40 changes: 21 additions & 19 deletions protocol/kafka_confluent/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ type Protocol struct {
consumerTopics []string
consumerRebalanceCb kafka.RebalanceCb // optional
consumerPollTimeout int // optional
consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional
consumerErrorHandler func(ctx context.Context, err kafka.Error) // optional
consumerMux sync.Mutex
consumerIncoming chan *kafka.Message
consumerCtx context.Context
consumerCancel context.CancelFunc

producer *kafka.Producer
producerDeliveryChan chan kafka.Event // optional
producerDefaultTopic string // optional
producerDefaultTopic string // optional

closerMux sync.Mutex
}
Expand Down Expand Up @@ -85,12 +84,18 @@ func New(opts ...Option) (*Protocol, error) {
if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer")
}
if p.producer != nil {
p.producerDeliveryChan = make(chan kafka.Event)
}
return p, nil
}

// Events returns the events channel used by Confluent Kafka to deliver the result from a produce, i.e., send, operation.
// When using this SDK to produce (send) messages, this channel must be monitored to avoid resource leaks and this channel becoming full. See Confluent SDK for Go for details on the implementation.
func (p *Protocol) Events() (chan kafka.Event, error) {
if p.producer == nil {
return nil, errors.New("producer not set")
}
return p.producer.Events(), nil
}

func (p *Protocol) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(p); err != nil {
Expand All @@ -100,6 +105,7 @@ func (p *Protocol) applyOptions(opts ...Option) error {
return nil
}

// Send message by kafka.Producer. You must monitor the Events() channel when using this function.
func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) {
if p.producer == nil {
return errors.New("producer client must be set")
Expand Down Expand Up @@ -128,19 +134,12 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
kafkaMsg.Key = []byte(messageKey)
}

err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...)
if err != nil {
return err
if err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...); err != nil {
return fmt.Errorf("create producer message: %w", err)
}

err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
if err != nil {
return err
}
e := <-p.producerDeliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
return m.TopicPartition.Error
if err = p.producer.Produce(kafkaMsg, nil); err != nil {
return fmt.Errorf("produce message: %w", err)
}
return nil
}
Expand Down Expand Up @@ -231,15 +230,18 @@ func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) {
func (p *Protocol) Close(ctx context.Context) error {
p.closerMux.Lock()
defer p.closerMux.Unlock()
logger := cecontext.LoggerFrom(ctx)

if p.consumerCancel != nil {
p.consumerCancel()
}

if p.producer != nil && !p.producer.IsClosed() {
// Flush and close the producer with a 10 seconds timeout (closes Events channel)
for p.producer.Flush(10000) > 0 {
logger.Info("Flushing outstanding messages")
}
p.producer.Close()
close(p.producerDeliveryChan)
}

return nil
}
5 changes: 3 additions & 2 deletions samples/kafka_confluent/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func main() {
}
defer receiver.Close(ctx)

// Setting the 'client.WithPollGoroutines(1)' to make sure the events from kafka partition are processed in order
c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1))
// The 'WithBlockingCallback()' is to make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1).
// These two options make sure the events from kafka partition are processed in order
c, err := cloudevents.NewClient(receiver, client.WithBlockingCallback(), client.WithPollGoroutines(1))
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
Expand Down
53 changes: 47 additions & 6 deletions samples/kafka_confluent/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package main
import (
"context"
"log"
"sync"

confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -26,8 +27,49 @@ func main() {
sender, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
"bootstrap.servers": "127.0.0.1:9092",
}), confluent.WithSenderTopic(topic))
if err != nil {
log.Fatalf("failed to create protocol, %v", err)
}

var wg sync.WaitGroup
wg.Add(1)

defer sender.Close(ctx)
// Listen to all the events on the default events channel
// It's important to read these events otherwise the events channel will eventually fill up
go func() {
defer wg.Done()
eventChan, err := sender.Events()
if err != nil {
log.Fatalf("failed to get events channel for sender, %v", err)
}
for e := range eventChan {
switch ev := e.(type) {
case *kafka.Message:
// The message delivery report, indicating success or
// permanent failure after retries have been exhausted.
// Application level retries won't help since the client
// is already configured to do that.
m := ev
if m.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
log.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
case kafka.Error:
// Generic client instance-level errors, such as
// broker connection failures, authentication issues, etc.
//
// These errors should generally be considered informational
// as the underlying client will automatically try to
// recover from any errors encountered, the application
// does not need to take action on them.
log.Printf("Error: %v\n", ev)
default:
log.Printf("Ignored event: %v\n", ev)
}
}
}()

c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
Expand All @@ -43,14 +85,13 @@ func main() {
"message": "Hello, World!",
})

if result := c.Send(
// Set the producer message key
confluent.WithMessageKey(ctx, e.ID()),
e,
); cloudevents.IsUndelivered(result) {
if result := c.Send(confluent.WithMessageKey(ctx, e.ID()), e); cloudevents.IsUndelivered(result) {
log.Printf("failed to send: %v", result)
} else {
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
}

sender.Close(ctx)
wg.Wait()
}
3 changes: 2 additions & 1 deletion test/integration/kafka_confluent/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func protocolFactory(sendTopic string, receiveTopic []string,
}
if sendTopic != "" {
p, err = confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
"bootstrap.servers": BOOTSTRAP_SERVER,
"bootstrap.servers": BOOTSTRAP_SERVER,
"go.delivery.reports": false,
}), confluent.WithSenderTopic(sendTopic))
}
return p, err
Expand Down

0 comments on commit f765e03

Please sign in to comment.