Skip to content

Commit

Permalink
Merge 3ba63b8 into be27bb7
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed Jul 26, 2019
2 parents be27bb7 + 3ba63b8 commit da5a868
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 228 deletions.
14 changes: 6 additions & 8 deletions src/ziggurat/kafka_delay.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
[ziggurat.util.time :refer :all]))

(defn calculate-and-report-kafka-delay
([metric-namespaces record-timestamp]
(calculate-and-report-kafka-delay metric-namespaces record-timestamp nil))
([metric-namespaces record-timestamp additional-tags]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)
default-namespace (last metric-namespaces)
multi-namespaces [metric-namespaces [default-namespace]]]
(metrics/multi-ns-report-time multi-namespaces delay additional-tags))))
([metric-namespace record-timestamp]
(calculate-and-report-kafka-delay metric-namespace record-timestamp nil))
([metric-namespace record-timestamp additional-tags]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)]
(metrics/report-time metric-namespace delay additional-tags))))
87 changes: 38 additions & 49 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,85 +16,74 @@

(defn mapper-func [mapper-fn channels]
(fn [message-payload]
(let [service-name (:app-name (ziggurat-config))
topic-entity (:topic-entity message-payload)
(let [topic-entity (:topic-entity message-payload)
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
default-namespace "message-processing"
metric-namespaces [service-name topic-entity-name default-namespace]
additional-tags {:topic_name topic-entity-name}
default-namespaces [default-namespace]
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-namespaces [metric-namespaces default-namespaces]]
failure-metric "failure"]
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn (:message message-payload))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [[service-name topic-entity-name execution-time-namespace]
[execution-time-namespace]]]
(metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags)
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn (:message message-payload))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"]
(metrics/report-time execution-time-namespace time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags)
:success (metrics/increment-count default-namespace success-metric additional-tags)
:retry (do (metrics/increment-count default-namespace retry-metric additional-tags)
(producer/retry message-payload))
:skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags)
:skip (metrics/increment-count default-namespace skip-metric additional-tags)
:block 'TODO
(do
(send-msg-to-channel channels message-payload return-code)
(metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags))))
(metrics/increment-count default-namespace success-metric additional-tags))))
(catch Throwable e
(producer/retry message-payload)
(sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name))
(metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags)))))))
(metrics/increment-count default-namespace failure-metric additional-tags)))))))

(defn channel-mapper-func [mapper-fn channel]
(fn [message-payload]
(let [service-name (:app-name (ziggurat-config))
topic-entity (:topic-entity message-payload)
topic-entity-name (name topic-entity)
channel-name (name channel)
default-namespace "message-processing"
base-namespaces [service-name topic-entity-name channel-name]
metric-namespaces (conj base-namespaces default-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
default-namespaces [default-namespace]
metric-namespace (str/join "." metric-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-namespaces [metric-namespaces default-namespaces]]
(fn [{topic-entity :topic-entity message :message :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
default-namespace "message-processing"
base-namespaces [service-name topic-entity-name channel-name]
metric-namespaces (conj base-namespaces default-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." metric-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"]
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn (:message message-payload))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags)
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"]
(metrics/report-time execution-time-namespace time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags)
:success (metrics/increment-count default-namespace success-metric additional-tags)
:retry (do (metrics/increment-count default-namespace retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
:skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags)
:skip (metrics/increment-count default-namespace skip-metric additional-tags)
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags)))))))
(metrics/increment-count default-namespace failure-metric additional-tags)))))))

(defrecord MessagePayload [message topic-entity])

(s/defschema message-payload-schema
{:message s/Any
:topic-entity s/Keyword
{:message s/Any
:topic-entity s/Keyword
(s/optional-key :retry-count) s/Int})
25 changes: 9 additions & 16 deletions src/ziggurat/metrics.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,32 @@
[names]
(str/join "." names))

(defn remove-topic-tag-for-old-namespace
[additional-tags ns]
(let [topic-name (:topic_name additional-tags)]
(dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name))))
(defn- get-metric-namespaces
[metric-namespaces]
(if (vector? metric-namespaces)
(intercalate-dot metric-namespaces)
metric-namespaces))

