diff --git a/project.clj b/project.clj index 71c96fcc..5aaae1ff 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,7 @@ (cemerick.pomegranate.aether/register-wagon-factory! "http" #(org.apache.maven.wagon.providers.http.HttpWagon.)) -(defproject tech.gojek/ziggurat "4.7.4" +(defproject tech.gojek/ziggurat "4.7.5" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 61679521..8684170b 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -3,18 +3,18 @@ [langohr.basic :as lb] [langohr.channel :as lch] [langohr.exchange :as le] - [langohr.http :as lh] [langohr.queue :as lq] - [ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]] [taoensso.nippy :as nippy] - [ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]] - [ziggurat.messaging.connection :refer [producer-connection is-connection-required?]] + [ziggurat.config :refer [channel-retry-config rabbitmq-config ziggurat-config]] + [ziggurat.messaging.channel_pool :as cpool :refer [is-pool-alive?]] + [ziggurat.messaging.connection :refer [is-connection-required? producer-connection]] [ziggurat.messaging.util :as util] [ziggurat.metrics :as metrics]) (:import (com.rabbitmq.client AlreadyClosedException Channel) (java.io IOException) - (org.apache.commons.pool2.impl GenericObjectPool) - (java.util.concurrent TimeoutException))) + (java.time Instant) + (java.util.concurrent TimeoutException) + (org.apache.commons.pool2.impl GenericObjectPool))) (def MAX_EXPONENTIAL_RETRIES 25) @@ -67,8 +67,7 @@ (defn- handle-network-exception [e message-payload retry-counter] (log/error e "Network exception was encountered while publishing to RabbitMQ") - (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload)) - :retry-attempt retry-counter}) + (metrics/increment-count ["rabbitmq" "publish" "network"] "exception" {:topic-entity (name (:topic-entity message-payload))}) :retry) (defn return-to-pool [^GenericObjectPool pool ^Channel ch] @@ -94,8 +93,7 @@ (handle-network-exception e message-payload retry-counter)) (catch Exception e (log/error e "Exception was encountered while publishing to RabbitMQ") - (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload)) - :retry-counter retry-counter}) + (metrics/increment-count ["rabbitmq" "publish"] "exception" {:topic-entity (name (:topic-entity message-payload))}) :retry-with-counter))) (defn- publish-retry-config [] @@ -118,7 +116,12 @@ (publish exchange message-payload expiration 0)) ([exchange message-payload expiration retry-counter] (when (is-pool-alive? cpool/channel-pool) - (let [result (publish-internal exchange message-payload expiration retry-counter)] + (let [start-time (.toEpochMilli (Instant/now)) + result (publish-internal exchange message-payload expiration retry-counter) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + _ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload)) + :exchange-name exchange})] (when (pos? retry-counter) (log/info "Retrying publishing the message to " exchange) (log/info "Retry attempt " retry-counter)) @@ -136,7 +139,7 @@ (recur exchange message-payload expiration (inc retry-counter))) (do (log/error "Publishing the message has failed. It is being dropped") - (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload)) + (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload)) :retry-counter retry-counter})))))))) (defn- retry-type [] diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index 1262af2e..c2c99ccb 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -42,14 +42,15 @@ (let [channel :channel-1 retry-count (atom (:count (channel-retry-config topic-entity channel))) expected-message-payload (assoc message-payload :retry-count @retry-count)] - (producer/retry-for-channel message-payload channel) - (while (> @retry-count 0) - (swap! retry-count dec) - (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)] - (is (= (get message-from-mq :retry-count 0) @retry-count)) - (producer/retry-for-channel message-from-mq channel))) - (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] - (is (= expected-message-payload message-from-mq)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry-for-channel message-payload channel) + (while (> @retry-count 0) + (swap! retry-count dec) + (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)] + (is (= (get message-from-mq :retry-count 0) @retry-count)) + (producer/retry-for-channel message-from-mq channel))) + (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] + (is (= expected-message-payload message-from-mq))))))) (testing "message in channel will be retried as defined in message retry-count when message has retry-count" (fix/with-queues @@ -61,25 +62,27 @@ retry-message-payload (assoc message-payload :retry-count @retry-count) expected-message-payload (assoc message-payload :retry-count channel-retry-count)] - (producer/retry-for-channel retry-message-payload channel) - (while (> @retry-count 0) - (swap! retry-count dec) - (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)] - (is (= (get message-from-mq :retry-count 0) @retry-count)) - (producer/retry-for-channel message-from-mq channel))) - (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] - (is (= expected-message-payload message-from-mq)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry-for-channel retry-message-payload channel) + (while (> @retry-count 0) + (swap! retry-count dec) + (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)] + (is (= (get message-from-mq :retry-count 0) @retry-count)) + (producer/retry-for-channel message-from-mq channel))) + (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] + (is (= expected-message-payload message-from-mq))))))) (testing "message in channel will be retried in delay queue with suffix 1 if message retry-count exceeds retry count in channel config" - (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) - :stream-router - {:default - {:channels - {:exponential-retry - {:retry {:count 5 - :enabled true - :type :exponential - :queue-timeout-ms 1000}}}}}))] + (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) + :stream-router + {:default + {:channels + {:exponential-retry + {:retry {:count 5 + :enabled true + :type :exponential + :queue-timeout-ms 1000}}}}})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (fix/with-queues {:default {:handler-fn #(constantly nil) :exponential-retry #(constantly nil)}} @@ -91,10 +94,11 @@ (is (= expected-message-payload message-from-mq)))))) (testing "message in channel will be retried with linear queue timeout" - (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) - :stream-router {:default {:channels {:linear-retry {:retry {:count 5 - :enabled true - :queue-timeout-ms 2000}}}}}))] + (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) + :stream-router {:default {:channels {:linear-retry {:retry {:count 5 + :enabled true + :queue-timeout-ms 2000}}}}})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (fix/with-queues {:default {:handler-fn #(constantly nil) :linear-retry #(constantly nil)}} @@ -113,11 +117,12 @@ (is (= expected-message-payload message-from-mq))))))) (testing "message in channel will be retried with exponential timeout calculated from channel specific queue-timeout-ms value" - (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) - :stream-router {:default {:channels {:exponential-retry {:retry {:count 5 - :enabled true - :type :exponential - :queue-timeout-ms 1000}}}}}))] + (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) + :stream-router {:default {:channels {:exponential-retry {:retry {:count 5 + :enabled true + :type :exponential + :queue-timeout-ms 1000}}}}})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (fix/with-queues {:default {:handler-fn #(constantly nil) :exponential-retry #(constantly nil)}} @@ -141,26 +146,29 @@ {:default {:handler-fn #(constantly nil)}} (let [retry-message-payload (assoc message-payload :retry-count 5) expected-message-payload (update retry-message-payload :retry-count dec)] - (producer/retry retry-message-payload) - (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] - (is (= expected-message-payload message-from-mq)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry retry-message-payload) + (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] + (is (= expected-message-payload message-from-mq))))))) (testing "message with a retry count of 0 will publish to dead queue" (fix/with-queues {:default {:handler-fn #(constantly nil)}} (let [retry-message-payload (assoc message-payload :retry-count 0) expected-dead-set-message (assoc message-payload :retry-count (retry-count-config))] - (producer/retry retry-message-payload) - (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] - (is (= expected-dead-set-message message-from-mq)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry retry-message-payload) + (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] + (is (= expected-dead-set-message message-from-mq))))))) (testing "message with no retry count will publish to delay queue" (fix/with-queues {:default {:handler-fn #(constantly nil)}} - (let [expected-message (assoc message-payload :retry-count 4)] - (producer/retry message-payload) - (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] - (is (= message-from-mq expected-message)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (let [expected-message (assoc message-payload :retry-count 4)] + (producer/retry message-payload) + (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] + (is (= message-from-mq expected-message))))))) (testing "publish to delay queue publishes with expiration from config" (fix/with-queues @@ -169,8 +177,9 @@ :persistent true :expiration (str (get-in (rabbitmq-config) [`:delay :queue-timeout-ms])) :headers {}}] - (with-redefs [lb/publish (fn [_ _ _ _ props] - (is (= expected-props props)))] + (with-redefs [lb/publish (fn [_ _ _ _ props] + (is (= expected-props props))) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (producer/publish-to-delay-queue message-payload))))) (testing "publish to delay queue publishes with parsed record headers" @@ -181,8 +190,9 @@ :persistent true :expiration (str (get-in (rabbitmq-config) [:delay :queue-timeout-ms])) :headers {"key" "value"}}] - (with-redefs [lb/publish (fn [_ _ _ _ props] - (is (= expected-props props)))] + (with-redefs [lb/publish (fn [_ _ _ _ props] + (is (= expected-props props))) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (producer/publish-to-delay-queue test-message-payload))))) (testing "message will be retried as defined in ziggurat config retry-count when message doesn't have retry-count" @@ -190,14 +200,15 @@ {:default {:handler-fn #(constantly nil)}} (let [retry-count (atom (retry-count-config)) expected-message-payload (assoc message-payload :retry-count (retry-count-config))] - (producer/retry message-payload) - (while (> @retry-count 0) - (swap! retry-count dec) - (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] - (is (= (get message-from-mq :retry-count 0) @retry-count)) - (producer/retry message-from-mq))) - (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] - (is (= expected-message-payload message-from-mq)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry message-payload) + (while (> @retry-count 0) + (swap! retry-count dec) + (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] + (is (= (get message-from-mq :retry-count 0) @retry-count)) + (producer/retry message-from-mq))) + (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] + (is (= expected-message-payload message-from-mq))))))) (testing "message will be retried as defined in message retry-count when message has retry-count" (fix/with-queues @@ -205,14 +216,15 @@ (let [retry-count (atom 2) retry-message-payload (assoc message-payload :retry-count @retry-count) expected-message-payload (assoc message-payload :retry-count (retry-count-config))] - (producer/retry retry-message-payload) - (while (> @retry-count 0) - (swap! retry-count dec) - (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] - (is (= (get message-from-mq :retry-count 0) @retry-count)) - (producer/retry message-from-mq))) - (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] - (is (= expected-message-payload message-from-mq)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry retry-message-payload) + (while (> @retry-count 0) + (swap! retry-count dec) + (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] + (is (= (get message-from-mq :retry-count 0) @retry-count)) + (producer/retry message-from-mq))) + (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] + (is (= expected-message-payload message-from-mq))))))) (testing "[Backward Compatiblity] Messages will be retried even if retry type is not present in the config." (with-redefs [ziggurat-config (constantly (update-in (ziggurat-config) [:retry] dissoc :type))] @@ -221,21 +233,23 @@ (let [retry-count (atom 2) retry-message-payload (assoc message-payload :retry-count @retry-count) expected-message-payload (assoc message-payload :retry-count (retry-count-config))] - (producer/retry retry-message-payload) - (while (> @retry-count 0) - (swap! retry-count dec) - (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] - (is (= (get message-from-mq :retry-count 0) @retry-count)) - (producer/retry message-from-mq))) - (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] - (is (= expected-message-payload message-from-mq)))))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry retry-message-payload) + (while (> @retry-count 0) + (swap! retry-count dec) + (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] + (is (= (get message-from-mq :retry-count 0) @retry-count)) + (producer/retry message-from-mq))) + (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] + (is (= expected-message-payload message-from-mq))))))))) (deftest retry-with-exponential-backoff-test (testing "message will publish to delay with retry count queue when exponential backoff enabled" - (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) - :retry {:count 5 - :enabled true - :type :exponential}))] + (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) + :retry {:count 5 + :enabled true + :type :exponential})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (testing "message with no retry count will publish to delay queue with suffix 1" (fix/with-queues {:default {:handler-fn #(constantly nil)}} @@ -249,27 +263,30 @@ {:default {:handler-fn #(constantly nil)}} (let [retry-message-payload (assoc message-payload :retry-count 4) expected-message (assoc message-payload :retry-count 3)] - (producer/retry retry-message-payload) - (let [message-from-mq (rmq/get-message-from-retry-queue "default" 2)] - (is (= message-from-mq expected-message)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry retry-message-payload) + (let [message-from-mq (rmq/get-message-from-retry-queue "default" 2)] + (is (= message-from-mq expected-message))))))) (testing "message with available retry counts as 1 will be published to delay queue with suffix 5" (fix/with-queues {:default {:handler-fn #(constantly nil)}} (let [retry-message-payload (assoc message-payload :retry-count 1) expected-message (assoc message-payload :retry-count 0)] - (producer/retry retry-message-payload) - (let [message-from-mq (rmq/get-message-from-retry-queue "default" 5)] - (is (= message-from-mq expected-message)))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry retry-message-payload) + (let [message-from-mq (rmq/get-message-from-retry-queue "default" 5)] + (is (= message-from-mq expected-message))))))) (testing "message will be retried in delay queue with suffix 1 if message retry-count exceeds retry count in config" (fix/with-queues {:default {:handler-fn #(constantly nil)}} (let [retry-message-payload (assoc message-payload :retry-count 10) expected-message-payload (assoc message-payload :retry-count 9)] - (producer/retry retry-message-payload) - (let [message-from-mq (rmq/get-message-from-retry-queue "default" 1)] - (is (= message-from-mq expected-message-payload))))))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (producer/retry retry-message-payload) + (let [message-from-mq (rmq/get-message-from-retry-queue "default" 1)] + (is (= message-from-mq expected-message-payload)))))))))) (deftest make-queues-test (let [ziggurat-config (ziggurat-config)] @@ -520,106 +537,116 @@ (deftest publish-behaviour-on-rabbitmq-disconnection-test (testing "producer/publish tries to publish again if IOException is thrown via lb/publish" (let [publish-called (atom 0)] - (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) - lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 10) - (swap! publish-called inc) - (throw (IOException. "io exception")))) - metrics/increment-count (fn [_ _ _] nil)] + (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (IOException. "io exception")))) + metrics/increment-count (fn [_ _ _] nil) + metrics/multi-ns-report-histogram (fn [_ _ _] nil) + publish-behaviour-on-rabbitmq-disconnection-test] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) (testing "publish/producer tries to publish again if already closed exception is received via lb/publish" (let [publish-called (atom 0)] - (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) - lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 10) - (swap! publish-called inc) - (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) - metrics/increment-count (fn [_ _ _] nil)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) + metrics/increment-count (fn [_ _ _] nil) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) (testing "publish/producer tries to publish again if TimeoutException is received via lb/publish" (let [publish-called (atom 0)] - (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) - lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 10) - (swap! publish-called inc) - (throw (TimeoutException. "timeout")))) - metrics/increment-count (fn [_ _ _] nil)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (TimeoutException. "timeout")))) + metrics/increment-count (fn [_ _ _] nil) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @publish-called))))) (testing "producer/publish tries to publish again if IOException is thrown while borrowing from channel" (let [borrow-from-pool-called (atom 0)] - (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) - producer/borrow-from-pool (fn [_] - (when (< @borrow-from-pool-called 10) - (swap! borrow-from-pool-called inc) - (throw (IOException. "io exception")))) - metrics/increment-count (fn [_ _ _] nil)] + (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (IOException. "io exception")))) + metrics/multi-ns-report-histogram (fn [_ _ _] nil) + metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @borrow-from-pool-called))))) (testing "publish/producer tries to publish again if already closed exception is received while borrowing from channel" (let [borrow-from-pool-called (atom 0)] - (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) - producer/borrow-from-pool (fn [_] - (when (< @borrow-from-pool-called 10) - (swap! borrow-from-pool-called inc) - (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) - metrics/increment-count (fn [_ _ _] nil)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (AlreadyClosedException. (ShutdownSignalException. true true nil nil))))) + metrics/multi-ns-report-histogram (fn [_ _ _] nil) + metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @borrow-from-pool-called))))) (testing "publish/producer tries to publish again if TimeoutException is received while borrowing from channel" (let [borrow-from-pool-called (atom 0)] - (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) - producer/borrow-from-pool (fn [_] - (when (< @borrow-from-pool-called 10) - (swap! borrow-from-pool-called inc) - (throw (TimeoutException. "timeout")))) - metrics/increment-count (fn [_ _ _] nil)] + (with-redefs [lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + producer/borrow-from-pool (fn [_] + (when (< @borrow-from-pool-called 10) + (swap! borrow-from-pool-called inc) + (throw (TimeoutException. "timeout")))) + metrics/increment-count (fn [_ _ _] nil) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 10 @borrow-from-pool-called))))) (testing "producer/publish retries publishing for a certain number of times (configurable) when a non-recoverable exception is thrown" (let [publish-called (atom 0) - config (config/ziggurat-config) - count 3 - config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true - :back-off-ms 1 - :count count}))] - (with-redefs [config/ziggurat-config (fn [] config) - lch/open (fn [^Connection _] (reify Channel (close [_] nil))) - lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 10) - (swap! publish-called inc) - (throw (Exception. "non-io exception")))) - metrics/increment-count (fn [_ _ _] nil)] + config (config/ziggurat-config) + count 3 + config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled true + :back-off-ms 1 + :count count}))] + (with-redefs [config/ziggurat-config (fn [] config) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= (inc count) @publish-called))))) (testing "producer/publish does not retry again if the exception thrown is non recoverable and if retry is disabled" (let [publish-called (atom 0) - config (config/ziggurat-config) - count 3 - config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false - :back-off-ms 1 - :count count}))] - (with-redefs [config/ziggurat-config (fn [] config) - lch/open (fn [^Connection _] (reify Channel (close [_] nil))) - lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 10) - (swap! publish-called inc) - (throw (Exception. "non-io exception")))) - metrics/increment-count (fn [_ _ _] nil)] + config (config/ziggurat-config) + count 3 + config (update-in config [:rabbit-mq-connection :publish-retry :non-recoverable-exception] (constantly {:enabled false + :back-off-ms 1 + :count count}))] + (with-redefs [config/ziggurat-config (fn [] config) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 1 @publish-called))))) (testing "producer/publish does not publish even once if channel pool is not alive" (let [publish-called (atom 0)] - (with-redefs [cpool/is-pool-alive? (fn [_] false) - lch/open (fn [^Connection _] (reify Channel (close [_] nil))) - lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 10) - (swap! publish-called inc) - (throw (Exception. "non-io exception")))) - metrics/increment-count (fn [_ _ _] nil)] + (with-redefs [cpool/is-pool-alive? (fn [_] false) + lch/open (fn [^Connection _] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 10) + (swap! publish-called inc) + (throw (Exception. "non-io exception")))) + metrics/increment-count (fn [_ _ _] nil) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 0 @publish-called)))))) @@ -627,40 +654,43 @@ (testing "creates a span when tracer is enabled" (let [stream-routes {:default {:handler-fn #(constantly nil) :channel-1 #(constantly nil)}}] - (.reset tracer) - (fix/with-queues - stream-routes - (do - (producer/retry message-payload) - (let [finished-spans (.finishedSpans tracer)] - (is (= 1 (.size finished-spans))) - (is (= "send" (-> finished-spans - (.get 0) - (.operationName)))))))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (.reset tracer) + (fix/with-queues + stream-routes + (do + (producer/retry message-payload) + (let [finished-spans (.finishedSpans tracer)] + (is (= 1 (.size finished-spans))) + (is (= "send" (-> finished-spans + (.get 0) + (.operationName))))))))))) (deftest publish-to-channel-instant-queue-test (testing "creates a span when tracer is enabled" (let [stream-routes {:default {:handler-fn #(constantly nil) :channel-1 #(constantly nil)}}] - (.reset tracer) - (fix/with-queues - stream-routes - (do - (producer/publish-to-channel-instant-queue :channel-1 message-payload) - (let [finished-spans (.finishedSpans tracer)] - (is (= 1 (.size finished-spans))) - (is (= "send" (-> finished-spans - (.get 0) - (.operationName)))))))))) + (with-redefs [metrics/multi-ns-report-histogram (fn [_ _ _] nil)] + (.reset tracer) + (fix/with-queues + stream-routes + (do + (producer/publish-to-channel-instant-queue :channel-1 message-payload) + (let [finished-spans (.finishedSpans tracer)] + (is (= 1 (.size finished-spans))) + (is (= "send" (-> finished-spans + (.get 0) + (.operationName))))))))))) (deftest get-channel-queue-timeout-ms-test (let [message (assoc message-payload :retry-count 2)] (testing "when retries are enabled" (let [channel :linear-retry] - (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :stream-router {topic-entity {:channels {channel {:retry {:count 5 - :enabled true - :queue-timeout-ms 2000}}}}}))] + (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) + :stream-router {topic-entity {:channels {channel {:retry {:count 5 + :enabled true + :queue-timeout-ms 2000}}}}})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (is (= 2000 (producer/get-channel-queue-timeout-ms topic-entity channel message)))))) (testing "when exponential backoff are enabled and channel queue timeout defined" (let [channel :exponential-retry] @@ -673,34 +703,38 @@ (testing "when exponential backoff are enabled and channel queue timeout is not defined" (let [channel :exponential-retry] - (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :stream-router {topic-entity {:channels {channel {:retry {:count 5 - :enabled true - :type :exponential}}}}}))] + (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) + :stream-router {topic-entity {:channels {channel {:retry {:count 5 + :enabled true + :type :exponential}}}}})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (is (= 700 (producer/get-channel-queue-timeout-ms topic-entity channel message)))))))) (deftest get-queue-timeout-ms-test (testing "when exponential retries are enabled" (let [message (assoc message-payload :retry-count 2)] - (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :retry {:enabled true - :count 5 - :type :exponential}))] + (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) + :retry {:enabled true + :count 5 + :type :exponential})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] (is (= 700 (producer/get-queue-timeout-ms message)))))) (testing "when exponential retries are enabled and retry-count exceeds 25, the max possible timeouts are calculated using 25 as the retry-count" (let [message (assoc message-payload :retry-count 20)] - (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :retry {:enabled true - :count 50 - :type :exponential}))] + (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) + :retry {:enabled true + :count 50 + :type :exponential})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] ;; For 25 max exponential retries, exponent comes to 25-20=5, which makes timeout = 100*(2^5-1) = 3100 (is (= 3100 (producer/get-queue-timeout-ms message)))))) (testing "when exponential retries are enabled with total retries as 25 and if the message has already been retried 24 times, then the queue-timeout is calculated without any failure" (let [message (assoc message-payload :retry-count 1)] - (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :retry {:enabled true - :count 25 - :type :exponential} - :rabbit-mq {:delay {:queue-timeout-ms 5000}}))] + (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) + :retry {:enabled true + :count 25 + :type :exponential} + :rabbit-mq {:delay {:queue-timeout-ms 5000}})) + metrics/multi-ns-report-histogram (fn [_ _ _] nil)] ;; For 25 max exponential retries, exponent comes to 25-1=24, which makes timeout = 5000*(2^24-1) = 83886075000 (is (= 83886075000 (producer/get-queue-timeout-ms message)))))))