Skip to content

Commit

Permalink
Merge pull request #64 from macalimlim/fix-inc-dec-count
Browse files Browse the repository at this point in the history
Fix inc dec count
  • Loading branch information
theanirudhvyas committed Jun 28, 2019
2 parents 3056a4c + f99fb8f commit 2a14f7f
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 46 deletions.
13 changes: 6 additions & 7 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
[mount.core :as mount :refer [defstate]]
[schema.core :as s]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.metrics :as metrics]
[ziggurat.messaging.connection :as messaging-connection]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
[ziggurat.metrics :as metrics]
[ziggurat.nrepl-server :as nrepl-server]
[ziggurat.producer :as producer :refer [kafka-producers]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.producer :as producer :refer [kafka-producers]]))
[ziggurat.streams :as streams]))

(defstate statsd-reporter
:start (metrics/start-statsd-reporter (:datadog (ziggurat-config))
(:env (ziggurat-config))
(:app-name (ziggurat-config)))
(:env (ziggurat-config)))
:stop (metrics/stop-statsd-reporter statsd-reporter))

(defn- start*
Expand Down Expand Up @@ -154,7 +153,7 @@
(s/validate StreamRoute stream-routes)))

(defn validate-modes [modes]
(let [invalid-modes (filter #(not (contains? (set (keys valid-modes-fns)) %)) modes)
(let [invalid-modes (filter #(not (contains? (set (keys valid-modes-fns)) %)) modes)
invalid-modes-count (count invalid-modes)]
(when (pos? invalid-modes-count)
(throw (ex-info "Wrong modes arguement passed - " {:invalid-modes invalid-modes})))))
Expand Down Expand Up @@ -187,4 +186,4 @@
(catch Exception e
(log/error e)
(stop stop-fn modes)
(System/exit 1)))))
(System/exit 1)))))
30 changes: 16 additions & 14 deletions src/ziggurat/metrics.clj
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
(ns ziggurat.metrics
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[clojure.walk :refer [stringify-keys]])
[clojure.walk :refer [stringify-keys]]
[ziggurat.config :refer [ziggurat-config]])
(:import com.gojek.metrics.datadog.DatadogReporter
[com.gojek.metrics.datadog.transport UdpTransport UdpTransport$Builder]
[io.dropwizard.metrics5 Histogram Meter MetricName MetricRegistry]
java.util.concurrent.TimeUnit))

(defonce ^:private group (atom nil))

(defonce metrics-registry
(MetricRegistry.))

(defn- merge-tags
[additional-tags]
(let [default-tags {"actor" @group}]
(let [default-tags {"actor" (:app-name (ziggurat-config))}]
(merge default-tags (when-not (seq additional-tags)
(stringify-keys additional-tags)))))

Expand Down Expand Up @@ -48,10 +47,12 @@
(dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name))))

(defn- inc-or-dec-count
[sign metric-namespaces metric additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
(.mark meter (sign 1))))
([sign metric-namespace metric]
(inc-or-dec-count sign metric-namespace metric nil))
([sign metric-namespaces metric additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
(.mark meter (sign 1)))))

(def increment-count (partial inc-or-dec-count +))

Expand All @@ -62,16 +63,18 @@
(increment-count ns metric additional-tags)))

(defn report-time
[metric-namespaces time-val additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
(.update histogram (int time-val))))
([metric-namespaces time-val]
(report-time metric-namespaces time-val nil))
([metric-namespaces time-val additional-tags]
(let [metric-namespace (intercalate-dot metric-namespaces)
histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))]
(.update histogram (int time-val)))))

(defn multi-ns-report-time [nss time-val additional-tags]
(doseq [ns nss]
(report-time ns time-val additional-tags)))

