Optimize Kafka Client in Kafka Connector (#2630)
Signed-off-by: Sanket Sudake <sanketsudake@gmail.com>
sanketsudake committed Nov 28, 2022
1 parent 38d3809 commit 8a3d8a4
Showing 2 changed files with 30 additions and 23 deletions.
4 changes: 2 additions & 2 deletions pkg/mqtrigger/messageQueue/kafka/consumer.go
@@ -106,8 +106,8 @@ func (ch MqtConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSessi
     topic := claim.Topic()
     partition := string(claim.Partition())

-    // initially set metrics to -1
-    mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition, -1)
+    // initially set message lag count
+    mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition, claim.HighWaterMarkOffset()-claim.InitialOffset())

     // Do not move the code below to a goroutine.
     // The `ConsumeClaim` itself is called within a goroutine
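
Note on the change above: the lag metric is now seeded with claim.HighWaterMarkOffset() - claim.InitialOffset(), i.e. the number of messages between where this claim starts reading and the broker's current high-water mark, instead of the -1 placeholder. The following is a minimal, self-contained sketch (not Fission code) of the same idea in a bare Sarama consumer-group handler; the lagHandler type and the printed output are illustrative only, and the github.com/Shopify/sarama import path is an assumption.

package lagexample

import (
    "fmt"

    "github.com/Shopify/sarama" // assumed import path for the Sarama client library
)

type lagHandler struct{}

func (lagHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (lagHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (lagHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // Initial lag: distance from the claim's starting offset to the broker's
    // current high-water mark, rather than a -1 placeholder.
    lag := claim.HighWaterMarkOffset() - claim.InitialOffset()
    fmt.Printf("topic=%s partition=%d initial lag=%d\n", claim.Topic(), claim.Partition(), lag)

    for msg := range claim.Messages() {
        // Approximate remaining lag after this message; Sarama refreshes the
        // high-water mark as fetch responses arrive.
        remaining := claim.HighWaterMarkOffset() - msg.Offset - 1
        fmt.Printf("offset=%d remaining lag=%d\n", msg.Offset, remaining)
        session.MarkMessage(msg, "")
    }
    return nil
}
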
49 changes: 28 additions & 21 deletions pkg/mqtrigger/messageQueue/kafka/kafka.go
@@ -54,6 +54,7 @@ type (
         routerUrl string
         brokers   []string
         version   sarama.KafkaVersion
+        client    sarama.Client
         authKeys  map[string][]byte
         tls       bool
     }
@@ -110,24 +111,18 @@ func New(logger *zap.Logger, mqCfg messageQueue.Config, routerUrl string) (messa

     logger.Info("created kafka queue", zap.Any("kafka brokers", kafka.brokers),
         zap.Any("kafka version", kafka.version))
-    return kafka, nil
-}

-func (kafka Kafka) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Subscription, error) {
-    kafka.logger.Debug("inside kakfa subscribe", zap.Any("trigger", trigger))
-    kafka.logger.Debug("brokers set", zap.Strings("brokers", kafka.brokers))
+    // Create new config
+    saramaConfig := sarama.NewConfig()
+    saramaConfig.Version = kafka.version

-    // Create new consumer
-    consumerConfig := sarama.NewConfig()
-    consumerConfig.Consumer.Return.Errors = true
-    consumerConfig.Version = kafka.version
+    // consumer config
+    saramaConfig.Consumer.Return.Errors = true

-    // Create new producer
-    producerConfig := sarama.NewConfig()
-    producerConfig.Producer.RequiredAcks = sarama.WaitForAll
-    producerConfig.Producer.Retry.Max = 10
-    producerConfig.Producer.Return.Successes = true
-    producerConfig.Version = kafka.version
+    // producer config
+    saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
+    saramaConfig.Producer.Retry.Max = 10
+    saramaConfig.Producer.Return.Successes = true

     // Setup TLS for both producer and consumer
     if kafka.tls {
@@ -137,18 +132,30 @@ func (kafka Kafka) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Sub
             return nil, err
         }

-        producerConfig.Net.TLS.Enable = true
-        producerConfig.Net.TLS.Config = tlsConfig
-        consumerConfig.Net.TLS.Enable = true
-        consumerConfig.Net.TLS.Config = tlsConfig
+        saramaConfig.Net.TLS.Enable = true
+        saramaConfig.Net.TLS.Config = tlsConfig
     }

+    saramaClient, err := sarama.NewClient(kafka.brokers, saramaConfig)
+    if err != nil {
+        return nil, err
+    }
+
-    consumer, err := sarama.NewConsumerGroup(kafka.brokers, string(trigger.ObjectMeta.UID), consumerConfig)
+    kafka.client = saramaClient
+
+    return kafka, nil
+}
+
+func (kafka Kafka) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Subscription, error) {
+    kafka.logger.Debug("inside kakfa subscribe", zap.Any("trigger", trigger))
+    kafka.logger.Debug("brokers set", zap.Strings("brokers", kafka.brokers))
+
+    consumer, err := sarama.NewConsumerGroupFromClient(string(trigger.ObjectMeta.UID), kafka.client)
     if err != nil {
         return nil, err
     }

-    producer, err := sarama.NewSyncProducer(kafka.brokers, producerConfig)
+    producer, err := sarama.NewSyncProducerFromClient(kafka.client)
     if err != nil {
         return nil, err
     }
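
The kafka.go change consolidates the separate consumer and producer configs into one saramaConfig, builds a single sarama.Client in New, and stores it on the Kafka struct; Subscribe then derives its consumer group and sync producer from that shared client via NewConsumerGroupFromClient and NewSyncProducerFromClient instead of dialing the brokers again for each. A minimal, self-contained sketch of the same pattern follows (not Fission code; the queue type and the newQueue/subscribe names are illustrative, and the github.com/Shopify/sarama import path is an assumption).

package clientreuse

import (
    "github.com/Shopify/sarama" // assumed import path for the Sarama client library
)

type queue struct {
    client sarama.Client
}

// newQueue builds one shared client carrying the version, consumer and
// producer settings, mirroring what New does after this commit.
func newQueue(brokers []string, version sarama.KafkaVersion) (*queue, error) {
    cfg := sarama.NewConfig()
    cfg.Version = version

    // consumer config
    cfg.Consumer.Return.Errors = true

    // producer config
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Producer.Retry.Max = 10
    cfg.Producer.Return.Successes = true

    // One broker connection pool, created once and stored for later use.
    client, err := sarama.NewClient(brokers, cfg)
    if err != nil {
        return nil, err
    }
    return &queue{client: client}, nil
}

// subscribe reuses the shared client for both endpoints instead of opening
// new broker connections per subscription.
func (q *queue) subscribe(groupID string) (sarama.ConsumerGroup, sarama.SyncProducer, error) {
    consumer, err := sarama.NewConsumerGroupFromClient(groupID, q.client)
    if err != nil {
        return nil, nil, err
    }
    producer, err := sarama.NewSyncProducerFromClient(q.client)
    if err != nil {
        return nil, nil, err
    }
    return consumer, producer, nil
}

One detail worth noting: sarama.NewSyncProducerFromClient requires Producer.Return.Successes to be true on the client's config, which is why the diff keeps that setting in New.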