(defn- inc-or-dec-count
([sign metric-namespace metric]
(inc-or-dec-count sign metric-namespace metric nil))
([sign metric-namespaces metric additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
(let [metric-namespace (get-metric-namespaces metric-namespaces)
meter ^Meter (mk-meter metric-namespace metric additional-tags)]
(.mark meter (sign 1)))))

(def increment-count (partial inc-or-dec-count +))

(def decrement-count (partial inc-or-dec-count -))

(defn multi-ns-increment-count [nss metric additional-tags]
(doseq [ns nss]
(increment-count ns metric additional-tags)))

(defn report-time
([metric-namespaces time-val]
(report-time metric-namespaces time-val nil))
([metric-namespaces time-val additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
(let [metric-namespace (get-metric-namespaces metric-namespaces)
histogram ^Histogram (mk-histogram metric-namespace "all" additional-tags)]
(.update histogram (int time-val)))))

(defn multi-ns-report-time [nss time-val additional-tags]
(doseq [ns nss]
(report-time ns time-val additional-tags)))

(defn start-statsd-reporter [statsd-config env]
(let [{:keys [enabled host port]} statsd-config]
(when enabled
Expand Down
10 changes: 4 additions & 6 deletions src/ziggurat/middleware/default.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
(ns ziggurat.middleware.default
(:require [flatland.protobuf.core :as proto]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.metrics :as metrics]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.config :refer [ziggurat-config]]))
[ziggurat.sentry :refer [sentry-reporter]]))

(defn- deserialise-message
"This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a
Expand All @@ -24,11 +24,9 @@
(catch Throwable e
(let [service-name (:app-name (ziggurat-config))
additional-tags {:topic_name topic-entity-name}
default-namespace "message-parsing"
metric-namespaces [service-name "message-parsing"]
multi-namespaces [metric-namespaces [default-namespace]]]
default-namespace "message-parsing"]
(sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class))
(metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags)
(metrics/increment-count default-namespace "failed" additional-tags)
nil)))
message))

Expand Down
26 changes: 11 additions & 15 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,11 @@
(set-upgrade-from-config upgrade-from)))

(defn- log-and-report-metrics [topic-entity message]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
additional-tags {:topic_name topic-entity-name}
default-namespace "message"
metric-namespaces [service-name topic-entity-name default-namespace]
default-namespaces [default-namespace]
metric "read"
multi-namespaces [metric-namespaces default-namespaces]]
(metrics/multi-ns-increment-count multi-namespaces metric additional-tags))
(let [topic-entity-name (name topic-entity)
additional-tags {:topic_name topic-entity-name}
default-namespace "message"
metric "read"]
(metrics/increment-count default-namespace metric additional-tags))
message)

(defn store-supplier-builder []
Expand All @@ -86,15 +82,15 @@
(.mapValues stream-builder (value-mapper mapper-fn)))

(defn- transformer-supplier
[metric-namespaces oldest-processed-message-in-s additional-tags]
[metric-namespace oldest-processed-message-in-s additional-tags]
(reify TransformerSupplier
(get [_] (transformer/create metric-namespaces oldest-processed-message-in-s additional-tags))))
(get [_] (transformer/create metric-namespace oldest-processed-message-in-s additional-tags))))

(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(let [service-name (:app-name (ziggurat-config))
metric-namespaces [service-name topic-entity-name "message-received-delay-histogram"]
additional-tags {:topic_name topic-entity-name}]
(.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))
(let [service-name (:app-name (ziggurat-config))
metric-namespace "message-received-delay-histogram"
additional-tags {:topic_name topic-entity-name}]
(.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
(let [builder (StreamsBuilder.)
Expand Down
4 changes: 2 additions & 2 deletions src/ziggurat/timestamp_transformer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
(get-current-time-in-millis)
ingestion-time))))

(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespaces oldest-processed-message-in-s additional-tags] Transformer
(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespace oldest-processed-message-in-s additional-tags] Transformer
(^void init [_ ^ProcessorContext context]
(do (set! processor-context context)
nil))
(transform [_ record-key record-value]
(let [message-time (.timestamp processor-context)]
(when (message-to-process? message-time oldest-processed-message-in-s)
(calculate-and-report-kafka-delay metric-namespaces message-time additional-tags)
(calculate-and-report-kafka-delay metric-namespace message-time additional-tags)
(KeyValue/pair record-key record-value))))
(close [_] nil))

Expand Down
Loading

0 comments on commit da5a868

Please sign in to comment.