(defn start-statsd-reporter [statsd-config env app-name]
(defn start-statsd-reporter [statsd-config env]
(let [{:keys [enabled host port]} statsd-config]
(when enabled
(let [transport (-> (UdpTransport$Builder.)
Expand All @@ -85,7 +88,6 @@
(.build))]
(log/info "Starting statsd reporter")
(.start reporter 1 TimeUnit/SECONDS)
(reset! group app-name)
{:reporter reporter :transport transport}))))

(defn stop-statsd-reporter [datadog-reporter]
Expand Down
29 changes: 29 additions & 0 deletions test/ziggurat/metrics_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@
(is (= 1 (.getCount meter)))
(is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args)))
(is (= metric (:metric @mk-meter-args))))))
(testing "increases count on the meter - w/o additional-tags argument"
(let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"]
mk-meter-args (atom nil)
meter (Meter.)
expected-additional-tags nil]
(with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags]
(is (= additional-tags expected-additional-tags))
(reset! mk-meter-args {:metric-namespaces metric-namespaces
:metric metric})
meter)]
(metrics/increment-count expected-metric-namespaces metric)
(is (= 1 (.getCount meter)))
(is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args)))
(is (= metric (:metric @mk-meter-args))))))
(testing "increases count on the meter - without topic name on the namespace"
(let [expected-metric-namespaces ["metric-ns"]
mk-meter-args (atom nil)
Expand Down Expand Up @@ -132,6 +146,21 @@
(is (= 1 (.getCount histogram)))
(is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args)))
(is (= "all" (:metric @mk-histogram-args))))))
(testing "updates time-val - w/o additional-tags argument"
(let [expected-metric-namespaces [expected-topic-entity-name "message-received-delay-histogram"]
mk-histogram-args (atom nil)
reservoir (UniformReservoir.)
histogram (Histogram. reservoir)
expected-additional-tags nil]
(with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags]
(is (= additional-tags expected-additional-tags))
(reset! mk-histogram-args {:metric-namespaces metric-namespaces
:metric metric})
histogram)]
(metrics/report-time expected-metric-namespaces time-val)
(is (= 1 (.getCount histogram)))
(is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args)))
(is (= "all" (:metric @mk-histogram-args))))))
(testing "updates time-val - without topic name on the namespace"
(let [expected-metric-namespaces ["message-received-delay-histogram"]
mk-histogram-args (atom nil)
Expand Down
46 changes: 21 additions & 25 deletions test/ziggurat/producer_test.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
(ns ziggurat.producer-test
(:require [clojure.test :refer :all]
[ziggurat.streams :refer [start-streams stop-streams]]
[ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties*]]
(:require [clojure.string :refer [blank?]]
[clojure.test :refer :all]
[clojure.test.check.generators :as gen]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties*]]
[ziggurat.producer :refer [producer-properties-map send kafka-producers]]
[clojure.test.check.generators :as gen])
(:import (org.apache.kafka.streams.integration.utils IntegrationTestUtils)
(org.apache.kafka.clients.producer KafkaProducer)))
[ziggurat.streams :refer [start-streams stop-streams]])
(:import (org.apache.kafka.clients.producer KafkaProducer)
(org.apache.kafka.streams.integration.utils IntegrationTestUtils)))

(use-fixtures :once fix/mount-only-config-and-producer)

Expand All @@ -21,46 +22,41 @@
:enabled [true :bool]}}}}})

(deftest send-data-with-topic-and-value-test
(with-redefs
[kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))]
(let [topic (gen/generate gen/string-alphanumeric 10)
key "message"
value "Hello World!!"]
(with-redefs [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))]
(let [alphanum-gen (gen/such-that #(not (blank? %)) gen/string-alphanumeric)
topic (gen/generate alphanum-gen 10)
key "message"
value "Hello World!!"]
(send :default topic key value)
(let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 2000)]
(is (= value (.value (first result))))))))

(deftest send-data-with-topic-key-partition-and-value-test
(with-redefs
[kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))]
(let [topic (gen/generate gen/string-alphanumeric 10)
key "message"
value "Hello World!!"
partition (int 0)]
(with-redefs [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))]
(let [alphanum-gen (gen/such-that #(not (blank? %)) gen/string-alphanumeric)
topic (gen/generate alphanum-gen 10)
key "message"
value "Hello World!!"
partition (int 0)]
(send :default topic partition key value)
(let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 2000)]
(is (= value (.value (first result))))))))

(deftest send-throws-exception-when-no-producers-are-configured
(with-redefs
[kafka-producers {}]
(with-redefs [kafka-producers {}]
(let [topic "test-topic"
key "message"
key "message"
value "Hello World!! from non-existant Kafka Producers"]
(is (not-empty (try (send :default topic key value)
(catch Exception e (ex-data e))))))))

(deftest producer-properties-map-is-empty-if-no-producers-configured
; Here ziggurat-config has been substituted with a custom map which
; does not have any valid producer configs.
(with-redefs
[ziggurat-config stream-router-config-without-producer]
(with-redefs [ziggurat-config stream-router-config-without-producer]
(is (empty? (producer-properties-map)))))

(deftest producer-properties-map-is-not-empty-if-producers-are-configured
; Here the config is read from config.test.edn which contains
; valid producer configs.
(is (seq (producer-properties-map))))



0 comments on commit 2a14f7f

Please sign in to comment.