Skip to content

Commit

Permalink
Merge 515492d into 505ff6e
Browse files Browse the repository at this point in the history
  • Loading branch information
anmol1vw13 committed Apr 26, 2022
2 parents 505ff6e + 515492d commit e9b9be7
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 25 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,26 @@ Ziggurat Config | Default Value | Description | Mandatory?
:address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]. Default is :dns
:channel-pool {:max-wait-ms [5000 :int]
:min-idle [10 :int]
:max-idle [20 :int]}}}}}
:max-idle [20 :int]}
:publish-retry {:sleep 5000
:non-recoverable-exception {:enabled true
:sleep 1000
:count 5}}}}}}
```

- `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs).
- `:port` specifies the port number on which the RabbitMQ nodes are running.
- `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing
- `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud
be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided.
- `:publish-retry` defines the config for recoverable and non-recoverable exceptions.
- Recoverable exceptions
- `:sleep` - defines the time period after which a retry should happen
- Non-recoverable exceptions
- `:enabled` - defines whether retries should happen
- `:sleep` - defines the time period after which a retry should happen
- `:count` - defines the number of retries
- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster

## Exponential Backoff based Retries

Expand Down
7 changes: 6 additions & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
:username "guest"
:password "guest"
:channel-timeout [2000 :int]
:address-resolver [:dns :keyword]} ;;possible values [:dns :ip-list]

:address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]
:publish-retry {:sleep [1 :int]
:non-recoverable-exception {:enabled [true :bool]
:sleep [1 :int]
:count [5 :int]}}}
:jobs {:instant {:worker-count [4 :int]
:prefetch-count [4 :int]}}
:rabbit-mq {:delay {:queue-name "application_name_delay_queue_test"
Expand Down
6 changes: 5 additions & 1 deletion src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
:rabbit-mq-connection {:port 5672
:username "guest"
:password "guest"
:channel-timeout 2000}
:channel-timeout 2000
:publish-retry {:sleep 5000
:non-recoverable-exception {:enabled true
:sleep 1000
:count 5}}}
:jobs {:instant {:worker-count 4
:prefetch-count 4}}
:rabbit-mq {:delay {:queue-name "%s_delay_queue"
Expand Down
5 changes: 5 additions & 0 deletions src/ziggurat/messaging/channel_pool.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. connection) pool-config)]
rmq-chan-pool))

(defn is-pool-alive? [channel-pool]
(= (type channel-pool) GenericObjectPool))

(defn destroy-channel-pool [channel-pool]
(.close channel-pool))

Expand All @@ -49,3 +52,5 @@
(create-channel-pool c/producer-connection))
:stop (do (log/info "Stopping channel pool")
(destroy-channel-pool channel-pool)))


51 changes: 39 additions & 12 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
[langohr.exchange :as le]
[langohr.http :as lh]
[langohr.queue :as lq]
[ziggurat.messaging.channel_pool :as cpool]
[ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]]
[ziggurat.messaging.connection :refer [producer-connection is-connection-required?]]
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics])
(:import (com.rabbitmq.client AlreadyClosedException Channel)
(java.io IOException)
(org.apache.commons.pool2.impl GenericObjectPool)))
(org.apache.commons.pool2.impl GenericObjectPool)
(java.util.concurrent TimeoutException)))

(def MAX_EXPONENTIAL_RETRIES 25)

Expand Down Expand Up @@ -65,9 +66,9 @@

(defn- handle-network-exception
[e message-payload]
(log/error e "Exception was encountered while publishing to RabbitMQ")
(log/error e "Network exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))})
true)
:retry)

(defn return-to-pool [^GenericObjectPool pool ^Channel ch]
(.returnObject pool ch))
Expand All @@ -79,29 +80,55 @@
(try
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers))
(properties-for-publish expiration (:headers message-payload)))
false
:success
(catch AlreadyClosedException e
(handle-network-exception e message-payload))
(catch IOException e
(handle-network-exception e message-payload))
(catch TimeoutException e
(handle-network-exception e message-payload))
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))})
false)
:retry-with-counter)
(finally (return-to-pool cpool/channel-pool ch))))
(catch Exception e
(log/error e "Exception occurred while borrowing a channel from the pool")
(log/error e "Exception was encountered while borrowing a channel from the pool")
(metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))})
false)))
:retry-with-counter)))

(defn- publish-retry-config []
(-> (ziggurat-config) :rabbit-mq-connection :publish-retry))

(defn- non-recoverable-exception-config []
(:non-recoverable-exception (publish-retry-config)))

(defn publish
"This is meant for publishing to rabbitmq.
* Checks if the pool is alive - We do this so that publish does not happen after the channel pool state is stopped.
* publish-internal returns multiple states
* :success - Message has been successfully produced to rabbitmq
* :retry - A retryable exception was encountered and message will be retried until it is successfully published.
* :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times. defined by the counter
{ :rabbit-mq-connection { :publish-retry { :non-recoverable-exception {:count}}}}}"
([exchange message-payload]
(publish exchange message-payload nil))
([exchange message-payload expiration]
(when (publish-internal exchange message-payload expiration)
(Thread/sleep 5000)
(log/info "Retrying publishing the message to " exchange)
(recur exchange message-payload expiration))))
(publish exchange message-payload expiration (:count (non-recoverable-exception-config))))
([exchange message-payload expiration counter]
(when (is-pool-alive? cpool/channel-pool)
(let [result (publish-internal exchange message-payload expiration)]
(cond
(= result :success) nil
(= result :retry) (do
(Thread/sleep (:sleep (publish-retry-config)))
(log/info "Retrying publishing the message to " exchange)
(recur exchange message-payload expiration counter))
(= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) (pos? counter))
(do
(Thread/sleep (:sleep (non-recoverable-exception-config)))
(recur exchange message-payload expiration (dec counter)))
(log/error "Publishing the message has failed. It is being dropped")))))))

