Skip to content

Commit

Permalink
Merge pull request #124 from thediantoni/retry-queue-per-ttl-3.x
Browse files Browse the repository at this point in the history
create different queue per message TTL
  • Loading branch information
theanirudhvyas committed Dec 12, 2019
2 parents eee63d9 + 10a531c commit 048a781
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 14 deletions.
123 changes: 110 additions & 13 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,92 @@
(sentry/report-error sentry-reporter e
"Pushing message to rabbitmq failed, data: " message-payload)))))

(defn- get-channel-retry-count [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :count))

(defn- get-channel-retry-queue-timeout-ms [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :queue-timeout-ms))

(defn- channel-retries-exponential-backoff-config [topic-entity channel]
"Get exponential backoff enabled for specific channel from config."
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :exponential-backoff))

(defn- get-backoff-exponent [retry-count message-retry-count exponential-backoff-count]
"Calculates backoff exponent for the given message-retry-count (number of retries available for the message) and retry-count (number of max retries possible). Returns the min of calculated exponent and exponential-backoff-count"
(let [exponent (- retry-count message-retry-count)
exponential-backoff-count (or exponential-backoff-count 10)]
(if (< exponent exponential-backoff-count)
exponent
exponential-backoff-count)))

(defn- get-exponential-backoff-timeout-ms [retry-count message-retry-count queue-timeout-ms exponential-backoff-count]
"Calculates the exponential timeout value from the number of max retries possible (retry-count), the number of retries available for a message (message-retry-count) and base timeout value (queue-timeout-ms). It uses this formula ((2^n)-1)*queue-timeout-ms, where n is the current message retry-count.
Sample config to use exponential backoff:
{:ziggurat {:retry {:enabled true
:count 5
:exponential-backoff {:enabled true :count 10}}}}
Sample config to use exponential backoff when using channel flow:
{:ziggurat {:stream-router {topic-entity {:channels {channel {:retry {:count 5
:enabled true
:queue-timeout-ms 1000
:exponential-backoff {:enabled true :count 10}}}}}}}}
_NOTE: Exponential backoff for channel retries is an experimental feature. It should not be used until released in a stable version._"
(let [exponential-backoff (get-backoff-exponent retry-count message-retry-count exponential-backoff-count)]
(int (* (dec (Math/pow 2 exponential-backoff)) queue-timeout-ms))))

(defn get-queue-timeout-ms [message-payload]
"Calculate queue timeout for delay queue. Use value from get-exponential-backoff-timeout-ms if exponential backoff enabled."
(let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
exponential-backoff-config (-> (ziggurat-config) :retry :exponential-backoff)
retry-count (-> (ziggurat-config) :retry :count)
message-retry-count (:retry-count message-payload)]
(if (:enabled exponential-backoff-config)
(get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms (:count exponential-backoff-config))
queue-timeout-ms)))

(defn get-channel-queue-timeout-ms [topic-entity channel message-payload]
"Calculate queue timeout for channel delay queue. Use value from get-exponential-backoff-timeout-ms if exponential backoff enabled."
(let [exponential-backoff-config (channel-retries-exponential-backoff-config topic-entity channel)
queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
channel-queue-timeout-ms (get-channel-retry-queue-timeout-ms topic-entity channel)
retry-count (-> (ziggurat-config) :retry :count)
message-retry-count (:retry-count message-payload)
channel-retry-count (get-channel-retry-count topic-entity channel)]
(if (:enabled exponential-backoff-config)
(get-exponential-backoff-timeout-ms (or channel-retry-count retry-count) message-retry-count (or channel-queue-timeout-ms queue-timeout-ms) (:count exponential-backoff-config))
(or channel-queue-timeout-ms queue-timeout-ms))))

(defn get-delay-exchange-name [topic-entity message-payload]
"This function return delay exchange name for retry when using flow without channel. It will return exchange name with retry count as suffix if exponential backoff enabled."
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (prefixed-queue-name topic-entity exchange-name)
exponential-backoff-config (-> (ziggurat-config) :retry :exponential-backoff)
retry-count (-> (ziggurat-config) :retry :count)]
(if (:enabled exponential-backoff-config)
(let [message-retry-count (:retry-count message-payload)
backoff-exponent (get-backoff-exponent retry-count message-retry-count (:count exponential-backoff-config))]
(prefixed-queue-name exchange-name backoff-exponent))
exchange-name)))

