Skip to content

Commit

Permalink
include metadata in handler
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed Jul 23, 2021
1 parent c4c6d81 commit 4dac309
Show file tree
Hide file tree
Showing 16 changed files with 317 additions and 286 deletions.
11 changes: 8 additions & 3 deletions src/ziggurat/header_transformer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer
(^void init [_ ^ProcessorContext context]
(do (set! processor-context context)
nil))
(transform [_ record-value] {:value record-value :headers (.headers processor-context)})
(set! processor-context context))
(transform [_ record-value]
(let [timestamp (.timestamp processor-context)
headers (.headers processor-context)
topic (.topic processor-context)
partition (.partition processor-context)
metadata {:topic topic :partition partition :timestamp timestamp}]
{:value record-value :headers headers :metadata metadata}))
(close [_] nil))

(defn create []
Expand Down
11 changes: 5 additions & 6 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
(ns ziggurat.mapper
(:require [clojure.string :as str]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.messaging.producer :as producer]
[ziggurat.metrics :as metrics]
[ziggurat.new-relic :as nr]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.util.error :refer [report-error]])
(:import (java.time Instant)))

Expand All @@ -15,7 +13,7 @@
(producer/publish-to-channel-instant-queue return-code message-payload))

(defn mapper-func [mapper-fn channels]
(fn [{:keys [topic-entity message] :as message-payload}]
(fn [{:keys [topic-entity message metadata] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
Expand All @@ -27,11 +25,12 @@
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
_ (println "MAPPER MESSAGE PAYLOAD>>> " message-payload)
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
return-code (mapper-fn message metadata)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
Expand All @@ -53,7 +52,7 @@
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))

(defn channel-mapper-func [mapper-fn channel]
(fn [{:keys [topic-entity message] :as message-payload}]
(fn [{:keys [topic-entity message metadata] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
Expand All @@ -70,7 +69,7 @@
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
return-code (mapper-fn message metadata)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
Expand Down
31 changes: 16 additions & 15 deletions src/ziggurat/messaging/consumer.clj
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
(ns ziggurat.messaging.consumer
(:require [ziggurat.mapper :as mpr]
[ziggurat.message-payload :as mp]
[clojure.tools.logging :as log]
(:require [clojure.tools.logging :as log]
[langohr.basic :as lb]
[langohr.channel :as lch]
[langohr.consumers :as lcons]
[ziggurat.kafka-consumer.consumer-handler :as ch]
[schema.core :as s]
[sentry-clj.async :as sentry]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [get-in-config]]
[ziggurat.kafka-consumer.consumer-handler :as ch]
[ziggurat.mapper :as mpr]
[ziggurat.message-payload :as mp]
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.messaging.util :refer :all]
[ziggurat.metrics :as metrics]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.util.error :refer [report-error]]))

(defn- convert-to-message-payload
Expand All @@ -27,7 +26,7 @@
(s/validate mp/message-payload-schema message)
(catch Exception e
(log/info "old message format read, converting to message-payload: " message)
(let [retry-count (or (:retry-count message) 0)
(let [retry-count (or (:retry-count message) 0)
message-payload (mp/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))]
(assoc message-payload :retry-count retry-count)))))

Expand All @@ -51,13 +50,15 @@
(lb/ack ch delivery-tag))

(defn process-message-from-queue [ch meta payload topic-entity processing-fn]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)]
(let [delivery-tag (:delivery-tag meta)
message-payload (convert-and-ack-message ch meta payload false topic-entity)
metadata (:metadata message-payload)]
(println "PROCESS MESSAGE FROM QUEUE>>>>>>" message-payload)
(when message-payload
(log/infof "Processing message [%s] from RabbitMQ " message-payload)
(try
(log/debug "Calling processor-fn with the message-payload - " message-payload " with retry count - " (:retry-count message-payload))
(processing-fn message-payload)
(processing-fn message-payload metadata)
(ack-message ch delivery-tag)
(catch Exception e
(lb/reject ch delivery-tag true)
Expand Down Expand Up @@ -144,12 +145,12 @@
"Starts the subscriber to the instant queue of the rabbitmq"
[stream-routes batch-routes]
(doseq [stream-route stream-routes]
(let [topic-entity (first stream-route)
handler (-> stream-route second :handler-fn)
channels (-> stream-route second (dissoc :handler-fn))]
(let [topic-entity (first stream-route)
handler (-> stream-route second :handler-fn)
channels (-> stream-route second (dissoc :handler-fn))]
(start-channels-subscriber channels topic-entity)
(start-retry-subscriber* (mpr/mapper-func handler (keys channels)) topic-entity)))
(doseq [batch-route batch-routes]
(let [topic-entity (first batch-route)
handler (-> batch-route second :handler-fn)]
(let [topic-entity (first batch-route)
handler (-> batch-route second :handler-fn)]
(start-retry-subscriber* (fn [message] (ch/process handler message)) topic-entity))))
Loading

0 comments on commit 4dac309

Please sign in to comment.