diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index 6ce6045d..5b7a47a0 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -1,30 +1,35 @@ (ns ziggurat.messaging.consumer - (:require [ziggurat.mapper :as mpr] - [clojure.tools.logging :as log] + (:require [clojure.tools.logging :as log] [langohr.basic :as lb] [langohr.channel :as lch] [langohr.consumers :as lcons] - [ziggurat.kafka-consumer.consumer-handler :as ch] [taoensso.nippy :as nippy] - [ziggurat.config :refer [get-in-config]] + [ziggurat.config :refer [get-in-config rabbitmq-config]] + [ziggurat.kafka-consumer.consumer-handler :as ch] + [ziggurat.mapper :as mpr] [ziggurat.messaging.connection :refer [connection]] - [ziggurat.sentry :refer [sentry-reporter]] - [ziggurat.messaging.util :refer :all] + [ziggurat.messaging.util :as util] [ziggurat.metrics :as metrics] [ziggurat.util.error :refer [report-error]])) (defn convert-and-ack-message "De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message if `ack?` is true." - [ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity] + [ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity] (try (let [message (nippy/thaw payload)] (when ack? (lb/ack ch delivery-tag)) message) (catch Exception e - (lb/reject ch delivery-tag false) - (report-error e "Error while decoding message") + (report-error e "Error while decoding message, publishing to dead queue...") + (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) + exchange (util/prefixed-queue-name topic-entity exchange-name)] + (try + (lb/publish ch exchange "" payload) + (catch Exception e + (log/error e "Exception was encountered while publishing to RabbitMQ") + (lb/reject ch delivery-tag false)))) (metrics/increment-count ["rabbitmq-message" "conversion"] "failure" {:topic_name (name topic-entity)}) nil))) @@ -33,8 +38,8 @@ (lb/ack ch delivery-tag)) (defn process-message-from-queue [ch meta payload topic-entity processing-fn] - (let [delivery-tag (:delivery-tag meta) - message-payload (convert-and-ack-message ch meta payload false topic-entity)] + (let [delivery-tag (:delivery-tag meta) + message-payload (convert-and-ack-message ch meta payload false topic-entity)] (when message-payload (log/infof "Processing message [%s] from RabbitMQ " message-payload) (try @@ -60,8 +65,8 @@ (construct-queue-name topic-entity nil)) ([topic-entity channel] (if (nil? channel) - (prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :dead-letter :queue-name])) - (prefixed-channel-name topic-entity channel (get-in-config [:rabbit-mq :dead-letter :queue-name]))))) + (util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :dead-letter :queue-name])) + (util/prefixed-channel-name topic-entity channel (get-in-config [:rabbit-mq :dead-letter :queue-name]))))) (defn get-dead-set-messages "This method can be used to read and optionally ack messages in dead-letter queue, based on the value of `ack?`. @@ -94,20 +99,20 @@ (defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity] (lb/qos ch prefetch-count) - (let [consumer-tag (lcons/subscribe ch - queue-name - (message-handler wrapped-mapper-fn topic-entity) - {:handle-shutdown-signal-fn (fn [consumer_tag reason] - (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) - :handle-consume-ok-fn (fn [consumer_tag] - (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})])) + (lcons/subscribe ch + queue-name + (message-handler wrapped-mapper-fn topic-entity) + {:handle-shutdown-signal-fn (fn [consumer_tag reason] + (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) + :handle-consume-ok-fn (fn [consumer_tag] + (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})) (defn start-retry-subscriber* [handler-fn topic-entity] (when (get-in-config [:retry :enabled]) (dotimes [_ (get-in-config [:jobs :instant :worker-count])] (start-subscriber* (lch/open connection) (get-in-config [:jobs :instant :prefetch-count]) - (prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name])) + (util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name])) handler-fn topic-entity)))) @@ -118,7 +123,7 @@ (dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])] (start-subscriber* (lch/open connection) 1 - (prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name])) + (util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name])) (mpr/channel-mapper-func channel-handler-fn channel-key) topic-entity))))) @@ -126,12 +131,12 @@ "Starts the subscriber to the instant queue of the rabbitmq" [stream-routes batch-routes] (doseq [stream-route stream-routes] - (let [topic-entity (first stream-route) - handler (-> stream-route second :handler-fn) - channels (-> stream-route second (dissoc :handler-fn))] + (let [topic-entity (first stream-route) + handler (-> stream-route second :handler-fn) + channels (-> stream-route second (dissoc :handler-fn))] (start-channels-subscriber channels topic-entity) (start-retry-subscriber* (mpr/mapper-func handler (keys channels)) topic-entity))) (doseq [batch-route batch-routes] - (let [topic-entity (first batch-route) - handler (-> batch-route second :handler-fn)] + (let [topic-entity (first batch-route) + handler (-> batch-route second :handler-fn)] (start-retry-subscriber* (fn [message] (ch/process handler message)) topic-entity)))) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 699b2105..8f561776 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -3,22 +3,20 @@ [langohr.basic :as lb] [langohr.channel :as lch] [langohr.exchange :as le] - [langohr.queue :as lq] [langohr.http :as lh] + [langohr.queue :as lq] [taoensso.nippy :as nippy] [ziggurat.config :refer [config ziggurat-config rabbitmq-config channel-retry-config]] [ziggurat.messaging.connection :refer [connection is-connection-required?]] - [ziggurat.messaging.util :refer :all] - [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.messaging.util :as util] [ziggurat.metrics :as metrics]) - (:import (java.io IOException) - (com.rabbitmq.client AlreadyClosedException))) + (:import (com.rabbitmq.client AlreadyClosedException) + (java.io IOException))) (def MAX_EXPONENTIAL_RETRIES 25) (defn delay-queue-name [topic-entity queue-name] - (prefixed-queue-name topic-entity queue-name)) + (util/prefixed-queue-name topic-entity queue-name)) (defn get-replica-count [host-count] (int (Math/ceil (/ host-count 2)))) @@ -52,8 +50,8 @@ hosts-vec (util/list-of-hosts rmq-config) ha-policy-body (get-default-ha-policy rmq-config (get-replica-count (count hosts-vec)))] (loop [hosts hosts-vec] - (let [host-endpoint (str "http://" (first hosts) ":" (get rmq-config :admin-port 15672)) - resp (set-ha-policy-on-host host-endpoint username password ha-policy-body exchange-name queue-name) + (let [host-endpoint (str "http://" (first hosts) ":" (get rmq-config :admin-port 15672)) + resp (set-ha-policy-on-host host-endpoint username password ha-policy-body exchange-name queue-name) remaining-hosts (rest hosts)] (when (and (nil? resp) (pos? (count remaining-hosts))) @@ -78,12 +76,12 @@ (try (let [props (if dead-letter-exchange {"x-dead-letter-exchange" dead-letter-exchange} - {})] - (let [ch (lch/open connection)] - (create-queue queue-name props ch) - (declare-exchange ch exchange-name) - (bind-queue-to-exchange ch queue-name exchange-name) - (set-ha-policy queue-name exchange-name (get-in config [:ziggurat :rabbit-mq-connection])))) + {}) + ch (lch/open connection)] + (create-queue queue-name props ch) + (declare-exchange ch exchange-name) + (bind-queue-to-exchange ch queue-name exchange-name) + (set-ha-policy queue-name exchange-name (get-in config [:ziggurat :rabbit-mq-connection]))) (catch Exception e (log/error e "Error while declaring RabbitMQ queues") (throw e))))) @@ -112,7 +110,7 @@ (defn- publish-internal [exchange message-payload expiration] (try - (with-open [ch (lch/open connection)] + (with-open [ch (lch/open connection)] ;; it opens a connection everytime it publishes? (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) (properties-for-publish expiration (:headers message-payload)))) false @@ -151,13 +149,14 @@ queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])] (or channel-queue-timeout-ms queue-timeout-ms))) -(defn- get-backoff-exponent [retry-count message-retry-count] +(defn- get-backoff-exponent "Calculates the exponent using the formula `retry-count` and `message-retry-count`, where `retry-count` is the total retries possible and `message-retry-count` is the count of retries available for the message. Caps the value of `retry-count` to MAX_EXPONENTIAL_RETRIES. Returns 1, if `message-retry-count` is higher than `max(MAX_EXPONENTIAL_RETRIES, retry-count)`." + [retry-count message-retry-count] (let [exponent (- (min MAX_EXPONENTIAL_RETRIES retry-count) message-retry-count)] (max 1 exponent))) @@ -181,40 +180,44 @@ (let [exponential-backoff (get-backoff-exponent retry-count message-retry-count)] (long (* (dec (Math/pow 2 exponential-backoff)) queue-timeout-ms)))) -(defn get-queue-timeout-ms [message-payload] +(defn get-queue-timeout-ms "Calculate queue timeout for delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled." - (let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms) - retry-count (-> (ziggurat-config) :retry :count) + [message-payload] + (let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms) + retry-count (-> (ziggurat-config) :retry :count) message-retry-count (:retry-count message-payload)] (if (= :exponential (-> (ziggurat-config) :retry :type)) (get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms) queue-timeout-ms))) -(defn get-channel-queue-timeout-ms [topic-entity channel message-payload] +(defn get-channel-queue-timeout-ms "Calculate queue timeout for channel delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled." + [topic-entity channel message-payload] (let [channel-queue-timeout-ms (get-channel-queue-timeout-or-default-timeout topic-entity channel) - message-retry-count (:retry-count message-payload) - channel-retry-count (get-channel-retry-count topic-entity channel)] + message-retry-count (:retry-count message-payload) + channel-retry-count (get-channel-retry-count topic-entity channel)] (if (= :exponential (channel-retry-type topic-entity channel)) (get-exponential-backoff-timeout-ms channel-retry-count message-retry-count channel-queue-timeout-ms) channel-queue-timeout-ms))) -(defn get-delay-exchange-name [topic-entity message-payload] +(defn get-delay-exchange-name "This function return delay exchange name for retry when using flow without channel. It will return exchange name with retry count as suffix if exponential backoff enabled." + [topic-entity message-payload] (let [{:keys [exchange-name]} (:delay (rabbitmq-config)) - exchange-name (prefixed-queue-name topic-entity exchange-name) - retry-count (-> (ziggurat-config) :retry :count)] + exchange-name (util/prefixed-queue-name topic-entity exchange-name) + retry-count (-> (ziggurat-config) :retry :count)] (if (= :exponential (-> (ziggurat-config) :retry :type)) (let [message-retry-count (:retry-count message-payload) - backoff-exponent (get-backoff-exponent retry-count message-retry-count)] - (prefixed-queue-name exchange-name backoff-exponent)) + backoff-exponent (get-backoff-exponent retry-count message-retry-count)] + (util/prefixed-queue-name exchange-name backoff-exponent)) exchange-name))) -(defn get-channel-delay-exchange-name [topic-entity channel message-payload] +(defn get-channel-delay-exchange-name "This function return delay exchange name for retry when using channel flow. It will return exchange name with retry count as suffix if exponential backoff enabled." + [topic-entity channel message-payload] (let [{:keys [exchange-name]} (:delay (rabbitmq-config)) - exchange-name (prefixed-channel-name topic-entity channel exchange-name) - channel-retry-count (get-channel-retry-count topic-entity channel)] + exchange-name (util/prefixed-channel-name topic-entity channel exchange-name) + channel-retry-count (get-channel-retry-count topic-entity channel)] (if (= :exponential (channel-retry-type topic-entity channel)) (let [message-retry-count (:retry-count message-payload) exponential-backoff (get-backoff-exponent channel-retry-count message-retry-count)] @@ -222,85 +225,85 @@ exchange-name))) (defn publish-to-delay-queue [message-payload] - (let [topic-entity (:topic-entity message-payload) - exchange-name (get-delay-exchange-name topic-entity message-payload) + (let [topic-entity (:topic-entity message-payload) + exchange-name (get-delay-exchange-name topic-entity message-payload) queue-timeout-ms (get-queue-timeout-ms message-payload)] (publish exchange-name message-payload queue-timeout-ms))) (defn publish-to-dead-queue [message-payload] (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - topic-entity (:topic-entity message-payload) - exchange-name (prefixed-queue-name topic-entity exchange-name)] + topic-entity (:topic-entity message-payload) + exchange-name (util/prefixed-queue-name topic-entity exchange-name)] (publish exchange-name message-payload))) (defn publish-to-instant-queue [message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) - topic-entity (:topic-entity message-payload) - exchange-name (prefixed-queue-name topic-entity exchange-name)] + topic-entity (:topic-entity message-payload) + exchange-name (util/prefixed-queue-name topic-entity exchange-name)] (publish exchange-name message-payload))) (defn publish-to-channel-delay-queue [channel message-payload] - (let [topic-entity (:topic-entity message-payload) - exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload) + (let [topic-entity (:topic-entity message-payload) + exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload) queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)] (publish exchange-name message-payload queue-timeout-ms))) (defn publish-to-channel-dead-queue [channel message-payload] (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - topic-entity (:topic-entity message-payload) - exchange-name (prefixed-channel-name topic-entity channel exchange-name)] + topic-entity (:topic-entity message-payload) + exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)] (publish exchange-name message-payload))) (defn publish-to-channel-instant-queue [channel message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) - topic-entity (:topic-entity message-payload) - exchange-name (prefixed-channel-name topic-entity channel exchange-name)] + topic-entity (:topic-entity message-payload) + exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)] (publish exchange-name message-payload))) -(defn retry [{:keys [retry-count topic-entity] :as message-payload}] +(defn retry [{:keys [retry-count] :as message-payload}] (when (-> (ziggurat-config) :retry :enabled) (cond - (nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count)))) - (pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count))) + (nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count)))) + (pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count))) (zero? retry-count) (publish-to-dead-queue (assoc message-payload :retry-count (-> (ziggurat-config) :retry :count)))))) (defn retry-for-channel [{:keys [retry-count topic-entity] :as message-payload} channel] (when (channel-retries-enabled topic-entity channel) (cond - (nil? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec (get-channel-retry-count topic-entity channel)))) - (pos? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec retry-count))) + (nil? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec (get-channel-retry-count topic-entity channel)))) + (pos? retry-count) (publish-to-channel-delay-queue channel (assoc message-payload :retry-count (dec retry-count))) (zero? retry-count) (publish-to-channel-dead-queue channel (assoc message-payload :retry-count (get-channel-retry-count topic-entity channel)))))) (defn- make-delay-queue [topic-entity] (let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config)) - queue-name (delay-queue-name topic-entity queue-name) - exchange-name (prefixed-queue-name topic-entity exchange-name) - dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)] + queue-name (delay-queue-name topic-entity queue-name) + exchange-name (util/prefixed-queue-name topic-entity exchange-name) + dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange)] (create-and-bind-queue queue-name exchange-name dead-letter-exchange-name))) (defn- make-delay-queue-with-retry-count [topic-entity retry-count] (let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config)) - queue-name (delay-queue-name topic-entity queue-name) - exchange-name (prefixed-queue-name topic-entity exchange-name) - dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange) - sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))] + queue-name (delay-queue-name topic-entity queue-name) + exchange-name (util/prefixed-queue-name topic-entity exchange-name) + dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange) + sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))] (doseq [s (range 1 sequence)] - (create-and-bind-queue (prefixed-queue-name queue-name s) (prefixed-queue-name exchange-name s) dead-letter-exchange-name)))) + (create-and-bind-queue (util/prefixed-queue-name queue-name s) (util/prefixed-queue-name exchange-name s) dead-letter-exchange-name)))) (defn- make-channel-delay-queue-with-retry-count [topic-entity channel retry-count] - (make-delay-queue-with-retry-count (with-channel-name topic-entity channel) retry-count)) + (make-delay-queue-with-retry-count (util/with-channel-name topic-entity channel) retry-count)) (defn- make-channel-delay-queue [topic-entity channel] - (make-delay-queue (with-channel-name topic-entity channel))) + (make-delay-queue (util/with-channel-name topic-entity channel))) (defn- make-queue [topic-identifier queue-type] (let [{:keys [queue-name exchange-name]} (queue-type (rabbitmq-config)) - queue-name (prefixed-queue-name topic-identifier queue-name) - exchange-name (prefixed-queue-name topic-identifier exchange-name)] + queue-name (util/prefixed-queue-name topic-identifier queue-name) + exchange-name (util/prefixed-queue-name topic-identifier exchange-name)] (create-and-bind-queue queue-name exchange-name))) (defn- make-channel-queue [topic-entity channel-name queue-type] - (make-queue (with-channel-name topic-entity channel-name) queue-type)) + (make-queue (util/with-channel-name topic-entity channel-name) queue-type)) (defn- make-channel-queues [channels topic-entity] (doseq [channel channels] @@ -314,20 +317,20 @@ "Please use it only after understanding its risks and implications." "Its contract can change in the future releases of Ziggurat.") (make-channel-delay-queue-with-retry-count topic-entity channel (get-channel-retry-count topic-entity channel))) - (= :linear channel-retry-type) (make-channel-delay-queue topic-entity channel) - (nil? channel-retry-type) (do - (log/warn "[Deprecation Notice]: Please note that the configuration for channel retries has changed." - "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide" - "Use :type to specify the type of retry mechanism in the channel config.") - (make-channel-delay-queue topic-entity channel)) - :else (do - (log/warn "Incorrect keyword for type passed, falling back to linear backoff for channel: " channel) - (make-channel-delay-queue topic-entity channel))))))) + (= :linear channel-retry-type) (make-channel-delay-queue topic-entity channel) + (nil? channel-retry-type) (do + (log/warn "[Deprecation Notice]: Please note that the configuration for channel retries has changed." + "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide" + "Use :type to specify the type of retry mechanism in the channel config.") + (make-channel-delay-queue topic-entity channel)) + :else (do + (log/warn "Incorrect keyword for type passed, falling back to linear backoff for channel: " channel) + (make-channel-delay-queue topic-entity channel))))))) (defn make-queues [routes] (when (is-connection-required?) (doseq [topic-entity (keys routes)] - (let [channels (get-channel-names routes topic-entity) + (let [channels (util/get-channel-names routes topic-entity) retry-type (retry-type)] (make-channel-queues channels topic-entity) (when (-> (ziggurat-config) :retry :enabled) @@ -345,7 +348,6 @@ "Please look at the upgrade guide for details: https://github.com/gojek/ziggurat/wiki/Upgrade-guide" "Use :type to specify the type of retry mechanism in the config.") (make-delay-queue topic-entity)) - :else (do - (log/warn "Incorrect keyword for type passed, falling back to linear backoff for topic Entity: " topic-entity) - (make-delay-queue topic-entity)))))))) - + :else (do + (log/warn "Incorrect keyword for type passed, falling back to linear backoff for topic Entity: " topic-entity) + (make-delay-queue topic-entity)))))))) diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index be163c45..61140692 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -1,30 +1,28 @@ (ns ziggurat.messaging.consumer-test - (:require [clojure.test :refer :all]) - (:require [langohr.channel :as lch] + (:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]]) + (:require [langohr.basic :as lb] + [langohr.channel :as lch] + [taoensso.nippy :as nippy] [ziggurat.config :refer [ziggurat-config rabbitmq-config]] [ziggurat.fixtures :as fix] [ziggurat.messaging.connection :refer [connection]] - [ziggurat.messaging.consumer :refer :all] + [ziggurat.messaging.consumer :as consumer] [ziggurat.messaging.producer :as producer] [ziggurat.tracer :refer [tracer]] - [ziggurat.util.rabbitmq :as util] - [langohr.basic :as lb] - [ziggurat.messaging.consumer :as consumer] - [ziggurat.message-payload :as mpr] - [taoensso.nippy :as nippy] - [ziggurat.util.error :refer [report-error]])) + [ziggurat.util.error :refer [report-error]] + [ziggurat.util.rabbitmq :as util])) (use-fixtures :once (join-fixtures [fix/init-rabbit-mq fix/silence-logging fix/mount-metrics])) (defn- gen-message-payload [topic-entity] - {:message {:gen-key (apply str (take 10 (repeatedly #(char (+ (rand 26) 65)))))} + {:message {:gen-key (apply str (take 10 (repeatedly #(char (+ (rand 26) 65)))))} :topic-entity topic-entity}) (def topic-entity :default) (deftest process-dead-set-messages-test - (let [message-payload (assoc (gen-message-payload topic-entity) :retry-count 0)] + (let [message-payload (assoc (gen-message-payload topic-entity) :retry-count 0)] (testing "it maps the process-message-from-queue over all the messages fetched from the queue for a topic" (fix/with-queues {topic-entity {:handler-fn (constantly nil)}} (let [count 5 @@ -36,7 +34,7 @@ (producer/publish-to-dead-queue message-payload))] (consumer/process-dead-set-messages topic-entity count processing-fn) (is (= count @process-fn-called)) - (is (empty? (get-dead-set-messages topic-entity count)))))) + (is (empty? (consumer/get-dead-set-messages topic-entity count)))))) (testing "it maps the process-message-from-queue over all the messages fetched from the queue for a channel" (fix/with-queues {topic-entity {:handler-fn (constantly nil) :channel-1 (constantly nil)}} @@ -50,47 +48,38 @@ (producer/publish-to-channel-dead-queue channel message-payload))] (consumer/process-dead-set-messages topic-entity channel count processing-fn) (is (= count @process-fn-called)) - (is (empty? (get-dead-set-messages topic-entity channel count)))))))) + (is (empty? (consumer/get-dead-set-messages topic-entity channel count)))))))) (deftest get-dead-set-messages-test - (let [message-payload (assoc (gen-message-payload topic-entity) :retry-count 0)] + (let [message-payload (assoc (gen-message-payload topic-entity) :retry-count 0)] (testing "get the dead set messages from dead set queue and don't pop the messages from the queue" (fix/with-queues {topic-entity {:handler-fn (constantly nil)}} (let [count-of-messages 10 _ (doseq [_ (range count-of-messages)] (producer/publish-to-dead-queue message-payload)) - dead-set-messages (get-dead-set-messages topic-entity count-of-messages)] + dead-set-messages (consumer/get-dead-set-messages topic-entity count-of-messages)] (is (= (repeat count-of-messages message-payload) dead-set-messages)) - (is (= (repeat count-of-messages message-payload) (get-dead-set-messages topic-entity count-of-messages)))))) + (is (= (repeat count-of-messages message-payload) (consumer/get-dead-set-messages topic-entity count-of-messages)))))) (testing "get the dead set messages from a channel dead set queue and don't pop the messages from the queue" (fix/with-queues {topic-entity {:handler-fn (constantly nil) :channel-1 (constantly nil)}} (let [count-of-messages 10 channel "channel-1" - pushed-message (doseq [_ (range count-of-messages)] + _ (doseq [_ (range count-of-messages)] (producer/publish-to-channel-dead-queue channel message-payload)) - dead-set-messages (get-dead-set-messages topic-entity channel count-of-messages)] + dead-set-messages (consumer/get-dead-set-messages topic-entity channel count-of-messages)] (is (= (repeat count-of-messages message-payload) dead-set-messages)) - (is (= (repeat count-of-messages message-payload) (get-dead-set-messages topic-entity channel count-of-messages)))))))) + (is (= (repeat count-of-messages message-payload) (consumer/get-dead-set-messages topic-entity channel count-of-messages)))))))) -(defn- mock-mapper-fn [{:keys [retry-counter-atom - call-counter-atom - retry-limit - skip-promise - success-promise] :as opts}] +(defn- mock-mapper-fn [{:keys [retry-counter-atom call-counter-atom retry-limit skip-promise success-promise]}] (fn [message] (swap! call-counter-atom inc) - (cond (< @retry-counter-atom (or retry-limit 5)) - (do (when retry-counter-atom (swap! retry-counter-atom inc)) - :retry) - - (= (:msg message) "skip") - (do (when skip-promise (deliver skip-promise true)) - :skip) - - :else - (do (when success-promise (deliver success-promise true)) - :success)))) + (cond (< @retry-counter-atom (or retry-limit 5)) (do (when retry-counter-atom (swap! retry-counter-atom inc)) + :retry) + (= (:msg message) "skip") (do (when skip-promise (deliver skip-promise true)) + :skip) + :else (do (when success-promise (deliver success-promise true)) + :success)))) (deftest start-subscribers-test (testing "start subscribers should not be called if none of the stream-routes or batch-routes are provided" @@ -114,12 +103,12 @@ ch (lch/open connection) counter (atom 0) stream-routes {topic-entity {:handler-fn #(constantly nil)} - :test {:handler-fn #(constantly nil)}}] - (with-redefs [ziggurat-config (fn [] (-> original-zig-config - (update-in [:retry :enabled] (constantly true)) - (update-in [:jobs :instant :worker-count] (constantly no-of-workers)))) - start-retry-subscriber* (fn [_ _] (swap! counter inc))] - (start-subscribers stream-routes {}) + :test {:handler-fn #(constantly nil)}}] + (with-redefs [ziggurat-config (fn [] (-> original-zig-config + (update-in [:retry :enabled] (constantly true)) + (update-in [:jobs :instant :worker-count] (constantly no-of-workers)))) + consumer/start-retry-subscriber* (fn [_ _] (swap! counter inc))] + (consumer/start-subscribers stream-routes {}) (is (= (count stream-routes) @counter)) (util/close ch)))) @@ -178,7 +167,7 @@ (update-in [:stream-router topic-entity :channels channel :retry :enabled] (constantly true)) (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1))))] (with-redefs [lch/open (fn [_] rmq-ch)] - (start-channels-subscriber {channel channel-fn} topic-entity)) + (consumer/start-channels-subscriber {channel channel-fn} topic-entity)) (producer/retry-for-channel message-payload channel) (when-let [promise-success? (deref success-promise 5000 :timeout)] (is (not (= :timeout promise-success?))) @@ -203,7 +192,7 @@ (with-redefs [ziggurat-config (fn [] (-> original-zig-config (update-in [:stream-router topic-entity :channels channel :retry :enabled] (constantly false)) (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1))))] - (start-channels-subscriber {channel channel-fn} topic-entity) + (consumer/start-channels-subscriber {channel channel-fn} topic-entity) (producer/publish-to-channel-instant-queue channel message-payload) (deref success-promise 5000 :timeout) (is (= 1 @call-counter)) @@ -212,23 +201,23 @@ (deftest start-retry-subscriber-test (testing "creates a span when tracer is enabled" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [retry-counter (atom 0) - call-counter (atom 0) - success-promise (promise) - retry-count 3 - message-payload (assoc (gen-message-payload topic-entity) :retry-count 3) + (let [retry-counter (atom 0) + call-counter (atom 0) + success-promise (promise) + retry-count 3 + message-payload (assoc (gen-message-payload topic-entity) :retry-count 3) original-zig-config (ziggurat-config) - rmq-ch (lch/open connection)] + rmq-ch (lch/open connection)] (.reset tracer) (with-redefs [ziggurat-config (fn [] (-> original-zig-config (update-in [:retry :count] (constantly retry-count)) (update-in [:retry :enabled] (constantly true)) (update-in [:jobs :instant :worker-count] (constantly 1))))] - (start-retry-subscriber* (mock-mapper-fn {:retry-counter-atom retry-counter - :call-counter-atom call-counter - :retry-limit 0 - :success-promise success-promise}) topic-entity) + (consumer/start-retry-subscriber* (mock-mapper-fn {:retry-counter-atom retry-counter + :call-counter-atom call-counter + :retry-limit 0 + :success-promise success-promise}) topic-entity) (producer/publish-to-delay-queue message-payload) (when-let [promise-success? (deref success-promise 5000 :timeout)] @@ -248,51 +237,70 @@ (deftest process-message-test (testing "process-message function should ack message after once processing finishes" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [message (gen-message-payload topic-entity) - processing-fn (fn [message-arg] - (is (= message-arg message))) + (let [message (gen-message-payload topic-entity) + processing-fn (fn [message-arg] + (is (= message-arg message))) topic-entity-name (name topic-entity)] (producer/publish-to-dead-queue message) (with-open [ch (lch/open connection)] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) [meta payload] (lb/get ch prefixed-queue-name false) - _ (process-message-from-queue ch meta payload topic-entity processing-fn) + _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= consumed-message nil))))))) (testing "process-message function not process a message if convert-message returns nil" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [message (gen-message-payload topic-entity) + (let [message (gen-message-payload topic-entity) processing-fn-called (atom false) - processing-fn (fn [message-arg] - (if (nil? message-arg) - (reset! processing-fn-called true))) - topic-entity-name (name topic-entity)] + processing-fn (fn [message-arg] + (when (nil? message-arg) + (reset! processing-fn-called true))) + topic-entity-name (name topic-entity)] (producer/publish-to-dead-queue message) - (with-redefs [convert-and-ack-message (fn [_ _ _ _ _] nil)] + (with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _] nil)] (with-open [ch (lch/open connection)] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) [meta payload] (lb/get ch prefixed-queue-name false) - _ (process-message-from-queue ch meta payload topic-entity processing-fn) + _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= false @processing-fn-called)) (is (= consumed-message nil)))))))) (testing "process-message function should reject and re-queue a message if processing fails. It should also report the error" (fix/with-queues {topic-entity {:handler-fn #(constantly nil)}} - (let [message (gen-message-payload topic-entity) - processing-fn (fn [message-arg] - (is (= message-arg message)) - (throw (Exception. "exception message"))) + (let [message (gen-message-payload topic-entity) + processing-fn (fn [message-arg] + (is (= message-arg message)) + (throw (Exception. "exception message"))) topic-entity-name (name topic-entity) - report-fn-called? (atom false)] + report-fn-called? (atom false)] (with-redefs [report-error (fn [_ _] (reset! report-fn-called? true))] (producer/publish-to-dead-queue message) (with-open [ch (lch/open connection)] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) [meta payload] (lb/get ch prefixed-queue-name false) - _ (process-message-from-queue ch meta payload topic-entity processing-fn) + _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= consumed-message message)) (is @report-fn-called?)))))))) + +(deftest convert-and-ack-message-test + (testing "While constructing a MessagePayload, adds topic-entity as a keyword and retry-count as 0 if message does not already has :retry-count" + (let [message {:foo "bar"} + converted-message-payload (consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} (nippy/freeze message) false "default")] + (is (= converted-message-payload message)))) + (testing "While constructing a MessagePayload, adds topic-entity as a keyword and retry-count as it exists in the message" + (let [message {:foo "bar"} + converted-message-payload (consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} (nippy/freeze message) false "default")] + (is (= converted-message-payload message)))) + (testing "should call publish to dead set when nippy/thaw throws an exception" + (let [freezed-message (nippy/freeze {:foo "bar"}) + is-publish-called? (atom false)] + (with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception"))) + lb/publish (fn [_ _ _ payload] + (is (= freezed-message payload)) + (reset! is-publish-called? true))] + (consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false "default")) + (is (= @is-publish-called? true))))) diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj index 2d43acc1..81ac623e 100644 --- a/test/ziggurat/producer_test.clj +++ b/test/ziggurat/producer_test.clj @@ -32,7 +32,7 @@ key "message" value "Hello World!!"] (send :default topic key value) - (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 7000)] + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 8000)] (is (= value (.value (first result)))))))) (deftest send-data-with-topic-key-partition-and-value-test @@ -43,7 +43,7 @@ value "Hello World!!" partition (int 0)] (send :default topic partition key value) - (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 7000)] + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 8000)] (is (= value (.value (first result)))))))) (deftest send-throws-exception-when-no-producers-are-configured @@ -73,7 +73,7 @@ value "Hello World!!"] (.reset tracer) (send :default topic key value) - (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 7000) + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 7000) finished-spans (.finishedSpans tracer)] (is (= value (.value (first result)))) (is (= 1 (.size finished-spans)))