From 48661f9c1fb3804fbdb6d4d0b5b97515905db3f2 Mon Sep 17 00:00:00 2001 From: Seungsoo Lee Date: Sat, 24 Jul 2021 02:06:53 +0000 Subject: [PATCH] Updated kafka --- src/feedconsumer/consumer.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index 0fb40434..b5c6c614 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -65,7 +65,7 @@ func (cfc *KnoxFeedConsumer) setupKafkaConfig() { sessionTimeoutMs := viper.GetString("feed-consumer.kafka.session-timeout-ms") autoOffsetReset := viper.GetString("feed-consumer.kafka.auto-offset-reset") - // groupID := viper.GetString("feed-consumer.kafka.group-id") + groupID := viper.GetString("feed-consumer.kafka.group-id") cfc.topics = viper.GetStringSlice("feed-consumer.kafka.topics") cfc.eventsBuffer = viper.GetInt("feed-consumer.kafka.events.buffer") @@ -80,11 +80,13 @@ func (cfc *KnoxFeedConsumer) setupKafkaConfig() { // Set up required configs cfc.kafkaConfig = kafka.ConfigMap{ - "bootstrap.servers": bootstrapServers, - "broker.address.family": brokderAddressFamily, - // "group.id": groupID, - "session.timeout.ms": sessionTimeoutMs, - "auto.offset.reset": autoOffsetReset, + "enable.auto.commit": true, + "auto.commit.interval.ms": 1000, + "bootstrap.servers": bootstrapServers, + "broker.address.family": brokderAddressFamily, + "group.id": groupID, + "session.timeout.ms": sessionTimeoutMs, + "auto.offset.reset": autoOffsetReset, } // Set up SSL specific configs if SSL is enabled