Skip to content

Commit

Permalink
Merge 862fccf into daa2725
Browse files Browse the repository at this point in the history
  • Loading branch information
guptalakshya92 committed Jul 6, 2022
2 parents daa2725 + 862fccf commit 1c01e72
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 155 deletions.
43 changes: 23 additions & 20 deletions src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
(:import (java.util Properties)
[org.apache.kafka.common.config SaslConfigs])
(:gen-class
:methods [^{:static true} [get [String] Object]
^{:static true} [getIn [java.lang.Iterable] Object]]
:name tech.gojek.ziggurat.internal.Config))
:methods [^{:static true} [get [String] Object]
^{:static true} [getIn [java.lang.Iterable] Object]]
:name tech.gojek.ziggurat.internal.Config))

(def config-file "config.edn")

Expand All @@ -25,10 +25,10 @@
:username "guest"
:password "guest"
:channel-timeout 2000
:publish-retry {:back-off-ms 5000
:non-recoverable-exception {:enabled true
:back-off-ms 5000
:count 5}}}
:publish-retry {:back-off-ms 5000
:non-recoverable-exception {:enabled true
:back-off-ms 5000
:count 5}}}
:jobs {:instant {:worker-count 4
:prefetch-count 4}}
:rabbit-mq {:delay {:queue-name "%s_delay_queue"
Expand Down Expand Up @@ -77,9 +77,9 @@
(declare config)

(defstate config
:start (let [config-values-from-env (config-from-env config-file)
app-name (-> config-values-from-env :ziggurat :app-name)]
(deep-merge (interpolate-config default-config app-name) config-values-from-env)))
:start (let [config-values-from-env (config-from-env config-file)
app-name (-> config-values-from-env :ziggurat :app-name)]
(deep-merge (interpolate-config default-config app-name) config-values-from-env)))

(defn ziggurat-config []
(get config :ziggurat))
Expand Down Expand Up @@ -169,9 +169,9 @@
(defn- normalize-value
[v]
(str/trim
(cond
(keyword? v) (name v)
:else (str v))))
(cond
(keyword? v) (name v)
:else (str v))))

(defn set-property
[mapping-table p k v]
Expand All @@ -195,8 +195,8 @@
(defn- add-jaas-properties
[properties jaas-config]
(if (some? jaas-config)
(let [username (get jaas-config :username)
password (get jaas-config :password)
(let [username (get jaas-config :username)
password (get jaas-config :password)
mechanism (get jaas-config :mechanism)]
(doto properties
(.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties username password mechanism))))
Expand All @@ -222,11 +222,11 @@
:mechanism <>}}}
"
(let [ssl-configs-enabled (:enabled ssl-config-map)
jaas-config (get ssl-config-map :jaas)]
jaas-config (get ssl-config-map :jaas)]
(if (true? ssl-configs-enabled)
(as-> properties pr
(add-jaas-properties pr jaas-config)
(reduce-kv set-property-fn pr ssl-config-map))
(add-jaas-properties pr jaas-config)
(reduce-kv set-property-fn pr ssl-config-map))
properties)))

(defn build-properties
Expand All @@ -249,11 +249,14 @@
"
[set-property-fn config-map]
(as-> (Properties.) pr
(build-ssl-properties pr set-property-fn (ssl-config))
(reduce-kv set-property-fn pr config-map)))
(build-ssl-properties pr set-property-fn (ssl-config))
(reduce-kv set-property-fn pr config-map)))

(def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table)))

(def build-producer-config-properties (partial build-properties (partial set-property producer-config-mapping-table)))

(def build-streams-config-properties (partial build-properties (partial set-property streams-config-mapping-table)))

(defn get-configured-retry-count []
(-> (ziggurat-config) :retry :count))
88 changes: 45 additions & 43 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns ziggurat.mapper
(:require [clojure.string :as str]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.config :refer [ziggurat-config get-configured-retry-count]]
[ziggurat.messaging.producer :as producer]
[ziggurat.metrics :as metrics]
[ziggurat.new-relic :as nr]
Expand All @@ -16,36 +16,38 @@
(producer/publish-to-channel-instant-queue return-code message-payload))

(defn- create-user-payload
[message-payload]
(-> message-payload
(dissoc :headers)
(dissoc :retry-count)
(dissoc :topic-entity)))
[message-payload configured-retry-count]
(let [remaining-retry-count (get message-payload :retry-count configured-retry-count)]
(-> message-payload
(dissoc :headers)
(dissoc :retry-count)
(dissoc :topic-entity)
(assoc-in [:metadata :rabbitmq-retry-count] (- configured-retry-count remaining-retry-count)))))

(defn mapper-func [user-handler-fn channels]
(fn [{:keys [topic-entity] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name}
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
dead-letter-metric "dead-letter"
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name}
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
dead-letter-metric "dead-letter"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
user-payload (create-user-payload message-payload (get-configured-retry-count))]
(clog/with-logging-context {:consumer-group topic-entity-name}
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags)
Expand All @@ -67,29 +69,29 @@

(defn channel-mapper-func [user-handler-fn channel]
(fn [{:keys [topic-entity] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name channel-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." message-processing-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
dead-letter-metric "dead-letter"
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name channel-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." message-processing-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
dead-letter-metric "dead-letter"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
user-payload (create-user-payload message-payload (producer/get-channel-retry-count topic-entity channel))]
(clog/with-logging-context {:consumer-group topic-entity-name :channel channel-name}
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
Expand Down

0 comments on commit 1c01e72

Please sign in to comment.