Skip to content

Commit

Permalink
Merge fabe258 into ed467ca
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed Aug 10, 2021
2 parents ed467ca + fabe258 commit da29378
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 174 deletions.
59 changes: 32 additions & 27 deletions src/ziggurat/messaging/consumer.clj
@@ -1,30 +1,35 @@
(ns ziggurat.messaging.consumer
(:require [ziggurat.mapper :as mpr]
[clojure.tools.logging :as log]
(:require [clojure.tools.logging :as log]
[langohr.basic :as lb]
[langohr.channel :as lch]
[langohr.consumers :as lcons]
[ziggurat.kafka-consumer.consumer-handler :as ch]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [get-in-config]]
[ziggurat.config :refer [get-in-config rabbitmq-config]]
[ziggurat.kafka-consumer.consumer-handler :as ch]
[ziggurat.mapper :as mpr]
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.messaging.util :refer :all]
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics]
[ziggurat.util.error :refer [report-error]]))

(defn convert-and-ack-message
"De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
if `ack?` is true."
[ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity]
[ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity]
(try
(let [message (nippy/thaw payload)]
(when ack?
(lb/ack ch delivery-tag))
message)
(catch Exception e
(lb/reject ch delivery-tag false)
(report-error e "Error while decoding message")
(report-error e "Error while decoding message, publishing to dead queue...")
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
exchange (util/prefixed-queue-name topic-entity exchange-name)]
(try
(lb/publish ch exchange "" payload)
(catch Exception e
(log/error e "Exception was encountered while publishing to RabbitMQ")
(lb/reject ch delivery-tag false))))
(metrics/increment-count ["rabbitmq-message" "conversion"] "failure" {:topic_name (name topic-entity)})
nil)))

Expand All @@ -33,8 +38,8 @@
(lb/ack ch delivery-tag))

(defn process-message-from-queue [ch meta payload topic-entity processing-fn]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
(when message-payload
(log/infof "Processing message [%s] from RabbitMQ " message-payload)
(try
Expand All @@ -60,8 +65,8 @@
(construct-queue-name topic-entity nil))
([topic-entity channel]
(if (nil? channel)
(prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :dead-letter :queue-name]))
(prefixed-channel-name topic-entity channel (get-in-config [:rabbit-mq :dead-letter :queue-name])))))
(util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :dead-letter :queue-name]))
(util/prefixed-channel-name topic-entity channel (get-in-config [:rabbit-mq :dead-letter :queue-name])))))

(defn get-dead-set-messages
"This method can be used to read and optionally ack messages in dead-letter queue, based on the value of `ack?`.
Expand Down Expand Up @@ -94,20 +99,20 @@

(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity]
(lb/qos ch prefetch-count)
(let [consumer-tag (lcons/subscribe ch
queue-name
(message-handler wrapped-mapper-fn topic-entity)
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
:handle-consume-ok-fn (fn [consumer_tag]
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})]))
(lcons/subscribe ch
queue-name
(message-handler wrapped-mapper-fn topic-entity)
{:handle-shutdown-signal-fn (fn [consumer_tag reason]
(log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason))
:handle-consume-ok-fn (fn [consumer_tag]
(log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))}))

(defn start-retry-subscriber* [handler-fn topic-entity]
(when (get-in-config [:retry :enabled])
(dotimes [_ (get-in-config [:jobs :instant :worker-count])]
(start-subscriber* (lch/open connection)
(get-in-config [:jobs :instant :prefetch-count])
(prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name]))
(util/prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name]))
handler-fn
topic-entity))))

Expand All @@ -118,20 +123,20 @@
(dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])]
(start-subscriber* (lch/open connection)
1
(prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
(util/prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name]))
(mpr/channel-mapper-func channel-handler-fn channel-key)
topic-entity)))))

(defn start-subscribers
"Starts the subscriber to the instant queue of the rabbitmq"
[stream-routes batch-routes]
(doseq [stream-route stream-routes]
(let [topic-entity (first stream-route)
handler (-> stream-route second :handler-fn)
channels (-> stream-route second (dissoc :handler-fn))]
(let [topic-entity (first stream-route)
handler (-> stream-route second :handler-fn)
channels (-> stream-route second (dissoc :handler-fn))]
(start-channels-subscriber channels topic-entity)
(start-retry-subscriber* (mpr/mapper-func handler (keys channels)) topic-entity)))
(doseq [batch-route batch-routes]
(let [topic-entity (first batch-route)
handler (-> batch-route second :handler-fn)]
(let [topic-entity (first batch-route)
handler (-> batch-route second :handler-fn)]
(start-retry-subscriber* (fn [message] (ch/process handler message)) topic-entity))))

0 comments on commit da29378

Please sign in to comment.