Skip to content

Commit

Permalink
Merge e1a0007 into 8ef1926
Browse files Browse the repository at this point in the history
  • Loading branch information
anmol1vw13 committed Sep 6, 2022
2 parents 8ef1926 + e1a0007 commit c41a86f
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 76 deletions.
60 changes: 29 additions & 31 deletions src/ziggurat/messaging/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
[ziggurat.messaging.channel_pool :as cpool]
[ziggurat.kafka-consumer.consumer-handler :as ch]
[ziggurat.mapper :as mpr]
[ziggurat.messaging.producer :as producer]
[ziggurat.messaging.connection :refer [consumer-connection, producer-connection]]
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics]
Expand All @@ -20,56 +21,51 @@
[ch delivery-tag]
(lb/reject ch delivery-tag))

(defn- publish-to-dead-set
[delivery-tag topic-entity payload]
(let [ch (.borrowObject cpool/channel-pool)
{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
exchange (util/prefixed-queue-name topic-entity exchange-name)]
(try
(lb/publish ch exchange "" payload)
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(reject-message ch delivery-tag))
(finally (.returnObject cpool/channel-pool ch)))))
(defn- ack-message
[ch delivery-tag]
(lb/ack ch delivery-tag))

(defn- publish-serialized-payload-to-dead-set-and-ack
[ch delivery-tag payload topic-entity ziggurat-channel-key]
(if (nil? ziggurat-channel-key)
(producer/publish-to-dead-queue payload topic-entity true)
(producer/publish-to-channel-dead-queue ziggurat-channel-key payload topic-entity true))
(ack-message ch delivery-tag))

(defn convert-and-ack-message
"De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
if `ack?` is true."
[ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity]
[ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity ziggurat-channel-key]
(try
(let [message (nippy/thaw payload)]
(when ack?
(lb/ack ch delivery-tag))
message)
(catch Exception e
(report-error e "Error while decoding message, publishing to dead queue...")
(publish-to-dead-set delivery-tag topic-entity payload)
(metrics/increment-count ["rabbitmq-message" "conversion"] "failure" {:topic_name (name topic-entity)})
(publish-serialized-payload-to-dead-set-and-ack ch delivery-tag payload topic-entity ziggurat-channel-key)
nil)))

(defn- ack-message
[ch delivery-tag]
(lb/ack ch delivery-tag))

(defn process-message-from-queue [ch meta payload topic-entity processing-fn]
(defn process-message-from-queue [ch meta payload topic-entity processing-fn ziggurat-channel-key]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
message-payload (convert-and-ack-message ch meta payload false topic-entity ziggurat-channel-key)]
(when message-payload
(log/infof "Processing message [%s] from RabbitMQ " message-payload)
(try
(log/debug "Calling processor-fn with the message-payload - " message-payload " with retry count - " (:retry-count message-payload))
(processing-fn message-payload)
(ack-message ch delivery-tag)
(catch Exception e
(publish-to-dead-set delivery-tag topic-entity payload)
(report-error e "Error while processing message-payload from RabbitMQ")
(metrics/increment-count ["rabbitmq-message" "process"] "failure" {:topic_name (name topic-entity)}))))))
(metrics/increment-count ["rabbitmq-message" "process"] "failure" {:topic_name (name topic-entity)})
(publish-serialized-payload-to-dead-set-and-ack ch delivery-tag payload topic-entity ziggurat-channel-key))))))

(defn read-message-from-queue [ch queue-name topic-entity ack?]
(defn read-message-from-queue [ch queue-name topic-entity ack? ziggurat-channel-key]
(try
(let [[meta payload] (lb/get ch queue-name false)]
(when (some? payload)
(convert-and-ack-message ch meta payload ack? topic-entity)))
(convert-and-ack-message ch meta payload ack? topic-entity ziggurat-channel-key)))
(catch Exception e
(report-error e "Error while consuming the dead set message")
(metrics/increment-count ["rabbitmq-message" "consumption"] "failure" {:topic_name (name topic-entity)}))))
Expand All @@ -92,7 +88,7 @@
(remove nil?
(with-open [ch (lch/open consumer-connection)]
(doall (for [_ (range count)]
(read-message-from-queue ch (construct-queue-name topic-entity channel) topic-entity false)))))))
(read-message-from-queue ch (construct-queue-name topic-entity channel) topic-entity false channel)))))))

(defn process-dead-set-messages
"This method reads and processes `count` number of messages from RabbitMQ dead-letter queue for topic `topic-entity` and
Expand All @@ -105,7 +101,7 @@
(let [queue-name (construct-queue-name topic-entity channel)
[meta payload] (lb/get ch queue-name false)]
(when (some? payload)
(process-message-from-queue ch meta payload topic-entity processing-fn))))))))
(process-message-from-queue ch meta payload topic-entity processing-fn channel))))))))

