diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 1b22605b..ef447659 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -4,20 +4,19 @@ [mount.core :as mount :refer [defstate]] [schema.core :as s] [ziggurat.config :refer [ziggurat-config] :as config] - [ziggurat.metrics :as metrics] [ziggurat.messaging.connection :as messaging-connection] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] + [ziggurat.metrics :as metrics] [ziggurat.nrepl-server :as nrepl-server] + [ziggurat.producer :as producer :refer [kafka-producers]] [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.server :as server] - [ziggurat.streams :as streams] - [ziggurat.producer :as producer :refer [kafka-producers]])) + [ziggurat.streams :as streams])) (defstate statsd-reporter :start (metrics/start-statsd-reporter (:datadog (ziggurat-config)) - (:env (ziggurat-config)) - (:app-name (ziggurat-config))) + (:env (ziggurat-config))) :stop (metrics/stop-statsd-reporter statsd-reporter)) (defn- start* @@ -154,7 +153,7 @@ (s/validate StreamRoute stream-routes))) (defn validate-modes [modes] - (let [invalid-modes (filter #(not (contains? (set (keys valid-modes-fns)) %)) modes) + (let [invalid-modes (filter #(not (contains? (set (keys valid-modes-fns)) %)) modes) invalid-modes-count (count invalid-modes)] (when (pos? invalid-modes-count) (throw (ex-info "Wrong modes arguement passed - " {:invalid-modes invalid-modes}))))) @@ -187,4 +186,4 @@ (catch Exception e (log/error e) (stop stop-fn modes) - (System/exit 1))))) \ No newline at end of file + (System/exit 1))))) diff --git a/src/ziggurat/metrics.clj b/src/ziggurat/metrics.clj index 004b989e..a1a659c4 100644 --- a/src/ziggurat/metrics.clj +++ b/src/ziggurat/metrics.clj @@ -1,20 +1,19 @@ (ns ziggurat.metrics (:require [clojure.tools.logging :as log] - [clojure.walk :refer [stringify-keys]]) + [clojure.walk :refer [stringify-keys]] + [ziggurat.config :refer [ziggurat-config]]) (:import com.gojek.metrics.datadog.DatadogReporter [com.gojek.metrics.datadog.transport UdpTransport UdpTransport$Builder] [io.dropwizard.metrics5 Histogram Meter MetricName MetricRegistry] java.util.concurrent.TimeUnit)) -(defonce ^:private group (atom nil)) - (defonce metrics-registry (MetricRegistry.)) (defn- merge-tags [additional-tags] - (let [default-tags {"actor" @group}] - (merge default-tags (when (not (empty? additional-tags)) + (let [default-tags {"actor" (:app-name (ziggurat-config))}] + (merge default-tags (when-not (seq additional-tags) (stringify-keys additional-tags))))) (defn mk-meter @@ -47,10 +46,12 @@ (dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name)))) (defn- inc-or-dec-count - [sign metric-namespaces metric additional-tags] - (let [metric-namespace (intercalate-dot metric-namespaces) - meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] - (.mark meter (sign 1)))) + ([sign metric-namespace metric] + (inc-or-dec-count sign metric-namespace metric nil)) + ([sign metric-namespaces metric additional-tags] + (let [metric-namespace (intercalate-dot metric-namespaces) + meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] + (.mark meter (sign 1))))) (def increment-count (partial inc-or-dec-count +)) @@ -61,16 +62,18 @@ (increment-count ns metric additional-tags))) (defn report-time - [metric-namespaces time-val additional-tags] - (let [metric-namespace (intercalate-dot metric-namespaces) - histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] - (.update histogram (int time-val)))) + ([metric-namespaces time-val] + (report-time metric-namespaces time-val nil)) + ([metric-namespaces time-val additional-tags] + (let [metric-namespace (intercalate-dot metric-namespaces) + histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] + (.update histogram (int time-val))))) (defn multi-ns-report-time [nss time-val additional-tags] (doseq [ns nss] (report-time ns time-val additional-tags))) -(defn start-statsd-reporter [statsd-config env app-name] +(defn start-statsd-reporter [statsd-config env] (let [{:keys [enabled host port]} statsd-config] (when enabled (let [transport (-> (UdpTransport$Builder.) @@ -84,7 +87,6 @@ (.build))] (log/info "Starting statsd reporter") (.start reporter 1 TimeUnit/SECONDS) - (reset! group app-name) {:reporter reporter :transport transport})))) (defn stop-statsd-reporter [datadog-reporter] diff --git a/test/ziggurat/metrics_test.clj b/test/ziggurat/metrics_test.clj index 45f69f79..8ae9c3ac 100644 --- a/test/ziggurat/metrics_test.clj +++ b/test/ziggurat/metrics_test.clj @@ -35,6 +35,20 @@ (is (= 1 (.getCount meter))) (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))) + (testing "increases count on the meter - w/o additional-tags argument" + (let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"] + mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) (testing "increases count on the meter - without topic name on the namespace" (let [expected-metric-namespaces ["metric-ns"] mk-meter-args (atom nil) @@ -132,6 +146,21 @@ (is (= 1 (.getCount histogram))) (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args))) (is (= "all" (:metric @mk-histogram-args)))))) + (testing "updates time-val - w/o additional-tags argument" + (let [expected-metric-namespaces [expected-topic-entity-name "message-received-delay-histogram"] + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-additional-tags nil] + (with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-histogram-args {:metric-namespaces metric-namespaces + :metric metric}) + histogram)] + (metrics/report-time expected-metric-namespaces time-val) + (is (= 1 (.getCount histogram))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args))) + (is (= "all" (:metric @mk-histogram-args)))))) (testing "updates time-val - without topic name on the namespace" (let [expected-metric-namespaces ["message-received-delay-histogram"] mk-histogram-args (atom nil) diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj index 2d29bad9..c4693f28 100644 --- a/test/ziggurat/producer_test.clj +++ b/test/ziggurat/producer_test.clj @@ -1,12 +1,13 @@ (ns ziggurat.producer-test - (:require [clojure.test :refer :all] - [ziggurat.streams :refer [start-streams stop-streams]] - [ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties* *embedded-kafka-cluster*]] + (:require [clojure.string :refer [blank?]] + [clojure.test :refer :all] + [clojure.test.check.generators :as gen] [ziggurat.config :refer [ziggurat-config]] + [ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties*]] [ziggurat.producer :refer [producer-properties-map send kafka-producers]] - [clojure.test.check.generators :as gen]) - (:import (org.apache.kafka.streams.integration.utils IntegrationTestUtils) - (org.apache.kafka.clients.producer KafkaProducer))) + [ziggurat.streams :refer [start-streams stop-streams]]) + (:import (org.apache.kafka.clients.producer KafkaProducer) + (org.apache.kafka.streams.integration.utils IntegrationTestUtils))) (use-fixtures :once fix/mount-only-config-and-producer) @@ -21,33 +22,30 @@ :enabled [true :bool]}}}}}) (deftest send-data-with-topic-and-value-test - (with-redefs - [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] - (let [topic (gen/generate gen/string-alphanumeric) - key "message" - value "Hello World!!"] - (.createTopic *embedded-kafka-cluster* topic) + (with-redefs [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] + (let [alphanum-gen (gen/such-that #(not (blank? %)) gen/string-alphanumeric) + topic (gen/generate alphanum-gen 10) + key "message" + value "Hello World!!"] (send :default topic key value) (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 1000)] (is (= value (.value (first result)))))))) (deftest send-data-with-topic-key-partition-and-value-test - (with-redefs - [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] - (let [topic (gen/generate gen/string-alphanumeric) - key "message" - value "Hello World!!" - partition (int 0)] - (.createTopic *embedded-kafka-cluster* topic) + (with-redefs [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] + (let [alphanum-gen (gen/such-that #(not (blank? %)) gen/string-alphanumeric) + topic (gen/generate alphanum-gen 10) + key "message" + value "Hello World!!" + partition (int 0)] (send :default topic partition key value) (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 1000)] (is (= value (.value (first result)))))))) (deftest send-throws-exception-when-no-producers-are-configured - (with-redefs - [kafka-producers {}] + (with-redefs [kafka-producers {}] (let [topic "test-topic" - key "message" + key "message" value "Hello World!! from non-existant Kafka Producers"] (is (not-empty (try (send :default topic key value) (catch Exception e (ex-data e)))))))) @@ -55,14 +53,10 @@ (deftest producer-properties-map-is-empty-if-no-producers-configured ; Here ziggurat-config has been substituted with a custom map which ; does not have any valid producer configs. - (with-redefs - [ziggurat-config stream-router-config-without-producer] + (with-redefs [ziggurat-config stream-router-config-without-producer] (is (empty? (producer-properties-map))))) (deftest producer-properties-map-is-not-empty-if-producers-are-configured ; Here the config is read from config.test.edn which contains ; valid producer configs. (is (seq (producer-properties-map)))) - - -