Merge pull request #49 from macalimlim/goj-88147-for-2.x
add support for providing topic-name in the metrics
theanirudhvyas committed Jun 6, 2019
2 parents 317f352 + 53a9fe9 commit 02ce64d
Showing 9 changed files with 375 additions and 225 deletions.
15 changes: 9 additions & 6 deletions src/ziggurat/kafka_delay.clj
@@ -2,9 +2,12 @@
(:require [ziggurat.metrics :as metrics]
[ziggurat.util.time :refer :all]))

(defn calculate-and-report-kafka-delay [metric-namespace record-timestamp]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis
record-timestamp)]
(metrics/report-time metric-namespace delay)))

(defn calculate-and-report-kafka-delay
([metric-namespaces record-timestamp]
(calculate-and-report-kafka-delay metric-namespaces record-timestamp nil))
([metric-namespaces record-timestamp additional-tags]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)
default-namespace (last metric-namespaces)
multi-namespaces [metric-namespaces [default-namespace]]]
(metrics/multi-ns-report-time multi-namespaces delay additional-tags))))
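
For context on how the reworked helper is meant to be called: it now takes a vector of namespace components plus an optional tag map, and reports the delay both under the full namespace and under its last component alone (via metrics/multi-ns-report-time). A minimal usage sketch; the "booking" topic, timestamp, and example namespace are illustrative, not taken from this commit:

(ns example.usage
  (:require [ziggurat.kafka-delay :as kafka-delay]))

;; Reports the delay under the category "booking.message-received-delay-histogram"
;; and, because (last metric-namespaces) becomes the default namespace, under
;; "message-received-delay-histogram" as well, tagged with {:topic-name "booking"}.
(kafka-delay/calculate-and-report-kafka-delay
 ["booking" "message-received-delay-histogram"]
 1559800000000                        ;; record timestamp in epoch millis (illustrative)
 {:topic-name "booking"})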
79 changes: 52 additions & 27 deletions src/ziggurat/mapper.clj
@@ -1,7 +1,7 @@
(ns ziggurat.mapper
(:require [sentry-clj.async :as sentry]
[ziggurat.metrics :as metrics]
[ziggurat.messaging.producer :as producer]
[ziggurat.metrics :as metrics]
[ziggurat.new-relic :as nr]
[ziggurat.sentry :refer [sentry-reporter]])
(:import (java.time Instant)))
@@ -15,47 +15,72 @@
(fn [message]
(let [topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
metric-namespace (str topic-entity-name ".message-processing")]
default-namespace "message-processing"
metric-namespaces [topic-entity-name default-namespace]
additional-tags {:topic-name topic-entity-name}
default-namespaces [default-namespace]
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-namespaces [metric-namespaces default-namespaces]]
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))]
(metrics/report-time (str topic-entity-name ".handler-fn-execution-time") (- end-time start-time))
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [[topic-entity-name execution-time-namespace]
[execution-time-namespace]]]
(metrics/multi-ns-report-time multi-execution-time-namespaces time-val)
(case return-code
:success (metrics/increment-count metric-namespace "success")
:retry (do (metrics/increment-count metric-namespace "retry")
(producer/retry message topic-entity))
:skip (metrics/increment-count metric-namespace "skip")
:block 'TODO
:success (do (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags))
:retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags)
(producer/retry message topic-entity))
:skip (do (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags))
:block 'TODO
(do
(send-msg-to-channel channels message topic-entity return-code)
(metrics/increment-count metric-namespace "success"))))
(metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags))))
(catch Throwable e
(producer/retry message topic-entity)
(sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name))
(metrics/increment-count metric-namespace "failure")))))))
(metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags)))))))

(defn channel-mapper-func [mapper-fn topic-entity channel]
(fn [message]
(let [topic-entity-name (name topic-entity)
channel-name (name channel)
metric-namespace (str topic-entity-name "." channel-name)
message-processing-namespace (str metric-namespace ".message-processing")]
(let [topic-entity-name (name topic-entity)
channel-name (name channel)
default-namespace "message-processing"
base-namespaces [topic-entity-name channel-name]
metric-namespaces (conj base-namespaces default-namespace)
additional-tags {:topic-name topic-entity-name}
default-namespaces [default-namespace]
metric-namespace (apply str (interpose "." metric-namespaces))
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-namespaces [metric-namespaces default-namespaces]]
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))]
(metrics/report-time (str metric-namespace ".execution-time") (- end-time start-time))
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-time multi-execution-time-namespaces time-val)
(case return-code
:success (metrics/increment-count message-processing-namespace "success")
:retry (do (metrics/increment-count message-processing-namespace "retry")
(producer/retry-for-channel message topic-entity channel))
:skip (metrics/increment-count message-processing-namespace "skip")
:block 'TODO
:success (do (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags))
:retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags)
(producer/retry-for-channel message topic-entity channel))
:skip (do (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags))
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message topic-entity channel)
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/increment-count message-processing-namespace "failure")))))))
(metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags)))))))
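
The net effect in the mapper: each return code now increments the same counter under two namespaces, one topic-scoped and one shared, with the topic carried as a tag instead of only being baked into the namespace string. A sketch of what a :success return boils down to for a hypothetical :booking topic entity (the topic name is illustrative):

(require '[ziggurat.metrics :as metrics])

;; mapper-func builds metric-namespaces ["booking" "message-processing"] and
;; default-namespaces ["message-processing"], so a :success return marks meters
;; under the categories "booking.message-processing" and "message-processing",
;; metric "success", tagged {:topic-name "booking"} on top of the default actor tag.
(metrics/multi-ns-increment-count
 [["booking" "message-processing"] ["message-processing"]]
 "success"
 {:topic-name "booking"})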
84 changes: 55 additions & 29 deletions src/ziggurat/metrics.clj
@@ -1,45 +1,71 @@
(ns ziggurat.metrics
(:require [clojure.tools.logging :as log])
(:require [clojure.tools.logging :as log]
[clojure.walk :refer [stringify-keys]])
(:import (com.gojek.metrics.datadog DatadogReporter)
(com.gojek.metrics.datadog.transport UdpTransport$Builder UdpTransport)
(java.util.concurrent TimeUnit)
(io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram)))
(io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram)
(java.util.concurrent TimeUnit)))

(defonce ^:private group (atom nil))

(defonce metrics-registry
(MetricRegistry.))

(defn- merge-tags
[additional-tags]
(let [default-tags {"actor" @group}]
(merge default-tags (when (not (empty? additional-tags))
(stringify-keys additional-tags)))))

(defn mk-meter
[category metric]
(let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric]))
tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))]
(.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric)))
([category metric]
(mk-meter category metric nil))
([category metric additional-tags]
(let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric]))
tags (merge-tags additional-tags)
tagged-metric (.tagged ^MetricName metric-name tags)]
(.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric))))

(defn mk-histogram
[category metric]
(let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric]))
tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))]
(.histogram ^MetricRegistry metrics-registry tagged-metric)))
([category metric]
(mk-histogram category metric nil))
([category metric additional-tags]
(let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric]))
tags (merge-tags additional-tags)
tagged-metric (.tagged ^MetricName metric-name tags)]
(.histogram ^MetricRegistry metrics-registry tagged-metric))))

(defn- intercalate-dot
[names]
(apply str (interpose "." names)))

(defn- inc-or-dec-count
[sign metric-namespaces metric additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
meter ^Meter (mk-meter metric-namespace metric additional-tags)]
(.mark meter (sign 1))))

(defn increment-count
([metric-namespace metric]
(increment-count metric-namespace metric 1))
([metric-namespace metric n]
(let [meter ^Meter (mk-meter metric-namespace metric)]
(.mark meter (int n)))))
(def increment-count (partial inc-or-dec-count +))

(defn decrement-count
([metric-namespace metric]
(decrement-count metric-namespace metric 1))
([metric-namespace metric n]
(let [meter ^Meter (mk-meter metric-namespace metric)]
(.mark meter (int (- n))))))
(def decrement-count (partial inc-or-dec-count -))

(defn report-time [metric-namespace time-val]
(let [histogram ^Histogram (mk-histogram metric-namespace "all")]
(defn multi-ns-increment-count [nss metric additional-tags]
(doseq [ns nss]
(increment-count ns metric additional-tags)))

(defn report-time
[metric-namespaces time-val additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
histogram ^Histogram (mk-histogram metric-namespace "all" additional-tags)]
(.update histogram (int time-val))))

(defn multi-ns-report-time
([nss time-val]
(multi-ns-report-time nss time-val nil))
([nss time-val additional-tags]
(doseq [ns nss]
(report-time ns time-val additional-tags))))

(defn start-statsd-reporter [statsd-config env app-name]
(let [{:keys [enabled host port]} statsd-config]
(when enabled
@@ -48,10 +74,10 @@
(.withPort port)
(.build))

reporter (-> (DatadogReporter/forRegistry metrics-registry)
(.withTransport transport)
(.withTags [(str env)])
(.build))]
reporter (-> (DatadogReporter/forRegistry metrics-registry)
(.withTransport transport)
(.withTags [(str env)])
(.build))]
(log/info "Starting statsd reporter")
(.start reporter 1 TimeUnit/SECONDS)
(reset! group app-name)
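
All the helpers above funnel their tags through merge-tags, which layers the caller-supplied map over the default actor tag after stringifying the keys. merge-tags is private, so the sketch below simply reproduces its logic with a stand-in for the @group atom; the "my-actor" and :topic-name values are illustrative:

(require '[clojure.walk :refer [stringify-keys]])

(let [group           "my-actor"                 ;; stands in for the @group atom
      additional-tags {:topic-name "booking"}]
  (merge {"actor" group}
         (when (not (empty? additional-tags))
           (stringify-keys additional-tags))))
;; => {"actor" "my-actor", "topic-name" "booking"}
;; With nil or empty additional-tags, only {"actor" "my-actor"} remains.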
73 changes: 40 additions & 33 deletions src/ziggurat/streams.clj
@@ -2,29 +2,29 @@
(:require [clojure.tools.logging :as log]
[flatland.protobuf.core :as proto]
[mount.core :as mount :refer [defstate]]
[ziggurat.metrics :as metrics]
[ziggurat.config :refer [ziggurat-config]]
[sentry-clj.async :as sentry]
[ziggurat.channel :as chl]
[ziggurat.util.map :as umap]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.mapper :as mpr]
[ziggurat.metrics :as metrics]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.timestamp-transformer :as transformer]
[ziggurat.sentry :refer [sentry-reporter]])
(:import [java.util.regex Pattern]
[java.util Properties]
[ziggurat.util.map :as umap])
(:import [java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.clients.consumer ConsumerConfig]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.common.utils SystemTime]
[org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.kstream ValueMapper TransformerSupplier]
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
[org.apache.kafka.common.utils SystemTime]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))

(def default-config-for-stream
{:buffered-records-per-partition 10000
:commit-interval-ms 15000
:auto-offset-reset-config "latest"
:oldest-processed-message-in-s 604800
{:buffered-records-per-partition 10000
:commit-interval-ms 15000
:auto-offset-reset-config "latest"
:oldest-processed-message-in-s 604800
:changelog-topic-replication-factor 3})

(defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms changelog-topic-replication-factor]}]
@@ -42,12 +42,15 @@
(.put StreamsConfig/REPLICATION_FACTOR_CONFIG (int changelog-topic-replication-factor))
(.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config)))

(defn- get-metric-namespace [default topic]
(str (name topic) "." default))

(defn- log-and-report-metrics [topic-entity message]
(let [message-read-metric-namespace (get-metric-namespace "message" (name topic-entity))]
(metrics/increment-count message-read-metric-namespace "read"))
(let [topic-entity-name (name topic-entity)
additional-tags {:topic-name topic-entity-name}
default-namespace "message"
metric-namespaces [topic-entity-name default-namespace]
default-namespaces [default-namespace]
metric "read"
multi-namespaces [metric-namespaces default-namespaces]]
(metrics/multi-ns-increment-count multi-namespaces metric additional-tags))
message)

(defn store-supplier-builder []
@@ -63,15 +66,17 @@
(defn- map-values [mapper-fn stream-builder]
(.mapValues stream-builder (value-mapper mapper-fn)))

(defn- transformer-supplier [metric-namespace oldest-processed-message-in-s]
(defn- transformer-supplier
[metric-namespaces oldest-processed-message-in-s additional-tags]
(reify TransformerSupplier
(get [_] (transformer/create metric-namespace oldest-processed-message-in-s))))
(get [_] (transformer/create metric-namespaces oldest-processed-message-in-s additional-tags))))

(defn- transform-values [topic-entity oldest-processed-message-in-s stream-builder]
(let [metric-namespace (get-metric-namespace "message-received-delay-histogram" topic-entity)]
(.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s) (into-array [(.name (store-supplier-builder))]))))
(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(let [metric-namespaces [topic-entity-name "message-received-delay-histogram"]
additional-tags {:topic-name topic-entity-name}]
(.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))

(defn- protobuf->hash [message proto-class]
(defn- protobuf->hash [message proto-class topic-entity-name]
(try
(let [proto-klass (-> proto-class
java.lang.Class/forName
@@ -83,9 +88,11 @@
keys)]
(select-keys loaded-proto proto-keys))
(catch Throwable e
(sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class))
(metrics/increment-count "message-parsing" "failed")
nil)))
(let [additional-tags {:topic-name topic-entity-name}
metric-namespaces ["message-parsing"]]
(sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class))
(metrics/increment-count metric-namespaces "failed" additional-tags)
nil))))

(defn- topology [handler-fn {:keys [origin-topic proto-class oldest-processed-message-in-s]} topic-entity channels]
(let [builder (StreamsBuilder.)
@@ -94,7 +101,7 @@
(.addStateStore builder (store-supplier-builder))
(->> (.stream builder topic-pattern)
(transform-values topic-entity-name oldest-processed-message-in-s)
(map-values #(protobuf->hash % proto-class))
(map-values #(protobuf->hash % proto-class topic-entity-name))
(map-values #(log-and-report-metrics topic-entity-name %))
(map-values #((mpr/mapper-func handler-fn topic-entity channels) %)))
(.build builder)))
@@ -108,13 +115,13 @@
(start-streams stream-routes (ziggurat-config)))
([stream-routes stream-configs]
(reduce (fn [streams stream]
(let [topic-entity (first stream)
(let [topic-entity (first stream)
topic-handler-fn (-> stream second :handler-fn)
channels (chl/get-keys-for-topic stream-routes topic-entity)
stream-config (-> stream-configs
(get-in [:stream-router topic-entity])
(umap/deep-merge default-config-for-stream))
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
channels (chl/get-keys-for-topic stream-routes topic-entity)
stream-config (-> stream-configs
(get-in [:stream-router topic-entity])
(umap/deep-merge default-config-for-stream))
stream (start-stream* topic-handler-fn stream-config topic-entity channels)]
(.start stream)
(conj streams stream)))
[]
@@ -128,4 +135,4 @@
:start (do (log/info "Starting Kafka stream")
(start-streams (:stream-routes (mount/args)) (ziggurat-config)))
:stop (do (log/info "Stopping Kafka stream")
(stop-streams stream)))
(stop-streams stream)))
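
On the streams side the read counter follows the same pattern: log-and-report-metrics fans out to a topic-scoped and a shared "message" namespace and tags the count with the topic name, replacing the old get-metric-namespace string concatenation. Roughly, for a hypothetical "booking" topic entity, the call reduces to:

(require '[ziggurat.metrics :as metrics])

;; Categories "booking.message" and "message", metric "read",
;; tagged {:topic-name "booking"} alongside the default actor tag.
(metrics/multi-ns-increment-count
 [["booking" "message"] ["message"]]
 "read"
 {:topic-name "booking"})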