Skip to content

Commit

Permalink
Merge 2fcaa7e into f8f322d
Browse files Browse the repository at this point in the history
  • Loading branch information
theanirudhvyas committed Dec 4, 2019
2 parents f8f322d + 2fcaa7e commit 0a186ae
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 128 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. This change

## Unreleased Changes

## 3.1.0-alpha.4 - 2019-12-4
- Reintroduces old metrics format (statsd). Ziggurat now pushes metrics in both formats (statsd and prometheus like).
- Reverts the changes for exponential backoff, the current implementation was broken and a new PR is being raised with the correct approach.

## 3.1.0-alpha.3 - 2019-11-11
- Renames report-time to report-histogram while being backward compatible

Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject tech.gojek/ziggurat "3.1.0-alpha.3"
(defproject tech.gojek/ziggurat "3.1.0-alpha.4"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
14 changes: 8 additions & 6 deletions src/ziggurat/kafka_delay.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
[ziggurat.util.time :refer :all]))

(defn calculate-and-report-kafka-delay
([metric-namespace record-timestamp]
(calculate-and-report-kafka-delay metric-namespace record-timestamp nil))
([metric-namespace record-timestamp additional-tags]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)]
(metrics/report-histogram metric-namespace delay additional-tags))))
([metric-namespaces record-timestamp]
(calculate-and-report-kafka-delay metric-namespaces record-timestamp nil))
([metric-namespaces record-timestamp additional-tags]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)
default-namespace (last metric-namespaces)
multi-namespaces [metric-namespaces [default-namespace]]]
(metrics/multi-ns-report-histogram multi-namespaces delay additional-tags))))
91 changes: 50 additions & 41 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,69 +16,78 @@

(defn mapper-func [mapper-fn channels]
(fn [{:keys [topic-entity message] :as message-payload}]
(let [topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
default-namespace "message-processing"
additional-tags {:topic_name topic-entity-name}
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name}
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"]
(metrics/report-histogram execution-time-namespace time-val additional-tags)
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags)
(case return-code
:success (metrics/increment-count default-namespace success-metric additional-tags)
:retry (do (metrics/increment-count default-namespace retry-metric additional-tags)
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry message-payload))
:skip (metrics/increment-count default-namespace skip-metric additional-tags)
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(do
(send-msg-to-channel channels message-payload return-code)
(metrics/increment-count default-namespace success-metric additional-tags))))
(metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags))))
(catch Throwable e
(producer/retry message-payload)
(sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name))
(metrics/increment-count default-namespace failure-metric additional-tags)))))))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))

(defn channel-mapper-func [mapper-fn channel]
(fn [{:keys [topic-entity message] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
default-namespace "message-processing"
base-namespaces [service-name topic-entity-name channel-name]
metric-namespaces (conj base-namespaces default-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." metric-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
message-processing-namespace "message-processing"
base-metric-namespaces [service-name topic-entity-name channel-name]
message-processing-namespaces (conj base-metric-namespaces message-processing-namespace)
additional-tags {:topic_name topic-entity-name :channel_name channel-name}
metric-namespace (str/join "." message-processing-namespaces)
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"]
(metrics/report-histogram execution-time-namespace time-val additional-tags)
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
(case return-code
:success (metrics/increment-count default-namespace success-metric additional-tags)
:retry (do (metrics/increment-count default-namespace retry-metric additional-tags)
:success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
:skip (metrics/increment-count default-namespace skip-metric additional-tags)
:skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/increment-count default-namespace failure-metric additional-tags)))))))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))

(defrecord MessagePayload [message topic-entity])

Expand Down
35 changes: 25 additions & 10 deletions src/ziggurat/metrics.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
^{:static true} [incrementCount [String String java.util.Map] void]
^{:static true} [decrementCount [String String] void]
^{:static true} [decrementCount [String String java.util.Map] void]
^{:static true} [reportTime [String long] void]
^{:static true} [reportTime [String long java.util.Map] void]]))
^{:static true} [reportTime [String long] void]
^{:static true} [reportTime [String long java.util.Map] void]]))

(defonce metrics-registry
(MetricRegistry.))
Expand Down Expand Up @@ -53,11 +53,16 @@
[names]
(str/join "." names))

(defn remove-topic-tag-for-old-namespace
[additional-tags ns]
(let [topic-name (:topic_name additional-tags)]
(dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name))))

(defn- get-metric-namespaces
[metric-namespaces]
(if (vector? metric-namespaces)
(intercalate-dot metric-namespaces)
metric-namespaces))
(str (:app-name (ziggurat-config)) "." metric-namespaces)))

(defn- get-v
[f d v]
Expand All @@ -76,22 +81,32 @@
(inc-or-dec-count sign {:metric-namespace metric-namespace :metric metric :n n :additional-tags additional-tags}))
([sign {:keys [metric-namespace metric n additional-tags]}]
(let [metric-ns (get-metric-namespaces metric-namespace)
meter ^Meter (mk-meter metric-ns metric (get-map additional-tags))]
meter ^Meter (mk-meter metric-ns metric (remove-topic-tag-for-old-namespace (get-map additional-tags) metric-namespace))]
(.mark meter (sign (get-int n))))))