(defn get-channel-delay-exchange-name [topic-entity channel message-payload]
"This function return delay exchange name for retry when using channel flow. It will return exchange name with retry count as suffix if exponential backoff enabled."
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (prefixed-channel-name topic-entity channel exchange-name)
exponential-backoff-config (channel-retries-exponential-backoff-config topic-entity channel)
channel-retry-count (get-channel-retry-count topic-entity channel)]
(if (:enabled exponential-backoff-config)
(let [message-retry-count (:retry-count message-payload)
exponential-backoff (get-backoff-exponent channel-retry-count message-retry-count (:count exponential-backoff-config))]
(str (name exchange-name) "_" exponential-backoff))
exchange-name)))

(defn publish-to-delay-queue [message-payload]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-queue-name topic-entity exchange-name)]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-delay-exchange-name topic-entity message-payload)
queue-timeout-ms (get-queue-timeout-ms message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-dead-queue [message-payload]
Expand All @@ -93,13 +175,10 @@
(defn- channel-retries-enabled [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :enabled))

(defn- get-channel-retry-count [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :count))

(defn publish-to-channel-delay-queue [channel message-payload]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload)
queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-channel-dead-queue [channel message-payload]
Expand Down Expand Up @@ -135,6 +214,18 @@
dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)]
(create-and-bind-queue queue-name exchange-name dead-letter-exchange-name)))

(defn- make-delay-queue-with-retry-count [topic-entity retry-count exponential-backoff-count]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
queue-name (delay-queue-name topic-entity queue-name)
exchange-name (prefixed-queue-name topic-entity exchange-name)
dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)
sequence (if (<= retry-count exponential-backoff-count) (inc retry-count) (inc exponential-backoff-count))]
(doseq [s (range 1 sequence)]
(create-and-bind-queue (prefixed-queue-name queue-name s) (prefixed-queue-name exchange-name s) dead-letter-exchange-name))))

(defn- make-channel-delay-queue-with-retry-count [topic-entity channel retry-count exponential-backoff-count]
(make-delay-queue-with-retry-count (with-channel-name topic-entity channel) retry-count exponential-backoff-count))

(defn- make-channel-delay-queue [topic-entity channel]
(make-delay-queue (with-channel-name topic-entity channel)))

Expand All @@ -151,15 +242,21 @@
(doseq [channel channels]
(make-channel-queue topic-entity channel :instant)
(when (channel-retries-enabled topic-entity channel)
(make-channel-delay-queue topic-entity channel)
(make-channel-queue topic-entity channel :dead-letter))))
(make-channel-queue topic-entity channel :dead-letter)
(let [exponential-backoff-config (channel-retries-exponential-backoff-config topic-entity channel)]
(if (:enabled exponential-backoff-config)
(make-channel-delay-queue-with-retry-count topic-entity channel (get-channel-retry-count topic-entity channel) (:count exponential-backoff-config))
(make-channel-delay-queue topic-entity channel))))))

(defn make-queues [stream-routes]
(when (is-connection-required?)
(doseq [topic-entity (keys stream-routes)]
(let [channels (get-channel-names stream-routes topic-entity)]
(make-channel-queues channels topic-entity)
(when (-> (ziggurat-config) :retry :enabled)
(make-delay-queue topic-entity)
(make-queue topic-entity :instant)
(make-queue topic-entity :dead-letter))))))
(make-queue topic-entity :dead-letter)
(if (-> (ziggurat-config) :retry :exponential-backoff :enabled)
(let [exponential-backoff-count (-> (ziggurat-config) :retry :exponential-backoff :count)]
(make-delay-queue-with-retry-count topic-entity (-> (ziggurat-config) :retry :count) exponential-backoff-count))
(make-delay-queue topic-entity)))))))
Loading

0 comments on commit 048a781

Please sign in to comment.