diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index f29fb457..dcf17033 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -51,5 +51,5 @@ :retry {:count [5 :int] :enabled [true :bool]}}}}} :tracer {:enabled [true :bool] - :tracer-provider ""}}} + :custom-provider ""}}} diff --git a/resources/config.test.edn b/resources/config.test.edn index 1e9933b2..f5e7b22e 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -50,5 +50,5 @@ :channels {:channel-1 {:worker-count [10 :int] :retry {:count [5 :int] :enabled [true :bool]}}}}} - :tracer {:enabled [true :bool] - :custom-provider ""}}} + :tracer {:enabled [true :bool] + :custom-provider ""}}} diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 91b6248d..d80b08a9 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -9,12 +9,12 @@ [ziggurat.sentry :refer [sentry-reporter]]) (:import (java.time Instant))) -(defn- send-msg-to-channel [channels message-payload return-code headers] +(defn- send-msg-to-channel [channels message-payload return-code] (when-not (contains? (set channels) return-code) (throw (ex-info "Invalid mapper return code" {:code return-code}))) - (producer/publish-to-channel-instant-queue return-code message-payload headers)) + (producer/publish-to-channel-instant-queue return-code message-payload)) -(defn mapper-func [mapper-fn channels headers] +(defn mapper-func [mapper-fn channels] (fn [{:keys [topic-entity message] :as message-payload}] (let [topic-entity-name (name topic-entity) new-relic-transaction-name (str topic-entity-name ".handler-fn") @@ -35,14 +35,14 @@ (case return-code :success (metrics/increment-count default-namespace success-metric additional-tags) :retry (do (metrics/increment-count default-namespace retry-metric additional-tags) - (producer/retry message-payload headers)) + (producer/retry message-payload)) :skip (metrics/increment-count default-namespace skip-metric additional-tags) :block 'TODO (do - (send-msg-to-channel channels message-payload return-code headers) + (send-msg-to-channel channels message-payload return-code) (metrics/increment-count default-namespace success-metric additional-tags)))) (catch Throwable e - (producer/retry message-payload headers) + (producer/retry message-payload) (sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name)) (metrics/increment-count default-namespace failure-metric additional-tags))))))) @@ -80,7 +80,7 @@ (sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) (metrics/increment-count default-namespace failure-metric additional-tags))))))) -(defrecord MessagePayload [message topic-entity]) +(defrecord MessagePayload [message topic-entity headers]) (s/defschema message-payload-schema {:message s/Any diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index 79b4652a..d7620bfb 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -21,7 +21,7 @@ (catch Exception e (log/info "old message format read, converting to message-payload: " message) (let [retry-count (:retry-count message) - message-payload (mpr/->MessagePayload (dissoc message :retry-count) topic-entity)] + message-payload (mpr/->MessagePayload (dissoc message :retry-count) topic-entity {})] (assoc message-payload :retry-count retry-count))))) (defn convert-and-ack-message @@ -95,7 +95,7 @@ (start-subscriber* (lch/open connection) (get-in-config [:jobs :instant :prefetch-count]) (prefixed-queue-name topic-entity (get-in-config [:rabbit-mq :instant :queue-name])) - (mpr/mapper-func mapper-fn channels {}) + (mpr/mapper-func mapper-fn channels) topic-entity)))) (defn start-channels-subscriber [channels topic-entity] diff --git a/src/ziggurat/messaging/dead_set.clj b/src/ziggurat/messaging/dead_set.clj index 17ef555f..a0d54481 100644 --- a/src/ziggurat/messaging/dead_set.clj +++ b/src/ziggurat/messaging/dead_set.clj @@ -9,7 +9,7 @@ (doseq [message-payload (consumer/get-dead-set-messages-for-topic true topic-entity count-of-message)] (producer/publish-to-instant-queue message-payload)) (doseq [message-payload (consumer/get-dead-set-messages-for-channel true topic-entity channel count-of-message)] - (producer/publish-to-channel-instant-queue channel message-payload {})))) + (producer/publish-to-channel-instant-queue channel message-payload)))) (defn- get-messages "Gets n messages from dead queue and gives the option to ack or un-ack them" diff --git a/src/ziggurat/messaging/producer.clj b/src/ziggurat/messaging/producer.clj index eed91622..83653fe2 100644 --- a/src/ziggurat/messaging/producer.clj +++ b/src/ziggurat/messaging/producer.clj @@ -60,23 +60,23 @@ (defn- publish ([exchange message-payload] - (publish exchange message-payload nil {})) - ([exchange message-payload expiration headers] + (publish exchange message-payload nil)) + ([exchange message-payload expiration] (try (with-retry {:count 5 :wait 100 :on-failure #(log/error "publishing message to rabbitmq failed with error " (.getMessage %))} (with-open [ch (lch/open connection)] - (lb/publish ch exchange "" (nippy/freeze message-payload) (properties-for-publish expiration headers)))) + (lb/publish ch exchange "" (nippy/freeze message-payload) (properties-for-publish expiration (:headers message-payload))))) (catch Throwable e (sentry/report-error sentry-reporter e "Pushing message to rabbitmq failed, data: " message-payload))))) -(defn publish-to-delay-queue [message-payload headers] +(defn publish-to-delay-queue [message-payload] (let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config)) topic-entity (:topic-entity message-payload) exchange-name (prefixed-queue-name topic-entity exchange-name)] - (publish exchange-name message-payload queue-timeout-ms headers))) + (publish exchange-name message-payload queue-timeout-ms))) (defn publish-to-dead-queue [message-payload] (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) @@ -94,7 +94,7 @@ (let [{:keys [exchange-name queue-timeout-ms]} (:delay (rabbitmq-config)) topic-entity (:topic-entity message-payload) exchange-name (prefixed-channel-name topic-entity channel exchange-name)] - (publish exchange-name message-payload queue-timeout-ms {}))) + (publish exchange-name message-payload queue-timeout-ms))) (defn publish-to-channel-dead-queue [channel message-payload] (let [{:keys [exchange-name]} (:dead-letter (rabbitmq-config)) @@ -102,11 +102,11 @@ exchange-name (prefixed-channel-name topic-entity channel exchange-name)] (publish exchange-name message-payload))) -(defn publish-to-channel-instant-queue [channel message-payload headers] +(defn publish-to-channel-instant-queue [channel message-payload] (let [{:keys [exchange-name]} (:instant (rabbitmq-config)) topic-entity (:topic-entity message-payload) exchange-name (prefixed-channel-name topic-entity channel exchange-name)] - (publish exchange-name message-payload nil headers))) + (publish exchange-name message-payload nil))) (defn- channel-retries-enabled [topic-entity channel] (-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :enabled)) @@ -114,11 +114,11 @@ (defn- get-channel-retry-count [topic-entity channel] (-> (ziggurat-config) :stream-router topic-entity :channels channel :retry :count)) -(defn retry [{:keys [retry-count topic-entity] :as message-payload} headers] +(defn retry [{:keys [retry-count topic-entity] :as message-payload}] (when (-> (ziggurat-config) :retry :enabled) (cond - (nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count))) headers) - (pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count)) headers) + (nil? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec (-> (ziggurat-config) :retry :count)))) + (pos? retry-count) (publish-to-delay-queue (assoc message-payload :retry-count (dec retry-count))) (zero? retry-count) (publish-to-dead-queue message-payload)))) (defn retry-for-channel [{:keys [retry-count topic-entity] :as message-payload} channel] diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 1852811a..04d0f1b0 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -107,7 +107,7 @@ (.start))] (try (.activate (.scopeManager tracer) span) - ((mapper-func handler-fn channels (:headers message-payload)) (->MessagePayload (:value message-payload) topic-entity)) + ((mapper-func handler-fn channels) (->MessagePayload (:value message-payload) topic-entity (:headers message-payload))) (catch Exception e (log/error "Exception while executing handler function " e)) (finally diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index 0c3d061d..fdbe8891 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -32,7 +32,7 @@ metrics/report-time (fn [metric-namespace _ _] (when (= metric-namespace expected-report-time-namespace) (reset! successfully-reported-time? true)))] - ((mapper-func (constantly :success) [] {}) message-payload) + ((mapper-func (constantly :success) []) message-payload) (is @successfully-processed?) (is @successfully-reported-time?)))) @@ -45,7 +45,7 @@ (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] - ((mapper-func (constantly :channel-1) [:channel-1] {}) message-payload) + ((mapper-func (constantly :channel-1) [:channel-1]) message-payload) (let [message-from-mq (rmq/get-message-from-channel-instant-queue topic-entity :channel-1)] (is (= message-payload message-from-mq)) (is @successfully-processed?)))))) @@ -57,7 +57,7 @@ (let [err (Throwable->map e)] (is (= (:cause err) "Invalid mapper return code")) (is (= (-> err :data :code) :channel-1))))] - ((mapper-func (constantly :channel-1) [:some-other-channel] {}) message-payload) + ((mapper-func (constantly :channel-1) [:some-other-channel]) message-payload) (let [message-from-mq (rmq/get-message-from-channel-instant-queue topic-entity :channel-1)] (is (nil? message-from-mq)))))) @@ -72,7 +72,7 @@ (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] - ((mapper-func (constantly :retry) [] {}) message-payload) + ((mapper-func (constantly :retry) []) message-payload) (let [message-from-mq (rmq/get-msg-from-delay-queue topic-entity)] (is (= message-from-mq expected-message))) (is @unsuccessfully-processed?))))) @@ -89,7 +89,7 @@ (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] - ((mapper-func (fn [_] (throw (Exception. "test exception"))) [] {}) message-payload) + ((mapper-func (fn [_] (throw (Exception. "test exception"))) []) message-payload) (let [message-from-mq (rmq/get-msg-from-delay-queue topic-entity)] (is (= message-from-mq expected-message))) (is @unsuccessfully-processed?) @@ -102,7 +102,7 @@ (when (= metric-namespace expected-metric-namespace) (reset! reported-execution-time? true)))] - ((mapper-func (constantly :success) [] {}) message-payload) + ((mapper-func (constantly :success) []) message-payload) (is @reported-execution-time?)))))) (deftest channel-mapper-func-test diff --git a/test/ziggurat/messaging/consumer_test.clj b/test/ziggurat/messaging/consumer_test.clj index 51db0759..dd6b15ae 100644 --- a/test/ziggurat/messaging/consumer_test.clj +++ b/test/ziggurat/messaging/consumer_test.clj @@ -106,7 +106,7 @@ :retry-limit 2 :success-promise success-promise}) topic-entity []) - (producer/publish-to-delay-queue message-payload {}) + (producer/publish-to-delay-queue message-payload) (when-let [promise-success? (deref success-promise 5000 :timeout)] (is (not (= :timeout promise-success?))) @@ -135,7 +135,7 @@ :skip-promise skip-promise :retry-limit -1}) topic-entity []) - (producer/publish-to-delay-queue message-payload {}) + (producer/publish-to-delay-queue message-payload) (when-let [promise-success? (deref skip-promise 5000 :timeout)] (is (not (= :timeout promise-success?))) @@ -163,7 +163,7 @@ :retry-limit (* no-of-msgs 10)}) topic-entity []) (dotimes [_ no-of-msgs] - (producer/retry (gen-message-payload topic-entity) {})) + (producer/retry (gen-message-payload topic-entity))) (block-and-retry-until (fn [] (let [dead-set-msgs (count (get-dead-set-messages-for-topic false topic-entity no-of-msgs))] @@ -254,7 +254,7 @@ (update-in [:stream-router topic-entity :channels channel :retry :enabled] (constantly false)) (update-in [:stream-router topic-entity :channels channel :worker-count] (constantly 1))))] (start-channels-subscriber {channel channel-fn} topic-entity) - (producer/publish-to-channel-instant-queue channel message-payload {}) + (producer/publish-to-channel-instant-queue channel message-payload) (deref success-promise 5000 :timeout) (is (= 1 @call-counter)) (util/close rmq-ch)))))) @@ -280,7 +280,7 @@ :retry-limit 0 :success-promise success-promise}) topic-entity []) - (producer/publish-to-delay-queue message-payload {}) + (producer/publish-to-delay-queue message-payload) (when-let [promise-success? (deref success-promise 5000 :timeout)] (is (not (= :timeout promise-success?))) diff --git a/test/ziggurat/messaging/producer_test.clj b/test/ziggurat/messaging/producer_test.clj index 32723373..403ffd88 100644 --- a/test/ziggurat/messaging/producer_test.clj +++ b/test/ziggurat/messaging/producer_test.clj @@ -59,7 +59,7 @@ (let [topic-entity :default message-payload {:message {:foo "bar"} :retry-count 5 :topic-entity topic-entity} expected-message-payload (update message-payload :retry-count dec)] - (producer/retry message-payload {}) + (producer/retry message-payload) (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] (is (= expected-message-payload message-from-mq)))))) @@ -68,7 +68,7 @@ {:default {:handler-fn #(constantly nil)}} (let [topic-entity :default message-payload {:message {:foo "bar"} :retry-count 0 :topic-entity topic-entity}] - (producer/retry message-payload {}) + (producer/retry message-payload) (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] (is (= message-payload message-from-mq)))))) @@ -81,7 +81,7 @@ (with-redefs [lb/publish (fn [_ _ _ _ props] (swap! retry-count inc) (throw (Exception. "some exception")))] - (producer/retry message-payload {}) + (producer/retry message-payload) (is (= 6 @retry-count)))))) (testing "message with no retry count will publish to delay queue" @@ -90,7 +90,7 @@ (let [topic-entity :default message-payload {:message {:foo "bar"} :topic-entity topic-entity} expected-message (assoc message-payload :retry-count 4)] - (producer/retry message-payload {}) + (producer/retry message-payload) (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] (is (= message-from-mq expected-message)))))) @@ -105,20 +105,20 @@ :headers {}}] (with-redefs [lb/publish (fn [_ _ _ _ props] (is (= expected-props props)))] - (producer/publish-to-delay-queue message-payload {}))))) + (producer/publish-to-delay-queue message-payload))))) (testing "publish to delay queue publishes with parsed record headers" (fix/with-queues {:default {:handler-fn #(constantly nil)}} (let [topic-entity :default - message-payload {:message {:foo "bar"} :topic-entity topic-entity} + message-payload {:message {:foo "bar"} :topic-entity topic-entity :headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))} expected-props {:content-type "application/octet-stream" :persistent true :expiration (str (get-in (rabbitmq-config) [:delay :queue-timeout-ms])) :headers {"key" "value"}}] (with-redefs [lb/publish (fn [_ _ _ _ props] (is (= expected-props props)))] - (producer/publish-to-delay-queue message-payload (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))))))) + (producer/publish-to-delay-queue message-payload))))) (testing "message will be retried as defined in ziggurat config retry-count when message doesn't have retry-count" (fix/with-queues @@ -127,11 +127,11 @@ topic-entity :default message-payload {:message {:foo "bar"} :topic-entity topic-entity} expected-message-payload (assoc message-payload :retry-count 0)] - (producer/retry message-payload {}) + (producer/retry message-payload) (while (> @retry-count 0) (swap! retry-count dec) (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] - (producer/retry message-from-mq {}))) + (producer/retry message-from-mq))) (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] (is (= expected-message-payload message-from-mq)))))) @@ -142,11 +142,11 @@ topic-entity :default message-payload {:message {:foo "bar"} :retry-count @retry-count :topic-entity topic-entity} expected-message-payload (assoc message-payload :retry-count 0)] - (producer/retry message-payload {}) + (producer/retry message-payload) (while (> @retry-count 0) (swap! retry-count dec) (let [message-from-mq (rmq/get-msg-from-delay-queue "default")] - (producer/retry message-from-mq {}))) + (producer/retry message-from-mq))) (let [message-from-mq (rmq/get-msg-from-dead-queue "default")] (is (= expected-message-payload message-from-mq))))))) @@ -313,7 +313,7 @@ stream-routes (let [topic-entity :default message-payload {:message {:foo "bar"} :topic-entity topic-entity}] - (producer/retry message-payload {}) + (producer/retry message-payload) (let [finished-spans (.finishedSpans tracer)] (is (= 1 (.size finished-spans))) (is (= "send" (-> finished-spans @@ -329,7 +329,7 @@ stream-routes (let [topic-entity :default message-payload {:message {:foo "bar"} :topic-entity topic-entity}] - (producer/publish-to-channel-instant-queue :channel-1 message-payload {}) + (producer/publish-to-channel-instant-queue :channel-1 message-payload) (let [finished-spans (.finishedSpans tracer)] (is (= 1 (.size finished-spans))) (is (= "send" (-> finished-spans diff --git a/test/ziggurat/tracer_test.clj b/test/ziggurat/tracer_test.clj index 2af7f5f9..a00299e7 100644 --- a/test/ziggurat/tracer_test.clj +++ b/test/ziggurat/tracer_test.clj @@ -9,7 +9,7 @@ (use-fixtures :once fix/silence-logging) -(defn tracer-provider [] +(defn custom-tracer-provider [] (MockTracer.)) (deftest mount-tracer-test @@ -30,25 +30,25 @@ (testing "should execute create custom tracer when tracer is enabled and tracer provider is set" (fix/mount-config) (with-redefs [ziggurat-config (fn [] {:tracer {:enabled true - :tracer-provider "ziggurat.tracer-test/tracer-provider"}})] + :custom-provider "ziggurat.tracer-test/custom-tracer-provider"}})] (mount/start (mount/only [#'tracer/tracer])) (is (= "MockTracer" (.getSimpleName (.getClass tracer/tracer))))) (mount/stop)) (testing "should handle gracefully when custom tracer provider returns nil and create NoopTracer" (fix/mount-config) - (with-redefs [tracer-provider (fn [] nil) + (with-redefs [custom-tracer-provider (fn [] nil) ziggurat-config (fn [] {:tracer {:enabled true - :tracer-provider "ziggurat.tracer-test/tracer-provider"}})] + :custom-provider "ziggurat.tracer-test/custom-tracer-provider"}})] (mount/start (mount/only [#'tracer/tracer])) (is (= "NoopTracerImpl" (.getSimpleName (.getClass tracer/tracer))))) (mount/stop)) (testing "should handle gracefully when custom tracer provider returns non tracer instance and create NoopTracer" (fix/mount-config) - (with-redefs [tracer-provider (fn [] "") + (with-redefs [custom-tracer-provider (fn [] "") ziggurat-config (fn [] {:tracer {:enabled true - :tracer-provider "ziggurat.tracer-test/tracer-provider"}})] + :custom-provider "ziggurat.tracer-test/custom-tracer-provider"}})] (mount/start (mount/only [#'tracer/tracer])) (is (= "NoopTracerImpl" (.getSimpleName (.getClass tracer/tracer))))) (mount/stop)) @@ -69,9 +69,9 @@ (testing "should handle create tracer exception gracefully and create NoopTracer" (fix/mount-config) - (with-redefs [tracer-provider (fn [] (throw (RuntimeException.))) + (with-redefs [custom-tracer-provider (fn [] (throw (RuntimeException.))) ziggurat-config (fn [] {:tracer {:enabled true - :tracer-provider "ziggurat.tracer-test/tracer-provider"}})] + :custom-provider "ziggurat.tracer-test/custom-tracer-provider"}})] (mount/start (mount/only [#'tracer/tracer])) (is (= "NoopTracerImpl" (.getSimpleName (.getClass tracer/tracer))))) (mount/stop)))