Skip to content

Commit

Permalink
Merge 86fdabf into f8f322d
Browse files Browse the repository at this point in the history
  • Loading branch information
thediantoni committed Dec 3, 2019
2 parents f8f322d + 86fdabf commit 7eddf43
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 14 deletions.
95 changes: 82 additions & 13 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,66 @@
(sentry/report-error sentry-reporter e
"Pushing message to rabbitmq failed, data: " message-payload)))))

(defn- get-channel-retry-count [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :count))

(defn- get-channel-retry-queue-timeout-ms [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :queue-timeout-ms))

(defn- channel-retries-exponential-backoff-enabled [topic-entity channel]
"Get exponential backoff enabled for specific channel from config."
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :exponential-backoff-enabled))

(defn- get-backoff-exponent [topic-entity channel message-payload]
"Get exponential-backoff value when exponential backoff is enabled.
Max exponent currently is hardoced to 10. For retrial 10th and more will use same queue timeout."
(let [retry-count (-> (ziggurat-config) :retry :count)
channel-retry-count (get-channel-retry-count topic-entity channel)
message-retry-count (:retry-count message-payload)
exponent (- (or channel-retry-count retry-count) message-retry-count)]
(if (< exponent 10)
exponent
10)))

(defn- get-backoff-exponent-timeout-ms [topic-entity channel message-payload queue-timeout-ms]
"Get queue timeout value when exponential backoff is enabled."
(let [exponential-backoff (get-backoff-exponent topic-entity channel message-payload)]
(int (* (dec (Math/pow 2 exponential-backoff)) queue-timeout-ms))))

(defn get-queue-timeout-ms [topic-entity channel message-payload]
"Get queue timeout from config. It will use channel `queue-timeout-ms` if defined, otherwise it will use rabbitmq delay `queue-timeout-ms`.
If `exponential-backoff-enabled` is true, `queue-timeout-ms` will be exponential with formula `(2^n)-1`, where `n` is message retry-count.
_NOTE: Exponential backoff for channel retries is an experimental feature. It should not be used until released in a stable version._"
(let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
channel-queue-timeout-ms (get-channel-retry-queue-timeout-ms topic-entity channel)
exponential-backoff-enabled (channel-retries-exponential-backoff-enabled topic-entity channel)]
(if exponential-backoff-enabled
(get-backoff-exponent-timeout-ms topic-entity channel message-payload (or channel-queue-timeout-ms queue-timeout-ms))
(or channel-queue-timeout-ms queue-timeout-ms))))

(defn get-delay-exchange-name [topic-entity message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (prefixed-queue-name topic-entity exchange-name)
exponential-backoff-enabled (-> (ziggurat-config) :retry :exponential-backoff-enabled)]
(if exponential-backoff-enabled
(let [backoff-exponent (get-backoff-exponent topic-entity {} message-payload)]
(prefixed-queue-name exchange-name backoff-exponent))
exchange-name)))

(defn get-channel-delay-exchange-name [topic-entity channel message-payload]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
exchange-name (prefixed-channel-name topic-entity channel exchange-name)
exponential-backoff-enabled (channel-retries-exponential-backoff-enabled topic-entity channel)]
(if exponential-backoff-enabled
(let [exponential-backoff (get-backoff-exponent topic-entity channel message-payload)]
(str (name exchange-name) "_" exponential-backoff))
exchange-name)))

(defn publish-to-delay-queue [message-payload]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-queue-name topic-entity exchange-name)]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-delay-exchange-name topic-entity message-payload)
queue-timeout-ms (get-queue-timeout-ms topic-entity {} message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-dead-queue [message-payload]
Expand All @@ -93,13 +149,10 @@
(defn- channel-retries-enabled [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :enabled))

(defn- get-channel-retry-count [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :count))

(defn publish-to-channel-delay-queue [channel message-payload]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
(let [topic-entity (:topic-entity message-payload)
exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload)
queue-timeout-ms (get-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-channel-dead-queue [channel message-payload]
Expand Down Expand Up @@ -135,6 +188,18 @@
dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)]
(create-and-bind-queue queue-name exchange-name dead-letter-exchange-name)))

