diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index c1ee53d0..9cabb877 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -8,6 +8,7 @@ [ziggurat.messaging.channel_pool :as cpool] [ziggurat.kafka-consumer.consumer-handler :as ch] [ziggurat.mapper :as mpr] + [ziggurat.messaging.producer :as producer] [ziggurat.messaging.connection :refer [consumer-connection, producer-connection]] [ziggurat.messaging.util :as util] [ziggurat.metrics :as metrics] @@ -20,22 +21,21 @@ [ch delivery-tag] (lb/reject ch delivery-tag)) -(defn- publish-to-dead-set - [delivery-tag topic-entity payload] - (let [ch (.borrowObject cpool/channel-pool) - {:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - exchange (util/prefixed-queue-name topic-entity exchange-name)] - (try - (lb/publish ch exchange "" payload) - (catch Exception e - (log/error e "Exception was encountered while publishing to RabbitMQ") - (reject-message ch delivery-tag)) - (finally (.returnObject cpool/channel-pool ch))))) +(defn- ack-message + [ch delivery-tag] + (lb/ack ch delivery-tag)) + +(defn- publish-serialized-payload-to-dead-set-and-ack + [ch delivery-tag payload topic-entity ziggurat-channel-key] + (if (nil? ziggurat-channel-key) + (producer/publish-to-dead-queue payload topic-entity true) + (producer/publish-to-channel-dead-queue ziggurat-channel-key payload topic-entity true)) + (ack-message ch delivery-tag)) (defn convert-and-ack-message "De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message if `ack?` is true." - [ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity] + [ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity ziggurat-channel-key] (try (let [message (nippy/thaw payload)] (when ack? @@ -43,17 +43,13 @@ message) (catch Exception e (report-error e "Error while decoding message, publishing to dead queue...") - (publish-to-dead-set delivery-tag topic-entity payload) (metrics/increment-count ["rabbitmq-message" "conversion"] "failure" {:topic_name (name topic-entity)}) + (publish-serialized-payload-to-dead-set-and-ack ch delivery-tag payload topic-entity ziggurat-channel-key) nil))) -(defn- ack-message - [ch delivery-tag] - (lb/ack ch delivery-tag)) - -(defn process-message-from-queue [ch meta payload topic-entity processing-fn] +(defn process-message-from-queue [ch meta payload topic-entity processing-fn ziggurat-channel-key] (let [delivery-tag (:delivery-tag meta) - message-payload (convert-and-ack-message ch meta payload false topic-entity)] + message-payload (convert-and-ack-message ch meta payload false topic-entity ziggurat-channel-key)] (when message-payload (log/infof "Processing message [%s] from RabbitMQ " message-payload) (try @@ -61,15 +57,15 @@ (processing-fn message-payload) (ack-message ch delivery-tag) (catch Exception e - (publish-to-dead-set delivery-tag topic-entity payload) (report-error e "Error while processing message-payload from RabbitMQ") - (metrics/increment-count ["rabbitmq-message" "process"] "failure" {:topic_name (name topic-entity)})))))) + (metrics/increment-count ["rabbitmq-message" "process"] "failure" {:topic_name (name topic-entity)}) + (publish-serialized-payload-to-dead-set-and-ack ch delivery-tag payload topic-entity ziggurat-channel-key)))))) -(defn read-message-from-queue [ch queue-name topic-entity ack?] +(defn read-message-from-queue [ch queue-name topic-entity ack? ziggurat-channel-key] (try (let [[meta payload] (lb/get ch queue-name false)] (when (some? payload) - (convert-and-ack-message ch meta payload ack? topic-entity))) + (convert-and-ack-message ch meta payload ack? topic-entity ziggurat-channel-key))) (catch Exception e (report-error e "Error while consuming the dead set message") (metrics/increment-count ["rabbitmq-message" "consumption"] "failure" {:topic_name (name topic-entity)})))) @@ -92,7 +88,7 @@ (remove nil? (with-open [ch (lch/open consumer-connection)] (doall (for [_ (range count)] - (read-message-from-queue ch (construct-queue-name topic-entity channel) topic-entity false))))))) + (read-message-from-queue ch (construct-queue-name topic-entity channel) topic-entity false channel))))))) (defn process-dead-set-messages "This method reads and processes `count` number of messages from RabbitMQ dead-letter queue for topic `topic-entity` and @@ -105,7 +101,7 @@ (let [queue-name (construct-queue-name topic-entity channel) [meta payload] (lb/get ch queue-name false)] (when (some? payload) - (process-message-from-queue ch meta payload topic-entity processing-fn)))))))) + (process-message-from-queue ch meta payload topic-entity processing-fn channel)))))))) (defn delete-dead-set-messages "This method deletes `count` number of messages from RabbitMQ dead-letter queue for topic `topic-entity` and channel @@ -116,15 +112,15 @@ (doall (for [_ (range count)] (lb/get ch queue-name true)))))) -(defn- message-handler [wrapped-mapper-fn topic-entity] +(defn- message-handler [wrapped-mapper-fn topic-entity ziggurat-channel-key] (fn [ch meta ^bytes payload] - (clog/with-logging-context {:consumer-group topic-entity} (process-message-from-queue ch meta payload topic-entity wrapped-mapper-fn)))) + (clog/with-logging-context {:consumer-group topic-entity} (process-message-from-queue ch meta payload topic-entity wrapped-mapper-fn ziggurat-channel-key)))) -(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity] +(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity ziggurat-channel-key] (lb/qos ch prefetch-count) (lcons/subscribe ch queue-name - (message-handler wrapped-mapper-fn topic-entity) + (message-handler wrapped-mapper-fn topic-entity ziggurat-channel-key) {:handle-shutdown-signal-fn (fn [consumer_tag reason] (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) :handle-consume-ok-fn (fn [consumer_tag] @@ -137,7 +133,8 @@ (get-in-config [:jobs :instant :prefetch-count]) (util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name])) handler-fn - topic-entity)))) + topic-entity + nil)))) (defn start-channels-subscriber [channels topic-entity] (doseq [channel channels] @@ -149,7 +146,8 @@ channel-prefetch-count (util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name])) (mpr/channel-mapper-func channel-handler-fn channel-key) - topic-entity)))))) + topic-entity + channel-key)))))) (defn start-subscribers "Starts the subscriber to the instant queue of the rabbitmq" diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index d46963f0..528d1ed5 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -77,13 +77,14 @@ (.borrowObject pool)) (defn- publish-internal - [exchange message-payload expiration retry-counter] + [exchange message-payload is-payload-serialized? expiration retry-counter] (try (let [ch (borrow-from-pool cpool/channel-pool)] (try - (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) - (properties-for-publish expiration (:headers message-payload))) - :success + (let [serialized-payload (if is-payload-serialized? message-payload (nippy/freeze (dissoc message-payload :headers))) + publish-properties (if is-payload-serialized? {} (properties-for-publish expiration (:headers message-payload)))] + (lb/publish ch exchange "" serialized-payload publish-properties) + :success) (finally (return-to-pool cpool/channel-pool ch)))) (catch AlreadyClosedException e (handle-network-exception e message-payload retry-counter)) @@ -104,23 +105,34 @@ (defn publish "This is meant for publishing to rabbitmq. + * Supports publishing both serialized and deserialized messages. If is-payload-serialized is true, the message won't be + frozen before publishing to rabbitmq, else it would be frozen via nippy. + * Use case(s) of sending serialized-payloads + 1) If a subscriber encounters an exception, while deserializing the message it receives from instant queue, + the serialized payload + is sent as is to the dead-letter-queue. + 2) If a subscriber encounters any exception while processing the message, the serialized payload + is sent as is to the dead-letter-queue * Checks if the pool is alive - We do this so that publish does not happen after the channel pool state is stopped. * publish-internal returns multiple states * :success - Message has been successfully produced to rabbitmq * :retry - A retryable exception was encountered and message will be retried until it is successfully published. - * :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times. defined by the counter + * :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times. + defined by the counter { :rabbit-mq-connection { :publish-retry { :non-recoverable-exception {:count}}}}}" ([exchange message-payload] (publish exchange message-payload nil)) ([exchange message-payload expiration] (publish exchange message-payload expiration 0)) ([exchange message-payload expiration retry-counter] + (publish exchange message-payload false expiration retry-counter (:topic-entity message-payload))) + ([exchange message-payload is-payload-serialized? expiration retry-counter topic-entity] (when (is-pool-alive? cpool/channel-pool) (let [start-time (.toEpochMilli (Instant/now)) - result (publish-internal exchange message-payload expiration retry-counter) + result (publish-internal exchange message-payload is-payload-serialized? expiration retry-counter) end-time (.toEpochMilli (Instant/now)) time-val (- end-time start-time) - _ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload)) + _ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name topic-entity) :exchange-name exchange})] (when (pos? retry-counter) (log/info "Retrying publishing the message to " exchange) @@ -130,16 +142,16 @@ (= result :success) nil (= result :retry) (do (Thread/sleep (:back-off-ms (publish-retry-config))) - (recur exchange message-payload expiration (inc retry-counter))) + (recur exchange message-payload is-payload-serialized? expiration (inc retry-counter) topic-entity)) (= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) (< retry-counter (:count (non-recoverable-exception-config)))) (do (log/info "Backing off") (Thread/sleep (:back-off-ms (non-recoverable-exception-config))) - (recur exchange message-payload expiration (inc retry-counter))) + (recur exchange message-payload is-payload-serialized? expiration (inc retry-counter) topic-entity)) (do (log/error "Publishing the message has failed. It is being dropped") - (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload)) + (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name topic-entity) :retry-counter retry-counter})))))))) (defn- retry-type [] @@ -240,11 +252,12 @@ queue-timeout-ms (get-queue-timeout-ms message-payload)] (publish exchange-name message-payload queue-timeout-ms))) -(defn publish-to-dead-queue [message-payload] - (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - topic-entity (:topic-entity message-payload) - exchange-name (util/prefixed-queue-name topic-entity exchange-name)] - (publish exchange-name message-payload))) +(defn publish-to-dead-queue + ([message-payload] (publish-to-dead-queue message-payload (:topic-entity message-payload) false)) + ([message-payload topic-entity is-payload-serialized?] + (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) + exchange-name (util/prefixed-queue-name topic-entity exchange-name)] + (publish exchange-name message-payload is-payload-serialized? nil 0 topic-entity)))) (defn publish-to-instant-queue [message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) @@ -258,11 +271,12 @@ queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)] (publish exchange-name message-payload queue-timeout-ms))) -(defn publish-to-channel-dead-queue [channel message-payload] - (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - topic-entity (:topic-entity message-payload) - exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)] - (publish exchange-name message-payload))) +(defn publish-to-channel-dead-queue + ([channel message-payload] (publish-to-channel-dead-queue channel message-payload (:topic-entity message-payload) false)) + ([channel message-payload topic-entity is-payload-serialized?] + (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) + exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)] + (publish exchange-name message-payload is-payload-serialized? nil 0 topic-entity)))) (defn publish-to-channel-instant-queue [channel message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index 92a5cf57..833d7a2c 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -297,7 +297,7 @@ (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) [meta payload] (lb/get ch prefixed-queue-name false) - _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn) + _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn nil) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= consumed-message nil))))))) (testing "process-message function not process a message if convert-message returns nil" @@ -309,12 +309,12 @@ (reset! processing-fn-called true))) topic-entity-name (name topic-entity)] (producer/publish-to-dead-queue message) - (with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _] nil)] + (with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _ _] nil)] (with-open [ch (lch/open consumer-connection)] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) [meta payload] (lb/get ch prefixed-queue-name false) - _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn) + _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn nil) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= false @processing-fn-called)) (is (= consumed-message nil)))))))) @@ -332,31 +332,23 @@ (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) [meta payload] (lb/get ch prefixed-queue-name false) - _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn) + _ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn nil) consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)] (is (= consumed-message message)) (is @report-fn-called?)))))))) (deftest convert-and-ack-message-test - (testing "should call publish to dead set when nippy/thaw throws an exception" + (testing "should call publish to dead set and ack the message when nippy/thaw throws an exception" (let [freezed-message (nippy/freeze {:foo "bar"}) is-publish-called? (atom false) - topic-entity "default" - expected-exchange (prefixed-queue-name topic-entity (:exchange-name (:dead-letter (rabbitmq-config))))] + is-message-acked? (atom false) + topic-entity "default"] (with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception"))) - lb/publish (fn [_ exchange _ payload] - (is (= exchange expected-exchange)) - (is (= freezed-message payload)) - (reset! is-publish-called? true))] - (consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false topic-entity)) - (is (= @is-publish-called? true)))) - (testing "should call reject when both nippy/thaw and lb/publish throws an exception" - (let [freezed-message (nippy/freeze {:foo "bar"}) - is-reject-called? (atom false)] - (with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception"))) - lb/publish (fn [_ _ _ _] - (throw (Exception. "lb/publish exception"))) - consumer/reject-message (fn [_ _] - (reset! is-reject-called? true))] - (consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false "default")) - (is (= @is-reject-called? true))))) + ziggurat.messaging.producer/publish-to-dead-queue (fn [payload _ _] + (is (= freezed-message payload)) + (reset! is-publish-called? true)) + consumer/ack-message (fn [_ _] + (reset! is-message-acked? true))] + (consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false topic-entity nil)) + (is (= @is-publish-called? true)) + (is (= @is-message-acked? true))))) diff --git a/test/ziggurat/util/rabbitmq.clj b/test/ziggurat/util/rabbitmq.clj index 5092bb57..6b5cde8d 100644 --- a/test/ziggurat/util/rabbitmq.clj +++ b/test/ziggurat/util/rabbitmq.clj @@ -17,7 +17,7 @@ (try (let [[meta payload] (lb/get ch queue-name false)] (when (seq payload) - (consumer/convert-and-ack-message ch meta payload true (keyword topic-name)))) + (consumer/convert-and-ack-message ch meta payload true (keyword topic-name) nil))) (catch NullPointerException e nil)))) @@ -26,7 +26,7 @@ (try (let [[meta payload] (lb/get ch queue-name false)] (when (seq payload) - (consumer/convert-and-ack-message ch meta payload false (keyword topic-name)))) + (consumer/convert-and-ack-message ch meta payload false (keyword topic-name) nil))) (catch NullPointerException e nil))))