Skip to content

Commit

Permalink
Add retry states for publish
Browse files Browse the repository at this point in the history
  • Loading branch information
Anmol Vijaywargiya committed Apr 25, 2022
1 parent 8531bdc commit 5519e6c
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions src/ziggurat/messaging/producer.clj
Expand Up @@ -68,7 +68,7 @@
[e message-payload]
(log/error e "Network exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))})
true)
:retry)

(defn return-to-pool [^GenericObjectPool pool ^Channel ch]
(.returnObject pool ch))
Expand All @@ -80,7 +80,7 @@
(try
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers))
(properties-for-publish expiration (:headers message-payload)))
false
:success
(catch AlreadyClosedException e
(handle-network-exception e message-payload))
(catch IOException e
Expand All @@ -90,12 +90,12 @@
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))})
false)
:retry-with-counter)
(finally (return-to-pool cpool/channel-pool ch))))
(catch Exception e
(log/error e "Exception was encountered while borrowing a channel from the pool")
(metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))})
false)))
:retry-with-counter)))

(defn- publish-retry-config []
(-> (ziggurat-config) :rabbit-mq-connection :publish-retry))
Expand All @@ -110,14 +110,16 @@
(publish exchange message-payload expiration (:count (non-recoverable-exception-config))))
([exchange message-payload expiration counter]
(when (is-pool-alive? cpool/channel-pool)
(if (publish-internal exchange message-payload expiration)
(do
(Thread/sleep (:sleep (publish-retry-config)))
(log/info "Retrying publishing the message to " exchange)
(recur exchange message-payload expiration counter))
(when (and (:enabled (non-recoverable-exception-config)) (pos? counter))
(Thread/sleep (:sleep (non-recoverable-exception-config)))
(recur exchange message-payload expiration (dec counter)))))))
(let [result (publish-internal exchange message-payload expiration)]
(cond
(= result :success) nil
(= result :retry) (do
(Thread/sleep (:sleep (publish-retry-config)))
(log/info "Retrying publishing the message to " exchange)
(recur exchange message-payload expiration counter))
(= result :retry-with-counter) (when (and (:enabled (non-recoverable-exception-config)) (pos? counter))
(Thread/sleep (:sleep (non-recoverable-exception-config)))
(recur exchange message-payload expiration (dec counter))))))))

(defn- retry-type []
(-> (ziggurat-config) :retry :type))
Expand Down

0 comments on commit 5519e6c

Please sign in to comment.