From 8a3d8a47622f9eae0c33f4102573c98519d664b7 Mon Sep 17 00:00:00 2001 From: Sanket Sudake Date: Mon, 28 Nov 2022 16:31:34 +0530 Subject: [PATCH] Optimize Kafka Client in Kafka Connector (#2630) Signed-off-by: Sanket Sudake --- pkg/mqtrigger/messageQueue/kafka/consumer.go | 4 +- pkg/mqtrigger/messageQueue/kafka/kafka.go | 49 +++++++++++--------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/pkg/mqtrigger/messageQueue/kafka/consumer.go b/pkg/mqtrigger/messageQueue/kafka/consumer.go index a2838dc71a..f2395bc536 100644 --- a/pkg/mqtrigger/messageQueue/kafka/consumer.go +++ b/pkg/mqtrigger/messageQueue/kafka/consumer.go @@ -106,8 +106,8 @@ func (ch MqtConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSessi topic := claim.Topic() partition := string(claim.Partition()) - // initially set metrics to -1 - mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition, -1) + // initially set message lag count + mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition, claim.HighWaterMarkOffset()-claim.InitialOffset()) // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine diff --git a/pkg/mqtrigger/messageQueue/kafka/kafka.go b/pkg/mqtrigger/messageQueue/kafka/kafka.go index 69e6655146..862ccfc740 100644 --- a/pkg/mqtrigger/messageQueue/kafka/kafka.go +++ b/pkg/mqtrigger/messageQueue/kafka/kafka.go @@ -54,6 +54,7 @@ type ( routerUrl string brokers []string version sarama.KafkaVersion + client sarama.Client authKeys map[string][]byte tls bool } @@ -110,24 +111,18 @@ func New(logger *zap.Logger, mqCfg messageQueue.Config, routerUrl string) (messa logger.Info("created kafka queue", zap.Any("kafka brokers", kafka.brokers), zap.Any("kafka version", kafka.version)) - return kafka, nil -} -func (kafka Kafka) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Subscription, error) { - kafka.logger.Debug("inside kakfa subscribe", zap.Any("trigger", trigger)) - kafka.logger.Debug("brokers set", zap.Strings("brokers", kafka.brokers)) + // Create new config + saramaConfig := sarama.NewConfig() + saramaConfig.Version = kafka.version - // Create new consumer - consumerConfig := sarama.NewConfig() - consumerConfig.Consumer.Return.Errors = true - consumerConfig.Version = kafka.version + // consumer config + saramaConfig.Consumer.Return.Errors = true - // Create new producer - producerConfig := sarama.NewConfig() - producerConfig.Producer.RequiredAcks = sarama.WaitForAll - producerConfig.Producer.Retry.Max = 10 - producerConfig.Producer.Return.Successes = true - producerConfig.Version = kafka.version + // producer config + saramaConfig.Producer.RequiredAcks = sarama.WaitForAll + saramaConfig.Producer.Retry.Max = 10 + saramaConfig.Producer.Return.Successes = true // Setup TLS for both producer and consumer if kafka.tls { @@ -137,18 +132,30 @@ func (kafka Kafka) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Sub return nil, err } - producerConfig.Net.TLS.Enable = true - producerConfig.Net.TLS.Config = tlsConfig - consumerConfig.Net.TLS.Enable = true - consumerConfig.Net.TLS.Config = tlsConfig + saramaConfig.Net.TLS.Enable = true + saramaConfig.Net.TLS.Config = tlsConfig + } + + saramaClient, err := sarama.NewClient(kafka.brokers, saramaConfig) + if err != nil { + return nil, err } - consumer, err := sarama.NewConsumerGroup(kafka.brokers, string(trigger.ObjectMeta.UID), consumerConfig) + kafka.client = saramaClient + + return kafka, nil +} + +func (kafka Kafka) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Subscription, error) { + kafka.logger.Debug("inside kakfa subscribe", zap.Any("trigger", trigger)) + kafka.logger.Debug("brokers set", zap.Strings("brokers", kafka.brokers)) + + consumer, err := sarama.NewConsumerGroupFromClient(string(trigger.ObjectMeta.UID), kafka.client) if err != nil { return nil, err } - producer, err := sarama.NewSyncProducer(kafka.brokers, producerConfig) + producer, err := sarama.NewSyncProducerFromClient(kafka.client) if err != nil { return nil, err }