Skip to content

Commit

Permalink
Merge pull request #125 from theanirudhvyas/master
Browse files Browse the repository at this point in the history
Reverts exponential backoff in channel changes.
  • Loading branch information
theanirudhvyas committed Dec 3, 2019
2 parents f47c522 + 199ca75 commit f8f322d
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 164 deletions.
22 changes: 1 addition & 21 deletions resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,7 @@
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}
:channel-delay {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}
:channel-delay-ms [1000 :int]}
:linear-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:queue-timeout-ms [2000 :int]}}
:exponential-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:queue-timeout-ms [1000 :int]
:exponential-backoff-enabled [true :bool]}}
:channel-exponential-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:exponential-backoff-enabled [true :bool]}}
:channel-no-retry-count {:worker-count [10 :int]
:retry {:enabled [true :bool]
:exponential-backoff-enabled [true :bool]}}}
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
Expand Down
22 changes: 1 addition & 21 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,7 @@
:upgrade-from "1.1"
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}
:channel-delay {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}
:channel-delay-ms [1000 :int]}
:linear-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:queue-timeout-ms [2000 :int]}}
:exponential-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:queue-timeout-ms [1000 :int]
:exponential-backoff-enabled [true :bool]}}
:channel-exponential-retry {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]
:exponential-backoff-enabled [true :bool]}}
:channel-no-retry-count {:worker-count [10 :int]
:retry {:enabled [true :bool]
:exponential-backoff-enabled [true :bool]}}}
:enabled [true :bool]}}}
:producer {:bootstrap-servers "localhost:9092"
:acks "all"
:retries-config 5
Expand Down
6 changes: 1 addition & 5 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@
(defn- send-msg-to-channel [channels message-payload return-code]
(when-not (contains? (set channels) return-code)
(throw (ex-info "Invalid mapper return code" {:code return-code})))
(let [topic-entity (:topic-entity message-payload)
channel-delay-ms (get-in (ziggurat-config) [:stream-router topic-entity :channels return-code :channel-delay-ms])]
(if channel-delay-ms
(producer/publish-to-channel-custom-delay-queue return-code message-payload channel-delay-ms)
(producer/publish-to-channel-instant-queue return-code message-payload))))
(producer/publish-to-channel-instant-queue return-code message-payload))

(defn mapper-func [mapper-fn channels]
(fn [{:keys [topic-entity message] :as message-payload}]
Expand Down
33 changes: 0 additions & 33 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -96,41 +96,8 @@
(defn- get-channel-retry-count [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :count))

(defn- get-channel-retry-queue-timeout-ms [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :queue-timeout-ms))

(defn- channel-retries-exponential-backoff-enabled [topic-entity channel]
"Get exponential backoff enabled for specific channel from config."
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :exponential-backoff-enabled))

(defn- get-exponential-backoff [retry-count message-retry-count queue-timeout-ms]
"Get queue timeout value when exponential backoff is enabled."
(int (* (dec (Math/pow 2 (- retry-count message-retry-count))) queue-timeout-ms)))

(defn get-queue-timeout-ms [topic-entity channel message-payload]
"Get queue timeout from config. It will use channel `queue-timeout-ms` if defined, otherwise it will use rabbitmq delay `queue-timeout-ms`.
If `exponential-backoff-enabled` is true, `queue-timeout-ms` will be exponential with formula `(2^n)-1`, where `n` is message retry-count.
_NOTE: Exponential backoff for channel retries is an experimental feature. It should not be used until released in a stable version._"
(let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms)
channel-queue-timeout-ms (get-channel-retry-queue-timeout-ms topic-entity channel)
exponential-backoff-enabled (channel-retries-exponential-backoff-enabled topic-entity channel)]
(if exponential-backoff-enabled
(let [retry-count (-> (ziggurat-config) :retry :count)
channel-retry-count (get-channel-retry-count topic-entity channel)
message-retry-count (:retry-count message-payload)]
(get-exponential-backoff (or channel-retry-count retry-count) message-retry-count (or channel-queue-timeout-ms queue-timeout-ms)))
(or channel-queue-timeout-ms queue-timeout-ms))))