(defn delete-dead-set-messages
"This method deletes `count` number of messages from RabbitMQ dead-letter queue for topic `topic-entity` and channel
Expand All @@ -116,15 +112,15 @@
(doall (for [_ (range count)]
(lb/get ch queue-name true))))))

(defn- message-handler [wrapped-mapper-fn topic-entity]
(defn- message-handler [wrapped-mapper-fn topic-entity ziggurat-channel-key]
(fn [ch meta ^bytes payload]
(clog/with-logging-context {:consumer-group topic-entity} (process-message-from-queue ch meta payload topic-entity wrapped-mapper-fn))))
(clog/with-logging-context {:consumer-group topic-entity} (process-message-from-queue ch meta payload topic-entity wrapped-mapper-fn ziggurat-channel-key))))

(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity]
(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity ziggurat-channel-key]
(lb/qos ch prefetch-count)
(lcons/subscribe ch
queue-name
(message-handler wrapped-mapper-fn topic-entity)
(message-handler wrapped-mapper-fn topic-entity ziggurat-channel-key)
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
:handle-consume-ok-fn (fn [consumer_tag]
Expand All @@ -137,7 +133,8 @@
(get-in-config [:jobs :instant :prefetch-count])
(util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name]))
handler-fn
topic-entity))))
topic-entity
nil))))

(defn start-channels-subscriber [channels topic-entity]
(doseq [channel channels]
Expand All @@ -149,7 +146,8 @@
channel-prefetch-count
(util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
(mpr/channel-mapper-func channel-handler-fn channel-key)
topic-entity))))))
topic-entity
channel-key))))))

(defn start-subscribers
"Starts the subscriber to the instant queue of the rabbitmq"
Expand Down
54 changes: 34 additions & 20 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@
(.borrowObject pool))

(defn- publish-internal
[exchange message-payload expiration retry-counter]
[exchange message-payload is-payload-serialized? expiration retry-counter]
(try
(let [ch (borrow-from-pool cpool/channel-pool)]
(try
(lb/publish ch exchange "" (nippy/freeze (dissoc message-payload :headers))
(properties-for-publish expiration (:headers message-payload)))
:success
(let [serialized-payload (if is-payload-serialized? message-payload (nippy/freeze (dissoc message-payload :headers)))
publish-properties (if is-payload-serialized? {} (properties-for-publish expiration (:headers message-payload)))]
(lb/publish ch exchange "" serialized-payload publish-properties)
:success)
(finally (return-to-pool cpool/channel-pool ch))))
(catch AlreadyClosedException e
(handle-network-exception e message-payload retry-counter))
Expand All @@ -104,23 +105,34 @@

(defn publish
"This is meant for publishing to rabbitmq.
* Supports publishing both serialized and deserialized messages. If is-payload-serialized is true, the message won't be
frozen before publishing to rabbitmq, else it would be frozen via nippy.
* Use case(s) of sending serialized-payloads
1) If a subscriber encounters an exception, while deserializing the message it receives from instant queue,
the serialized payload
is sent as is to the dead-letter-queue.
2) If a subscriber encounters any exception while processing the message, the serialized payload
is sent as is to the dead-letter-queue
* Checks if the pool is alive - We do this so that publish does not happen after the channel pool state is stopped.
* publish-internal returns multiple states
* :success - Message has been successfully produced to rabbitmq
* :retry - A retryable exception was encountered and message will be retried until it is successfully published.
* :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times. defined by the counter
* :retry-with-counter - A non recoverable exception is encountered, but the message will be retried for a few times.
defined by the counter
{ :rabbit-mq-connection { :publish-retry { :non-recoverable-exception {:count}}}}}"
([exchange message-payload]
(publish exchange message-payload nil))
([exchange message-payload expiration]
(publish exchange message-payload expiration 0))
([exchange message-payload expiration retry-counter]
(publish exchange message-payload false expiration retry-counter (:topic-entity message-payload)))
([exchange message-payload is-payload-serialized? expiration retry-counter topic-entity]
(when (is-pool-alive? cpool/channel-pool)
(let [start-time (.toEpochMilli (Instant/now))
result (publish-internal exchange message-payload expiration retry-counter)
result (publish-internal exchange message-payload is-payload-serialized? expiration retry-counter)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
_ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload))
_ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name topic-entity)
:exchange-name exchange})]
(when (pos? retry-counter)
(log/info "Retrying publishing the message to " exchange)
Expand All @@ -130,16 +142,16 @@
(= result :success) nil
(= result :retry) (do
(Thread/sleep (:back-off-ms (publish-retry-config)))
(recur exchange message-payload expiration (inc retry-counter)))
(recur exchange message-payload is-payload-serialized? expiration (inc retry-counter) topic-entity))
(= result :retry-with-counter) (if (and (:enabled (non-recoverable-exception-config))
(< retry-counter (:count (non-recoverable-exception-config))))
(do
(log/info "Backing off")
(Thread/sleep (:back-off-ms (non-recoverable-exception-config)))
(recur exchange message-payload expiration (inc retry-counter)))
(recur exchange message-payload is-payload-serialized? expiration (inc retry-counter) topic-entity))
(do
(log/error "Publishing the message has failed. It is being dropped")
(metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name (:topic-entity message-payload))
(metrics/increment-count ["rabbitmq" "publish"] "message_loss" {:topic-entity (name topic-entity)
:retry-counter retry-counter}))))))))

