Skip to content

Commit

Permalink
remove the old metrics format from being sent
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed Jul 19, 2019
1 parent be27bb7 commit 88b707d
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 74 deletions.
8 changes: 3 additions & 5 deletions src/ziggurat/kafka_delay.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
([metric-namespaces record-timestamp]
(calculate-and-report-kafka-delay metric-namespaces record-timestamp nil))
([metric-namespaces record-timestamp additional-tags]
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)
default-namespace (last metric-namespaces)
multi-namespaces [metric-namespaces [default-namespace]]]
(metrics/multi-ns-report-time multi-namespaces delay additional-tags))))
(let [now-millis (get-current-time-in-millis)
delay (- now-millis record-timestamp)]
(metrics/report-time metric-namespaces delay additional-tags))))
62 changes: 28 additions & 34 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,38 @@

(defn mapper-func [mapper-fn channels]
(fn [message-payload]
(let [service-name (:app-name (ziggurat-config))
topic-entity (:topic-entity message-payload)
(let [topic-entity (:topic-entity message-payload)
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
default-namespace "message-processing"
metric-namespaces [service-name topic-entity-name default-namespace]
additional-tags {:topic_name topic-entity-name}
default-namespaces [default-namespace]
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-namespaces [metric-namespaces default-namespaces]]
failure-metric "failure"]
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn (:message message-payload))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
multi-execution-time-namespaces [[service-name topic-entity-name execution-time-namespace]
[execution-time-namespace]]]
(metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags)
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn (:message message-payload))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
execution-time-namespaces [execution-time-namespace]]
(metrics/report-time execution-time-namespaces time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags)
:success (metrics/increment-count default-namespaces success-metric additional-tags)
:retry (do (metrics/increment-count default-namespaces retry-metric additional-tags)
(producer/retry message-payload))
:skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags)
:skip (metrics/increment-count default-namespaces skip-metric additional-tags)
:block 'TODO
(do
(send-msg-to-channel channels message-payload return-code)
(metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags))))
(metrics/increment-count default-namespaces success-metric additional-tags))))
(catch Throwable e
(producer/retry message-payload)
(sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name))
(metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags)))))))
(metrics/increment-count default-namespaces failure-metric additional-tags)))))))

(defn channel-mapper-func [mapper-fn channel]
(fn [message-payload]
Expand All @@ -68,33 +64,31 @@
success-metric "success"
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-namespaces [metric-namespaces default-namespaces]]
failure-metric "failure"]
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn (:message message-payload))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace)
[execution-time-namespace]]]
(metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags)
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn (:message message-payload))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
execution-time-namespaces [execution-time-namespace]]
(metrics/report-time execution-time-namespaces time-val additional-tags)
(case return-code
:success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)
:retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags)
:success (metrics/increment-count default-namespaces success-metric additional-tags)
:retry (do (metrics/increment-count default-namespaces retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
:skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags)
:skip (metrics/increment-count default-namespaces skip-metric additional-tags)
:block 'TODO
(throw (ex-info "Invalid mapper return code" {:code return-code}))))
(catch Throwable e
(producer/retry-for-channel message-payload channel)
(sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
(metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags)))))))
(metrics/increment-count default-namespaces failure-metric additional-tags)))))))

(defrecord MessagePayload [message topic-entity])

(s/defschema message-payload-schema
{:message s/Any
:topic-entity s/Keyword
{:message s/Any
:topic-entity s/Keyword
(s/optional-key :retry-count) s/Int})
12 changes: 2 additions & 10 deletions src/ziggurat/metrics.clj
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
(dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name))))

