From b5be01509b95a1599a37d29f54169c0ed774bf91 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 14:13:50 +0530 Subject: [PATCH 01/17] Adds TimeoutException and makes Channel Borrow Exceptions Retryable --- src/ziggurat/messaging/producer.clj | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index c0ace5ff..db4e4da1 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -13,7 +13,8 @@ [ziggurat.metrics :as metrics]) (:import (com.rabbitmq.client AlreadyClosedException Channel) (java.io IOException) - (org.apache.commons.pool2.impl GenericObjectPool))) + (org.apache.commons.pool2.impl GenericObjectPool) + (java.util.concurrent TimeoutException))) (def MAX_EXPONENTIAL_RETRIES 25) @@ -65,7 +66,7 @@ (defn- handle-network-exception [e message-payload] - (log/error e "Exception was encountered while publishing to RabbitMQ") + (log/error e "Network exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) true) @@ -84,15 +85,17 @@ (handle-network-exception e message-payload)) (catch IOException e (handle-network-exception e message-payload)) + (catch TimeoutException e + (handle-network-exception e message-payload)) (catch Exception e (log/error e "Exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) false) (finally (return-to-pool cpool/channel-pool ch)))) (catch Exception e - (log/error e "Exception occurred while borrowing a channel from the pool") + (log/error e "Exception was encountered while borrowing a channel from the pool") (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) - false))) + true))) (defn publish ([exchange message-payload] From 007579e709912d6ada2ba7c4b4879479b73d257f Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 17:02:17 +0530 Subject: [PATCH 02/17] Making non recoverable exception retries configurable --- resources/config.test.edn | 7 +++- src/ziggurat/config.clj | 6 ++- src/ziggurat/messaging/producer.clj | 22 ++++++++--- test/ziggurat/messaging/producer_test.clj | 47 +++++++++++++++++++---- 4 files changed, 68 insertions(+), 14 deletions(-) diff --git a/resources/config.test.edn b/resources/config.test.edn index 2da0b3f9..cc3e87f6 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -16,7 +16,12 @@ :username "guest" :password "guest" :channel-timeout [2000 :int] - :address-resolver [:dns :keyword]} ;;possible values [:dns :ip-list] + + :address-resolver [:dns :keyword] ;;possible values [:dns :ip-list] + :publish-retry {:sleep [1 :int] + :non-recoverable-exception {:enabled [true :bool] + :sleep [1 :int] + :count [5 :int]}}} :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index a7a9053e..ebc2b5a8 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -24,7 +24,11 @@ :rabbit-mq-connection {:port 5672 :username "guest" :password "guest" - :channel-timeout 2000} + :channel-timeout 2000 + :publish-retry {:sleep 5000 + :non-recoverable-exception {:enabled true + :sleep 1000 + :count 5}}} :jobs {:instant {:worker-count 4 :prefetch-count 4}} :rabbit-mq {:delay {:queue-name "%s_delay_queue" diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index db4e4da1..8f996f40 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -95,16 +95,28 @@ (catch Exception e (log/error e "Exception was encountered while borrowing a channel from the pool") (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) - true))) + false))) + +(defn- publish-retry-config [] + (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) + +(defn- non-recoverable-exception-config [] + (:non-recoverable-exception (publish-retry-config))) (defn publish ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration] - (when (publish-internal exchange message-payload expiration) - (Thread/sleep 5000) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration)))) + (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) + ([exchange message-payload expiration counter] + (if (publish-internal exchange message-payload expiration) + (do + (Thread/sleep (:sleep (publish-retry-config))) + (log/info "Retrying publishing the message to " exchange) + (recur exchange message-payload expiration counter)) + (when (and (:enabled (non-recoverable-exception-config)) (> counter 0)) + (Thread/sleep (:sleep (non-recoverable-exception-config))) + (recur exchange message-payload expiration (dec counter)))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index f3ea7b33..1d7b2e11 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -16,7 +16,8 @@ [ziggurat.metrics :as metrics]) (:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader] (com.rabbitmq.client Channel Connection ShutdownSignalException AlreadyClosedException) - (java.io IOException))) + (java.io IOException) + (java.util.concurrent TimeoutException))) (use-fixtures :once (join-fixtures [fix/init-rabbit-mq fix/silence-logging])) @@ -520,27 +521,59 @@ (let [publish-called (atom 0)] (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) (swap! publish-called inc) (throw (IOException. "io exception")))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 2 @publish-called))))) + (is (= 10 @publish-called))))) (testing "publish/producer tries to publish again if already closed exception is received" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) (swap! publish-called inc) (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 2 @publish-called))))) - (testing "producer/publish does not try again if the exception thrown is neither IOException nor AlreadyClosedException" + (is (= 10 @publish-called))))) + (testing "publish/producer tries to publish again if TimeoutException is received" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (TimeoutException. "timeout")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @publish-called))))) + (testing "producer/publish tries again the number of times defined in the config if the exception thrown is non recoverable and if retry is enabled" + (let [publish-called (atom 0) + config (config/ziggurat-config) + count 3 + config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true + :sleep 1 + :count count}))] + (with-redefs [config/ziggurat-config (fn [] config) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= (inc count) @publish-called))))) + (testing "producer/publish does not retry again if the exception thrown is non recoverable and if retry is disabled" + (let [publish-called (atom 0) + config (config/ziggurat-config) + count 3 + config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false + :sleep 1 + :count count}))] + (with-redefs [config/ziggurat-config (fn [] config) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) (swap! publish-called inc) (throw (Exception. "non-io exception")))) metrics/increment-count (fn [_ _ _] nil)] From 11ae126219b7736a4a0ff5bd0bf654a1068c00db Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 17:08:27 +0530 Subject: [PATCH 03/17] Update ReadME --- README.md | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 6d8f14e0..becb34af 100644 --- a/README.md +++ b/README.md @@ -364,14 +364,29 @@ Ziggurat Config | Default Value | Description | Mandatory? :address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]. Default is :dns :channel-pool {:max-wait-ms [5000 :int] :min-idle [10 :int] - :max-idle [20 :int]}}}}} + :max-idle [20 :int]} + :publish-retry {:sleep 5000 + :non-recoverable-exception {:enabled true + :sleep 1000 + :count 5}}}}}} ``` - `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs). - `:port` specifies the port number on which the RabbitMQ nodes are running. +<<<<<<< HEAD - `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing - `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. +======= +- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster +- publish-retry defines the config for recoverable and non-recoverable exceptions. + - Recoverable exceptions + - `:sleep` - defines the time period after which a retry should happen + - Non-recoverable exceptions + - `:enabled` - defines whether retries should happen + - `:sleep` - defines the time period after which a retry should happen + - `:count` - defines the number of retries +>>>>>>> c27fd46... Update ReadME ## Exponential Backoff based Retries From 0edd91dd806b3bc0d3ea12c6e6e29bb97ce90bc9 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 18:04:28 +0530 Subject: [PATCH 04/17] Publish only if channel pool is alive --- src/ziggurat/messaging/channel_pool.clj | 5 +++++ src/ziggurat/messaging/producer.clj | 19 ++++++++++--------- test/ziggurat/messaging/channel_pool_test.clj | 18 +++++++++++++++++- test/ziggurat/messaging/producer_test.clj | 15 ++++++++++++++- 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/src/ziggurat/messaging/channel_pool.clj b/src/ziggurat/messaging/channel_pool.clj index 45ec8063..528c88e5 100644 --- a/src/ziggurat/messaging/channel_pool.clj +++ b/src/ziggurat/messaging/channel_pool.clj @@ -39,6 +39,9 @@ rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. connection) pool-config)] rmq-chan-pool)) +(defn is-pool-alive? [channel-pool] + (= (type channel-pool) GenericObjectPool)) + (defn destroy-channel-pool [channel-pool] (.close channel-pool)) @@ -49,3 +52,5 @@ (create-channel-pool c/producer-connection)) :stop (do (log/info "Stopping channel pool") (destroy-channel-pool channel-pool))) + + diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 8f996f40..d43e5c69 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -5,7 +5,7 @@ [langohr.exchange :as le] [langohr.http :as lh] [langohr.queue :as lq] - [ziggurat.messaging.channel_pool :as cpool] + [ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]] [taoensso.nippy :as nippy] [ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]] [ziggurat.messaging.connection :refer [producer-connection is-connection-required?]] @@ -109,14 +109,15 @@ ([exchange message-payload expiration] (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) ([exchange message-payload expiration counter] - (if (publish-internal exchange message-payload expiration) - (do - (Thread/sleep (:sleep (publish-retry-config))) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration counter)) - (when (and (:enabled (non-recoverable-exception-config)) (> counter 0)) - (Thread/sleep (:sleep (non-recoverable-exception-config))) - (recur exchange message-payload expiration (dec counter)))))) + (when (is-pool-alive? cpool/channel-pool) + (if (publish-internal exchange message-payload expiration) + (do + (Thread/sleep (:sleep (publish-retry-config))) + (log/info "Retrying publishing the message to " exchange) + (recur exchange message-payload expiration counter)) + (when (and (:enabled (non-recoverable-exception-config)) (> counter 0)) + (Thread/sleep (:sleep (non-recoverable-exception-config))) + (recur exchange message-payload expiration (dec counter))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) diff --git a/test/ziggurat/messaging/channel_pool_test.clj b/test/ziggurat/messaging/channel_pool_test.clj index 2dd49c7f..bc4b2645 100644 --- a/test/ziggurat/messaging/channel_pool_test.clj +++ b/test/ziggurat/messaging/channel_pool_test.clj @@ -1,6 +1,6 @@ (ns ziggurat.messaging.channel-pool-test (:require [clojure.test :refer :all] - [ziggurat.messaging.channel_pool :as cpool] + [ziggurat.messaging.channel_pool :as cpool :refer [channel-pool]] [ziggurat.messaging.connection :refer [producer-connection]] [ziggurat.fixtures :as fix]) (:import (org.apache.commons.pool2.impl GenericObjectPoolConfig GenericObjectPool) @@ -60,4 +60,20 @@ (is (not (.equals rmq-chan-2 rmq-chan))) (is (.isOpen rmq-chan-2))))) +(deftest is-pool-alive? + (testing "it should return true when channel-pool state has started" + (let [states {#'producer-connection #'channel-pool}] + (mount.core/start states) + (is (cpool/is-pool-alive? channel-pool)) + (mount.core/stop states))) + (testing "it should return false when channel-pool state has not been started" + (is (false? (cpool/is-pool-alive? channel-pool)))) + (testing "it should return false when channel-pool state has been stopped" + (let [states {#'producer-connection #'channel-pool}] + (mount.core/start states) + (mount.core/stop states) + (is (false? (cpool/is-pool-alive? channel-pool)))))) + + + diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index 1d7b2e11..d1714427 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -7,6 +7,7 @@ [ziggurat.fixtures :as fix] [ziggurat.messaging.connection :refer [producer-connection]] [ziggurat.messaging.producer :as producer] + [ziggurat.messaging.channel_pool :as cpool] [ziggurat.messaging.util :as util] [ziggurat.util.rabbitmq :as rmq] [langohr.basic :as lb] @@ -563,6 +564,7 @@ metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= (inc count) @publish-called))))) + (testing "producer/publish does not retry again if the exception thrown is non recoverable and if retry is disabled" (let [publish-called (atom 0) config (config/ziggurat-config) @@ -578,7 +580,18 @@ (throw (Exception. "non-io exception")))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 1 @publish-called)))))) + (is (= 1 @publish-called))))) + (testing "producer/publish does not publish even once if channel pool is not alive" + (let [publish-called (atom 0)] + (with-redefs [cpool/is-pool-alive? (fn [_] false) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 0 @publish-called)))))) (deftest publish-to-delay-queue-test (testing "creates a span when tracer is enabled" From 7e3e9fd7e35a30b3e25a0d072a75189115c5e0e4 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 18:09:54 +0530 Subject: [PATCH 05/17] Fix lint --- src/ziggurat/messaging/producer.clj | 2 +- test/ziggurat/messaging/channel_pool_test.clj | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index d43e5c69..6785c8b4 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -115,7 +115,7 @@ (Thread/sleep (:sleep (publish-retry-config))) (log/info "Retrying publishing the message to " exchange) (recur exchange message-payload expiration counter)) - (when (and (:enabled (non-recoverable-exception-config)) (> counter 0)) + (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) (Thread/sleep (:sleep (non-recoverable-exception-config))) (recur exchange message-payload expiration (dec counter))))))) diff --git a/test/ziggurat/messaging/channel_pool_test.clj b/test/ziggurat/messaging/channel_pool_test.clj index bc4b2645..f8de8971 100644 --- a/test/ziggurat/messaging/channel_pool_test.clj +++ b/test/ziggurat/messaging/channel_pool_test.clj @@ -73,7 +73,3 @@ (mount.core/start states) (mount.core/stop states) (is (false? (cpool/is-pool-alive? channel-pool)))))) - - - - From cdc52e1084809272485d202dcef728f72add93a1 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 18:14:19 +0530 Subject: [PATCH 06/17] Update README --- README.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index becb34af..be0361ab 100644 --- a/README.md +++ b/README.md @@ -373,20 +373,17 @@ Ziggurat Config | Default Value | Description | Mandatory? - `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs). - `:port` specifies the port number on which the RabbitMQ nodes are running. -<<<<<<< HEAD - `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing - `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. -======= -- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster -- publish-retry defines the config for recoverable and non-recoverable exceptions. +- `:publish-retry` defines the config for recoverable and non-recoverable exceptions. - Recoverable exceptions - `:sleep` - defines the time period after which a retry should happen - Non-recoverable exceptions - `:enabled` - defines whether retries should happen - `:sleep` - defines the time period after which a retry should happen - `:count` - defines the number of retries ->>>>>>> c27fd46... Update ReadME +- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster ## Exponential Backoff based Retries From ce9b0be31048757104d1e3933d7fa017b5df6ec7 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Mon, 25 Apr 2022 13:26:52 +0530 Subject: [PATCH 07/17] Add retry states for publish --- src/ziggurat/messaging/producer.clj | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 6785c8b4..4df9db8e 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -68,7 +68,7 @@ [e message-payload] (log/error e "Network exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - true) + :retry) (defn return-to-pool [^GenericObjectPool pool ^Channel ch] (.returnObject pool ch)) @@ -80,7 +80,7 @@ (try (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload))) - false + :success (catch AlreadyClosedException e (handle-network-exception e message-payload)) (catch IOException e @@ -90,12 +90,12 @@ (catch Exception e (log/error e "Exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - false) + :retry-with-counter) (finally (return-to-pool cpool/channel-pool ch)))) (catch Exception e (log/error e "Exception was encountered while borrowing a channel from the pool") (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) - false))) + :retry-with-counter))) (defn- publish-retry-config [] (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) @@ -110,14 +110,16 @@ (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) ([exchange message-payload expiration counter] (when (is-pool-alive? cpool/channel-pool) - (if (publish-internal exchange message-payload expiration) - (do - (Thread/sleep (:sleep (publish-retry-config))) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration counter)) - (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) - (Thread/sleep (:sleep (non-recoverable-exception-config))) - (recur exchange message-payload expiration (dec counter))))))) + (let [result (publish-internal exchange message-payload expiration)] + (cond + (= result :success) nil + (= result :retry) (do + (Thread/sleep (:sleep (publish-retry-config))) + (log/info "Retrying publishing the message to " exchange) + (recur exchange message-payload expiration counter)) + (= result :retry-with-counter) (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) + (Thread/sleep (:sleep (non-recoverable-exception-config))) + (recur exchange message-payload expiration (dec counter)))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) From 3b376ac8d1ebf26eea8db83278a60df169cb91d6 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Mon, 25 Apr 2022 13:34:13 +0530 Subject: [PATCH 08/17] Clojure doc for the method publish --- src/ziggurat/messaging/producer.clj | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 4df9db8e..d20fc87a 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -104,6 +104,13 @@ (:non-recoverable-exception (publish-retry-config))) (defn publish + "This is meant for publishing to rabbitmq. + * Checks if the pool is alive - We do this so that publish does not happen after the channel pool state is stopped. + * publish-internal returns multiple states + * :success - Message has been successfully produced to rabbitmq + * :retry - A retryable exception was encountered and message will be retried until it is successfully published. + * :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times. defined by the counter + { :rabbit-mq-connection { :publish-retry { :non-recoverable-exception {:count}}}}}" ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration] From 58a8826ee98a0245c9a3c3d42d5686fb53b1509f Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Tue, 26 Apr 2022 13:52:16 +0530 Subject: [PATCH 09/17] Adds error log --- src/ziggurat/messaging/producer.clj | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index d20fc87a..2bf88699 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -124,9 +124,11 @@ (Thread/sleep (:sleep (publish-retry-config))) (log/info "Retrying publishing the message to " exchange) (recur exchange message-payload expiration counter)) - (= result :retry-with-counter) (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) - (Thread/sleep (:sleep (non-recoverable-exception-config))) - (recur exchange message-payload expiration (dec counter)))))))) + (= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) (pos? counter)) + (do + (Thread/sleep (:sleep (non-recoverable-exception-config))) + (recur exchange message-payload expiration (dec counter))) + (log/error "Publishing the message has failed. It is being dropped"))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) From fcc2c5af97cdc6b4e8475aa293abb6b9dfa1c7a4 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 14:13:50 +0530 Subject: [PATCH 10/17] Adds TimeoutException and makes Channel Borrow Exceptions Retryable --- src/ziggurat/messaging/producer.clj | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 2bf88699..09daed10 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -103,6 +103,7 @@ (defn- non-recoverable-exception-config [] (:non-recoverable-exception (publish-retry-config))) + (defn publish "This is meant for publishing to rabbitmq. * Checks if the pool is alive - We do this so that publish does not happen after the channel pool state is stopped. From c59b380429b6c13831c88841422b595d8f6869ad Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 17:02:17 +0530 Subject: [PATCH 11/17] Making non recoverable exception retries configurable --- src/ziggurat/messaging/producer.clj | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 09daed10..f4134bf1 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -97,6 +97,7 @@ (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) :retry-with-counter))) + (defn- publish-retry-config [] (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) @@ -131,6 +132,7 @@ (recur exchange message-payload expiration (dec counter))) (log/error "Publishing the message has failed. It is being dropped"))))))) + (defn- retry-type [] (-> (ziggurat-config) :retry :type)) From 5159e77d670fa0244619ac47ad68cedfefb419a7 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 17:08:27 +0530 Subject: [PATCH 12/17] Update ReadME --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index be0361ab..1506e69a 100644 --- a/README.md +++ b/README.md @@ -376,7 +376,7 @@ Ziggurat Config | Default Value | Description | Mandatory? - `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing - `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. -- `:publish-retry` defines the config for recoverable and non-recoverable exceptions. +- `:publish-retry` defines the config for recoverable and non-recoverable exceptions. - Recoverable exceptions - `:sleep` - defines the time period after which a retry should happen - Non-recoverable exceptions From c9e4a9cbe9cdc132d47227be664b95d501463c4a Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 18:04:28 +0530 Subject: [PATCH 13/17] Publish only if channel pool is alive --- src/ziggurat/messaging/producer.clj | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index f4134bf1..25d3d02b 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -132,7 +132,6 @@ (recur exchange message-payload expiration (dec counter))) (log/error "Publishing the message has failed. It is being dropped"))))))) - (defn- retry-type [] (-> (ziggurat-config) :retry :type)) From 280c35dd974703bdb26a18a3734022db97eecd02 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 18:14:19 +0530 Subject: [PATCH 14/17] Update README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1506e69a..be0361ab 100644 --- a/README.md +++ b/README.md @@ -376,7 +376,7 @@ Ziggurat Config | Default Value | Description | Mandatory? - `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing - `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. -- `:publish-retry` defines the config for recoverable and non-recoverable exceptions. +- `:publish-retry` defines the config for recoverable and non-recoverable exceptions. - Recoverable exceptions - `:sleep` - defines the time period after which a retry should happen - Non-recoverable exceptions From b1e38717c63ed08fbdc8626e58a569f78d3d7772 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Mon, 25 Apr 2022 13:26:52 +0530 Subject: [PATCH 15/17] Add retry states for publish --- src/ziggurat/messaging/producer.clj | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 25d3d02b..7fbe5023 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -96,7 +96,10 @@ (log/error e "Exception was encountered while borrowing a channel from the pool") (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) :retry-with-counter))) +<<<<<<< HEAD +======= +>>>>>>> 6a8eb91... Add retry states for publish (defn- publish-retry-config [] (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) From b10d80b6b50824f6ca21fba0e0df8fadfe75cda4 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Wed, 18 May 2022 16:57:26 +0530 Subject: [PATCH 16/17] Few improvements in the error handling of rabbitmq publish flow 1. Addressed PR comments 2. Changed max-idle to be equal to total number of threads. 3. Fixed an issue where in a few errors thrown while borrowing from channel were not being retried indefinitely --- README.md | 8 +-- project.clj | 2 +- resources/config.test.edn | 4 +- src/ziggurat/config.clj | 4 +- src/ziggurat/messaging/channel_pool.clj | 6 +- src/ziggurat/messaging/connection.clj | 2 - src/ziggurat/messaging/producer.clj | 62 +++++++++++-------- test/ziggurat/messaging/channel_pool_test.clj | 2 +- test/ziggurat/messaging/producer_test.clj | 42 +++++++++++-- 9 files changed, 86 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index be0361ab..d3c3af14 100644 --- a/README.md +++ b/README.md @@ -365,9 +365,9 @@ Ziggurat Config | Default Value | Description | Mandatory? :channel-pool {:max-wait-ms [5000 :int] :min-idle [10 :int] :max-idle [20 :int]} - :publish-retry {:sleep 5000 + :publish-retry {:back-off-ms 5000 :non-recoverable-exception {:enabled true - :sleep 1000 + :back-off-ms 1000 :count 5}}}}}} ``` @@ -378,10 +378,10 @@ Ziggurat Config | Default Value | Description | Mandatory? be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. - `:publish-retry` defines the config for recoverable and non-recoverable exceptions. - Recoverable exceptions - - `:sleep` - defines the time period after which a retry should happen + - `:back-off-ms` - defines the time period after which a retry should happen - Non-recoverable exceptions - `:enabled` - defines whether retries should happen - - `:sleep` - defines the time period after which a retry should happen + - `:back-off-ms` - defines the time period after which a retry should happen - `:count` - defines the number of retries - By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster diff --git a/project.clj b/project.clj index 63929eb1..2439f928 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,7 @@ (cemerick.pomegranate.aether/register-wagon-factory! "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) -(defproject tech.gojek/ziggurat "4.6.4" +(defproject tech.gojek/ziggurat "4.7.0" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" diff --git a/resources/config.test.edn b/resources/config.test.edn index cc3e87f6..fdcd2505 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -18,9 +18,9 @@ :channel-timeout [2000 :int] :address-resolver [:dns :keyword] ;;possible values [:dns :ip-list] - :publish-retry {:sleep [1 :int] + :publish-retry {:back-off-ms [1 :int] :non-recoverable-exception {:enabled [true :bool] - :sleep [1 :int] + :back-off-ms [1 :int] :count [5 :int]}}} :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index ebc2b5a8..6460ef7b 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -25,9 +25,9 @@ :username "guest" :password "guest" :channel-timeout 2000 - :publish-retry {:sleep 5000 + :publish-retry {:back-off-ms 5000 :non-recoverable-exception {:enabled true - :sleep 1000 + :back-off-ms 5000 :count 5}}} :jobs {:instant {:worker-count 4 :prefetch-count 4}} diff --git a/src/ziggurat/messaging/channel_pool.clj b/src/ziggurat/messaging/channel_pool.clj index 528c88e5..dde6c3ad 100644 --- a/src/ziggurat/messaging/channel_pool.clj +++ b/src/ziggurat/messaging/channel_pool.clj @@ -25,7 +25,7 @@ (defn create-object-pool-config [config] (let [standby-size 10 total-thread-count (calc-total-thread-count) - merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle (* standby-size 2)} config)] + merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle total-thread-count} config)] (doto (GenericObjectPoolConfig.) (.setMaxWait (Duration/ofMillis (:max-wait-ms merged-config))) (.setMinIdle (:min-idle merged-config)) @@ -35,7 +35,9 @@ (.setJmxNamePrefix "zig-rabbitmq-ch-pool")))) (defn create-channel-pool [^Connection connection] - (let [pool-config (create-object-pool-config (get-in (ziggurat-config) [:rabbit-mq-connection :channel-pool])) + (let [pool-config (create-object-pool-config + (get-in (ziggurat-config) + [:rabbit-mq-connection :channel-pool])) rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. connection) pool-config)] rmq-chan-pool)) diff --git a/src/ziggurat/messaging/connection.clj b/src/ziggurat/messaging/connection.clj index 22a7eae4..bc2eb335 100644 --- a/src/ziggurat/messaging/connection.clj +++ b/src/ziggurat/messaging/connection.clj @@ -87,14 +87,12 @@ (log/info "Closing the RabbitMQ connection") (rmq/close conn))) -(declare consumer-connection) (defstate consumer-connection :start (do (log/info "Creating consumer connection") (start-connection false)) :stop (do (log/info "Stopping consume connection") (stop-connection consumer-connection))) -(declare producer-connection) (defstate producer-connection :start (do (log/info "Creating producer connection") (start-connection true)) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 7fbe5023..fda943c4 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -65,36 +65,37 @@ props))) (defn- handle-network-exception - [e message-payload] + [e message-payload retry-counter] (log/error e "Network exception was encountered while publishing to RabbitMQ") - (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) + (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload)) + :retry-attempt retry-counter}) :retry) (defn return-to-pool [^GenericObjectPool pool ^Channel ch] (.returnObject pool ch)) +(defn borrow-from-pool [^GenericObjectPool pool] + (.borrowObject pool)) + (defn- publish-internal - [exchange message-payload expiration] + [exchange message-payload expiration retry-counter] (try - (let [ch (.borrowObject cpool/channel-pool)] + (let [ch (borrow-from-pool cpool/channel-pool)] (try (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload))) :success - (catch AlreadyClosedException e - (handle-network-exception e message-payload)) - (catch IOException e - (handle-network-exception e message-payload)) - (catch TimeoutException e - (handle-network-exception e message-payload)) - (catch Exception e - (log/error e "Exception was encountered while publishing to RabbitMQ") - (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - :retry-with-counter) (finally (return-to-pool cpool/channel-pool ch)))) + (catch AlreadyClosedException e + (handle-network-exception e message-payload retry-counter)) + (catch IOException e + (handle-network-exception e message-payload retry-counter)) + (catch TimeoutException e + (handle-network-exception e message-payload retry-counter)) (catch Exception e - (log/error e "Exception was encountered while borrowing a channel from the pool") - (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) + (log/error e "Exception was encountered while publishing to RabbitMQ") + (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload)) + :retry-counter retry-counter}) :retry-with-counter))) <<<<<<< HEAD @@ -119,21 +120,30 @@ ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration] - (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) - ([exchange message-payload expiration counter] + (publish exchange message-payload expiration 0)) + ([exchange message-payload expiration retry-counter] (when (is-pool-alive? cpool/channel-pool) - (let [result (publish-internal exchange message-payload expiration)] + (let [result (publish-internal exchange message-payload expiration retry-counter)] + (when (pos? retry-counter) + (do + (log/info "Retrying publishing the message to " exchange) + (log/info "Retry attempt " retry-counter))) + (log/info "Publish result " result) (cond (= result :success) nil (= result :retry) (do - (Thread/sleep (:sleep (publish-retry-config))) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration counter)) - (= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) (pos? counter)) + (Thread/sleep (:back-off-ms (publish-retry-config))) + (recur exchange message-payload expiration (inc retry-counter))) + (= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) + (< retry-counter (:count (non-recoverable-exception-config)))) + (do + (log/info "Backing off") + (Thread/sleep (:back-off-ms (non-recoverable-exception-config))) + (recur exchange message-payload expiration (inc retry-counter))) (do - (Thread/sleep (:sleep (non-recoverable-exception-config))) - (recur exchange message-payload expiration (dec counter))) - (log/error "Publishing the message has failed. It is being dropped"))))))) + (log/error "Publishing the message has failed. It is being dropped") + (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload)) + :retry-counter retry-counter})))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) diff --git a/test/ziggurat/messaging/channel_pool_test.clj b/test/ziggurat/messaging/channel_pool_test.clj index f8de8971..70145a68 100644 --- a/test/ziggurat/messaging/channel_pool_test.clj +++ b/test/ziggurat/messaging/channel_pool_test.clj @@ -18,7 +18,7 @@ (deftest create-object-pool-config-test (testing "it should create a PoolConfig with default values" - (let [expected-config {:min-idle 10 :max-idle 20 :max-total 54 :max-wait-ms 5000} + (let [expected-config {:min-idle 10 :max-idle 44 :max-total 54 :max-wait-ms 5000} pool-config-object ^GenericObjectPoolConfig (cpool/create-object-pool-config {}) min-idle (.getMinIdle pool-config-object) max-idle (.getMaxIdle pool-config-object) diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index d1714427..1262af2e 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -518,7 +518,7 @@ (is (true? @publish-called?)))))) (deftest publish-behaviour-on-rabbitmq-disconnection-test - (testing "producer/publish tries to publish again if IOException is thrown" + (testing "producer/publish tries to publish again if IOException is thrown via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] @@ -528,7 +528,7 @@ metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) - (testing "publish/producer tries to publish again if already closed exception is received" + (testing "publish/producer tries to publish again if already closed exception is received via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] @@ -538,7 +538,7 @@ metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) - (testing "publish/producer tries to publish again if TimeoutException is received" + (testing "publish/producer tries to publish again if TimeoutException is received via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] @@ -548,12 +548,42 @@ metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) - (testing "producer/publish tries again the number of times defined in the config if the exception thrown is non recoverable and if retry is enabled" + (testing "producer/publish tries to publish again if IOException is thrown while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (IOException. "io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "publish/producer tries to publish again if already closed exception is received while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "publish/producer tries to publish again if TimeoutException is received while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (TimeoutException. "timeout")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "producer/publish retries publishing for a certain number of times (configurable) when a non-recoverable exception is thrown" (let [publish-called (atom 0) config (config/ziggurat-config) count 3 config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true - :sleep 1 + :back-off-ms 1 :count count}))] (with-redefs [config/ziggurat-config (fn [] config) lch/open (fn [^Connection _] (reify Channel (close [_] nil))) @@ -570,7 +600,7 @@ config (config/ziggurat-config) count 3 config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false - :sleep 1 + :back-off-ms 1 :count count}))] (with-redefs [config/ziggurat-config (fn [] config) lch/open (fn [^Connection _] (reify Channel (close [_] nil))) From 7dc805cbb1f985613f0f5efaac7f90a8e573f1de Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Wed, 18 May 2022 17:03:59 +0530 Subject: [PATCH 17/17] Fixed linting issue --- src/ziggurat/messaging/producer.clj | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index fda943c4..bc01a654 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -97,10 +97,6 @@ (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload)) :retry-counter retry-counter}) :retry-with-counter))) -<<<<<<< HEAD - -======= ->>>>>>> 6a8eb91... Add retry states for publish (defn- publish-retry-config [] (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) @@ -125,9 +121,8 @@ (when (is-pool-alive? cpool/channel-pool) (let [result (publish-internal exchange message-payload expiration retry-counter)] (when (pos? retry-counter) - (do - (log/info "Retrying publishing the message to " exchange) - (log/info "Retry attempt " retry-counter))) + (log/info "Retrying publishing the message to " exchange) + (log/info "Retry attempt " retry-counter)) (log/info "Publish result " result) (cond (= result :success) nil