diff --git a/src/ziggurat/kafka_delay.clj b/src/ziggurat/kafka_delay.clj index 3fae5bdc..a9442f25 100644 --- a/src/ziggurat/kafka_delay.clj +++ b/src/ziggurat/kafka_delay.clj @@ -6,8 +6,6 @@ ([metric-namespaces record-timestamp] (calculate-and-report-kafka-delay metric-namespaces record-timestamp nil)) ([metric-namespaces record-timestamp additional-tags] - (let [now-millis (get-current-time-in-millis) - delay (- now-millis record-timestamp) - default-namespace (last metric-namespaces) - multi-namespaces [metric-namespaces [default-namespace]]] - (metrics/multi-ns-report-time multi-namespaces delay additional-tags)))) + (let [now-millis (get-current-time-in-millis) + delay (- now-millis record-timestamp)] + (metrics/report-time metric-namespaces delay additional-tags)))) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 2cfeb5c3..8ea3fa2e 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -16,42 +16,38 @@ (defn mapper-func [mapper-fn channels] (fn [message-payload] - (let [service-name (:app-name (ziggurat-config)) - topic-entity (:topic-entity message-payload) + (let [topic-entity (:topic-entity message-payload) topic-entity-name (name topic-entity) new-relic-transaction-name (str topic-entity-name ".handler-fn") default-namespace "message-processing" - metric-namespaces [service-name topic-entity-name default-namespace] additional-tags {:topic_name topic-entity-name} default-namespaces [default-namespace] success-metric "success" retry-metric "retry" skip-metric "skip" - failure-metric "failure" - multi-namespaces [metric-namespaces default-namespaces]] + failure-metric "failure"] (nr/with-tracing "job" new-relic-transaction-name (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn (:message message-payload)) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - execution-time-namespace "handler-fn-execution-time" - multi-execution-time-namespaces [[service-name topic-entity-name execution-time-namespace] - [execution-time-namespace]]] - (metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn (:message message-payload)) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "handler-fn-execution-time" + execution-time-namespaces [execution-time-namespace]] + (metrics/report-time execution-time-namespaces time-val additional-tags) (case return-code - :success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags) - :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + :success (metrics/increment-count default-namespaces success-metric additional-tags) + :retry (do (metrics/increment-count default-namespaces retry-metric additional-tags) (producer/retry message-payload)) - :skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags) + :skip (metrics/increment-count default-namespaces skip-metric additional-tags) :block 'TODO (do (send-msg-to-channel channels message-payload return-code) - (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)))) + (metrics/increment-count default-namespaces success-metric additional-tags)))) (catch Throwable e (producer/retry message-payload) (sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name)) - (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) + (metrics/increment-count default-namespaces failure-metric additional-tags))))))) (defn channel-mapper-func [mapper-fn channel] (fn [message-payload] @@ -68,33 +64,31 @@ success-metric "success" retry-metric "retry" skip-metric "skip" - failure-metric "failure" - multi-namespaces [metric-namespaces default-namespaces]] + failure-metric "failure"] (nr/with-tracing "job" metric-namespace (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn (:message message-payload)) - end-time (.toEpochMilli (Instant/now)) - time-val (- end-time start-time) - execution-time-namespace "execution-time" - multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace) - [execution-time-namespace]]] - (metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn (:message message-payload)) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "execution-time" + execution-time-namespaces [execution-time-namespace]] + (metrics/report-time execution-time-namespaces time-val additional-tags) (case return-code - :success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags) - :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + :success (metrics/increment-count default-namespaces success-metric additional-tags) + :retry (do (metrics/increment-count default-namespaces retry-metric additional-tags) (producer/retry-for-channel message-payload channel)) - :skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags) + :skip (metrics/increment-count default-namespaces skip-metric additional-tags) :block 'TODO (throw (ex-info "Invalid mapper return code" {:code return-code})))) (catch Throwable e (producer/retry-for-channel message-payload channel) (sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) - (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) + (metrics/increment-count default-namespaces failure-metric additional-tags))))))) (defrecord MessagePayload [message topic-entity]) (s/defschema message-payload-schema - {:message s/Any - :topic-entity s/Keyword + {:message s/Any + :topic-entity s/Keyword (s/optional-key :retry-count) s/Int}) diff --git a/src/ziggurat/metrics.clj b/src/ziggurat/metrics.clj index 56ef1a0f..25453faa 100644 --- a/src/ziggurat/metrics.clj +++ b/src/ziggurat/metrics.clj @@ -50,8 +50,8 @@ (dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name)))) (defn- inc-or-dec-count - ([sign metric-namespace metric] - (inc-or-dec-count sign metric-namespace metric nil)) + ([sign metric-namespaces metric] + (inc-or-dec-count sign metric-namespaces metric nil)) ([sign metric-namespaces metric additional-tags] (let [metric-namespace (intercalate-dot metric-namespaces) meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] @@ -61,10 +61,6 @@ (def decrement-count (partial inc-or-dec-count -)) -(defn multi-ns-increment-count [nss metric additional-tags] - (doseq [ns nss] - (increment-count ns metric additional-tags))) - (defn report-time ([metric-namespaces time-val] (report-time metric-namespaces time-val nil)) @@ -73,10 +69,6 @@ histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] (.update histogram (int time-val))))) -(defn multi-ns-report-time [nss time-val additional-tags] - (doseq [ns nss] - (report-time ns time-val additional-tags))) - (defn start-statsd-reporter [statsd-config env] (let [{:keys [enabled host port]} statsd-config] (when enabled diff --git a/src/ziggurat/middleware/default.clj b/src/ziggurat/middleware/default.clj index fa401cee..bb036ec3 100644 --- a/src/ziggurat/middleware/default.clj +++ b/src/ziggurat/middleware/default.clj @@ -1,9 +1,9 @@ (ns ziggurat.middleware.default (:require [flatland.protobuf.core :as proto] [sentry-clj.async :as sentry] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.metrics :as metrics] - [ziggurat.sentry :refer [sentry-reporter]] - [ziggurat.config :refer [ziggurat-config]])) + [ziggurat.sentry :refer [sentry-reporter]])) (defn- deserialise-message "This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a @@ -22,13 +22,12 @@ keys)] (select-keys loaded-proto proto-keys)) (catch Throwable e - (let [service-name (:app-name (ziggurat-config)) - additional-tags {:topic_name topic-entity-name} - default-namespace "message-parsing" - metric-namespaces [service-name "message-parsing"] - multi-namespaces [metric-namespaces [default-namespace]]] + (let [service-name (:app-name (ziggurat-config)) + additional-tags {:topic_name topic-entity-name} + default-namespace "message-parsing" + default-namespaces [default-namespace]] (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) - (metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags) + (metrics/increment-count default-namespaces "failed" additional-tags) nil))) message)) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 915b2bff..eb1b63c6 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -61,15 +61,12 @@ (set-upgrade-from-config upgrade-from))) (defn- log-and-report-metrics [topic-entity message] - (let [service-name (:app-name (ziggurat-config)) - topic-entity-name (name topic-entity) + (let [topic-entity-name (name topic-entity) additional-tags {:topic_name topic-entity-name} default-namespace "message" - metric-namespaces [service-name topic-entity-name default-namespace] default-namespaces [default-namespace] - metric "read" - multi-namespaces [metric-namespaces default-namespaces]] - (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) + metric "read"] + (metrics/increment-count default-namespaces metric additional-tags)) message) (defn store-supplier-builder [] @@ -92,7 +89,8 @@ (defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] (let [service-name (:app-name (ziggurat-config)) - metric-namespaces [service-name topic-entity-name "message-received-delay-histogram"] + metric-namespace "message-received-delay-histogram" + metric-namespaces [metric-namespace] additional-tags {:topic_name topic-entity-name}] (.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) diff --git a/test/ziggurat/middleware/default_test.clj b/test/ziggurat/middleware/default_test.clj index 7e7a8b2e..bf951f85 100644 --- a/test/ziggurat/middleware/default_test.clj +++ b/test/ziggurat/middleware/default_test.clj @@ -1,10 +1,10 @@ (ns ziggurat.middleware.default-test (:require [clojure.test :refer :all] [flatland.protobuf.core :as proto] - [ziggurat.middleware.default :refer :all] - [ziggurat.fixtures :as fix] [sentry-clj.async :as sentry] - [ziggurat.metrics :as metrics]) + [ziggurat.fixtures :as fix] + [ziggurat.metrics :as metrics] + [ziggurat.middleware.default :refer :all]) (:import (flatland.protobuf.test Example$Photo))) (use-fixtures :once (join-fixtures [fix/mount-only-config @@ -24,14 +24,14 @@ ((protobuf->hash handler-fn proto-class topic-entity-name) proto-message) (is (true? @handler-fn-called?)))) (testing "When deserialisation fails, it reports to sentry, publishes metrics and passes nil to handler function" - (let [handler-fn-called? (atom false) + (let [handler-fn-called? (atom false) metric-reporter-called? (atom false) - topic-entity-name "test" - handler-fn (fn [msg] - (if (nil? msg) - (reset! handler-fn-called? true)))] - (with-redefs [metrics/multi-ns-increment-count (fn [_ _ _] - (reset! metric-reporter-called? true))] + topic-entity-name "test" + handler-fn (fn [msg] + (if (nil? msg) + (reset! handler-fn-called? true)))] + (with-redefs [metrics/increment-count (fn [_ _ _] + (reset! metric-reporter-called? true))] ((protobuf->hash handler-fn nil topic-entity-name) nil)) (is (true? @handler-fn-called?)) (is (true? @metric-reporter-called?))))