From 170b9dc42a61ff5c943a4a42535782b34d56ec41 Mon Sep 17 00:00:00 2001 From: "shubhang.balkundi" Date: Thu, 24 Aug 2023 12:00:06 +0530 Subject: [PATCH 1/2] adds max-poll-interval-ms --- src/ziggurat/config.clj | 123 ++++++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 54 deletions(-) diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index 59805bd3..1d4028fa 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -8,44 +8,46 @@ (:import (java.util Properties) [org.apache.kafka.common.config SaslConfigs]) (:gen-class - :methods [^{:static true} [get [String] Object] - ^{:static true} [getIn [java.lang.Iterable] Object]] + :methods + [^{:static true} [get [String] Object] + ^{:static true} [getIn [java.lang.Iterable] Object]] :name tech.gojek.ziggurat.internal.Config)) (def config-file "config.edn") -(def default-config {:ziggurat {:nrepl-server {:port 70171} - :statsd {:port 8125 - :enabled false} - :sentry {:enabled false - :worker-count 10 - :queue-size 10 - :thread-termination-wait-s 1} - :rabbit-mq-connection {:port 5672 - :username "guest" - :password "guest" - :channel-timeout 2000 - :publish-retry {:back-off-ms 5000 - :non-recoverable-exception {:enabled true - :back-off-ms 5000 - :count 5}}} - :jobs {:instant {:worker-count 4 - :prefetch-count 4}} - :rabbit-mq {:delay {:queue-name "%s_delay_queue" - :exchange-name "%s_delay_exchange" - :dead-letter-exchange "%s_instant_exchange" - :queue-timeout-ms 5000} - :instant {:queue-name "%s_instant_queue" - :exchange-name "%s_instant_exchange"} - :dead-letter {:queue-name "%s_dead_letter_queue" - :exchange-name "%s_dead_letter_exchange"}} - :retry {:count 5 - :enabled false} - :http-server {:middlewares {:swagger {:enabled false}} - :port 8080 - :thread-count 100} - :new-relic {:report-errors false} - :log-format "text"}}) +(def default-config + {:ziggurat {:nrepl-server {:port 70171} + :statsd {:port 8125 + :enabled false} + :sentry {:enabled false + :worker-count 10 + :queue-size 10 + :thread-termination-wait-s 1} + :rabbit-mq-connection {:port 5672 + :username "guest" + :password "guest" + :channel-timeout 2000 + :publish-retry {:back-off-ms 5000 + :non-recoverable-exception {:enabled true + :back-off-ms 5000 + :count 5}}} + :jobs {:instant {:worker-count 4 + :prefetch-count 4}} + :rabbit-mq {:delay {:queue-name "%s_delay_queue" + :exchange-name "%s_delay_exchange" + :dead-letter-exchange "%s_instant_exchange" + :queue-timeout-ms 5000} + :instant {:queue-name "%s_instant_queue" + :exchange-name "%s_instant_exchange"} + :dead-letter {:queue-name "%s_dead_letter_queue" + :exchange-name "%s_dead_letter_exchange"}} + :retry {:count 5 + :enabled false} + :http-server {:middlewares {:swagger {:enabled false}} + :port 8080 + :thread-count 100} + :new-relic {:report-errors false} + :log-format "text"}}) (defn- interpolate-val [val app-name] (if (string? val) @@ -53,16 +55,19 @@ val)) (defn- interpolate-config [config app-name] - (reduce-kv (fn [m k v] - (if (map? v) - (assoc m k (interpolate-config v app-name)) - (assoc m k (interpolate-val v app-name)))) {} config)) + (reduce-kv + (fn [m k v] + (if (map? v) + (assoc m k (interpolate-config v app-name)) + (assoc m k (interpolate-val v app-name)))) + {} config)) (defn- deep-merge [& maps] - (apply merge-with (fn [& args] - (if (every? map? args) - (apply deep-merge args) - (last args))) + (apply merge-with + (fn [& args] + (if (every? map? args) + (apply deep-merge args) + (last args))) maps)) (defn- edn-config [config-file] @@ -77,9 +82,10 @@ (declare config) (defstate config - :start (let [config-values-from-env (config-from-env config-file) - app-name (-> config-values-from-env :ziggurat :app-name)] - (deep-merge (interpolate-config default-config app-name) config-values-from-env))) + :start + (let [config-values-from-env (config-from-env config-file) + app-name (-> config-values-from-env :ziggurat :app-name)] + (deep-merge (interpolate-config default-config app-name) config-values-from-env))) (defn ziggurat-config [] (get config :ziggurat)) @@ -101,7 +107,10 @@ (get-in (ziggurat-config) ks default))) (defn channel-retry-config [topic-entity channel] - (get-in (ziggurat-config) [:stream-router topic-entity :channels channel :retry])) + (get-in (ziggurat-config) + [:stream-router topic-entity + :channels channel + :retry])) (defn- java-response "When returning config from -get or -getIn, we can either return a Map or string (based on the key/keys passed). @@ -127,6 +136,8 @@ :default-api-timeout-ms-config :default-api-timeout-ms :key-deserializer-class-config :key-deserializer :session-timeout-ms-config :session-timeout-ms + :max-poll-interval-ms :max-poll-interval-ms + :max-poll-records :max-poll-recrords :value-deserializer-class-config :value-deserializer}) (def producer-config-mapping-table @@ -171,7 +182,7 @@ (str/trim (cond (keyword? v) (name v) - :else (str v)))) + :else (str v)))) (defn set-property [mapping-table p k v] @@ -195,11 +206,12 @@ (defn- add-jaas-properties [properties jaas-config] (if (some? jaas-config) - (let [username (get jaas-config :username) - password (get jaas-config :password) + (let [username (get jaas-config :username) + password (get jaas-config :password) mechanism (get jaas-config :mechanism)] (doto properties - (.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties username password mechanism)))) + (.put SaslConfigs/SASL_JAAS_CONFIG + (create-jaas-properties username password mechanism)))) properties)) (defn build-ssl-properties @@ -222,7 +234,7 @@ :mechanism <>}}} " (let [ssl-configs-enabled (:enabled ssl-config-map) - jaas-config (get ssl-config-map :jaas)] + jaas-config (get ssl-config-map :jaas)] (if (true? ssl-configs-enabled) (as-> properties pr (add-jaas-properties pr jaas-config) @@ -252,11 +264,14 @@ (build-ssl-properties pr set-property-fn (ssl-config)) (reduce-kv set-property-fn pr config-map))) -(def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table))) +(def build-consumer-config-properties + (partial build-properties (partial set-property consumer-config-mapping-table))) -(def build-producer-config-properties (partial build-properties (partial set-property producer-config-mapping-table))) +(def build-producer-config-properties + (partial build-properties (partial set-property producer-config-mapping-table))) -(def build-streams-config-properties (partial build-properties (partial set-property streams-config-mapping-table))) +(def build-streams-config-properties + (partial build-properties (partial set-property streams-config-mapping-table))) (defn get-configured-retry-count [] (-> (ziggurat-config) :retry :count)) From 6032e56db7d62d75abb3658d123c8feba4321136 Mon Sep 17 00:00:00 2001 From: "shubhang.balkundi" Date: Thu, 24 Aug 2023 14:45:51 +0530 Subject: [PATCH 2/2] fix: default config --- src/ziggurat/config.clj | 2 -- src/ziggurat/kafka_consumer/consumer.clj | 4 +++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index 1d4028fa..edc1a06e 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -136,8 +136,6 @@ :default-api-timeout-ms-config :default-api-timeout-ms :key-deserializer-class-config :key-deserializer :session-timeout-ms-config :session-timeout-ms - :max-poll-interval-ms :max-poll-interval-ms - :max-poll-records :max-poll-recrords :value-deserializer-class-config :value-deserializer}) (def producer-config-mapping-table diff --git a/src/ziggurat/kafka_consumer/consumer.clj b/src/ziggurat/kafka_consumer/consumer.clj index 1f040f3c..f14b930d 100644 --- a/src/ziggurat/kafka_consumer/consumer.clj +++ b/src/ziggurat/kafka_consumer/consumer.clj @@ -8,6 +8,7 @@ (def default-consumer-config {:commit-interval-ms 15000 :session-timeout-ms-config 60000 + :max-poll-interval-ms 300000 :default-api-timeout-ms-config 60000 :key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer" :value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}) @@ -16,7 +17,8 @@ [topic-entity consumer-group-config] (try (let [merged-consumer-group-config (umap/deep-merge consumer-group-config default-consumer-config) - consumer (KafkaConsumer. (cfg/build-consumer-config-properties merged-consumer-group-config)) + consumer (KafkaConsumer. + (cfg/build-consumer-config-properties merged-consumer-group-config)) topic-pattern (Pattern/compile (:origin-topic merged-consumer-group-config))] (.subscribe consumer topic-pattern) consumer)