diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index f7edd418..9cabb877 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -25,11 +25,11 @@ [ch delivery-tag] (lb/ack ch delivery-tag)) -(defn- publish-to-dead-set-and-ack - [ch delivery-tag payload ziggurat-channel-key] +(defn- publish-serialized-payload-to-dead-set-and-ack + [ch delivery-tag payload topic-entity ziggurat-channel-key] (if (nil? ziggurat-channel-key) - (producer/publish-to-dead-queue payload) - (producer/publish-to-channel-dead-queue ziggurat-channel-key payload)) + (producer/publish-to-dead-queue payload topic-entity true) + (producer/publish-to-channel-dead-queue ziggurat-channel-key payload topic-entity true)) (ack-message ch delivery-tag)) (defn convert-and-ack-message @@ -44,7 +44,7 @@ (catch Exception e (report-error e "Error while decoding message, publishing to dead queue...") (metrics/increment-count ["rabbitmq-message" "conversion"] "failure" {:topic_name (name topic-entity)}) - (publish-to-dead-set-and-ack ch delivery-tag payload ziggurat-channel-key) + (publish-serialized-payload-to-dead-set-and-ack ch delivery-tag payload topic-entity ziggurat-channel-key) nil))) (defn process-message-from-queue [ch meta payload topic-entity processing-fn ziggurat-channel-key] @@ -59,7 +59,7 @@ (catch Exception e (report-error e "Error while processing message-payload from RabbitMQ") (metrics/increment-count ["rabbitmq-message" "process"] "failure" {:topic_name (name topic-entity)}) - (publish-to-dead-set-and-ack ch delivery-tag payload ziggurat-channel-key)))))) + (publish-serialized-payload-to-dead-set-and-ack ch delivery-tag payload topic-entity ziggurat-channel-key)))))) (defn read-message-from-queue [ch queue-name topic-entity ack? ziggurat-channel-key] (try diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index d46963f0..e947314d 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -77,13 +77,14 @@ (.borrowObject pool)) (defn- publish-internal - [exchange message-payload expiration retry-counter] + [exchange message-payload is-payload-serialized? expiration retry-counter] (try (let [ch (borrow-from-pool cpool/channel-pool)] (try - (lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers)) - (properties-for-publish expiration (:headers message-payload))) - :success + (let [serialized-payload (if is-payload-serialized? message-payload (nippy/freeze (dissoc message-payload :headers))) + publish-properties (if is-payload-serialized? {} (properties-for-publish expiration (:headers message-payload)))] + (lb/publish ch exchange "" serialized-payload publish-properties) + :success) (finally (return-to-pool cpool/channel-pool ch)))) (catch AlreadyClosedException e (handle-network-exception e message-payload retry-counter)) @@ -115,12 +116,14 @@ ([exchange message-payload expiration] (publish exchange message-payload expiration 0)) ([exchange message-payload expiration retry-counter] + (publish exchange message-payload false expiration retry-counter (:topic-entity message-payload))) + ([exchange message-payload is-payload-serialized? expiration retry-counter topic-entity] (when (is-pool-alive? cpool/channel-pool) (let [start-time (.toEpochMilli (Instant/now)) - result (publish-internal exchange message-payload expiration retry-counter) + result (publish-internal exchange message-payload is-payload-serialized? expiration retry-counter) end-time (.toEpochMilli (Instant/now)) time-val (- end-time start-time) - _ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload)) + _ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name topic-entity) :exchange-name exchange})] (when (pos? retry-counter) (log/info "Retrying publishing the message to " exchange) @@ -130,16 +133,16 @@ (= result :success) nil (= result :retry) (do (Thread/sleep (:back-off-ms (publish-retry-config))) - (recur exchange message-payload expiration (inc retry-counter))) + (recur exchange message-payload is-payload-serialized? expiration (inc retry-counter) topic-entity)) (= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config)) (< retry-counter (:count (non-recoverable-exception-config)))) (do (log/info "Backing off") (Thread/sleep (:back-off-ms (non-recoverable-exception-config))) - (recur exchange message-payload expiration (inc retry-counter))) + (recur exchange message-payload is-payload-serialized? expiration (inc retry-counter) topic-entity)) (do (log/error "Publishing the message has failed. It is being dropped") - (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload)) + (metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name topic-entity) :retry-counter retry-counter})))))))) (defn- retry-type [] @@ -240,11 +243,12 @@ queue-timeout-ms (get-queue-timeout-ms message-payload)] (publish exchange-name message-payload queue-timeout-ms))) -(defn publish-to-dead-queue [message-payload] - (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - topic-entity (:topic-entity message-payload) - exchange-name (util/prefixed-queue-name topic-entity exchange-name)] - (publish exchange-name message-payload))) +(defn publish-to-dead-queue + ([message-payload] (publish-to-dead-queue message-payload (:topic-entity message-payload) false)) + ([message-payload topic-entity is-payload-serialized?] + (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) + exchange-name (util/prefixed-queue-name topic-entity exchange-name)] + (publish exchange-name message-payload is-payload-serialized? nil 0 topic-entity)))) (defn publish-to-instant-queue [message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) @@ -258,11 +262,12 @@ queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)] (publish exchange-name message-payload queue-timeout-ms))) -(defn publish-to-channel-dead-queue [channel message-payload] - (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - topic-entity (:topic-entity message-payload) - exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)] - (publish exchange-name message-payload))) +(defn publish-to-channel-dead-queue + ([channel message-payload] (publish-to-channel-dead-queue channel message-payload (:topic-entity message-payload) false)) + ([channel message-payload topic-entity is-payload-serialized?] + (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) + exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)] + (publish exchange-name message-payload is-payload-serialized? nil 0 topic-entity)))) (defn publish-to-channel-instant-queue [channel message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index 4beb7550..833d7a2c 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -309,7 +309,7 @@ (reset! processing-fn-called true))) topic-entity-name (name topic-entity)] (producer/publish-to-dead-queue message) - (with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _] nil)] + (with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _ _] nil)] (with-open [ch (lch/open consumer-connection)] (let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name]) prefixed-queue-name (str topic-entity-name "_" queue-name) @@ -338,25 +338,17 @@ (is @report-fn-called?)))))))) (deftest convert-and-ack-message-test - (testing "should call publish to dead set when nippy/thaw throws an exception" + (testing "should call publish to dead set and ack the message when nippy/thaw throws an exception" (let [freezed-message (nippy/freeze {:foo "bar"}) is-publish-called? (atom false) - topic-entity "default" - expected-exchange (prefixed-queue-name topic-entity (:exchange-name (:dead-letter (rabbitmq-config))))] + is-message-acked? (atom false) + topic-entity "default"] (with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception"))) - lb/publish (fn [_ exchange _ payload] - (is (= exchange expected-exchange)) - (is (= freezed-message payload)) - (reset! is-publish-called? true))] + ziggurat.messaging.producer/publish-to-dead-queue (fn [payload _ _] + (is (= freezed-message payload)) + (reset! is-publish-called? true)) + consumer/ack-message (fn [_ _] + (reset! is-message-acked? true))] (consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false topic-entity nil)) - (is (= @is-publish-called? true)))) - (testing "should call reject when both nippy/thaw and lb/publish throws an exception" - (let [freezed-message (nippy/freeze {:foo "bar"}) - is-reject-called? (atom false)] - (with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception"))) - lb/publish (fn [_ _ _ _] - (throw (Exception. "lb/publish exception"))) - consumer/reject-message (fn [_ _] - (reset! is-reject-called? true))] - (consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false "default" nil)) - (is (= @is-reject-called? true))))) + (is (= @is-publish-called? true)) + (is (= @is-message-acked? true)))))