Skip to content

Commit

Permalink
Merge fc925ae into dc012ee
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed May 27, 2019
2 parents dc012ee + fc925ae commit c65dc76
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 62 deletions.
14 changes: 8 additions & 6 deletions src/ziggurat/kafka_delay.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
(:require [ziggurat.metrics :as metrics]
[ziggurat.util.time :refer :all]))

(defn calculate-and-report-kafka-delay [metric-namespace record-timestamp]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis
record-timestamp)]
(metrics/report-time metric-namespace delay)))

(defn calculate-and-report-kafka-delay
([metric-namespace record-timestamp topic-entity-name]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis
record-timestamp)]
(metrics/report-time metric-namespace delay topic-entity-name)))
([metric-namespace record-timestamp]
(calculate-and-report-kafka-delay metric-namespace record-timestamp nil)))
26 changes: 17 additions & 9 deletions src/ziggurat/metrics.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
(:require [clojure.tools.logging :as log])
(:import (com.gojek.metrics.datadog DatadogReporter)
(com.gojek.metrics.datadog.transport UdpTransport$Builder UdpTransport)
(java.util.concurrent TimeUnit)
(io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram)))
(io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram)
(java.util.concurrent TimeUnit)))

(defonce ^:private group (atom nil))

Expand All @@ -17,10 +17,15 @@
(.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric)))

(defn mk-histogram
[category metric]
(let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric]))
tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))]
(.histogram ^MetricRegistry metrics-registry tagged-metric)))
([category metric]
(mk-histogram category metric nil))
([category metric topic-entity-name]
(let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric]))
tags (if (nil? topic-entity-name)
["actor" @group]
["actor" @group "topic_name" topic-entity-name])
tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String tags))]
(.histogram ^MetricRegistry metrics-registry tagged-metric))))

(defn increment-count
([metric-namespace metric]
Expand All @@ -36,9 +41,12 @@
(let [meter ^Meter (mk-meter metric-namespace metric)]
(.mark meter (int (- n))))))

(defn report-time [metric-namespace time-val]
(let [histogram ^Histogram (mk-histogram metric-namespace "all")]
(.update histogram (int time-val))))
(defn report-time
([metric-namespace time-val topic-entity-name]
(let [histogram ^Histogram (mk-histogram metric-namespace "all" topic-entity-name)]
(.update histogram (int time-val))))
([metric-namespace time-val]
(report-time metric-namespace time-val nil)))

(defn start-statsd-reporter [statsd-config env app-name]
(let [{:keys [enabled host port]} statsd-config]
Expand Down
32 changes: 18 additions & 14 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@
(:require [clojure.tools.logging :as log]
[flatland.protobuf.core :as proto]
[mount.core :as mount :refer [defstate]]
[ziggurat.metrics :as metrics]
[ziggurat.config :refer [ziggurat-config]]
[sentry-clj.async :as sentry]
[ziggurat.channel :as chl]
[ziggurat.util.map :as umap]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.mapper :as mpr]
[ziggurat.metrics :as metrics]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.timestamp-transformer :as transformer]
[ziggurat.sentry :refer [sentry-reporter]])
(:import [java.util.regex Pattern]
[java.util Properties]
[ziggurat.util.map :as umap])
(:import [java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.clients.consumer ConsumerConfig]
[org.apache.kafka.common.serialization Serdes]
[org.apache.kafka.common.utils SystemTime]
[org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.kstream ValueMapper TransformerSupplier]
[org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier]
[org.apache.kafka.common.utils SystemTime]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))

(def default-config-for-stream
Expand Down Expand Up @@ -83,13 +83,17 @@
(defn- map-values [mapper-fn stream-builder]
(.mapValues stream-builder (value-mapper mapper-fn)))

(defn- transformer-supplier [metric-namespace oldest-processed-message-in-s]
(reify TransformerSupplier
(get [_] (transformer/create metric-namespace oldest-processed-message-in-s))))
(defn- transformer-supplier
([metric-namespace oldest-processed-message-in-s]
(reify TransformerSupplier
(get [_] (transformer/create metric-namespace oldest-processed-message-in-s))))
([metric-namespace oldest-processed-message-in-s topic-entity-name]
(reify TransformerSupplier
(get [_] (transformer/create metric-namespace oldest-processed-message-in-s topic-entity-name)))))

(defn- transform-values [topic-entity oldest-processed-message-in-s stream-builder]
(let [metric-namespace (get-metric-namespace "message-received-delay-histogram" topic-entity)]
(.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s) (into-array [(.name (store-supplier-builder))]))))
(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(let [metric-namespace (get-metric-namespace "message-received-delay-histogram" topic-entity-name)]
(.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s topic-entity-name) (into-array [(.name (store-supplier-builder))]))))

(defn- protobuf->hash [message proto-class]
(try
Expand Down Expand Up @@ -148,4 +152,4 @@
:start (do (log/info "Starting Kafka stream")
(start-streams (:stream-routes (mount/args)) (ziggurat-config)))
:stop (do (log/info "Stopping Kafka stream")
(stop-streams stream)))
(stop-streams stream)))
11 changes: 7 additions & 4 deletions src/ziggurat/timestamp_transformer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
(get-current-time-in-millis)
ingestion-time))))