(defn publish-to-channel-delay-queue [channel message-payload]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)
queue-timeout-ms (get-queue-timeout-ms topic-entity channel message-payload)]
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-channel-custom-delay-queue [channel message-payload queue-timeout-ms]
(let [{:keys [exchange-name]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload queue-timeout-ms)))
Expand Down
14 changes: 0 additions & 14 deletions test/ziggurat/mapper_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,6 @@
(is (= message-payload message-from-mq))
(is @successfully-processed?))))))

(testing "message process should successfully push to delay channel queue"
(fix/with-queues (assoc-in stream-routes [:default :channel-delay] (constantly :success))
(let [successfully-processed? (atom false)
expected-metric "success"]
(with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags]
(when (and (= metric-namespaces expected-metric-namespace)
(= metric expected-metric)
(= additional-tags expected-additional-tags))
(reset! successfully-processed? true)))]
((mapper-func (constantly :channel-delay) [:channel-delay]) message-payload)
(let [message-from-mq (rmq/get-message-from-channel-delay-queue :default :channel-delay)]
(is (= message-payload message-from-mq))
(is @successfully-processed?))))))

(testing "message process should raise exception if channel not in list"
(fix/with-queues
(assoc-in stream-routes [:default :channel-1] (constantly :success))
Expand Down
70 changes: 0 additions & 70 deletions test/ziggurat/messaging/producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,57 +46,6 @@
message-payload {:message {:foo "bar"} :retry-count @retry-count :topic-entity topic-entity}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq))))))

(testing "message in channel will be retried with linear queue timeout"
(fix/with-queues
{:default {:handler-fn #(constantly nil)
:linear-retry #(constantly nil)}}
(let [retry-count (atom 2)
topic-entity :default
channel :linear-retry
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count @retry-count}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq))))))

(testing "message in channel will be retried with exponential queue timeout"
(fix/with-queues
{:default {:handler-fn #(constantly nil)
:exponential-retry #(constantly nil)}}
(let [retry-count (atom 2)
topic-entity :default
channel :exponential-retry
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count @retry-count}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)]
(producer/retry-for-channel message-from-mq channel)))
(let [message-from-mq (rmq/get-msg-from-channel-dead-queue topic-entity channel)]
(is (= expected-message-payload message-from-mq))))))

(testing "message in channel will be retried with channel exponential queue timeout"
(fix/with-queues
{:default {:handler-fn #(constantly nil)
:channel-exponential-retry #(constantly nil)}}
(let [retry-count (atom 2)
topic-entity :default
channel :channel-exponential-retry
message-payload {:message {:foo "bar"} :topic-entity topic-entity :retry-count @retry-count}
expected-message-payload (assoc message-payload :retry-count 0)]
(producer/retry-for-channel message-payload channel)
(while (> @retry-count 0)
(swap! retry-count dec)
(let [message-from-mq (rmq/get-message-from-channel-delay-queue topic-entity channel)]
Expand Down Expand Up @@ -387,22 +336,3 @@
(is (= "send" (-> finished-spans
(.get 0)
(.operationName))))))))))

(deftest get-queue-timeout-ms-test
(let [message (assoc (->MessagePayload "message" "topic-entity") :retry-count 2)]
(testing "when retries are enabled"
(let [topic-entity :default
channel :linear-retry]
(is (= 2000 (producer/get-queue-timeout-ms topic-entity channel message)))))
(testing "when exponential backoff are enabled and channel retry count not defined"
(let [topic-entity :default
channel :channel-no-retry-count]
(is (= 700 (producer/get-queue-timeout-ms topic-entity channel message)))))
(testing "when exponential backoff are enabled and channel queue timeout defined"
(let [topic-entity :default
channel :exponential-retry]
(is (= 7000 (producer/get-queue-timeout-ms topic-entity channel message)))))
(testing "when exponential backoff are enabled and channel queue timeout not defined"
(let [topic-entity :default
channel :channel-exponential-retry]
(is (= 700 (producer/get-queue-timeout-ms topic-entity channel message)))))))

0 comments on commit f8f322d

Please sign in to comment.