Skip to content

Commit

Permalink
reply review
Browse files Browse the repository at this point in the history
Signed-off-by: Meng Yan <myan@redhat.com>
  • Loading branch information
yanmxa committed Apr 8, 2024
1 parent 04f6d68 commit 9720fb1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 14 deletions.
22 changes: 11 additions & 11 deletions protocol/kafka_confluent/v2/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// Option is the function signature required to be considered an kafka_confluent.Option.
type Option func(*Protocol) error

// WithConfigMap sets the configMap to init the kafka client. This option is not required.
// WithConfigMap sets the configMap to init the kafka client.
func WithConfigMap(config *kafka.ConfigMap) Option {
return func(p *Protocol) error {
if config == nil {
Expand All @@ -26,7 +26,7 @@ func WithConfigMap(config *kafka.ConfigMap) Option {
}
}

// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required.
// WithSenderTopic sets the defaultTopic for the kafka.Producer.
func WithSenderTopic(defaultTopic string) Option {
return func(p *Protocol) error {
if defaultTopic == "" {
Expand All @@ -37,7 +37,7 @@ func WithSenderTopic(defaultTopic string) Option {
}
}

// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required.
// WithReceiverTopics sets the topics for the kafka.Consumer.
func WithReceiverTopics(topics []string) Option {
return func(p *Protocol) error {
if topics == nil {
Expand All @@ -48,7 +48,7 @@ func WithReceiverTopics(topics []string) Option {
}
}

// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required.
// WithRebalanceCallBack sets the callback for rebalancing of the consumer group.
func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
return func(p *Protocol) error {
if rebalanceCb == nil {
Expand All @@ -59,15 +59,15 @@ func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
}
}

// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required.
// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout.
func WithPollTimeout(timeoutMs int) Option {
return func(p *Protocol) error {
p.consumerPollTimeout = timeoutMs
return nil
}
}

// WithSender set a kafka.Producer instance to init the client directly. This option is not required.
// WithSender set a kafka.Producer instance to init the client directly.
func WithSender(producer *kafka.Producer) Option {
return func(p *Protocol) error {
if producer == nil {
Expand All @@ -78,23 +78,23 @@ func WithSender(producer *kafka.Producer) Option {
}
}

// WithEventHandler provide a func on how to handle the kafka.Event for the producer.Events() chan. This option is not required.
// WithEventHandler provide a func on how to handle the kafka.Event for the producer.Events() chan.
func WithEventHandler(handler func(ctx context.Context, err kafka.Event)) Option {
return func(p *Protocol) error {
p.producerEventHandler = handler
return nil
}
}

// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required.
// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled.
func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option {
return func(p *Protocol) error {
p.consumerErrorHandler = handler
return nil
}
}

// WithSender set a kafka.Consumer instance to init the client directly. This option is not required.
// WithSender set a kafka.Consumer instance to init the client directly.
func WithReceiver(consumer *kafka.Consumer) Option {
return func(p *Protocol) error {
if consumer == nil {
Expand All @@ -105,12 +105,12 @@ func WithReceiver(consumer *kafka.Consumer) Option {
}
}

// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required.
// Opaque key type used to store topicPartitionOffsets: assign them from ctx.
type topicPartitionOffsetsType struct{}

var offsetKey = topicPartitionOffsetsType{}

// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required.
// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from.
func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context {
if len(topicPartitionOffsets) == 0 {
panic("the topicPartitionOffsets cannot be empty")
Expand Down
24 changes: 21 additions & 3 deletions protocol/kafka_confluent/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func New(opts ...Option) (*Protocol, error) {
if p.producer != nil {
p.producerDeliveryChan = make(chan kafka.Event)
if p.producerEventHandler == nil {
p.producerEventHandler = func(ctx context.Context, err kafka.Event) {}
p.producerEventHandler = defaultEventHandler
}
}
return p, nil
Expand Down Expand Up @@ -173,9 +173,9 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
e := <-p.producerDeliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
return m.TopicPartition.Error
err = m.TopicPartition.Error
}
return nil
return err
}

func (p *Protocol) OpenInbound(ctx context.Context) error {
Expand Down Expand Up @@ -274,3 +274,21 @@ func (p *Protocol) Close(ctx context.Context) error {
}
return nil
}

func defaultEventHandler(ctx context.Context, e kafka.Event) {
logger := cecontext.LoggerFrom(ctx)

switch ev := e.(type) {
case kafka.Error:
// Generic client instance-level errors, such as
// broker connection failures, authentication issues, etc.
//
// These errors should generally be considered informational
// as the underlying client will automatically try to
// recover from any errors encountered, the application
// does not need to take action on them.
logger.Infof("get an error event from producer.Events() chan: %v", ev)
default:
logger.Infof("ingore the event from producer.Events() chan: %s", ev)
}
}

0 comments on commit 9720fb1

Please sign in to comment.