diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 6785c8b4..4df9db8e 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -68,7 +68,7 @@ [e message-payload] (log/error e "Network exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - true) + :retry) (defn return-to-pool [^GenericObjectPool pool ^Channel ch] (.returnObject pool ch)) @@ -80,7 +80,7 @@ (try (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload))) - false + :success (catch AlreadyClosedException e (handle-network-exception e message-payload)) (catch IOException e @@ -90,12 +90,12 @@ (catch Exception e (log/error e "Exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - false) + :retry-with-counter) (finally (return-to-pool cpool/channel-pool ch)))) (catch Exception e (log/error e "Exception was encountered while borrowing a channel from the pool") (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) - false))) + :retry-with-counter))) (defn- publish-retry-config [] (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) @@ -110,14 +110,16 @@ (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) ([exchange message-payload expiration counter] (when (is-pool-alive? cpool/channel-pool) - (if (publish-internal exchange message-payload expiration) - (do - (Thread/sleep (:sleep (publish-retry-config))) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration counter)) - (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) - (Thread/sleep (:sleep (non-recoverable-exception-config))) - (recur exchange message-payload expiration (dec counter))))))) + (let [result (publish-internal exchange message-payload expiration)] + (cond + (= result :success) nil + (= result :retry) (do + (Thread/sleep (:sleep (publish-retry-config))) + (log/info "Retrying publishing the message to " exchange) + (recur exchange message-payload expiration counter)) + (= result :retry-with-counter) (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) + (Thread/sleep (:sleep (non-recoverable-exception-config))) + (recur exchange message-payload expiration (dec counter)))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type))