(defn- make-delay-with-retry-count-queue [topic-entity retry-count]
(let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config))
queue-name (delay-queue-name topic-entity queue-name)
exchange-name (prefixed-queue-name topic-entity exchange-name)
dead-letter-exchange-name (prefixed-queue-name topic-entity dead-letter-exchange)
sequence (if (<= retry-count 10) (inc retry-count) 11)]
(doseq [s (range 1 sequence)]
(create-and-bind-queue (prefixed-queue-name queue-name s) (prefixed-queue-name exchange-name s) dead-letter-exchange-name))))

(defn- make-channel-delay-with-retry-count-queue [topic-entity channel retry-count]
(make-delay-with-retry-count-queue (with-channel-name topic-entity channel) retry-count))

(defn- make-channel-delay-queue [topic-entity channel]
(make-delay-queue (with-channel-name topic-entity channel)))

Expand All @@ -151,15 +216,19 @@
(doseq [channel channels]
(make-channel-queue topic-entity channel :instant)
(when (channel-retries-enabled topic-entity channel)
(make-channel-delay-queue topic-entity channel)
(make-channel-queue topic-entity channel :dead-letter))))
(make-channel-queue topic-entity channel :dead-letter)
(if (channel-retries-exponential-backoff-enabled topic-entity channel)
(make-channel-delay-with-retry-count-queue topic-entity channel (get-channel-retry-count topic-entity channel))
(make-channel-delay-queue topic-entity channel)))))

(defn make-queues [stream-routes]
(when (is-connection-required?)
(doseq [topic-entity (keys stream-routes)]
(let [channels (get-channel-names stream-routes topic-entity)]
(make-channel-queues channels topic-entity)
(when (-> (ziggurat-config) :retry :enabled)
(make-delay-queue topic-entity)
(make-queue topic-entity :instant)
(make-queue topic-entity :dead-letter))))))
(make-queue topic-entity :dead-letter)
(if (-> (ziggurat-config) :retry :exponential-backoff-enabled)
(make-delay-with-retry-count-queue topic-entity (-> (ziggurat-config) :retry :count))
(make-delay-queue topic-entity)))))))
135 changes: 134 additions & 1 deletion test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,71 @@
(let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq)))))))
(is (= expected-message-payload message-from-mq))))))

(testing "message in channel will be retried with linear queue timeout"
(with-redefs [ziggurat-config (constantly (assoc (ziggurat-config)
:stream-router {:default {:channels {:linear-retry {:retry {:count 5
:enabled true
:queue-timeout-ms 2000}}}}}))]
(fix/with-queues
{:default {:handler-fn #(constantly nil)
:linear-retry #(constantly nil)}}
(let [retry-count (atom 2)
topic-entity :default
channel :linear-retry
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count @retry-count}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq)))))))

(testing "message in channel will be retried with exponential queue timeout"
(with-redefs [ziggurat-config (constantly (assoc (ziggurat-config)
:stream-router {:default {:channels {:exponential-retry {:retry {:count 5
:enabled true
:queue-timeout-ms 1000
:exponential-backoff-enabled true}}}}}))]
(fix/with-queues
{:default {:handler-fn #(constantly nil)
:exponential-retry #(constantly nil)}}
(let [retry-count (atom 5)
topic-entity :default
channel :exponential-retry
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count @retry-count}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-retry-queue topic-entity channel (- 5 @retry-count))]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq)))))))

(testing "message in channel will be retried with channel exponential queue timeout"
(with-redefs [ziggurat-config (constantly (assoc (ziggurat-config)
:stream-router {:default {:channels {:channel-exponential-retry {:retry {:count 5
:enabled true
:exponential-backoff-enabled true}}}}}))]
(fix/with-queues
{:default {:handler-fn #(constantly nil)
:channel-exponential-retry #(constantly nil)}}
(let [retry-count (atom 5)
topic-entity :default
channel :channel-exponential-retry
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count @retry-count}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-retry-queue topic-entity channel (- 5 @retry-count))]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq))))))))

(deftest retry-test
(testing "message with a retry count of greater than 0 will publish to delay queue"
Expand Down Expand Up @@ -151,6 +215,40 @@
(let [message-from-mq (rmq/get-msg-from-dead-queue "default")]
(is (= expected-message-payload message-from-mq)))))))

