From 1e2882ea1ed33137539c8024d8ee7c519a37b50c Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 14:13:50 +0530 Subject: [PATCH 1/8] Adds TimeoutException and makes Channel Borrow Exceptions Retryable --- src/ziggurat/messaging/producer.clj | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index c0ace5ff..db4e4da1 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -13,7 +13,8 @@ [ziggurat.metrics :as metrics]) (:import (com.rabbitmq.client AlreadyClosedException Channel) (java.io IOException) - (org.apache.commons.pool2.impl GenericObjectPool))) + (org.apache.commons.pool2.impl GenericObjectPool) + (java.util.concurrent TimeoutException))) (def MAX_EXPONENTIAL_RETRIES 25) @@ -65,7 +66,7 @@ (defn- handle-network-exception [e message-payload] - (log/error e "Exception was encountered while publishing to RabbitMQ") + (log/error e "Network exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) true) @@ -84,15 +85,17 @@ (handle-network-exception e message-payload)) (catch IOException e (handle-network-exception e message-payload)) + (catch TimeoutException e + (handle-network-exception e message-payload)) (catch Exception e (log/error e "Exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) false) (finally (return-to-pool cpool/channel-pool ch)))) (catch Exception e - (log/error e "Exception occurred while borrowing a channel from the pool") + (log/error e "Exception was encountered while borrowing a channel from the pool") (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) - false))) + true))) (defn publish ([exchange message-payload] From 4f0243dffa690bd286877e5d8565e5c8a247d492 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 17:02:17 +0530 Subject: [PATCH 2/8] Making non recoverable exception retries configurable --- resources/config.test.edn | 7 +++- src/ziggurat/config.clj | 6 ++- src/ziggurat/messaging/producer.clj | 22 ++++++++--- test/ziggurat/messaging/producer_test.clj | 47 +++++++++++++++++++---- 4 files changed, 68 insertions(+), 14 deletions(-) diff --git a/resources/config.test.edn b/resources/config.test.edn index 2da0b3f9..cc3e87f6 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -16,7 +16,12 @@ :username "guest" :password "guest" :channel-timeout [2000 :int] - :address-resolver [:dns :keyword]} ;;possible values [:dns :ip-list] + + :address-resolver [:dns :keyword] ;;possible values [:dns :ip-list] + :publish-retry {:sleep [1 :int] + :non-recoverable-exception {:enabled [true :bool] + :sleep [1 :int] + :count [5 :int]}}} :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index a7a9053e..ebc2b5a8 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -24,7 +24,11 @@ :rabbit-mq-connection {:port 5672 :username "guest" :password "guest" - :channel-timeout 2000} + :channel-timeout 2000 + :publish-retry {:sleep 5000 + :non-recoverable-exception {:enabled true + :sleep 1000 + :count 5}}} :jobs {:instant {:worker-count 4 :prefetch-count 4}} :rabbit-mq {:delay {:queue-name "%s_delay_queue" diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index db4e4da1..8f996f40 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -95,16 +95,28 @@ (catch Exception e (log/error e "Exception was encountered while borrowing a channel from the pool") (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) - true))) + false))) + +(defn- publish-retry-config [] + (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) + +(defn- non-recoverable-exception-config [] + (:non-recoverable-exception (publish-retry-config))) (defn publish ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration] - (when (publish-internal exchange message-payload expiration) - (Thread/sleep 5000) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration)))) + (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) + ([exchange message-payload expiration counter] + (if (publish-internal exchange message-payload expiration) + (do + (Thread/sleep (:sleep (publish-retry-config))) + (log/info "Retrying publishing the message to " exchange) + (recur exchange message-payload expiration counter)) + (when (and (:enabled (non-recoverable-exception-config)) (> counter 0)) + (Thread/sleep (:sleep (non-recoverable-exception-config))) + (recur exchange message-payload expiration (dec counter)))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index f3ea7b33..1d7b2e11 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -16,7 +16,8 @@ [ziggurat.metrics :as metrics]) (:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader] (com.rabbitmq.client Channel Connection ShutdownSignalException AlreadyClosedException) - (java.io IOException))) + (java.io IOException) + (java.util.concurrent TimeoutException))) (use-fixtures :once (join-fixtures [fix/init-rabbit-mq fix/silence-logging])) @@ -520,27 +521,59 @@ (let [publish-called (atom 0)] (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) (swap! publish-called inc) (throw (IOException. "io exception")))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 2 @publish-called))))) + (is (= 10 @publish-called))))) (testing "publish/producer tries to publish again if already closed exception is received" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) (swap! publish-called inc) (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 2 @publish-called))))) - (testing "producer/publish does not try again if the exception thrown is neither IOException nor AlreadyClosedException" + (is (= 10 @publish-called))))) + (testing "publish/producer tries to publish again if TimeoutException is received" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (TimeoutException. "timeout")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @publish-called))))) + (testing "producer/publish tries again the number of times defined in the config if the exception thrown is non recoverable and if retry is enabled" + (let [publish-called (atom 0) + config (config/ziggurat-config) + count 3 + config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true + :sleep 1 + :count count}))] + (with-redefs [config/ziggurat-config (fn [] config) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= (inc count) @publish-called))))) + (testing "producer/publish does not retry again if the exception thrown is non recoverable and if retry is disabled" + (let [publish-called (atom 0) + config (config/ziggurat-config) + count 3 + config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false + :sleep 1 + :count count}))] + (with-redefs [config/ziggurat-config (fn [] config) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) (swap! publish-called inc) (throw (Exception. "non-io exception")))) metrics/increment-count (fn [_ _ _] nil)] From ae0dfed45ba9a7af98f69eb3d334e00d06e4fc2d Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 17:08:27 +0530 Subject: [PATCH 3/8] Update ReadME --- README.md | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 6d8f14e0..becb34af 100644 --- a/README.md +++ b/README.md @@ -364,14 +364,29 @@ Ziggurat Config | Default Value | Description | Mandatory? :address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]. Default is :dns :channel-pool {:max-wait-ms [5000 :int] :min-idle [10 :int] - :max-idle [20 :int]}}}}} + :max-idle [20 :int]} + :publish-retry {:sleep 5000 + :non-recoverable-exception {:enabled true + :sleep 1000 + :count 5}}}}}} ``` - `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs). - `:port` specifies the port number on which the RabbitMQ nodes are running. +<<<<<<< HEAD - `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing - `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. +======= +- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster +- publish-retry defines the config for recoverable and non-recoverable exceptions. + - Recoverable exceptions + - `:sleep` - defines the time period after which a retry should happen + - Non-recoverable exceptions + - `:enabled` - defines whether retries should happen + - `:sleep` - defines the time period after which a retry should happen + - `:count` - defines the number of retries +>>>>>>> c27fd46... Update ReadME ## Exponential Backoff based Retries From 8a3830d72f54f9f948df24d80563f6bf32e51cba Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 18:04:28 +0530 Subject: [PATCH 4/8] Publish only if channel pool is alive --- src/ziggurat/messaging/channel_pool.clj | 5 +++++ src/ziggurat/messaging/producer.clj | 19 ++++++++++--------- test/ziggurat/messaging/channel_pool_test.clj | 18 +++++++++++++++++- test/ziggurat/messaging/producer_test.clj | 15 ++++++++++++++- 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/src/ziggurat/messaging/channel_pool.clj b/src/ziggurat/messaging/channel_pool.clj index 598090ee..9a59737a 100644 --- a/src/ziggurat/messaging/channel_pool.clj +++ b/src/ziggurat/messaging/channel_pool.clj @@ -39,6 +39,9 @@ rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. connection) pool-config)] rmq-chan-pool)) +(defn is-pool-alive? [channel-pool] + (= (type channel-pool) GenericObjectPool)) + (defn destroy-channel-pool [channel-pool] (.close channel-pool)) @@ -49,3 +52,5 @@ (create-channel-pool c/producer-connection)) :stop (do (log/info "Stopping channel pool") (destroy-channel-pool channel-pool))) + + diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 8f996f40..d43e5c69 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -5,7 +5,7 @@ [langohr.exchange :as le] [langohr.http :as lh] [langohr.queue :as lq] - [ziggurat.messaging.channel_pool :as cpool] + [ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]] [taoensso.nippy :as nippy] [ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]] [ziggurat.messaging.connection :refer [producer-connection is-connection-required?]] @@ -109,14 +109,15 @@ ([exchange message-payload expiration] (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) ([exchange message-payload expiration counter] - (if (publish-internal exchange message-payload expiration) - (do - (Thread/sleep (:sleep (publish-retry-config))) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration counter)) - (when (and (:enabled (non-recoverable-exception-config)) (> counter 0)) - (Thread/sleep (:sleep (non-recoverable-exception-config))) - (recur exchange message-payload expiration (dec counter)))))) + (when (is-pool-alive? cpool/channel-pool) + (if (publish-internal exchange message-payload expiration) + (do + (Thread/sleep (:sleep (publish-retry-config))) + (log/info "Retrying publishing the message to " exchange) + (recur exchange message-payload expiration counter)) + (when (and (:enabled (non-recoverable-exception-config)) (> counter 0)) + (Thread/sleep (:sleep (non-recoverable-exception-config))) + (recur exchange message-payload expiration (dec counter))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) diff --git a/test/ziggurat/messaging/channel_pool_test.clj b/test/ziggurat/messaging/channel_pool_test.clj index 25b49a9a..2f258daa 100644 --- a/test/ziggurat/messaging/channel_pool_test.clj +++ b/test/ziggurat/messaging/channel_pool_test.clj @@ -1,6 +1,6 @@ (ns ziggurat.messaging.channel-pool-test (:require [clojure.test :refer :all] - [ziggurat.messaging.channel_pool :as cpool] + [ziggurat.messaging.channel_pool :as cpool :refer [channel-pool]] [ziggurat.messaging.connection :refer [producer-connection]] [ziggurat.fixtures :as fix]) (:import (org.apache.commons.pool2.impl GenericObjectPoolConfig GenericObjectPool) @@ -60,4 +60,20 @@ (is (not (.equals rmq-chan-2 rmq-chan))) (is (.isOpen rmq-chan-2))))) +(deftest is-pool-alive? + (testing "it should return true when channel-pool state has started" + (let [states {#'producer-connection #'channel-pool}] + (mount.core/start states) + (is (cpool/is-pool-alive? channel-pool)) + (mount.core/stop states))) + (testing "it should return false when channel-pool state has not been started" + (is (false? (cpool/is-pool-alive? channel-pool)))) + (testing "it should return false when channel-pool state has been stopped" + (let [states {#'producer-connection #'channel-pool}] + (mount.core/start states) + (mount.core/stop states) + (is (false? (cpool/is-pool-alive? channel-pool)))))) + + + diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index 1d7b2e11..d1714427 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -7,6 +7,7 @@ [ziggurat.fixtures :as fix] [ziggurat.messaging.connection :refer [producer-connection]] [ziggurat.messaging.producer :as producer] + [ziggurat.messaging.channel_pool :as cpool] [ziggurat.messaging.util :as util] [ziggurat.util.rabbitmq :as rmq] [langohr.basic :as lb] @@ -563,6 +564,7 @@ metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= (inc count) @publish-called))))) + (testing "producer/publish does not retry again if the exception thrown is non recoverable and if retry is disabled" (let [publish-called (atom 0) config (config/ziggurat-config) @@ -578,7 +580,18 @@ (throw (Exception. "non-io exception")))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 1 @publish-called)))))) + (is (= 1 @publish-called))))) + (testing "producer/publish does not publish even once if channel pool is not alive" + (let [publish-called (atom 0)] + (with-redefs [cpool/is-pool-alive? (fn [_] false) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 0 @publish-called)))))) (deftest publish-to-delay-queue-test (testing "creates a span when tracer is enabled" From 5b7229a12365543732058659b2833a7d93898c7e Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 18:09:54 +0530 Subject: [PATCH 5/8] Fix lint --- src/ziggurat/messaging/producer.clj | 2 +- test/ziggurat/messaging/channel_pool_test.clj | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index d43e5c69..6785c8b4 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -115,7 +115,7 @@ (Thread/sleep (:sleep (publish-retry-config))) (log/info "Retrying publishing the message to " exchange) (recur exchange message-payload expiration counter)) - (when (and (:enabled (non-recoverable-exception-config)) (> counter 0)) + (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) (Thread/sleep (:sleep (non-recoverable-exception-config))) (recur exchange message-payload expiration (dec counter))))))) diff --git a/test/ziggurat/messaging/channel_pool_test.clj b/test/ziggurat/messaging/channel_pool_test.clj index 2f258daa..f73b98e0 100644 --- a/test/ziggurat/messaging/channel_pool_test.clj +++ b/test/ziggurat/messaging/channel_pool_test.clj @@ -73,7 +73,3 @@ (mount.core/start states) (mount.core/stop states) (is (false? (cpool/is-pool-alive? channel-pool)))))) - - - - From 8531bdc8fd42993c2aaeb0b3e728b7fe9d6a66a5 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Fri, 22 Apr 2022 18:14:19 +0530 Subject: [PATCH 6/8] Update README --- README.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index becb34af..be0361ab 100644 --- a/README.md +++ b/README.md @@ -373,20 +373,17 @@ Ziggurat Config | Default Value | Description | Mandatory? - `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs). - `:port` specifies the port number on which the RabbitMQ nodes are running. -<<<<<<< HEAD - `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing - `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. -======= -- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster -- publish-retry defines the config for recoverable and non-recoverable exceptions. +- `:publish-retry` defines the config for recoverable and non-recoverable exceptions. - Recoverable exceptions - `:sleep` - defines the time period after which a retry should happen - Non-recoverable exceptions - `:enabled` - defines whether retries should happen - `:sleep` - defines the time period after which a retry should happen - `:count` - defines the number of retries ->>>>>>> c27fd46... Update ReadME +- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster ## Exponential Backoff based Retries From 5519e6c84f5af9d30bd35bb98806bdbdc19d7292 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Mon, 25 Apr 2022 13:26:52 +0530 Subject: [PATCH 7/8] Add retry states for publish --- src/ziggurat/messaging/producer.clj | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 6785c8b4..4df9db8e 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -68,7 +68,7 @@ [e message-payload] (log/error e "Network exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - true) + :retry) (defn return-to-pool [^GenericObjectPool pool ^Channel ch] (.returnObject pool ch)) @@ -80,7 +80,7 @@ (try (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload))) - false + :success (catch AlreadyClosedException e (handle-network-exception e message-payload)) (catch IOException e @@ -90,12 +90,12 @@ (catch Exception e (log/error e "Exception was encountered while publishing to RabbitMQ") (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - false) + :retry-with-counter) (finally (return-to-pool cpool/channel-pool ch)))) (catch Exception e (log/error e "Exception was encountered while borrowing a channel from the pool") (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) - false))) + :retry-with-counter))) (defn- publish-retry-config [] (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) @@ -110,14 +110,16 @@ (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) ([exchange message-payload expiration counter] (when (is-pool-alive? cpool/channel-pool) - (if (publish-internal exchange message-payload expiration) - (do - (Thread/sleep (:sleep (publish-retry-config))) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration counter)) - (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) - (Thread/sleep (:sleep (non-recoverable-exception-config))) - (recur exchange message-payload expiration (dec counter))))))) + (let [result (publish-internal exchange message-payload expiration)] + (cond + (= result :success) nil + (= result :retry) (do + (Thread/sleep (:sleep (publish-retry-config))) + (log/info "Retrying publishing the message to " exchange) + (recur exchange message-payload expiration counter)) + (= result :retry-with-counter) (when (and (:enabled (non-recoverable-exception-config)) (pos? counter)) + (Thread/sleep (:sleep (non-recoverable-exception-config))) + (recur exchange message-payload expiration (dec counter)))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) From 490652071f5cb0477ebbe5e990ec5d70f7c7f22c Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Mon, 25 Apr 2022 13:34:13 +0530 Subject: [PATCH 8/8] Clojure doc for the method publish --- src/ziggurat/messaging/producer.clj | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 4df9db8e..d20fc87a 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -104,6 +104,13 @@ (:non-recoverable-exception (publish-retry-config))) (defn publish + "This is meant for publishing to rabbitmq. + * Checks if the pool is alive - We do this so that publish does not happen after the channel pool state is stopped. + * publish-internal returns multiple states + * :success - Message has been successfully produced to rabbitmq + * :retry - A retryable exception was encountered and message will be retried until it is successfully published. + * :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times. defined by the counter + { :rabbit-mq-connection { :publish-retry { :non-recoverable-exception {:count}}}}}" ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration]