diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index f87dffd2..77f4145f 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -64,6 +64,7 @@ (while (> @retry-count 0) (swap! retry-count dec) (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)] + (is (= (get message-from-mq :retry-count 0) @retry-count)) (producer/retry-for-channel message-from-mq channel))) (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] (is (= expected-message-payload message-from-mq)))))) @@ -74,13 +75,13 @@ {:default {:channels {:exponential-retry - {:retry {:count 5 + {:retry {:count 5 :enabled true :type :exponential :queue-timeout-ms 1000}}}}}))] (fix/with-queues - {:default {:handler-fn #(constantly nil) - :exponential-retry #(constantly nil)}} + {:default {:handler-fn #(constantly nil) + :exponential-retry #(constantly nil)}} (let [channel :exponential-retry retry-message-payload (assoc message-payload :retry-count 10) expected-message-payload (assoc message-payload :retry-count 9) @@ -105,16 +106,17 @@ (while (> @retry-count 0) (swap! retry-count dec) (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)] + (is (= (get message-from-mq :retry-count 0) @retry-count)) (producer/retry-for-channel message-from-mq channel))) (let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)] (is (= expected-message-payload message-from-mq))))))) (testing "message in channel will be retried with exponential timeout calculated from channel specific queue-timeout-ms value" (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) - :stream-router {:default {:channels {:exponential-retry {:retry {:count 5 - :enabled true - :type :exponential - :queue-timeout-ms 1000}}}}}))] + :stream-router {:default {:channels {:exponential-retry {:retry {:count 5 + :enabled true + :type :exponential + :queue-timeout-ms 1000}}}}}))] (fix/with-queues {:default {:handler-fn #(constantly nil) :exponential-retry #(constantly nil)}} @@ -230,9 +232,9 @@ (deftest retry-with-exponential-backoff-test (testing "message will publish to delay with retry count queue when exponential backoff enabled" (with-redefs [ziggurat-config (constantly (assoc (ziggurat-config) - :retry {:count 5 + :retry {:count 5 :enabled true - :type :exponential}))] + :type :exponential}))] (testing "message with no retry count will publish to delay queue with suffix 1" (fix/with-queues {:default {:handler-fn #(constantly nil)}} @@ -273,7 +275,7 @@ (testing "When retries are enabled" (with-redefs [config/ziggurat-config (constantly (assoc ziggurat-config :retry {:enabled true - :type :linear}))] + :type :linear}))] (testing "it does not create queues when stream-routes are not passed" (let [counter (atom 0)] (with-redefs [producer/create-and-bind-queue (fn @@ -340,16 +342,16 @@ (le/delete ch dead-exchange-name)))) (testing "it creates queues with suffixes in the range [1, retry-count] when exponential backoff is enabled" (with-open [ch (lch/open connection)] - (let [stream-routes {:default {:handler-fn #(constantly :success)}} - retry-count (get-in ziggurat-config [:retry :count]) - instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) - instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) - delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) - delay-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:delay (rabbitmq-config)))) - dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) - dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config)))) - expected-queue-status {:message-count 0 :consumer-count 0} - exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) + (let [stream-routes {:default {:handler-fn #(constantly :success)}} + retry-count (get-in ziggurat-config [:retry :count]) + instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) + instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) + delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) + delay-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:delay (rabbitmq-config)))) + dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) + dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config)))) + expected-queue-status {:message-count 0 :consumer-count 0} + exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) exponential-delay-exchange-name #(util/prefixed-queue-name delay-exchange-name %)] (with-redefs [config/ziggurat-config (constantly (assoc-in ziggurat-config [:retry :type] :exponential))] @@ -369,15 +371,15 @@ (le/delete ch (exponential-delay-exchange-name s))))))) (testing "it creates queues with suffixes in the range [1, 25] when exponential backoff is enabled and retry-count is more than 25" (with-open [ch (lch/open connection)] - (let [stream-routes {:default {:handler-fn #(constantly :success)}} - instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) - instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) - delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) - delay-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:delay (rabbitmq-config)))) - dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) - dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config)))) - expected-queue-status {:message-count 0 :consumer-count 0} - exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) + (let [stream-routes {:default {:handler-fn #(constantly :success)}} + instant-queue-name (util/prefixed-queue-name "default" (:queue-name (:instant (rabbitmq-config)))) + instant-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:instant (rabbitmq-config)))) + delay-queue-name (util/prefixed-queue-name "default" (:queue-name (:delay (rabbitmq-config)))) + delay-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:delay (rabbitmq-config)))) + dead-queue-name (util/prefixed-queue-name "default" (:queue-name (:dead-letter (rabbitmq-config)))) + dead-exchange-name (util/prefixed-queue-name "default" (:exchange-name (:dead-letter (rabbitmq-config)))) + expected-queue-status {:message-count 0 :consumer-count 0} + exponential-delay-queue-name #(util/prefixed-queue-name delay-queue-name %) exponential-delay-exchange-name #(util/prefixed-queue-name delay-exchange-name %)] (with-redefs [config/ziggurat-config (constantly (-> ziggurat-config @@ -397,8 +399,8 @@ (le/delete ch (exponential-delay-exchange-name s))))))) (testing "it creates delay queue for linear retries when retry type is not defined in the config" (let [make-delay-queue-called (atom false) - stream-routes {:default {:handler-fn #(constantly nil)}}] - (with-redefs [config/ziggurat-config (constantly (update-in ziggurat-config [:retry] dissoc :type)) + stream-routes {:default {:handler-fn #(constantly nil)}}] + (with-redefs [config/ziggurat-config (constantly (update-in ziggurat-config [:retry] dissoc :type)) producer/make-queue (constantly nil) producer/make-delay-queue (fn [topic] (if (= topic :test) @@ -406,8 +408,8 @@ (producer/make-queues stream-routes)))) (testing "it creates delay queue for linear retries when retry type is incorrectly defined in the config" (let [make-delay-queue-called (atom false) - stream-routes {:default {:handler-fn #(constantly nil)}}] - (with-redefs [config/ziggurat-config (constantly (assoc-in ziggurat-config [:retry :type] :incorrect)) + stream-routes {:default {:handler-fn #(constantly nil)}}] + (with-redefs [config/ziggurat-config (constantly (assoc-in ziggurat-config [:retry :type] :incorrect)) producer/make-queue (constantly nil) producer/make-delay-queue (fn [topic] (if (= topic :test) @@ -415,8 +417,8 @@ (producer/make-queues stream-routes)))) (testing "it creates channel delay queue for linear retries when retry type is not defined in the channel config" (let [make-channel-delay-queue-called (atom false) - stream-routes {:default {:handler-fn #(constantly nil) :channel-1 {:handler-fn #(constantly nil)}}}] - (with-redefs [config/ziggurat-config (constantly (update-in ziggurat-config [:stream-router :default :channels :channel-1 :retry] dissoc :type)) + stream-routes {:default {:handler-fn #(constantly nil) :channel-1 {:handler-fn #(constantly nil)}}}] + (with-redefs [config/ziggurat-config (constantly (update-in ziggurat-config [:stream-router :default :channels :channel-1 :retry] dissoc :type)) producer/make-channel-queue (constantly nil) producer/make-channel-delay-queue (fn [topic channel] (if (and (= channel :channel-1) (= topic :default)) @@ -424,8 +426,8 @@ (producer/make-queues stream-routes)))) (testing "it creates channel delay queue for linear retries when an incorrect retry type is defined in the channel config" (let [make-channel-delay-queue-called (atom false) - stream-routes {:default {:handler-fn #(constantly nil) :channel-1 {:handler-fn #(constantly nil)}}}] - (with-redefs [config/ziggurat-config (constantly (assoc-in ziggurat-config [:stream-router :default :channels :channel-1 :retry :type] :incorrect)) + stream-routes {:default {:handler-fn #(constantly nil) :channel-1 {:handler-fn #(constantly nil)}}}] + (with-redefs [config/ziggurat-config (constantly (assoc-in ziggurat-config [:stream-router :default :channels :channel-1 :retry :type] :incorrect)) producer/make-channel-queue (constantly nil) producer/make-channel-delay-queue (fn [topic channel] (if (and (= channel :channel-1) (= topic :default)) @@ -517,11 +519,11 @@ (deftest publish-behaviour-on-rabbitmq-disconnection-test (testing "producer/publish tries to publish again if IOException is thrown" (let [publish-called (atom 0)] - (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) - lb/publish (fn [_ _ _ _ _] - (when (< @publish-called 2) - (swap! publish-called inc) - (throw (IOException. "io exception")))) + (with-redefs [lch/open (fn [_] (reify Channel (close [_] nil))) + lb/publish (fn [_ _ _ _ _] + (when (< @publish-called 2) + (swap! publish-called inc) + (throw (IOException. "io exception")))) metrics/increment-count (fn [_ _ _] nil)] (producer/publish "random-exchange" {:topic-entity "hello"} 12345) (is (= 2 @publish-called))))) @@ -588,43 +590,43 @@ (testing "when exponential backoff are enabled and channel queue timeout defined" (let [channel :exponential-retry] (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :stream-router {topic-entity {:channels {channel {:retry {:count 5 - :enabled true - :type :exponential - :queue-timeout-ms 1000}}}}}))] + :stream-router {topic-entity {:channels {channel {:retry {:count 5 + :enabled true + :type :exponential + :queue-timeout-ms 1000}}}}}))] (is (= 7000 (producer/get-channel-queue-timeout-ms topic-entity channel message)))))) (testing "when exponential backoff are enabled and channel queue timeout is not defined" (let [channel :exponential-retry] (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :stream-router {topic-entity {:channels {channel {:retry {:count 5 - :enabled true - :type :exponential}}}}}))] + :stream-router {topic-entity {:channels {channel {:retry {:count 5 + :enabled true + :type :exponential}}}}}))] (is (= 700 (producer/get-channel-queue-timeout-ms topic-entity channel message)))))))) (deftest get-queue-timeout-ms-test (testing "when exponential retries are enabled" (let [message (assoc message-payload :retry-count 2)] (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :retry {:enabled true - :count 5 - :type :exponential}))] + :retry {:enabled true + :count 5 + :type :exponential}))] (is (= 700 (producer/get-queue-timeout-ms message)))))) (testing "when exponential retries are enabled and retry-count exceeds 25, the max possible timeouts are calculated using 25 as the retry-count" (let [message (assoc message-payload :retry-count 20)] (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :retry {:enabled true - :count 50 - :type :exponential}))] + :retry {:enabled true + :count 50 + :type :exponential}))] ;; For 25 max exponential retries, exponent comes to 25-20=5, which makes timeout = 100*(2^5-1) = 3100 (is (= 3100 (producer/get-queue-timeout-ms message)))))) (testing "when exponential retries are enabled with total retries as 25 and if the message has already been retried 24 times, then the queue-timeout is calculated without any failure" (let [message (assoc message-payload :retry-count 1)] (with-redefs [config/ziggurat-config (constantly (assoc (config/ziggurat-config) - :retry {:enabled true - :count 25 - :type :exponential} - :rabbit-mq {:delay {:queue-timeout-ms 5000}}))] + :retry {:enabled true + :count 25 + :type :exponential} + :rabbit-mq {:delay {:queue-timeout-ms 5000}}))] ;; For 25 max exponential retries, exponent comes to 25-1=24, which makes timeout = 5000*(2^24-1) = 83886075000 (is (= 83886075000 (producer/get-queue-timeout-ms message)))))))