From b10d80b6b50824f6ca21fba0e0df8fadfe75cda4 Mon Sep 17 00:00:00 2001 From: Anmol Vijaywargiya Date: Wed, 18 May 2022 16:57:26 +0530 Subject: [PATCH] A few improvements in the error handling of rabbitmq publish flow 1. Addressed PR comments 2. Changed max-idle to be equal to total number of threads. 3. Fixed an issue where a few errors thrown while borrowing from the channel pool were not being retried indefinitely --- README.md | 8 +-- project.clj | 2 +- resources/config.test.edn | 4 +- src/ziggurat/config.clj | 4 +- src/ziggurat/messaging/channel_pool.clj | 6 +- src/ziggurat/messaging/connection.clj | 2 - src/ziggurat/messaging/producer.clj | 62 +++++++++++-------- test/ziggurat/messaging/channel_pool_test.clj | 2 +- test/ziggurat/messaging/producer_test.clj | 42 +++++++++++-- 9 files changed, 86 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index be0361ab..d3c3af14 100644 --- a/README.md +++ b/README.md @@ -365,9 +365,9 @@ Ziggurat Config | Default Value | Description | Mandatory? :channel-pool {:max-wait-ms [5000 :int] :min-idle [10 :int] :max-idle [20 :int]} - :publish-retry {:sleep 5000 + :publish-retry {:back-off-ms 5000 :non-recoverable-exception {:enabled true - :sleep 1000 + :back-off-ms 1000 :count 5}}}}}} ``` @@ -378,10 +378,10 @@ Ziggurat Config | Default Value | Description | Mandatory? be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. - `:publish-retry` defines the config for recoverable and non-recoverable exceptions. 
- Recoverable exceptions - - `:sleep` - defines the time period after which a retry should happen + - `:back-off-ms` - defines the time period after which a retry should happen - Non-recoverable exceptions - `:enabled` - defines whether retries should happen - - `:sleep` - defines the time period after which a retry should happen + - `:back-off-ms` - defines the time period after which a retry should happen - `:count` - defines the number of retries - By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster diff --git a/project.clj b/project.clj index 63929eb1..2439f928 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,7 @@ (cemerick.pomegranate.aether/register-wagon-factory! "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) -(defproject tech.gojek/ziggurat "4.6.4" +(defproject tech.gojek/ziggurat "4.7.0" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" diff --git a/resources/config.test.edn b/resources/config.test.edn index cc3e87f6..fdcd2505 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -18,9 +18,9 @@ :channel-timeout [2000 :int] :address-resolver [:dns :keyword] ;;possible values [:dns :ip-list] - :publish-retry {:sleep [1 :int] + :publish-retry {:back-off-ms [1 :int] :non-recoverable-exception {:enabled [true :bool] - :sleep [1 :int] + :back-off-ms [1 :int] :count [5 :int]}}} :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index ebc2b5a8..6460ef7b 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -25,9 +25,9 @@ :username "guest" :password "guest" :channel-timeout 2000 - :publish-retry {:sleep 5000 + :publish-retry {:back-off-ms 5000 :non-recoverable-exception {:enabled true - :sleep 1000 + :back-off-ms 1000 :count 5}}} :jobs {:instant {:worker-count 4 
:prefetch-count 4}} diff --git a/src/ziggurat/messaging/channel_pool.clj b/src/ziggurat/messaging/channel_pool.clj index 528c88e5..dde6c3ad 100644 --- a/src/ziggurat/messaging/channel_pool.clj +++ b/src/ziggurat/messaging/channel_pool.clj @@ -25,7 +25,7 @@ (defn create-object-pool-config [config] (let [standby-size 10 total-thread-count (calc-total-thread-count) - merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle (* standby-size 2)} config)] + merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle total-thread-count} config)] (doto (GenericObjectPoolConfig.) (.setMaxWait (Duration/ofMillis (:max-wait-ms merged-config))) (.setMinIdle (:min-idle merged-config)) @@ -35,7 +35,9 @@ (.setJmxNamePrefix "zig-rabbitmq-ch-pool")))) (defn create-channel-pool [^Connection connection] - (let [pool-config (create-object-pool-config (get-in (ziggurat-config) [:rabbit-mq-connection :channel-pool])) + (let [pool-config (create-object-pool-config + (get-in (ziggurat-config) + [:rabbit-mq-connection :channel-pool])) rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. 
connection) pool-config)] rmq-chan-pool)) diff --git a/src/ziggurat/messaging/connection.clj b/src/ziggurat/messaging/connection.clj index 22a7eae4..bc2eb335 100644 --- a/src/ziggurat/messaging/connection.clj +++ b/src/ziggurat/messaging/connection.clj @@ -87,14 +87,12 @@ (log/info "Closing the RabbitMQ connection") (rmq/close conn))) -(declare consumer-connection) (defstate consumer-connection :start (do (log/info "Creating consumer connection") (start-connection false)) :stop (do (log/info "Stopping consume connection") (stop-connection consumer-connection))) -(declare producer-connection) (defstate producer-connection :start (do (log/info "Creating producer connection") (start-connection true)) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 7fbe5023..fda943c4 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -65,36 +65,37 @@ props))) (defn- handle-network-exception - [e message-payload] + [e message-payload retry-counter] (log/error e "Network exception was encountered while publishing to RabbitMQ") - (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) + (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload)) + :retry-counter retry-counter}) :retry) (defn return-to-pool [^GenericObjectPool pool ^Channel ch] (.returnObject pool ch)) +(defn borrow-from-pool [^GenericObjectPool pool] + (.borrowObject pool)) + (defn- publish-internal - [exchange message-payload expiration] + [exchange message-payload expiration retry-counter] (try - (let [ch (.borrowObject cpool/channel-pool)] + (let [ch (borrow-from-pool cpool/channel-pool)] (try (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload))) :success - (catch AlreadyClosedException e - (handle-network-exception 
e message-payload)) - (catch IOException e - (handle-network-exception e message-payload)) - (catch TimeoutException e - (handle-network-exception e message-payload)) - (catch Exception e - (log/error e "Exception was encountered while publishing to RabbitMQ") - (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - :retry-with-counter) (finally (return-to-pool cpool/channel-pool ch)))) + (catch AlreadyClosedException e + (handle-network-exception e message-payload retry-counter)) + (catch IOException e + (handle-network-exception e message-payload retry-counter)) + (catch TimeoutException e + (handle-network-exception e message-payload retry-counter)) (catch Exception e - (log/error e "Exception was encountered while borrowing a channel from the pool") - (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) + (log/error e "Exception was encountered while publishing to RabbitMQ") + (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload)) + :retry-counter retry-counter}) :retry-with-counter))) <<<<<<< HEAD @@ -119,21 +120,30 @@ ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration] - (publish exchange message-payload expiration (:count (non-recoverable-exception-config)))) - ([exchange message-payload expiration counter] + (publish exchange message-payload expiration 0)) + ([exchange message-payload expiration retry-counter] (when (is-pool-alive? cpool/channel-pool) - (let [result (publish-internal exchange message-payload expiration)] + (let [result (publish-internal exchange message-payload expiration retry-counter)] + (when (pos? 
retry-counter) + (do + (log/info "Retrying publishing the message to " exchange) + (log/info "Retry attempt " retry-counter))) + (log/info "Publish result " result) (cond (= result :success) nil (= result :retry) (do - (Thread/sleep (:sleep (publish-retry-config))) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration counter)) - (= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) (pos? counter)) + (Thread/sleep (:back-off-ms (publish-retry-config))) + (recur exchange message-payload expiration retry-counter)) + (= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) + (< retry-counter (:count (non-recoverable-exception-config)))) + (do + (log/info "Backing off") + (Thread/sleep (:back-off-ms (non-recoverable-exception-config))) + (recur exchange message-payload expiration (inc retry-counter))) (do - (Thread/sleep (:sleep (non-recoverable-exception-config))) - (recur exchange message-payload expiration (dec counter))) - (log/error "Publishing the message has failed. 
It is being dropped") + (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload)) + :retry-counter retry-counter})))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) diff --git a/test/ziggurat/messaging/channel_pool_test.clj b/test/ziggurat/messaging/channel_pool_test.clj index f8de8971..70145a68 100644 --- a/test/ziggurat/messaging/channel_pool_test.clj +++ b/test/ziggurat/messaging/channel_pool_test.clj @@ -18,7 +18,7 @@ (deftest create-object-pool-config-test (testing "it should create a PoolConfig with default values" - (let [expected-config {:min-idle 10 :max-idle 20 :max-total 54 :max-wait-ms 5000} + (let [expected-config {:min-idle 10 :max-idle 44 :max-total 54 :max-wait-ms 5000} pool-config-object ^GenericObjectPoolConfig (cpool/create-object-pool-config {}) min-idle (.getMinIdle pool-config-object) max-idle (.getMaxIdle pool-config-object) diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index d1714427..1262af2e 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -518,7 +518,7 @@ (is (true? 
@publish-called?)))))) (deftest publish-behaviour-on-rabbitmq-disconnection-test - (testing "producer/publish tries to publish again if IOException is thrown" + (testing "producer/publish tries to publish again if IOException is thrown via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] @@ -528,7 +528,7 @@ metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) - (testing "publish/producer tries to publish again if already closed exception is received" + (testing "publish/producer tries to publish again if already closed exception is received via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] @@ -538,7 +538,7 @@ metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) - (testing "publish/producer tries to publish again if TimeoutException is received" + (testing "publish/producer tries to publish again if TimeoutException is received via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] @@ -548,12 +548,42 @@ metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) - (testing "producer/publish tries again the number of times defined in the config if the exception thrown is non recoverable and if retry is enabled" + (testing "producer/publish tries to publish again if IOException is thrown while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! 
borrow-from-pool-called inc) + (throw (IOException. "io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "publish/producer tries to publish again if already closed exception is received while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "publish/producer tries to publish again if TimeoutException is received while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (TimeoutException. 
"timeout")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "producer/publish retries publishing for a certain number of times (configurable) when a non-recoverable exception is thrown" (let [publish-called (atom 0) config (config/ziggurat-config) count 3 config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true - :sleep 1 + :back-off-ms 1 :count count}))] (with-redefs [config/ziggurat-config (fn [] config) lch/open (fn [^Connection _] (reify Channel (close [_] nil))) @@ -570,7 +600,7 @@ config (config/ziggurat-config) count 3 config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false - :sleep 1 + :back-off-ms 1 :count count}))] (with-redefs [config/ziggurat-config (fn [] config) lch/open (fn [^Connection _] (reify Channel (close [_] nil)))