diff --git a/CHANGELOG.md b/CHANGELOG.md index 72669cc5..ed666bb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,18 @@ All notable changes to this project will be documented in this file. This change ## Unreleased Changes +## 3.2.0 - 2020-01-09 +- Changes Exponential backoff config contract. + - Adds a `:type` key to retry-config + - Adds a limit on the number of retries possible in exponential backoff + - Releasing exponential backoff as an alpha feature +- Fixes [issue](https://github.com/gojek/ziggurat/issues/136) where dead-set replay doesn't send the message to the retry-flow +- Fixes [issue](https://github.com/gojek/ziggurat/issues/129) by updating tools.nrepl dependency to nrepl/nrepl +- Fixes [this bug](https://github.com/gojek/ziggurat/issues/133) where dead set replay broke on Ziggurat upgrade from 2.x to 3.x . +- Fixes [this bug](https://github.com/gojek/ziggurat/issues/115) in RabbitMQ message processing flow +- Adds support for exponential backoffs in channels and normal retry flow +- exponential backoffs can be enabled from the config + ## 3.2.0-alpha.5 - 2019-12-17 - Fixes [issue](https://github.com/gojek/ziggurat/issues/136) where dead-set replay doesn't send the message to the retry-flow diff --git a/README.md b/README.md index 6135326a..b2082a95 100644 --- a/README.md +++ b/README.md @@ -338,6 +338,40 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur * jobs - The number of consumers that should be reading from the retry queues and the prefetch count of each consumer * http-server - Ziggurat starts an http server by default and gives apis for ping health-check and deadset management. This defines the port and the number of threads of the http server. +## Alpha (Experimental) Features +The contract and interface for experimental features in Ziggurat can be changed as we iterate towards better designs for that feature. 
+For all purposes these features should be considered unstable and should only be used after understanding their risks and implications. + +### Exponential Backoff based Retries +In addition to linear retries, Ziggurat users can now use an exponential backoff strategy for retries. This means that the message +timeouts after every retry increase by roughly a factor of 2. So, if your configured timeout is 100ms the backoffs will have timeouts as +`100, 300, 700, 1500 ..`. These timeouts are calculated using the formula `(queue-timeout-ms * ((2**exponent) - 1))` where `exponent` falls in this range `[1,(min 25, configured-retry-count)]`. + +The number of retries possible in this case is capped at 25. The number of queues created in RabbitMQ is equal to the configured-retry-count or 25, whichever is smaller. + +Exponential retries can be configured as described below. + +```clojure +:ziggurat {:stream-router {:default {:application-id "application_name"...}} + :retry {:type [:exponential :keyword] + :count [10 :int] + :enabled [true :bool]}} + +``` + +Exponential retries can be configured for channels too. Additionally, a user can specify a custom `queue-timeout-ms` value per channel. +Timeouts for exponential backoffs are calculated using `queue-timeout-ms`. This implies that each channel can have a separate count of retries +and different timeout values. + +```clojure +:ziggurat {:stream-router {:default {:application-id "application_name"... + :channels {:channel-1 ..... 
+ :retry {:type [:exponential :keyword] + :count [10 :int] + :queue-timeout-ms 2000 + :enabled [true :bool]}}}}}} +``` + ## Contribution - For dev setup and contributions please refer to CONTRIBUTING.md diff --git a/docker-compose.yml b/docker-compose.yml index 020b1f3f..7ee678e6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,9 +2,10 @@ version: '3.3' services: rabbitmq: - image: 'rabbitmq' + image: 'rabbitmq:3.8.2-management-alpine' ports: - '5672:5672' + - '15672:15672' container_name: 'ziggurat_rabbitmq' zookeeper: image: 'bitnami/zookeeper:latest' diff --git a/project.clj b/project.clj index 0449a57c..7d12c403 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject tech.gojek/ziggurat "3.2.0-alpha.5" +(defproject tech.gojek/ziggurat "3.2.0" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index 6101345d..586c0699 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -24,7 +24,8 @@ :exchange-name "application_name_instant_exchange_test"} :dead-letter {:queue-name "application_name_dead_letter_queue_test" :exchange-name "application_name_dead_letter_exchange_test"}} - :retry {:count [5 :int] + :retry {:type [:linear :keyword] + :count [5 :int] :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} @@ -34,7 +35,8 @@ :origin-topic "topic" :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] + :retry {:type [:linear :keyword] + :count [5 :int] :enabled [true :bool]}}} :producer {:bootstrap-servers "localhost:9092" :acks "all" diff --git a/resources/config.test.edn b/resources/config.test.edn index 2b063bb0..a19eeae0 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -25,6 +25,7 @@ :dead-letter {:queue-name 
"application_name_dead_letter_queue_test" :exchange-name "application_name_dead_letter_exchange_test"}} :retry {:count [5 :int] + :type [:linear :keyword] :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} @@ -34,7 +35,8 @@ :origin-topic "topic" :upgrade-from "1.1" :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] + :retry {:type [:linear :keyword] + :count [5 :int] :enabled [true :bool]}}} :producer {:bootstrap-servers "localhost:9092" :acks "all" diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 401120d9..c22f8b3b 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -12,6 +12,8 @@ [ziggurat.retry :refer [with-retry]] [ziggurat.sentry :refer [sentry-reporter]])) +(def MAX_EXPONENTIAL_RETRIES 25) + (defn delay-queue-name [topic-entity queue-name] (prefixed-queue-name topic-entity queue-name)) @@ -72,73 +74,79 @@ (sentry/report-error sentry-reporter e "Pushing message to rabbitmq failed, data: " message-payload))))) +(defn- retry-type [] + (-> (ziggurat-config) :retry :type)) + +(defn- channel-retries-enabled [topic-entity channel] + (:enabled (channel-retry-config topic-entity channel))) + +(defn- channel-retry-type [topic-entity channel] + (:type (channel-retry-config topic-entity channel))) + (defn- get-channel-retry-count [topic-entity channel] (:count (channel-retry-config topic-entity channel))) -(defn- get-channel-retry-queue-timeout-ms [topic-entity channel] - (:queue-timeout-ms (channel-retry-config topic-entity channel))) - -(defn- channel-retries-exponential-backoff-config [topic-entity channel] - "Get exponential backoff enabled for specific channel from config." 
- (:exponential-backoff (channel-retry-config topic-entity channel))) - -(defn- get-backoff-exponent [retry-count message-retry-count exponential-backoff-count] - "Calculates backoff exponent for the given message-retry-count (number of retries available for the message) and retry-count (number of max retries possible). Returns the min of calculated exponent and exponential-backoff-count" - (let [exponent (- retry-count message-retry-count) - exponential-backoff-count (or exponential-backoff-count 10)] - (if (< exponent exponential-backoff-count) - exponent - exponential-backoff-count))) - -(defn- get-exponential-backoff-timeout-ms [retry-count message-retry-count queue-timeout-ms exponential-backoff-count] - "Calculates the exponential timeout value from the number of max retries possible (retry-count), the number of retries available for a message (message-retry-count) and base timeout value (queue-timeout-ms). It uses this formula ((2^n)-1)*queue-timeout-ms, where n is the current message retry-count. - - Sample config to use exponential backoff: +(defn- get-channel-queue-timeout-or-default-timeout [topic-entity channel] + (let [channel-queue-timeout-ms (:queue-timeout-ms (channel-retry-config topic-entity channel)) + queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])] + (or channel-queue-timeout-ms queue-timeout-ms))) + +(defn- get-backoff-exponent + "Calculates the exponent as `(min MAX_EXPONENTIAL_RETRIES retry-count)` minus `message-retry-count`, where `retry-count` is the total retries possible and `message-retry-count` is the count of retries available for the message. + + Caps the value of `retry-count` to MAX_EXPONENTIAL_RETRIES. + + Returns 1, if `message-retry-count` is greater than or equal to `min(MAX_EXPONENTIAL_RETRIES, retry-count)`." + [retry-count message-retry-count]
+ (let [exponent (- (min MAX_EXPONENTIAL_RETRIES retry-count) message-retry-count)] + (max 1 exponent))) + +(defn- get-exponential-backoff-timeout-ms "Calculates the exponential timeout value from the number of max retries possible (`retry-count`), + the number of retries available for a message (`message-retry-count`) and base timeout value (`queue-timeout-ms`). + It uses this formula `((2^n)-1)*queue-timeout-ms`, where `n` is the backoff exponent returned by `get-backoff-exponent`. The result is clamped to `Integer/MAX_VALUE` so that a large exponent cannot overflow the `int` coercion. + + Sample config to use exponential backoff: {:ziggurat {:retry {:enabled true - :count 5 - :exponential-backoff {:enabled true :count 10}}}} - + :count 5 + :type :exponential}}} + Sample config to use exponential backoff when using channel flow: {:ziggurat {:stream-router {topic-entity {:channels {channel {:retry {:count 5 :enabled true :queue-timeout-ms 1000 - :exponential-backoff {:enabled true :count 10}}}}}}}} - + :type :exponential}}}}}}} + _NOTE: Exponential backoff for channel retries is an experimental feature. It should not be used until released in a stable version._" - (let [exponential-backoff (get-backoff-exponent retry-count message-retry-count exponential-backoff-count)] + [retry-count message-retry-count queue-timeout-ms] + (let [exponential-backoff (get-backoff-exponent retry-count message-retry-count)] - (int (* (dec (Math/pow 2 exponential-backoff)) queue-timeout-ms)))) + (int (min Integer/MAX_VALUE (* (dec (Math/pow 2 exponential-backoff)) queue-timeout-ms))))) (defn get-queue-timeout-ms [message-payload] - "Calculate queue timeout for delay queue. Use value from get-exponential-backoff-timeout-ms if exponential backoff enabled." + "Calculate queue timeout for delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled." 
(let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms) - exponential-backoff-config (-> (ziggurat-config) :retry :exponential-backoff) retry-count (-> (ziggurat-config) :retry :count) message-retry-count (:retry-count message-payload)] - (if (:enabled exponential-backoff-config) - (get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms (:count exponential-backoff-config)) + (if (= :exponential (-> (ziggurat-config) :retry :type)) + (get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms) queue-timeout-ms))) (defn get-channel-queue-timeout-ms [topic-entity channel message-payload] - "Calculate queue timeout for channel delay queue. Use value from get-exponential-backoff-timeout-ms if exponential backoff enabled." - (let [exponential-backoff-config (channel-retries-exponential-backoff-config topic-entity channel) - queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms) - channel-queue-timeout-ms (get-channel-retry-queue-timeout-ms topic-entity channel) - retry-count (-> (ziggurat-config) :retry :count) + "Calculate queue timeout for channel delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled." 
+ (let [channel-queue-timeout-ms (get-channel-queue-timeout-or-default-timeout topic-entity channel) message-retry-count (:retry-count message-payload) channel-retry-count (get-channel-retry-count topic-entity channel)] - (if (:enabled exponential-backoff-config) - (get-exponential-backoff-timeout-ms (or channel-retry-count retry-count) message-retry-count (or channel-queue-timeout-ms queue-timeout-ms) (:count exponential-backoff-config)) - (or channel-queue-timeout-ms queue-timeout-ms)))) + (if (= :exponential (channel-retry-type topic-entity channel)) + (get-exponential-backoff-timeout-ms channel-retry-count message-retry-count channel-queue-timeout-ms) + channel-queue-timeout-ms))) (defn get-delay-exchange-name [topic-entity message-payload] "This function return delay exchange name for retry when using flow without channel. It will return exchange name with retry count as suffix if exponential backoff enabled." (let [{:keys [exchange-name]} (:delay (rabbitmq-config)) exchange-name (prefixed-queue-name topic-entity exchange-name) - exponential-backoff-config (-> (ziggurat-config) :retry :exponential-backoff) retry-count (-> (ziggurat-config) :retry :count)] - (if (:enabled exponential-backoff-config) + (if (= :exponential (-> (ziggurat-config) :retry :type)) (let [message-retry-count (:retry-count message-payload) - backoff-exponent (get-backoff-exponent retry-count message-retry-count (:count exponential-backoff-config))] + backoff-exponent (get-backoff-exponent retry-count message-retry-count)] (prefixed-queue-name exchange-name backoff-exponent)) exchange-name))) @@ -146,11 +154,10 @@ "This function return delay exchange name for retry when using channel flow. It will return exchange name with retry count as suffix if exponential backoff enabled." 
(let [{:keys [exchange-name]} (:delay (rabbitmq-config)) exchange-name (prefixed-channel-name topic-entity channel exchange-name) - exponential-backoff-config (channel-retries-exponential-backoff-config topic-entity channel) channel-retry-count (get-channel-retry-count topic-entity channel)] - (if (:enabled exponential-backoff-config) + (if (= :exponential (channel-retry-type topic-entity channel)) (let [message-retry-count (:retry-count message-payload) - exponential-backoff (get-backoff-exponent channel-retry-count message-retry-count (:count exponential-backoff-config))] + exponential-backoff (get-backoff-exponent channel-retry-count message-retry-count)] (str (name exchange-name) "_" exponential-backoff)) exchange-name))) @@ -172,9 +179,6 @@ exchange-name (prefixed-queue-name topic-entity exchange-name)] (publish exchange-name message-payload))) -(defn- channel-retries-enabled [topic-entity channel] - (-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :enabled)) - (defn publish-to-channel-delay-queue [channel message-payload] (let [topic-entity (:topic-entity message-payload) exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload) @@ -214,17 +218,17 @@ dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)] (create-and-bind-queue queue-name exchange-name dead-letter-exchange-name))) -(defn- make-delay-queue-with-retry-count [topic-entity retry-count exponential-backoff-count] +(defn- make-delay-queue-with-retry-count [topic-entity retry-count] (let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config)) queue-name (delay-queue-name topic-entity queue-name) exchange-name (prefixed-queue-name topic-entity exchange-name) dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange) - sequence (if (<= retry-count exponential-backoff-count) (inc retry-count) (inc exponential-backoff-count))] + sequence (min MAX_EXPONENTIAL_RETRIES (inc 
retry-count))] (doseq [s (range 1 sequence)] (create-and-bind-queue (prefixed-queue-name queue-name s) (prefixed-queue-name exchange-name s) dead-letter-exchange-name)))) -(defn- make-channel-delay-queue-with-retry-count [topic-entity channel retry-count exponential-backoff-count] - (make-delay-queue-with-retry-count (with-channel-name topic-entity channel) retry-count exponential-backoff-count)) +(defn- make-channel-delay-queue-with-retry-count [topic-entity channel retry-count] + (make-delay-queue-with-retry-count (with-channel-name topic-entity channel) retry-count)) (defn- make-channel-delay-queue [topic-entity channel] (make-delay-queue (with-channel-name topic-entity channel))) @@ -243,20 +247,45 @@ (make-channel-queue topic-entity channel :instant) (when (channel-retries-enabled topic-entity channel) (make-channel-queue topic-entity channel :dead-letter) - (let [exponential-backoff-config (channel-retries-exponential-backoff-config topic-entity channel)] - (if (:enabled exponential-backoff-config) - (make-channel-delay-queue-with-retry-count topic-entity channel (get-channel-retry-count topic-entity channel) (:count exponential-backoff-config)) - (make-channel-delay-queue topic-entity channel)))))) + (let [channel-retry-type (channel-retry-type topic-entity channel)] + (cond + (= :exponential channel-retry-type) (do + (log/warn "[Alpha Feature]: Exponential backoff based retries is an alpha feature." + "Please use it only after understanding its risks and implications." + "Its contract can change in the future releases of Ziggurat.") + (make-channel-delay-queue-with-retry-count topic-entity channel (get-channel-retry-count topic-entity channel))) + (= :linear channel-retry-type) (make-channel-delay-queue topic-entity channel) + (nil? channel-retry-type) (do + (log/warn "[Deprecation Notice]: Please note that the configuration for channel retries has changed." 
+ "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide" + "Use :type to specify the type of retry mechanism in the channel config.") + (make-channel-delay-queue topic-entity channel)) + :else (do + (log/warn "Incorrect keyword for type passed, falling back to linear backoff for channel: " channel) + (make-channel-delay-queue topic-entity channel))))))) (defn make-queues [stream-routes] (when (is-connection-required?) (doseq [topic-entity (keys stream-routes)] - (let [channels (get-channel-names stream-routes topic-entity)] + (let [channels (get-channel-names stream-routes topic-entity) + retry-type (retry-type)] (make-channel-queues channels topic-entity) (when (-> (ziggurat-config) :retry :enabled) (make-queue topic-entity :instant) (make-queue topic-entity :dead-letter) - (if (-> (ziggurat-config) :retry :exponential-backoff :enabled) - (let [exponential-backoff-count (-> (ziggurat-config) :retry :exponential-backoff :count)] - (make-delay-queue-with-retry-count topic-entity (-> (ziggurat-config) :retry :count) exponential-backoff-count)) - (make-delay-queue topic-entity))))))) + (cond + (= :exponential retry-type) (do + (log/warn "[Alpha Feature]: Exponential backoff based retries is an alpha feature." + "Please use it only after understanding its risks and implications." + "Its contract can change in the future releases of Ziggurat.") + (make-delay-queue-with-retry-count topic-entity (-> (ziggurat-config) :retry :count))) + (= :linear retry-type) (make-delay-queue topic-entity) + (nil? retry-type) (do + (log/warn "[Deprecation Notice]: Please note that the configuration for retries has changed." 
+ "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide" + "Use :type to specify the type of retry mechanism in the config.") + (make-delay-queue topic-entity)) + :else (do + (log/warn "Incorrect keyword for type passed, falling back to linear backoff for topic Entity: " topic-entity) + (make-delay-queue topic-entity)))))))) + diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index d4520847..4fc2c46f 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -220,7 +220,7 @@ (util/close ch))))) (deftest start-channels-subscriber-test - (testing "the mapper-fn for channel subscriber should be retried until return success when retry is enabled to for that channel" + (testing "the mapper-fn for channel subscriber should be retried until return success when retry is enabled for that channel" (let [retry-counter (atom 0) call-counter (atom 0) success-promise (promise) diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index 6fdbb8a3..72aba652 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -56,6 +56,26 @@ (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] (is (= expected-message-payload message-from-mq)))))) + (testing "message in channel will be retried in delay queue with suffix 1 if message retry-count exceeds retry count in channel config" + (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) + :stream-router + {:default + {:channels + {:exponential-retry + {:retry {:count 5 + :enabled true + :type :exponential + :queue-timeout-ms 1000}}}}}))] + (fix/with-queues + {:default {:handler-fn #(constantly nil) + :exponential-retry #(constantly nil)}} + (let [channel :exponential-retry + retry-message-payload (assoc message-payload :retry-count 10) + expected-message-payload (assoc 
message-payload :retry-count 9) + _ (producer/retry-for-channel retry-message-payload channel) + message-from-mq (rmq/get-message-from-channel-retry-queue topic-entity channel 1)] + (is (= expected-message-payload message-from-mq)))))) + (testing "message in channel will be retried with linear queue timeout" (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) :stream-router {:default {:channels {:linear-retry {:retry {:count 5 @@ -77,13 +97,12 @@ (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] (is (= expected-message-payload message-from-mq))))))) - (testing "message in channel will be retried with exponential timeout calculated fro channel specific queue-timeout-ms value" + (testing "message in channel will be retried with exponential timeout calculated from channel specific queue-timeout-ms value" (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) :stream-router {:default {:channels {:exponential-retry {:retry {:count 5 :enabled true - :queue-timeout-ms 1000 - :exponential-backoff {:enabled true - :count 10}}}}}}))] + :type :exponential + :queue-timeout-ms 1000}}}}}))] (fix/with-queues {:default {:handler-fn #(constantly nil) :exponential-retry #(constantly nil)}} @@ -93,28 +112,6 @@ retry-message-payload (assoc message-payload :retry-count @retry-count) expected-message-payload (assoc message-payload :retry-count channel-retry-count)] (producer/retry-for-channel retry-message-payload channel) - (while (> @retry-count 0) - (swap! 
retry-count dec) - (let [message-from-mq (rmq/get-message-from-channel-retry-queue topic-entity channel (- 5 @retry-count))] - (producer/retry-for-channel message-from-mq channel))) - (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] - (is (= expected-message-payload message-from-mq))))))) - - (testing "message in channel will be retried with exponential timeout calculated from queue-timeout-ms value" - (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) - :stream-router {:default {:channels {:channel-exponential-retry {:retry {:count 5 - :enabled true - :exponential-backoff {:enabled true - :count 10}}}}}}))] - (fix/with-queues - {:default {:handler-fn #(constantly nil) - :channel-exponential-retry #(constantly nil)}} - (let [retry-count (atom 5) - channel :channel-exponential-retry - channel-retry-count (:count (channel-retry-config topic-entity channel)) - retry-message-payload (assoc message-payload :retry-count @retry-count) - expected-message-payload (assoc message-payload :retry-count @retry-count)] - (producer/retry-for-channel retry-message-payload channel) (while (> @retry-count 0) (swap! retry-count dec) (let [message-from-mq (rmq/get-message-from-channel-retry-queue topic-entity channel (- 5 @retry-count))] @@ -208,13 +205,30 @@ (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] (producer/retry message-from-mq))) (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] - (is (= expected-message-payload message-from-mq))))))) + (is (= expected-message-payload message-from-mq)))))) + + (testing "[Backward Compatiblity] Messages will be retried even if retry type is not present in the config." 
+ (with-redefs [ziggurat-config (constantly (update-in (ziggurat-config) [:retry] dissoc :type))] + (fix/with-queues + {:default {:handler-fn #(constantly nil)}} + (let [retry-count (atom 2) + retry-message-payload (assoc message-payload :retry-count @retry-count) + expected-message-payload (assoc message-payload :retry-count (retry-count-config))] + (producer/retry retry-message-payload) + (while (> @retry-count 0) + (swap! retry-count dec) + (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] + (producer/retry message-from-mq))) + (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] + (is (= expected-message-payload message-from-mq)))))))) (deftest retry-with-exponential-backoff-test (testing "message will publish to delay with retry count queue when exponential backoff enabled" (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) - :retry {:count 5 :enabled true :exponential-backoff {:enabled true :count 10}}))] - (testing "message with no retry count will publish to delay with retry count queue" + :retry {:count 5 + :enabled true + :type :exponential}))] + (testing "message with no retry count will publish to delay queue with suffix 1" (fix/with-queues {:default {:handler-fn #(constantly nil)}} (let [expected-message (assoc message-payload :retry-count 4)] @@ -238,13 +252,23 @@ expected-message (assoc message-payload :retry-count 0)] (producer/retry retry-message-payload) (let [message-from-mq (rmq/get-message-from-retry-queue "default" 5)] - (is (= message-from-mq expected-message))))))))) + (is (= message-from-mq expected-message)))))) + + (testing "message will be retried in delay queue with suffix 1 if message retry-count exceeds retry count in config" + (fix/with-queues + {:default {:handler-fn #(constantly nil)}} + (let [retry-message-payload (assoc message-payload :retry-count 10) + expected-message-payload (assoc message-payload :retry-count 9)] + (producer/retry retry-message-payload) + (let [message-from-mq 
(rmq/get-message-from-retry-queue "default" 1)] + (is (= message-from-mq expected-message-payload))))))))) (deftest make-queues-test (let [ziggurat-config (ziggurat-config)] (testing "When retries are enabled" - (with-redefs [ziggurat-config (constantly (assoc ziggurat-config - :retry {:enabled true}))] + (with-redefs [config/ziggurat-config (constantly (assoc ziggurat-config + :retry {:enabled true + :type :linear}))] (testing "it does not create queues when stream-routes are not passed" (let [counter (atom 0)] (with-redefs [producer/create-and-bind-queue (fn @@ -308,7 +332,100 @@ (lq/delete ch dead-queue-name) (le/delete ch delay-exchange-name) (le/delete ch instant-exchange-name) - (le/delete ch dead-exchange-name)))))) + (le/delete ch dead-exchange-name)))) + (testing "it creates queues with suffixes in the range [1, retry-count] when exponential backoff is enabled" + (with-open [ch (lch/open connection)] + (let [stream-routes {:default {:handler-fn #(constantly :success)}} + retry-count (get-in ziggurat-config [:retry :count]) + instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) + instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) + delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) + delay-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:delay (rabbitmq-config)))) + dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) + dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config)))) + expected-queue-status {:message-count 0 :consumer-count 0} + exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) + exponential-delay-exchange-name #(util/prefixed-queue-name delay-exchange-name %)] + + (with-redefs [config/ziggurat-config (constantly (assoc-in ziggurat-config [:retry :type] :exponential))] 
+ (producer/make-queues stream-routes) + + (is (= expected-queue-status (lq/status ch dead-queue-name))) + (is (= expected-queue-status (lq/status ch instant-queue-name))) + (lq/delete ch instant-queue-name) + (lq/delete ch dead-queue-name) + (le/delete ch instant-exchange-name) + (le/delete ch dead-exchange-name) + + ;; Verifying that delay queues with appropriate suffixes have been created + (doseq [s (range 1 (inc retry-count))] + (is (= expected-queue-status (lq/status ch (exponential-delay-queue-name s)))) + (lq/delete ch (exponential-delay-queue-name s)) + (le/delete ch (exponential-delay-exchange-name s))))))) + (testing "it creates queues with suffixes in the range [1, 25] when exponential backoff is enabled and retry-count is more than 25" + (with-open [ch (lch/open connection)] + (let [stream-routes {:default {:handler-fn #(constantly :success)}} + instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) + instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) + delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) + delay-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:delay (rabbitmq-config)))) + dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) + dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config)))) + expected-queue-status {:message-count 0 :consumer-count 0} + exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) + exponential-delay-exchange-name #(util/prefixed-queue-name delay-exchange-name %)] + + (with-redefs [config/ziggurat-config (constantly (-> ziggurat-config + (assoc-in [:retry :type] :exponential) + (assoc-in [:retry :count] 50)))] + (producer/make-queues stream-routes) + (is (= expected-queue-status (lq/status ch dead-queue-name))) + (is (= expected-queue-status 
(lq/status ch instant-queue-name)))
+                (lq/delete ch instant-queue-name)
+                (lq/delete ch dead-queue-name)
+                (le/delete ch instant-exchange-name)
+                (le/delete ch dead-exchange-name)
+                ;; Verifying that delay queues with appropriate suffixes have been created. NOTE(review): (range 1 25) yields suffixes 1..24 only - confirm whether suffix 25 should also be verified and deleted.
+                (doseq [s (range 1 25)]
+                  (is (= expected-queue-status (lq/status ch (exponential-delay-queue-name s))))
+                  (lq/delete ch (exponential-delay-queue-name s))
+                  (le/delete ch (exponential-delay-exchange-name s)))))))
+        (testing "it creates delay queue for linear retries when retry type is not defined in the config"
+          (let [make-delay-queue-called (atom false)
+                stream-routes           {:default {:handler-fn #(constantly nil)}}]
+            (with-redefs [config/ziggurat-config    (constantly (update-in ziggurat-config [:retry] dissoc :type))
+                          producer/make-queue       (constantly nil)
+                          producer/make-delay-queue (fn [topic] ; stub: records the call for :default, the only topic in stream-routes
+                                                      (when (= topic :default) (reset! make-delay-queue-called true)))]
+              (producer/make-queues stream-routes)
+              (is (true? @make-delay-queue-called))))) ; assert the flag, otherwise this test can never fail
+        (testing "it creates delay queue for linear retries when retry type is incorrectly defined in the config"
+          (let [make-delay-queue-called (atom false)
+                stream-routes           {:default {:handler-fn #(constantly nil)}}]
+            (with-redefs [config/ziggurat-config    (constantly (assoc-in ziggurat-config [:retry :type] :incorrect))
+                          producer/make-queue       (constantly nil)
+                          producer/make-delay-queue (fn [topic] ; stub: records the call for :default, the only topic in stream-routes
+                                                      (when (= topic :default) (reset! make-delay-queue-called true)))]
+              (producer/make-queues stream-routes)
+              (is (true? @make-delay-queue-called))))) ; assert the flag, otherwise this test can never fail
+        (testing "it creates channel delay queue for linear retries when retry type is not defined in the channel config"
+          (let [make-channel-delay-queue-called (atom false)
+                stream-routes                   {:default {:handler-fn #(constantly nil) :channel-1 {:handler-fn #(constantly nil)}}}]
+            (with-redefs [config/ziggurat-config            (constantly (update-in ziggurat-config [:stream-router :default :channels :channel-1 :retry] dissoc :type))
+                          producer/make-channel-queue       (constantly nil)
+                          producer/make-channel-delay-queue (fn [topic channel] ; stub: records the call for :default / :channel-1
+                                                              (when (and (= channel :channel-1) (= topic :default)) (reset! make-channel-delay-queue-called true)))]
+              (producer/make-queues stream-routes)
+              (is (true? @make-channel-delay-queue-called))))) ; assert the flag, otherwise this test can never fail
+        (testing "it creates channel delay queue for linear retries when an incorrect retry type is defined in the channel config"
+          (let [make-channel-delay-queue-called (atom false)
+                stream-routes                   {:default {:handler-fn #(constantly nil) :channel-1 {:handler-fn #(constantly nil)}}}]
+            (with-redefs [config/ziggurat-config            (constantly (assoc-in ziggurat-config [:stream-router :default :channels :channel-1 :retry :type] :incorrect))
+                          producer/make-channel-queue       (constantly nil)
+                          producer/make-channel-delay-queue (fn [topic channel] ; stub: records the call for :default / :channel-1
+                                                              (when (and (= channel :channel-1) (= topic :default)) (reset! make-channel-delay-queue-called true)))]
+              (producer/make-queues stream-routes)
+              (is (true? @make-channel-delay-queue-called))))))) ; assert the flag, otherwise this test can never fail
     (testing "when retries are disabled"
       (with-redefs [config/ziggurat-config (constantly (assoc ziggurat-config
@@ -431,33 +548,36 @@
                                                                                                                           :enabled true
                                                                                                                           :queue-timeout-ms 2000}}}}}))]
           (is (= 2000 (producer/get-channel-queue-timeout-ms topic-entity channel message))))))
-    (testing "when exponential backoff are enabled and channel retry count not defined"
-      (let [channel :channel-no-retry-count]
-        (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config)
-                                                                :stream-router {topic-entity {:channels {channel {:retry {:enabled true
-                                                                                                                          :exponential-backoff {:enabled true :count 10}}}}}}))]
-          (is (= 700 (producer/get-channel-queue-timeout-ms topic-entity channel message))))))
     (testing "when exponential backoff are enabled and channel queue timeout defined"
       (let [channel :exponential-retry]
         (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config)
                                                                 :stream-router {topic-entity {:channels {channel {:retry {:count 5
                                                                                                                           :enabled true
-                                                                                                                          :queue-timeout-ms 1000
-                                                                                                                          :exponential-backoff {:enabled true :count 10}}}}}}))]
+                                                                                                                          :type :exponential
+                                                                                                                          :queue-timeout-ms 1000}}}}}))]
           (is (= 7000 (producer/get-channel-queue-timeout-ms topic-entity channel message))))))
-    (testing "when exponential backoff are enabled and channel queue timeout not defined"
-      (let [channel :channel-exponential-retry]
+
+    (testing "when exponential backoff are enabled and channel queue timeout is not defined"
+      (let [channel :exponential-retry]
         (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config)
                                                                 :stream-router {topic-entity {:channels {channel {:retry {:count 5
                                                                                                                           :enabled true
-                                                                                                                          :exponential-backoff {:enabled true}}}}}}))]
+                                                                                                                          :type :exponential}}}}}))]
           (is (= 700 (producer/get-channel-queue-timeout-ms topic-entity channel message))))))))
 
 (deftest get-queue-timeout-ms-test
-  (testing "when retries are enabled"
+  (testing "when exponential retries are enabled"
     (let [message (assoc message-payload :retry-count 2)]
       (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config)
                                                               :retry {:enabled true
                                                                       :count 5
-                                                                      :exponential-backoff {:enabled true :count 10}}))]
-        (is (= 700 (producer/get-queue-timeout-ms message)))))))
+                                                                      :type :exponential}))]
+        (is (= 700 (producer/get-queue-timeout-ms message))))))
+  (testing "when exponential retries are enabled and retry-count exceeds 25, the max possible timeouts are calculated using 25 as the retry-count"
+    (let [message (assoc message-payload :retry-count 20)]
+      (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config)
+                                                              :retry {:enabled true
+                                                                      :count 50
+                                                                      :type :exponential}))]
+        ;; For 25 max exponential retries, exponent comes to 25-20=5, which makes timeout = 100*(2^5-1) = 3100
+        (is (= 3100 (producer/get-queue-timeout-ms message)))))))