From 9720fb161064846a3cbe5e6bcad008141053b417 Mon Sep 17 00:00:00 2001 From: Meng Yan Date: Mon, 1 Apr 2024 08:43:31 +0000 Subject: [PATCH] reply review Signed-off-by: Meng Yan --- protocol/kafka_confluent/v2/option.go | 22 +++++++++++----------- protocol/kafka_confluent/v2/protocol.go | 24 +++++++++++++++++++++--- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/protocol/kafka_confluent/v2/option.go b/protocol/kafka_confluent/v2/option.go index dc376471..cb0f9e12 100644 --- a/protocol/kafka_confluent/v2/option.go +++ b/protocol/kafka_confluent/v2/option.go @@ -15,7 +15,7 @@ import ( // Option is the function signature required to be considered an kafka_confluent.Option. type Option func(*Protocol) error -// WithConfigMap sets the configMap to init the kafka client. This option is not required. +// WithConfigMap sets the configMap to init the kafka client. func WithConfigMap(config *kafka.ConfigMap) Option { return func(p *Protocol) error { if config == nil { @@ -26,7 +26,7 @@ func WithConfigMap(config *kafka.ConfigMap) Option { } } -// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required. +// WithSenderTopic sets the defaultTopic for the kafka.Producer. func WithSenderTopic(defaultTopic string) Option { return func(p *Protocol) error { if defaultTopic == "" { @@ -37,7 +37,7 @@ func WithSenderTopic(defaultTopic string) Option { } } -// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required. +// WithReceiverTopics sets the topics for the kafka.Consumer. func WithReceiverTopics(topics []string) Option { return func(p *Protocol) error { if topics == nil { @@ -48,7 +48,7 @@ func WithReceiverTopics(topics []string) Option { } } -// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required. +// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option { return func(p *Protocol) error { if rebalanceCb == nil { @@ -59,7 +59,7 @@ func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option { } } -// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required. +// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. func WithPollTimeout(timeoutMs int) Option { return func(p *Protocol) error { p.consumerPollTimeout = timeoutMs @@ -67,7 +67,7 @@ func WithPollTimeout(timeoutMs int) Option { } } -// WithSender set a kafka.Producer instance to init the client directly. This option is not required. +// WithSender set a kafka.Producer instance to init the client directly. func WithSender(producer *kafka.Producer) Option { return func(p *Protocol) error { if producer == nil { @@ -78,7 +78,7 @@ func WithSender(producer *kafka.Producer) Option { } } -// WithEventHandler provide a func on how to handle the kafka.Event for the producer.Events() chan. This option is not required. +// WithEventHandler provide a func on how to handle the kafka.Event for the producer.Events() chan. func WithEventHandler(handler func(ctx context.Context, err kafka.Event)) Option { return func(p *Protocol) error { p.producerEventHandler = handler @@ -86,7 +86,7 @@ func WithEventHandler(handler func(ctx context.Context, err kafka.Event)) Option } } -// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required. +// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option { return func(p *Protocol) error { p.consumerErrorHandler = handler @@ -94,7 +94,7 @@ func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option } } -// WithSender set a kafka.Consumer instance to init the client directly. This option is not required. +// WithSender set a kafka.Consumer instance to init the client directly. func WithReceiver(consumer *kafka.Consumer) Option { return func(p *Protocol) error { if consumer == nil { @@ -105,12 +105,12 @@ func WithReceiver(consumer *kafka.Consumer) Option { } } -// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required. +// Opaque key type used to store topicPartitionOffsets: assign them from ctx. type topicPartitionOffsetsType struct{} var offsetKey = topicPartitionOffsetsType{} -// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required. +// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context { if len(topicPartitionOffsets) == 0 { panic("the topicPartitionOffsets cannot be empty") diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go index 0aedbaee..2293f4ae 100644 --- a/protocol/kafka_confluent/v2/protocol.go +++ b/protocol/kafka_confluent/v2/protocol.go @@ -91,7 +91,7 @@ func New(opts ...Option) (*Protocol, error) { if p.producer != nil { p.producerDeliveryChan = make(chan kafka.Event) if p.producerEventHandler == nil { - p.producerEventHandler = func(ctx context.Context, err kafka.Event) {} + p.producerEventHandler = defaultEventHandler } } return p, nil @@ -173,9 +173,9 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers .. e := <-p.producerDeliveryChan m := e.(*kafka.Message) if m.TopicPartition.Error != nil { - return m.TopicPartition.Error + err = m.TopicPartition.Error } - return nil + return err } func (p *Protocol) OpenInbound(ctx context.Context) error { @@ -274,3 +274,21 @@ func (p *Protocol) Close(ctx context.Context) error { } return nil } + +func defaultEventHandler(ctx context.Context, e kafka.Event) { + logger := cecontext.LoggerFrom(ctx) + + switch ev := e.(type) { + case kafka.Error: + // Generic client instance-level errors, such as + // broker connection failures, authentication issues, etc. + // + // These errors should generally be considered informational + // as the underlying client will automatically try to + // recover from any errors encountered, the application + // does not need to take action on them. + logger.Infof("get an error event from producer.Events() chan: %v", ev) + default: + logger.Infof("ingore the event from producer.Events() chan: %s", ev) + } +}