Skip to content

Commit

Permalink
Adds headers in message payload
Browse files Browse the repository at this point in the history
  • Loading branch information
roobalimsab committed Oct 1, 2019
1 parent 28ac2cc commit 6da4206
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 57 deletions.
2 changes: 1 addition & 1 deletion resources/config.test.ci.edn
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:tracer {:enabled [true :bool]
:tracer-provider ""}}}
:custom-provider ""}}}

4 changes: 2 additions & 2 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@
:channels {:channel-1 {:worker-count [10 :int]
:retry {:count [5 :int]
:enabled [true :bool]}}}}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}
:tracer {:enabled [true :bool]
:custom-provider ""}}}
14 changes: 7 additions & 7 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
[ziggurat.sentry :refer [sentry-reporter]])
(:import (java.time Instant)))

(defn- send-msg-to-channel [channels message-payload return-code headers]
(defn- send-msg-to-channel [channels message-payload return-code]
(when-not (contains? (set channels) return-code)
(throw (ex-info "Invalid mapper return code" {:code return-code})))
(producer/publish-to-channel-instant-queue return-code message-payload headers))
(producer/publish-to-channel-instant-queue return-code message-payload))

(defn mapper-func [mapper-fn channels headers]
(defn mapper-func [mapper-fn channels]
(fn [{:keys [topic-entity message] :as message-payload}]
(let [topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
Expand All @@ -35,14 +35,14 @@
(case return-code
:success (metrics/increment-count default-namespace success-metric additional-tags)
:retry (do (metrics/increment-count default-namespace retry-metric additional-tags)
(producer/retry message-payload headers))
(producer/retry message-payload))
:skip (metrics/increment-count default-namespace skip-metric additional-tags)
:block 'TODO
(do
(send-msg-to-channel channels message-payload return-code headers)
(send-msg-to-channel channels message-payload return-code)
(metrics/increment-count default-namespace success-metric additional-tags))))
(catch Throwable e
(producer/retry message-payload headers)
(producer/retry message-payload)
(sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name))
(metrics/increment-count default-namespace failure-metric additional-tags)))))))

Expand Down Expand Up @@ -80,7 +80,7 @@
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/increment-count default-namespace failure-metric additional-tags)))))))

(defrecord MessagePayload [message topic-entity])
(defrecord MessagePayload [message topic-entity headers])

(s/defschema message-payload-schema
{:message s/Any
Expand Down
4 changes: 2 additions & 2 deletions src/ziggurat/messaging/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
(catch Exception e
(log/info "old message format read, converting to message-payload: " message)
(let [retry-count (:retry-count message)
message-payload (mpr/->MessagePayload (dissoc message :retry-count) topic-entity)]
message-payload (mpr/->MessagePayload (dissoc message :retry-count) topic-entity {})]
(assoc message-payload :retry-count retry-count)))))

(defn convert-and-ack-message
Expand Down Expand Up @@ -95,7 +95,7 @@
(start-subscriber* (lch/open connection)
(get-in-config [:jobs :instant :prefetch-count])
(prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name]))
(mpr/mapper-func mapper-fn channels {})
(mpr/mapper-func mapper-fn channels)
topic-entity))))

(defn start-channels-subscriber [channels topic-entity]
Expand Down
2 changes: 1 addition & 1 deletion src/ziggurat/messaging/dead_set.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
(doseq [message-payload (consumer/get-dead-set-messages-for-topic true topic-entity count-of-message)]
(producer/publish-to-instant-queue message-payload))
(doseq [message-payload (consumer/get-dead-set-messages-for-channel true topic-entity channel count-of-message)]
(producer/publish-to-channel-instant-queue channel message-payload {}))))
(producer/publish-to-channel-instant-queue channel message-payload))))

(defn- get-messages
"Gets n messages from dead queue and gives the option to ack or un-ack them"
Expand Down
22 changes: 11 additions & 11 deletions src/ziggurat/messaging/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,23 @@

(defn- publish
([exchange message-payload]
(publish exchange message-payload nil {}))
([exchange message-payload expiration headers]
(publish exchange message-payload nil))
([exchange message-payload expiration]
(try
(with-retry {:count 5
:wait 100
:on-failure #(log/error "publishing message to rabbitmq failed with error " (.getMessage %))}
(with-open [ch (lch/open connection)]
(lb/publish ch exchange "" (nippy/freeze message-payload) (properties-for-publish expiration headers))))
(lb/publish ch exchange "" (nippy/freeze message-payload) (properties-for-publish expiration (:headers message-payload)))))
(catch Throwable e
(sentry/report-error sentry-reporter e
"Pushing message to rabbitmq failed, data: " message-payload)))))

(defn publish-to-delay-queue [message-payload headers]
(defn publish-to-delay-queue [message-payload]
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-queue-name topic-entity exchange-name)]
(publish exchange-name message-payload queue-timeout-ms headers)))
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-dead-queue [message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
Expand All @@ -94,31 +94,31 @@
(let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload queue-timeout-ms {})))
(publish exchange-name message-payload queue-timeout-ms)))

(defn publish-to-channel-dead-queue [channel message-payload]
(let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload)))

(defn publish-to-channel-instant-queue [channel message-payload headers]
(defn publish-to-channel-instant-queue [channel message-payload]
(let [{:keys [exchange-name]} (:instant (rabbitmq-config))
topic-entity (:topic-entity message-payload)
exchange-name (prefixed-channel-name topic-entity channel exchange-name)]
(publish exchange-name message-payload nil headers)))
(publish exchange-name message-payload nil)))

(defn- channel-retries-enabled [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :enabled))

(defn- get-channel-retry-count [topic-entity channel]
(-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :count))