(defn- inc-or-dec-count
([sign metric-namespace metric]
(inc-or-dec-count sign metric-namespace metric nil))
([sign metric-namespaces metric]
(inc-or-dec-count sign metric-namespaces metric nil))
([sign metric-namespaces metric additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
Expand All @@ -61,10 +61,6 @@

(def decrement-count (partial inc-or-dec-count -))

(defn multi-ns-increment-count [nss metric additional-tags]
(doseq [ns nss]
(increment-count ns metric additional-tags)))

(defn report-time
([metric-namespaces time-val]
(report-time metric-namespaces time-val nil))
Expand All @@ -73,10 +69,6 @@
histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
(.update histogram (int time-val)))))

(defn multi-ns-report-time [nss time-val additional-tags]
(doseq [ns nss]
(report-time ns time-val additional-tags)))

(defn start-statsd-reporter [statsd-config env]
(let [{:keys [enabled host port]} statsd-config]
(when enabled
Expand Down
15 changes: 7 additions & 8 deletions src/ziggurat/middleware/default.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
(ns ziggurat.middleware.default
(:require [flatland.protobuf.core :as proto]
[sentry-clj.async :as sentry]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.metrics :as metrics]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.config :refer [ziggurat-config]]))
[ziggurat.sentry :refer [sentry-reporter]]))

(defn- deserialise-message
"This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a
Expand All @@ -22,13 +22,12 @@
keys)]
(select-keys loaded-proto proto-keys))
(catch Throwable e
(let [service-name (:app-name (ziggurat-config))
additional-tags {:topic_name topic-entity-name}
default-namespace "message-parsing"
metric-namespaces [service-name "message-parsing"]
multi-namespaces [metric-namespaces [default-namespace]]]
(let [service-name (:app-name (ziggurat-config))
additional-tags {:topic_name topic-entity-name}
default-namespace "message-parsing"
default-namespaces [default-namespace]]
(sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class))
(metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags)
(metrics/increment-count default-namespaces "failed" additional-tags)
nil)))
message))

Expand Down
12 changes: 5 additions & 7 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,12 @@
(set-upgrade-from-config upgrade-from)))

(defn- log-and-report-metrics [topic-entity message]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
(let [topic-entity-name (name topic-entity)
additional-tags {:topic_name topic-entity-name}
default-namespace "message"
metric-namespaces [service-name topic-entity-name default-namespace]
default-namespaces [default-namespace]
metric "read"
multi-namespaces [metric-namespaces default-namespaces]]
(metrics/multi-ns-increment-count multi-namespaces metric additional-tags))
metric "read"]
(metrics/increment-count default-namespaces metric additional-tags))
message)

(defn store-supplier-builder []
Expand All @@ -92,7 +89,8 @@

(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
(let [service-name (:app-name (ziggurat-config))
metric-namespaces [service-name topic-entity-name "message-received-delay-histogram"]
metric-namespace "message-received-delay-histogram"
metric-namespaces [metric-namespace]
additional-tags {:topic_name topic-entity-name}]
(.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))]))))

Expand Down
20 changes: 10 additions & 10 deletions test/ziggurat/middleware/default_test.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
(ns ziggurat.middleware.default-test
(:require [clojure.test :refer :all]
[flatland.protobuf.core :as proto]
[ziggurat.middleware.default :refer :all]
[ziggurat.fixtures :as fix]
[sentry-clj.async :as sentry]
[ziggurat.metrics :as metrics])
[ziggurat.fixtures :as fix]
[ziggurat.metrics :as metrics]
[ziggurat.middleware.default :refer :all])
(:import (flatland.protobuf.test Example$Photo)))

(use-fixtures :once (join-fixtures [fix/mount-only-config
Expand All @@ -24,14 +24,14 @@
((protobuf->hash handler-fn proto-class topic-entity-name) proto-message)
(is (true? @handler-fn-called?))))
(testing "When deserialisation fails, it reports to sentry, publishes metrics and passes nil to handler function"
(let [handler-fn-called? (atom false)
(let [handler-fn-called? (atom false)
metric-reporter-called? (atom false)
topic-entity-name "test"
handler-fn (fn [msg]
(if (nil? msg)
(reset! handler-fn-called? true)))]
(with-redefs [metrics/multi-ns-increment-count (fn [_ _ _]
(reset! metric-reporter-called? true))]
topic-entity-name "test"
handler-fn (fn [msg]
(if (nil? msg)
(reset! handler-fn-called? true)))]
(with-redefs [metrics/increment-count (fn [_ _ _]
(reset! metric-reporter-called? true))]
((protobuf->hash handler-fn nil topic-entity-name) nil))
(is (true? @handler-fn-called?))
(is (true? @metric-reporter-called?))))
Expand Down

0 comments on commit 88b707d

Please sign in to comment.