From 53a9fe96194919e1c4c85452c03b8a17ebb7fccb Mon Sep 17 00:00:00 2001 From: Michael Angelo Calimlim Date: Mon, 27 May 2019 09:57:34 +0800 Subject: [PATCH] add support for providing a topic-name label in the metrics --- src/ziggurat/kafka_delay.clj | 15 ++- src/ziggurat/mapper.clj | 79 +++++++---- src/ziggurat/metrics.clj | 84 ++++++++---- src/ziggurat/streams.clj | 73 +++++----- src/ziggurat/timestamp_transformer.clj | 11 +- test/ziggurat/kafka_delay_test.clj | 35 +++-- test/ziggurat/mapper_test.clj | 132 +++++++++++-------- test/ziggurat/metrics_test.clj | 104 ++++++++++----- test/ziggurat/timestamp_transformer_test.clj | 67 ++++++---- 9 files changed, 375 insertions(+), 225 deletions(-) diff --git a/src/ziggurat/kafka_delay.clj b/src/ziggurat/kafka_delay.clj index eb2582aa..3fae5bdc 100644 --- a/src/ziggurat/kafka_delay.clj +++ b/src/ziggurat/kafka_delay.clj @@ -2,9 +2,12 @@ (:require [ziggurat.metrics :as metrics] [ziggurat.util.time :refer :all])) -(defn calculate-and-report-kafka-delay [metric-namespace record-timestamp] - (let [now-millis (get-current-time-in-millis) - delay (- now-millis - record-timestamp)] - (metrics/report-time metric-namespace delay))) - +(defn calculate-and-report-kafka-delay + ([metric-namespaces record-timestamp] + (calculate-and-report-kafka-delay metric-namespaces record-timestamp nil)) + ([metric-namespaces record-timestamp additional-tags] + (let [now-millis (get-current-time-in-millis) + delay (- now-millis record-timestamp) + default-namespace (last metric-namespaces) + multi-namespaces [metric-namespaces [default-namespace]]] + (metrics/multi-ns-report-time multi-namespaces delay additional-tags)))) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 808380f8..5de1703f 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -1,7 +1,7 @@ (ns ziggurat.mapper (:require [sentry-clj.async :as sentry] - [ziggurat.metrics :as metrics] [ziggurat.messaging.producer :as producer] + [ziggurat.metrics :as metrics] [ziggurat.new-relic :as nr] [ziggurat.sentry :refer [sentry-reporter]]) (:import (java.time Instant))) @@ -15,47 +15,72 @@ (fn [message] (let [topic-entity-name (name topic-entity) new-relic-transaction-name (str topic-entity-name ".handler-fn") - metric-namespace (str topic-entity-name ".message-processing")] + default-namespace "message-processing" + metric-namespaces [topic-entity-name default-namespace] + additional-tags {:topic-name topic-entity-name} + default-namespaces [default-namespace] + success-metric "success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure" + multi-namespaces [metric-namespaces default-namespaces]] (nr/with-tracing "job" new-relic-transaction-name (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn message) - end-time (.toEpochMilli (Instant/now))] - (metrics/report-time (str topic-entity-name ".handler-fn-execution-time") (- end-time start-time)) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn message) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "handler-fn-execution-time" + multi-execution-time-namespaces [[topic-entity-name execution-time-namespace] + [execution-time-namespace]]] + (metrics/multi-ns-report-time multi-execution-time-namespaces time-val) (case return-code - :success (metrics/increment-count metric-namespace "success") - :retry (do (metrics/increment-count metric-namespace "retry") - (producer/retry message topic-entity)) - :skip (metrics/increment-count metric-namespace "skip") - :block 'TODO + :success (do (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)) + :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + (producer/retry message topic-entity)) + :skip (do (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags)) + :block 'TODO (do (send-msg-to-channel channels message topic-entity return-code) - (metrics/increment-count metric-namespace "success")))) + (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)))) (catch Throwable e (producer/retry message topic-entity) (sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name)) - (metrics/increment-count metric-namespace "failure"))))))) + (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) (defn channel-mapper-func [mapper-fn topic-entity channel] (fn [message] - (let [topic-entity-name (name topic-entity) - channel-name (name channel) - metric-namespace (str topic-entity-name "." channel-name) - message-processing-namespace (str metric-namespace ".message-processing")] + (let [topic-entity-name (name topic-entity) + channel-name (name channel) + default-namespace "message-processing" + base-namespaces [topic-entity-name channel-name] + metric-namespaces (conj base-namespaces default-namespace) + additional-tags {:topic-name topic-entity-name} + default-namespaces [default-namespace] + metric-namespace (apply str (interpose "." metric-namespaces)) + success-metric "success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure" + multi-namespaces [metric-namespaces default-namespaces]] (nr/with-tracing "job" metric-namespace (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn message) - end-time (.toEpochMilli (Instant/now))] - (metrics/report-time (str metric-namespace ".execution-time") (- end-time start-time)) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn message) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "execution-time" + multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace) + [execution-time-namespace]]] + (metrics/multi-ns-report-time multi-execution-time-namespaces time-val) (case return-code - :success (metrics/increment-count message-processing-namespace "success") - :retry (do (metrics/increment-count message-processing-namespace "retry") - (producer/retry-for-channel message topic-entity channel)) - :skip (metrics/increment-count message-processing-namespace "skip") - :block 'TODO + :success (do (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)) + :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + (producer/retry-for-channel message topic-entity channel)) + :skip (do (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags)) + :block 'TODO (throw (ex-info "Invalid mapper return code" {:code return-code})))) (catch Throwable e (producer/retry-for-channel message topic-entity channel) (sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) - (metrics/increment-count message-processing-namespace "failure"))))))) \ No newline at end of file + (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) diff --git a/src/ziggurat/metrics.clj b/src/ziggurat/metrics.clj index c3373800..df69bfc7 100644 --- a/src/ziggurat/metrics.clj +++ b/src/ziggurat/metrics.clj @@ -1,45 +1,71 @@ (ns ziggurat.metrics - (:require [clojure.tools.logging :as log]) + (:require [clojure.tools.logging :as log] + [clojure.walk :refer [stringify-keys]]) (:import (com.gojek.metrics.datadog DatadogReporter) (com.gojek.metrics.datadog.transport UdpTransport$Builder UdpTransport) - (java.util.concurrent TimeUnit) - (io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram))) + (io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram) + (java.util.concurrent TimeUnit))) (defonce ^:private group (atom nil)) (defonce metrics-registry (MetricRegistry.)) +(defn- merge-tags + [additional-tags] + (let [default-tags {"actor" @group}] + (merge default-tags (when (not (empty? additional-tags)) + (stringify-keys additional-tags))))) + (defn mk-meter - [category metric] - (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) - tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))] - (.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric))) + ([category metric] + (mk-meter category metric nil)) + ([category metric additional-tags] + (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) + tags (merge-tags additional-tags) + tagged-metric (.tagged ^MetricName metric-name tags)] + (.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric)))) (defn mk-histogram - [category metric] - (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) - tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))] - (.histogram ^MetricRegistry metrics-registry tagged-metric))) + ([category metric] + (mk-histogram category metric nil)) + ([category metric additional-tags] + (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) + tags (merge-tags additional-tags) + tagged-metric (.tagged ^MetricName metric-name tags)] + (.histogram ^MetricRegistry metrics-registry tagged-metric)))) + +(defn- intercalate-dot + [names] + (apply str (interpose "." names))) + +(defn- inc-or-dec-count + [sign metric-namespaces metric additional-tags] + (let [metric-namespace (intercalate-dot metric-namespaces) + meter ^Meter (mk-meter metric-namespace metric additional-tags)] + (.mark meter (sign 1)))) -(defn increment-count - ([metric-namespace metric] - (increment-count metric-namespace metric 1)) - ([metric-namespace metric n] - (let [meter ^Meter (mk-meter metric-namespace metric)] - (.mark meter (int n))))) +(def increment-count (partial inc-or-dec-count +)) -(defn decrement-count - ([metric-namespace metric] - (decrement-count metric-namespace metric 1)) - ([metric-namespace metric n] - (let [meter ^Meter (mk-meter metric-namespace metric)] - (.mark meter (int (- n)))))) +(def decrement-count (partial inc-or-dec-count -)) -(defn report-time [metric-namespace time-val] - (let [histogram ^Histogram (mk-histogram metric-namespace "all")] +(defn multi-ns-increment-count [nss metric additional-tags] + (doseq [ns nss] + (increment-count ns metric additional-tags))) + +(defn report-time + [metric-namespaces time-val additional-tags] + (let [metric-namespace (intercalate-dot metric-namespaces) + histogram ^Histogram (mk-histogram metric-namespace "all" additional-tags)] (.update histogram (int time-val)))) +(defn multi-ns-report-time + ([nss time-val] + (multi-ns-report-time nss time-val nil)) + ([nss time-val additional-tags] + (doseq [ns nss] + (report-time ns time-val additional-tags)))) + (defn start-statsd-reporter [statsd-config env app-name] (let [{:keys [enabled host port]} statsd-config] (when enabled @@ -48,10 +74,10 @@ (.withPort port) (.build)) - reporter (-> (DatadogReporter/forRegistry metrics-registry) - (.withTransport transport) - (.withTags [(str env)]) - (.build))] + reporter (-> (DatadogReporter/forRegistry metrics-registry) + (.withTransport transport) + (.withTags [(str env)]) + (.build))] (log/info "Starting statsd reporter") (.start reporter 1 TimeUnit/SECONDS) (reset! group app-name) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 2f8e9098..76df02e0 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -2,29 +2,29 @@ (:require [clojure.tools.logging :as log] [flatland.protobuf.core :as proto] [mount.core :as mount :refer [defstate]] - [ziggurat.metrics :as metrics] - [ziggurat.config :refer [ziggurat-config]] [sentry-clj.async :as sentry] [ziggurat.channel :as chl] - [ziggurat.util.map :as umap] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.mapper :as mpr] + [ziggurat.metrics :as metrics] + [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.timestamp-transformer :as transformer] - [ziggurat.sentry :refer [sentry-reporter]]) - (:import [java.util.regex Pattern] - [java.util Properties] + [ziggurat.util.map :as umap]) + (:import [java.util Properties] + [java.util.regex Pattern] [org.apache.kafka.clients.consumer ConsumerConfig] [org.apache.kafka.common.serialization Serdes] + [org.apache.kafka.common.utils SystemTime] [org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology] [org.apache.kafka.streams.kstream ValueMapper TransformerSupplier] [org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier] - [org.apache.kafka.common.utils SystemTime] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (def default-config-for-stream - {:buffered-records-per-partition 10000 - :commit-interval-ms 15000 - :auto-offset-reset-config "latest" - :oldest-processed-message-in-s 604800 + {:buffered-records-per-partition 10000 + :commit-interval-ms 15000 + :auto-offset-reset-config "latest" + :oldest-processed-message-in-s 604800 :changelog-topic-replication-factor 3}) (defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms changelog-topic-replication-factor]}] @@ -42,12 +42,15 @@ (.put StreamsConfig/REPLICATION_FACTOR_CONFIG (int changelog-topic-replication-factor)) (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config))) -(defn- get-metric-namespace [default topic] - (str (name topic) "." default)) - (defn- log-and-report-metrics [topic-entity message] - (let [message-read-metric-namespace (get-metric-namespace "message" (name topic-entity))] - (metrics/increment-count message-read-metric-namespace "read")) + (let [topic-entity-name (name topic-entity) + additional-tags {:topic-name topic-entity-name} + default-namespace "message" + metric-namespaces [topic-entity-name default-namespace] + default-namespaces [default-namespace] + metric "read" + multi-namespaces [metric-namespaces default-namespaces]] + (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) message) (defn store-supplier-builder [] @@ -63,15 +66,17 @@ (defn- map-values [mapper-fn stream-builder] (.mapValues stream-builder (value-mapper mapper-fn))) -(defn- transformer-supplier [metric-namespace oldest-processed-message-in-s] +(defn- transformer-supplier + [metric-namespaces oldest-processed-message-in-s additional-tags] (reify TransformerSupplier - (get [_] (transformer/create metric-namespace oldest-processed-message-in-s)))) + (get [_] (transformer/create metric-namespaces oldest-processed-message-in-s additional-tags)))) -(defn- transform-values [topic-entity oldest-processed-message-in-s stream-builder] - (let [metric-namespace (get-metric-namespace "message-received-delay-histogram" topic-entity)] - (.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s) (into-array [(.name (store-supplier-builder))])))) +(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] + (let [metric-namespaces [topic-entity-name "message-received-delay-histogram"] + additional-tags {:topic-name topic-entity-name}] + (.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) -(defn- protobuf->hash [message proto-class] +(defn- protobuf->hash [message proto-class topic-entity-name] (try (let [proto-klass (-> proto-class java.lang.Class/forName @@ -83,9 +88,11 @@ keys)] (select-keys loaded-proto proto-keys)) (catch Throwable e - (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) - (metrics/increment-count "message-parsing" "failed") - nil))) + (let [additional-tags {:topic-name topic-entity-name} + metric-namespaces ["message-parsing"]] + (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) + (metrics/increment-count metric-namespaces "failed" additional-tags) + nil)))) (defn- topology [handler-fn {:keys [origin-topic proto-class oldest-processed-message-in-s]} topic-entity channels] (let [builder (StreamsBuilder.) @@ -94,7 +101,7 @@ (.addStateStore builder (store-supplier-builder)) (->> (.stream builder topic-pattern) (transform-values topic-entity-name oldest-processed-message-in-s) - (map-values #(protobuf->hash % proto-class)) + (map-values #(protobuf->hash % proto-class topic-entity-name)) (map-values #(log-and-report-metrics topic-entity-name %)) (map-values #((mpr/mapper-func handler-fn topic-entity channels) %))) (.build builder))) @@ -108,13 +115,13 @@ (start-streams stream-routes (ziggurat-config))) ([stream-routes stream-configs] (reduce (fn [streams stream] - (let [topic-entity (first stream) + (let [topic-entity (first stream) topic-handler-fn (-> stream second :handler-fn) - channels (chl/get-keys-for-topic stream-routes topic-entity) - stream-config (-> stream-configs - (get-in [:stream-router topic-entity]) - (umap/deep-merge default-config-for-stream)) - stream (start-stream* topic-handler-fn stream-config topic-entity channels)] + channels (chl/get-keys-for-topic stream-routes topic-entity) + stream-config (-> stream-configs + (get-in [:stream-router topic-entity]) + (umap/deep-merge default-config-for-stream)) + stream (start-stream* topic-handler-fn stream-config topic-entity channels)] (.start stream) (conj streams stream))) [] @@ -128,4 +135,4 @@ :start (do (log/info "Starting Kafka stream") (start-streams (:stream-routes (mount/args)) (ziggurat-config))) :stop (do (log/info "Stopping Kafka stream") - (stop-streams stream))) \ No newline at end of file + (stop-streams stream))) diff --git a/src/ziggurat/timestamp_transformer.clj b/src/ziggurat/timestamp_transformer.clj index a0875faf..8e513695 100644 --- a/src/ziggurat/timestamp_transformer.clj +++ b/src/ziggurat/timestamp_transformer.clj @@ -17,17 +17,20 @@ (get-current-time-in-millis) ingestion-time)))) -(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespace oldest-processed-message-in-s] Transformer +(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespaces oldest-processed-message-in-s additional-tags] Transformer (^void init [_ ^ProcessorContext context] (do (set! processor-context context) nil)) (transform [_ record-key record-value] (let [message-time (.timestamp processor-context)] (when (message-to-process? message-time oldest-processed-message-in-s) - (calculate-and-report-kafka-delay metric-namespace message-time) + (calculate-and-report-kafka-delay metric-namespaces message-time additional-tags) (KeyValue/pair record-key record-value)))) (punctuate [_ _] nil) (close [_] nil)) -(defn create [metric-namespace process-message-since-in-s] - (TimestampTransformer. nil metric-namespace process-message-since-in-s)) +(defn create + ([metric-namespace process-message-since-in-s] + (create metric-namespace process-message-since-in-s nil)) + ([metric-namespace process-message-since-in-s additional-tags] + (TimestampTransformer. nil metric-namespace process-message-since-in-s additional-tags))) diff --git a/test/ziggurat/kafka_delay_test.clj b/test/ziggurat/kafka_delay_test.clj index 0d8d6b27..3f519225 100644 --- a/test/ziggurat/kafka_delay_test.clj +++ b/test/ziggurat/kafka_delay_test.clj @@ -1,18 +1,27 @@ (ns ziggurat.kafka-delay-test (:require [clojure.test :refer :all] [ziggurat.kafka-delay :refer :all] - [ziggurat.util.time :refer [get-current-time-in-millis]] - [ziggurat.metrics :as metrics])) + [ziggurat.metrics :as metrics] + [ziggurat.util.time :refer [get-current-time-in-millis]])) (deftest calculate-and-report-kafka-delay-test - (testing "calculates and reports the timestamp delay" - (let [record-timestamp 1528720767777 - current-time 1528720768777 - expected-delay 1000 - namespace "test"] - (with-redefs [get-current-time-in-millis (constantly current-time) - metrics/report-time (fn [metric-namespace delay] - (is (= delay expected-delay)) - (is (= metric-namespace namespace)))] - (calculate-and-report-kafka-delay namespace record-timestamp))))) - + (let [record-timestamp 1528720767777 + current-time 1528720768777 + expected-delay 1000 + expected-namespaces ["test"]] + (testing "calculates and reports the timestamp delay" + (let [expected-additional-tags {:topic-name "expected-topic-entity-name"}] + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay additional-tags] + (is (= delay expected-delay)) + (is (= metric-namespaces expected-namespaces)) + (is (= additional-tags expected-additional-tags)))] + (calculate-and-report-kafka-delay expected-namespaces record-timestamp expected-additional-tags)))) + (testing "calculates and reports the timestamp delay when additional tags is empty or nil" + (let [expected-additional-tags nil] + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay additional-tags] + (is (= delay expected-delay)) + (is (= metric-namespaces expected-namespaces)) + (is (= additional-tags expected-additional-tags)))] + (calculate-and-report-kafka-delay expected-namespaces record-timestamp)))))) diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index 56f67fa8..49e57118 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -13,23 +13,29 @@ fix/silence-logging])) (deftest mapper-func-test - (let [message {:foo "bar"} - stream-routes {:default {:handler-fn #(constantly nil)}} - topic (name (first (keys stream-routes))) - expected-metric-namespace "default.message-processing" - expected-report-time-namespace "default.handler-fn-execution-time"] + (let [message {:foo "bar"} + stream-routes {:default {:handler-fn #(constantly nil)}} + expected-topic-entity-name (name (first (keys stream-routes))) + expected-additional-tags {:topic-name expected-topic-entity-name} + default-namespace "message-processing" + report-time-namespace "handler-fn-execution-time" + expected-metric-namespaces ["default" default-namespace] + expected-report-time-namespaces ["default" report-time-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) successfully-reported-time? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? true))) - metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-report-time-namespace) + metrics/report-time (fn [metric-namespaces _ _] + (when (or (= metric-namespaces expected-report-time-namespaces) + (= metric-namespaces [report-time-namespace])) (reset! successfully-reported-time? true)))] - ((mapper-func (constantly :success) topic []) message) + ((mapper-func (constantly :success) expected-topic-entity-name []) message) (is @successfully-processed?) (is @successfully-reported-time?)))) @@ -37,12 +43,13 @@ (fix/with-queues (assoc-in stream-routes [:default :channel-1] (constantly :success)) (let [successfully-processed? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (= metric-namespaces expected-metric-namespaces) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] - ((mapper-func (constantly :channel-1) topic [:channel-1]) message) - (let [message-from-mq (rmq/get-message-from-channel-instant-queue topic :channel-1)] + ((mapper-func (constantly :channel-1) expected-topic-entity-name [:channel-1]) message) + (let [message-from-mq (rmq/get-message-from-channel-instant-queue expected-topic-entity-name :channel-1)] (is (= message message-from-mq)) (is @successfully-processed?)))))) @@ -53,8 +60,8 @@ (let [err (Throwable->map e)] (is (= (:cause err) "Invalid mapper return code")) (is (= (-> err :data :code) :channel-1))))] - ((mapper-func (constantly :channel-1) topic [:some-other-channel]) message) - (let [message-from-mq (rmq/get-message-from-channel-instant-queue topic :channel-1)] + ((mapper-func (constantly :channel-1) expected-topic-entity-name [:some-other-channel]) message) + (let [message-from-mq (rmq/get-message-from-channel-instant-queue expected-topic-entity-name :channel-1)] (is (nil? message-from-mq)))))) (testing "message process should be unsuccessful and retry" @@ -63,12 +70,13 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (= metric-namespaces expected-metric-namespaces) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] - ((mapper-func (constantly :retry) topic []) message) - (let [message-from-mq (rmq/get-msg-from-delay-queue topic)] + ((mapper-func (constantly :retry) expected-topic-entity-name []) message) + (let [message-from-mq (rmq/get-msg-from-delay-queue expected-topic-entity-name)] (is (= message-from-mq expected-message))) (is @unsuccessfully-processed?))))) @@ -79,39 +87,52 @@ unsuccessfully-processed? (atom false) expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true)) - metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + metrics/increment-count (fn [metric-namespaces metric additional-tags] + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace]))) + (is (= additional-tags expected-additional-tags)) + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] - ((mapper-func (fn [_] (throw (Exception. "test exception"))) topic []) message) - (let [message-from-mq (rmq/get-msg-from-delay-queue topic)] + ((mapper-func (fn [_] (throw (Exception. "test exception"))) expected-topic-entity-name []) message) + (let [message-from-mq (rmq/get-msg-from-delay-queue expected-topic-entity-name)] (is (= message-from-mq expected-message))) (is @unsuccessfully-processed?) (is @sentry-report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - expected-metric-namespace "default.handler-fn-execution-time"] - (with-redefs [metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-metric-namespace) + (let [reported-execution-time? (atom false) + execution-time-namesapce "handler-fn-execution-time" + expected-metric-namespaces ["default" execution-time-namesapce]] + (with-redefs [metrics/report-time (fn [metric-namespaces _ _] + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce]))) + (when (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce])) (reset! reported-execution-time? true)))] - ((mapper-func (constantly :success) topic []) message) + ((mapper-func (constantly :success) expected-topic-entity-name []) message) (is @reported-execution-time?)))))) (deftest channel-mapper-func-test - (let [message {:foo "bar"} - stream-routes {:default {:handler-fn #(constantly nil) - :channel-1 #(constantly nil)}} - topic (first (keys stream-routes)) - channel :channel-1 - expected-metric-namespace "default.channel-1.message-processing"] + (let [message {:foo "bar"} + stream-routes {:default {:handler-fn #(constantly nil) + :channel-1 #(constantly nil)}} + topic (first (keys stream-routes)) + expected-topic-entity-name (name topic) + expected-additional-tags {:topic-name expected-topic-entity-name} + channel :channel-1 + default-namespace "message-processing" + expected-metric-namespaces ["default" "channel-1" default-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespaces) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] ((channel-mapper-func (constantly :success) topic channel) message) (is @successfully-processed?)))) @@ -122,9 +143,10 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespaces) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] ((channel-mapper-func (constantly :retry) topic channel) message) (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic channel)] @@ -138,9 +160,11 @@ unsuccessfully-processed? (atom false) expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true)) - metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] ((channel-mapper-func (fn [_] (throw (Exception. "test exception"))) topic channel) message) (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic channel)] @@ -149,10 +173,14 @@ (is @sentry-report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - expected-metric-namespace "default.channel-1.execution-time"] - (with-redefs [metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-metric-namespace) + (let [reported-execution-time? (atom false) + execution-time-namesapce "execution-time" + expected-metric-namespaces ["default" "channel-1" execution-time-namesapce]] + (with-redefs [metrics/report-time (fn [metric-namespaces _ _] + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce]))) + (when (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce])) (reset! reported-execution-time? true)))] ((channel-mapper-func (constantly :success) topic channel) message) diff --git a/test/ziggurat/metrics_test.clj b/test/ziggurat/metrics_test.clj index 7c1f7db1..00d18ea6 100644 --- a/test/ziggurat/metrics_test.clj +++ b/test/ziggurat/metrics_test.clj @@ -18,49 +18,81 @@ (is (instance? Histogram meter))))) (deftest increment-count-test - (testing "increases count on the meter" - (let [metric-ns "metric-ns" - metric "metric3" - mk-meter-args (atom nil) - meter (Meter.)] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric] - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) - meter)] - (metrics/increment-count metric-ns metric) - (is (= 1 (.getCount meter))) - (is (= metric-ns (:metric-namespace @mk-meter-args))) - (is (= metric (:metric @mk-meter-args))))))) + (let [metric-ns ["metric-ns"] + metric "metric3"] + (testing "increases count on the meter" + (let [mk-meter-args (atom nil) + meter (Meter.) + expected-topic-entity-name "expected-topic-entity-name"] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric topic-entity-name] + (is (= topic-entity-name expected-topic-entity-name)) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) + meter)] + (metrics/increment-count metric-ns metric expected-topic-entity-name) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "increases count on the meter when topic-entity-name is nil" + (let [mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) + meter)] + (metrics/increment-count metric-ns metric expected-additional-tags) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))))) (deftest decrement-count-test - (testing "decreases count on the meter" - (let [metric-ns "metric-ns" - metric "metric3" - mk-meter-args (atom nil) - meter (Meter.)] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric] - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) - meter)] - (metrics/increment-count metric-ns metric) - (is (= 1 (.getCount meter))) - (metrics/decrement-count metric-ns metric) - (is (= 0 (.getCount meter))) - (is (= metric-ns (:metric-namespace @mk-meter-args))) - (is (= metric (:metric @mk-meter-args))))))) + (let [metric-ns ["metric-ns"] + metric "metric3" + mk-meter-args (atom nil) + meter (Meter.)] + (testing "decreases count on the meter" + (let [expected-additional-tags {:topic-name "expected-topic-name"}] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) + meter)] + (metrics/increment-count metric-ns metric expected-additional-tags) + (is (= 1 (.getCount meter))) + (metrics/decrement-count metric-ns metric expected-additional-tags) + (is (zero? (.getCount meter))) + (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "decreases count on the meter when additional-tags is nil" + (let [expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) + meter)] + (metrics/increment-count metric-ns metric expected-additional-tags) + (is (= 1 (.getCount meter))) + (metrics/decrement-count metric-ns metric expected-additional-tags) + (is (zero? (.getCount meter))) + (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))))) (deftest report-time-test (testing "updates time-val" - (let [metric-ns "metric-ns" - time-val 10 - mk-histogram-args (atom nil) - reservoir (UniformReservoir.) - histogram (Histogram. reservoir)] - (with-redefs [metrics/mk-histogram (fn [metric-ns metric] + (let [metric-ns ["message-received-delay-histogram"] + time-val 10 + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-topic-entity-name "expected-topic-entity-name"] + (with-redefs [metrics/mk-histogram (fn [metric-ns metric topic-entity-name] + (is (= topic-entity-name expected-topic-entity-name)) (reset! mk-histogram-args {:metric-namespace metric-ns :metric metric}) histogram)] - (metrics/report-time metric-ns time-val) + (metrics/report-time metric-ns time-val expected-topic-entity-name) (is (= 1 (.getCount histogram))) - (is (= metric-ns (:metric-namespace @mk-histogram-args))) + (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-histogram-args))) (is (= "all" (:metric @mk-histogram-args))))))) diff --git a/test/ziggurat/timestamp_transformer_test.clj b/test/ziggurat/timestamp_transformer_test.clj index da1d4074..a8a6903a 100644 --- a/test/ziggurat/timestamp_transformer_test.clj +++ b/test/ziggurat/timestamp_transformer_test.clj @@ -1,43 +1,60 @@ (ns ziggurat.timestamp-transformer-test (:require [clojure.test :refer :all] - [ziggurat.timestamp-transformer :refer :all] [ziggurat.metrics :as metrics] + [ziggurat.timestamp-transformer :refer :all] [ziggurat.util.time :refer :all]) - (:import [org.apache.kafka.streams.processor ProcessorContext] - [org.apache.kafka.clients.consumer ConsumerRecord] + (:import [org.apache.kafka.clients.consumer ConsumerRecord] + [org.apache.kafka.streams.processor ProcessorContext] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (deftest ingestion-time-extractor-test (let [ingestion-time-extractor (IngestionTimeExtractor.) - topic "some-topic" - partition (int 1) - offset 1 - previous-timestamp 1528720768771 - key "some-key" - value "some-value" - record (ConsumerRecord. topic partition offset key value)] + topic "some-topic" + partition (int 1) + offset 1 + previous-timestamp 1528720768771 + key "some-key" + value "some-value" + record (ConsumerRecord. topic partition offset key value)] (testing "extract timestamp of topic when it has valid timestamp" (with-redefs [get-timestamp-from-record (constantly 1528720768777)] (is (= (.extract ingestion-time-extractor record previous-timestamp) 1528720768777)))) (testing "extract timestamp of topic when it has invalid timestamp" - (with-redefs [get-timestamp-from-record (constantly -1) + (with-redefs [get-timestamp-from-record (constantly -1) get-current-time-in-millis (constantly 1528720768777)] (is (= (.extract ingestion-time-extractor record previous-timestamp) (get-current-time-in-millis))))))) (deftest timestamp-transformer-test - (testing "creates a timestamp-transformer object that calculates and reports timestamp delay" - (let [metric-namespace "test.message-received-delay-histogram" - record-timestamp 1528720767777 - context (reify ProcessorContext - (timestamp [_] record-timestamp)) - current-time 1528720768777 - timestamp-transformer (create metric-namespace current-time) - expected-delay 1000] - (.init timestamp-transformer context) - (with-redefs [get-current-time-in-millis (constantly current-time) - metrics/report-time (fn [namespace delay] - (is (= delay expected-delay)) - (is (= metric-namespace namespace)))] - (.transform timestamp-transformer nil nil))))) + (let [default-namespace "message-received-delay-histogram" + expected-metric-namespaces ["test" default-namespace] + record-timestamp 1528720767777 + current-time 1528720768777 + expected-delay 1000] + (testing "creates a timestamp-transformer object that calculates and reports timestamp delay" + (let [context (reify ProcessorContext + (timestamp [_] record-timestamp)) + expected-topic-entity-name "expected-topic-entity-name" + timestamp-transformer (create expected-metric-namespaces current-time expected-topic-entity-name)] + (.init timestamp-transformer context) + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay topic-entity-name] + (is (= delay expected-delay)) + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace]))) + (is (= expected-topic-entity-name topic-entity-name)))] + (.transform timestamp-transformer nil nil)))) + (testing "creates a timestamp-transformer object that calculates and reports timestamp delay when topic-entity-name is nil" + (let [context (reify ProcessorContext + (timestamp [_] record-timestamp)) + expected-topic-entity-name nil + timestamp-transformer (create expected-metric-namespaces current-time)] + (.init timestamp-transformer context) + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay topic-entity-name] + (is (= delay expected-delay)) + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace]))) + (is (= topic-entity-name expected-topic-entity-name)))] + (.transform timestamp-transformer nil nil))))))