(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespace oldest-processed-message-in-s] Transformer
(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespace oldest-processed-message-in-s topic-entity-name] Transformer
(^void init [_ ^ProcessorContext context]
(do (set! processor-context context)
nil))
(transform [_ record-key record-value]
(let [message-time (.timestamp processor-context)]
(when (message-to-process? message-time oldest-processed-message-in-s)
(calculate-and-report-kafka-delay metric-namespace message-time)
(calculate-and-report-kafka-delay metric-namespace message-time topic-entity-name)
(KeyValue/pair record-key record-value))))
(close [_] nil))

(defn create [metric-namespace process-message-since-in-s]
(TimestampTransformer. nil metric-namespace process-message-since-in-s))
(defn create
([metric-namespace process-message-since-in-s]
(create metric-namespace process-message-since-in-s nil))
([metric-namespace process-message-since-in-s topic-entity-name]
(TimestampTransformer. nil metric-namespace process-message-since-in-s topic-entity-name)))
21 changes: 11 additions & 10 deletions test/ziggurat/kafka_delay_test.clj
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
(ns ziggurat.kafka-delay-test
(:require [clojure.test :refer :all]
[ziggurat.kafka-delay :refer :all]
[ziggurat.util.time :refer [get-current-time-in-millis]]
[ziggurat.metrics :as metrics]))
[ziggurat.metrics :as metrics]
[ziggurat.util.time :refer [get-current-time-in-millis]]))

(deftest calculate-and-report-kafka-delay-test
(testing "calculates and reports the timestamp delay"
(let [record-timestamp 1528720767777
current-time 1528720768777
expected-delay 1000
namespace "test"]
(let [record-timestamp 1528720767777
current-time 1528720768777
expected-delay 1000
namespace "test"
expected-topic-entity-name "expected-topic-entity-name"]
(with-redefs [get-current-time-in-millis (constantly current-time)
metrics/report-time (fn [metric-namespace delay]
metrics/report-time (fn [metric-namespace delay topic-entity-name]
(is (= delay expected-delay))
(is (= metric-namespace namespace)))]
(calculate-and-report-kafka-delay namespace record-timestamp)))))

(is (= metric-namespace namespace))
(is (= topic-entity-name expected-topic-entity-name)))]
(calculate-and-report-kafka-delay namespace record-timestamp expected-topic-entity-name)))))
16 changes: 9 additions & 7 deletions test/ziggurat/metrics_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@

(deftest report-time-test
(testing "updates time-val"
(let [metric-ns "metric-ns"
time-val 10
mk-histogram-args (atom nil)
reservoir (UniformReservoir.)
histogram (Histogram. reservoir)]
(with-redefs [metrics/mk-histogram (fn [metric-ns metric]
(let [metric-ns "metric-ns"
time-val 10
mk-histogram-args (atom nil)
reservoir (UniformReservoir.)
histogram (Histogram. reservoir)
expected-topic-entity-name "expected-topic-entity-name"]
(with-redefs [metrics/mk-histogram (fn [metric-ns metric topic-entity-name]
(is (= topic-entity-name expected-topic-entity-name))
(reset! mk-histogram-args {:metric-namespace metric-ns
:metric metric})
histogram)]
(metrics/report-time metric-ns time-val)
(metrics/report-time metric-ns time-val expected-topic-entity-name)
(is (= 1 (.getCount histogram)))
(is (= metric-ns (:metric-namespace @mk-histogram-args)))
(is (= "all" (:metric @mk-histogram-args)))))))
26 changes: 14 additions & 12 deletions test/ziggurat/timestamp_transformer_test.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
(ns ziggurat.timestamp-transformer-test
(:require [clojure.test :refer :all]
[ziggurat.timestamp-transformer :refer :all]
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :refer :all]
[ziggurat.util.time :refer :all])
(:import [org.apache.kafka.streams.processor ProcessorContext]
[org.apache.kafka.clients.consumer ConsumerRecord]
(:import [org.apache.kafka.clients.consumer ConsumerRecord]
[org.apache.kafka.streams.processor ProcessorContext]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))

(deftest ingestion-time-extractor-test
Expand All @@ -28,16 +28,18 @@

(deftest timestamp-transformer-test
(testing "creates a timestamp-transformer object that calculates and reports timestamp delay"
(let [metric-namespace "test.message-received-delay-histogram"
record-timestamp 1528720767777
context (reify ProcessorContext
(timestamp [_] record-timestamp))
current-time 1528720768777
timestamp-transformer (create metric-namespace current-time)
expected-delay 1000]
(let [metric-namespace "test.message-received-delay-histogram"
record-timestamp 1528720767777
context (reify ProcessorContext
(timestamp [_] record-timestamp))
current-time 1528720768777
expected-topic-entity-name "expected-topic-entity-name"
timestamp-transformer (create metric-namespace current-time expected-topic-entity-name)
expected-delay 1000]
(.init timestamp-transformer context)
(with-redefs [get-current-time-in-millis (constantly current-time)
metrics/report-time (fn [namespace delay]
metrics/report-time (fn [namespace delay topic-entity-name]
(is (= delay expected-delay))
(is (= metric-namespace namespace)))]
(is (= metric-namespace namespace))
(is (= expected-topic-entity-name topic-entity-name)))]
(.transform timestamp-transformer nil nil)))))

0 comments on commit c65dc76

Please sign in to comment.