Skip to content

Commit

Permalink
Few improvements in the error handling of rabbitmq publish flow
Browse files Browse the repository at this point in the history
1. Addressed PR comments
2. Changed max-idle to be equal to total number of threads.
3. Fixed an issue where in a few errors thrown while borrowing from channel were not being retried indefinitely
  • Loading branch information
Anmol Vijaywargiya committed May 19, 2022
1 parent b1e3871 commit b10d80b
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 46 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,9 @@ Ziggurat Config | Default Value | Description | Mandatory?
:channel-pool {:max-wait-ms [5000 :int]
:min-idle [10 :int]
:max-idle [20 :int]}
:publish-retry {:sleep 5000
:publish-retry {:back-off-ms 5000
:non-recoverable-exception {:enabled true
:sleep 1000
:back-off-ms 1000
:count 5}}}}}}
```

Expand All @@ -378,10 +378,10 @@ Ziggurat Config | Default Value | Description | Mandatory?
be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided.
- `:publish-retry` defines the config for recoverable and non-recoverable exceptions.
- Recoverable exceptions
- `:sleep` - defines the time period after which a retry should happen
- `:back-off-ms` - defines the time period after which a retry should happen
- Non-recoverable exceptions
- `:enabled` - defines whether retries should happen
- `:sleep` - defines the time period after which a retry should happen
- `:back-off-ms` - defines the time period after which a retry should happen
- `:count` - defines the number of retries
- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster

Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.6.4"
(defproject tech.gojek/ziggurat "4.7.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
4 changes: 2 additions & 2 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
:channel-timeout [2000 :int]

:address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]
:publish-retry {:sleep [1 :int]
:publish-retry {:back-off-ms [1 :int]
:non-recoverable-exception {:enabled [true :bool]
:sleep [1 :int]
:back-off-ms [1 :int]
:count [5 :int]}}}
:jobs {:instant {:worker-count [4 :int]
:prefetch-count [4 :int]}}
Expand Down
4 changes: 2 additions & 2 deletions src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
:username "guest"
:password "guest"
:channel-timeout 2000
:publish-retry {:sleep 5000
:publish-retry {:back-off-ms 5000
:non-recoverable-exception {:enabled true
:sleep 1000
:back-off-ms 5000
:count 5}}}
:jobs {:instant {:worker-count 4
:prefetch-count 4}}
Expand Down
6 changes: 4 additions & 2 deletions src/ziggurat/messaging/channel_pool.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
(defn create-object-pool-config [config]
(let [standby-size 10
total-thread-count (calc-total-thread-count)
merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle (* standby-size 2)} config)]
merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle total-thread-count} config)]
(doto (GenericObjectPoolConfig.)
(.setMaxWait (Duration/ofMillis (:max-wait-ms merged-config)))
(.setMinIdle (:min-idle merged-config))
Expand All @@ -35,7 +35,9 @@
(.setJmxNamePrefix "zig-rabbitmq-ch-pool"))))

(defn create-channel-pool [^Connection connection]
(let [pool-config (create-object-pool-config (get-in (ziggurat-config) [:rabbit-mq-connection :channel-pool]))
(let [pool-config (create-object-pool-config
(get-in (ziggurat-config)
[:rabbit-mq-connection :channel-pool]))
rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. connection) pool-config)]
rmq-chan-pool))

Expand Down
2 changes: 0 additions & 2 deletions src/ziggurat/messaging/connection.clj
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@
(log/info "Closing the RabbitMQ connection")
(rmq/close conn)))

(declare consumer-connection)
(defstate consumer-connection
:start (do (log/info "Creating consumer connection")
(start-connection false))
:stop (do (log/info "Stopping consume connection")
(stop-connection consumer-connection)))

(declare producer-connection)
(defstate producer-connection
:start (do (log/info "Creating producer connection")
(start-connection true))
Expand Down
62 changes: 36 additions & 26 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -65,36 +65,37 @@
props)))

(defn- handle-network-exception
[e message-payload]
[e message-payload retry-counter]
(log/error e "Network exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))})
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))
:retry-attempt retry-counter})
:retry)

(defn return-to-pool [^GenericObjectPool pool ^Channel ch]
(.returnObject pool ch))

(defn borrow-from-pool [^GenericObjectPool pool]
(.borrowObject pool))

(defn- publish-internal
[exchange message-payload expiration]
[exchange message-payload expiration retry-counter]
(try
(let [ch (.borrowObject cpool/channel-pool)]
(let [ch (borrow-from-pool cpool/channel-pool)]
(try
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers))
(properties-for-publish expiration (:headers message-payload)))
:success
(catch AlreadyClosedException e
(handle-network-exception e message-payload))
(catch IOException e
(handle-network-exception e message-payload))
(catch TimeoutException e
(handle-network-exception e message-payload))
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))})
:retry-with-counter)
(finally (return-to-pool cpool/channel-pool ch))))
(catch AlreadyClosedException e
(handle-network-exception e message-payload retry-counter))
(catch IOException e
(handle-network-exception e message-payload retry-counter))
(catch TimeoutException e
(handle-network-exception e message-payload retry-counter))
(catch Exception e
(log/error e "Exception was encountered while borrowing a channel from the pool")
(metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))})
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))
:retry-counter retry-counter})
:retry-with-counter)))
<<<<<<< HEAD

Expand All @@ -119,21 +120,30 @@
([exchange message-payload]
(publish exchange message-payload nil))
([exchange message-payload expiration]
(publish exchange message-payload expiration (:count (non-recoverable-exception-config))))
([exchange message-payload expiration counter]
(publish exchange message-payload expiration 0))
([exchange message-payload expiration retry-counter]
(when (is-pool-alive? cpool/channel-pool)
(let [result (publish-internal exchange message-payload expiration)]
(let [result (publish-internal exchange message-payload expiration retry-counter)]
(when (pos? retry-counter)
(do
(log/info "Retrying publishing the message to " exchange)
(log/info "Retry attempt " retry-counter)))
(log/info "Publish result " result)
(cond
(= result :success) nil
(= result :retry) (do
(Thread/sleep (:sleep (publish-retry-config)))
(log/info "Retrying publishing the message to " exchange)
(recur exchange message-payload expiration counter))
(= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) (pos? counter))
(Thread/sleep (:back-off-ms (publish-retry-config)))
(recur exchange message-payload expiration (inc retry-counter)))
(= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config))
(< retry-counter (:count (non-recoverable-exception-config))))
(do
(log/info "Backing off")
(Thread/sleep (:back-off-ms (non-recoverable-exception-config)))
(recur exchange message-payload expiration (inc retry-counter)))
(do
(Thread/sleep (:sleep (non-recoverable-exception-config)))
(recur exchange message-payload expiration (dec counter)))
(log/error "Publishing the message has failed. It is being dropped")))))))
(log/error "Publishing the message has failed. It is being dropped")
(metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload))
:retry-counter retry-counter}))))))))

(defn- retry-type []
(-> (ziggurat-config) :retry :type))
Expand Down
2 changes: 1 addition & 1 deletion test/ziggurat/messaging/channel_pool_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

(deftest create-object-pool-config-test
(testing "it should create a PoolConfig with default values"
(let [expected-config {:min-idle 10 :max-idle 20 :max-total 54 :max-wait-ms 5000}
(let [expected-config {:min-idle 10 :max-idle 44 :max-total 54 :max-wait-ms 5000}
pool-config-object ^GenericObjectPoolConfig (cpool/create-object-pool-config {})
min-idle (.getMinIdle pool-config-object)
max-idle (.getMaxIdle pool-config-object)
Expand Down
42 changes: 36 additions & 6 deletions test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@
(is (true? @publish-called?))))))

(deftest publish-behaviour-on-rabbitmq-disconnection-test
(testing "producer/publish tries to publish again if IOException is thrown"
(testing "producer/publish tries to publish again if IOException is thrown via lb/publish"
(let [publish-called (atom 0)]
(with-redefs [lch/open (fn [_] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
Expand All @@ -528,7 +528,7 @@
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 10 @publish-called)))))
(testing "publish/producer tries to publish again if already closed exception is received"
(testing "publish/producer tries to publish again if already closed exception is received via lb/publish"
(let [publish-called (atom 0)]
(with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
Expand All @@ -538,7 +538,7 @@
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 10 @publish-called)))))
(testing "publish/producer tries to publish again if TimeoutException is received"
(testing "publish/producer tries to publish again if TimeoutException is received via lb/publish"
(let [publish-called (atom 0)]
(with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
lb/publish (fn [_ _ _ _ _]
Expand All @@ -548,12 +548,42 @@
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 10 @publish-called)))))
(testing "producer/publish tries again the number of times defined in the config if the exception thrown is non recoverable and if retry is enabled"
(testing "producer/publish tries to publish again if IOException is thrown while borrowing from channel"
(let [borrow-from-pool-called (atom 0)]
(with-redefs [lch/open (fn [_] (reify Channel (close [_] nil)))
producer/borrow-from-pool (fn [_]
(when (< @borrow-from-pool-called 10)
(swap! borrow-from-pool-called inc)
(throw (IOException. "io exception"))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 10 @borrow-from-pool-called)))))
(testing "publish/producer tries to publish again if already closed exception is received while borrowing from channel"
(let [borrow-from-pool-called (atom 0)]
(with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
producer/borrow-from-pool (fn [_]
(when (< @borrow-from-pool-called 10)
(swap! borrow-from-pool-called inc)
(throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil)))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 10 @borrow-from-pool-called)))))
(testing "publish/producer tries to publish again if TimeoutException is received while borrowing from channel"
(let [borrow-from-pool-called (atom 0)]
(with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
producer/borrow-from-pool (fn [_]
(when (< @borrow-from-pool-called 10)
(swap! borrow-from-pool-called inc)
(throw (TimeoutException. "timeout"))))
metrics/increment-count (fn [_ _ _] nil)]
(producer/publish "random-exchange" {:topic-entity "hello"} 12345)
(is (= 10 @borrow-from-pool-called)))))
(testing "producer/publish retries publishing for a certain number of times (configurable) when a non-recoverable exception is thrown"
(let [publish-called (atom 0)
config (config/ziggurat-config)
count 3
config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true
:sleep 1
:back-off-ms 1
:count count}))]
(with-redefs [config/ziggurat-config (fn [] config)
lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
Expand All @@ -570,7 +600,7 @@
config (config/ziggurat-config)
count 3
config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false
:sleep 1
:back-off-ms 1
:count count}))]
(with-redefs [config/ziggurat-config (fn [] config)
lch/open (fn [^Connection _] (reify Channel (close [_] nil)))
Expand Down

0 comments on commit b10d80b

Please sign in to comment.