Skip to content

Commit

Permalink
Merge 7dc805c into c4e7e54
Browse files Browse the repository at this point in the history
  • Loading branch information
anmol1vw13 committed May 19, 2022
2 parents c4e7e54 + 7dc805c commit 6107fe4
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 45 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,26 @@ Ziggurat Config | Default Value | Description | Mandatory?
:address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]. Default is :dns
:channel-pool {:max-wait-ms [5000 :int]
:min-idle [10 :int]
:max-idle [20 :int]}}}}}
:max-idle [20 :int]}
:publish-retry {:back-off-ms 5000
:non-recoverable-exception {:enabled true
:back-off-ms 1000
:count 5}}}}}}
```

- `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs).
- `:port` specifies the port number on which the RabbitMQ nodes are running.
- `:channel-pool` specifies the properties for the RabbitMQ channel pool used for publishing
- `:address-resolver` specifies the strategy to figure out RabbitMQ hosts IP addresses. `:dns` is the default and shoud
be used when `:hosts` specifies a DNS address. `:ip-list` should be used when comma separated IPs are provided.
- `:publish-retry` defines the config for recoverable and non-recoverable exceptions.
- Recoverable exceptions
- `:back-off-ms` - defines the time period after which a retry should happen
- Non-recoverable exceptions
- `:enabled` - defines whether retries should happen
- `:back-off-ms` - defines the time period after which a retry should happen
- `:count` - defines the number of retries
- By default, your queues and exchanges are replicated across (n+1)/2 nodes in the cluster

## Exponential Backoff based Retries

Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.6.4"
(defproject tech.gojek/ziggurat "4.7.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
7 changes: 6 additions & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
:username "guest"
:password "guest"
:channel-timeout [2000 :int]
:address-resolver [:dns :keyword]} ;;possible values [:dns :ip-list]

:address-resolver [:dns :keyword] ;;possible values [:dns :ip-list]
:publish-retry {:back-off-ms [1 :int]
:non-recoverable-exception {:enabled [true :bool]
:back-off-ms [1 :int]
:count [5 :int]}}}
:jobs {:instant {:worker-count [4 :int]
:prefetch-count [4 :int]}}
:rabbit-mq {:delay {:queue-name "application_name_delay_queue_test"
Expand Down
6 changes: 5 additions & 1 deletion src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
:rabbit-mq-connection {:port 5672
:username "guest"
:password "guest"
:channel-timeout 2000}
:channel-timeout 2000
:publish-retry {:back-off-ms 5000
:non-recoverable-exception {:enabled true
:back-off-ms 5000
:count 5}}}
:jobs {:instant {:worker-count 4
:prefetch-count 4}}
:rabbit-mq {:delay {:queue-name "%s_delay_queue"
Expand Down
11 changes: 9 additions & 2 deletions src/ziggurat/messaging/channel_pool.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
(defn create-object-pool-config [config]
(let [standby-size 10
total-thread-count (calc-total-thread-count)
merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle (* standby-size 2)} config)]
merged-config (merge {:max-wait-ms 5000 :min-idle standby-size :max-idle total-thread-count} config)]
(doto (GenericObjectPoolConfig.)
(.setMaxWait (Duration/ofMillis (:max-wait-ms merged-config)))
(.setMinIdle (:min-idle merged-config))
Expand All @@ -35,10 +35,15 @@
(.setJmxNamePrefix "zig-rabbitmq-ch-pool"))))

(defn create-channel-pool [^Connection connection]
(let [pool-config (create-object-pool-config (get-in (ziggurat-config) [:rabbit-mq-connection :channel-pool]))
(let [pool-config (create-object-pool-config
(get-in (ziggurat-config)
[:rabbit-mq-connection :channel-pool]))
rmq-chan-pool (GenericObjectPool. (RabbitMQChannelFactory. connection) pool-config)]
rmq-chan-pool))

(defn is-pool-alive? [channel-pool]
(= (type channel-pool) GenericObjectPool))

(defn destroy-channel-pool [channel-pool]
(.close channel-pool))

Expand All @@ -49,3 +54,5 @@
(create-channel-pool c/producer-connection))
:stop (do (log/info "Stopping channel pool")
(destroy-channel-pool channel-pool)))


2 changes: 0 additions & 2 deletions src/ziggurat/messaging/connection.clj
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@
(log/info "Closing the RabbitMQ connection")
(rmq/close conn)))

(declare consumer-connection)
(defstate consumer-connection
:start (do (log/info "Creating consumer connection")
(start-connection false))
:stop (do (log/info "Stopping consume connection")
(stop-connection consumer-connection)))

(declare producer-connection)
(defstate producer-connection
:start (do (log/info "Creating producer connection")
(start-connection true))
Expand Down
85 changes: 61 additions & 24 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
[langohr.exchange :as le]
[langohr.http :as lh]
[langohr.queue :as lq]
[ziggurat.messaging.channel_pool :as cpool]
[ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]]
[ziggurat.messaging.connection :refer [producer-connection is-connection-required?]]
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics])
(:import (com.rabbitmq.client AlreadyClosedException Channel)
(java.io IOException)
(org.apache.commons.pool2.impl GenericObjectPool)))
(org.apache.commons.pool2.impl GenericObjectPool)
(java.util.concurrent TimeoutException)))

(def MAX_EXPONENTIAL_RETRIES 25)

Expand Down Expand Up @@ -64,44 +65,80 @@
props)))