(deftest retry-with-exponential-backoff-test
(testing "message will publish to delay with retry count queue when exponential backoff enabled"
(with-redefs [ziggurat-config (constantly (assoc (ziggurat-config)
:retry {:count 5 :enabled true :exponential-backoff-enabled true}))]
(testing "message with no retry count will publish to delay with retry count queue"
(fix/with-queues
{:default {:handler-fn #(constantly nil)}}
(let [topic-entity :default
message-payload {:message {:foo "bar"} :topic-entity topic-entity}
expected-message (assoc message-payload :retry-count 4)]
(producer/retry message-payload)
(let [message-from-mq (rmq/get-message-from-retry-queue "default" 1)]
(is (= message-from-mq expected-message))))))

(testing "message with retry count 1 will publish to delay with retry count queue"
(fix/with-queues
{:default {:handler-fn #(constantly nil)}}
(let [topic-entity :default
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count 4}
expected-message (assoc message-payload :retry-count 3)]
(producer/retry message-payload)
(let [message-from-mq (rmq/get-message-from-retry-queue "default" 2)]
(is (= message-from-mq expected-message))))))

(testing "message with retry count 1 will publish to delay with retry count queue"
(fix/with-queues
{:default {:handler-fn #(constantly nil)}}
(let [topic-entity :default
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count 1}
expected-message (assoc message-payload :retry-count 0)]
(producer/retry message-payload)
(let [message-from-mq (rmq/get-message-from-retry-queue "default" 5)]
(is (= message-from-mq expected-message)))))))))

(deftest make-queues-test
(let [ziggurat-config (ziggurat-config)]
(testing "When retries are enabled"
Expand Down Expand Up @@ -336,3 +434,38 @@
(is (= "send" (-> finished-spans
(.get 0)
(.operationName))))))))))

(deftest get-queue-timeout-ms-test
(let [message (assoc (->MessagePayload "message" "topic-entity") :retry-count 2)]
(testing "when retries are enabled"
(let [topic-entity :default
channel :linear-retry]
(with-redefs [ziggurat-config (constantly (assoc (ziggurat-config)
:stream-router {topic-entity {:channels {channel {:retry {:count 5
:enabled true
:queue-timeout-ms 2000}}}}}))]
(is (= 2000 (producer/get-queue-timeout-ms topic-entity channel message))))))
(testing "when exponential backoff are enabled and channel retry count not defined"
(let [topic-entity :default
channel :channel-no-retry-count]
(with-redefs [ziggurat-config (constantly (assoc (ziggurat-config)
:stream-router {topic-entity {:channels {channel {:retry {:enabled true
:exponential-backoff-enabled true}}}}}))]
(is (= 700 (producer/get-queue-timeout-ms topic-entity channel message))))))
(testing "when exponential backoff are enabled and channel queue timeout defined"
(let [topic-entity :default
channel :exponential-retry]
(with-redefs [ziggurat-config (constantly (assoc (ziggurat-config)
:stream-router {topic-entity {:channels {channel {:retry {:count 5
:enabled true
:queue-timeout-ms 1000
:exponential-backoff-enabled true}}}}}))]
(is (= 7000 (producer/get-queue-timeout-ms topic-entity channel message))))))
(testing "when exponential backoff are enabled and channel queue timeout not defined"
(let [topic-entity :default
channel :channel-exponential-retry]
(with-redefs [ziggurat-config (constantly (assoc (ziggurat-config)
:stream-router {topic-entity {:channels {channel {:retry {:count 5
:enabled true
:exponential-backoff-enabled true}}}}}))]
(is (= 700 (producer/get-queue-timeout-ms topic-entity channel message))))))))
12 changes: 12 additions & 0 deletions test/ziggurat/util/rabbitmq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@
queue-name (prefixed-channel-name topic-name channel-name queue-name)]
(get-msg-from-rabbitmq queue-name topic-name)))

(defn get-message-from-retry-queue [topic sequence]
(let [{:keys [queue-name]} (:delay (rabbitmq-config))
delay-queue-name (producer/delay-queue-name topic queue-name)
queue-name (rutil/prefixed-queue-name delay-queue-name sequence)]
(get-msg-from-rabbitmq queue-name topic)))

(defn get-message-from-channel-retry-queue [topic channel sequence]
(let [{:keys [queue-name]} (:delay (rabbitmq-config))
delay-queue-name (producer/delay-queue-name (rutil/with-channel-name topic channel) queue-name)
queue-name (rutil/prefixed-queue-name delay-queue-name sequence)]
(get-msg-from-rabbitmq queue-name topic)))

(defn close [^Channel channel]
(try
(.close channel)
Expand Down

0 comments on commit 7eddf43

Please sign in to comment.