(defn- retry-type []
(-> (ziggurat-config) :retry :type))
Expand Down
16 changes: 14 additions & 2 deletions test/ziggurat/messaging/channel_pool_test.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns ziggurat.messaging.channel-pool-test
(:require [clojure.test :refer :all]
[ziggurat.messaging.channel_pool :as cpool]
[ziggurat.messaging.channel_pool :as cpool :refer [channel-pool]]
[ziggurat.messaging.connection :refer [producer-connection]]
[ziggurat.fixtures :as fix])
(:import (org.apache.commons.pool2.impl GenericObjectPoolConfig GenericObjectPool)
Expand Down Expand Up @@ -60,4 +60,16 @@
(is (not (.equals rmq-chan-2 rmq-chan)))
(is (.isOpen rmq-chan-2)))))


(deftest is-pool-alive?
(testing "it should return true when channel-pool state has started"
(let [states {#'producer-connection #'channel-pool}]
(mount.core/start states)
(is (cpool/is-pool-alive? channel-pool))
(mount.core/stop states)))
(testing "it should return false when channel-pool state has not been started"
(is (false? (cpool/is-pool-alive? channel-pool))))
(testing "it should return false when channel-pool state has been stopped"
(let [states {#'producer-connection #'channel-pool}]
(mount.core/start states)
(mount.core/stop states)
(is (false? (cpool/is-pool-alive? channel-pool))))))
62 changes: 54 additions & 8 deletions test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
[ziggurat.fixtures :as fix]
[ziggurat.messaging.connection :refer [producer-connection]]
[ziggurat.messaging.producer :as producer]
[ziggurat.messaging.channel_pool :as cpool]
[ziggurat.messaging.util :as util]
[ziggurat.util.rabbitmq :as rmq]
[langohr.basic :as lb]
Expand All @@ -16,7 +17,8 @@
[ziggurat.metrics :as metrics])
(:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader]
(com.rabbitmq.client Channel Connection ShutdownSignalException AlreadyClosedException)
(java.io IOException)))
(java.io IOException)
(java.util.concurrent TimeoutException)))

(use-fixtures :once (join-fixtures [fix/init-rabbit-mq
fix/silence-logging]))
Expand Down Expand Up @@ -520,32 +522,76 @@
(let [publish-called (atom 0)]
(with-redefs [lch/open (fn [_] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
(when (< @publish-called 2)
(when (< @publish-called 10)
(swap! publish-called inc)
(throw (IOException. "io exception"))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 2 @publish-called)))))
(is (= 10 @publish-called)))))
(testing "publish/producer tries to publish again if already closed exception is received"
(let [publish-called (atom 0)]
(with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
(when (< @publish-called 2)
(when (< @publish-called 10)
(swap! publish-called inc)
(throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil)))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 2 @publish-called)))))
(testing "producer/publish does not try again if the exception thrown is neither IOException nor AlreadyClosedException"
(is (= 10 @publish-called)))))
(testing "publish/producer tries to publish again if TimeoutException is received"
(let [publish-called (atom 0)]
(with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
(when (< @publish-called 2)
(when (< @publish-called 10)
(swap! publish-called inc)
(throw (TimeoutException. "timeout"))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 10 @publish-called)))))
(testing "producer/publish tries again the number of times defined in the config if the exception thrown is non recoverable and if retry is enabled"
(let [publish-called (atom 0)
config (config/ziggurat-config)
count 3
config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true
:sleep 1
:count count}))]
(with-redefs [config/ziggurat-config (fn [] config)
lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
(when (< @publish-called 10)
(swap! publish-called inc)
(throw (Exception. "non-io exception"))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= (inc count) @publish-called)))))

(testing "producer/publish does not retry again if the exception thrown is non recoverable and if retry is disabled"
(let [publish-called (atom 0)
config (config/ziggurat-config)
count 3
config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false
:sleep 1
:count count}))]
(with-redefs [config/ziggurat-config (fn [] config)
lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
(when (< @publish-called 10)
(swap! publish-called inc)
(throw (Exception. "non-io exception"))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 1 @publish-called)))))
(testing "producer/publish does not publish even once if channel pool is not alive"
(let [publish-called (atom 0)]
(with-redefs [cpool/is-pool-alive? (fn [_] false)
lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
(when (< @publish-called 10)
(swap! publish-called inc)
(throw (Exception. "non-io exception"))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 1 @publish-called))))))
(is (= 0 @publish-called))))))

(deftest publish-to-delay-queue-test
(testing "creates a span when tracer is enabled"
Expand Down

0 comments on commit e9b9be7

Please sign in to comment.