(defn- handle-network-exception
[e message-payload]
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))})
true)
[e message-payload retry-counter]
(log/error e "Network exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))
:retry-attempt retry-counter})
:retry)

(defn return-to-pool [^GenericObjectPool pool ^Channel ch]
(.returnObject pool ch))

(defn borrow-from-pool [^GenericObjectPool pool]
(.borrowObject pool))

(defn- publish-internal
[exchange message-payload expiration]
[exchange message-payload expiration retry-counter]
(try
(let [ch (.borrowObject cpool/channel-pool)]
(let [ch (borrow-from-pool cpool/channel-pool)]
(try
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers))
(properties-for-publish expiration (:headers message-payload)))
false
(catch AlreadyClosedException e
(handle-network-exception e message-payload))
(catch IOException e
(handle-network-exception e message-payload))
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))})
false)
:success
(finally (return-to-pool cpool/channel-pool ch))))
(catch AlreadyClosedException e
(handle-network-exception e message-payload retry-counter))
(catch IOException e
(handle-network-exception e message-payload retry-counter))
(catch TimeoutException e
(handle-network-exception e message-payload retry-counter))
(catch Exception e
(log/error e "Exception occurred while borrowing a channel from the pool")
(metrics/increment-count ["rabbitmq" "publish" "channel_borrow"] {:topic-entity (name (:topic-entity message-payload))})
false)))
(log/error e "Exception was encountered while publishing to RabbitMQ")
(metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))
:retry-counter retry-counter})
:retry-with-counter)))

(defn- publish-retry-config []
(-> (ziggurat-config) :rabbit-mq-connection :publish-retry))

(defn- non-recoverable-exception-config []
(:non-recoverable-exception (publish-retry-config)))


(defn publish
"This is meant for publishing to rabbitmq.
* Checks if the pool is alive - We do this so that publish does not happen after the channel pool state is stopped.
* publish-internal returns multiple states
* :success - Message has been successfully produced to rabbitmq
* :retry - A retryable exception was encountered and message will be retried until it is successfully published.
* :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times. defined by the counter
{ :rabbit-mq-connection { :publish-retry { :non-recoverable-exception {:count}}}}}"
([exchange message-payload]
(publish exchange message-payload nil))
([exchange message-payload expiration]
(when (publish-internal exchange message-payload expiration)
(Thread/sleep 5000)
(log/info "Retrying publishing the message to " exchange)
(recur exchange message-payload expiration))))
(publish exchange message-payload expiration 0))
([exchange message-payload expiration retry-counter]
(when (is-pool-alive? cpool/channel-pool)
(let [result (publish-internal exchange message-payload expiration retry-counter)]
(when (pos? retry-counter)
(log/info "Retrying publishing the message to " exchange)
(log/info "Retry attempt " retry-counter))
(log/info "Publish result " result)
(cond
(= result :success) nil
(= result :retry) (do
(Thread/sleep (:back-off-ms (publish-retry-config)))
(recur exchange message-payload expiration (inc retry-counter)))
(= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config))
(< retry-counter (:count (non-recoverable-exception-config))))
(do
(log/info "Backing off")
(Thread/sleep (:back-off-ms (non-recoverable-exception-config)))
(recur exchange message-payload expiration (inc retry-counter)))
(do
(log/error "Publishing the message has failed. It is being dropped")
(metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload))
:retry-counter retry-counter}))))))))

(defn- retry-type []
(-> (ziggurat-config) :retry :type))
Expand Down
18 changes: 15 additions & 3 deletions test/ziggurat/messaging/channel_pool_test.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns ziggurat.messaging.channel-pool-test
(:require [clojure.test :refer :all]
[ziggurat.messaging.channel_pool :as cpool]
[ziggurat.messaging.channel_pool :as cpool :refer [channel-pool]]
[ziggurat.messaging.connection :refer [producer-connection]]
[ziggurat.fixtures :as fix])
(:import (org.apache.commons.pool2.impl GenericObjectPoolConfig GenericObjectPool)
Expand All @@ -18,7 +18,7 @@

(deftest create-object-pool-config-test
(testing "it should create a PoolConfig with default values"
(let [expected-config {:min-idle 10 :max-idle 20 :max-total 54 :max-wait-ms 5000}
(let [expected-config {:min-idle 10 :max-idle 44 :max-total 54 :max-wait-ms 5000}
pool-config-object ^GenericObjectPoolConfig (cpool/create-object-pool-config {})
min-idle (.getMinIdle pool-config-object)
max-idle (.getMaxIdle pool-config-object)
Expand Down Expand Up @@ -60,4 +60,16 @@
(is (not (.equals rmq-chan-2 rmq-chan)))
(is (.isOpen rmq-chan-2)))))


(deftest is-pool-alive?
(testing "it should return true when channel-pool state has started"
(let [states {#'producer-connection #'channel-pool}]
(mount.core/start states)
(is (cpool/is-pool-alive? channel-pool))
(mount.core/stop states)))
(testing "it should return false when channel-pool state has not been started"
(is (false? (cpool/is-pool-alive? channel-pool))))
(testing "it should return false when channel-pool state has been stopped"
(let [states {#'producer-connection #'channel-pool}]
(mount.core/start states)
(mount.core/stop states)
(is (false? (cpool/is-pool-alive? channel-pool))))))

0 comments on commit 6107fe4

Please sign in to comment.