Skip to content

Commit

Permalink
feature added for rabbitmq-retry-count in handler fnc
Browse files Browse the repository at this point in the history
  • Loading branch information
Lakshya Gupta committed Jul 6, 2022
1 parent 725b59f commit b19f019
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 134 deletions.
99 changes: 49 additions & 50 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
(dissoc :headers)
(dissoc :retry-count)
(dissoc :topic-entity)
(assoc-in [:metadata :rabbitmq-retry-count] (- configured-retry-count remaining-retry-count))
)))
(assoc-in [:metadata :rabbitmq-retry-count] (- configured-retry-count remaining-retry-count)))))

(defn mapper-func [user-handler-fn channels]
(fn [{:keys [topic-entity] :as message-payload}]
Expand All @@ -42,31 +41,31 @@
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload (producer/get-configured-retry-count))]
(clog/with-logging-context {:consumer-group topic-entity-name}
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry message-payload))
:dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags)
(producer/publish-to-dead-queue message-payload))
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(do
(send-msg-to-channel channels message-payload return-code)
(metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags))))
(catch Throwable e
(producer/retry message-payload)
(report-error e (str "Actor execution failed for " topic-entity-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))))
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry message-payload))
:dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags)
(producer/publish-to-dead-queue message-payload))
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(do
(send-msg-to-channel channels message-payload return-code)
(metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags))))
(catch Throwable e
(producer/retry message-payload)
(report-error e (str "Actor execution failed for " topic-entity-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))))

(defn channel-mapper-func [user-handler-fn channel]
(fn [{:keys [topic-entity] :as message-payload}]
Expand All @@ -86,28 +85,28 @@
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload (producer/get-channel-retry-count topic-entity channel))]
(clog/with-logging-context {:consumer-group topic-entity-name :channel channel-name}
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
:dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags)
(producer/publish-to-channel-dead-queue channel message-payload))
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))))
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
:dead-letter (do (metrics/multi-ns-increment-count multi-message-processing-namespaces dead-letter-metric additional-tags)
(producer/publish-to-channel-dead-queue channel message-payload))
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))))

(defrecord MessagePayload [message topic-entity])

0 comments on commit b19f019

Please sign in to comment.