(def increment-count (partial inc-or-dec-count +))

(def decrement-count (partial inc-or-dec-count -))

(defn multi-ns-increment-count [nss metric additional-tags]
(doseq [ns nss]
(increment-count ns metric additional-tags)))

(defn report-histogram
([metric-namespaces val]
(report-histogram metric-namespaces val nil))
([metric-namespaces val additional-tags]
(let [metric-namespace (get-metric-namespaces metric-namespaces)
histogram ^Histogram (mk-histogram metric-namespace "all" additional-tags)]
histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
(.update histogram (get-int val)))))

(def report-time report-histogram) ;; for backward compatibility
(def report-time report-histogram) ;; for backward compatibility

(defn multi-ns-report-histogram [nss time-val additional-tags]
(doseq [ns nss]
(report-histogram ns time-val additional-tags)))

(def multi-ns-report-time multi-ns-report-histogram) ;; for backward compatibility

(defn start-statsd-reporter [statsd-config env]
(let [{:keys [enabled host port]} statsd-config]
Expand All @@ -101,10 +116,10 @@
(.withPort port)
(.build))

reporter (-> (DatadogReporter/forRegistry metrics-registry)
(.withTransport transport)
(.withTags [(str env)])
(.build))]
reporter (-> (DatadogReporter/forRegistry metrics-registry)
(.withTransport transport)
(.withTags [(str env)])
(.build))]
(log/info "Starting statsd reporter")
(.start reporter 1 TimeUnit/SECONDS)
{:reporter reporter :transport transport}))))
Expand Down
6 changes: 4 additions & 2 deletions src/ziggurat/middleware/default.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
(catch Throwable e
(let [service-name (:app-name (ziggurat-config))
additional-tags {:topic_name topic-entity-name}
default-namespace "message-parsing"]
default-namespace "message-parsing"
metric-namespaces [service-name topic-entity-name default-namespace]
multi-namespaces [metric-namespaces [default-namespace]]]
(sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class))
(metrics/increment-count default-namespace "failed" additional-tags)
(metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags)
nil)))
message))

Expand Down
10 changes: 7 additions & 3 deletions src/ziggurat/middleware/json.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"
(:require [cheshire.core :refer [parse-string]]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.metrics :as metrics]))

Expand All @@ -12,10 +13,13 @@
(try
(parse-string message key-fn)
(catch Exception e
(let [additional-tags {:topic_name (name topic-entity)}
default-namespace "json-message-parsing"]
(let [service-name (:app-name (ziggurat-config))
additional-tags {:topic_name (name topic-entity)}
default-namespace "json-message-parsing"
metric-namespaces [service-name topic-entity default-namespace]
multi-namespaces [metric-namespaces [default-namespace]]]
(sentry/report-error sentry-reporter e (str "Could not parse JSON message " message))
(metrics/increment-count default-namespace "failed" additional-tags)
(metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags)
nil))))

(defn parse-json
Expand Down
15 changes: 9 additions & 6 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,14 @@
(set-encoding-config key-deserializer-encoding value-deserializer-encoding deserializer-encoding)))

(defn- log-and-report-metrics [topic-entity message]
(let [topic-entity-name (name topic-entity)
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
additional-tags {:topic_name topic-entity-name}
default-namespace "message"
metric-namespaces [service-name topic-entity-name default-namespace]
multi-namespaces [metric-namespaces [default-namespace]]
metric "read"]
(metrics/increment-count default-namespace metric additional-tags))
(metrics/multi-ns-increment-count multi-namespaces metric additional-tags))
message)

(defn store-supplier-builder []
Expand All @@ -110,9 +113,9 @@
(.mapValues stream-builder (value-mapper mapper-fn)))

(defn- timestamp-transformer-supplier
[metric-namespace oldest-processed-message-in-s additional-tags]
[metric-namespaces oldest-processed-message-in-s additional-tags]
(reify TransformerSupplier
(get [_] (timestamp-transformer/create metric-namespace oldest-processed-message-in-s additional-tags))))
(get [_] (timestamp-transformer/create metric-namespaces oldest-processed-message-in-s additional-tags))))

(defn- header-transformer-supplier
[]
Expand All @@ -121,9 +124,9 @@

(defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(let [service-name (:app-name (ziggurat-config))
metric-namespace "message-received-delay-histogram"
metric-namespaces [service-name topic-entity-name "message-received-delay-histogram"]
additional-tags {:topic_name topic-entity-name}]
(.transform stream-builder (timestamp-transformer-supplier metric-namespace oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))
(.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))

(defn- header-transform-values [stream-builder]
(.transformValues stream-builder (header-transformer-supplier) (into-array [(.name (store-supplier-builder))])))
Expand Down
Loading

0 comments on commit 0a186ae

Please sign in to comment.