Skip to content

Commit

Permalink
Merge 3e4a400 into 4786a06
Browse files Browse the repository at this point in the history
  • Loading branch information
guptalakshya92 committed Jul 4, 2022
2 parents 4786a06 + 3e4a400 commit 494192e
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 133 deletions.
86 changes: 44 additions & 42 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,38 @@
(producer/publish-to-channel-instant-queue return-code message-payload))

(defn- create-user-payload
[message-payload]
(-> message-payload
(dissoc :headers)
(dissoc :retry-count)
(dissoc :topic-entity)))
[message-payload configured-retry-count]
(let [remaining-retry-count (get message-payload :retry-count configured-retry-count)]
(-> message-payload
(dissoc :headers)
(dissoc :retry-count)
(dissoc :topic-entity)
(assoc-in [:metadata :rabbitmq-retry-count] (- configured-retry-count remaining-retry-count)))))

(defn mapper-func [user-handler-fn channels]
(fn [{:keys [topic-entity] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name}
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
dead-letter-metric "dead-letter"
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name}
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
dead-letter-metric "dead-letter"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
user-payload (create-user-payload message-payload (producer/get-configured-retry-count))]
(clog/with-logging-context {:consumer-group topic-entity-name}
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags)
Expand All @@ -67,29 +69,29 @@

(defn channel-mapper-func [user-handler-fn channel]
(fn [{:keys [topic-entity] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name channel-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." message-processing-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
dead-letter-metric "dead-letter"
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name channel-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." message-processing-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
dead-letter-metric "dead-letter"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
user-payload (create-user-payload message-payload (producer/get-channel-retry-count topic-entity channel))]
(clog/with-logging-context {:consumer-group topic-entity-name :channel channel-name}
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
Expand Down
61 changes: 32 additions & 29 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@
([exchange message-payload expiration retry-counter]
(when (is-pool-alive? cpool/channel-pool)
(let [start-time (.toEpochMilli (Instant/now))
result (publish-internal exchange message-payload expiration retry-counter)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
_ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload))
:exchange-name exchange})]
result (publish-internal exchange message-payload expiration retry-counter)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
_ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload))
:exchange-name exchange})]
(when (pos? retry-counter)
(log/info "Retrying publishing the message to " exchange)
(log/info "Retry attempt " retry-counter))
Expand Down Expand Up @@ -151,12 +151,15 @@
(defn- channel-retry-type [topic-entity channel]
(:type (channel-retry-config topic-entity channel)))

(defn- get-channel-retry-count [topic-entity channel]
(defn get-channel-retry-count [topic-entity channel]
(:count (channel-retry-config topic-entity channel)))

(defn get-configured-retry-count []
(-> (ziggurat-config) :retry :count))

(defn- get-channel-queue-timeout-or-default-timeout [topic-entity channel]
(let [channel-queue-timeout-ms (:queue-timeout-ms (channel-retry-config topic-entity channel))
queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])]
queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])]
(or channel-queue-timeout-ms queue-timeout-ms)))

(defn- get-backoff-exponent
Expand Down Expand Up @@ -193,8 +196,8 @@
(defn get-queue-timeout-ms
"Calculate queue timeout for delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled."
[message-payload]
(let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
retry-count (-> (ziggurat-config) :retry :count)
(let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
retry-count (-> (ziggurat-config) :retry :count)
message-retry-count (:retry-count message-payload)]
(if (= :exponential (-> (ziggurat-config) :retry :type))
(get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms)
Expand All @@ -204,8 +207,8 @@
"Calculate queue timeout for channel delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled."
[topic-entity channel message-payload]
(let [channel-queue-timeout-ms (get-channel-queue-timeout-or-default-timeout topic-entity channel)
message-retry-count (:retry-count message-payload)
channel-retry-count (get-channel-retry-count topic-entity channel)]
message-retry-count (:retry-count message-payload)
channel-retry-count (get-channel-retry-count topic-entity channel)]
(if (= :exponential (channel-retry-type topic-entity channel))
(get-exponential-backoff-timeout-ms channel-retry-count message-retry-count channel-queue-timeout-ms)
channel-queue-timeout-ms)))
Expand All @@ -215,18 +218,18 @@
[topic-entity message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (util/prefixed-queue-name topic-entity exchange-name)
retry-count (-> (ziggurat-config) :retry :count)]
retry-count (-> (ziggurat-config) :retry :count)]
(if (= :exponential (-> (ziggurat-config) :retry :type))
(let [message-retry-count (:retry-count message-payload)
backoff-exponent (get-backoff-exponent retry-count message-retry-count)]
backoff-exponent (get-backoff-exponent retry-count message-retry-count)]
(util/prefixed-queue-name exchange-name backoff-exponent))
exchange-name)))

(defn get-channel-delay-exchange-name
"This function return delay exchange name for retry when using channel flow. It will return exchange name with retry count as suffix if exponential backoff enabled."
[topic-entity channel message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)
channel-retry-count (get-channel-retry-count topic-entity channel)]
(if (= :exponential (channel-retry-type topic-entity channel))
(let [message-retry-count (:retry-count message-payload)
Expand All @@ -235,38 +238,38 @@
exchange-name)))

(defn publish-to-delay-queue [message-payload]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-delay-exchange-name topic-entity message-payload)
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-delay-exchange-name topic-entity message-payload)
queue-timeout-ms (get-queue-timeout-ms message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-dead-queue [message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload)))

(defn publish-to-instant-queue [message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
topic-entity (:topic-entity message-payload)
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload)))

(defn publish-to-channel-delay-queue [channel message-payload]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload)
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload)
queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-channel-dead-queue [channel message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))

(defn publish-to-channel-instant-queue [channel message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
topic-entity (:topic-entity message-payload)
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))

Expand All @@ -286,17 +289,17 @@

(defn- make-delay-queue [topic-entity]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
queue-name (delay-queue-name topic-entity queue-name)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)
queue-name (delay-queue-name topic-entity queue-name)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)
dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange)]
(create-and-bind-queue queue-name exchange-name dead-letter-exchange-name)))

(defn- make-delay-queue-with-retry-count [topic-entity retry-count]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
queue-name (delay-queue-name topic-entity queue-name)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)
queue-name (delay-queue-name topic-entity queue-name)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)
dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange)
sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))]
sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))]
(doseq [s (range 1 sequence)]
(create-and-bind-queue (util/prefixed-queue-name queue-name s) (util/prefixed-queue-name exchange-name s) dead-letter-exchange-name))))

Expand All @@ -308,7 +311,7 @@

(defn- make-queue [topic-identifier queue-type]
(let [{:keys [queue-name exchange-name]} (queue-type (rabbitmq-config))
queue-name (util/prefixed-queue-name topic-identifier queue-name)
queue-name (util/prefixed-queue-name topic-identifier queue-name)
exchange-name (util/prefixed-queue-name topic-identifier exchange-name)]
(create-and-bind-queue queue-name exchange-name)))

Expand Down Expand Up @@ -340,7 +343,7 @@
(defn make-queues [routes]
(when (is-connection-required?)
(doseq [topic-entity (keys routes)]
(let [channels (util/get-channel-names routes topic-entity)
(let [channels (util/get-channel-names routes topic-entity)
retry-type (retry-type)]
(make-channel-queues channels topic-entity)
(when (-> (ziggurat-config) :retry :enabled)
Expand Down

0 comments on commit 494192e

Please sign in to comment.