Skip to content

Commit

Permalink
Merge 6032e56 into 0841311
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhang93 committed Aug 24, 2023
2 parents 0841311 + 6032e56 commit f0763af
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 55 deletions.
121 changes: 67 additions & 54 deletions src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,61 +8,66 @@
(:import (java.util Properties)
[org.apache.kafka.common.config SaslConfigs])
(:gen-class
:methods [^{:static true} [get [String] Object]
^{:static true} [getIn [java.lang.Iterable] Object]]
:methods
[^{:static true} [get [String] Object]
^{:static true} [getIn [java.lang.Iterable] Object]]
:name tech.gojek.ziggurat.internal.Config))

(def config-file "config.edn")

(def default-config {:ziggurat {:nrepl-server {:port 70171}
:statsd {:port 8125
:enabled false}
:sentry {:enabled false
:worker-count 10
:queue-size 10
:thread-termination-wait-s 1}
:rabbit-mq-connection {:port 5672
:username "guest"
:password "guest"
:channel-timeout 2000
:publish-retry {:back-off-ms 5000
:non-recoverable-exception {:enabled true
:back-off-ms 5000
:count 5}}}
:jobs {:instant {:worker-count 4
:prefetch-count 4}}
:rabbit-mq {:delay {:queue-name "%s_delay_queue"
:exchange-name "%s_delay_exchange"
:dead-letter-exchange "%s_instant_exchange"
:queue-timeout-ms 5000}
:instant {:queue-name "%s_instant_queue"
:exchange-name "%s_instant_exchange"}
:dead-letter {:queue-name "%s_dead_letter_queue"
:exchange-name "%s_dead_letter_exchange"}}
:retry {:count 5
:enabled false}
:http-server {:middlewares {:swagger {:enabled false}}
:port 8080
:thread-count 100}
:new-relic {:report-errors false}
:log-format "text"}})
(def default-config
{:ziggurat {:nrepl-server {:port 70171}
:statsd {:port 8125
:enabled false}
:sentry {:enabled false
:worker-count 10
:queue-size 10
:thread-termination-wait-s 1}
:rabbit-mq-connection {:port 5672
:username "guest"
:password "guest"
:channel-timeout 2000
:publish-retry {:back-off-ms 5000
:non-recoverable-exception {:enabled true
:back-off-ms 5000
:count 5}}}
:jobs {:instant {:worker-count 4
:prefetch-count 4}}
:rabbit-mq {:delay {:queue-name "%s_delay_queue"
:exchange-name "%s_delay_exchange"
:dead-letter-exchange "%s_instant_exchange"
:queue-timeout-ms 5000}
:instant {:queue-name "%s_instant_queue"
:exchange-name "%s_instant_exchange"}
:dead-letter {:queue-name "%s_dead_letter_queue"
:exchange-name "%s_dead_letter_exchange"}}
:retry {:count 5
:enabled false}
:http-server {:middlewares {:swagger {:enabled false}}
:port 8080
:thread-count 100}
:new-relic {:report-errors false}
:log-format "text"}})

(defn- interpolate-val [val app-name]
(if (string? val)
(format val app-name)
val))

(defn- interpolate-config [config app-name]
(reduce-kv (fn [m k v]
(if (map? v)
(assoc m k (interpolate-config v app-name))
(assoc m k (interpolate-val v app-name)))) {} config))
(reduce-kv
(fn [m k v]
(if (map? v)
(assoc m k (interpolate-config v app-name))
(assoc m k (interpolate-val v app-name))))
{} config))

(defn- deep-merge [& maps]
(apply merge-with (fn [& args]
(if (every? map? args)
(apply deep-merge args)
(last args)))
(apply merge-with
(fn [& args]
(if (every? map? args)
(apply deep-merge args)
(last args)))
maps))

(defn- edn-config [config-file]
Expand All @@ -77,9 +82,10 @@
(declare config)

(defstate config
:start (let [config-values-from-env (config-from-env config-file)
app-name (-> config-values-from-env :ziggurat :app-name)]
(deep-merge (interpolate-config default-config app-name) config-values-from-env)))
:start
(let [config-values-from-env (config-from-env config-file)
app-name (-> config-values-from-env :ziggurat :app-name)]
(deep-merge (interpolate-config default-config app-name) config-values-from-env)))

(defn ziggurat-config []
(get config :ziggurat))
Expand All @@ -101,7 +107,10 @@
(get-in (ziggurat-config) ks default)))

(defn channel-retry-config [topic-entity channel]
(get-in (ziggurat-config) [:stream-router topic-entity :channels channel :retry]))
(get-in (ziggurat-config)
[:stream-router topic-entity
:channels channel
:retry]))

(defn- java-response
"When returning config from -get or -getIn, we can either return a Map or string (based on the key/keys passed).
Expand Down Expand Up @@ -171,7 +180,7 @@
(str/trim
(cond
(keyword? v) (name v)
:else (str v))))
:else (str v))))

(defn set-property
[mapping-table p k v]
Expand All @@ -195,11 +204,12 @@
(defn- add-jaas-properties
[properties jaas-config]
(if (some? jaas-config)
(let [username (get jaas-config :username)
password (get jaas-config :password)
(let [username (get jaas-config :username)
password (get jaas-config :password)
mechanism (get jaas-config :mechanism)]
(doto properties
(.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties username password mechanism))))
(.put SaslConfigs/SASL_JAAS_CONFIG
(create-jaas-properties username password mechanism))))
properties))

(defn build-ssl-properties
Expand All @@ -222,7 +232,7 @@
:mechanism <>}}}
"
(let [ssl-configs-enabled (:enabled ssl-config-map)
jaas-config (get ssl-config-map :jaas)]
jaas-config (get ssl-config-map :jaas)]
(if (true? ssl-configs-enabled)
(as-> properties pr
(add-jaas-properties pr jaas-config)
Expand Down Expand Up @@ -252,11 +262,14 @@
(build-ssl-properties pr set-property-fn (ssl-config))
(reduce-kv set-property-fn pr config-map)))

(def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table)))
(def build-consumer-config-properties
(partial build-properties (partial set-property consumer-config-mapping-table)))

(def build-producer-config-properties (partial build-properties (partial set-property producer-config-mapping-table)))
(def build-producer-config-properties
(partial build-properties (partial set-property producer-config-mapping-table)))

(def build-streams-config-properties (partial build-properties (partial set-property streams-config-mapping-table)))
(def build-streams-config-properties
(partial build-properties (partial set-property streams-config-mapping-table)))

(defn get-configured-retry-count []
(-> (ziggurat-config) :retry :count))
Expand Down
4 changes: 3 additions & 1 deletion src/ziggurat/kafka_consumer/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
(def default-consumer-config
{:commit-interval-ms 15000
:session-timeout-ms-config 60000
:max-poll-interval-ms 300000
:default-api-timeout-ms-config 60000
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"})
Expand All @@ -16,7 +17,8 @@
[topic-entity consumer-group-config]
(try
(let [merged-consumer-group-config (umap/deep-merge consumer-group-config default-consumer-config)
consumer (KafkaConsumer. (cfg/build-consumer-config-properties merged-consumer-group-config))
consumer (KafkaConsumer.
(cfg/build-consumer-config-properties merged-consumer-group-config))
topic-pattern (Pattern/compile (:origin-topic merged-consumer-group-config))]
(.subscribe consumer topic-pattern)
consumer)
Expand Down

0 comments on commit f0763af

Please sign in to comment.