(defn- retry-type []
Expand Down Expand Up @@ -240,11 +252,12 @@
queue-timeout-ms (get-queue-timeout-ms message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-dead-queue [message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-dead-queue
([message-payload] (publish-to-dead-queue message-payload (:topic-entity message-payload) false))
([message-payload topic-entity is-payload-serialized?]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
exchange-name (util/prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload is-payload-serialized? nil 0 topic-entity))))

(defn publish-to-instant-queue [message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
Expand All @@ -258,11 +271,12 @@
queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-channel-dead-queue [channel message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))
(defn publish-to-channel-dead-queue
([channel message-payload] (publish-to-channel-dead-queue channel message-payload (:topic-entity message-payload) false))
([channel message-payload topic-entity is-payload-serialized?]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload is-payload-serialized? nil 0 topic-entity))))

(defn publish-to-channel-instant-queue [channel message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
Expand Down
38 changes: 15 additions & 23 deletions test/ziggurat/messaging/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@
(let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name])
prefixed-queue-name (str topic-entity-name "_" queue-name)
[meta payload] (lb/get ch prefixed-queue-name false)
_ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn)
_ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn nil)
consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)]
(is (= consumed-message nil)))))))
(testing "process-message function not process a message if convert-message returns nil"
Expand All @@ -309,12 +309,12 @@
(reset! processing-fn-called true)))
topic-entity-name (name topic-entity)]
(producer/publish-to-dead-queue message)
(with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _] nil)]
(with-redefs [consumer/convert-and-ack-message (fn [_ _ _ _ _ _] nil)]
(with-open [ch (lch/open consumer-connection)]
(let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name])
prefixed-queue-name (str topic-entity-name "_" queue-name)
[meta payload] (lb/get ch prefixed-queue-name false)
_ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn)
_ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn nil)
consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)]
(is (= false @processing-fn-called))
(is (= consumed-message nil))))))))
Expand All @@ -332,31 +332,23 @@
(let [queue-name (get-in (rabbitmq-config) [:dead-letter :queue-name])
prefixed-queue-name (str topic-entity-name "_" queue-name)
[meta payload] (lb/get ch prefixed-queue-name false)
_ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn)
_ (consumer/process-message-from-queue ch meta payload topic-entity processing-fn nil)
consumed-message (util/get-msg-from-dead-queue-without-ack topic-entity-name)]
(is (= consumed-message message))
(is @report-fn-called?))))))))

(deftest convert-and-ack-message-test
(testing "should call publish to dead set when nippy/thaw throws an exception"
(testing "should call publish to dead set and ack the message when nippy/thaw throws an exception"
(let [freezed-message (nippy/freeze {:foo "bar"})
is-publish-called? (atom false)
topic-entity "default"
expected-exchange (prefixed-queue-name topic-entity (:exchange-name (:dead-letter (rabbitmq-config))))]
is-message-acked? (atom false)
topic-entity "default"]
(with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception")))
lb/publish (fn [_ exchange _ payload]
(is (= exchange expected-exchange))
(is (= freezed-message payload))
(reset! is-publish-called? true))]
(consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false topic-entity))
(is (= @is-publish-called? true))))
(testing "should call reject when both nippy/thaw and lb/publish throws an exception"
(let [freezed-message (nippy/freeze {:foo "bar"})
is-reject-called? (atom false)]
(with-redefs [nippy/thaw (fn [_] (throw (Exception. "nippy/thaw exception")))
lb/publish (fn [_ _ _ _]
(throw (Exception. "lb/publish exception")))
consumer/reject-message (fn [_ _]
(reset! is-reject-called? true))]
(consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false "default"))
(is (= @is-reject-called? true)))))
ziggurat.messaging.producer/publish-to-dead-queue (fn [payload _ _]
(is (= freezed-message payload))
(reset! is-publish-called? true))
consumer/ack-message (fn [_ _]
(reset! is-message-acked? true))]
(consumer/convert-and-ack-message nil {:delivery-tag "delivery-tag"} freezed-message false topic-entity nil))
(is (= @is-publish-called? true))
(is (= @is-message-acked? true)))))
Loading

0 comments on commit c41a86f

Please sign in to comment.