Skip to content

Commit

Permalink
Have a support of publishing serialized payload to dead-letter-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Anmol Vijaywargiya committed Sep 6, 2022
1 parent 637594c commit 83a6217
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 44 deletions.
12 changes: 6 additions & 6 deletions src/ziggurat/messaging/consumer.clj
Expand Up @@ -25,11 +25,11 @@
[ch delivery-tag]
(lb/ack ch delivery-tag))

(defn- publish-to-dead-set-and-ack
[ch delivery-tag payload ziggurat-channel-key]
(defn- publish-serialized-payload-to-dead-set-and-ack
[ch delivery-tag payload topic-entity ziggurat-channel-key]
(if (nil? ziggurat-channel-key)
(producer/publish-to-dead-queue payload)
(producer/publish-to-channel-dead-queue ziggurat-channel-key payload))
(producer/publish-to-dead-queue payload topic-entity true)
(producer/publish-to-channel-dead-queue ziggurat-channel-key payload topic-entity true))
(ack-message ch delivery-tag))

(defn convert-and-ack-message
Expand All @@ -44,7 +44,7 @@
(catch Exception e
(report-error e "Error while decoding message, publishing to dead queue...")
(metrics/increment-count ["rabbitmq-message" "conversion"] "failure" {:topic_name (name topic-entity)})
(publish-to-dead-set-and-ack ch delivery-tag payload ziggurat-channel-key)
(publish-serialized-payload-to-dead-set-and-ack ch delivery-tag payload topic-entity ziggurat-channel-key)
nil)))

(defn process-message-from-queue [ch meta payload topic-entity processing-fn ziggurat-channel-key]
Expand All @@ -59,7 +59,7 @@
(catch Exception e
(report-error e "Error while processing message-payload from RabbitMQ")
(metrics/increment-count ["rabbitmq-message" "process"] "failure" {:topic_name (name topic-entity)})
(publish-to-dead-set-and-ack ch delivery-tag payload ziggurat-channel-key))))))
(publish-serialized-payload-to-dead-set-and-ack ch delivery-tag payload topic-entity ziggurat-channel-key))))))

(defn read-message-from-queue [ch queue-name topic-entity ack? ziggurat-channel-key]
(try
Expand Down
43 changes: 24 additions & 19 deletions src/ziggurat/messaging/producer.clj
Expand Up @@ -77,13 +77,14 @@
(.borrowObject pool))

(defn- publish-internal
[exchange message-payload expiration retry-counter]
[exchange message-payload is-payload-serialized? expiration retry-counter]
(try
(let [ch (borrow-from-pool cpool/channel-pool)]
(try
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers))
(properties-for-publish expiration (:headers message-payload)))
:success
(let [serialized-payload (if is-payload-serialized? message-payload (nippy/freeze (dissoc message-payload :headers)))
publish-properties (if is-payload-serialized? {} (properties-for-publish expiration (:headers message-payload)))]
(lb/publish ch exchange "" serialized-payload publish-properties)
:success)
(finally (return-to-pool cpool/channel-pool ch))))
(catch AlreadyClosedException e
(handle-network-exception e message-payload retry-counter))
Expand Down Expand Up @@ -115,12 +116,14 @@
([exchange message-payload expiration]
(publish exchange message-payload expiration 0))
([exchange message-payload expiration retry-counter]
(publish exchange message-payload false expiration retry-counter (:topic-entity message-payload)))
([exchange message-payload is-payload-serialized? expiration retry-counter topic-entity]
(when (is-pool-alive? cpool/channel-pool)
(let [start-time (.toEpochMilli (Instant/now))
result (publish-internal exchange message-payload expiration retry-counter)
result (publish-internal exchange message-payload is-payload-serialized? expiration retry-counter)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
_ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload))
_ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name topic-entity)
:exchange-name exchange})]
(when (pos? retry-counter)
(log/info "Retrying publishing the message to " exchange)
Expand All @@ -130,16 +133,16 @@
(= result :success) nil
(= result :retry) (do
(Thread/sleep (:back-off-ms (publish-retry-config)))
(recur exchange message-payload expiration (inc retry-counter)))
(recur exchange message-payload is-payload-serialized? expiration (inc retry-counter) topic-entity))
(= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config))
(< retry-counter (:count (non-recoverable-exception-config))))
(do
(log/info "Backing off")
(Thread/sleep (:back-off-ms (non-recoverable-exception-config)))
(recur exchange message-payload expiration (inc retry-counter)))
(recur exchange message-payload is-payload-serialized? expiration (inc retry-counter) topic-entity))
(do
(log/error "Publishing the message has failed. It is being dropped")
(metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload))
(metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name topic-entity)
:retry-counter retry-counter}))))))))

(defn- retry-type []
Expand Down Expand Up @@ -240,11 +243,12 @@
queue-timeout-ms (get-queue-timeout-ms message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-dead-queue [message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-dead-queue
([message-payload] (publish-to-dead-queue message-payload (:topic-entity message-payload) false))
([message-payload topic-entity is-payload-serialized?]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload is-payload-serialized? nil 0 topic-entity))))

(defn publish-to-instant-queue [message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
Expand All @@ -258,11 +262,12 @@
queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-channel-dead-queue [channel message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-channel-dead-queue
([channel message-payload] (publish-to-channel-dead-queue channel message-payload (:topic-entity message-payload) false))
([channel message-payload topic-entity is-payload-serialized?]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload is-payload-serialized? nil 0 topic-entity))))

(defn publish-to-channel-instant-queue [channel message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
Expand Down
30 changes: 11 additions & 19 deletions test/ziggurat/messaging/consumer_test.clj
Expand Up @@ -309,7 +309,7 @@
(reset! processing-fn-called true)))
topic-entity-name (name topic-entity)]
(producer/publish-to-dead-queue message)
(with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _] nil)]
(with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _ _] nil)]
(with-open [ch (lch/open consumer-connection)]
(let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name])
prefixed-queue-name (str topic-entity-name "_" queue-name)
Expand Down Expand Up @@ -338,25 +338,17 @@
(is @report-fn-called?))))))))

(deftest convert-and-ack-message-test
(testing "should call publish to dead set when nippy/thaw throws an exception"
(testing "should call publish to dead set and ack the message when nippy/thaw throws an exception"
(let [freezed-message (nippy/freeze {:foo "bar"})
is-publish-called? (atom false)
topic-entity "default"
expected-exchange (prefixed-queue-name topic-entity (:exchange-name (:dead-letter (rabbitmq-config))))]
is-message-acked? (atom false)
topic-entity "default"]
(with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception")))
lb/publish (fn [_ exchange _ payload]
(is (= exchange expected-exchange))
(is (= freezed-message payload))
(reset! is-publish-called? true))]
ziggurat.messaging.producer/publish-to-dead-queue (fn [payload _ _]
(is (= freezed-message payload))
(reset! is-publish-called? true))
consumer/ack-message (fn [_ _]
(reset! is-message-acked? true))]
(consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false topic-entity nil))
(is (= @is-publish-called? true))))
(testing "should call reject when both nippy/thaw and lb/publish throws an exception"
(let [freezed-message (nippy/freeze {:foo "bar"})
is-reject-called? (atom false)]
(with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception")))
lb/publish (fn [_ _ _ _]
(throw (Exception. "lb/publish exception")))
consumer/reject-message (fn [_ _]
(reset! is-reject-called? true))]
(consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false "default" nil))
(is (= @is-reject-called? true)))))
(is (= @is-publish-called? true))
(is (= @is-message-acked? true)))))

0 comments on commit 83a6217

Please sign in to comment.