Skip to content

Commit

Permalink
Merge 7007d07 into f47c522
Browse files Browse the repository at this point in the history
  • Loading branch information
thediantoni committed Nov 20, 2019
2 parents f47c522 + 7007d07 commit c16bc97
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
51 changes: 41 additions & 10 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,20 @@
"Get exponential backoff enabled for specific channel from config."
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :exponential-backoff-enabled))

(defn- get-exponential-backoff [retry-count message-retry-count queue-timeout-ms]
(defn- get-exponential-backoff [topic-entity channel message-payload]
"Get exponential-backoff value when exponential backoff is enabled."
(let [retry-count (-> (ziggurat-config) :retry :count)
channel-retry-count (get-channel-retry-count topic-entity channel)
message-retry-count (:retry-count message-payload)
exponent (- (or channel-retry-count retry-count) message-retry-count)]
(if (< exponent 10)
exponent
10)))

(defn- get-exponential-backoff-timeout-ms [topic-entity channel message-payload queue-timeout-ms]
"Get queue timeout value when exponential backoff is enabled."
(int (* (dec (Math/pow 2 (- retry-count message-retry-count))) queue-timeout-ms)))
(let [exponential-backoff (get-exponential-backoff topic-entity channel message-payload)]
(int (* (dec (Math/pow 2 exponential-backoff)) queue-timeout-ms))))

(defn get-queue-timeout-ms [topic-entity channel message-payload]
"Get queue timeout from config. It will use channel `queue-timeout-ms` if defined, otherwise it will use rabbitmq delay `queue-timeout-ms`.
Expand All @@ -116,16 +127,21 @@
channel-queue-timeout-ms (get-channel-retry-queue-timeout-ms topic-entity channel)
exponential-backoff-enabled (channel-retries-exponential-backoff-enabled topic-entity channel)]
(if exponential-backoff-enabled
(let [retry-count (-> (ziggurat-config) :retry :count)
channel-retry-count (get-channel-retry-count topic-entity channel)
message-retry-count (:retry-count message-payload)]
(get-exponential-backoff (or channel-retry-count retry-count) message-retry-count (or channel-queue-timeout-ms queue-timeout-ms)))
(get-exponential-backoff-timeout-ms topic-entity channel message-payload (or channel-queue-timeout-ms queue-timeout-ms))
(or channel-queue-timeout-ms queue-timeout-ms))))

(defn publish-to-channel-delay-queue [channel message-payload]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
(defn get-delay-exchange-name [topic-entity channel message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (prefixed-channel-name topic-entity channel exchange-name)
exponential-backoff-enabled (channel-retries-exponential-backoff-enabled topic-entity channel)]
(if exponential-backoff-enabled
(let [exponential-backoff (get-exponential-backoff topic-entity channel message-payload)]
(str (name exchange-name) "_" exponential-backoff))
exchange-name)))

(defn publish-to-channel-delay-queue [channel message-payload]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-delay-exchange-name topic-entity channel message-payload)
queue-timeout-ms (get-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

Expand Down Expand Up @@ -168,6 +184,19 @@
dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)]
(create-and-bind-queue queue-name exchange-name dead-letter-exchange-name)))

(defn- make-delay-retry-sequence-queue [topic-entity channel]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
queue-name (delay-queue-name (with-channel-name topic-entity channel) queue-name)
exchange-name (prefixed-queue-name (with-channel-name topic-entity channel) exchange-name)
dead-letter-exchange-name (prefixed-queue-name (with-channel-name topic-entity channel) dead-letter-exchange)
retry-count (get-channel-retry-count topic-entity channel)
sequence (if (<= retry-count 10) (inc retry-count) 11)]
(doseq [s (range 1 sequence)]
(create-and-bind-queue (prefixed-queue-name queue-name s) (prefixed-queue-name exchange-name s) dead-letter-exchange-name))))

(defn- make-channel-delay-retry-sequence-queue [topic-entity channel]
(make-delay-retry-sequence-queue topic-entity channel))

(defn- make-channel-delay-queue [topic-entity channel]
(make-delay-queue (with-channel-name topic-entity channel)))

Expand All @@ -185,7 +214,9 @@
(make-channel-queue topic-entity channel :instant)
(when (channel-retries-enabled topic-entity channel)
(make-channel-delay-queue topic-entity channel)
(make-channel-queue topic-entity channel :dead-letter))))
(make-channel-queue topic-entity channel :dead-letter)
(when (channel-retries-exponential-backoff-enabled topic-entity channel)
(make-channel-delay-retry-sequence-queue topic-entity channel)))))

(defn make-queues [stream-routes]
(when (is-connection-required?)
Expand Down
8 changes: 4 additions & 4 deletions test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@
(fix/with-queues
{:default {:handler-fn #(constantly nil)
:exponential-retry #(constantly nil)}}
(let [retry-count (atom 2)
(let [retry-count (atom 5)
topic-entity :default
channel :exponential-retry
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count @retry-count}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)]
(let [message-from-mq (rmq/get-message-from-channel-retry-queue topic-entity channel (- 5 @retry-count))]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq))))))
Expand All @@ -91,15 +91,15 @@
(fix/with-queues
{:default {:handler-fn #(constantly nil)
:channel-exponential-retry #(constantly nil)}}
(let [retry-count (atom 2)
(let [retry-count (atom 5)
topic-entity :default
channel :channel-exponential-retry
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count @retry-count}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)]
(let [message-from-mq (rmq/get-message-from-channel-retry-queue topic-entity channel (- 5 @retry-count))]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq)))))))
Expand Down
6 changes: 6 additions & 0 deletions test/ziggurat/util/rabbitmq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
queue-name (prefixed-channel-name topic-name channel-name queue-name)]
(get-msg-from-rabbitmq queue-name topic-name)))

(defn get-message-from-channel-retry-queue [topic channel sequence]
(let [{:keys [queue-name]} (:delay (rabbitmq-config))
delay-queue-name (producer/delay-queue-name (rutil/with-channel-name topic channel) queue-name)
queue-name (rutil/prefixed-queue-name delay-queue-name sequence)]
(get-msg-from-rabbitmq queue-name topic)))

(defn close [^Channel channel]
(try
(.close channel)
Expand Down

0 comments on commit c16bc97

Please sign in to comment.