diff --git a/src/ziggurat/kafka_delay.clj b/src/ziggurat/kafka_delay.clj index 3fae5bdc..90c61e64 100644 --- a/src/ziggurat/kafka_delay.clj +++ b/src/ziggurat/kafka_delay.clj @@ -3,11 +3,9 @@ [ziggurat.util.time :refer :all])) (defn calculate-and-report-kafka-delay - ([metric-namespaces record-timestamp] - (calculate-and-report-kafka-delay metric-namespaces record-timestamp nil)) - ([metric-namespaces record-timestamp additional-tags] - (let [now-millis (get-current-time-in-millis) - delay (- now-millis record-timestamp) - default-namespace (last metric-namespaces) - multi-namespaces [metric-namespaces [default-namespace]]] - (metrics/multi-ns-report-time multi-namespaces delay additional-tags)))) + ([metric-namespace record-timestamp] + (calculate-and-report-kafka-delay metric-namespace record-timestamp nil)) + ([metric-namespace record-timestamp additional-tags] + (let [now-millis (get-current-time-in-millis) + delay (- now-millis record-timestamp)] + (metrics/report-time metric-namespace delay additional-tags)))) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 2cfeb5c3..9733a0d9 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -16,85 +16,74 @@ (defn mapper-func [mapper-fn channels] (fn [message-payload] - (let [service-name (:app-name (ziggurat-config)) - topic-entity (:topic-entity message-payload) + (let [topic-entity (:topic-entity message-payload) topic-entity-name (name topic-entity) new-relic-transaction-name (str topic-entity-name ".handler-fn") default-namespace "message-processing" - metric-namespaces [service-name topic-entity-name default-namespace] additional-tags {:topic_name topic-entity-name} - default-namespaces [default-namespace] success-metric "success" retry-metric "retry" skip-metric "skip" - failure-metric "failure" - multi-namespaces [metric-namespaces default-namespaces]] + failure-metric "failure"] (nr/with-tracing "job" new-relic-transaction-name (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn (:message message-payload)) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - execution-time-namespace "handler-fn-execution-time" - multi-execution-time-namespaces [[service-name topic-entity-name execution-time-namespace] - [execution-time-namespace]]] - (metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn (:message message-payload)) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "handler-fn-execution-time"] + (metrics/report-time execution-time-namespace time-val additional-tags) (case return-code - :success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags) - :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + :success (metrics/increment-count default-namespace success-metric additional-tags) + :retry (do (metrics/increment-count default-namespace retry-metric additional-tags) (producer/retry message-payload)) - :skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags) + :skip (metrics/increment-count default-namespace skip-metric additional-tags) :block 'TODO (do (send-msg-to-channel channels message-payload return-code) - (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)))) + (metrics/increment-count default-namespace success-metric additional-tags)))) (catch Throwable e (producer/retry message-payload) (sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name)) - (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) + (metrics/increment-count default-namespace failure-metric additional-tags))))))) (defn channel-mapper-func [mapper-fn channel] - (fn [message-payload] - (let [service-name (:app-name (ziggurat-config)) - topic-entity (:topic-entity message-payload) - topic-entity-name (name topic-entity) - channel-name (name channel) - default-namespace "message-processing" - base-namespaces [service-name topic-entity-name channel-name] - metric-namespaces (conj base-namespaces default-namespace) - additional-tags {:topic_name topic-entity-name :channel_name channel-name} - default-namespaces [default-namespace] - metric-namespace (str/join "." metric-namespaces) - success-metric "success" - retry-metric "retry" - skip-metric "skip" - failure-metric "failure" - multi-namespaces [metric-namespaces default-namespaces]] + (fn [{topic-entity :topic-entity message :message :as message-payload}] + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) + channel-name (name channel) + default-namespace "message-processing" + base-namespaces [service-name topic-entity-name channel-name] + metric-namespaces (conj base-namespaces default-namespace) + additional-tags {:topic_name topic-entity-name :channel_name channel-name} + metric-namespace (str/join "." metric-namespaces) + success-metric "success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure"] (nr/with-tracing "job" metric-namespace (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn (:message message-payload)) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - execution-time-namespace "execution-time" - multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace) - [execution-time-namespace]]] - (metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn message) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "execution-time"] + (metrics/report-time execution-time-namespace time-val additional-tags) (case return-code - :success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags) - :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + :success (metrics/increment-count default-namespace success-metric additional-tags) + :retry (do (metrics/increment-count default-namespace retry-metric additional-tags) (producer/retry-for-channel message-payload channel)) - :skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags) + :skip (metrics/increment-count default-namespace skip-metric additional-tags) :block 'TODO (throw (ex-info "Invalid mapper return code" {:code return-code})))) (catch Throwable e (producer/retry-for-channel message-payload channel) (sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) - (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) + (metrics/increment-count default-namespace failure-metric additional-tags))))))) (defrecord MessagePayload [message topic-entity]) (s/defschema message-payload-schema - {:message s/Any - :topic-entity s/Keyword + {:message s/Any + :topic-entity s/Keyword (s/optional-key :retry-count) s/Int}) diff --git a/src/ziggurat/metrics.clj b/src/ziggurat/metrics.clj index 56ef1a0f..6c72d297 100644 --- a/src/ziggurat/metrics.clj +++ b/src/ziggurat/metrics.clj @@ -40,43 +40,24 @@ tagged-metric (.tagged ^MetricName metric-name tags)] (.histogram ^MetricRegistry metrics-registry ^MetricName tagged-metric)))) -(defn intercalate-dot - [names] - (str/join "." names)) - -(defn remove-topic-tag-for-old-namespace - [additional-tags ns] - (let [topic-name (:topic_name additional-tags)] - (dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name)))) - (defn- inc-or-dec-count ([sign metric-namespace metric] (inc-or-dec-count sign metric-namespace metric nil)) - ([sign metric-namespaces metric additional-tags] - (let [metric-namespace (intercalate-dot metric-namespaces) - meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] + ([sign ^String metric-namespace metric additional-tags] + (let [meter ^Meter (mk-meter metric-namespace metric additional-tags)] (.mark meter (sign 1))))) (def increment-count (partial inc-or-dec-count +)) (def decrement-count (partial inc-or-dec-count -)) -(defn multi-ns-increment-count [nss metric additional-tags] - (doseq [ns nss] - (increment-count ns metric additional-tags))) - (defn report-time - ([metric-namespaces time-val] - (report-time metric-namespaces time-val nil)) - ([metric-namespaces time-val additional-tags] - (let [metric-namespace (intercalate-dot metric-namespaces) - histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] + ([metric-namespace time-val] + (report-time metric-namespace time-val nil)) + ([^String metric-namespace time-val additional-tags] + (let [histogram ^Histogram (mk-histogram metric-namespace "all" additional-tags)] (.update histogram (int time-val))))) -(defn multi-ns-report-time [nss time-val additional-tags] - (doseq [ns nss] - (report-time ns time-val additional-tags))) - (defn start-statsd-reporter [statsd-config env] (let [{:keys [enabled host port]} statsd-config] (when enabled diff --git a/src/ziggurat/middleware/default.clj b/src/ziggurat/middleware/default.clj index fa401cee..55f632da 100644 --- a/src/ziggurat/middleware/default.clj +++ b/src/ziggurat/middleware/default.clj @@ -1,9 +1,9 @@ (ns ziggurat.middleware.default (:require [flatland.protobuf.core :as proto] [sentry-clj.async :as sentry] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.metrics :as metrics] - [ziggurat.sentry :refer [sentry-reporter]] - [ziggurat.config :refer [ziggurat-config]])) + [ziggurat.sentry :refer [sentry-reporter]])) (defn- deserialise-message "This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a @@ -24,11 +24,9 @@ (catch Throwable e (let [service-name (:app-name (ziggurat-config)) additional-tags {:topic_name topic-entity-name} - default-namespace "message-parsing" - metric-namespaces [service-name "message-parsing"] - multi-namespaces [metric-namespaces [default-namespace]]] + default-namespace "message-parsing"] (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) - (metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags) + (metrics/increment-count default-namespace "failed" additional-tags) nil))) message)) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 915b2bff..74700fb0 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -61,15 +61,11 @@ (set-upgrade-from-config upgrade-from))) (defn- log-and-report-metrics [topic-entity message] - (let [service-name (:app-name (ziggurat-config)) - topic-entity-name (name topic-entity) - additional-tags {:topic_name topic-entity-name} - default-namespace "message" - metric-namespaces [service-name topic-entity-name default-namespace] - default-namespaces [default-namespace] - metric "read" - multi-namespaces [metric-namespaces default-namespaces]] - (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) + (let [topic-entity-name (name topic-entity) + additional-tags {:topic_name topic-entity-name} + default-namespace "message" + metric "read"] + (metrics/increment-count default-namespace metric additional-tags)) message) (defn store-supplier-builder [] @@ -86,15 +82,15 @@ (.mapValues stream-builder (value-mapper mapper-fn))) (defn- transformer-supplier - [metric-namespaces oldest-processed-message-in-s additional-tags] + [metric-namespace oldest-processed-message-in-s additional-tags] (reify TransformerSupplier - (get [_] (transformer/create metric-namespaces oldest-processed-message-in-s additional-tags)))) + (get [_] (transformer/create metric-namespace oldest-processed-message-in-s additional-tags)))) (defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] - (let [service-name (:app-name (ziggurat-config)) - metric-namespaces [service-name topic-entity-name "message-received-delay-histogram"] - additional-tags {:topic_name topic-entity-name}] - (.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) + (let [service-name (:app-name (ziggurat-config)) + metric-namespace "message-received-delay-histogram" + additional-tags {:topic_name topic-entity-name}] + (.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) (defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels] (let [builder (StreamsBuilder.) diff --git a/src/ziggurat/timestamp_transformer.clj b/src/ziggurat/timestamp_transformer.clj index 7cdfa1d6..0d3c0291 100644 --- a/src/ziggurat/timestamp_transformer.clj +++ b/src/ziggurat/timestamp_transformer.clj @@ -17,14 +17,14 @@ (get-current-time-in-millis) ingestion-time)))) -(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespaces oldest-processed-message-in-s additional-tags] Transformer +(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespace oldest-processed-message-in-s additional-tags] Transformer (^void init [_ ^ProcessorContext context] (do (set! processor-context context) nil)) (transform [_ record-key record-value] (let [message-time (.timestamp processor-context)] (when (message-to-process? message-time oldest-processed-message-in-s) - (calculate-and-report-kafka-delay metric-namespaces message-time additional-tags) + (calculate-and-report-kafka-delay metric-namespace message-time additional-tags) (KeyValue/pair record-key record-value)))) (close [_] nil)) diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index fe45b09d..fdbe8891 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -13,28 +13,24 @@ fix/silence-logging])) (deftest mapper-func-test - (let [service-name (:app-name (ziggurat-config)) - stream-routes {:default {:handler-fn #(constantly nil)}} - topic-entity (name (first (keys stream-routes))) - message-payload {:message {:foo "bar"} :topic-entity (keyword topic-entity)} - expected-additional-tags {:topic_name topic-entity} - default-namespace "message-processing" - report-time-namespace "handler-fn-execution-time" - expected-metric-namespaces [topic-entity default-namespace] - expected-report-time-namespaces [topic-entity report-time-namespace]] + (let [service-name (:app-name (ziggurat-config)) + stream-routes {:default {:handler-fn #(constantly nil)}} + topic-entity (name (first (keys stream-routes))) + message-payload {:message {:foo "bar"} :topic-entity (keyword topic-entity)} + expected-additional-tags {:topic_name topic-entity} + expected-metric-namespace "message-processing" + expected-report-time-namespace "handler-fn-execution-time"] (testing "message process should be successful" (let [successfully-processed? (atom false) successfully-reported-time? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces expected-metric-namespaces) - (= metric-namespaces [default-namespace])) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespace) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! successfully-processed? true))) - metrics/report-time (fn [metric-namespaces _ _] - (when (or (= metric-namespaces expected-report-time-namespaces) - (= metric-namespaces [report-time-namespace])) + metrics/report-time (fn [metric-namespace _ _] + (when (= metric-namespace expected-report-time-namespace) (reset! successfully-reported-time? true)))] ((mapper-func (constantly :success) []) message-payload) (is @successfully-processed?) @@ -44,9 +40,8 @@ (fix/with-queues (assoc-in stream-routes [:default :channel-1] (constantly :success)) (let [successfully-processed? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces [service-name topic-entity default-namespace]) - (= metric-namespaces [default-namespace])) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespace) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] @@ -72,9 +67,8 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces [service-name topic-entity default-namespace]) - (= metric-namespaces [default-namespace])) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespace) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] @@ -90,9 +84,8 @@ unsuccessfully-processed? (atom false) expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true)) - metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces [service-name topic-entity default-namespace]) - (= metric-namespaces [default-namespace])) + metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespace) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] @@ -103,14 +96,10 @@ (is @sentry-report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - execution-time-namesapce "handler-fn-execution-time" - expected-metric-namespaces [service-name "default" execution-time-namesapce]] - (with-redefs [metrics/report-time (fn [metric-namespaces _ _] - (is (or (= metric-namespaces expected-metric-namespaces) - (= metric-namespaces [execution-time-namesapce]))) - (when (or (= metric-namespaces expected-metric-namespaces) - (= metric-namespaces [execution-time-namesapce])) + (let [reported-execution-time? (atom false) + expected-metric-namespace "handler-fn-execution-time"] + (with-redefs [metrics/report-time (fn [metric-namespace _ _] + (when (= metric-namespace expected-metric-namespace) (reset! reported-execution-time? true)))] ((mapper-func (constantly :success) []) message-payload) @@ -128,14 +117,12 @@ :topic-entity topic} expected-topic-entity-name (name topic) expected-additional-tags {:topic_name expected-topic-entity-name :channel_name channel-name} - default-namespace "message-processing" - expected-metric-namespaces [expected-topic-entity-name channel default-namespace]] + expected-metric-namespace "message-processing"] (testing "message process should be successful" (let [successfully-processed? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces [service-name expected-topic-entity-name channel-name default-namespace]) - (= metric-namespaces [default-namespace])) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespace) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] @@ -148,9 +135,8 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces [service-name expected-topic-entity-name channel-name default-namespace]) - (= metric-namespaces [default-namespace])) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespace) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] @@ -166,9 +152,8 @@ unsuccessfully-processed? (atom false) expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true)) - metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (or (= metric-namespaces expected-metric-namespaces) - (= metric-namespaces [default-namespace])) + metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespace) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] @@ -180,10 +165,9 @@ (testing "reports execution time with topic prefix" (let [reported-execution-time? (atom false) - execution-time-namesapce "execution-time"] - (with-redefs [metrics/report-time (fn [metric-namespaces _ _] - (when (or (= metric-namespaces [service-name expected-topic-entity-name channel-name execution-time-namesapce]) - (= metric-namespaces [execution-time-namesapce])) + execution-time-namespace "execution-time"] + (with-redefs [metrics/report-time (fn [metric-namespace _ _] + (when (= metric-namespace execution-time-namespace) (reset! reported-execution-time? true)))] ((channel-mapper-func (constantly :success) channel) message-payload) (is @reported-execution-time?)))))) diff --git a/test/ziggurat/metrics_test.clj b/test/ziggurat/metrics_test.clj index 32db653a..ef2e8791 100644 --- a/test/ziggurat/metrics_test.clj +++ b/test/ziggurat/metrics_test.clj @@ -49,60 +49,46 @@ expected-topic-entity-name "expected-topic-entity-name" input-additional-tags {:topic_name expected-topic-entity-name}] (testing "increases count on the meter" - (let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"] - mk-meter-args (atom nil) - meter (Meter.) - expected-additional-tags {}] - (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (let [expected-metric-namespace "metric-ns" + mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags input-additional-tags] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespaces metric-namespaces - :metric metric}) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) meter)] - (metrics/increment-count expected-metric-namespaces metric input-additional-tags) + (metrics/increment-count expected-metric-namespace metric input-additional-tags) (is (= 1 (.getCount meter))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= expected-metric-namespace (:metric-namespace @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))) (testing "increases count on the meter - w/o additional-tags argument" - (let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"] - mk-meter-args (atom nil) - meter (Meter.) - expected-additional-tags nil] - (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (let [expected-metric-namespace "metric-ns" + mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespaces metric-namespaces - :metric metric}) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) meter)] - (metrics/increment-count expected-metric-namespaces metric) + (metrics/increment-count expected-metric-namespace metric) (is (= 1 (.getCount meter))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) - (is (= metric (:metric @mk-meter-args)))))) - (testing "increases count on the meter - without topic name on the namespace" - (let [expected-metric-namespaces ["metric-ns"] - mk-meter-args (atom nil) - meter (Meter.) - expected-additional-tags input-additional-tags] - (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] - (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespaces metric-namespaces - :metric metric}) - meter)] - (metrics/increment-count expected-metric-namespaces metric input-additional-tags) - (is (= 1 (.getCount meter))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= expected-metric-namespace (:metric-namespace @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))) (testing "increases count on the meter when additional-tags is nil" - (let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"] - mk-meter-args (atom nil) - meter (Meter.) - expected-additional-tags nil] - (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (let [expected-metric-namespace "metric-ns" + mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespaces metric-namespaces - :metric metric}) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) meter)] - (metrics/increment-count expected-metric-namespaces metric expected-additional-tags) + (metrics/increment-count expected-metric-namespace metric expected-additional-tags) (is (= 1 (.getCount meter))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= expected-metric-namespace (:metric-namespace @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))))) (deftest decrement-count-test @@ -112,46 +98,32 @@ meter (Meter.) input-additional-tags {:topic_name expected-topic-name}] (testing "decreases count on the meter" - (let [expected-additional-tags {} - expected-metric-namespaces [expected-topic-name "metric-ns"]] - (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (let [expected-additional-tags input-additional-tags + expected-metric-namespace "metric-ns"] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespaces metric-namespaces - :metric metric}) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) meter)] - (metrics/increment-count expected-metric-namespaces metric input-additional-tags) + (metrics/increment-count expected-metric-namespace metric input-additional-tags) (is (= 1 (.getCount meter))) - (metrics/decrement-count expected-metric-namespaces metric input-additional-tags) + (metrics/decrement-count expected-metric-namespace metric input-additional-tags) (is (zero? (.getCount meter))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) - (is (= metric (:metric @mk-meter-args)))))) - (testing "decreases count on the meter - without topic name on the namespace" - (let [expected-additional-tags input-additional-tags - expected-metric-namespaces ["metric-ns"]] - (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] - (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespaces metric-namespaces - :metric metric}) - meter)] - (metrics/increment-count expected-metric-namespaces metric input-additional-tags) - (is (= 1 (.getCount meter))) - (metrics/decrement-count expected-metric-namespaces metric input-additional-tags) - (is (zero? (.getCount meter))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= expected-metric-namespace (:metric-namespace @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))) (testing "decreases count on the meter when additional-tags is nil" - (let [expected-additional-tags nil - expected-metric-namespaces [expected-topic-name "metric-ns"]] - (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (let [expected-additional-tags nil + expected-metric-namespace "metric-ns"] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespaces metric-namespaces - :metric metric}) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) meter)] - (metrics/increment-count expected-metric-namespaces metric expected-additional-tags) + (metrics/increment-count expected-metric-namespace metric expected-additional-tags) (is (= 1 (.getCount meter))) - (metrics/decrement-count expected-metric-namespaces metric expected-additional-tags) + (metrics/decrement-count expected-metric-namespace metric expected-additional-tags) (is (zero? (.getCount meter))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= expected-metric-namespace (:metric-namespace @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))))) (deftest report-time-test @@ -159,47 +131,32 @@ input-additional-tags {:topic_name expected-topic-entity-name} time-val 10] (testing "updates time-val" - (let [expected-metric-namespaces [expected-topic-entity-name "message-received-delay-histogram"] - mk-histogram-args (atom nil) - reservoir (UniformReservoir.) - histogram (Histogram. reservoir) - expected-additional-tags {}] - (with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags] + (let [expected-metric-namespace "message-received-delay-histogram" + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-additional-tags input-additional-tags] + (with-redefs [metrics/mk-histogram (fn [metric-namespace metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-histogram-args {:metric-namespaces metric-namespaces - :metric metric}) + (reset! mk-histogram-args {:metric-namespace metric-namespace + :metric metric}) histogram)] - (metrics/report-time expected-metric-namespaces time-val input-additional-tags) + (metrics/report-time expected-metric-namespace time-val input-additional-tags) (is (= 1 (.getCount histogram))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args))) + (is (= expected-metric-namespace (:metric-namespace @mk-histogram-args))) (is (= "all" (:metric @mk-histogram-args)))))) (testing "updates time-val - w/o additional-tags argument" - (let [expected-metric-namespaces [expected-topic-entity-name "message-received-delay-histogram"] - mk-histogram-args (atom nil) - reservoir (UniformReservoir.) - histogram (Histogram. reservoir) - expected-additional-tags nil] - (with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags] - (is (= additional-tags expected-additional-tags)) - (reset! mk-histogram-args {:metric-namespaces metric-namespaces - :metric metric}) - histogram)] - (metrics/report-time expected-metric-namespaces time-val) - (is (= 1 (.getCount histogram))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args))) - (is (= "all" (:metric @mk-histogram-args)))))) - (testing "updates time-val - without topic name on the namespace" - (let [expected-metric-namespaces ["message-received-delay-histogram"] - mk-histogram-args (atom nil) - reservoir (UniformReservoir.) - histogram (Histogram. reservoir) - expected-additional-tags input-additional-tags] - (with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags] + (let [expected-metric-namespace "message-received-delay-histogram" + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-additional-tags nil] + (with-redefs [metrics/mk-histogram (fn [metric-namespace metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-histogram-args {:metric-namespaces metric-namespaces - :metric metric}) + (reset! mk-histogram-args {:metric-namespace metric-namespace + :metric metric}) histogram)] - (metrics/report-time expected-metric-namespaces time-val input-additional-tags) + (metrics/report-time expected-metric-namespace time-val) (is (= 1 (.getCount histogram))) - (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args))) + (is (= expected-metric-namespace (:metric-namespace @mk-histogram-args))) (is (= "all" (:metric @mk-histogram-args)))))))) diff --git a/test/ziggurat/middleware/default_test.clj b/test/ziggurat/middleware/default_test.clj index 7e7a8b2e..bf951f85 100644 --- a/test/ziggurat/middleware/default_test.clj +++ b/test/ziggurat/middleware/default_test.clj @@ -1,10 +1,10 @@ (ns ziggurat.middleware.default-test (:require [clojure.test :refer :all] [flatland.protobuf.core :as proto] - [ziggurat.middleware.default :refer :all] - [ziggurat.fixtures :as fix] [sentry-clj.async :as sentry] - [ziggurat.metrics :as metrics]) + [ziggurat.fixtures :as fix] + [ziggurat.metrics :as metrics] + [ziggurat.middleware.default :refer :all]) (:import (flatland.protobuf.test Example$Photo))) (use-fixtures :once (join-fixtures [fix/mount-only-config @@ -24,14 +24,14 @@ ((protobuf->hash handler-fn proto-class topic-entity-name) proto-message) (is (true? @handler-fn-called?)))) (testing "When deserialisation fails, it reports to sentry, publishes metrics and passes nil to handler function" - (let [handler-fn-called? (atom false) + (let [handler-fn-called? (atom false) metric-reporter-called? (atom false) - topic-entity-name "test" - handler-fn (fn [msg] - (if (nil? msg) - (reset! handler-fn-called? true)))] - (with-redefs [metrics/multi-ns-increment-count (fn [_ _ _] - (reset! metric-reporter-called? true))] + topic-entity-name "test" + handler-fn (fn [msg] + (if (nil? msg) + (reset! handler-fn-called? true)))] + (with-redefs [metrics/increment-count (fn [_ _ _] + (reset! metric-reporter-called? true))] ((protobuf->hash handler-fn nil topic-entity-name) nil)) (is (true? @handler-fn-called?)) (is (true? @metric-reporter-called?))))