From 65577a829201e17393854552615e712f0b3f9c4c Mon Sep 17 00:00:00 2001 From: Lakshya Gupta Date: Thu, 30 Jun 2022 13:04:20 +0530 Subject: [PATCH 1/4] rabbitmq-retry-count added --- src/ziggurat/mapper.clj | 163 ++++++++++++++-------------- src/ziggurat/messaging/producer.clj | 61 ++++++----- 2 files changed, 115 insertions(+), 109 deletions(-) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 0ad4a990..040e0f9d 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -16,95 +16,98 @@ (producer/publish-to-channel-instant-queue return-code message-payload)) (defn- create-user-payload - [message-payload] - (-> message-payload - (dissoc :headers) - (dissoc :retry-count) - (dissoc :topic-entity))) + [message-payload configured-retry-count] + (let [remaining-retry-count (get message-payload :retry-count configured-retry-count)] + (-> message-payload + (dissoc :headers) + (dissoc :retry-count) + (dissoc :topic-entity) + (assoc-in [:metadata :rabbitmq-retry-count] (- configured-retry-count remaining-retry-count)) + ))) (defn mapper-func [user-handler-fn channels] (fn [{:keys [topic-entity] :as message-payload}] - (let [service-name (:app-name (ziggurat-config)) - topic-entity-name (name topic-entity) - new-relic-transaction-name (str topic-entity-name ".handler-fn") - message-processing-namespace "message-processing" - base-metric-namespaces [service-name topic-entity-name] - message-processing-namespaces (conj base-metric-namespaces message-processing-namespace) - additional-tags {:topic_name topic-entity-name} - success-metric "success" - retry-metric "retry" - skip-metric "skip" - failure-metric "failure" - dead-letter-metric "dead-letter" + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) + new-relic-transaction-name (str topic-entity-name ".handler-fn") + message-processing-namespace "message-processing" + base-metric-namespaces [service-name topic-entity-name] + message-processing-namespaces (conj base-metric-namespaces message-processing-namespace) + additional-tags {:topic_name topic-entity-name} + success-metric "success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure" + dead-letter-metric "dead-letter" multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]] - user-payload (create-user-payload message-payload)] + user-payload (create-user-payload message-payload (producer/get-configured-retry-count))] (clog/with-logging-context {:consumer-group topic-entity-name} - (nr/with-tracing "job" new-relic-transaction-name - (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (user-handler-fn user-payload) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - execution-time-namespace "handler-fn-execution-time" - multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace) - [execution-time-namespace]]] - (metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags) - (case return-code - :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags) - :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags) - (producer/retry message-payload)) - :dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags) - (producer/publish-to-dead-queue message-payload)) - :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags) - :block 'TODO - (do - (send-msg-to-channel channels message-payload return-code) - (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)))) - (catch Throwable e - (producer/retry message-payload) - (report-error e (str "Actor execution failed for " topic-entity-name)) - (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))) + (nr/with-tracing "job" new-relic-transaction-name + (try + (let [start-time (.toEpochMilli (Instant/now)) + return-code (user-handler-fn user-payload) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "handler-fn-execution-time" + multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace) + [execution-time-namespace]]] + (metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags) + (case return-code + :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags) + :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags) + (producer/retry message-payload)) + :dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags) + (producer/publish-to-dead-queue message-payload)) + :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags) + :block 'TODO + (do + (send-msg-to-channel channels message-payload return-code) + (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)))) + (catch Throwable e + (producer/retry message-payload) + (report-error e (str "Actor execution failed for " topic-entity-name)) + (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))) (defn channel-mapper-func [user-handler-fn channel] (fn [{:keys [topic-entity] :as message-payload}] - (let [service-name (:app-name (ziggurat-config)) - topic-entity-name (name topic-entity) - channel-name (name channel) - message-processing-namespace "message-processing" - base-metric-namespaces [service-name topic-entity-name channel-name] - message-processing-namespaces (conj base-metric-namespaces message-processing-namespace) - additional-tags {:topic_name topic-entity-name :channel_name channel-name} - metric-namespace (str/join "." message-processing-namespaces) - success-metric "success" - retry-metric "retry" - skip-metric "skip" - failure-metric "failure" - dead-letter-metric "dead-letter" + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) + channel-name (name channel) + message-processing-namespace "message-processing" + base-metric-namespaces [service-name topic-entity-name channel-name] + message-processing-namespaces (conj base-metric-namespaces message-processing-namespace) + additional-tags {:topic_name topic-entity-name :channel_name channel-name} + metric-namespace (str/join "." message-processing-namespaces) + success-metric "success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure" + dead-letter-metric "dead-letter" multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]] - user-payload (create-user-payload message-payload)] + user-payload (create-user-payload message-payload (producer/get-channel-retry-count topic-entity channel))] (clog/with-logging-context {:consumer-group topic-entity-name :channel channel-name} - (nr/with-tracing "job" metric-namespace - (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (user-handler-fn user-payload) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - execution-time-namespace "execution-time" - multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace) - [execution-time-namespace]]] - (metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags) - (case return-code - :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags) - :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags) - (producer/retry-for-channel message-payload channel)) - :dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags) - (producer/publish-to-channel-dead-queue channel message-payload)) - :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags) - :block 'TODO - (throw (ex-info "Invalid mapper return code" {:code return-code})))) - (catch Throwable e - (producer/retry-for-channel message-payload channel) - (report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) - (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))) + (nr/with-tracing "job" metric-namespace + (try + (let [start-time (.toEpochMilli (Instant/now)) + return-code (user-handler-fn user-payload) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "execution-time" + multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace) + [execution-time-namespace]]] + (metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags) + (case return-code + :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags) + :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags) + (producer/retry-for-channel message-payload channel)) + :dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags) + (producer/publish-to-channel-dead-queue channel message-payload)) + :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags) + :block 'TODO + (throw (ex-info "Invalid mapper return code" {:code return-code})))) + (catch Throwable e + (producer/retry-for-channel message-payload channel) + (report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) + (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))) (defrecord MessagePayload [message topic-entity]) diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 3b825c98..33fe5f04 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -117,11 +117,11 @@ ([exchange message-payload expiration retry-counter] (when (is-pool-alive? cpool/channel-pool) (let [start-time (.toEpochMilli (Instant/now)) - result (publish-internal exchange message-payload expiration retry-counter) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - _ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload)) - :exchange-name exchange})] + result (publish-internal exchange message-payload expiration retry-counter) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + _ (metrics/multi-ns-report-histogram ["rabbitmq-publish-time"] time-val {:topic-entity (name (:topic-entity message-payload)) + :exchange-name exchange})] (when (pos? retry-counter) (log/info "Retrying publishing the message to " exchange) (log/info "Retry attempt " retry-counter)) @@ -151,12 +151,15 @@ (defn- channel-retry-type [topic-entity channel] (:type (channel-retry-config topic-entity channel))) -(defn- get-channel-retry-count [topic-entity channel] +(defn get-channel-retry-count [topic-entity channel] (:count (channel-retry-config topic-entity channel))) +(defn get-configured-retry-count [] + (-> (ziggurat-config) :retry :count)) + (defn- get-channel-queue-timeout-or-default-timeout [topic-entity channel] (let [channel-queue-timeout-ms (:queue-timeout-ms (channel-retry-config topic-entity channel)) - queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])] + queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])] (or channel-queue-timeout-ms queue-timeout-ms))) (defn- get-backoff-exponent @@ -193,8 +196,8 @@ (defn get-queue-timeout-ms "Calculate queue timeout for delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled." [message-payload] - (let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms) - retry-count (-> (ziggurat-config) :retry :count) + (let [queue-timeout-ms (-> (rabbitmq-config) :delay :queue-timeout-ms) + retry-count (-> (ziggurat-config) :retry :count) message-retry-count (:retry-count message-payload)] (if (= :exponential (-> (ziggurat-config) :retry :type)) (get-exponential-backoff-timeout-ms retry-count message-retry-count queue-timeout-ms) @@ -204,8 +207,8 @@ "Calculate queue timeout for channel delay queue. Uses the value from [[get-exponential-backoff-timeout-ms]] if exponential backoff enabled." [topic-entity channel message-payload] (let [channel-queue-timeout-ms (get-channel-queue-timeout-or-default-timeout topic-entity channel) - message-retry-count (:retry-count message-payload) - channel-retry-count (get-channel-retry-count topic-entity channel)] + message-retry-count (:retry-count message-payload) + channel-retry-count (get-channel-retry-count topic-entity channel)] (if (= :exponential (channel-retry-type topic-entity channel)) (get-exponential-backoff-timeout-ms channel-retry-count message-retry-count channel-queue-timeout-ms) channel-queue-timeout-ms))) @@ -215,10 +218,10 @@ [topic-entity message-payload] (let [{:keys [exchange-name]} (:delay (rabbitmq-config)) exchange-name (util/prefixed-queue-name topic-entity exchange-name) - retry-count (-> (ziggurat-config) :retry :count)] + retry-count (-> (ziggurat-config) :retry :count)] (if (= :exponential (-> (ziggurat-config) :retry :type)) (let [message-retry-count (:retry-count message-payload) - backoff-exponent (get-backoff-exponent retry-count message-retry-count)] + backoff-exponent (get-backoff-exponent retry-count message-retry-count)] (util/prefixed-queue-name exchange-name backoff-exponent)) exchange-name))) @@ -226,7 +229,7 @@ "This function return delay exchange name for retry when using channel flow. It will return exchange name with retry count as suffix if exponential backoff enabled." [topic-entity channel message-payload] (let [{:keys [exchange-name]} (:delay (rabbitmq-config)) - exchange-name (util/prefixed-channel-name topic-entity channel exchange-name) + exchange-name (util/prefixed-channel-name topic-entity channel exchange-name) channel-retry-count (get-channel-retry-count topic-entity channel)] (if (= :exponential (channel-retry-type topic-entity channel)) (let [message-retry-count (:retry-count message-payload) @@ -235,38 +238,38 @@ exchange-name))) (defn publish-to-delay-queue [message-payload] - (let [topic-entity (:topic-entity message-payload) - exchange-name (get-delay-exchange-name topic-entity message-payload) + (let [topic-entity (:topic-entity message-payload) + exchange-name (get-delay-exchange-name topic-entity message-payload) queue-timeout-ms (get-queue-timeout-ms message-payload)] (publish exchange-name message-payload queue-timeout-ms))) (defn publish-to-dead-queue [message-payload] (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - topic-entity (:topic-entity message-payload) + topic-entity (:topic-entity message-payload) exchange-name (util/prefixed-queue-name topic-entity exchange-name)] (publish exchange-name message-payload))) (defn publish-to-instant-queue [message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) - topic-entity (:topic-entity message-payload) + topic-entity (:topic-entity message-payload) exchange-name (util/prefixed-queue-name topic-entity exchange-name)] (publish exchange-name message-payload))) (defn publish-to-channel-delay-queue [channel message-payload] - (let [topic-entity (:topic-entity message-payload) - exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload) + (let [topic-entity (:topic-entity message-payload) + exchange-name (get-channel-delay-exchange-name topic-entity channel message-payload) queue-timeout-ms (get-channel-queue-timeout-ms topic-entity channel message-payload)] (publish exchange-name message-payload queue-timeout-ms))) (defn publish-to-channel-dead-queue [channel message-payload] (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) - topic-entity (:topic-entity message-payload) + topic-entity (:topic-entity message-payload) exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)] (publish exchange-name message-payload))) (defn publish-to-channel-instant-queue [channel message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) - topic-entity (:topic-entity message-payload) + topic-entity (:topic-entity message-payload) exchange-name (util/prefixed-channel-name topic-entity channel exchange-name)] (publish exchange-name message-payload))) @@ -286,17 +289,17 @@ (defn- make-delay-queue [topic-entity] (let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config)) - queue-name (delay-queue-name topic-entity queue-name) - exchange-name (util/prefixed-queue-name topic-entity exchange-name) + queue-name (delay-queue-name topic-entity queue-name) + exchange-name (util/prefixed-queue-name topic-entity exchange-name) dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange)] (create-and-bind-queue queue-name exchange-name dead-letter-exchange-name))) (defn- make-delay-queue-with-retry-count [topic-entity retry-count] (let [{:keys [queue-name exchange-name dead-letter-exchange]} (:delay (rabbitmq-config)) - queue-name (delay-queue-name topic-entity queue-name) - exchange-name (util/prefixed-queue-name topic-entity exchange-name) + queue-name (delay-queue-name topic-entity queue-name) + exchange-name (util/prefixed-queue-name topic-entity exchange-name) dead-letter-exchange-name (util/prefixed-queue-name topic-entity dead-letter-exchange) - sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))] + sequence (min MAX_EXPONENTIAL_RETRIES (inc retry-count))] (doseq [s (range 1 sequence)] (create-and-bind-queue (util/prefixed-queue-name queue-name s) (util/prefixed-queue-name exchange-name s) dead-letter-exchange-name)))) @@ -308,7 +311,7 @@ (defn- make-queue [topic-identifier queue-type] (let [{:keys [queue-name exchange-name]} (queue-type (rabbitmq-config)) - queue-name (util/prefixed-queue-name topic-identifier queue-name) + queue-name (util/prefixed-queue-name topic-identifier queue-name) exchange-name (util/prefixed-queue-name topic-identifier exchange-name)] (create-and-bind-queue queue-name exchange-name))) @@ -340,7 +343,7 @@ (defn make-queues [routes] (when (is-connection-required?) (doseq [topic-entity (keys routes)] - (let [channels (util/get-channel-names routes topic-entity) + (let [channels (util/get-channel-names routes topic-entity) retry-type (retry-type)] (make-channel-queues channels topic-entity) (when (-> (ziggurat-config) :retry :enabled) From 725b59f1e90cc76149c94b431861d810ecb3e4ea Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Thu, 30 Jun 2022 16:59:22 +0530 Subject: [PATCH 2/4] A new unit test in ziggurat.mapper-func.test namespace --- test/ziggurat/mapper_test.clj | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index 4b64b007..1558a374 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -18,6 +18,33 @@ fix/mount-metrics])) (deftest ^:integration mapper-func-test + (let [stream-routes {:default {:handler-fn #(constantly nil)}} + topic-entity (name (first (keys stream-routes))) + message-payload {:message {:foo "bar"} + :topic-entity (keyword topic-entity) + :headers {:hello "world"} + :metadata {:meta "data"}} + expected-message-payload {:message {:foo "bar"} + :metadata {:meta "data" :rabbitmq-retry-count 0}} + actual-paylaod (atom {}) + user-handler-called (atom false) + my-user-handler (fn [this-message-payload] + (reset! user-handler-called true) + (reset! actual-paylaod this-message-payload))] + (testing "mapper-func should call the user handler-fn and process the result" + (with-redefs [metrics/increment-count (constantly nil) + metrics/report-histogram (constantly nil)] + ((mapper-func my-user-handler []) message-payload) + (is (= expected-message-payload @actual-paylaod)) + (is (true? @user-handler-called)))))) + +(deftest mapper-func-provides-retry-count-in-message-payload-metadata-test + (testing "mapper-func should make rabbitmq-retry-count available in message-payload to the user-handler-fn") + (testing "mapper-func adds {:rabbitmq-retry-count 0} to the message payload metadata if this payload is being processed for the first time") + (testing "mapper-func adds {:rabbitmq-retry-count 0} to the message payload metadata for the first retry") + (testing "mapper-func adds {:rabbitmq-retry-count } to the message payload metadata for the last retry")) + +(deftest ^:integration mapper-func-message-processing-test (let [service-name (:app-name (ziggurat-config)) stream-routes {:default {:handler-fn #(constantly nil)}} topic-entity (name (first (keys stream-routes))) @@ -28,7 +55,7 @@ report-time-namespace "handler-fn-execution-time" expected-metric-namespaces [topic-entity expected-metric-namespace] expected-report-time-namespaces [topic-entity report-time-namespace]] - (testing "message process should be successful" + (testing "mapper-func should increment 'success' metrics count if user handler-fn returns :success" (let [successfully-processed? (atom false) successfully-reported-time? (atom false) expected-metric "success"] From b19f019a21af1c6473a35d37d485f7302011fa74 Mon Sep 17 00:00:00 2001 From: Lakshya Gupta Date: Mon, 4 Jul 2022 16:39:51 +0530 Subject: [PATCH 3/4] feature added for rabbitmq-retry-count in handler fnc --- src/ziggurat/mapper.clj | 99 +++++++++-------- test/ziggurat/mapper_test.clj | 200 ++++++++++++++++++++-------------- 2 files changed, 165 insertions(+), 134 deletions(-) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 040e0f9d..7aed46fd 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -22,8 +22,7 @@ (dissoc :headers) (dissoc :retry-count) (dissoc :topic-entity) - (assoc-in [:metadata :rabbitmq-retry-count] (- configured-retry-count remaining-retry-count)) - ))) + (assoc-in [:metadata :rabbitmq-retry-count] (- configured-retry-count remaining-retry-count))))) (defn mapper-func [user-handler-fn channels] (fn [{:keys [topic-entity] :as message-payload}] @@ -42,31 +41,31 @@ multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]] user-payload (create-user-payload message-payload (producer/get-configured-retry-count))] (clog/with-logging-context {:consumer-group topic-entity-name} - (nr/with-tracing "job" new-relic-transaction-name - (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (user-handler-fn user-payload) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - execution-time-namespace "handler-fn-execution-time" - multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace) - [execution-time-namespace]]] - (metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags) - (case return-code - :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags) - :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags) - (producer/retry message-payload)) - :dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags) - (producer/publish-to-dead-queue message-payload)) - :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags) - :block 'TODO - (do - (send-msg-to-channel channels message-payload return-code) - (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)))) - (catch Throwable e - (producer/retry message-payload) - (report-error e (str "Actor execution failed for " topic-entity-name)) - (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))) + (nr/with-tracing "job" new-relic-transaction-name + (try + (let [start-time (.toEpochMilli (Instant/now)) + return-code (user-handler-fn user-payload) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "handler-fn-execution-time" + multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace) + [execution-time-namespace]]] + (metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags) + (case return-code + :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags) + :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags) + (producer/retry message-payload)) + :dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags) + (producer/publish-to-dead-queue message-payload)) + :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags) + :block 'TODO + (do + (send-msg-to-channel channels message-payload return-code) + (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)))) + (catch Throwable e + (producer/retry message-payload) + (report-error e (str "Actor execution failed for " topic-entity-name)) + (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))) (defn channel-mapper-func [user-handler-fn channel] (fn [{:keys [topic-entity] :as message-payload}] @@ -86,28 +85,28 @@ multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]] user-payload (create-user-payload message-payload (producer/get-channel-retry-count topic-entity channel))] (clog/with-logging-context {:consumer-group topic-entity-name :channel channel-name} - (nr/with-tracing "job" metric-namespace - (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (user-handler-fn user-payload) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - execution-time-namespace "execution-time" - multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace) - [execution-time-namespace]]] - (metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags) - (case return-code - :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags) - :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags) - (producer/retry-for-channel message-payload channel)) - :dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags) - (producer/publish-to-channel-dead-queue channel message-payload)) - :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags) - :block 'TODO - (throw (ex-info "Invalid mapper return code" {:code return-code})))) - (catch Throwable e - (producer/retry-for-channel message-payload channel) - (report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) - (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))) + (nr/with-tracing "job" metric-namespace + (try + (let [start-time (.toEpochMilli (Instant/now)) + return-code (user-handler-fn user-payload) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "execution-time" + multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace) + [execution-time-namespace]]] + (metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags) + (case return-code + :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags) + :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags) + (producer/retry-for-channel message-payload channel)) + :dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags) + (producer/publish-to-channel-dead-queue channel message-payload)) + :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags) + :block 'TODO + (throw (ex-info "Invalid mapper return code" {:code return-code})))) + (catch Throwable e + (producer/retry-for-channel message-payload channel) + (report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) + (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))) (defrecord MessagePayload [message topic-entity]) diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index 1558a374..85c3668c 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -18,53 +18,85 @@ fix/mount-metrics])) (deftest ^:integration mapper-func-test - (let [stream-routes {:default {:handler-fn #(constantly nil)}} - topic-entity (name (first (keys stream-routes))) - message-payload {:message {:foo "bar"} - :topic-entity (keyword topic-entity) - :headers {:hello "world"} - :metadata {:meta "data"}} - expected-message-payload {:message {:foo "bar"} - :metadata {:meta "data" :rabbitmq-retry-count 0}} - actual-paylaod (atom {}) - user-handler-called (atom false) - my-user-handler (fn [this-message-payload] - (reset! user-handler-called true) - (reset! actual-paylaod this-message-payload))] + (let [stream-routes {:default {:handler-fn #(constantly nil)}} + topic-entity (name (first (keys stream-routes))) + message-payload {:message {:foo "bar"} + :topic-entity (keyword topic-entity) + :headers {:hello "world"} + :metadata {:meta "data"}} + expected-message-payload {:message {:foo "bar"} + :metadata {:meta "data" :rabbitmq-retry-count 0}} + actual-payload (atom {}) + user-handler-called (atom false) + my-user-handler (fn [this-message-payload] + (reset! user-handler-called true) + (reset! actual-payload this-message-payload))] (testing "mapper-func should call the user handler-fn and process the result" - (with-redefs [metrics/increment-count (constantly nil) - metrics/report-histogram (constantly nil)] - ((mapper-func my-user-handler []) message-payload) - (is (= expected-message-payload @actual-paylaod)) - (is (true? @user-handler-called)))))) + (with-redefs [metrics/increment-count (constantly nil) + metrics/report-histogram (constantly nil)] + ((mapper-func my-user-handler []) message-payload) + (is (= expected-message-payload @actual-payload)) + (is (true? @user-handler-called)))))) + +(deftest ^:integration mapper-func-provides-retry-count-in-message-payload-metadata-test + (let [stream-routes {:default {:handler-fn #(constantly nil)}} + topic-entity (name (first (keys stream-routes))) + message-payload {:message {:foo "bar"} + :topic-entity (keyword topic-entity) + ;:headers {:hello "world"} + :metadata {:meta "data"}} + expected-message-payload {:message {:foo "bar"} + :metadata {:meta "data" :rabbitmq-retry-count 0}} + actual-payload (atom {}) + user-handler-called (atom false) + my-user-handler (fn [this-message-payload] + (reset! user-handler-called true) + (reset! actual-payload this-message-payload))] + (testing "mapper-func should make rabbitmq-retry-count available in message-payload to the user-handler-fn" + (with-redefs [metrics/increment-count (constantly nil) + metrics/report-histogram (constantly nil)] + ((mapper-func my-user-handler []) message-payload) + (is (contains? (get @actual-payload :metadata) :rabbitmq-retry-count)))) + (testing "mapper-func adds {:rabbitmq-retry-count 0} to the message payload metadata if this payload is being processed for the first time" + (with-redefs [metrics/increment-count (constantly nil) + metrics/report-histogram (constantly nil)] + ((mapper-func my-user-handler []) message-payload) + (is (= ((get @actual-payload :metadata) :rabbitmq-retry-count) 0)))) + (testing "mapper-func adds {:rabbitmq-retry-count 1} to the message payload metadata for the first retry" + (fix/with-queues stream-routes + (let [expected-message (-> message-payload + (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) + unsuccessfully-processed? (atom false) + expected-metric "retry"] -(deftest mapper-func-provides-retry-count-in-message-payload-metadata-test - (testing "mapper-func should make rabbitmq-retry-count available in message-payload to the user-handler-fn") - (testing "mapper-func adds {:rabbitmq-retry-count 0} to the message payload metadata if this payload is being processed for the first time") - (testing "mapper-func adds {:rabbitmq-retry-count 0} to the message payload metadata for the first retry") - (testing "mapper-func adds {:rabbitmq-retry-count } to the message payload metadata for the last retry")) + (with-redefs [metrics/increment-count (constantly nil)] + ((mapper-func (constantly :retry) []) message-payload) + (let [message-from-mq (rmq/get-msg-from-delay-queue topic-entity)] + (is (= message-from-mq expected-message)) + ((mapper-func my-user-handler []) message-from-mq) + (is (= ((get @actual-payload :metadata) :rabbitmq-retry-count) 1))))))))) (deftest ^:integration mapper-func-message-processing-test - (let [service-name (:app-name (ziggurat-config)) - stream-routes {:default {:handler-fn #(constantly nil)}} - topic-entity (name (first (keys stream-routes))) - metadata {:meta "data"} - message-payload {:message {:foo "bar"} :topic-entity (keyword topic-entity) :metadata metadata} - expected-additional-tags {:topic_name topic-entity} - expected-metric-namespace "message-processing" - report-time-namespace "handler-fn-execution-time" - expected-metric-namespaces [topic-entity expected-metric-namespace] + (let [service-name (:app-name (ziggurat-config)) + stream-routes {:default {:handler-fn #(constantly nil)}} + topic-entity (name (first (keys stream-routes))) + metadata {:meta "data" :rabbitmq-retry-count 0} + message-payload {:message {:foo "bar"} :topic-entity (keyword topic-entity) :metadata metadata} + expected-additional-tags {:topic_name topic-entity} + expected-metric-namespace "message-processing" + report-time-namespace "handler-fn-execution-time" + expected-metric-namespaces [topic-entity expected-metric-namespace] expected-report-time-namespaces [topic-entity report-time-namespace]] (testing "mapper-func should increment 'success' metrics count if user handler-fn returns :success" - (let [successfully-processed? (atom false) + (let [successfully-processed? (atom false) successfully-reported-time? (atom false) - expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces expected-metric-namespaces) - (= metric-namespaces [expected-metric-namespace])) - (= metric expected-metric) - (= additional-tags expected-additional-tags)) - (reset! successfully-processed? true))) + expected-metric "success"] + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [expected-metric-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) + (reset! successfully-processed? true))) metrics/report-histogram (fn [metric-namespaces _ _] (when (or (= metric-namespaces expected-report-time-namespaces) (= metric-namespaces [report-time-namespace])) @@ -76,7 +108,7 @@ (testing "message process should successfully push to channel queue" (fix/with-queues (assoc-in stream-routes [:default :channel-1] (constantly :success)) (let [successfully-processed? (atom false) - expected-metric "success"] + expected-metric "success"] (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] (when (and (or (= metric-namespace [service-name topic-entity expected-metric-namespace]) (= metric-namespace [expected-metric-namespace])) @@ -102,10 +134,10 @@ (testing "message process should be unsuccessful and retry" (fix/with-queues stream-routes - (let [expected-message (-> message-payload - (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) + (let [expected-message (-> message-payload + (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) unsuccessfully-processed? (atom false) - expected-metric "retry"] + expected-metric "retry"] (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] (when (and (or (= metric-namespace [service-name topic-entity expected-metric-namespace]) @@ -121,7 +153,7 @@ (testing "message process should be unsuccessful and be pushed to dlq" (fix/with-queues stream-routes (let [unsuccessfully-processed? (atom false) - expected-metric "dead-letter"] + expected-metric "dead-letter"] (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] (when (and (or (= metric-namespace [service-name topic-entity expected-metric-namespace]) @@ -136,11 +168,11 @@ (testing "reports error, publishes message to retry queue if mapper-fn raises exception" (fix/with-queues stream-routes - (let [expected-message (-> message-payload - (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) - report-fn-called? (atom false) + (let [expected-message (-> message-payload + (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) + report-fn-called? (atom false) unsuccessfully-processed? (atom false) - expected-metric "failure"] + expected-metric "failure"] (with-redefs [report-error (fn [_ _] (reset! report-fn-called? true)) metrics/increment-count (fn [metric-namespace metric additional-tags] (when (and (or (= metric-namespace [service-name topic-entity expected-metric-namespace]) @@ -155,8 +187,8 @@ (is @report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - expected-metric-namespace "handler-fn-execution-time" + (let [reported-execution-time? (atom false) + expected-metric-namespace "handler-fn-execution-time" expected-metric-namespaces [service-name "default" expected-metric-namespace]] (with-redefs [metrics/report-histogram (fn [metric-namespaces _ _] (when (or (= metric-namespaces expected-metric-namespaces) @@ -166,41 +198,41 @@ (is @reported-execution-time?)))) (testing "User Handler function should have access to the proto message and metadata via :message and :metadata keywords" (let [user-handler-called (atom false) - headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value"))))) + headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value"))))) user-handler-fn (fn [user-msg-payload] (reset! user-handler-called true) (is (= (-> message-payload (dissoc :headers) - (dissoc :topic-entity)) user-msg-payload)) + (dissoc :topic-entity)) (dissoc user-msg-payload))) (is (some? (:message user-msg-payload))) (is (some? (:metadata user-msg-payload))) - (is (nil? (:retry-count user-msg-payload))) - (is (nil? (:topic-entity user-msg-payload))) - (is (nil? (:headers user-msg-payload))))] - (with-redefs [metrics/increment-count (constantly nil) + (is (nil? (:retry-count user-msg-payload))) + (is (nil? (:topic-entity user-msg-payload))) + (is (nil? (:headers user-msg-payload))))] + (with-redefs [metrics/increment-count (constantly nil) metrics/report-histogram (constantly nil)] ((mapper-func user-handler-fn []) (assoc message-payload :headers headers)) (is @user-handler-called)))))) (deftest ^:integration channel-mapper-func-test - (let [channel :channel-1 - channel-name (name channel) - service-name (:app-name (ziggurat-config)) - stream-routes {:default {:handler-fn #(constantly nil) - channel #(constantly nil)}} - topic (first (keys stream-routes)) - metadata {:meta "data"} - message-payload {:message {:foo "bar"} - :retry-count (:count (:retry (ziggurat-config))) - :topic-entity topic - :metadata metadata} - expected-topic-entity-name (name topic) - expected-additional-tags {:topic_name expected-topic-entity-name :channel_name channel-name} - increment-count-namespace "message-processing" + (let [channel :channel-1 + channel-name (name channel) + service-name (:app-name (ziggurat-config)) + stream-routes {:default {:handler-fn #(constantly nil) + channel #(constantly nil)}} + topic (first (keys stream-routes)) + metadata {:meta "data" :rabbitmq-retry-count 0} + message-payload {:message {:foo "bar"} + :retry-count (:count (:retry (ziggurat-config))) + :topic-entity topic + :metadata metadata} + expected-topic-entity-name (name topic) + expected-additional-tags {:topic_name expected-topic-entity-name :channel_name channel-name} + increment-count-namespace "message-processing" expected-increment-count-namespaces [service-name topic channel-name increment-count-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) - expected-metric "success"] + expected-metric "success"] (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] (when (and (or (= metric-namespace expected-increment-count-namespaces) (= metric-namespace [increment-count-namespace])) @@ -212,10 +244,10 @@ (testing "message process should be unsuccessful and retry" (fix/with-queues stream-routes - (let [expected-message (-> message-payload - (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) + (let [expected-message (-> message-payload + (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) unsuccessfully-processed? (atom false) - expected-metric "retry"] + expected-metric "retry"] (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] (when (and (or (= metric-namespace expected-increment-count-namespaces) @@ -231,7 +263,7 @@ (testing "message should be published to dead-letter if handler returns :dead-letter keyword" (fix/with-queues stream-routes (let [unsuccessfully-processed? (atom false) - expected-metric "dead-letter"] + expected-metric "dead-letter"] (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] (when (and (or (= metric-namespace expected-increment-count-namespaces) @@ -246,11 +278,11 @@ (testing "message should raise exception and report the error" (fix/with-queues stream-routes - (let [expected-message (-> message-payload - (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) - report-fn-called? (atom false) + (let [expected-message (-> message-payload + (assoc :retry-count (dec (:count (:retry (ziggurat-config)))))) + report-fn-called? (atom false) unsuccessfully-processed? (atom false) - expected-metric "failure"] + expected-metric "failure"] (with-redefs [report-error (fn [_ _] (reset! report-fn-called? true)) metrics/increment-count (fn [metric-namespace metric additional-tags] (when (and (or (= metric-namespace expected-increment-count-namespaces) @@ -265,8 +297,8 @@ (is @report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - execution-time-namespace "execution-time" + (let [reported-execution-time? (atom false) + execution-time-namespace "execution-time" expected-execution-time-namespaces [service-name expected-topic-entity-name channel-name execution-time-namespace]] (with-redefs [metrics/report-histogram (fn [metric-namespaces _ _] (when (or (= metric-namespaces expected-execution-time-namespaces) @@ -285,8 +317,8 @@ (is (some? (:message user-msg-payload))) (is (some? (:metadata user-msg-payload))) (is (nil? (:retry-count user-msg-payload))) - (is (nil? (:topic-entity user-msg-payload))))] - (with-redefs [metrics/increment-count (constantly nil) + (is (nil? (:topic-entity user-msg-payload))))] + (with-redefs [metrics/increment-count (constantly nil) metrics/report-histogram (constantly nil)] ((channel-mapper-func user-handler-fn channel) message-payload) (is @user-handler-called)))))) From 862fccfe1f4e4204c9f531d5beca0c841f4773d1 Mon Sep 17 00:00:00 2001 From: Lakshya Gupta Date: Wed, 6 Jul 2022 11:50:55 +0530 Subject: [PATCH 4/4] config moved to ziggurat.config --- src/ziggurat/config.clj | 43 +++++++++++++++-------------- src/ziggurat/mapper.clj | 4 +-- src/ziggurat/messaging/producer.clj | 3 -- test/ziggurat/mapper_test.clj | 2 +- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index 6460ef7b..e8f0633b 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -8,9 +8,9 @@ (:import (java.util Properties) [org.apache.kafka.common.config SaslConfigs]) (:gen-class - :methods [^{:static true} [get [String] Object] - ^{:static true} [getIn [java.lang.Iterable] Object]] - :name tech.gojek.ziggurat.internal.Config)) + :methods [^{:static true} [get [String] Object] + ^{:static true} [getIn [java.lang.Iterable] Object]] + :name tech.gojek.ziggurat.internal.Config)) (def config-file "config.edn") @@ -25,10 +25,10 @@ :username "guest" :password "guest" :channel-timeout 2000 - :publish-retry {:back-off-ms 5000 - :non-recoverable-exception {:enabled true - :back-off-ms 5000 - :count 5}}} + :publish-retry {:back-off-ms 5000 + :non-recoverable-exception {:enabled true + :back-off-ms 5000 + :count 5}}} :jobs {:instant {:worker-count 4 :prefetch-count 4}} :rabbit-mq {:delay {:queue-name "%s_delay_queue" @@ -77,9 +77,9 @@ (declare config) (defstate config - :start (let [config-values-from-env (config-from-env config-file) - app-name (-> config-values-from-env :ziggurat :app-name)] - (deep-merge (interpolate-config default-config app-name) config-values-from-env))) + :start (let [config-values-from-env (config-from-env config-file) + app-name (-> config-values-from-env :ziggurat :app-name)] + (deep-merge (interpolate-config default-config app-name) config-values-from-env))) (defn ziggurat-config [] (get config :ziggurat)) @@ -169,9 +169,9 @@ (defn- normalize-value [v] (str/trim - (cond - (keyword? v) (name v) - :else (str v)))) + (cond + (keyword? v) (name v) + :else (str v)))) (defn set-property [mapping-table p k v] @@ -195,8 +195,8 @@ (defn- add-jaas-properties [properties jaas-config] (if (some? jaas-config) - (let [username (get jaas-config :username) - password (get jaas-config :password) + (let [username (get jaas-config :username) + password (get jaas-config :password) mechanism (get jaas-config :mechanism)] (doto properties (.put SaslConfigs/SASL_JAAS_CONFIG (create-jaas-properties username password mechanism)))) @@ -222,11 +222,11 @@ :mechanism <>}}} " (let [ssl-configs-enabled (:enabled ssl-config-map) - jaas-config (get ssl-config-map :jaas)] + jaas-config (get ssl-config-map :jaas)] (if (true? ssl-configs-enabled) (as-> properties pr - (add-jaas-properties pr jaas-config) - (reduce-kv set-property-fn pr ssl-config-map)) + (add-jaas-properties pr jaas-config) + (reduce-kv set-property-fn pr ssl-config-map)) properties))) (defn build-properties @@ -249,11 +249,14 @@ " [set-property-fn config-map] (as-> (Properties.) pr - (build-ssl-properties pr set-property-fn (ssl-config)) - (reduce-kv set-property-fn pr config-map))) + (build-ssl-properties pr set-property-fn (ssl-config)) + (reduce-kv set-property-fn pr config-map))) (def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table))) (def build-producer-config-properties (partial build-properties (partial set-property producer-config-mapping-table))) (def build-streams-config-properties (partial build-properties (partial set-property streams-config-mapping-table))) + +(defn get-configured-retry-count [] + (-> (ziggurat-config) :retry :count)) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 7aed46fd..7fe928bb 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -1,7 +1,7 @@ (ns ziggurat.mapper (:require [clojure.string :as str] [sentry-clj.async :as sentry] - [ziggurat.config :refer [ziggurat-config]] + [ziggurat.config :refer [ziggurat-config get-configured-retry-count]] [ziggurat.messaging.producer :as producer] [ziggurat.metrics :as metrics] [ziggurat.new-relic :as nr] @@ -39,7 +39,7 @@ failure-metric "failure" dead-letter-metric "dead-letter" multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]] - user-payload (create-user-payload message-payload (producer/get-configured-retry-count))] + user-payload (create-user-payload message-payload (get-configured-retry-count))] (clog/with-logging-context {:consumer-group topic-entity-name} (nr/with-tracing "job" new-relic-transaction-name (try diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index 33fe5f04..d46963f0 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -154,9 +154,6 @@ (defn get-channel-retry-count [topic-entity channel] (:count (channel-retry-config topic-entity channel))) -(defn get-configured-retry-count [] - (-> (ziggurat-config) :retry :count)) - (defn- get-channel-queue-timeout-or-default-timeout [topic-entity channel] (let [channel-queue-timeout-ms (:queue-timeout-ms (channel-retry-config topic-entity channel)) queue-timeout-ms (get-in (rabbitmq-config) [:delay :queue-timeout-ms])] diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index 85c3668c..555ed6eb 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -1,7 +1,7 @@ (ns ziggurat.mapper-test (:require [clojure.test :refer :all] [schema.core :as s] - [ziggurat.config :refer [ziggurat-config]] + [ziggurat.config :refer [ziggurat-config get-configured-retry-count]] [ziggurat.fixtures :as fix] [ziggurat.mapper :refer :all] [ziggurat.messaging.connection :refer [producer-connection]]