From 278128653106e174cdbe255ce0f26aa85d419091 Mon Sep 17 00:00:00 2001 From: Kartik Gupta Date: Wed, 15 May 2019 15:20:05 +0530 Subject: [PATCH 01/12] Enables pushing to clojars from 2.x branch --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index 804610eb..de6d6838 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,3 +20,6 @@ jobs: - stage: deploy script: lein deploy clojars + branches: + only: + - 2.x From b20e46dd54f9d0801d6239f4f5139f85eb6ce8b1 Mon Sep 17 00:00:00 2001 From: Kartik Gupta Date: Thu, 30 May 2019 11:14:29 +0530 Subject: [PATCH 02/12] Running ziggurat in different modes (#46) * Extracts out states per mode * Refactors initialize of states Co-authored-by: Saptanto * Starts and stops the state according to modes passed in main Co-authored-by: Saptanto * Refactors validates modes Co-authored-by: Saptanto * Makes sure that api-server is called management-api-server Co-authored-by: Saptanto * Throws a proper exception when invalid mode is passed Co-authored-by: Saptanto * Removed unused code and imports Co-authored-by: Saptanto * Makes start fn and stop fn vector as a map * Uses ex-info instead of IllegalArgumentException * Converts string literals to keywords for modes --- src/ziggurat/init.clj | 128 +++++++++++++++++++++++------ src/ziggurat/resource/dead_set.clj | 8 +- src/ziggurat/retry.clj | 1 - src/ziggurat/server/middleware.clj | 2 +- test/ziggurat/init_test.clj | 79 ++++++++++++------ 5 files changed, 160 insertions(+), 58 deletions(-) diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index eb11c757..12d4e598 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -3,7 +3,6 @@ (:require [clojure.tools.logging :as log] [mount.core :as mount :refer [defstate]] [schema.core :as s] - [sentry-clj.async :as sentry] [ziggurat.config :refer [ziggurat-config] :as config] [ziggurat.metrics :as metrics] [ziggurat.messaging.connection :as messaging-connection] @@ -28,38 +27,101 @@ 
(mount/with-args args) (mount/start)))) -(defn start - "Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc" - [actor-start-fn stream-routes actor-routes] +(defn- start-rabbitmq-connection [args] + (start* #{#'messaging-connection/connection} args)) + +(defn- start-rabbitmq-consumers [args] + (start-rabbitmq-connection args) + (messaging-consumer/start-subscribers (get args :stream-routes))) + +(defn- start-rabbitmq-producers [args] + (start-rabbitmq-connection args) + (messaging-producer/make-queues (get args :stream-routes))) + +(defn start-stream [args] + (start-rabbitmq-producers args) + (start* #{#'streams/stream} args)) + +(defn start-management-apis [args] + (start-rabbitmq-connection args) + (start* #{#'server/server} (dissoc args :actor-routes))) + +(defn start-server [args] + (start-rabbitmq-connection args) + (start* #{#'server/server} args)) + +(defn start-workers [args] + (start-rabbitmq-producers args) + (start-rabbitmq-consumers args)) + +(defn- stop-rabbitmq-connection [] + (mount/stop #'messaging-connection/connection)) + +(defn stop-workers [] + (stop-rabbitmq-connection)) + +(defn stop-server [] + (mount/stop #'server/server) + (stop-rabbitmq-connection)) + +(defn stop-stream [] + (mount/stop #'streams/stream) + (stop-rabbitmq-connection)) + +(defn stop-management-apis [] + (mount/stop #'server/server) + (stop-rabbitmq-connection)) + +(def valid-modes-fns + {:api-server {:start-fn start-server :stop-fn stop-server} + :stream-worker {:start-fn start-stream :stop-fn stop-stream} + :worker {:start-fn start-workers :stop-fn stop-workers} + :management-api {:start-fn start-management-apis :stop-fn stop-management-apis}}) + +(defn- execute-function + ([modes fnk] + (execute-function modes fnk nil)) + ([modes fnk args] + (doseq [mode (-> modes + (or (keys valid-modes-fns)) + sort)] + (if (nil? 
args) + ((fnk (get valid-modes-fns mode))) + ((fnk (get valid-modes-fns mode)) args))))) + +(defn start-common-states [] (start* #{#'config/config #'statsd-reporter - #'sentry-reporter}) - (actor-start-fn) - (start* #{#'messaging-connection/connection} {:stream-routes stream-routes}) - (messaging-producer/make-queues stream-routes) - (messaging-consumer/start-subscribers stream-routes) ;; We want subscribers to start after creating queues on RabbitMQ. - (start* #{#'server/server - #'nrepl-server/server - #'streams/stream} - {:stream-routes stream-routes - :actor-routes actor-routes})) + #'sentry-reporter + #'nrepl-server/server})) -(defn stop - "Calls the Ziggurat's state stop fns and then actor-stop-fn." - [actor-stop-fn] +(defn stop-common-states [] (mount/stop #'config/config #'statsd-reporter #'messaging-connection/connection - #'server/server - #'nrepl-server/server - #'streams/stream) - (actor-stop-fn) - (mount/stop #'config/config)) + #'nrepl-server/server)) + +(defn start + "Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc" + [actor-start-fn stream-routes actor-routes modes] + (start-common-states) + (actor-start-fn) + (execute-function modes + :start-fn + {:actor-routes actor-routes + :stream-routes stream-routes})) -(defn- add-shutdown-hook [actor-stop-fn] +(defn stop + "Calls the Ziggurat's state stop fns and then actor-stop-fn." + [actor-stop-fn modes] + (stop-common-states) + (execute-function modes :stop-fn) + (actor-stop-fn)) + +(defn- add-shutdown-hook [actor-stop-fn modes] (.addShutdownHook (Runtime/getRuntime) - (Thread. ^Runnable #(do (stop actor-stop-fn) + (Thread. ^Runnable #(do (stop actor-stop-fn modes) (shutdown-agents)) "Shutdown-handler"))) @@ -73,6 +135,12 @@ (defn validate-stream-routes [stream-routes] (s/validate StreamRoute stream-routes)) +(defn validate-modes [modes] + (let [invalid-modes (filter #(not (contains? 
(set (keys valid-modes-fns)) %)) modes) + invalid-modes-count (count invalid-modes)] + (when (pos? invalid-modes-count) + (throw (ex-info "Wrong modes arguement passed - " {:invalid-modes invalid-modes}))))) + (defn main "The entry point for your application. @@ -88,11 +156,17 @@ ([start-fn stop-fn stream-routes] (main start-fn stop-fn stream-routes [])) ([start-fn stop-fn stream-routes actor-routes] + (main {:start-fn start-fn :stop-fn stop-fn :stream-routes stream-routes :actor-routes actor-routes})) + ([{:keys [start-fn stop-fn stream-routes actor-routes modes]}] (try + (validate-modes modes) (validate-stream-routes stream-routes) - (add-shutdown-hook stop-fn) - (start start-fn stream-routes actor-routes) + (add-shutdown-hook stop-fn modes) + (start start-fn stream-routes actor-routes modes) + (catch clojure.lang.ExceptionInfo e + (log/error e) + (System/exit 1)) (catch Exception e (log/error e) - (stop stop-fn) + (stop stop-fn modes) (System/exit 1))))) diff --git a/src/ziggurat/resource/dead_set.clj b/src/ziggurat/resource/dead_set.clj index 06bad964..5179ac9d 100644 --- a/src/ziggurat/resource/dead_set.clj +++ b/src/ziggurat/resource/dead_set.clj @@ -1,9 +1,9 @@ (ns ziggurat.resource.dead-set - (:require [schema.core :as s] - [clojure.tools.logging :as log] + (:require [clojure.tools.logging :as log] + [mount.core :as mount] + [schema.core :as s] [ziggurat.config :refer [get-in-config channel-retry-config]] - [ziggurat.messaging.dead-set :as r] - [mount.core :as mount])) + [ziggurat.messaging.dead-set :as r])) (def not-found-for-retry {:status 404 :body {:error "Retry is not enabled"}}) diff --git a/src/ziggurat/retry.clj b/src/ziggurat/retry.clj index 230998a3..ec2349c9 100644 --- a/src/ziggurat/retry.clj +++ b/src/ziggurat/retry.clj @@ -1,6 +1,5 @@ (ns ziggurat.retry (:require [clojure.set :as set] - [sentry-clj.async :as sentry] [ziggurat.sentry :refer [sentry-reporter]])) (def default-wait 100) diff --git a/src/ziggurat/server/middleware.clj 
b/src/ziggurat/server/middleware.clj index c5aa4e93..40abc29c 100644 --- a/src/ziggurat/server/middleware.clj +++ b/src/ziggurat/server/middleware.clj @@ -18,7 +18,7 @@ (defn wrap-hyphenate [handler & args] (fn [request] - (let [{:keys [skip-hyphenation] :as response} + (let [response (handler (update request :params #(umap/nested-map-keys (fn [k] (apply csk/->kebab-case-keyword k args)) %)))] diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index 3b91596d..4097e4b8 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -1,6 +1,5 @@ (ns ziggurat.init-test (:require [clojure.test :refer :all] - [clojure.tools.logging :as log] [ziggurat.config :as config] [ziggurat.init :as init] [ziggurat.messaging.connection :as rmqc] @@ -17,8 +16,8 @@ rmqc/start-connection (fn [] (reset! result (* @result 2))) rmqc/stop-connection (constantly nil) config/config-file "config.test.edn"] - (init/start #(reset! result (+ @result 3)) {} []) - (init/stop #()) + (init/start #(reset! result (+ @result 3)) {} [] nil) + (init/stop #() nil) (is (= 16 @result)))))) (deftest stop-calls-actor-stop-fn-test @@ -27,46 +26,57 @@ (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (fn [_] (reset! result (* @result 2))) config/config-file "config.test.edn"] - (init/start #() {} []) - (init/stop #(reset! result (+ @result 3))) + (init/start #() {} [] nil) + (init/stop #(reset! result (+ @result 3)) nil) + (is (= 5 @result)))))) + +(deftest stop-calls-idempotentcy-test + (testing "The stop function should be idempotent" + (let [result (atom 1)] + (with-redefs [streams/start-streams (constantly nil) + streams/stop-streams (constantly nil) + rmqc/stop-connection (fn [_] (reset! result (* @result 2))) + config/config-file "config.test.edn"] + (init/start #() {} [] nil) + (init/stop #(reset! 
result (+ @result 3)) nil) (is (= 5 @result)))))) (deftest start-calls-make-queues-test (testing "Start calls make queues" - (let [make-queues-called (atom false) + (let [make-queues-called (atom 0) expected-stream-routes {:default {:handler-fn #()}}] (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) messaging-producer/make-queues (fn [stream-routes] - (swap! make-queues-called not) + (swap! make-queues-called + 1) (is (= stream-routes expected-stream-routes))) messaging-consumer/start-subscribers (constantly nil) config/config-file "config.test.edn"] - (init/start #() expected-stream-routes []) - (init/stop #()) - (is @make-queues-called))))) + (init/start #() expected-stream-routes [] nil) + (init/stop #() nil) + (is (= 2 @make-queues-called)))))) (deftest start-calls-start-subscribers-test (testing "Start calls start subscribers" - (let [start-subscriber-called (atom false) + (let [start-subscriber-called (atom 0) expected-stream-routes {:default {:handler-fn #()}}] (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) messaging-consumer/start-subscribers (fn [stream-routes] - (swap! start-subscriber-called not) + (swap! start-subscriber-called + 1) (is (= stream-routes expected-stream-routes))) messaging-producer/make-queues (constantly nil) config/config-file "config.test.edn"] - (init/start #() expected-stream-routes []) - (init/stop #()) - (is @start-subscriber-called))))) + (init/start #() expected-stream-routes [] nil) + (init/stop #() nil) + (is (= 1 @start-subscriber-called)))))) (deftest main-test (testing "Main function should call start" (let [start-was-called (atom false) expected-stream-routes {:default {:handler-fn #(constantly nil)}}] - (with-redefs [init/add-shutdown-hook (fn [_] (constantly nil)) - init/start (fn [_ stream-router _] + (with-redefs [init/add-shutdown-hook (fn [_ _] (constantly nil)) + init/start (fn [_ stream-router _ _] (swap! 
start-was-called not) (is (= expected-stream-routes stream-router)))] (init/main #() #() expected-stream-routes) @@ -101,11 +111,24 @@ streams/stop-streams (constantly nil) config/config-file "config.test.edn"] (init/start #() {} [["test-ping" (fn [_request] {:status 200 - :body "pong"})]]) + :body "pong"})]] nil) + (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false) + status-actor status + {:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] + (init/stop #() nil) + (is (= 200 status-actor)) + (is (= 200 status))))) + + (testing "Deadset management and server api modes should run both actor and deadset management routes" + (with-redefs [streams/start-streams (constantly nil) + streams/stop-streams (constantly nil) + config/config-file "config.test.edn"] + (init/start #() {} [["test-ping" (fn [_request] {:status 200 + :body "pong"})]] [:management-api :api-server]) (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false) status-actor status {:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] - (init/stop #()) + (init/stop #() nil) (is (= 200 status-actor)) (is (= 200 status))))) @@ -113,16 +136,22 @@ (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) config/config-file "config.test.edn"] - (init/start #() {} []) - (let [{:keys [status body] :as response} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)] - (init/stop #()) + (init/start #() {} [] nil) + (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)] + (init/stop #() nil) (is (= 404 status))))) (testing "The ziggurat routes should work fine when actor routes are not provided" (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) config/config-file "config.test.edn"] - 
(init/start #() {} []) - (let [{:keys [status body] :as response} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] - (init/stop #()) + (init/start #() {} [] nil) + (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] + (init/stop #() nil) (is (= 200 status)))))) + +(deftest validate-modes-test + (testing "Validate modes should raise exception if modes have any invalid element" + (let [modes [:invalid-modes :api-server :second-invalid]] + (is (thrown? clojure.lang.ExceptionInfo (init/validate-modes modes)))))) + From dad2f74f1d397e62508981da5842e5fa30d8c707 Mon Sep 17 00:00:00 2001 From: Kartik Gupta Date: Thu, 30 May 2019 15:11:04 +0530 Subject: [PATCH 03/12] Adds readme for modes --- README.md | 18 ++++++++++++++++++ src/ziggurat/init.clj | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1a147653..eb66dcf5 100644 --- a/README.md +++ b/README.md @@ -117,7 +117,25 @@ To start a stream (a thread that reads messages from Kafka), add this to your co Ziggurat also sets up a HTTP server by default and you can pass in your own routes that it will serve. The above example demonstrates how you can pass in your own route. +or +```clojure +(ziggurat/main {:start-fn start-fn + :stop-fn stop-fn + :stream-routes {:stream-id {:handler-fn main-fn}} + :actor-routes [] + :modes [:api-server]}) +``` + +There are four modes supported by ziggurat +``` + :api-server - Mode by which only server will be started with actor routes and management routes(Dead set management) + :stream-worker - Only start the server plus rabbitmq for only producing the messages for retry and channels + :worker - Starts the rabbitmq consumer for retry and channel + :management-api - Servers only routes which used for deadset management +``` +You can pass in multiple modes and it will start accordingly +If nothing passed to modes then it will start all the modes. 
## Configuration diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 12d4e598..a3f3a115 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -169,4 +169,4 @@ (catch Exception e (log/error e) (stop stop-fn modes) - (System/exit 1))))) + (System/exit 1))))) \ No newline at end of file From 0c26ff753dd73f3e721c0889ba92821bc700cbea Mon Sep 17 00:00:00 2001 From: Kartik Gupta Date: Fri, 31 May 2019 10:11:04 +0530 Subject: [PATCH 04/12] Adds one more example in the readme --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index eb66dcf5..35249902 100644 --- a/README.md +++ b/README.md @@ -122,10 +122,12 @@ or (ziggurat/main {:start-fn start-fn :stop-fn stop-fn :stream-routes {:stream-id {:handler-fn main-fn}} - :actor-routes [] - :modes [:api-server]}) + :actor-routes routes + :modes [:api-server :stream-worker]}) ``` +This will start both api-server and stream-worker modes + There are four modes supported by ziggurat ``` :api-server - Mode by which only server will be started with actor routes and management routes(Dead set management) From 4616e81987b0ed506ba87911cb944220cd5a96ba Mon Sep 17 00:00:00 2001 From: Kartik Gupta Date: Fri, 31 May 2019 10:15:18 +0530 Subject: [PATCH 05/12] Adds changelog for 2.11.0 --- CHANGELOG.md | 3 +++ project.clj | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fd28f88..e41125f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ All notable changes to this project will be documented in this file. 
This change ## Unreleased Changes +## 2.11.0 - 2019-05-31 +- Running ziggurat in different modes (#46) + ## 2.10.2 - 2019-05-03 - Adds config to change the changelog topic replication factor diff --git a/project.clj b/project.clj index 97cee60a..e6d6dfcf 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject tech.gojek/ziggurat "2.10.2" +(defproject tech.gojek/ziggurat "2.11.0" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" From 944b62441c3bd4dbaf460cdf600b7c9b26285ead Mon Sep 17 00:00:00 2001 From: msbond Date: Tue, 4 Jun 2019 16:22:45 +0530 Subject: [PATCH 06/12] Actor stop fn should stop before the ziggurat state (#53) --- src/ziggurat/init.clj | 4 ++-- test/ziggurat/init_test.clj | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index a3f3a115..1fb5e741 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -114,9 +114,9 @@ (defn stop "Calls the Ziggurat's state stop fns and then actor-stop-fn." [actor-stop-fn modes] + (actor-stop-fn) (stop-common-states) - (execute-function modes :stop-fn) - (actor-stop-fn)) + (execute-function modes :stop-fn)) (defn- add-shutdown-hook [actor-stop-fn modes] (.addShutdownHook diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index 4097e4b8..497d30c5 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -21,14 +21,14 @@ (is (= 16 @result)))))) (deftest stop-calls-actor-stop-fn-test - (testing "The actor stop fn stops after the ziggurat state" + (testing "The actor stop fn stops before the ziggurat state" (let [result (atom 1)] (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (fn [_] (reset! result (* @result 2))) config/config-file "config.test.edn"] (init/start #() {} [] nil) (init/stop #(reset! 
result (+ @result 3)) nil) - (is (= 5 @result)))))) + (is (= 8 @result)))))) (deftest stop-calls-idempotentcy-test (testing "The stop function should be idempotent" @@ -39,7 +39,7 @@ config/config-file "config.test.edn"] (init/start #() {} [] nil) (init/stop #(reset! result (+ @result 3)) nil) - (is (= 5 @result)))))) + (is (= 8 @result)))))) (deftest start-calls-make-queues-test (testing "Start calls make queues" From 317f352e48ce2fe7e90c48cb4fbae57402f48b98 Mon Sep 17 00:00:00 2001 From: Kartik Gupta Date: Tue, 4 Jun 2019 16:25:35 +0530 Subject: [PATCH 07/12] Adds changelog for 2.11.1 --- CHANGELOG.md | 3 +++ project.clj | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e41125f7..27ab7c09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ All notable changes to this project will be documented in this file. This change ## Unreleased Changes +## 2.11.1 - 2019-06-04 +- Actor stop fn should stop before the Ziggurat state (#53) + ## 2.11.0 - 2019-05-31 - Running ziggurat in different modes (#46) diff --git a/project.clj b/project.clj index e6d6dfcf..4a685bf7 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject tech.gojek/ziggurat "2.11.0" +(defproject tech.gojek/ziggurat "2.11.1" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" From 53a9fe96194919e1c4c85452c03b8a17ebb7fccb Mon Sep 17 00:00:00 2001 From: Michael Angelo Calimlim Date: Mon, 27 May 2019 09:57:34 +0800 Subject: [PATCH 08/12] add support for providing a topic-name label in the metrics --- src/ziggurat/kafka_delay.clj | 15 ++- src/ziggurat/mapper.clj | 79 +++++++---- src/ziggurat/metrics.clj | 84 ++++++++---- src/ziggurat/streams.clj | 73 +++++----- src/ziggurat/timestamp_transformer.clj | 11 +- test/ziggurat/kafka_delay_test.clj | 35 +++-- test/ziggurat/mapper_test.clj | 132 +++++++++++-------- 
test/ziggurat/metrics_test.clj | 104 ++++++++++----- test/ziggurat/timestamp_transformer_test.clj | 67 ++++++---- 9 files changed, 375 insertions(+), 225 deletions(-) diff --git a/src/ziggurat/kafka_delay.clj b/src/ziggurat/kafka_delay.clj index eb2582aa..3fae5bdc 100644 --- a/src/ziggurat/kafka_delay.clj +++ b/src/ziggurat/kafka_delay.clj @@ -2,9 +2,12 @@ (:require [ziggurat.metrics :as metrics] [ziggurat.util.time :refer :all])) -(defn calculate-and-report-kafka-delay [metric-namespace record-timestamp] - (let [now-millis (get-current-time-in-millis) - delay (- now-millis - record-timestamp)] - (metrics/report-time metric-namespace delay))) - +(defn calculate-and-report-kafka-delay + ([metric-namespaces record-timestamp] + (calculate-and-report-kafka-delay metric-namespaces record-timestamp nil)) + ([metric-namespaces record-timestamp additional-tags] + (let [now-millis (get-current-time-in-millis) + delay (- now-millis record-timestamp) + default-namespace (last metric-namespaces) + multi-namespaces [metric-namespaces [default-namespace]]] + (metrics/multi-ns-report-time multi-namespaces delay additional-tags)))) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 808380f8..5de1703f 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -1,7 +1,7 @@ (ns ziggurat.mapper (:require [sentry-clj.async :as sentry] - [ziggurat.metrics :as metrics] [ziggurat.messaging.producer :as producer] + [ziggurat.metrics :as metrics] [ziggurat.new-relic :as nr] [ziggurat.sentry :refer [sentry-reporter]]) (:import (java.time Instant))) @@ -15,47 +15,72 @@ (fn [message] (let [topic-entity-name (name topic-entity) new-relic-transaction-name (str topic-entity-name ".handler-fn") - metric-namespace (str topic-entity-name ".message-processing")] + default-namespace "message-processing" + metric-namespaces [topic-entity-name default-namespace] + additional-tags {:topic-name topic-entity-name} + default-namespaces [default-namespace] + success-metric 
"success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure" + multi-namespaces [metric-namespaces default-namespaces]] (nr/with-tracing "job" new-relic-transaction-name (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn message) - end-time (.toEpochMilli (Instant/now))] - (metrics/report-time (str topic-entity-name ".handler-fn-execution-time") (- end-time start-time)) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn message) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "handler-fn-execution-time" + multi-execution-time-namespaces [[topic-entity-name execution-time-namespace] + [execution-time-namespace]]] + (metrics/multi-ns-report-time multi-execution-time-namespaces time-val) (case return-code - :success (metrics/increment-count metric-namespace "success") - :retry (do (metrics/increment-count metric-namespace "retry") - (producer/retry message topic-entity)) - :skip (metrics/increment-count metric-namespace "skip") - :block 'TODO + :success (do (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)) + :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + (producer/retry message topic-entity)) + :skip (do (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags)) + :block 'TODO (do (send-msg-to-channel channels message topic-entity return-code) - (metrics/increment-count metric-namespace "success")))) + (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)))) (catch Throwable e (producer/retry message topic-entity) (sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name)) - (metrics/increment-count metric-namespace "failure"))))))) + (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) (defn channel-mapper-func [mapper-fn topic-entity channel] 
(fn [message] - (let [topic-entity-name (name topic-entity) - channel-name (name channel) - metric-namespace (str topic-entity-name "." channel-name) - message-processing-namespace (str metric-namespace ".message-processing")] + (let [topic-entity-name (name topic-entity) + channel-name (name channel) + default-namespace "message-processing" + base-namespaces [topic-entity-name channel-name] + metric-namespaces (conj base-namespaces default-namespace) + additional-tags {:topic-name topic-entity-name} + default-namespaces [default-namespace] + metric-namespace (apply str (interpose "." metric-namespaces)) + success-metric "success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure" + multi-namespaces [metric-namespaces default-namespaces]] (nr/with-tracing "job" metric-namespace (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn message) - end-time (.toEpochMilli (Instant/now))] - (metrics/report-time (str metric-namespace ".execution-time") (- end-time start-time)) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn message) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "execution-time" + multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace) + [execution-time-namespace]]] + (metrics/multi-ns-report-time multi-execution-time-namespaces time-val) (case return-code - :success (metrics/increment-count message-processing-namespace "success") - :retry (do (metrics/increment-count message-processing-namespace "retry") - (producer/retry-for-channel message topic-entity channel)) - :skip (metrics/increment-count message-processing-namespace "skip") - :block 'TODO + :success (do (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)) + :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + (producer/retry-for-channel message topic-entity channel)) + :skip 
(do (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags)) + :block 'TODO (throw (ex-info "Invalid mapper return code" {:code return-code})))) (catch Throwable e (producer/retry-for-channel message topic-entity channel) (sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) - (metrics/increment-count message-processing-namespace "failure"))))))) \ No newline at end of file + (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) diff --git a/src/ziggurat/metrics.clj b/src/ziggurat/metrics.clj index c3373800..df69bfc7 100644 --- a/src/ziggurat/metrics.clj +++ b/src/ziggurat/metrics.clj @@ -1,45 +1,71 @@ (ns ziggurat.metrics - (:require [clojure.tools.logging :as log]) + (:require [clojure.tools.logging :as log] + [clojure.walk :refer [stringify-keys]]) (:import (com.gojek.metrics.datadog DatadogReporter) (com.gojek.metrics.datadog.transport UdpTransport$Builder UdpTransport) - (java.util.concurrent TimeUnit) - (io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram))) + (io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram) + (java.util.concurrent TimeUnit))) (defonce ^:private group (atom nil)) (defonce metrics-registry (MetricRegistry.)) +(defn- merge-tags + [additional-tags] + (let [default-tags {"actor" @group}] + (merge default-tags (when (not (empty? 
additional-tags)) + (stringify-keys additional-tags))))) + (defn mk-meter - [category metric] - (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) - tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))] - (.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric))) + ([category metric] + (mk-meter category metric nil)) + ([category metric additional-tags] + (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) + tags (merge-tags additional-tags) + tagged-metric (.tagged ^MetricName metric-name tags)] + (.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric)))) (defn mk-histogram - [category metric] - (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) - tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))] - (.histogram ^MetricRegistry metrics-registry tagged-metric))) + ([category metric] + (mk-histogram category metric nil)) + ([category metric additional-tags] + (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) + tags (merge-tags additional-tags) + tagged-metric (.tagged ^MetricName metric-name tags)] + (.histogram ^MetricRegistry metrics-registry tagged-metric)))) + +(defn- intercalate-dot + [names] + (apply str (interpose "." 
names))) + +(defn- inc-or-dec-count + [sign metric-namespaces metric additional-tags] + (let [metric-namespace (intercalate-dot metric-namespaces) + meter ^Meter (mk-meter metric-namespace metric additional-tags)] + (.mark meter (sign 1)))) -(defn increment-count - ([metric-namespace metric] - (increment-count metric-namespace metric 1)) - ([metric-namespace metric n] - (let [meter ^Meter (mk-meter metric-namespace metric)] - (.mark meter (int n))))) +(def increment-count (partial inc-or-dec-count +)) -(defn decrement-count - ([metric-namespace metric] - (decrement-count metric-namespace metric 1)) - ([metric-namespace metric n] - (let [meter ^Meter (mk-meter metric-namespace metric)] - (.mark meter (int (- n)))))) +(def decrement-count (partial inc-or-dec-count -)) -(defn report-time [metric-namespace time-val] - (let [histogram ^Histogram (mk-histogram metric-namespace "all")] +(defn multi-ns-increment-count [nss metric additional-tags] + (doseq [ns nss] + (increment-count ns metric additional-tags))) + +(defn report-time + [metric-namespaces time-val additional-tags] + (let [metric-namespace (intercalate-dot metric-namespaces) + histogram ^Histogram (mk-histogram metric-namespace "all" additional-tags)] (.update histogram (int time-val)))) +(defn multi-ns-report-time + ([nss time-val] + (multi-ns-report-time nss time-val nil)) + ([nss time-val additional-tags] + (doseq [ns nss] + (report-time ns time-val additional-tags)))) + (defn start-statsd-reporter [statsd-config env app-name] (let [{:keys [enabled host port]} statsd-config] (when enabled @@ -48,10 +74,10 @@ (.withPort port) (.build)) - reporter (-> (DatadogReporter/forRegistry metrics-registry) - (.withTransport transport) - (.withTags [(str env)]) - (.build))] + reporter (-> (DatadogReporter/forRegistry metrics-registry) + (.withTransport transport) + (.withTags [(str env)]) + (.build))] (log/info "Starting statsd reporter") (.start reporter 1 TimeUnit/SECONDS) (reset! 
group app-name) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 2f8e9098..76df02e0 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -2,29 +2,29 @@ (:require [clojure.tools.logging :as log] [flatland.protobuf.core :as proto] [mount.core :as mount :refer [defstate]] - [ziggurat.metrics :as metrics] - [ziggurat.config :refer [ziggurat-config]] [sentry-clj.async :as sentry] [ziggurat.channel :as chl] - [ziggurat.util.map :as umap] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.mapper :as mpr] + [ziggurat.metrics :as metrics] + [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.timestamp-transformer :as transformer] - [ziggurat.sentry :refer [sentry-reporter]]) - (:import [java.util.regex Pattern] - [java.util Properties] + [ziggurat.util.map :as umap]) + (:import [java.util Properties] + [java.util.regex Pattern] [org.apache.kafka.clients.consumer ConsumerConfig] [org.apache.kafka.common.serialization Serdes] + [org.apache.kafka.common.utils SystemTime] [org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology] [org.apache.kafka.streams.kstream ValueMapper TransformerSupplier] [org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier] - [org.apache.kafka.common.utils SystemTime] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (def default-config-for-stream - {:buffered-records-per-partition 10000 - :commit-interval-ms 15000 - :auto-offset-reset-config "latest" - :oldest-processed-message-in-s 604800 + {:buffered-records-per-partition 10000 + :commit-interval-ms 15000 + :auto-offset-reset-config "latest" + :oldest-processed-message-in-s 604800 :changelog-topic-replication-factor 3}) (defn- properties [{:keys [application-id bootstrap-servers stream-threads-count auto-offset-reset-config buffered-records-per-partition commit-interval-ms changelog-topic-replication-factor]}] @@ -42,12 +42,15 @@ (.put StreamsConfig/REPLICATION_FACTOR_CONFIG (int 
changelog-topic-replication-factor)) (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config))) -(defn- get-metric-namespace [default topic] - (str (name topic) "." default)) - (defn- log-and-report-metrics [topic-entity message] - (let [message-read-metric-namespace (get-metric-namespace "message" (name topic-entity))] - (metrics/increment-count message-read-metric-namespace "read")) + (let [topic-entity-name (name topic-entity) + additional-tags {:topic-name topic-entity-name} + default-namespace "message" + metric-namespaces [topic-entity-name default-namespace] + default-namespaces [default-namespace] + metric "read" + multi-namespaces [metric-namespaces default-namespaces]] + (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) message) (defn store-supplier-builder [] @@ -63,15 +66,17 @@ (defn- map-values [mapper-fn stream-builder] (.mapValues stream-builder (value-mapper mapper-fn))) -(defn- transformer-supplier [metric-namespace oldest-processed-message-in-s] +(defn- transformer-supplier + [metric-namespaces oldest-processed-message-in-s additional-tags] (reify TransformerSupplier - (get [_] (transformer/create metric-namespace oldest-processed-message-in-s)))) + (get [_] (transformer/create metric-namespaces oldest-processed-message-in-s additional-tags)))) -(defn- transform-values [topic-entity oldest-processed-message-in-s stream-builder] - (let [metric-namespace (get-metric-namespace "message-received-delay-histogram" topic-entity)] - (.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s) (into-array [(.name (store-supplier-builder))])))) +(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] + (let [metric-namespaces [topic-entity-name "message-received-delay-histogram"] + additional-tags {:topic-name topic-entity-name}] + (.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) 
(into-array [(.name (store-supplier-builder))])))) -(defn- protobuf->hash [message proto-class] +(defn- protobuf->hash [message proto-class topic-entity-name] (try (let [proto-klass (-> proto-class java.lang.Class/forName @@ -83,9 +88,11 @@ keys)] (select-keys loaded-proto proto-keys)) (catch Throwable e - (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) - (metrics/increment-count "message-parsing" "failed") - nil))) + (let [additional-tags {:topic-name topic-entity-name} + metric-namespaces ["message-parsing"]] + (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) + (metrics/increment-count metric-namespaces "failed" additional-tags) + nil)))) (defn- topology [handler-fn {:keys [origin-topic proto-class oldest-processed-message-in-s]} topic-entity channels] (let [builder (StreamsBuilder.) @@ -94,7 +101,7 @@ (.addStateStore builder (store-supplier-builder)) (->> (.stream builder topic-pattern) (transform-values topic-entity-name oldest-processed-message-in-s) - (map-values #(protobuf->hash % proto-class)) + (map-values #(protobuf->hash % proto-class topic-entity-name)) (map-values #(log-and-report-metrics topic-entity-name %)) (map-values #((mpr/mapper-func handler-fn topic-entity channels) %))) (.build builder))) @@ -108,13 +115,13 @@ (start-streams stream-routes (ziggurat-config))) ([stream-routes stream-configs] (reduce (fn [streams stream] - (let [topic-entity (first stream) + (let [topic-entity (first stream) topic-handler-fn (-> stream second :handler-fn) - channels (chl/get-keys-for-topic stream-routes topic-entity) - stream-config (-> stream-configs - (get-in [:stream-router topic-entity]) - (umap/deep-merge default-config-for-stream)) - stream (start-stream* topic-handler-fn stream-config topic-entity channels)] + channels (chl/get-keys-for-topic stream-routes topic-entity) + stream-config (-> stream-configs + (get-in [:stream-router topic-entity]) + 
(umap/deep-merge default-config-for-stream)) + stream (start-stream* topic-handler-fn stream-config topic-entity channels)] (.start stream) (conj streams stream))) [] @@ -128,4 +135,4 @@ :start (do (log/info "Starting Kafka stream") (start-streams (:stream-routes (mount/args)) (ziggurat-config))) :stop (do (log/info "Stopping Kafka stream") - (stop-streams stream))) \ No newline at end of file + (stop-streams stream))) diff --git a/src/ziggurat/timestamp_transformer.clj b/src/ziggurat/timestamp_transformer.clj index a0875faf..8e513695 100644 --- a/src/ziggurat/timestamp_transformer.clj +++ b/src/ziggurat/timestamp_transformer.clj @@ -17,17 +17,20 @@ (get-current-time-in-millis) ingestion-time)))) -(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespace oldest-processed-message-in-s] Transformer +(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespaces oldest-processed-message-in-s additional-tags] Transformer (^void init [_ ^ProcessorContext context] (do (set! processor-context context) nil)) (transform [_ record-key record-value] (let [message-time (.timestamp processor-context)] (when (message-to-process? message-time oldest-processed-message-in-s) - (calculate-and-report-kafka-delay metric-namespace message-time) + (calculate-and-report-kafka-delay metric-namespaces message-time additional-tags) (KeyValue/pair record-key record-value)))) (punctuate [_ _] nil) (close [_] nil)) -(defn create [metric-namespace process-message-since-in-s] - (TimestampTransformer. nil metric-namespace process-message-since-in-s)) +(defn create + ([metric-namespace process-message-since-in-s] + (create metric-namespace process-message-since-in-s nil)) + ([metric-namespace process-message-since-in-s additional-tags] + (TimestampTransformer. 
nil metric-namespace process-message-since-in-s additional-tags))) diff --git a/test/ziggurat/kafka_delay_test.clj b/test/ziggurat/kafka_delay_test.clj index 0d8d6b27..3f519225 100644 --- a/test/ziggurat/kafka_delay_test.clj +++ b/test/ziggurat/kafka_delay_test.clj @@ -1,18 +1,27 @@ (ns ziggurat.kafka-delay-test (:require [clojure.test :refer :all] [ziggurat.kafka-delay :refer :all] - [ziggurat.util.time :refer [get-current-time-in-millis]] - [ziggurat.metrics :as metrics])) + [ziggurat.metrics :as metrics] + [ziggurat.util.time :refer [get-current-time-in-millis]])) (deftest calculate-and-report-kafka-delay-test - (testing "calculates and reports the timestamp delay" - (let [record-timestamp 1528720767777 - current-time 1528720768777 - expected-delay 1000 - namespace "test"] - (with-redefs [get-current-time-in-millis (constantly current-time) - metrics/report-time (fn [metric-namespace delay] - (is (= delay expected-delay)) - (is (= metric-namespace namespace)))] - (calculate-and-report-kafka-delay namespace record-timestamp))))) - + (let [record-timestamp 1528720767777 + current-time 1528720768777 + expected-delay 1000 + expected-namespaces ["test"]] + (testing "calculates and reports the timestamp delay" + (let [expected-additional-tags {:topic-name "expected-topic-entity-name"}] + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay additional-tags] + (is (= delay expected-delay)) + (is (= metric-namespaces expected-namespaces)) + (is (= additional-tags expected-additional-tags)))] + (calculate-and-report-kafka-delay expected-namespaces record-timestamp expected-additional-tags)))) + (testing "calculates and reports the timestamp delay when additional tags is empty or nil" + (let [expected-additional-tags nil] + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay additional-tags] + (is (= delay expected-delay)) + (is (= 
metric-namespaces expected-namespaces)) + (is (= additional-tags expected-additional-tags)))] + (calculate-and-report-kafka-delay expected-namespaces record-timestamp)))))) diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index 56f67fa8..49e57118 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -13,23 +13,29 @@ fix/silence-logging])) (deftest mapper-func-test - (let [message {:foo "bar"} - stream-routes {:default {:handler-fn #(constantly nil)}} - topic (name (first (keys stream-routes))) - expected-metric-namespace "default.message-processing" - expected-report-time-namespace "default.handler-fn-execution-time"] + (let [message {:foo "bar"} + stream-routes {:default {:handler-fn #(constantly nil)}} + expected-topic-entity-name (name (first (keys stream-routes))) + expected-additional-tags {:topic-name expected-topic-entity-name} + default-namespace "message-processing" + report-time-namespace "handler-fn-execution-time" + expected-metric-namespaces ["default" default-namespace] + expected-report-time-namespaces ["default" report-time-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) successfully-reported-time? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? 
true))) - metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-report-time-namespace) + metrics/report-time (fn [metric-namespaces _ _] + (when (or (= metric-namespaces expected-report-time-namespaces) + (= metric-namespaces [report-time-namespace])) (reset! successfully-reported-time? true)))] - ((mapper-func (constantly :success) topic []) message) + ((mapper-func (constantly :success) expected-topic-entity-name []) message) (is @successfully-processed?) (is @successfully-reported-time?)))) @@ -37,12 +43,13 @@ (fix/with-queues (assoc-in stream-routes [:default :channel-1] (constantly :success)) (let [successfully-processed? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (= metric-namespaces expected-metric-namespaces) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? 
true)))] - ((mapper-func (constantly :channel-1) topic [:channel-1]) message) - (let [message-from-mq (rmq/get-message-from-channel-instant-queue topic :channel-1)] + ((mapper-func (constantly :channel-1) expected-topic-entity-name [:channel-1]) message) + (let [message-from-mq (rmq/get-message-from-channel-instant-queue expected-topic-entity-name :channel-1)] (is (= message message-from-mq)) (is @successfully-processed?)))))) @@ -53,8 +60,8 @@ (let [err (Throwable->map e)] (is (= (:cause err) "Invalid mapper return code")) (is (= (-> err :data :code) :channel-1))))] - ((mapper-func (constantly :channel-1) topic [:some-other-channel]) message) - (let [message-from-mq (rmq/get-message-from-channel-instant-queue topic :channel-1)] + ((mapper-func (constantly :channel-1) expected-topic-entity-name [:some-other-channel]) message) + (let [message-from-mq (rmq/get-message-from-channel-instant-queue expected-topic-entity-name :channel-1)] (is (nil? message-from-mq)))))) (testing "message process should be unsuccessful and retry" @@ -63,12 +70,13 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (= metric-namespaces expected-metric-namespaces) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] - ((mapper-func (constantly :retry) topic []) message) - (let [message-from-mq (rmq/get-msg-from-delay-queue topic)] + ((mapper-func (constantly :retry) expected-topic-entity-name []) message) + (let [message-from-mq (rmq/get-msg-from-delay-queue expected-topic-entity-name)] (is (= message-from-mq expected-message))) (is @unsuccessfully-processed?))))) @@ -79,39 +87,52 @@ unsuccessfully-processed? 
(atom false) expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true)) - metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + metrics/increment-count (fn [metric-namespaces metric additional-tags] + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace]))) + (is (= additional-tags expected-additional-tags)) + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] - ((mapper-func (fn [_] (throw (Exception. "test exception"))) topic []) message) - (let [message-from-mq (rmq/get-msg-from-delay-queue topic)] + ((mapper-func (fn [_] (throw (Exception. "test exception"))) expected-topic-entity-name []) message) + (let [message-from-mq (rmq/get-msg-from-delay-queue expected-topic-entity-name)] (is (= message-from-mq expected-message))) (is @unsuccessfully-processed?) (is @sentry-report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - expected-metric-namespace "default.handler-fn-execution-time"] - (with-redefs [metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-metric-namespace) + (let [reported-execution-time? (atom false) + execution-time-namesapce "handler-fn-execution-time" + expected-metric-namespaces ["default" execution-time-namesapce]] + (with-redefs [metrics/report-time (fn [metric-namespaces _ _] + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce]))) + (when (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce])) (reset! reported-execution-time? 
true)))] - ((mapper-func (constantly :success) topic []) message) + ((mapper-func (constantly :success) expected-topic-entity-name []) message) (is @reported-execution-time?)))))) (deftest channel-mapper-func-test - (let [message {:foo "bar"} - stream-routes {:default {:handler-fn #(constantly nil) - :channel-1 #(constantly nil)}} - topic (first (keys stream-routes)) - channel :channel-1 - expected-metric-namespace "default.channel-1.message-processing"] + (let [message {:foo "bar"} + stream-routes {:default {:handler-fn #(constantly nil) + :channel-1 #(constantly nil)}} + topic (first (keys stream-routes)) + expected-topic-entity-name (name topic) + expected-additional-tags {:topic-name expected-topic-entity-name} + channel :channel-1 + default-namespace "message-processing" + expected-metric-namespaces ["default" "channel-1" default-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespaces) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] ((channel-mapper-func (constantly :success) topic channel) message) (is @successfully-processed?)))) @@ -122,9 +143,10 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] + (when (and (= metric-namespace expected-metric-namespaces) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! 
unsuccessfully-processed? true)))] ((channel-mapper-func (constantly :retry) topic channel) message) (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic channel)] @@ -138,9 +160,11 @@ unsuccessfully-processed? (atom false) expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true)) - metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] ((channel-mapper-func (fn [_] (throw (Exception. "test exception"))) topic channel) message) (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic channel)] @@ -149,10 +173,14 @@ (is @sentry-report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - expected-metric-namespace "default.channel-1.execution-time"] - (with-redefs [metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-metric-namespace) + (let [reported-execution-time? (atom false) + execution-time-namesapce "execution-time" + expected-metric-namespaces ["default" "channel-1" execution-time-namesapce]] + (with-redefs [metrics/report-time (fn [metric-namespaces _ _] + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce]))) + (when (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce])) (reset! reported-execution-time? 
true)))] ((channel-mapper-func (constantly :success) topic channel) message) diff --git a/test/ziggurat/metrics_test.clj b/test/ziggurat/metrics_test.clj index 7c1f7db1..00d18ea6 100644 --- a/test/ziggurat/metrics_test.clj +++ b/test/ziggurat/metrics_test.clj @@ -18,49 +18,81 @@ (is (instance? Histogram meter))))) (deftest increment-count-test - (testing "increases count on the meter" - (let [metric-ns "metric-ns" - metric "metric3" - mk-meter-args (atom nil) - meter (Meter.)] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric] - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) - meter)] - (metrics/increment-count metric-ns metric) - (is (= 1 (.getCount meter))) - (is (= metric-ns (:metric-namespace @mk-meter-args))) - (is (= metric (:metric @mk-meter-args))))))) + (let [metric-ns ["metric-ns"] + metric "metric3"] + (testing "increases count on the meter" + (let [mk-meter-args (atom nil) + meter (Meter.) + expected-topic-entity-name "expected-topic-entity-name"] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric topic-entity-name] + (is (= topic-entity-name expected-topic-entity-name)) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) + meter)] + (metrics/increment-count metric-ns metric expected-topic-entity-name) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "increases count on the meter when topic-entity-name is nil" + (let [mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) + meter)] + (metrics/increment-count metric-ns metric expected-additional-tags) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." 
metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))))) (deftest decrement-count-test - (testing "decreases count on the meter" - (let [metric-ns "metric-ns" - metric "metric3" - mk-meter-args (atom nil) - meter (Meter.)] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric] - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) - meter)] - (metrics/increment-count metric-ns metric) - (is (= 1 (.getCount meter))) - (metrics/decrement-count metric-ns metric) - (is (= 0 (.getCount meter))) - (is (= metric-ns (:metric-namespace @mk-meter-args))) - (is (= metric (:metric @mk-meter-args))))))) + (let [metric-ns ["metric-ns"] + metric "metric3" + mk-meter-args (atom nil) + meter (Meter.)] + (testing "decreases count on the meter" + (let [expected-additional-tags {:topic-name "expected-topic-name"}] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) + meter)] + (metrics/increment-count metric-ns metric expected-additional-tags) + (is (= 1 (.getCount meter))) + (metrics/decrement-count metric-ns metric expected-additional-tags) + (is (zero? (.getCount meter))) + (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "decreases count on the meter when additional-tags is nil" + (let [expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespace metric-namespace + :metric metric}) + meter)] + (metrics/increment-count metric-ns metric expected-additional-tags) + (is (= 1 (.getCount meter))) + (metrics/decrement-count metric-ns metric expected-additional-tags) + (is (zero? (.getCount meter))) + (is (= (apply str (interpose "." 
metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))))) (deftest report-time-test (testing "updates time-val" - (let [metric-ns "metric-ns" - time-val 10 - mk-histogram-args (atom nil) - reservoir (UniformReservoir.) - histogram (Histogram. reservoir)] - (with-redefs [metrics/mk-histogram (fn [metric-ns metric] + (let [metric-ns ["message-received-delay-histogram"] + time-val 10 + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-topic-entity-name "expected-topic-entity-name"] + (with-redefs [metrics/mk-histogram (fn [metric-ns metric topic-entity-name] + (is (= topic-entity-name expected-topic-entity-name)) (reset! mk-histogram-args {:metric-namespace metric-ns :metric metric}) histogram)] - (metrics/report-time metric-ns time-val) + (metrics/report-time metric-ns time-val expected-topic-entity-name) (is (= 1 (.getCount histogram))) - (is (= metric-ns (:metric-namespace @mk-histogram-args))) + (is (= (apply str (interpose "." 
metric-ns)) (:metric-namespace @mk-histogram-args))) (is (= "all" (:metric @mk-histogram-args))))))) diff --git a/test/ziggurat/timestamp_transformer_test.clj b/test/ziggurat/timestamp_transformer_test.clj index da1d4074..a8a6903a 100644 --- a/test/ziggurat/timestamp_transformer_test.clj +++ b/test/ziggurat/timestamp_transformer_test.clj @@ -1,43 +1,60 @@ (ns ziggurat.timestamp-transformer-test (:require [clojure.test :refer :all] - [ziggurat.timestamp-transformer :refer :all] [ziggurat.metrics :as metrics] + [ziggurat.timestamp-transformer :refer :all] [ziggurat.util.time :refer :all]) - (:import [org.apache.kafka.streams.processor ProcessorContext] - [org.apache.kafka.clients.consumer ConsumerRecord] + (:import [org.apache.kafka.clients.consumer ConsumerRecord] + [org.apache.kafka.streams.processor ProcessorContext] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (deftest ingestion-time-extractor-test (let [ingestion-time-extractor (IngestionTimeExtractor.) - topic "some-topic" - partition (int 1) - offset 1 - previous-timestamp 1528720768771 - key "some-key" - value "some-value" - record (ConsumerRecord. topic partition offset key value)] + topic "some-topic" + partition (int 1) + offset 1 + previous-timestamp 1528720768771 + key "some-key" + value "some-value" + record (ConsumerRecord. 
topic partition offset key value)] (testing "extract timestamp of topic when it has valid timestamp" (with-redefs [get-timestamp-from-record (constantly 1528720768777)] (is (= (.extract ingestion-time-extractor record previous-timestamp) 1528720768777)))) (testing "extract timestamp of topic when it has invalid timestamp" - (with-redefs [get-timestamp-from-record (constantly -1) + (with-redefs [get-timestamp-from-record (constantly -1) get-current-time-in-millis (constantly 1528720768777)] (is (= (.extract ingestion-time-extractor record previous-timestamp) (get-current-time-in-millis))))))) (deftest timestamp-transformer-test - (testing "creates a timestamp-transformer object that calculates and reports timestamp delay" - (let [metric-namespace "test.message-received-delay-histogram" - record-timestamp 1528720767777 - context (reify ProcessorContext - (timestamp [_] record-timestamp)) - current-time 1528720768777 - timestamp-transformer (create metric-namespace current-time) - expected-delay 1000] - (.init timestamp-transformer context) - (with-redefs [get-current-time-in-millis (constantly current-time) - metrics/report-time (fn [namespace delay] - (is (= delay expected-delay)) - (is (= metric-namespace namespace)))] - (.transform timestamp-transformer nil nil))))) + (let [default-namespace "message-received-delay-histogram" + expected-metric-namespaces ["test" default-namespace] + record-timestamp 1528720767777 + current-time 1528720768777 + expected-delay 1000] + (testing "creates a timestamp-transformer object that calculates and reports timestamp delay" + (let [context (reify ProcessorContext + (timestamp [_] record-timestamp)) + expected-topic-entity-name "expected-topic-entity-name" + timestamp-transformer (create expected-metric-namespaces current-time expected-topic-entity-name)] + (.init timestamp-transformer context) + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay topic-entity-name] 
+ (is (= delay expected-delay)) + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace]))) + (is (= expected-topic-entity-name topic-entity-name)))] + (.transform timestamp-transformer nil nil)))) + (testing "creates a timestamp-transformer object that calculates and reports timestamp delay when topic-entity-name is nil" + (let [context (reify ProcessorContext + (timestamp [_] record-timestamp)) + expected-topic-entity-name nil + timestamp-transformer (create expected-metric-namespaces current-time)] + (.init timestamp-transformer context) + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay topic-entity-name] + (is (= delay expected-delay)) + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace]))) + (is (= topic-entity-name expected-topic-entity-name)))] + (.transform timestamp-transformer nil nil)))))) From 3890aa3e15d0ff9b8c9b2d3cc236f50e95531a2f Mon Sep 17 00:00:00 2001 From: Michael Angelo Calimlim Date: Wed, 12 Jun 2019 21:15:30 +0800 Subject: [PATCH 09/12] remove the topic-name tag in the old namespace, but include it on the new namespace and remove the service-name in the new namespace --- src/ziggurat/mapper.clj | 21 ++-- src/ziggurat/metrics.clj | 36 ++++--- src/ziggurat/streams.clj | 21 ++-- test/ziggurat/kafka_delay_test.clj | 2 +- test/ziggurat/mapper_test.clj | 50 ++++----- test/ziggurat/metrics_test.clj | 157 +++++++++++++++++++---------- 6 files changed, 175 insertions(+), 112 deletions(-) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 5de1703f..44290e28 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -1,5 +1,6 @@ (ns ziggurat.mapper (:require [sentry-clj.async :as sentry] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.messaging.producer :as producer] [ziggurat.metrics :as metrics] [ziggurat.new-relic :as nr] @@ -13,11 +14,12 @@ (defn 
mapper-func [mapper-fn topic-entity channels] (fn [message] - (let [topic-entity-name (name topic-entity) + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) new-relic-transaction-name (str topic-entity-name ".handler-fn") default-namespace "message-processing" - metric-namespaces [topic-entity-name default-namespace] - additional-tags {:topic-name topic-entity-name} + metric-namespaces [service-name topic-entity-name default-namespace] + additional-tags {:topic_name topic-entity-name} default-namespaces [default-namespace] success-metric "success" retry-metric "retry" @@ -31,9 +33,9 @@ end-time (.toEpochMilli (Instant/now)) time-val (- end-time start-time) execution-time-namespace "handler-fn-execution-time" - multi-execution-time-namespaces [[topic-entity-name execution-time-namespace] + multi-execution-time-namespaces [[service-name topic-entity-name execution-time-namespace] [execution-time-namespace]]] - (metrics/multi-ns-report-time multi-execution-time-namespaces time-val) + (metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags) (case return-code :success (do (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)) :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) @@ -50,12 +52,13 @@ (defn channel-mapper-func [mapper-fn topic-entity channel] (fn [message] - (let [topic-entity-name (name topic-entity) + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) channel-name (name channel) default-namespace "message-processing" - base-namespaces [topic-entity-name channel-name] + base-namespaces [service-name topic-entity-name channel-name] metric-namespaces (conj base-namespaces default-namespace) - additional-tags {:topic-name topic-entity-name} + additional-tags {:topic_name topic-entity-name} default-namespaces [default-namespace] metric-namespace (apply str (interpose "." 
metric-namespaces)) success-metric "success" @@ -72,7 +75,7 @@ execution-time-namespace "execution-time" multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace) [execution-time-namespace]]] - (metrics/multi-ns-report-time multi-execution-time-namespaces time-val) + (metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags) (case return-code :success (do (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)) :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) diff --git a/src/ziggurat/metrics.clj b/src/ziggurat/metrics.clj index df69bfc7..004b989e 100644 --- a/src/ziggurat/metrics.clj +++ b/src/ziggurat/metrics.clj @@ -1,10 +1,10 @@ (ns ziggurat.metrics (:require [clojure.tools.logging :as log] [clojure.walk :refer [stringify-keys]]) - (:import (com.gojek.metrics.datadog DatadogReporter) - (com.gojek.metrics.datadog.transport UdpTransport$Builder UdpTransport) - (io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram) - (java.util.concurrent TimeUnit))) + (:import com.gojek.metrics.datadog.DatadogReporter + [com.gojek.metrics.datadog.transport UdpTransport UdpTransport$Builder] + [io.dropwizard.metrics5 Histogram Meter MetricName MetricRegistry] + java.util.concurrent.TimeUnit)) (defonce ^:private group (atom nil)) @@ -21,7 +21,8 @@ ([category metric] (mk-meter category metric nil)) ([category metric additional-tags] - (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) + (let [namespace (str category "." 
metric) + metric-name (MetricRegistry/name ^String namespace nil) tags (merge-tags additional-tags) tagged-metric (.tagged ^MetricName metric-name tags)] (.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric)))) @@ -30,19 +31,25 @@ ([category metric] (mk-histogram category metric nil)) ([category metric additional-tags] - (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) + (let [namespace (str category "." metric) + metric-name (MetricRegistry/name ^String namespace nil) tags (merge-tags additional-tags) tagged-metric (.tagged ^MetricName metric-name tags)] - (.histogram ^MetricRegistry metrics-registry tagged-metric)))) + (.histogram ^MetricRegistry metrics-registry ^MetricName tagged-metric)))) -(defn- intercalate-dot +(defn intercalate-dot [names] (apply str (interpose "." names))) +(defn remove-topic-tag-for-old-namespace + [additional-tags ns] + (let [topic-name (:topic_name additional-tags)] + (dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name)))) + (defn- inc-or-dec-count [sign metric-namespaces metric additional-tags] (let [metric-namespace (intercalate-dot metric-namespaces) - meter ^Meter (mk-meter metric-namespace metric additional-tags)] + meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] (.mark meter (sign 1)))) (def increment-count (partial inc-or-dec-count +)) @@ -56,15 +63,12 @@ (defn report-time [metric-namespaces time-val additional-tags] (let [metric-namespace (intercalate-dot metric-namespaces) - histogram ^Histogram (mk-histogram metric-namespace "all" additional-tags)] + histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] (.update histogram (int time-val)))) -(defn multi-ns-report-time - ([nss time-val] - (multi-ns-report-time nss time-val nil)) - ([nss time-val additional-tags] - (doseq [ns nss] - 
(report-time ns time-val additional-tags)))) +(defn multi-ns-report-time [nss time-val additional-tags] + (doseq [ns nss] + (report-time ns time-val additional-tags))) (defn start-statsd-reporter [statsd-config env app-name] (let [{:keys [enabled host port]} statsd-config] diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 76df02e0..16f90446 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -43,10 +43,11 @@ (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config))) (defn- log-and-report-metrics [topic-entity message] - (let [topic-entity-name (name topic-entity) - additional-tags {:topic-name topic-entity-name} + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) + additional-tags {:topic_name topic-entity-name} default-namespace "message" - metric-namespaces [topic-entity-name default-namespace] + metric-namespaces [service-name topic-entity-name default-namespace] default-namespaces [default-namespace] metric "read" multi-namespaces [metric-namespaces default-namespaces]] @@ -72,8 +73,9 @@ (get [_] (transformer/create metric-namespaces oldest-processed-message-in-s additional-tags)))) (defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] - (let [metric-namespaces [topic-entity-name "message-received-delay-histogram"] - additional-tags {:topic-name topic-entity-name}] + (let [service-name (:app-name (ziggurat-config)) + metric-namespaces [service-name topic-entity-name "message-received-delay-histogram"] + additional-tags {:topic_name topic-entity-name}] (.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) (defn- protobuf->hash [message proto-class topic-entity-name] @@ -88,10 +90,13 @@ keys)] (select-keys loaded-proto proto-keys)) (catch Throwable e - (let [additional-tags {:topic-name topic-entity-name} - metric-namespaces 
["message-parsing"]] + (let [service-name (:app-name (ziggurat-config)) + additional-tags {:topic_name topic-entity-name} + default-namespace "message-parsing" + metric-namespaces [service-name "message-parsing"] + multi-namespaces [metric-namespaces [default-namespace]]] (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) - (metrics/increment-count metric-namespaces "failed" additional-tags) + (metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags) nil)))) (defn- topology [handler-fn {:keys [origin-topic proto-class oldest-processed-message-in-s]} topic-entity channels] diff --git a/test/ziggurat/kafka_delay_test.clj b/test/ziggurat/kafka_delay_test.clj index 3f519225..18d65659 100644 --- a/test/ziggurat/kafka_delay_test.clj +++ b/test/ziggurat/kafka_delay_test.clj @@ -10,7 +10,7 @@ expected-delay 1000 expected-namespaces ["test"]] (testing "calculates and reports the timestamp delay" - (let [expected-additional-tags {:topic-name "expected-topic-entity-name"}] + (let [expected-additional-tags {:topic_name "expected-topic-entity-name"}] (with-redefs [get-current-time-in-millis (constantly current-time) metrics/report-time (fn [metric-namespaces delay additional-tags] (is (= delay expected-delay)) diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index 49e57118..f5347dee 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -13,14 +13,15 @@ fix/silence-logging])) (deftest mapper-func-test - (let [message {:foo "bar"} + (let [service-name (:app-name (ziggurat-config)) + message {:foo "bar"} stream-routes {:default {:handler-fn #(constantly nil)}} expected-topic-entity-name (name (first (keys stream-routes))) - expected-additional-tags {:topic-name expected-topic-entity-name} + expected-additional-tags {:topic_name expected-topic-entity-name} default-namespace "message-processing" report-time-namespace "handler-fn-execution-time" - 
expected-metric-namespaces ["default" default-namespace] - expected-report-time-namespaces ["default" report-time-namespace]] + expected-metric-namespaces [expected-topic-entity-name default-namespace] + expected-report-time-namespaces [expected-topic-entity-name report-time-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) successfully-reported-time? (atom false) @@ -44,7 +45,8 @@ (let [successfully-processed? (atom false) expected-metric "success"] (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (= metric-namespaces expected-metric-namespaces) + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name default-namespace]) + (= metric-namespaces [default-namespace])) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] @@ -71,7 +73,8 @@ expected-metric "retry"] (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] - (when (and (= metric-namespaces expected-metric-namespaces) + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name default-namespace]) + (= metric-namespaces [default-namespace])) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] @@ -88,10 +91,7 @@ expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? 
true)) metrics/increment-count (fn [metric-namespaces metric additional-tags] - (is (or (= metric-namespaces expected-metric-namespaces) - (= metric-namespaces [default-namespace]))) - (is (= additional-tags expected-additional-tags)) - (when (and (or (= metric-namespaces expected-metric-namespaces) + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name default-namespace]) (= metric-namespaces [default-namespace])) (= metric expected-metric) (= additional-tags expected-additional-tags)) @@ -105,7 +105,7 @@ (testing "reports execution time with topic prefix" (let [reported-execution-time? (atom false) execution-time-namesapce "handler-fn-execution-time" - expected-metric-namespaces ["default" execution-time-namesapce]] + expected-metric-namespaces [service-name "default" execution-time-namesapce]] (with-redefs [metrics/report-time (fn [metric-namespaces _ _] (is (or (= metric-namespaces expected-metric-namespaces) (= metric-namespaces [execution-time-namesapce]))) @@ -117,20 +117,23 @@ (is @reported-execution-time?)))))) (deftest channel-mapper-func-test - (let [message {:foo "bar"} + (let [service-name (:app-name (ziggurat-config)) + message {:foo "bar"} stream-routes {:default {:handler-fn #(constantly nil) :channel-1 #(constantly nil)}} topic (first (keys stream-routes)) expected-topic-entity-name (name topic) - expected-additional-tags {:topic-name expected-topic-entity-name} + expected-additional-tags {:topic_name expected-topic-entity-name} channel :channel-1 + channel-name (name channel) default-namespace "message-processing" - expected-metric-namespaces ["default" "channel-1" default-namespace]] + expected-metric-namespaces [expected-topic-entity-name channel default-namespace]] (testing "message process should be successful" (let [successfully-processed? 
(atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] - (when (and (= metric-namespace expected-metric-namespaces) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name channel-name default-namespace]) + (= metric-namespaces [default-namespace])) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] @@ -143,8 +146,9 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric additional-tags] - (when (and (= metric-namespace expected-metric-namespaces) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name channel-name default-namespace]) + (= metric-namespaces [default-namespace])) (= metric expected-metric) (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] @@ -173,15 +177,11 @@ (is @sentry-report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - execution-time-namesapce "execution-time" - expected-metric-namespaces ["default" "channel-1" execution-time-namesapce]] + (let [reported-execution-time? (atom false) + execution-time-namesapce "execution-time"] (with-redefs [metrics/report-time (fn [metric-namespaces _ _] - (is (or (= metric-namespaces expected-metric-namespaces) - (= metric-namespaces [execution-time-namesapce]))) - (when (or (= metric-namespaces expected-metric-namespaces) + (when (or (= metric-namespaces [service-name expected-topic-entity-name channel-name execution-time-namesapce]) (= metric-namespaces [execution-time-namesapce])) (reset! reported-execution-time? 
true)))] - ((channel-mapper-func (constantly :success) topic channel) message) (is @reported-execution-time?)))))) diff --git a/test/ziggurat/metrics_test.clj b/test/ziggurat/metrics_test.clj index 00d18ea6..45f69f79 100644 --- a/test/ziggurat/metrics_test.clj +++ b/test/ziggurat/metrics_test.clj @@ -18,81 +18,132 @@ (is (instance? Histogram meter))))) (deftest increment-count-test - (let [metric-ns ["metric-ns"] - metric "metric3"] + (let [metric "metric3" + expected-topic-entity-name "expected-topic-entity-name" + input-additional-tags {:topic_name expected-topic-entity-name}] (testing "increases count on the meter" - (let [mk-meter-args (atom nil) + (let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"] + mk-meter-args (atom nil) meter (Meter.) - expected-topic-entity-name "expected-topic-entity-name"] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric topic-entity-name] - (is (= topic-entity-name expected-topic-entity-name)) - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) + expected-additional-tags {}] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric input-additional-tags) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "increases count on the meter - without topic name on the namespace" + (let [expected-metric-namespaces ["metric-ns"] + mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags input-additional-tags] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! 
mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) meter)] - (metrics/increment-count metric-ns metric expected-topic-entity-name) + (metrics/increment-count expected-metric-namespaces metric input-additional-tags) (is (= 1 (.getCount meter))) - (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))) - (testing "increases count on the meter when topic-entity-name is nil" - (let [mk-meter-args (atom nil) - meter (Meter.) - expected-additional-tags nil] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (testing "increases count on the meter when additional-tags is nil" + (let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"] + mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) meter)] - (metrics/increment-count metric-ns metric expected-additional-tags) + (metrics/increment-count expected-metric-namespaces metric expected-additional-tags) (is (= 1 (.getCount meter))) - (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))))) (deftest decrement-count-test - (let [metric-ns ["metric-ns"] - metric "metric3" - mk-meter-args (atom nil) - meter (Meter.)] + (let [expected-topic-name "expected-topic-name" + metric "metric3" + mk-meter-args (atom nil) + meter (Meter.) 
+ input-additional-tags {:topic_name expected-topic-name}] (testing "decreases count on the meter" - (let [expected-additional-tags {:topic-name "expected-topic-name"}] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (let [expected-additional-tags {} + expected-metric-namespaces [expected-topic-name "metric-ns"]] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric input-additional-tags) + (is (= 1 (.getCount meter))) + (metrics/decrement-count expected-metric-namespaces metric input-additional-tags) + (is (zero? (.getCount meter))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "decreases count on the meter - without topic name on the namespace" + (let [expected-additional-tags input-additional-tags + expected-metric-namespaces ["metric-ns"]] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) meter)] - (metrics/increment-count metric-ns metric expected-additional-tags) + (metrics/increment-count expected-metric-namespaces metric input-additional-tags) (is (= 1 (.getCount meter))) - (metrics/decrement-count metric-ns metric expected-additional-tags) + (metrics/decrement-count expected-metric-namespaces metric input-additional-tags) (is (zero? (.getCount meter))) - (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= (apply str (interpose "." 
expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))) (testing "decreases count on the meter when additional-tags is nil" - (let [expected-additional-tags nil] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric additional-tags] + (let [expected-additional-tags nil + expected-metric-namespaces [expected-topic-name "metric-ns"]] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] (is (= additional-tags expected-additional-tags)) - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) meter)] - (metrics/increment-count metric-ns metric expected-additional-tags) + (metrics/increment-count expected-metric-namespaces metric expected-additional-tags) (is (= 1 (.getCount meter))) - (metrics/decrement-count metric-ns metric expected-additional-tags) + (metrics/decrement-count expected-metric-namespaces metric expected-additional-tags) (is (zero? (.getCount meter))) - (is (= (apply str (interpose "." metric-ns)) (:metric-namespace @mk-meter-args))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) (is (= metric (:metric @mk-meter-args)))))))) (deftest report-time-test - (testing "updates time-val" - (let [metric-ns ["message-received-delay-histogram"] - time-val 10 - mk-histogram-args (atom nil) - reservoir (UniformReservoir.) - histogram (Histogram. reservoir) - expected-topic-entity-name "expected-topic-entity-name"] - (with-redefs [metrics/mk-histogram (fn [metric-ns metric topic-entity-name] - (is (= topic-entity-name expected-topic-entity-name)) - (reset! mk-histogram-args {:metric-namespace metric-ns - :metric metric}) - histogram)] - (metrics/report-time metric-ns time-val expected-topic-entity-name) - (is (= 1 (.getCount histogram))) - (is (= (apply str (interpose "." 
metric-ns)) (:metric-namespace @mk-histogram-args))) - (is (= "all" (:metric @mk-histogram-args))))))) + (let [expected-topic-entity-name "expected-topic-entity-name" + input-additional-tags {:topic_name expected-topic-entity-name} + time-val 10] + (testing "updates time-val" + (let [expected-metric-namespaces [expected-topic-entity-name "message-received-delay-histogram"] + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-additional-tags {}] + (with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-histogram-args {:metric-namespaces metric-namespaces + :metric metric}) + histogram)] + (metrics/report-time expected-metric-namespaces time-val input-additional-tags) + (is (= 1 (.getCount histogram))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args))) + (is (= "all" (:metric @mk-histogram-args)))))) + (testing "updates time-val - without topic name on the namespace" + (let [expected-metric-namespaces ["message-received-delay-histogram"] + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-additional-tags input-additional-tags] + (with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-histogram-args {:metric-namespaces metric-namespaces + :metric metric}) + histogram)] + (metrics/report-time expected-metric-namespaces time-val input-additional-tags) + (is (= 1 (.getCount histogram))) + (is (= (apply str (interpose "." 
expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args))) + (is (= "all" (:metric @mk-histogram-args)))))))) From 5059a238df182471662be4709917597ebf413075 Mon Sep 17 00:00:00 2001 From: mjayprateek Date: Mon, 17 Jun 2019 11:57:45 +0530 Subject: [PATCH 10/12] Multiple Kafka producers support in ziggurat (#55) * Enabling producing support in Ziggurat * Extended producing support to multiple producers * User can define producer config inside a stream-router config * User can publish data to any producer by calling send method with the stream-router config key against which producer config is defined. * Few unit tests --- README.md | 38 +++++++++- project.clj | 3 +- resources/config.test.ci.edn | 31 +++++--- resources/config.test.edn | 31 +++++--- src/ziggurat/init.clj | 27 +++++-- src/ziggurat/producer.clj | 124 ++++++++++++++++++++++++++++++++ test/ziggurat/fixtures.clj | 41 ++++++++++- test/ziggurat/init_test.clj | 37 +++++++++- test/ziggurat/producer_test.clj | 68 ++++++++++++++++++ 9 files changed, 373 insertions(+), 27 deletions(-) create mode 100644 src/ziggurat/producer.clj create mode 100644 test/ziggurat/producer_test.clj diff --git a/README.md b/README.md index 35249902..69dc95e7 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,24 @@ There are four modes supported by ziggurat You can pass in multiple modes and it will start accordingly If nothing passed to modes then it will start all the modes. +## Publishing data to Kafka Topics in Ziggurat +To enable publishing data to Kafka, Ziggurat provides producing support through the `ziggurat.producer` namespace. This namespace defines methods for publishing data to Kafka topics. The methods defined here are essentially wrappers around variants of the `send` method defined in `org.apache.kafka.clients.producer.KafkaProducer`. + +At the time of initialization, an instance of `org.apache.kafka.clients.producer.KafkaProducer` is constructed using config values provided in `resources/config.edn`. 
A producer can be configured for each of the stream-routes in config.edn. Please see the example below. + +At present, only a few configurations are supported for constructing KafkaProducer. These have been explained [here](#configuration). Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs) +for a complete list of all producer configs available in Kafka. + +Ziggurat.producer namespace defines a multi-arity `send` function which is a thin wrapper around [KafkaProducer#send](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-). This method publishes data to a Kafka topic through a Kafka producer +defined in the stream router configuration. See configuration section below. + +E.g. +For publishing data using a producer which is defined for the stream router config with key `:default`, use send like this: + +`(send :default "test-topic" "key" "value")` + +`(send :default "test-topic" 1 "key" "value")` + ## Configuration All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key. 
@@ -151,7 +169,14 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur :origin-topic "kafka-topic-*" :oldest-processed-message-in-s [604800 :int] :proto-class "proto-class" - :changelog-topic-replication-factor [3 :int]}} + :changelog-topic-replication-factor [3 :int] + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}} :datadog {:host "localhost" :port [8125 :int] :enabled [false :bool]} @@ -179,7 +204,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} :http-server {:port [8010 :int] - :thread-count [100 :int]}}} + :thread-count [100 :int]}}} ``` * app-name - Refers to the name of the application. Used to namespace queues and metrics. * nrepl-server - Port on which the repl server will be hosted @@ -192,6 +217,15 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur * oldest-processed-messages-in-s - The oldest message which will be processed by stream in second. By default the value is 604800 (1 week) * proto-class - The proto-class of the message so that it can be decompiled before being passed to the mapper function * changelog-topic-replication-factor - the internal changelog topic replication factor. By default the value is 3 + * producer - Configuration for KafkaProducer. Currently, only following options are supported. Please see [Producer Configs](https://kafka.apache.org/documentation/#producerconfigs) for detailed explanation for each of the configuration parameters. + * bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. 
+ * acks - The number of acknowledgments the producer requires the leader to have received before considering a request complete. Valid values are [all, -1, 0, 1]. + * retries - Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. + * key.serializer - Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface. + * value.serializer - Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. + * max.in.flight.requests.per.connection - The maximum number of unacknowledged requests the client will send on a single connection before blocking. + * enable.idempotence - When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. + * datadog - The statsd host and port that metrics should be sent to, although the key name is datadog, it supports statsd as well to send metrics. * sentry - Whenever a :failure keyword is returned from the mapper-function or an exception is raised while executing the mapper-function, an event is sent to sentry. You can skip this flow by disabling it. * rabbit-mq-connection - The details required to make a connection to rabbitmq. We use rabbitmq for the retry mechanism. 
diff --git a/project.clj b/project.clj index 4a685bf7..90c678f5 100644 --- a/project.clj +++ b/project.clj @@ -49,7 +49,8 @@ [junit/junit "4.12"] [org.apache.kafka/kafka-streams "1.1.1" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] [org.apache.kafka/kafka-clients "1.1.1" :classifier "test"] - [org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"]] + [org.apache.kafka/kafka_2.11 "1.1.1" :classifier "test"] + [org.clojure/test.check "0.9.0"]] :plugins [[lein-cloverage "1.0.13"]] :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]} :dev {:plugins [[jonase/eastwood "0.2.6"] diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index c56c543c..39962a8c 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -28,11 +28,26 @@ :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} - :stream-router {:default {:application-id "application-name-test-02-multiple" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "kafka-topic-*" - :proto-class "com.company.LogMessage" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}}}} \ No newline at end of file + :stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}} + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}} + :without-producer {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic 
"topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}}} diff --git a/resources/config.test.edn b/resources/config.test.edn index 5261129a..9609011b 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -28,11 +28,26 @@ :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} - :stream-router {:default {:application-id "application-name-test-02-multiple" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "kafka-topic-*" - :proto-class "com.company.LogMessage" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}}}} \ No newline at end of file + :stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}} + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}} + :without-producer {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}}} diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 1fb5e741..023c84f3 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -11,7 +11,8 @@ [ziggurat.nrepl-server :as nrepl-server] [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.server :as server] - [ziggurat.streams :as 
streams])) + [ziggurat.streams :as streams] + [ziggurat.producer :as producer :refer [kafka-producers]])) (defstate statsd-reporter :start (metrics/start-statsd-reporter (:datadog (ziggurat-config)) @@ -38,9 +39,16 @@ (start-rabbitmq-connection args) (messaging-producer/make-queues (get args :stream-routes))) +(defn start-kafka-producers [] + (start* #{#'kafka-producers})) + +(defn start-kafka-streams [args] + (start* #{#'streams/stream} args)) + (defn start-stream [args] + (start-kafka-producers) (start-rabbitmq-producers args) - (start* #{#'streams/stream} args)) + (start-kafka-streams args)) (defn start-management-apis [args] (start-rabbitmq-connection args) @@ -51,22 +59,31 @@ (start* #{#'server/server} args)) (defn start-workers [args] + (start-kafka-producers) (start-rabbitmq-producers args) (start-rabbitmq-consumers args)) (defn- stop-rabbitmq-connection [] (mount/stop #'messaging-connection/connection)) +(defn stop-kafka-producers [] + (mount/stop #'kafka-producers)) + +(defn stop-kafka-streams [] + (mount/stop #'streams/stream)) + (defn stop-workers [] - (stop-rabbitmq-connection)) + (stop-rabbitmq-connection) + (stop-kafka-producers)) (defn stop-server [] (mount/stop #'server/server) (stop-rabbitmq-connection)) (defn stop-stream [] - (mount/stop #'streams/stream) - (stop-rabbitmq-connection)) + (stop-kafka-streams) + (stop-rabbitmq-connection) + (stop-kafka-producers)) (defn stop-management-apis [] (mount/stop #'server/server) diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj new file mode 100644 index 00000000..b4ee344b --- /dev/null +++ b/src/ziggurat/producer.clj @@ -0,0 +1,124 @@ +(ns ziggurat.producer + "This namespace defines methods for publishing data to + Kafka topics. The methods defined here are essentially wrapper + around variants of `send` methods defined in + `org.apache.kafka.clients.producer.KafkaProducer`. 
+ + At the time of initialization, an instance of + `org.apache.kafka.clients.producer.KafkaProducer` + is constructed using config values provided in `resources/config.edn`. + + A producer can be configured for each of the stream-routes + in config.edn. Please see the example below + + ` + :stream-router {:default {:application-id \"test\"\n + :bootstrap-servers \"localhost:9092\"\n + :stream-threads-count [1 :int]\n + :origin-topic \"topic\"\n + :proto-class \"flatland.protobuf.test.Example$Photo\"\n + :channels {:channel-1 {:worker-count [10 :int]\n :retry {:count [5 :int]\n :enabled [true :bool]}}}\n + :producer {:bootstrap-servers \"localhost:9092\"\n + :acks \"all\"\n + :retries-config 5\n + :max-in-flight-requests-per-connection 5\n + :enable-idempotence false\n + :value-serializer \"org.apache.kafka.common.serialization.StringSerializer\"\n + :key-serializer \"org.apache.kafka.common.serialization.StringSerializer\"}} + ` + + Usage: + + ` + Please see `send` for publishing data via Kafka producers + ` + + These are the KafkaProducer configs currently supported in Ziggurat. + - bootstrap.servers + - acks + - retries + - key.serializer + - value.serializer + - max.in.flight.requests.per.connection + - enable.idempotence + + Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs) + for a complete list of all producer configs available in Kafka." + + (:require [ziggurat.config :refer [ziggurat-config]] + [clojure.tools.logging :as log] + [mount.core :refer [defstate]]) + (:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig) + (java.util Properties))) + +(defn- producer-properties-from-config [{:keys [bootstrap-servers + acks + key-serializer + value-serializer + enable-idempotence + retries-config + max-in-flight-requests-per-connection]}] + (doto (Properties.) 
+ (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers) + (.put ProducerConfig/ACKS_CONFIG acks) + (.put ProducerConfig/RETRIES_CONFIG (int retries-config)) + (.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence) + (.put ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION (int max-in-flight-requests-per-connection)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer) + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer))) + +(defn producer-properties-map [] + (reduce (fn [producer-map [stream-config-key stream-config]] + (let [producer-config (:producer stream-config)] + (if (some? producer-config) + (assoc producer-map stream-config-key (producer-properties-from-config producer-config)) + producer-map))) + {} + (seq (:stream-router (ziggurat-config))))) + +(defstate kafka-producers + :start (if (not-empty (producer-properties-map)) + (do (log/info "Starting Kafka producers ...") + (reduce (fn [producers [stream-config-key properties]] + (do (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ") + (assoc producers stream-config-key (KafkaProducer. properties)))) + {} + (seq (producer-properties-map)))) + (log/info "No producers found. Can not initiate start.")) + + :stop (if (not-empty kafka-producers) + (do (log/info "Stopping Kafka producers ...") + (doall (map (fn [[stream-config-key producer]] + (log/debug "Stopping Kafka producer associated with [" stream-config-key "]") + (doto producer + (.flush) + (.close))) + (seq kafka-producers)))) + (log/info "No producers found. Can not initiate stop."))) + +(defn send + "A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables + the users of Ziggurat to produce data to a Kafka topic using a Kafka producer + associated with a Kafka stream config key. + + E.g. + For publishing data to producer defined for the + stream router config defined against
+ + `(send :default \"test-topic\" \"key\" \"value\")` + `(send :default \"test-topic\" 1 \"key\" \"value\")` + + " + + ([stream-config-key topic key value] + (send stream-config-key topic nil key value)) + + ([stream-config-key topic partition key value] + (if (some? (get kafka-producers stream-config-key)) + (let [producer-record (ProducerRecord. topic partition key value)] + (.send (stream-config-key kafka-producers) producer-record)) + + (let [error-msg (str "Can't publish data. No producers defined for stream config [" stream-config-key "]")] + (do (log/error error-msg) + (throw (ex-info error-msg {:stream-config-key stream-config-key}))))))) diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index 72c1c05f..cba85fab 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -8,9 +8,14 @@ [ziggurat.messaging.connection :refer [connection]] [ziggurat.server :refer [server]] [ziggurat.messaging.producer :as pr] + [ziggurat.producer :as producer] [langohr.channel :as lch] [langohr.exchange :as le] - [langohr.queue :as lq])) + [langohr.queue :as lq]) + (:import (org.apache.kafka.streams.integration.utils EmbeddedKafkaCluster) + (java.util Properties) + (org.apache.kafka.clients.producer ProducerConfig) + (org.apache.kafka.clients.consumer ConsumerConfig))) (defn mount-config [] (-> (mount/only [#'config/config]) @@ -91,3 +96,37 @@ (finally (delete-queues ~stream-routes) (delete-exchanges ~stream-routes)))) + +(defn mount-producer [] + (-> (mount/only [#'producer/kafka-producers]) + (mount/start))) + +(defn construct-embedded-kafka-cluster [] + (doto (EmbeddedKafkaCluster. 
1) + (.start))) + +(def ^:dynamic *embedded-kafka-cluster* nil) +(def ^:dynamic *bootstrap-servers* nil) +(def ^:dynamic *consumer-properties* nil) +(def ^:dynamic *producer-properties* nil) + +(defn mount-only-config-and-producer [f] + (do + (mount-config) + (mount-producer) + (binding [*embedded-kafka-cluster* (construct-embedded-kafka-cluster)] + (binding [*bootstrap-servers* (.bootstrapServers *embedded-kafka-cluster*)] + (binding [*consumer-properties* (doto (Properties.) + (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG, *bootstrap-servers*) + (.put ConsumerConfig/GROUP_ID_CONFIG, "ziggurat-consumer") + (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG, "earliest") + (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")) + *producer-properties* (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG *bootstrap-servers*) + (.put ProducerConfig/ACKS_CONFIG "all") + (.put ProducerConfig/RETRIES_CONFIG (int 0)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer") + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))] + (f))))) + (mount/stop)) \ No newline at end of file diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index 497d30c5..41321e64 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -5,8 +5,10 @@ [ziggurat.messaging.connection :as rmqc] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] - [ziggurat.streams :as streams] - [ziggurat.server.test-utils :as tu])) + [ziggurat.streams :as streams :refer [stream]] + [mount.core :refer [defstate]] + [ziggurat.server.test-utils :as tu] + [mount.core :as mount])) (deftest start-calls-actor-start-fn-test (testing "The actor 
start fn starts before the ziggurat state and can read config" @@ -155,3 +157,34 @@ (let [modes [:invalid-modes :api-server :second-invalid]] (is (thrown? clojure.lang.ExceptionInfo (init/validate-modes modes)))))) +(deftest kafka-producers-should-start + (let [args {:actor-routes [] + :stream-routes []} + producer-has-started (atom false)] + (with-redefs [init/start-kafka-producers (fn [] (reset! producer-has-started true)) + init/start-kafka-streams (constantly nil)] + (testing "Starting the streams should start kafka-producers as well" + (init/start-stream args) + (is (= true @producer-has-started))) + (testing "Starting the workers should start kafka-producers as well" + (reset! producer-has-started false) + (init/start-workers args) + (is (= true @producer-has-started)))))) + +(deftest kafka-producers-should-stop + (let [producer-has-stopped (atom false)] + (with-redefs [init/stop-kafka-producers (fn [] (reset! producer-has-stopped true)) + init/stop-kafka-streams (constantly nil)] + (testing "Stopping the streams should stop kafka-producers as well" + (init/stop-stream) + (is (= true @producer-has-stopped))) + (testing "Stopping the workers should stop kafka-producers as well" + (reset! 
producer-has-stopped false) + (init/stop-workers) + (is (= true @producer-has-stopped))) + (mount/stop)))) + + + + + diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj new file mode 100644 index 00000000..2d29bad9 --- /dev/null +++ b/test/ziggurat/producer_test.clj @@ -0,0 +1,68 @@ +(ns ziggurat.producer-test + (:require [clojure.test :refer :all] + [ziggurat.streams :refer [start-streams stop-streams]] + [ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties* *embedded-kafka-cluster*]] + [ziggurat.config :refer [ziggurat-config]] + [ziggurat.producer :refer [producer-properties-map send kafka-producers]] + [clojure.test.check.generators :as gen]) + (:import (org.apache.kafka.streams.integration.utils IntegrationTestUtils) + (org.apache.kafka.clients.producer KafkaProducer))) + +(use-fixtures :once fix/mount-only-config-and-producer) + +(defn stream-router-config-without-producer []) +(:stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}) + +(deftest send-data-with-topic-and-value-test + (with-redefs + [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] + (let [topic (gen/generate gen/string-alphanumeric) + key "message" + value "Hello World!!"] + (.createTopic *embedded-kafka-cluster* topic) + (send :default topic key value) + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 1000)] + (is (= value (.value (first result)))))))) + +(deftest send-data-with-topic-key-partition-and-value-test + (with-redefs + [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))] + (let [topic (gen/generate gen/string-alphanumeric) + key "message" + value "Hello World!!" 
+ partition (int 0)] + (.createTopic *embedded-kafka-cluster* topic) + (send :default topic partition key value) + (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 1000)] + (is (= value (.value (first result)))))))) + +(deftest send-throws-exception-when-no-producers-are-configured + (with-redefs + [kafka-producers {}] + (let [topic "test-topic" + key "message" + value "Hello World!! from non-existant Kafka Producers"] + (is (not-empty (try (send :default topic key value) + (catch Exception e (ex-data e)))))))) + +(deftest producer-properties-map-is-empty-if-no-producers-configured + ; Here ziggurat-config has been substituted with a custom map which + ; does not have any valid producer configs. + (with-redefs + [ziggurat-config stream-router-config-without-producer] + (is (empty? (producer-properties-map))))) + +(deftest producer-properties-map-is-not-empty-if-producers-are-configured + ; Here the config is read from config.test.edn which contains + ; valid producer configs. + (is (seq (producer-properties-map)))) + + + From 684948032aea3b9a284c01181247148f6f71d56b Mon Sep 17 00:00:00 2001 From: krriteshgupta <51387236+krriteshgupta@users.noreply.github.com> Date: Mon, 17 Jun 2019 14:07:56 +0530 Subject: [PATCH 11/12] validate stream routes only when modes is not present or it contains stream-server (#59) --- src/ziggurat/init.clj | 7 ++++--- test/ziggurat/init_test.clj | 37 ++++++++++++++++++++++--------------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 023c84f3..1b22605b 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -149,8 +149,9 @@ {s/Keyword {:handler-fn (s/pred #(fn? %)) s/Keyword (s/pred #(fn? %))}})) -(defn validate-stream-routes [stream-routes] - (s/validate StreamRoute stream-routes)) +(defn validate-stream-routes [stream-routes modes] + (when (or (empty? modes) (contains? 
(set modes) :stream-worker)) + (s/validate StreamRoute stream-routes))) (defn validate-modes [modes] (let [invalid-modes (filter #(not (contains? (set (keys valid-modes-fns)) %)) modes) @@ -177,7 +178,7 @@ ([{:keys [start-fn stop-fn stream-routes actor-routes modes]}] (try (validate-modes modes) - (validate-stream-routes stream-routes) + (validate-stream-routes stream-routes modes) (add-shutdown-hook stop-fn modes) (start start-fn stream-routes actor-routes modes) (catch clojure.lang.ExceptionInfo e diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index 41321e64..5f94842b 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -86,26 +86,33 @@ (deftest validate-stream-routes-test (let [exception-message "Invalid stream routes"] - (testing "Validate Stream Routes should raise exception if stream routes is nil" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes nil)))) + (testing "Validate Stream Routes should raise exception if stream routes is nil and stream worker is one of the modes" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes nil [:stream-worker])))) - (testing "Validate Stream Routes should raise exception if stream routes are empty" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes {})))) + (testing "Validate Stream Routes should raise exception if stream routes are empty and stream worker is one of the modes" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes {} [:stream-worker])))) - (testing "Validate Stream Routes should raise exception if stream route does not have handler-fn" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default {}})))) + (testing "Validate Stream Routes should raise exception if stream route does not have handler-fn and stream worker is one of the modes" + (is (thrown? 
RuntimeException exception-message (init/validate-stream-routes {:default {}} [:stream-worker])))) - (testing "Validate Stream Routes should raise exception if stream route does have nil value" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default nil})))) + (testing "Validate Stream Routes should raise exception if stream route does have nil value and stream worker is one of the modes" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default nil} [:stream-worker])))) - (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default {:handler-fn nil}})))) + (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn and stream worker is one of the modes" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default {:handler-fn nil}} [:stream-worker])))) - (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn" - (let [stream-route {:default {:handler-fn (fn []) - :channel-1 (fn []) - :channel-2 (fn [])}}] - (is (= stream-route (init/validate-stream-routes stream-route))))))) + (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn and there is no mode passed" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default {:handler-fn nil}} nil))))) + + (testing "Validate Stream Routes should return nil if stream route is empty or nil and stream worker is not one of the modes" + (is (nil? (init/validate-stream-routes nil [:api-server]))) + (is (nil? 
(init/validate-stream-routes {} [:api-server])))) + + (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn" + (let [stream-route {:default {:handler-fn (fn []) + :channel-1 (fn []) + :channel-2 (fn [])}}] + (is (= stream-route (init/validate-stream-routes stream-route [:stream-worker])))))) (deftest ziggurat-routes-serve-actor-routes-test (testing "The routes added by actor should be served along with ziggurat-routes" From 79a583fec33fb55b7be46a71bea61bc6d3decb36 Mon Sep 17 00:00:00 2001 From: "prateek.khatri" Date: Mon, 17 Jun 2019 15:07:21 +0530 Subject: [PATCH 12/12] Adds release notes for 2.12.0 Co-authored-by: Kartik Gupta --- CHANGELOG.md | 5 +++++ project.clj | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27ab7c09..4526fc2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,11 @@ All notable changes to this project will be documented in this file. This change ## Unreleased Changes +## 2.12.0 - 2019-06-17 +- Add support for providing a topic-name label in the metrics +- Multiple Kafka producers support in ziggurat (#55) +- Validate stream routes only when modes is not present or it contains stream-worker (#59) + ## 2.11.1 - 2019-06-04 - Actor stop fn should stop before the Ziggurat state (#53) diff --git a/project.clj b/project.clj index 90c678f5..c5bc56ab 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject tech.gojek/ziggurat "2.11.1" +(defproject tech.gojek/ziggurat "2.12.0" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0"