Skip to content

Commit

Permalink
Merge fb0f7c3 into 7f7332c
Browse files Browse the repository at this point in the history
  • Loading branch information
anmol1vw13 committed Jun 10, 2022
2 parents 7f7332c + fb0f7c3 commit 4037821
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 206 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.7.4"
(defproject tech.gojek/ziggurat "4.7.5"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
27 changes: 15 additions & 12 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
[langohr.basic :as lb]
[langohr.channel :as lch]
[langohr.exchange :as le]
[langohr.http :as lh]
[langohr.queue :as lq]
[ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]]
[ziggurat.messaging.connection :refer [producer-connection is-connection-required?]]
[ziggurat.config :refer [channel-retry-config rabbitmq-config ziggurat-config]]
[ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]]
[ziggurat.messaging.connection :refer [is-connection-required? producer-connection]]
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics])
(:import (com.rabbitmq.client AlreadyClosedException Channel)
(java.io IOException)
(org.apache.commons.pool2.impl GenericObjectPool)
(java.util.concurrent TimeoutException)))
(java.time Instant)
(java.util.concurrent TimeoutException)
(org.apache.commons.pool2.impl GenericObjectPool)))

(def MAX_EXPONENTIAL_RETRIES 25)

Expand Down Expand Up @@ -67,8 +67,7 @@
(defn- handle-network-exception
[e message-payload retry-counter]
(log/error e "Network exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))
:retry-attempt retry-counter})
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))})
:retry)

(defn return-to-pool [^GenericObjectPool pool ^Channel ch]
Expand All @@ -94,8 +93,7 @@
(handle-network-exception e message-payload retry-counter))
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))
:retry-counter retry-counter})
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))})
:retry-with-counter)))

(defn- publish-retry-config []
Expand All @@ -118,7 +116,12 @@
(publish exchange message-payload expiration 0))
([exchange message-payload expiration retry-counter]
(when (is-pool-alive? cpool/channel-pool)
(let [result (publish-internal exchange message-payload expiration retry-counter)]
(let [start-time (.toEpochMilli (Instant/now))
result (publish-internal exchange message-payload expiration retry-counter)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
_ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload))
:exchange-name exchange})]
(when (pos? retry-counter)
(log/info "Retrying publishing the message to " exchange)
(log/info "Retry attempt " retry-counter))
Expand All @@ -136,7 +139,7 @@
(recur exchange message-payload expiration (inc retry-counter)))
(do
(log/error "Publishing the message has failed. It is being dropped")
(metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload))
(metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload))
:retry-counter retry-counter}))))))))

(defn- retry-type []
Expand Down
Loading

0 comments on commit 4037821

Please sign in to comment.