diff --git a/.travis.yml b/.travis.yml index 78a2d19b..9bfa0de1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,3 +34,6 @@ jobs: - stage: deploy script: lein deploy clojars + branches: + only: + - 2.x diff --git a/CHANGELOG.md b/CHANGELOG.md index f00163cd..0a733126 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,21 @@ All notable changes to this project will be documented in this file. This change ## Unreleased Changes -## 3.0.0-rc1 - 2019-05-17 + +## 3.0.0-alpha - 2019-06-21 - Upgrades kafka streams to version 2.1. Please refer [this](UpgradeGuide.md) to upgrade +## 2.12.0 - 2019-06-17 +- Add support for providing a topic-name label in the metrics +- Multiple Kafka producers support in ziggurat (#55) +- Validate stream routes only when modes are not present or contain stream-worker (#59) + +## 2.11.1 - 2019-06-04 +- Actor stop fn should stop before the Ziggurat state (#53) + +## 2.11.0 - 2019-05-31 +- Running ziggurat in different modes (#46) + ## 2.10.2 - 2019-05-03 - Adds config to change the changelog topic replication factor diff --git a/README.md b/README.md index e872a728..0b617d7b 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ Upgarde Guide to 3.x refer[here](UpgradeGuide.md) Add this to your project.clj: -`[tech.gojek/ziggurat "3.0.0-rc1"]` +`[tech.gojek/ziggurat "3.0.0-alpha"]` or for latest stable version @@ -128,7 +128,45 @@ To start a stream (a thread that reads messages from Kafka), add this to your co Ziggurat also sets up a HTTP server by default and you can pass in your own routes that it will serve. The above example demonstrates how you can pass in your own route. +or +```clojure +(ziggurat/main {:start-fn start-fn + :stop-fn stop-fn + :stream-routes {:stream-id {:handler-fn main-fn}} + :actor-routes routes + :modes [:api-server :stream-worker]}) +``` + +This will start both the api-server and stream-worker modes. + +There are four modes supported by Ziggurat: +``` + :api-server - Starts only the HTTP server, serving the actor routes and the management routes (dead set management) + :stream-worker - Starts the Kafka streams along with the RabbitMQ producers used to publish messages for retries and channels + :worker - Starts the RabbitMQ consumers for retries and channels + :management-api - Serves only the routes used for dead set management +``` + +You can pass in multiple modes and Ziggurat will start them accordingly. +If no modes are passed, all the modes will be started. + +## Publishing data to Kafka Topics in Ziggurat +To enable publishing data to Kafka, Ziggurat provides producing support through the ziggurat.producer namespace. This namespace defines methods for publishing data to Kafka topics. The methods defined here are essentially wrappers around variants of the `send` methods defined in `org.apache.kafka.clients.producer.KafkaProducer`. + +At the time of initialization, an instance of `org.apache.kafka.clients.producer.KafkaProducer` is constructed using config values provided in `resources/config.edn`. A producer can be configured for each of the stream-routes in config.edn. Please see the example below. + +At present, only a few configurations are supported for constructing KafkaProducer. These have been explained [here](#configuration). Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs) +for a complete list of all producer configs available in Kafka.
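+
+For example, here is a minimal sketch of publishing from application code. The namespace, topic name, key and payload below are placeholders, and it assumes a `:producer` block is configured under the `:default` stream route as shown in the configuration section; `send` returns the future from the underlying `KafkaProducer#send`, which can be dereferenced when the broker acknowledgement or record metadata is needed.
+
+```clojure
+(ns my-app.publisher
+  (:require [ziggurat.producer :as producer]))
+
+;; Publishes a string payload (StringSerializer is assumed for the
+;; :default producer) and blocks on the returned future until the
+;; send is acknowledged.
+(defn publish-event [payload]
+  @(producer/send :default "test-topic" "event-key" payload))
+```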
+ +Ziggurat.producer namespace defines a multi-arity `send` function which is a thin wrapper around [KafkaProducer#send](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-). This method publishes data to a Kafka topic through a Kafka producer +defined in the stream router configuration. See configuration section below. + +E.g. +For publishing data using a producer which is defined for the stream router config with key `:default`, use send like this: + +`(send :default "test-topic" "key" "value")` +`(send :default "test-topic" 1 "key" "value")` ## Configuration @@ -142,7 +180,14 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur :origin-topic "kafka-topic-*" :oldest-processed-message-in-s [604800 :int] :proto-class "proto-class" - :changelog-topic-replication-factor [3 :int]}} + :changelog-topic-replication-factor [3 :int] + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}}} :datadog {:host "localhost" :port [8125 :int] :enabled [false :bool]} @@ -170,7 +215,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur :jobs {:instant {:worker-count [4 :int] :prefetch-count [4 :int]}} :http-server {:port [8010 :int] - :thread-count [100 :int]}}} + :thread-count [100 :int]}}} ``` * app-name - Refers to the name of the application. Used to namespace queues and metrics. * nrepl-server - Port on which the repl server will be hosted @@ -183,6 +228,15 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur * oldest-processed-messages-in-s - The oldest message which will be processed by stream in second. By default the value is 604800 (1 week) * proto-class - The proto-class of the message so that it can be decompiled before being passed to the mapper function * changelog-topic-replication-factor - the internal changelog topic replication factor. By default the value is 3 + * producer - Configuration for KafkaProducer. Currently, only following options are supported. Please see [Producer Configs](https://kafka.apache.org/documentation/#producerconfigs) for detailed explanation for each of the configuration parameters. + * bootstrap.servers - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. + * acks - The number of acknowledgments the producer requires the leader to have received before considering a request complete. Valid values are [all, -1, 0, 1]. + * retries - Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. + * key.serializer - Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface. + * value.serializer - Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. + * max.in.flight.requests.per.connection - The maximum number of unacknowledged requests the client will send on a single connection before blocking. + * enable.idempotence - When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. 
If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. + * datadog - The statsd host and port that metrics should be sent to, although the key name is datadog, it supports statsd as well to send metrics. * sentry - Whenever a :failure keyword is returned from the mapper-function or an exception is raised while executing the mapper-function, an event is sent to sentry. You can skip this flow by disabling it. * rabbit-mq-connection - The details required to make a connection to rabbitmq. We use rabbitmq for the retry mechanism. diff --git a/project.clj b/project.clj index a9dbcaad..f0de4c8d 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject tech.gojek/ziggurat "3.0.0-rc1" +(defproject tech.gojek/ziggurat "3.0.0-alpha" :description "A stream processing framework to build stateless applications on kafka" :url "https://github.com/gojektech/ziggurat" :license {:name "Apache License, Version 2.0" @@ -49,7 +49,8 @@ [junit/junit "4.12"] [org.apache.kafka/kafka-streams "2.1.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]] [org.apache.kafka/kafka-clients "2.1.0" :classifier "test"] - [org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"]] + [org.apache.kafka/kafka_2.11 "2.1.0" :classifier "test"] + [org.clojure/test.check "0.9.0"]] :plugins [[lein-cloverage "1.0.13"]] :repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]} :dev {:plugins [[lein-cljfmt "0.6.3"] diff --git a/resources/config.test.ci.edn b/resources/config.test.ci.edn index 2cb28ac0..84995c91 100644 --- a/resources/config.test.ci.edn +++ b/resources/config.test.ci.edn @@ -28,12 +28,27 @@ :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} - :stream-router {:default {:application-id "test" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "topic" - :proto-class "flatland.protobuf.test.Example$Photo" - :upgrade-from "1.1" - :channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}}}} \ No newline at end of file + :stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :upgrade-from "1.1" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}} + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}} + :without-producer {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}}} diff --git a/resources/config.test.edn b/resources/config.test.edn index 20e5ede6..97ac3baf 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -28,12 +28,27 @@ :enabled [true :bool]} :http-server {:port [8010 :int] :thread-count [100 :int]} - :stream-router {:default {:application-id "test" - :bootstrap-servers "localhost:9092" - :stream-threads-count [1 :int] - :origin-topic "topic" - :proto-class "flatland.protobuf.test.Example$Photo" - :upgrade-from "1.1" - 
:channels {:channel-1 {:worker-count [10 :int] - :retry {:count [5 :int] - :enabled [true :bool]}}}}}}} \ No newline at end of file + :stream-router {:default {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :upgrade-from "1.1" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}} + :producer {:bootstrap-servers "localhost:9092" + :acks "all" + :retries-config 5 + :max-in-flight-requests-per-connection 5 + :enable-idempotence false + :value-serializer "org.apache.kafka.common.serialization.StringSerializer" + :key-serializer "org.apache.kafka.common.serialization.StringSerializer"}} + :without-producer {:application-id "test" + :bootstrap-servers "localhost:9092" + :stream-threads-count [1 :int] + :origin-topic "topic" + :proto-class "flatland.protobuf.test.Example$Photo" + :channels {:channel-1 {:worker-count [10 :int] + :retry {:count [5 :int] + :enabled [true :bool]}}}}}}} diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index eb11c757..1b22605b 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -3,7 +3,6 @@ (:require [clojure.tools.logging :as log] [mount.core :as mount :refer [defstate]] [schema.core :as s] - [sentry-clj.async :as sentry] [ziggurat.config :refer [ziggurat-config] :as config] [ziggurat.metrics :as metrics] [ziggurat.messaging.connection :as messaging-connection] @@ -12,7 +11,8 @@ [ziggurat.nrepl-server :as nrepl-server] [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.server :as server] - [ziggurat.streams :as streams])) + [ziggurat.streams :as streams] + [ziggurat.producer :as producer :refer [kafka-producers]])) (defstate statsd-reporter :start (metrics/start-statsd-reporter (:datadog (ziggurat-config)) @@ -28,38 +28,117 @@ (mount/with-args args) (mount/start)))) -(defn start - "Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc" - [actor-start-fn stream-routes actor-routes] +(defn- start-rabbitmq-connection [args] + (start* #{#'messaging-connection/connection} args)) + +(defn- start-rabbitmq-consumers [args] + (start-rabbitmq-connection args) + (messaging-consumer/start-subscribers (get args :stream-routes))) + +(defn- start-rabbitmq-producers [args] + (start-rabbitmq-connection args) + (messaging-producer/make-queues (get args :stream-routes))) + +(defn start-kafka-producers [] + (start* #{#'kafka-producers})) + +(defn start-kafka-streams [args] + (start* #{#'streams/stream} args)) + +(defn start-stream [args] + (start-kafka-producers) + (start-rabbitmq-producers args) + (start-kafka-streams args)) + +(defn start-management-apis [args] + (start-rabbitmq-connection args) + (start* #{#'server/server} (dissoc args :actor-routes))) + +(defn start-server [args] + (start-rabbitmq-connection args) + (start* #{#'server/server} args)) + +(defn start-workers [args] + (start-kafka-producers) + (start-rabbitmq-producers args) + (start-rabbitmq-consumers args)) + +(defn- stop-rabbitmq-connection [] + (mount/stop #'messaging-connection/connection)) + +(defn stop-kafka-producers [] + (mount/stop #'kafka-producers)) + +(defn stop-kafka-streams [] + (mount/stop #'streams/stream)) + +(defn stop-workers [] + (stop-rabbitmq-connection) + (stop-kafka-producers)) + +(defn stop-server [] + (mount/stop #'server/server) + (stop-rabbitmq-connection)) + +(defn stop-stream [] + (stop-kafka-streams) + (stop-rabbitmq-connection) + 
(stop-kafka-producers)) + +(defn stop-management-apis [] + (mount/stop #'server/server) + (stop-rabbitmq-connection)) + +(def valid-modes-fns + {:api-server {:start-fn start-server :stop-fn stop-server} + :stream-worker {:start-fn start-stream :stop-fn stop-stream} + :worker {:start-fn start-workers :stop-fn stop-workers} + :management-api {:start-fn start-management-apis :stop-fn stop-management-apis}}) + +(defn- execute-function + ([modes fnk] + (execute-function modes fnk nil)) + ([modes fnk args] + (doseq [mode (-> modes + (or (keys valid-modes-fns)) + sort)] + (if (nil? args) + ((fnk (get valid-modes-fns mode))) + ((fnk (get valid-modes-fns mode)) args))))) + +(defn start-common-states [] (start* #{#'config/config #'statsd-reporter - #'sentry-reporter}) - (actor-start-fn) - (start* #{#'messaging-connection/connection} {:stream-routes stream-routes}) - (messaging-producer/make-queues stream-routes) - (messaging-consumer/start-subscribers stream-routes) ;; We want subscribers to start after creating queues on RabbitMQ. - (start* #{#'server/server - #'nrepl-server/server - #'streams/stream} - {:stream-routes stream-routes - :actor-routes actor-routes})) + #'sentry-reporter + #'nrepl-server/server})) -(defn stop - "Calls the Ziggurat's state stop fns and then actor-stop-fn." - [actor-stop-fn] +(defn stop-common-states [] (mount/stop #'config/config #'statsd-reporter #'messaging-connection/connection - #'server/server - #'nrepl-server/server - #'streams/stream) + #'nrepl-server/server)) + +(defn start + "Starts up Ziggurat's common states (config, reporters, nREPL server), calls the actor-start-fn and then starts the states required by the given modes (streams, server, RabbitMQ etc.)" + [actor-start-fn stream-routes actor-routes modes] + (start-common-states) + (actor-start-fn) + (execute-function modes + :start-fn + {:actor-routes actor-routes + :stream-routes stream-routes})) + +(defn stop + "Calls the actor-stop-fn and then stops Ziggurat's states for the given modes." + [actor-stop-fn modes] (actor-stop-fn) - (mount/stop #'config/config) + (stop-common-states) + (execute-function modes :stop-fn)) -(defn- add-shutdown-hook [actor-stop-fn] +(defn- add-shutdown-hook [actor-stop-fn modes] (.addShutdownHook (Runtime/getRuntime) - (Thread. ^Runnable #(do (stop actor-stop-fn) + (Thread. ^Runnable #(do (stop actor-stop-fn modes) (shutdown-agents)) "Shutdown-handler"))) @@ -70,8 +149,15 @@ {s/Keyword {:handler-fn (s/pred #(fn? %)) s/Keyword (s/pred #(fn? %))}})) -(defn validate-stream-routes [stream-routes] - (s/validate StreamRoute stream-routes)) +(defn validate-stream-routes [stream-routes modes] + (when (or (empty? modes) (contains? (set modes) :stream-worker)) + (s/validate StreamRoute stream-routes))) + +(defn validate-modes [modes] + (let [invalid-modes (filter #(not (contains? (set (keys valid-modes-fns)) %)) modes) + invalid-modes-count (count invalid-modes)] + (when (pos? invalid-modes-count) + (throw (ex-info "Wrong modes argument passed - " {:invalid-modes invalid-modes}))))) (defn main "The entry point for your application. 
@@ -88,11 +174,17 @@ ([start-fn stop-fn stream-routes] (main start-fn stop-fn stream-routes [])) ([start-fn stop-fn stream-routes actor-routes] + (main {:start-fn start-fn :stop-fn stop-fn :stream-routes stream-routes :actor-routes actor-routes})) + ([{:keys [start-fn stop-fn stream-routes actor-routes modes]}] (try - (validate-stream-routes stream-routes) - (add-shutdown-hook stop-fn) - (start start-fn stream-routes actor-routes) + (validate-modes modes) + (validate-stream-routes stream-routes modes) + (add-shutdown-hook stop-fn modes) + (start start-fn stream-routes actor-routes modes) + (catch clojure.lang.ExceptionInfo e + (log/error e) + (System/exit 1)) (catch Exception e (log/error e) - (stop stop-fn) - (System/exit 1))))) + (stop stop-fn modes) + (System/exit 1))))) \ No newline at end of file diff --git a/src/ziggurat/kafka_delay.clj b/src/ziggurat/kafka_delay.clj index eb2582aa..3fae5bdc 100644 --- a/src/ziggurat/kafka_delay.clj +++ b/src/ziggurat/kafka_delay.clj @@ -2,9 +2,12 @@ (:require [ziggurat.metrics :as metrics] [ziggurat.util.time :refer :all])) -(defn calculate-and-report-kafka-delay [metric-namespace record-timestamp] - (let [now-millis (get-current-time-in-millis) - delay (- now-millis - record-timestamp)] - (metrics/report-time metric-namespace delay))) - +(defn calculate-and-report-kafka-delay + ([metric-namespaces record-timestamp] + (calculate-and-report-kafka-delay metric-namespaces record-timestamp nil)) + ([metric-namespaces record-timestamp additional-tags] + (let [now-millis (get-current-time-in-millis) + delay (- now-millis record-timestamp) + default-namespace (last metric-namespaces) + multi-namespaces [metric-namespaces [default-namespace]]] + (metrics/multi-ns-report-time multi-namespaces delay additional-tags)))) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 808380f8..f1b0c7ea 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -1,7 +1,9 @@ (ns ziggurat.mapper - (:require [sentry-clj.async :as sentry] - [ziggurat.metrics :as metrics] + (:require [clojure.string :as str] + [sentry-clj.async :as sentry] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.messaging.producer :as producer] + [ziggurat.metrics :as metrics] [ziggurat.new-relic :as nr] [ziggurat.sentry :refer [sentry-reporter]]) (:import (java.time Instant))) @@ -13,49 +15,76 @@ (defn mapper-func [mapper-fn topic-entity channels] (fn [message] - (let [topic-entity-name (name topic-entity) + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) new-relic-transaction-name (str topic-entity-name ".handler-fn") - metric-namespace (str topic-entity-name ".message-processing")] + default-namespace "message-processing" + metric-namespaces [service-name topic-entity-name default-namespace] + additional-tags {:topic_name topic-entity-name} + default-namespaces [default-namespace] + success-metric "success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure" + multi-namespaces [metric-namespaces default-namespaces]] (nr/with-tracing "job" new-relic-transaction-name (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn message) - end-time (.toEpochMilli (Instant/now))] - (metrics/report-time (str topic-entity-name ".handler-fn-execution-time") (- end-time start-time)) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn message) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "handler-fn-execution-time" + 
multi-execution-time-namespaces [[service-name topic-entity-name execution-time-namespace] + [execution-time-namespace]]] + (metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags) (case return-code - :success (metrics/increment-count metric-namespace "success") - :retry (do (metrics/increment-count metric-namespace "retry") - (producer/retry message topic-entity)) - :skip (metrics/increment-count metric-namespace "skip") - :block 'TODO + :success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags) + :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + (producer/retry message topic-entity)) + :skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags) + :block 'TODO (do (send-msg-to-channel channels message topic-entity return-code) - (metrics/increment-count metric-namespace "success")))) + (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags)))) (catch Throwable e (producer/retry message topic-entity) (sentry/report-error sentry-reporter e (str "Actor execution failed for " topic-entity-name)) - (metrics/increment-count metric-namespace "failure"))))))) + (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) (defn channel-mapper-func [mapper-fn topic-entity channel] (fn [message] - (let [topic-entity-name (name topic-entity) - channel-name (name channel) - metric-namespace (str topic-entity-name "." channel-name) - message-processing-namespace (str metric-namespace ".message-processing")] + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) + channel-name (name channel) + default-namespace "message-processing" + base-namespaces [service-name topic-entity-name channel-name] + metric-namespaces (conj base-namespaces default-namespace) + additional-tags {:topic_name topic-entity-name} + default-namespaces [default-namespace] + metric-namespace (str/join "." 
metric-namespaces) + success-metric "success" + retry-metric "retry" + skip-metric "skip" + failure-metric "failure" + multi-namespaces [metric-namespaces default-namespaces]] (nr/with-tracing "job" metric-namespace (try - (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn message) - end-time (.toEpochMilli (Instant/now))] - (metrics/report-time (str metric-namespace ".execution-time") (- end-time start-time)) + (let [start-time (.toEpochMilli (Instant/now)) + return-code (mapper-fn message) + end-time (.toEpochMilli (Instant/now)) + time-val (- end-time start-time) + execution-time-namespace "execution-time" + multi-execution-time-namespaces [(conj base-namespaces execution-time-namespace) + [execution-time-namespace]]] + (metrics/multi-ns-report-time multi-execution-time-namespaces time-val additional-tags) (case return-code - :success (metrics/increment-count message-processing-namespace "success") - :retry (do (metrics/increment-count message-processing-namespace "retry") - (producer/retry-for-channel message topic-entity channel)) - :skip (metrics/increment-count message-processing-namespace "skip") - :block 'TODO + :success (metrics/multi-ns-increment-count multi-namespaces success-metric additional-tags) + :retry (do (metrics/multi-ns-increment-count multi-namespaces retry-metric additional-tags) + (producer/retry-for-channel message topic-entity channel)) + :skip (metrics/multi-ns-increment-count multi-namespaces skip-metric additional-tags) + :block 'TODO (throw (ex-info "Invalid mapper return code" {:code return-code})))) (catch Throwable e (producer/retry-for-channel message topic-entity channel) (sentry/report-error sentry-reporter e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name)) - (metrics/increment-count message-processing-namespace "failure"))))))) \ No newline at end of file + (metrics/multi-ns-increment-count multi-namespaces failure-metric additional-tags))))))) diff --git a/src/ziggurat/metrics.clj b/src/ziggurat/metrics.clj index c3373800..f58b2481 100644 --- a/src/ziggurat/metrics.clj +++ b/src/ziggurat/metrics.clj @@ -1,45 +1,76 @@ (ns ziggurat.metrics - (:require [clojure.tools.logging :as log]) - (:import (com.gojek.metrics.datadog DatadogReporter) - (com.gojek.metrics.datadog.transport UdpTransport$Builder UdpTransport) - (java.util.concurrent TimeUnit) - (io.dropwizard.metrics5 MetricRegistry Meter MetricName Histogram))) + (:require [clojure.string :as str] + [clojure.tools.logging :as log] + [clojure.walk :refer [stringify-keys]]) + (:import com.gojek.metrics.datadog.DatadogReporter + [com.gojek.metrics.datadog.transport UdpTransport UdpTransport$Builder] + [io.dropwizard.metrics5 Histogram Meter MetricName MetricRegistry] + java.util.concurrent.TimeUnit)) (defonce ^:private group (atom nil)) (defonce metrics-registry (MetricRegistry.)) +(defn- merge-tags + [additional-tags] + (let [default-tags {"actor" @group}] + (merge default-tags (when-not (seq additional-tags) + (stringify-keys additional-tags))))) + (defn mk-meter - [category metric] - (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) - tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))] - (.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric))) + ([category metric] + (mk-meter category metric nil)) + ([category metric additional-tags] + (let [namespace (str category "." 
metric) + metric-name (MetricRegistry/name ^String namespace nil) + tags (merge-tags additional-tags) + tagged-metric (.tagged ^MetricName metric-name tags)] + (.meter ^MetricRegistry metrics-registry ^MetricName tagged-metric)))) (defn mk-histogram - [category metric] - (let [metric-name (MetricRegistry/name ^String @group ^"[Ljava.lang.String;" (into-array String [category metric])) - tagged-metric (.tagged ^MetricName metric-name ^"[Ljava.lang.String;" (into-array String ["actor" @group]))] - (.histogram ^MetricRegistry metrics-registry tagged-metric))) - -(defn increment-count - ([metric-namespace metric] - (increment-count metric-namespace metric 1)) - ([metric-namespace metric n] - (let [meter ^Meter (mk-meter metric-namespace metric)] - (.mark meter (int n))))) - -(defn decrement-count - ([metric-namespace metric] - (decrement-count metric-namespace metric 1)) - ([metric-namespace metric n] - (let [meter ^Meter (mk-meter metric-namespace metric)] - (.mark meter (int (- n)))))) - -(defn report-time [metric-namespace time-val] - (let [histogram ^Histogram (mk-histogram metric-namespace "all")] + ([category metric] + (mk-histogram category metric nil)) + ([category metric additional-tags] + (let [namespace (str category "." metric) + metric-name (MetricRegistry/name ^String namespace nil) + tags (merge-tags additional-tags) + tagged-metric (.tagged ^MetricName metric-name tags)] + (.histogram ^MetricRegistry metrics-registry ^MetricName tagged-metric)))) + +(defn intercalate-dot + [names] + (str/join "." names)) + +(defn remove-topic-tag-for-old-namespace + [additional-tags ns] + (let [topic-name (:topic_name additional-tags)] + (dissoc additional-tags (when (some #(= % topic-name) ns) :topic_name)))) + +(defn- inc-or-dec-count + [sign metric-namespaces metric additional-tags] + (let [metric-namespace (intercalate-dot metric-namespaces) + meter ^Meter (mk-meter metric-namespace metric (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] + (.mark meter (sign 1)))) + +(def increment-count (partial inc-or-dec-count +)) + +(def decrement-count (partial inc-or-dec-count -)) + +(defn multi-ns-increment-count [nss metric additional-tags] + (doseq [ns nss] + (increment-count ns metric additional-tags))) + +(defn report-time + [metric-namespaces time-val additional-tags] + (let [metric-namespace (intercalate-dot metric-namespaces) + histogram ^Histogram (mk-histogram metric-namespace "all" (remove-topic-tag-for-old-namespace additional-tags metric-namespaces))] (.update histogram (int time-val)))) +(defn multi-ns-report-time [nss time-val additional-tags] + (doseq [ns nss] + (report-time ns time-val additional-tags))) + (defn start-statsd-reporter [statsd-config env app-name] (let [{:keys [enabled host port]} statsd-config] (when enabled @@ -48,10 +79,10 @@ (.withPort port) (.build)) - reporter (-> (DatadogReporter/forRegistry metrics-registry) - (.withTransport transport) - (.withTags [(str env)]) - (.build))] + reporter (-> (DatadogReporter/forRegistry metrics-registry) + (.withTransport transport) + (.withTags [(str env)]) + (.build))] (log/info "Starting statsd reporter") (.start reporter 1 TimeUnit/SECONDS) (reset! group app-name) diff --git a/src/ziggurat/producer.clj b/src/ziggurat/producer.clj new file mode 100644 index 00000000..3c4315f4 --- /dev/null +++ b/src/ziggurat/producer.clj @@ -0,0 +1,124 @@ +(ns ziggurat.producer + "This namespace defines methods for publishing data to + Kafka topics. 
The methods defined here are essentially wrappers + around variants of `send` methods defined in + `org.apache.kafka.clients.producer.KafkaProducer`. + + At the time of initialization, an instance of + `org.apache.kafka.clients.producer.KafkaProducer` + is constructed using config values provided in `resources/config.edn`. + + A producer can be configured for each of the stream-routes + in config.edn. Please see the example below. + + ` + :stream-router {:default {:application-id \"test\"\n + :bootstrap-servers \"localhost:9092\"\n + :stream-threads-count [1 :int]\n + :origin-topic \"topic\"\n + :proto-class \"flatland.protobuf.test.Example$Photo\"\n + :channels {:channel-1 {:worker-count [10 :int]\n :retry {:count [5 :int]\n :enabled [true :bool]}}}\n + :producer {:bootstrap-servers \"localhost:9092\"\n + :acks \"all\"\n + :retries-config 5\n + :max-in-flight-requests-per-connection 5\n + :enable-idempotence false\n + :value-serializer \"org.apache.kafka.common.serialization.StringSerializer\"\n + :key-serializer \"org.apache.kafka.common.serialization.StringSerializer\"}} + ` + + Usage: + + ` + Please see `send` for publishing data via Kafka producers + ` + + These are the KafkaProducer configs currently supported in Ziggurat. + - bootstrap.servers + - acks + - retries + - key.serializer + - value.serializer + - max.in.flight.requests.per.connection + - enable.idempotence + + Please see [Producer configs](http://kafka.apache.org/documentation.html#producerconfigs) + for a complete list of all producer configs available in Kafka." + + (:require [ziggurat.config :refer [ziggurat-config]] + [clojure.tools.logging :as log] + [mount.core :refer [defstate]]) + (:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig) + (java.util Properties))) + +(defn- producer-properties-from-config [{:keys [bootstrap-servers + acks + key-serializer + value-serializer + enable-idempotence + retries-config + max-in-flight-requests-per-connection]}] + (doto (Properties.) + (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers) + (.put ProducerConfig/ACKS_CONFIG acks) + (.put ProducerConfig/RETRIES_CONFIG (int retries-config)) + (.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG enable-idempotence) + (.put ProducerConfig/MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION (int max-in-flight-requests-per-connection)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG key-serializer) + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG value-serializer))) + +(defn producer-properties-map [] + (reduce (fn [producer-map [stream-config-key stream-config]] + (let [producer-config (:producer stream-config)] + (if (some? producer-config) + (assoc producer-map stream-config-key (producer-properties-from-config producer-config)) + producer-map))) + {} + (seq (:stream-router (ziggurat-config))))) + +(defstate kafka-producers + :start (if (not-empty (producer-properties-map)) + (do (log/info "Starting Kafka producers ...") + (reduce (fn [producers [stream-config-key properties]] + (do (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ") + (assoc producers stream-config-key (KafkaProducer. properties)))) + {} + (seq (producer-properties-map)))) + (log/info "No producers found. 
Can not initiate start.")) + + :stop (if (not-empty kafka-producers) + (do (log/info "Stopping Kafka producers ...") + (doall (map (fn [[stream-config-key producer]] + (log/debug "Stopping Kafka producer associated with [" stream-config-key "]") + (doto producer + (.flush) + (.close))) + (seq kafka-producers)))) + (log/info "No producers found. Can not initiate stop."))) + +(defn send + "A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables + the users of Ziggurat to produce data to a Kafka topic using a Kafka producer + associated with a Kafka stream config key. + + E.g. + For publishing data to producer defined for the + stream router config with defined agains + key `:default`, use send like this. + + `(send :default \"test-topic\" \"key\" \"value\")` + `(send :default \"test-topic\" 1 \"key\" \"value\")` + + " + + ([stream-config-key topic key value] + (send stream-config-key topic nil key value)) + + ([stream-config-key topic partition key value] + (if (some? (get kafka-producers stream-config-key)) + (let [producer-record (ProducerRecord. topic partition key value)] + (.send (stream-config-key kafka-producers) producer-record)) + + (let [error-msg (str "Can't publish data. No producers defined for stream config [" stream-config-key "]")] + (log/error error-msg) + (throw (ex-info error-msg {:stream-config-key stream-config-key})))))) diff --git a/src/ziggurat/resource/dead_set.clj b/src/ziggurat/resource/dead_set.clj index 06bad964..5179ac9d 100644 --- a/src/ziggurat/resource/dead_set.clj +++ b/src/ziggurat/resource/dead_set.clj @@ -1,9 +1,9 @@ (ns ziggurat.resource.dead-set - (:require [schema.core :as s] - [clojure.tools.logging :as log] + (:require [clojure.tools.logging :as log] + [mount.core :as mount] + [schema.core :as s] [ziggurat.config :refer [get-in-config channel-retry-config]] - [ziggurat.messaging.dead-set :as r] - [mount.core :as mount])) + [ziggurat.messaging.dead-set :as r])) (def not-found-for-retry {:status 404 :body {:error "Retry is not enabled"}}) diff --git a/src/ziggurat/retry.clj b/src/ziggurat/retry.clj index 230998a3..ec2349c9 100644 --- a/src/ziggurat/retry.clj +++ b/src/ziggurat/retry.clj @@ -1,6 +1,5 @@ (ns ziggurat.retry (:require [clojure.set :as set] - [sentry-clj.async :as sentry] [ziggurat.sentry :refer [sentry-reporter]])) (def default-wait 100) diff --git a/src/ziggurat/server/middleware.clj b/src/ziggurat/server/middleware.clj index c5aa4e93..40abc29c 100644 --- a/src/ziggurat/server/middleware.clj +++ b/src/ziggurat/server/middleware.clj @@ -18,7 +18,7 @@ (defn wrap-hyphenate [handler & args] (fn [request] - (let [{:keys [skip-hyphenation] :as response} + (let [response (handler (update request :params #(umap/nested-map-keys (fn [k] (apply csk/->kebab-case-keyword k args)) %)))] diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index a057f3ee..06631016 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -2,29 +2,29 @@ (:require [clojure.tools.logging :as log] [flatland.protobuf.core :as proto] [mount.core :as mount :refer [defstate]] - [ziggurat.metrics :as metrics] - [ziggurat.config :refer [ziggurat-config]] [sentry-clj.async :as sentry] [ziggurat.channel :as chl] - [ziggurat.util.map :as umap] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.mapper :as mpr] + [ziggurat.metrics :as metrics] + [ziggurat.sentry :refer [sentry-reporter]] [ziggurat.timestamp-transformer :as transformer] - [ziggurat.sentry :refer [sentry-reporter]]) - (:import [java.util.regex Pattern] - 
[java.util Properties] + [ziggurat.util.map :as umap]) + (:import [java.util Properties] + [java.util.regex Pattern] [org.apache.kafka.clients.consumer ConsumerConfig] [org.apache.kafka.common.serialization Serdes] + [org.apache.kafka.common.utils SystemTime] [org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology] [org.apache.kafka.streams.kstream ValueMapper TransformerSupplier] [org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier] - [org.apache.kafka.common.utils SystemTime] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (def default-config-for-stream - {:buffered-records-per-partition 10000 - :commit-interval-ms 15000 - :auto-offset-reset-config "latest" - :oldest-processed-message-in-s 604800 + {:buffered-records-per-partition 10000 + :commit-interval-ms 15000 + :auto-offset-reset-config "latest" + :oldest-processed-message-in-s 604800 :changelog-topic-replication-factor 3}) (defn- set-upgrade-from-config @@ -62,12 +62,16 @@ (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG auto-offset-reset-config) (set-upgrade-from-config upgrade-from))) -(defn- get-metric-namespace [default topic] - (str (name topic) "." default)) - (defn- log-and-report-metrics [topic-entity message] - (let [message-read-metric-namespace (get-metric-namespace "message" (name topic-entity))] - (metrics/increment-count message-read-metric-namespace "read")) + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) + additional-tags {:topic_name topic-entity-name} + default-namespace "message" + metric-namespaces [service-name topic-entity-name default-namespace] + default-namespaces [default-namespace] + metric "read" + multi-namespaces [metric-namespaces default-namespaces]] + (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) message) (defn store-supplier-builder [] @@ -83,15 +87,18 @@ (defn- map-values [mapper-fn stream-builder] (.mapValues stream-builder (value-mapper mapper-fn))) -(defn- transformer-supplier [metric-namespace oldest-processed-message-in-s] +(defn- transformer-supplier + [metric-namespaces oldest-processed-message-in-s additional-tags] (reify TransformerSupplier - (get [_] (transformer/create metric-namespace oldest-processed-message-in-s)))) + (get [_] (transformer/create metric-namespaces oldest-processed-message-in-s additional-tags)))) -(defn- transform-values [topic-entity oldest-processed-message-in-s stream-builder] - (let [metric-namespace (get-metric-namespace "message-received-delay-histogram" topic-entity)] - (.transform stream-builder (transformer-supplier metric-namespace oldest-processed-message-in-s) (into-array [(.name (store-supplier-builder))])))) +(defn- transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] + (let [service-name (:app-name (ziggurat-config)) + metric-namespaces [service-name topic-entity-name "message-received-delay-histogram"] + additional-tags {:topic_name topic-entity-name}] + (.transform stream-builder (transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) -(defn- protobuf->hash [message proto-class] +(defn- protobuf->hash [message proto-class topic-entity-name] (try (let [proto-klass (-> proto-class java.lang.Class/forName @@ -103,9 +110,14 @@ keys)] (select-keys loaded-proto proto-keys)) (catch Throwable e - (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) - 
(metrics/increment-count "message-parsing" "failed") - nil))) + (let [service-name (:app-name (ziggurat-config)) + additional-tags {:topic_name topic-entity-name} + default-namespace "message-parsing" + metric-namespaces [service-name "message-parsing"] + multi-namespaces [metric-namespaces [default-namespace]]] + (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) + (metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags) + nil)))) (defn- topology [handler-fn {:keys [origin-topic proto-class oldest-processed-message-in-s]} topic-entity channels] (let [builder (StreamsBuilder.) @@ -114,7 +126,7 @@ (.addStateStore builder (store-supplier-builder)) (->> (.stream builder topic-pattern) (transform-values topic-entity-name oldest-processed-message-in-s) - (map-values #(protobuf->hash % proto-class)) + (map-values #(protobuf->hash % proto-class topic-entity-name)) (map-values #(log-and-report-metrics topic-entity-name %)) (map-values #((mpr/mapper-func handler-fn topic-entity channels) %))) (.build builder))) @@ -148,4 +160,4 @@ :start (do (log/info "Starting Kafka stream") (start-streams (:stream-routes (mount/args)) (ziggurat-config))) :stop (do (log/info "Stopping Kafka stream") - (stop-streams stream))) \ No newline at end of file + (stop-streams stream))) diff --git a/src/ziggurat/timestamp_transformer.clj b/src/ziggurat/timestamp_transformer.clj index 6863b304..7cdfa1d6 100644 --- a/src/ziggurat/timestamp_transformer.clj +++ b/src/ziggurat/timestamp_transformer.clj @@ -17,16 +17,19 @@ (get-current-time-in-millis) ingestion-time)))) -(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespace oldest-processed-message-in-s] Transformer +(deftype TimestampTransformer [^{:volatile-mutable true} processor-context metric-namespaces oldest-processed-message-in-s additional-tags] Transformer (^void init [_ ^ProcessorContext context] (do (set! processor-context context) nil)) (transform [_ record-key record-value] (let [message-time (.timestamp processor-context)] (when (message-to-process? message-time oldest-processed-message-in-s) - (calculate-and-report-kafka-delay metric-namespace message-time) + (calculate-and-report-kafka-delay metric-namespaces message-time additional-tags) (KeyValue/pair record-key record-value)))) (close [_] nil)) -(defn create [metric-namespace process-message-since-in-s] - (TimestampTransformer. nil metric-namespace process-message-since-in-s)) +(defn create + ([metric-namespace process-message-since-in-s] + (create metric-namespace process-message-since-in-s nil)) + ([metric-namespace process-message-since-in-s additional-tags] + (TimestampTransformer. 
nil metric-namespace process-message-since-in-s additional-tags))) diff --git a/test/ziggurat/fixtures.clj b/test/ziggurat/fixtures.clj index 72c1c05f..c210c31e 100644 --- a/test/ziggurat/fixtures.clj +++ b/test/ziggurat/fixtures.clj @@ -8,9 +8,13 @@ [ziggurat.messaging.connection :refer [connection]] [ziggurat.server :refer [server]] [ziggurat.messaging.producer :as pr] + [ziggurat.producer :as producer] [langohr.channel :as lch] [langohr.exchange :as le] - [langohr.queue :as lq])) + [langohr.queue :as lq]) + (:import (java.util Properties) + (org.apache.kafka.clients.producer ProducerConfig) + (org.apache.kafka.clients.consumer ConsumerConfig))) (defn mount-config [] (-> (mount/only [#'config/config]) @@ -37,7 +41,7 @@ (with-open [ch (lch/open connection)] (doseq [topic-entity (keys stream-routes)] (let [topic-identifier (name topic-entity) - channels (util/get-channel-names stream-routes topic-entity)] + channels (util/get-channel-names stream-routes topic-entity)] (lq/delete ch (util/prefixed-queue-name topic-identifier (get-queue-name :instant))) (lq/delete ch (util/prefixed-queue-name topic-identifier (get-queue-name :dead-letter))) (lq/delete ch (pr/delay-queue-name topic-identifier (get-queue-name :delay))) @@ -50,7 +54,7 @@ (with-open [ch (lch/open connection)] (doseq [topic-entity (keys stream-routes)] (let [topic-identifier (name topic-entity) - channels (util/get-channel-names stream-routes topic-entity)] + channels (util/get-channel-names stream-routes topic-entity)] (le/delete ch (util/prefixed-queue-name topic-identifier (get-exchange-name :instant))) (le/delete ch (util/prefixed-queue-name topic-identifier (get-exchange-name :dead-letter))) (le/delete ch (util/prefixed-queue-name topic-identifier (get-exchange-name :delay))) @@ -91,3 +95,31 @@ (finally (delete-queues ~stream-routes) (delete-exchanges ~stream-routes)))) + +(defn mount-producer [] + (-> (mount/only [#'producer/kafka-producers]) + (mount/start))) + +(def ^:dynamic *bootstrap-servers* nil) +(def ^:dynamic *consumer-properties* nil) +(def ^:dynamic *producer-properties* nil) + +(defn mount-only-config-and-producer [f] + (do + (mount-config) + (mount-producer) + (binding [*bootstrap-servers* "localhost:9092"] + (binding [*consumer-properties* (doto (Properties.) + (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG, *bootstrap-servers*) + (.put ConsumerConfig/GROUP_ID_CONFIG, "ziggurat-consumer") + (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG, "earliest") + (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") + (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")) + *producer-properties* (doto (Properties.) 
+ (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG *bootstrap-servers*) + (.put ProducerConfig/ACKS_CONFIG "all") + (.put ProducerConfig/RETRIES_CONFIG (int 0)) + (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer") + (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer"))] + (f)))) + (mount/stop)) \ No newline at end of file diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj index 3b91596d..5f94842b 100644 --- a/test/ziggurat/init_test.clj +++ b/test/ziggurat/init_test.clj @@ -1,13 +1,14 @@ (ns ziggurat.init-test (:require [clojure.test :refer :all] - [clojure.tools.logging :as log] [ziggurat.config :as config] [ziggurat.init :as init] [ziggurat.messaging.connection :as rmqc] [ziggurat.messaging.consumer :as messaging-consumer] [ziggurat.messaging.producer :as messaging-producer] - [ziggurat.streams :as streams] - [ziggurat.server.test-utils :as tu])) + [ziggurat.streams :as streams :refer [stream]] + [mount.core :refer [defstate]] + [ziggurat.server.test-utils :as tu] + [mount.core :as mount])) (deftest start-calls-actor-start-fn-test (testing "The actor start fn starts before the ziggurat state and can read config" @@ -17,56 +18,67 @@ rmqc/start-connection (fn [] (reset! result (* @result 2))) rmqc/stop-connection (constantly nil) config/config-file "config.test.edn"] - (init/start #(reset! result (+ @result 3)) {} []) - (init/stop #()) + (init/start #(reset! result (+ @result 3)) {} [] nil) + (init/stop #() nil) (is (= 16 @result)))))) (deftest stop-calls-actor-stop-fn-test - (testing "The actor stop fn stops after the ziggurat state" + (testing "The actor stop fn stops before the ziggurat state" (let [result (atom 1)] (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (fn [_] (reset! result (* @result 2))) config/config-file "config.test.edn"] - (init/start #() {} []) - (init/stop #(reset! result (+ @result 3))) - (is (= 5 @result)))))) + (init/start #() {} [] nil) + (init/stop #(reset! result (+ @result 3)) nil) + (is (= 8 @result)))))) + +(deftest stop-calls-idempotentcy-test + (testing "The stop function should be idempotent" + (let [result (atom 1)] + (with-redefs [streams/start-streams (constantly nil) + streams/stop-streams (constantly nil) + rmqc/stop-connection (fn [_] (reset! result (* @result 2))) + config/config-file "config.test.edn"] + (init/start #() {} [] nil) + (init/stop #(reset! result (+ @result 3)) nil) + (is (= 8 @result)))))) (deftest start-calls-make-queues-test (testing "Start calls make queues" - (let [make-queues-called (atom false) + (let [make-queues-called (atom 0) expected-stream-routes {:default {:handler-fn #()}}] (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) messaging-producer/make-queues (fn [stream-routes] - (swap! make-queues-called not) + (swap! 
make-queues-called + 1) (is (= stream-routes expected-stream-routes))) messaging-consumer/start-subscribers (constantly nil) config/config-file "config.test.edn"] - (init/start #() expected-stream-routes []) - (init/stop #()) - (is @make-queues-called))))) + (init/start #() expected-stream-routes [] nil) + (init/stop #() nil) + (is (= 2 @make-queues-called)))))) (deftest start-calls-start-subscribers-test (testing "Start calls start subscribers" - (let [start-subscriber-called (atom false) + (let [start-subscriber-called (atom 0) expected-stream-routes {:default {:handler-fn #()}}] (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) messaging-consumer/start-subscribers (fn [stream-routes] - (swap! start-subscriber-called not) + (swap! start-subscriber-called + 1) (is (= stream-routes expected-stream-routes))) messaging-producer/make-queues (constantly nil) config/config-file "config.test.edn"] - (init/start #() expected-stream-routes []) - (init/stop #()) - (is @start-subscriber-called))))) + (init/start #() expected-stream-routes [] nil) + (init/stop #() nil) + (is (= 1 @start-subscriber-called)))))) (deftest main-test (testing "Main function should call start" (let [start-was-called (atom false) expected-stream-routes {:default {:handler-fn #(constantly nil)}}] - (with-redefs [init/add-shutdown-hook (fn [_] (constantly nil)) - init/start (fn [_ stream-router _] + (with-redefs [init/add-shutdown-hook (fn [_ _] (constantly nil)) + init/start (fn [_ stream-router _ _] (swap! start-was-called not) (is (= expected-stream-routes stream-router)))] (init/main #() #() expected-stream-routes) @@ -74,26 +86,33 @@ (deftest validate-stream-routes-test (let [exception-message "Invalid stream routes"] - (testing "Validate Stream Routes should raise exception if stream routes is nil" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes nil)))) + (testing "Validate Stream Routes should raise exception if stream routes is nil and stream worker is one of the modes" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes nil [:stream-worker])))) + + (testing "Validate Stream Routes should raise exception if stream routes are empty and stream worker is one of the modes" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes {} [:stream-worker])))) + + (testing "Validate Stream Routes should raise exception if stream route does not have handler-fn and stream worker is one of the modes" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default {}} [:stream-worker])))) - (testing "Validate Stream Routes should raise exception if stream routes are empty" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes {})))) + (testing "Validate Stream Routes should raise exception if stream route does have nil value and stream worker is one of the modes" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default nil} [:stream-worker])))) - (testing "Validate Stream Routes should raise exception if stream route does not have handler-fn" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default {}})))) + (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn and stream worker is one of the modes" + (is (thrown? 
RuntimeException exception-message (init/validate-stream-routes {:default {:handler-fn nil}} [:stream-worker])))) - (testing "Validate Stream Routes should raise exception if stream route does have nil value" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default nil})))) + (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn and there is no mode passed" + (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default {:handler-fn nil}} nil))))) - (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn" - (is (thrown? RuntimeException exception-message (init/validate-stream-routes {:default {:handler-fn nil}})))) + (testing "Validate Stream Routes should return nil if stream route is empty or nil and stream worker is not one of the modes" + (is (nil? (init/validate-stream-routes nil [:api-server]))) + (is (nil? (init/validate-stream-routes {} [:api-server])))) - (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn" - (let [stream-route {:default {:handler-fn (fn []) - :channel-1 (fn []) - :channel-2 (fn [])}}] - (is (= stream-route (init/validate-stream-routes stream-route))))))) + (testing "Validate Stream Routes should raise exception if stream route has nil handler-fn" + (let [stream-route {:default {:handler-fn (fn []) + :channel-1 (fn []) + :channel-2 (fn [])}}] + (is (= stream-route (init/validate-stream-routes stream-route [:stream-worker])))))) (deftest ziggurat-routes-serve-actor-routes-test (testing "The routes added by actor should be served along with ziggurat-routes" @@ -101,11 +120,24 @@ streams/stop-streams (constantly nil) config/config-file "config.test.edn"] (init/start #() {} [["test-ping" (fn [_request] {:status 200 - :body "pong"})]]) + :body "pong"})]] nil) (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false) status-actor status {:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] - (init/stop #()) + (init/stop #() nil) + (is (= 200 status-actor)) + (is (= 200 status))))) + + (testing "Deadset management and server api modes should run both actor and deadset management routes" + (with-redefs [streams/start-streams (constantly nil) + streams/stop-streams (constantly nil) + config/config-file "config.test.edn"] + (init/start #() {} [["test-ping" (fn [_request] {:status 200 + :body "pong"})]] [:management-api :api-server]) + (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false) + status-actor status + {:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] + (init/stop #() nil) (is (= 200 status-actor)) (is (= 200 status))))) @@ -113,16 +145,53 @@ (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) config/config-file "config.test.edn"] - (init/start #() {} []) - (let [{:keys [status body] :as response} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)] - (init/stop #()) + (init/start #() {} [] nil) + (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/test-ping" true false)] + (init/stop #() nil) (is (= 404 status))))) (testing "The ziggurat routes should work fine when actor routes are not provided" (with-redefs [streams/start-streams (constantly nil) streams/stop-streams (constantly nil) config/config-file 
"config.test.edn"] - (init/start #() {} []) - (let [{:keys [status body] :as response} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] - (init/stop #()) + (init/start #() {} [] nil) + (let [{:keys [status]} (tu/get (-> (config/ziggurat-config) :http-server :port) "/ping" true false)] + (init/stop #() nil) (is (= 200 status)))))) + +(deftest validate-modes-test + (testing "Validate modes should raise exception if modes have any invalid element" + (let [modes [:invalid-modes :api-server :second-invalid]] + (is (thrown? clojure.lang.ExceptionInfo (init/validate-modes modes)))))) + +(deftest kafka-producers-should-start + (let [args {:actor-routes [] + :stream-routes []} + producer-has-started (atom false)] + (with-redefs [init/start-kafka-producers (fn [] (reset! producer-has-started true)) + init/start-kafka-streams (constantly nil)] + (testing "Starting the streams should start kafka-producers as well" + (init/start-stream args) + (is (= true @producer-has-started))) + (testing "Starting the workers should start kafka-producers as well" + (reset! producer-has-started false) + (init/start-workers args) + (is (= true @producer-has-started)))))) + +(deftest kafka-producers-should-stop + (let [producer-has-stopped (atom false)] + (with-redefs [init/stop-kafka-producers (fn [] (reset! producer-has-stopped true)) + init/stop-kafka-streams (constantly nil)] + (testing "Stopping the streams should stop kafka-producers as well" + (init/stop-stream) + (is (= true @producer-has-stopped))) + (testing "Stopping the workers should stop kafka-producers as well" + (reset! producer-has-stopped false) + (init/stop-workers) + (is (= true @producer-has-stopped))) + (mount/stop)))) + + + + + diff --git a/test/ziggurat/kafka_delay_test.clj b/test/ziggurat/kafka_delay_test.clj index 0d8d6b27..18d65659 100644 --- a/test/ziggurat/kafka_delay_test.clj +++ b/test/ziggurat/kafka_delay_test.clj @@ -1,18 +1,27 @@ (ns ziggurat.kafka-delay-test (:require [clojure.test :refer :all] [ziggurat.kafka-delay :refer :all] - [ziggurat.util.time :refer [get-current-time-in-millis]] - [ziggurat.metrics :as metrics])) + [ziggurat.metrics :as metrics] + [ziggurat.util.time :refer [get-current-time-in-millis]])) (deftest calculate-and-report-kafka-delay-test - (testing "calculates and reports the timestamp delay" - (let [record-timestamp 1528720767777 - current-time 1528720768777 - expected-delay 1000 - namespace "test"] - (with-redefs [get-current-time-in-millis (constantly current-time) - metrics/report-time (fn [metric-namespace delay] - (is (= delay expected-delay)) - (is (= metric-namespace namespace)))] - (calculate-and-report-kafka-delay namespace record-timestamp))))) - + (let [record-timestamp 1528720767777 + current-time 1528720768777 + expected-delay 1000 + expected-namespaces ["test"]] + (testing "calculates and reports the timestamp delay" + (let [expected-additional-tags {:topic_name "expected-topic-entity-name"}] + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay additional-tags] + (is (= delay expected-delay)) + (is (= metric-namespaces expected-namespaces)) + (is (= additional-tags expected-additional-tags)))] + (calculate-and-report-kafka-delay expected-namespaces record-timestamp expected-additional-tags)))) + (testing "calculates and reports the timestamp delay when additional tags is empty or nil" + (let [expected-additional-tags nil] + (with-redefs [get-current-time-in-millis (constantly current-time) + 
metrics/report-time (fn [metric-namespaces delay additional-tags] + (is (= delay expected-delay)) + (is (= metric-namespaces expected-namespaces)) + (is (= additional-tags expected-additional-tags)))] + (calculate-and-report-kafka-delay expected-namespaces record-timestamp)))))) diff --git a/test/ziggurat/mapper_test.clj b/test/ziggurat/mapper_test.clj index 56f67fa8..f5347dee 100644 --- a/test/ziggurat/mapper_test.clj +++ b/test/ziggurat/mapper_test.clj @@ -13,23 +13,30 @@ fix/silence-logging])) (deftest mapper-func-test - (let [message {:foo "bar"} - stream-routes {:default {:handler-fn #(constantly nil)}} - topic (name (first (keys stream-routes))) - expected-metric-namespace "default.message-processing" - expected-report-time-namespace "default.handler-fn-execution-time"] + (let [service-name (:app-name (ziggurat-config)) + message {:foo "bar"} + stream-routes {:default {:handler-fn #(constantly nil)}} + expected-topic-entity-name (name (first (keys stream-routes))) + expected-additional-tags {:topic_name expected-topic-entity-name} + default-namespace "message-processing" + report-time-namespace "handler-fn-execution-time" + expected-metric-namespaces [expected-topic-entity-name default-namespace] + expected-report-time-namespaces [expected-topic-entity-name report-time-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) successfully-reported-time? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? true))) - metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-report-time-namespace) + metrics/report-time (fn [metric-namespaces _ _] + (when (or (= metric-namespaces expected-report-time-namespaces) + (= metric-namespaces [report-time-namespace])) (reset! successfully-reported-time? true)))] - ((mapper-func (constantly :success) topic []) message) + ((mapper-func (constantly :success) expected-topic-entity-name []) message) (is @successfully-processed?) (is @successfully-reported-time?)))) @@ -37,12 +44,14 @@ (fix/with-queues (assoc-in stream-routes [:default :channel-1] (constantly :success)) (let [successfully-processed? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name default-namespace]) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? 
true)))] - ((mapper-func (constantly :channel-1) topic [:channel-1]) message) - (let [message-from-mq (rmq/get-message-from-channel-instant-queue topic :channel-1)] + ((mapper-func (constantly :channel-1) expected-topic-entity-name [:channel-1]) message) + (let [message-from-mq (rmq/get-message-from-channel-instant-queue expected-topic-entity-name :channel-1)] (is (= message message-from-mq)) (is @successfully-processed?)))))) @@ -53,8 +62,8 @@ (let [err (Throwable->map e)] (is (= (:cause err) "Invalid mapper return code")) (is (= (-> err :data :code) :channel-1))))] - ((mapper-func (constantly :channel-1) topic [:some-other-channel]) message) - (let [message-from-mq (rmq/get-message-from-channel-instant-queue topic :channel-1)] + ((mapper-func (constantly :channel-1) expected-topic-entity-name [:some-other-channel]) message) + (let [message-from-mq (rmq/get-message-from-channel-instant-queue expected-topic-entity-name :channel-1)] (is (nil? message-from-mq)))))) (testing "message process should be unsuccessful and retry" @@ -63,12 +72,14 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name default-namespace]) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] - ((mapper-func (constantly :retry) topic []) message) - (let [message-from-mq (rmq/get-msg-from-delay-queue topic)] + ((mapper-func (constantly :retry) expected-topic-entity-name []) message) + (let [message-from-mq (rmq/get-msg-from-delay-queue expected-topic-entity-name)] (is (= message-from-mq expected-message))) (is @unsuccessfully-processed?))))) @@ -79,39 +90,52 @@ unsuccessfully-processed? (atom false) expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true)) - metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name default-namespace]) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] - ((mapper-func (fn [_] (throw (Exception. "test exception"))) topic []) message) - (let [message-from-mq (rmq/get-msg-from-delay-queue topic)] + ((mapper-func (fn [_] (throw (Exception. "test exception"))) expected-topic-entity-name []) message) + (let [message-from-mq (rmq/get-msg-from-delay-queue expected-topic-entity-name)] (is (= message-from-mq expected-message))) (is @unsuccessfully-processed?) (is @sentry-report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - expected-metric-namespace "default.handler-fn-execution-time"] - (with-redefs [metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-metric-namespace) + (let [reported-execution-time? 
(atom false) + execution-time-namesapce "handler-fn-execution-time" + expected-metric-namespaces [service-name "default" execution-time-namesapce]] + (with-redefs [metrics/report-time (fn [metric-namespaces _ _] + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce]))) + (when (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [execution-time-namesapce])) (reset! reported-execution-time? true)))] - ((mapper-func (constantly :success) topic []) message) + ((mapper-func (constantly :success) expected-topic-entity-name []) message) (is @reported-execution-time?)))))) (deftest channel-mapper-func-test - (let [message {:foo "bar"} - stream-routes {:default {:handler-fn #(constantly nil) - :channel-1 #(constantly nil)}} - topic (first (keys stream-routes)) - channel :channel-1 - expected-metric-namespace "default.channel-1.message-processing"] + (let [service-name (:app-name (ziggurat-config)) + message {:foo "bar"} + stream-routes {:default {:handler-fn #(constantly nil) + :channel-1 #(constantly nil)}} + topic (first (keys stream-routes)) + expected-topic-entity-name (name topic) + expected-additional-tags {:topic_name expected-topic-entity-name} + channel :channel-1 + channel-name (name channel) + default-namespace "message-processing" + expected-metric-namespaces [expected-topic-entity-name channel default-namespace]] (testing "message process should be successful" (let [successfully-processed? (atom false) expected-metric "success"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name channel-name default-namespace]) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! successfully-processed? true)))] ((channel-mapper-func (constantly :success) topic channel) message) (is @successfully-processed?)))) @@ -122,9 +146,11 @@ unsuccessfully-processed? (atom false) expected-metric "retry"] - (with-redefs [metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + (with-redefs [metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces [service-name expected-topic-entity-name channel-name default-namespace]) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? true)))] ((channel-mapper-func (constantly :retry) topic channel) message) (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic channel)] @@ -138,9 +164,11 @@ unsuccessfully-processed? (atom false) expected-metric "failure"] (with-redefs [sentry-report (fn [_ _ _ & _] (reset! sentry-report-fn-called? true)) - metrics/increment-count (fn [metric-namespace metric] - (when (and (= metric-namespace expected-metric-namespace) - (= metric expected-metric)) + metrics/increment-count (fn [metric-namespaces metric additional-tags] + (when (and (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace])) + (= metric expected-metric) + (= additional-tags expected-additional-tags)) (reset! unsuccessfully-processed? 
true)))] ((channel-mapper-func (fn [_] (throw (Exception. "test exception"))) topic channel) message) (let [message-from-mq (rmq/get-message-from-channel-delay-queue topic channel)] @@ -149,11 +177,11 @@ (is @sentry-report-fn-called?))))) (testing "reports execution time with topic prefix" - (let [reported-execution-time? (atom false) - expected-metric-namespace "default.channel-1.execution-time"] - (with-redefs [metrics/report-time (fn [metric-namespace _] - (when (= metric-namespace expected-metric-namespace) + (let [reported-execution-time? (atom false) + execution-time-namesapce "execution-time"] + (with-redefs [metrics/report-time (fn [metric-namespaces _ _] + (when (or (= metric-namespaces [service-name expected-topic-entity-name channel-name execution-time-namesapce]) + (= metric-namespaces [execution-time-namesapce])) (reset! reported-execution-time? true)))] - ((channel-mapper-func (constantly :success) topic channel) message) (is @reported-execution-time?)))))) diff --git a/test/ziggurat/metrics_test.clj b/test/ziggurat/metrics_test.clj index 7c1f7db1..45f69f79 100644 --- a/test/ziggurat/metrics_test.clj +++ b/test/ziggurat/metrics_test.clj @@ -18,49 +18,132 @@ (is (instance? Histogram meter))))) (deftest increment-count-test - (testing "increases count on the meter" - (let [metric-ns "metric-ns" - metric "metric3" - mk-meter-args (atom nil) - meter (Meter.)] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric] - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) - meter)] - (metrics/increment-count metric-ns metric) - (is (= 1 (.getCount meter))) - (is (= metric-ns (:metric-namespace @mk-meter-args))) - (is (= metric (:metric @mk-meter-args))))))) + (let [metric "metric3" + expected-topic-entity-name "expected-topic-entity-name" + input-additional-tags {:topic_name expected-topic-entity-name}] + (testing "increases count on the meter" + (let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"] + mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags {}] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric input-additional-tags) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "increases count on the meter - without topic name on the namespace" + (let [expected-metric-namespaces ["metric-ns"] + mk-meter-args (atom nil) + meter (Meter.) + expected-additional-tags input-additional-tags] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric input-additional-tags) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "increases count on the meter when additional-tags is nil" + (let [expected-metric-namespaces [expected-topic-entity-name "metric-ns"] + mk-meter-args (atom nil) + meter (Meter.) 
+ expected-additional-tags nil] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric expected-additional-tags) + (is (= 1 (.getCount meter))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))))) (deftest decrement-count-test - (testing "decreases count on the meter" - (let [metric-ns "metric-ns" - metric "metric3" - mk-meter-args (atom nil) - meter (Meter.)] - (with-redefs [metrics/mk-meter (fn [metric-namespace metric] - (reset! mk-meter-args {:metric-namespace metric-namespace - :metric metric}) - meter)] - (metrics/increment-count metric-ns metric) - (is (= 1 (.getCount meter))) - (metrics/decrement-count metric-ns metric) - (is (= 0 (.getCount meter))) - (is (= metric-ns (:metric-namespace @mk-meter-args))) - (is (= metric (:metric @mk-meter-args))))))) + (let [expected-topic-name "expected-topic-name" + metric "metric3" + mk-meter-args (atom nil) + meter (Meter.) + input-additional-tags {:topic_name expected-topic-name}] + (testing "decreases count on the meter" + (let [expected-additional-tags {} + expected-metric-namespaces [expected-topic-name "metric-ns"]] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric input-additional-tags) + (is (= 1 (.getCount meter))) + (metrics/decrement-count expected-metric-namespaces metric input-additional-tags) + (is (zero? (.getCount meter))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "decreases count on the meter - without topic name on the namespace" + (let [expected-additional-tags input-additional-tags + expected-metric-namespaces ["metric-ns"]] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric input-additional-tags) + (is (= 1 (.getCount meter))) + (metrics/decrement-count expected-metric-namespaces metric input-additional-tags) + (is (zero? (.getCount meter))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))) + (testing "decreases count on the meter when additional-tags is nil" + (let [expected-additional-tags nil + expected-metric-namespaces [expected-topic-name "metric-ns"]] + (with-redefs [metrics/mk-meter (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-meter-args {:metric-namespaces metric-namespaces + :metric metric}) + meter)] + (metrics/increment-count expected-metric-namespaces metric expected-additional-tags) + (is (= 1 (.getCount meter))) + (metrics/decrement-count expected-metric-namespaces metric expected-additional-tags) + (is (zero? (.getCount meter))) + (is (= (apply str (interpose "." 
expected-metric-namespaces)) (:metric-namespaces @mk-meter-args))) + (is (= metric (:metric @mk-meter-args)))))))) (deftest report-time-test - (testing "updates time-val" - (let [metric-ns "metric-ns" - time-val 10 - mk-histogram-args (atom nil) - reservoir (UniformReservoir.) - histogram (Histogram. reservoir)] - (with-redefs [metrics/mk-histogram (fn [metric-ns metric] - (reset! mk-histogram-args {:metric-namespace metric-ns - :metric metric}) - histogram)] - (metrics/report-time metric-ns time-val) - (is (= 1 (.getCount histogram))) - (is (= metric-ns (:metric-namespace @mk-histogram-args))) - (is (= "all" (:metric @mk-histogram-args))))))) + (let [expected-topic-entity-name "expected-topic-entity-name" + input-additional-tags {:topic_name expected-topic-entity-name} + time-val 10] + (testing "updates time-val" + (let [expected-metric-namespaces [expected-topic-entity-name "message-received-delay-histogram"] + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-additional-tags {}] + (with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-histogram-args {:metric-namespaces metric-namespaces + :metric metric}) + histogram)] + (metrics/report-time expected-metric-namespaces time-val input-additional-tags) + (is (= 1 (.getCount histogram))) + (is (= (apply str (interpose "." expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args))) + (is (= "all" (:metric @mk-histogram-args)))))) + (testing "updates time-val - without topic name on the namespace" + (let [expected-metric-namespaces ["message-received-delay-histogram"] + mk-histogram-args (atom nil) + reservoir (UniformReservoir.) + histogram (Histogram. reservoir) + expected-additional-tags input-additional-tags] + (with-redefs [metrics/mk-histogram (fn [metric-namespaces metric additional-tags] + (is (= additional-tags expected-additional-tags)) + (reset! mk-histogram-args {:metric-namespaces metric-namespaces + :metric metric}) + histogram)] + (metrics/report-time expected-metric-namespaces time-val input-additional-tags) + (is (= 1 (.getCount histogram))) + (is (= (apply str (interpose "." 
expected-metric-namespaces)) (:metric-namespaces @mk-histogram-args)))
+          (is (= "all" (:metric @mk-histogram-args))))))))
diff --git a/test/ziggurat/producer_test.clj b/test/ziggurat/producer_test.clj
new file mode 100644
index 00000000..a0463851
--- /dev/null
+++ b/test/ziggurat/producer_test.clj
@@ -0,0 +1,66 @@
+(ns ziggurat.producer-test
+  (:require [clojure.test :refer :all]
+            [ziggurat.streams :refer [start-streams stop-streams]]
+            [ziggurat.fixtures :as fix :refer [*producer-properties* *consumer-properties*]]
+            [ziggurat.config :refer [ziggurat-config]]
+            [ziggurat.producer :refer [producer-properties-map send kafka-producers]]
+            [clojure.test.check.generators :as gen])
+  (:import (org.apache.kafka.streams.integration.utils IntegrationTestUtils)
+           (org.apache.kafka.clients.producer KafkaProducer)))
+
+(use-fixtures :once fix/mount-only-config-and-producer)
+
+(defn stream-router-config-without-producer []
+  {:stream-router {:default {:application-id "test"
+                             :bootstrap-servers "localhost:9092"
+                             :stream-threads-count [1 :int]
+                             :origin-topic "topic"
+                             :proto-class "flatland.protobuf.test.Example$Photo"
+                             :channels {:channel-1 {:worker-count [10 :int]
+                                                    :retry {:count [5 :int]
+                                                            :enabled [true :bool]}}}}}})
+
+(deftest send-data-with-topic-and-value-test
+  (with-redefs
+    [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))]
+    (let [topic (gen/generate gen/string-alphanumeric 10)
+          key "message"
+          value "Hello World!!"]
+      (send :default topic key value)
+      (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 2000)]
+        (is (= value (.value (first result))))))))
+
+(deftest send-data-with-topic-key-partition-and-value-test
+  (with-redefs
+    [kafka-producers (hash-map :default (KafkaProducer. *producer-properties*))]
+    (let [topic (gen/generate gen/string-alphanumeric 10)
+          key "message"
+          value "Hello World!!"
+          partition (int 0)]
+      (send :default topic partition key value)
+      (let [result (IntegrationTestUtils/waitUntilMinKeyValueRecordsReceived *consumer-properties* topic 1 2000)]
+        (is (= value (.value (first result))))))))
+
+(deftest send-throws-exception-when-no-producers-are-configured
+  (with-redefs
+    [kafka-producers {}]
+    (let [topic "test-topic"
+          key "message"
+          value "Hello World!! from non-existent Kafka Producers"]
+      (is (not-empty (try (send :default topic key value)
+                          (catch Exception e (ex-data e))))))))
+
+(deftest producer-properties-map-is-empty-if-no-producers-configured
+  ; Here ziggurat-config has been substituted with a custom map which
+  ; does not have any valid producer configs.
+  (with-redefs
+    [ziggurat-config stream-router-config-without-producer]
+    (is (empty? (producer-properties-map)))))
+
+(deftest producer-properties-map-is-not-empty-if-producers-are-configured
+  ; Here the config is read from config.test.edn which contains
+  ; valid producer configs.
+ (is (seq (producer-properties-map)))) + + + diff --git a/test/ziggurat/timestamp_transformer_test.clj b/test/ziggurat/timestamp_transformer_test.clj index da1d4074..a8a6903a 100644 --- a/test/ziggurat/timestamp_transformer_test.clj +++ b/test/ziggurat/timestamp_transformer_test.clj @@ -1,43 +1,60 @@ (ns ziggurat.timestamp-transformer-test (:require [clojure.test :refer :all] - [ziggurat.timestamp-transformer :refer :all] [ziggurat.metrics :as metrics] + [ziggurat.timestamp-transformer :refer :all] [ziggurat.util.time :refer :all]) - (:import [org.apache.kafka.streams.processor ProcessorContext] - [org.apache.kafka.clients.consumer ConsumerRecord] + (:import [org.apache.kafka.clients.consumer ConsumerRecord] + [org.apache.kafka.streams.processor ProcessorContext] [ziggurat.timestamp_transformer IngestionTimeExtractor])) (deftest ingestion-time-extractor-test (let [ingestion-time-extractor (IngestionTimeExtractor.) - topic "some-topic" - partition (int 1) - offset 1 - previous-timestamp 1528720768771 - key "some-key" - value "some-value" - record (ConsumerRecord. topic partition offset key value)] + topic "some-topic" + partition (int 1) + offset 1 + previous-timestamp 1528720768771 + key "some-key" + value "some-value" + record (ConsumerRecord. topic partition offset key value)] (testing "extract timestamp of topic when it has valid timestamp" (with-redefs [get-timestamp-from-record (constantly 1528720768777)] (is (= (.extract ingestion-time-extractor record previous-timestamp) 1528720768777)))) (testing "extract timestamp of topic when it has invalid timestamp" - (with-redefs [get-timestamp-from-record (constantly -1) + (with-redefs [get-timestamp-from-record (constantly -1) get-current-time-in-millis (constantly 1528720768777)] (is (= (.extract ingestion-time-extractor record previous-timestamp) (get-current-time-in-millis))))))) (deftest timestamp-transformer-test - (testing "creates a timestamp-transformer object that calculates and reports timestamp delay" - (let [metric-namespace "test.message-received-delay-histogram" - record-timestamp 1528720767777 - context (reify ProcessorContext - (timestamp [_] record-timestamp)) - current-time 1528720768777 - timestamp-transformer (create metric-namespace current-time) - expected-delay 1000] - (.init timestamp-transformer context) - (with-redefs [get-current-time-in-millis (constantly current-time) - metrics/report-time (fn [namespace delay] - (is (= delay expected-delay)) - (is (= metric-namespace namespace)))] - (.transform timestamp-transformer nil nil))))) + (let [default-namespace "message-received-delay-histogram" + expected-metric-namespaces ["test" default-namespace] + record-timestamp 1528720767777 + current-time 1528720768777 + expected-delay 1000] + (testing "creates a timestamp-transformer object that calculates and reports timestamp delay" + (let [context (reify ProcessorContext + (timestamp [_] record-timestamp)) + expected-topic-entity-name "expected-topic-entity-name" + timestamp-transformer (create expected-metric-namespaces current-time expected-topic-entity-name)] + (.init timestamp-transformer context) + (with-redefs [get-current-time-in-millis (constantly current-time) + metrics/report-time (fn [metric-namespaces delay topic-entity-name] + (is (= delay expected-delay)) + (is (or (= metric-namespaces expected-metric-namespaces) + (= metric-namespaces [default-namespace]))) + (is (= expected-topic-entity-name topic-entity-name)))] + (.transform timestamp-transformer nil nil)))) + (testing "creates a timestamp-transformer 
object that calculates and reports timestamp delay when topic-entity-name is nil"
+      (let [context (reify ProcessorContext
+                      (timestamp [_] record-timestamp))
+            expected-topic-entity-name nil
+            timestamp-transformer (create expected-metric-namespaces current-time)]
+        (.init timestamp-transformer context)
+        (with-redefs [get-current-time-in-millis (constantly current-time)
+                      metrics/report-time (fn [metric-namespaces delay topic-entity-name]
+                                            (is (= delay expected-delay))
+                                            (is (or (= metric-namespaces expected-metric-namespaces)
+                                                    (= metric-namespaces [default-namespace])))
+                                            (is (= topic-entity-name expected-topic-entity-name)))]
+          (.transform timestamp-transformer nil nil))))))