(defn retry [{:keys [retry-count topic-entity] :as message-payload} headers]
(defn retry [{:keys [retry-count topic-entity] :as message-payload}]
(when (-> (ziggurat-config) :retry :enabled)
(cond
(nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count))) headers)
(pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count)) headers)
(nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count))))
(pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count)))
(zero? retry-count) (publish-to-dead-queue message-payload))))

(defn retry-for-channel [{:keys [retry-count topic-entity] :as message-payload} channel]
Expand Down
2 changes: 1 addition & 1 deletion src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
(.start))]
(try
(.activate (.scopeManager tracer) span)
((mapper-func handler-fn channels (:headers message-payload)) (->MessagePayload (:value message-payload) topic-entity))
((mapper-func handler-fn channels) (->MessagePayload (:value message-payload) topic-entity (:headers message-payload)))
(catch Exception e
(log/error "Exception while executing handler function " e))
(finally
Expand Down
12 changes: 6 additions & 6 deletions test/ziggurat/mapper_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
metrics/report-time (fn [metric-namespace _ _]
(when (= metric-namespace expected-report-time-namespace)
(reset! successfully-reported-time? true)))]
((mapper-func (constantly :success) [] {}) message-payload)
((mapper-func (constantly :success) []) message-payload)
(is @successfully-processed?)
(is @successfully-reported-time?))))

Expand All @@ -45,7 +45,7 @@
(= metric expected-metric)
(= additional-tags expected-additional-tags))
(reset! successfully-processed? true)))]
((mapper-func (constantly :channel-1) [:channel-1] {}) message-payload)
((mapper-func (constantly :channel-1) [:channel-1]) message-payload)
(let [message-from-mq (rmq/get-message-from-channel-instant-queue topic-entity :channel-1)]
(is (= message-payload message-from-mq))
(is @successfully-processed?))))))
Expand All @@ -57,7 +57,7 @@
(let [err (Throwable->map e)]
(is (= (:cause err) "Invalid mapper return code"))
(is (= (-> err :data :code) :channel-1))))]
((mapper-func (constantly :channel-1) [:some-other-channel] {}) message-payload)
((mapper-func (constantly :channel-1) [:some-other-channel]) message-payload)
(let [message-from-mq (rmq/get-message-from-channel-instant-queue topic-entity :channel-1)]
(is (nil? message-from-mq))))))

Expand All @@ -72,7 +72,7 @@
(= metric expected-metric)
(= additional-tags expected-additional-tags))
(reset! unsuccessfully-processed? true)))]
((mapper-func (constantly :retry) [] {}) message-payload)
((mapper-func (constantly :retry) []) message-payload)
(let [message-from-mq (rmq/get-msg-from-delay-queue topic-entity)]
(is (= message-from-mq expected-message)))
(is @unsuccessfully-processed?)))))
Expand All @@ -89,7 +89,7 @@
(= metric expected-metric)
(= additional-tags expected-additional-tags))
(reset! unsuccessfully-processed? true)))]
((mapper-func (fn [_] (throw (Exception. "test exception"))) [] {}) message-payload)
((mapper-func (fn [_] (throw (Exception. "test exception"))) []) message-payload)
(let [message-from-mq (rmq/get-msg-from-delay-queue topic-entity)]
(is (= message-from-mq expected-message)))
(is @unsuccessfully-processed?)
Expand All @@ -102,7 +102,7 @@
(when (= metric-namespace expected-metric-namespace)
(reset! reported-execution-time? true)))]

((mapper-func (constantly :success) [] {}) message-payload)
((mapper-func (constantly :success) []) message-payload)
(is @reported-execution-time?))))))

(deftest channel-mapper-func-test
Expand Down
10 changes: 5 additions & 5 deletions test/ziggurat/messaging/consumer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
:retry-limit 2
:success-promise success-promise}) topic-entity [])

(producer/publish-to-delay-queue message-payload {})
(producer/publish-to-delay-queue message-payload)

(when-let [promise-success? (deref success-promise 5000 :timeout)]
(is (not (= :timeout promise-success?)))
Expand Down Expand Up @@ -135,7 +135,7 @@
:skip-promise skip-promise
:retry-limit -1}) topic-entity [])

(producer/publish-to-delay-queue message-payload {})
(producer/publish-to-delay-queue message-payload)

(when-let [promise-success? (deref skip-promise 5000 :timeout)]
(is (not (= :timeout promise-success?)))
Expand Down Expand Up @@ -163,7 +163,7 @@
:retry-limit (* no-of-msgs 10)}) topic-entity [])

(dotimes [_ no-of-msgs]
(producer/retry (gen-message-payload topic-entity) {}))
(producer/retry (gen-message-payload topic-entity)))

(block-and-retry-until (fn []
(let [dead-set-msgs (count (get-dead-set-messages-for-topic false topic-entity no-of-msgs))]
Expand Down Expand Up @@ -254,7 +254,7 @@
(update-in [:stream-router topic-entity :channels channel :retry :enabled] (constantly false))
(update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1))))]
(start-channels-subscriber {channel channel-fn} topic-entity)
(producer/publish-to-channel-instant-queue channel message-payload {})
(producer/publish-to-channel-instant-queue channel message-payload)
(deref success-promise 5000 :timeout)
(is (= 1 @call-counter))
(util/close rmq-ch))))))
Expand All @@ -280,7 +280,7 @@
:retry-limit 0
:success-promise success-promise}) topic-entity [])

(producer/publish-to-delay-queue message-payload {})
(producer/publish-to-delay-queue message-payload)

(when-let [promise-success? (deref success-promise 5000 :timeout)]
(is (not (= :timeout promise-success?)))
Expand Down
Loading

0 comments on commit 6da4206

Please sign in to comment.