diff --git a/README.md b/README.md index 6d8f14e0..d3c3af14 100644 --- a/README.md +++ b/README.md @@ -364,7 +364,11 @@ Ziggurat Config | Default Value | Description | Mandatory? :address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]. Default is :dns :channel-pool {:max-wait-ms [5000 :int] :min-idle [10 :int] - :max-idle [20 :int]}}}}} + :max-idle [20 :int]} + :publish-retry {:back-off-ms 5000 + :non-recoverable-exception {:enabled true + :back-off-ms 1000 + :count 5}}}}}} ``` - `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs). @@ -372,6 +376,14 @@ Ziggurat Config | Default Value | Description | Mandatory? - `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing - `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided. +- `:publish-retry` defines the config for recoverable and non-recoverable exceptions. + - Recoverable exceptions + - `:back-off-ms` - defines the time period after which a retry should happen + - Non-recoverable exceptions + - `:enabled` - defines whether retries should happen + - `:back-off-ms` - defines the time period after which a retry should happen + - `:count` - defines the number of retries +- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster ## Exponential Backoff based Retries diff --git a/project.clj b/project.clj index 63929eb1..2439f928 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,7 @@ (cemerick.pomegranate.aether/register-wagon-factory! "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) -(defproject tech.gojek/ziggurat "4.6.4" +(defproject tech.gojek/ziggurat "4.7.0" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" diff --git a/resources/config.test.edn b/resources/config.test.edn index 2da0b3f9..fdcd2505 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -16,7 +16,12 @@ :username "guest" :password "guest" :channel-timeout [2000 :int] - :address-resolver [:dns :keyword]} ;;possible values [:dns :ip-list] + + :address-resolver [:dns :keyword] ;;possible values [:dns :ip-list] + :publish-retry {:back-off-ms [1 :int] + :non-recoverable-exception {:enabled [true :bool] + :back-off-ms [1 :int] + :count [5 :int]}}} :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} :rabbit-mq {:delay {:queue-name "application_name_delay_queue_test" diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index a7a9053e..6460ef7b 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -24,7 +24,11 @@ :rabbit-mq-connection {:port 5672 :username "guest" :password "guest" - :channel-timeout 2000} + :channel-timeout 2000 + :publish-retry {:back-off-ms 5000 + :non-recoverable-exception {:enabled true + :back-off-ms 5000 + :count 5}}} :jobs {:instant {:worker-count 4 :prefetch-count 4}} :rabbit-mq {:delay {:queue-name "%s_delay_queue" diff --git a/src/ziggurat/messaging/channel_pool.clj b/src/ziggurat/messaging/channel_pool.clj index 45ec8063..dde6c3ad 100644 --- a/src/ziggurat/messaging/channel_pool.clj +++ b/src/ziggurat/messaging/channel_pool.clj @@ -25,7 +25,7 @@ (defn create-object-pool-config [config] (let [standby-size 10 total-thread-count (calc-total-thread-count) - merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle (* standby-size 2)} config)] + merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle total-thread-count} config)] (doto (GenericObjectPoolConfig.) (.setMaxWait (Duration/ofMillis (:max-wait-ms merged-config))) (.setMinIdle (:min-idle merged-config)) @@ -35,10 +35,15 @@ (.setJmxNamePrefix "zig-rabbitmq-ch-pool")))) (defn create-channel-pool [^Connection connection] - (let [pool-config (create-object-pool-config (get-in (ziggurat-config) [:rabbit-mq-connection :channel-pool])) + (let [pool-config (create-object-pool-config + (get-in (ziggurat-config) + [:rabbit-mq-connection :channel-pool])) rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. connection) pool-config)] rmq-chan-pool)) +(defn is-pool-alive? [channel-pool] + (= (type channel-pool) GenericObjectPool)) + (defn destroy-channel-pool [channel-pool] (.close channel-pool)) @@ -49,3 +54,5 @@ (create-channel-pool c/producer-connection)) :stop (do (log/info "Stopping channel pool") (destroy-channel-pool channel-pool))) + + diff --git a/src/ziggurat/messaging/connection.clj b/src/ziggurat/messaging/connection.clj index 22a7eae4..bc2eb335 100644 --- a/src/ziggurat/messaging/connection.clj +++ b/src/ziggurat/messaging/connection.clj @@ -87,14 +87,12 @@ (log/info "Closing the RabbitMQ connection") (rmq/close conn))) -(declare consumer-connection) (defstate consumer-connection :start (do (log/info "Creating consumer connection") (start-connection false)) :stop (do (log/info "Stopping consume connection") (stop-connection consumer-connection))) -(declare producer-connection) (defstate producer-connection :start (do (log/info "Creating producer connection") (start-connection true)) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index c0ace5ff..bc01a654 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -5,7 +5,7 @@ [langohr.exchange :as le] [langohr.http :as lh] [langohr.queue :as lq] - [ziggurat.messaging.channel_pool :as cpool] + [ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]] [taoensso.nippy :as nippy] [ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]] [ziggurat.messaging.connection :refer [producer-connection is-connection-required?]] @@ -13,7 +13,8 @@ [ziggurat.metrics :as metrics]) (:import (com.rabbitmq.client AlreadyClosedException Channel) (java.io IOException) - (org.apache.commons.pool2.impl GenericObjectPool))) + (org.apache.commons.pool2.impl GenericObjectPool) + (java.util.concurrent TimeoutException))) (def MAX_EXPONENTIAL_RETRIES 25) @@ -64,44 +65,80 @@ props))) (defn- handle-network-exception - [e message-payload] - (log/error e "Exception was encountered while publishing to RabbitMQ") - (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - true) + [e message-payload retry-counter] + (log/error e "Network exception was encountered while publishing to RabbitMQ") + (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload)) + :retry-attempt retry-counter}) + :retry) (defn return-to-pool [^GenericObjectPool pool ^Channel ch] (.returnObject pool ch)) +(defn borrow-from-pool [^GenericObjectPool pool] + (.borrowObject pool)) + (defn- publish-internal - [exchange message-payload expiration] + [exchange message-payload expiration retry-counter] (try - (let [ch (.borrowObject cpool/channel-pool)] + (let [ch (borrow-from-pool cpool/channel-pool)] (try (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload))) - false - (catch AlreadyClosedException e - (handle-network-exception e message-payload)) - (catch IOException e - (handle-network-exception e message-payload)) - (catch Exception e - (log/error e "Exception was encountered while publishing to RabbitMQ") - (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) - false) + :success (finally (return-to-pool cpool/channel-pool ch)))) + (catch AlreadyClosedException e + (handle-network-exception e message-payload retry-counter)) + (catch IOException e + (handle-network-exception e message-payload retry-counter)) + (catch TimeoutException e + (handle-network-exception e message-payload retry-counter)) (catch Exception e - (log/error e "Exception occurred while borrowing a channel from the pool") - (metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))}) - false))) + (log/error e "Exception was encountered while publishing to RabbitMQ") + (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload)) + :retry-counter retry-counter}) + :retry-with-counter))) + +(defn- publish-retry-config [] + (-> (ziggurat-config) :rabbit-mq-connection :publish-retry)) + +(defn- non-recoverable-exception-config [] + (:non-recoverable-exception (publish-retry-config))) + (defn publish + "This is meant for publishing to rabbitmq. + * Checks if the pool is alive - We do this so that publish does not happen after the channel pool state is stopped. + * publish-internal returns multiple states + * :success - Message has been successfully produced to rabbitmq + * :retry - A retryable exception was encountered and message will be retried until it is successfully published. + * :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times. defined by the counter + { :rabbit-mq-connection { :publish-retry { :non-recoverable-exception {:count}}}}}" ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration] - (when (publish-internal exchange message-payload expiration) - (Thread/sleep 5000) - (log/info "Retrying publishing the message to " exchange) - (recur exchange message-payload expiration)))) + (publish exchange message-payload expiration 0)) + ([exchange message-payload expiration retry-counter] + (when (is-pool-alive? cpool/channel-pool) + (let [result (publish-internal exchange message-payload expiration retry-counter)] + (when (pos? retry-counter) + (log/info "Retrying publishing the message to " exchange) + (log/info "Retry attempt " retry-counter)) + (log/info "Publish result " result) + (cond + (= result :success) nil + (= result :retry) (do + (Thread/sleep (:back-off-ms (publish-retry-config))) + (recur exchange message-payload expiration (inc retry-counter))) + (= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) + (< retry-counter (:count (non-recoverable-exception-config)))) + (do + (log/info "Backing off") + (Thread/sleep (:back-off-ms (non-recoverable-exception-config))) + (recur exchange message-payload expiration (inc retry-counter))) + (do + (log/error "Publishing the message has failed. It is being dropped") + (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload)) + :retry-counter retry-counter})))))))) (defn- retry-type [] (-> (ziggurat-config) :retry :type)) diff --git a/test/ziggurat/messaging/channel_pool_test.clj b/test/ziggurat/messaging/channel_pool_test.clj index 2dd49c7f..70145a68 100644 --- a/test/ziggurat/messaging/channel_pool_test.clj +++ b/test/ziggurat/messaging/channel_pool_test.clj @@ -1,6 +1,6 @@ (ns ziggurat.messaging.channel-pool-test (:require [clojure.test :refer :all] - [ziggurat.messaging.channel_pool :as cpool] + [ziggurat.messaging.channel_pool :as cpool :refer [channel-pool]] [ziggurat.messaging.connection :refer [producer-connection]] [ziggurat.fixtures :as fix]) (:import (org.apache.commons.pool2.impl GenericObjectPoolConfig GenericObjectPool) @@ -18,7 +18,7 @@ (deftest create-object-pool-config-test (testing "it should create a PoolConfig with default values" - (let [expected-config {:min-idle 10 :max-idle 20 :max-total 54 :max-wait-ms 5000} + (let [expected-config {:min-idle 10 :max-idle 44 :max-total 54 :max-wait-ms 5000} pool-config-object ^GenericObjectPoolConfig (cpool/create-object-pool-config {}) min-idle (.getMinIdle pool-config-object) max-idle (.getMaxIdle pool-config-object) @@ -60,4 +60,16 @@ (is (not (.equals rmq-chan-2 rmq-chan))) (is (.isOpen rmq-chan-2))))) - +(deftest is-pool-alive? + (testing "it should return true when channel-pool state has started" + (let [states {#'producer-connection #'channel-pool}] + (mount.core/start states) + (is (cpool/is-pool-alive? channel-pool)) + (mount.core/stop states))) + (testing "it should return false when channel-pool state has not been started" + (is (false? (cpool/is-pool-alive? channel-pool)))) + (testing "it should return false when channel-pool state has been stopped" + (let [states {#'producer-connection #'channel-pool}] + (mount.core/start states) + (mount.core/stop states) + (is (false? (cpool/is-pool-alive? channel-pool)))))) diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index f3ea7b33..1262af2e 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -7,6 +7,7 @@ [ziggurat.fixtures :as fix] [ziggurat.messaging.connection :refer [producer-connection]] [ziggurat.messaging.producer :as producer] + [ziggurat.messaging.channel_pool :as cpool] [ziggurat.messaging.util :as util] [ziggurat.util.rabbitmq :as rmq] [langohr.basic :as lb] @@ -16,7 +17,8 @@ [ziggurat.metrics :as metrics]) (:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader] (com.rabbitmq.client Channel Connection ShutdownSignalException AlreadyClosedException) - (java.io IOException))) + (java.io IOException) + (java.util.concurrent TimeoutException))) (use-fixtures :once (join-fixtures [fix/init-rabbit-mq fix/silence-logging])) @@ -516,36 +518,110 @@ (is (true? @publish-called?)))))) (deftest publish-behaviour-on-rabbitmq-disconnection-test - (testing "producer/publish tries to publish again if IOException is thrown" + (testing "producer/publish tries to publish again if IOException is thrown via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) (swap! publish-called inc) (throw (IOException. "io exception")))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 2 @publish-called))))) - (testing "publish/producer tries to publish again if already closed exception is received" + (is (= 10 @publish-called))))) + (testing "publish/producer tries to publish again if already closed exception is received via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) (swap! publish-called inc) (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 2 @publish-called))))) - (testing "producer/publish does not try again if the exception thrown is neither IOException nor AlreadyClosedException" + (is (= 10 @publish-called))))) + (testing "publish/producer tries to publish again if TimeoutException is received via lb/publish" (let [publish-called (atom 0)] (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (TimeoutException. "timeout")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @publish-called))))) + (testing "producer/publish tries to publish again if IOException is thrown while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (IOException. "io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "publish/producer tries to publish again if already closed exception is received while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "publish/producer tries to publish again if TimeoutException is received while borrowing from channel" + (let [borrow-from-pool-called (atom 0)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (TimeoutException. "timeout")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 10 @borrow-from-pool-called))))) + (testing "producer/publish retries publishing for a certain number of times (configurable) when a non-recoverable exception is thrown" + (let [publish-called (atom 0) + config (config/ziggurat-config) + count 3 + config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true + :back-off-ms 1 + :count count}))] + (with-redefs [config/ziggurat-config (fn [] config) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= (inc count) @publish-called))))) + + (testing "producer/publish does not retry again if the exception thrown is non recoverable and if retry is disabled" + (let [publish-called (atom 0) + config (config/ziggurat-config) + count 3 + config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false + :back-off-ms 1 + :count count}))] + (with-redefs [config/ziggurat-config (fn [] config) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil)] + (producer/publish "random-exchange" {:topic-entity "hello"} 12345) + (is (= 1 @publish-called))))) + (testing "producer/publish does not publish even once if channel pool is not alive" + (let [publish-called (atom 0)] + (with-redefs [cpool/is-pool-alive? (fn [_] false) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) (swap! publish-called inc) (throw (Exception. "non-io exception")))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) - (is (= 1 @publish-called)))))) + (is (= 0 @publish-called)))))) (deftest publish-to-delay-queue-test (testing "creates a span when tracer is enabled"