diff --git a/README.md b/README.md index cbf8787c..0e2df89e 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ To start a stream (a thread that reads messages from Kafka), add this to your co [message] (println message) :success) - + (def handler-fn (-> main-fn (middleware/protobuf->hash ProtoClass :stream-id))) @@ -88,7 +88,7 @@ To start a stream (a thread that reads messages from Kafka), add this to your co (ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}}) ``` -_NOTE: this example assumes that the message is serialized in Protobuf format_ +_NOTE: this example assumes that the message is serialized in Protobuf format_ Please refer the [Middleware section](#middleware-in-ziggurat) for understanding `handler-fn` here. @@ -129,7 +129,7 @@ Please refer the [Middleware section](#middleware-in-ziggurat) for understanding [message] (println message) :success) - + (def handler-fn (-> main-fn (middleware/protobuf->hash ProtoClass :stream-id))) @@ -137,7 +137,7 @@ Please refer the [Middleware section](#middleware-in-ziggurat) for understanding (ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}} routes) ``` -_NOTE: this example assumes that the message is serialized in Protobuf format_ +_NOTE: this example assumes that the message is serialized in Protobuf format_ Ziggurat also sets up a HTTP server by default and you can pass in your own routes that it will serve. The above example demonstrates how you can pass in your own route. @@ -211,9 +211,9 @@ It can be used like this. (parse-json :stream-route-config))) ``` -Here, `message-handler-fn` calls `parse-json` with a message handler function -`actual-message-handler-function` as the first argument and the key of a stream-route -config (as defined in `config.edn`) as the second argument. +Here, `message-handler-fn` calls `parse-json` with a message handler function +`actual-message-handler-function` as the first argument and the key of a stream-route +config (as defined in `config.edn`) as the second argument. ## Publishing data to Kafka Topics in Ziggurat To enable publishing data to kafka, Ziggurat provides producing support through ziggurat.producer namespace. This namespace defines methods for publishing data to Kafka topics. The methods defined here are essentially wrapper around variants of `send` methods defined in `org.apache.kafka.clients.producer.KafkaProducer`. @@ -245,8 +245,8 @@ Tracing has been added to the following flows: 3. Produce to rabbitmq channel 4. Produce to another kafka topic -By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment) -and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables. +By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment) +and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables. To enable custom tracer, a custom tracer provider function name can be set in `:custom-provider`. The corresponding function will be executed in runtime to create a tracer. In the event of any errors while executing the custom tracer provider, a Noop tracer will be created. To enable tracing, the following config needs to be added to the `config.edn` under `:ziggurat` key. @@ -263,6 +263,34 @@ JAEGER_AGENT_HOST: "localhost" JAEGER_AGENT_PORT: 6831 ``` +## Stream Joins +This will allow an actor to join messages from 2 topics into 1 result. To be able to use stream joins just add the configuration below to your `config.edn` +```clojure +{:ziggurat {:stream-router {:stream-id { + :consumer-type :stream-joins + :input-topics {:topic-1-key {:name "topic-1"} :topic-2-key {:name "topic-2"}} + :join-cfg {:topic-1-and-topic-2 {:join-window-ms 5000 :join-type :inner}} +}}} +``` +* consumer-type - enables stream joins if `:stream-joins` key is provided, other possible value is `:default` which is the default actor behavior +* input-topics - a map of topics in which you want to use for joining +* join-cfg - a map of configurations which you define the join-window-ms and the join-type (`:inner`, `:left` or `:outer`) + +And your actor's handler function be like +```clojure +(def handler-func + (-> main-func + (mw/protobuf->hash [com.gojek.esb.booking.BookingLogMessage com.gojek.esb.booking.BookingLogMessage] :booking))) +``` + +`Please take note of the vector containing the proto classes` + +Your handler function will receive a message in the following format/structure + +```clojure +{:topic-1-key "message-from-1st-topic" :topic-2-key "message-from-2nd-topic"} +``` + ## Configuration All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key. @@ -344,7 +372,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur ## Alpha (Experimental) Features The contract and interface for experimental features in Ziggurat can be changed as we iterate towards better designs for that feature. -For all purposes these features should be considered unstable and should only be used after understanding their risks and implementations. +For all purposes these features should be considered unstable and should only be used after understanding their risks and implementations. ### Exponential Backoff based Retries In addition to linear retries, Ziggurat users can now use exponential backoff strategy for retries. This means that the message @@ -359,11 +387,11 @@ Exponential retries can be configured as described below. :ziggurat {:stream-router {:default {:application-id "application_name"...}}} :retry {:type [:exponential :keyword] :count [10 :int] - :enable [true :bool]} + :enable [true :bool]} ``` -Exponential retries can be configured for channels too. Additionally, a user can specify a custom `queue-timeout-ms` value per channel. +Exponential retries can be configured for channels too. Additionally, a user can specify a custom `queue-timeout-ms` value per channel. Timeouts for exponential backoffs are calculated using `queue-timeout-ms`. This implies that each channel can have separate count of retries and different timeout values. @@ -373,7 +401,7 @@ and different timeout values. :retry {:type [:exponential :keyword] :count [10 :int] :queue-timeout-ms 2000 - :enable [true :bool]}}}}} + :enable [true :bool]}}}}} ``` ## Deprecation Notice diff --git a/resources/config.test.edn b/resources/config.test.edn index c6e5e2fc..ae56f1e1 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -39,6 +39,7 @@ :stream-threads-count [1 :int] :origin-topic "topic" :upgrade-from "1.1" + :consumer-type :default :channels {:channel-1 {:worker-count [10 :int] :retry {:type [:linear :keyword] :count [5 :int] diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index f76231ea..c911efb7 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -65,6 +65,8 @@ (defn config-from-env [config-file] (clonfig/read-config (edn-config config-file))) +(declare config) + (defstate config :start (let [config-values-from-env (config-from-env config-file) app-name (-> config-values-from-env :ziggurat :app-name)] @@ -80,8 +82,11 @@ (let [cfg (ziggurat-config)] (get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future -(defn get-in-config [ks] - (get-in (ziggurat-config) ks)) +(defn get-in-config + ([ks] + (get-in (ziggurat-config) ks)) + ([ks default] + (get-in (ziggurat-config) ks default))) (defn channel-retry-config [topic-entity channel] (get-in (ziggurat-config) [:stream-router topic-entity :channels channel :retry])) diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 7567dd63..4465bdf8 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -139,10 +139,12 @@ (defn- add-shutdown-hook [actor-stop-fn modes] (.addShutdownHook (Runtime/getRuntime) - (Thread. ^Runnable #(do (stop actor-stop-fn modes) - (shutdown-agents)) + (Thread. ^Runnable #((stop actor-stop-fn modes) + (shutdown-agents)) "Shutdown-handler"))) +(declare StreamRoute) + (s/defschema StreamRoute (s/conditional #(and (seq %) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 95f4d80e..b1bb6d95 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -95,6 +95,8 @@ (defrecord MessagePayload [message topic-entity]) +(declare message-payload-schema) + (s/defschema message-payload-schema {:message s/Any :topic-entity s/Keyword diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index 6fd6847e..3b73ab7d 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -10,7 +10,7 @@ [ziggurat.mapper :as mpr] [ziggurat.messaging.connection :refer [connection]] [ziggurat.sentry :refer [sentry-reporter]] - [ziggurat.messaging.util :refer :all] + [ziggurat.messaging.util :refer [prefixed-channel-name prefixed-queue-name]] [ziggurat.metrics :as metrics])) (defn- convert-to-message-payload @@ -24,14 +24,14 @@ (s/validate mpr/message-payload-schema message) (catch Exception e (log/info "old message format read, converting to message-payload: " message) - (let [retry-count (or (:retry-count message) 0) + (let [retry-count (or (:retry-count message) 0) message-payload (mpr/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))] (assoc message-payload :retry-count retry-count))))) (defn convert-and-ack-message "De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message if `ack?` is true." - [ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity] + [ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity] (try (let [message (nippy/thaw payload)] (when ack? @@ -48,8 +48,8 @@ (lb/ack ch delivery-tag)) (defn process-message-from-queue [ch meta payload topic-entity processing-fn] - (let [delivery-tag (:delivery-tag meta) - message-payload (convert-and-ack-message ch meta payload false topic-entity)] + (let [delivery-tag (:delivery-tag meta) + message-payload (convert-and-ack-message ch meta payload false topic-entity)] (when message-payload (log/infof "Processing message [%s] from RabbitMQ " message-payload) (try @@ -109,13 +109,13 @@ (defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity] (lb/qos ch prefetch-count) - (let [consumer-tag (lcons/subscribe ch - queue-name - (message-handler wrapped-mapper-fn topic-entity) - {:handle-shutdown-signal-fn (fn [consumer_tag reason] - (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) - :handle-consume-ok-fn (fn [consumer_tag] - (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})])) + (lcons/subscribe ch + queue-name + (message-handler wrapped-mapper-fn topic-entity) + {:handle-shutdown-signal-fn (fn [consumer_tag reason] + (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) + :handle-consume-ok-fn (fn [consumer_tag] + (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})) (defn start-retry-subscriber* [mapper-fn topic-entity channels] (when (get-in-config [:retry :enabled]) @@ -129,8 +129,9 @@ (defn start-channels-subscriber [channels topic-entity] (doseq [channel channels] (let [channel-key (first channel) - channel-handler-fn (second channel)] - (dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])] + channel-handler-fn (second channel) + worker-count (get-in-config [:stream-router topic-entity :channels channel-key :worker-count] 0)] + (dotimes [_ worker-count] (start-subscriber* (lch/open connection) 1 (prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name])) diff --git a/src/ziggurat/middleware/default.clj b/src/ziggurat/middleware/default.clj index b70f74f4..387de713 100644 --- a/src/ziggurat/middleware/default.clj +++ b/src/ziggurat/middleware/default.clj @@ -1,27 +1,29 @@ (ns ziggurat.middleware.default (:require [protobuf.impl.flatland.mapdef :as protodef] [sentry-clj.async :as sentry] - [ziggurat.config :refer [ziggurat-config]] + [ziggurat.config :refer [get-in-config ziggurat-config]] [flatland.protobuf.core :as proto] [ziggurat.metrics :as metrics] - [ziggurat.config :as config] [ziggurat.sentry :refer [sentry-reporter]])) -(defn- deserialise-message +(defn- deserialize-message "This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a Clojure PersistentHashMap. Temporary logic for migration of services to Ziggurat V3.0 If the message is of type map, the function just returns the map as it is. In older versions of Ziggurat (< 3.0) we stored the messages in deserialized formats in RabbitMQ and those messages can be processed by this function. So we have this logic here." [message proto-class topic-entity-name] - (if-not (map? message) + (if-not (map? message) ;; TODO: we should have proper dispatch logic per message type (not like this) (try - (let [proto-klass (protodef/mapdef proto-class) - loaded-proto (protodef/parse proto-klass message) - proto-keys (-> proto-klass - protodef/mapdef->schema - :fields - keys)] + (let [[proto-define-fn proto-load-fn proto-schema-fn] (if (true? (get-in-config [:alpha-features :protobuf-middleware :enabled])) + [protodef/mapdef protodef/parse protodef/mapdef->schema] + [proto/protodef proto/protobuf-load proto/protobuf-schema]) + proto-klass (proto-define-fn proto-class) + loaded-proto (proto-load-fn proto-klass message) + proto-keys (-> proto-klass + proto-schema-fn + :fields + keys)] (select-keys loaded-proto proto-keys)) (catch Throwable e (let [service-name (:app-name (ziggurat-config)) @@ -34,40 +36,28 @@ nil))) message)) -(defn- deserialise-message-deprecated - "This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a - Clojure PersistentHashMap. - Temporary logic for migration of services to Ziggurat V3.0 - If the message is of type map, the function just returns the map as it is. In older versions of Ziggurat (< 3.0) we stored - the messages in deserialized formats in RabbitMQ and those messages can be processed by this function. So we have this logic here." - [message proto-class topic-entity-name] - (if-not (map? message) - (try - (let [proto-klass (proto/protodef proto-class) - loaded-proto (proto/protobuf-load proto-klass message) - proto-keys (-> proto-klass - proto/protobuf-schema - :fields - keys)] - (select-keys loaded-proto proto-keys)) - (catch Throwable e - (let [service-name (:app-name (ziggurat-config)) - additional-tags {:topic_name topic-entity-name} - default-namespace "message-parsing" - metric-namespaces [service-name topic-entity-name default-namespace] - multi-namespaces [metric-namespaces [default-namespace]]] - (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) - (metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags) - nil))) - message)) +(defprotocol Deserializable + (deserialize [message proto-class topic-entity-name])) + +(defrecord RegularMessage [message]) +(defrecord StreamJoinsMessage [message]) -(defn get-deserializer [] - (if (config/get-in-config [:alpha-features :protobuf-middleware :enabled]) - deserialise-message - deserialise-message-deprecated)) +(extend-type RegularMessage + Deserializable + (deserialize [this proto-class topic-entity-name] + (deserialize-message (:message this) proto-class topic-entity-name))) + +(extend-type StreamJoinsMessage + Deserializable + (deserialize [this proto-class topic-entity-name] + (reduce + (fn [[k1 v1] [k2 v2]] + {k1 (deserialize-message v1 (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name) + k2 (deserialize-message v2 (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)}) + (:message this)))) (defn protobuf->hash "This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap" [handler-fn proto-class topic-entity-name] - (fn [message] (handler-fn ((get-deserializer) message proto-class topic-entity-name)))) - + (fn [message] + (handler-fn (deserialize message proto-class topic-entity-name)))) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 4df85c8c..4029be33 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -1,11 +1,11 @@ (ns ziggurat.streams (:require [clojure.tools.logging :as log] [mount.core :as mount :refer [defstate]] - [sentry-clj.async :as sentry] [ziggurat.channel :as chl] [ziggurat.config :refer [ziggurat-config]] [ziggurat.header-transformer :as header-transformer] [ziggurat.mapper :refer [mapper-func ->MessagePayload]] + [ziggurat.middleware.default :as mw] [ziggurat.metrics :as metrics] [ziggurat.timestamp-transformer :as timestamp-transformer] [ziggurat.util.map :as umap] @@ -16,13 +16,15 @@ [org.apache.kafka.common.serialization Serdes] [org.apache.kafka.common.utils SystemTime] [org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology] - [org.apache.kafka.streams.kstream ValueMapper TransformerSupplier ValueTransformerSupplier] + [org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier] [org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier] [ziggurat.timestamp_transformer IngestionTimeExtractor] [io.opentracing Tracer] [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] [io.opentracing.contrib.kafka TracingKafkaUtils] - [io.opentracing.tag Tags])) + [io.opentracing.tag Tags] + [java.time Duration] + [ziggurat.middleware.default StreamJoinsMessage])) (def default-config-for-stream {:buffered-records-per-partition 10000 @@ -31,6 +33,7 @@ :oldest-processed-message-in-s 604800 :changelog-topic-replication-factor 3 :session-timeout-ms-config 60000 + :consumer-type :default :default-key-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde" :default-value-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"}) @@ -46,9 +49,9 @@ "Populates `key-deserializer-encoding`, `value-deserializer-encoding` and `deserializer-encoding` in `properties` only if non-nil." [properties key-deserializer-encoding value-deserializer-encoding deserializer-encoding] - (let [KEY_DESERIALIZER_ENCODING "key.deserializer.encoding" + (let [KEY_DESERIALIZER_ENCODING "key.deserializer.encoding" VALUE_DESERIALIZER_ENCODING "value.deserializer.encoding" - DESERIALIZER_ENCODING "deserializer.encoding"] + DESERIALIZER_ENCODING "deserializer.encoding"] (if (some? key-deserializer-encoding) (.put properties KEY_DESERIALIZER_ENCODING key-deserializer-encoding)) (if (some? value-deserializer-encoding) @@ -102,6 +105,16 @@ (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) message) +(defn- stream-joins-log-and-report-metrics [topic topic-entity message] + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) + additional-tags {:topic-name topic-entity-name :input-topic topic :app-name service-name} + message-read-metric-namespace "stream-joins-message" + multi-namespaces [[message-read-metric-namespace]] + metric "read"] + (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) + message) + (defn store-supplier-builder [] (KeyValueStoreBuilder. (RocksDbKeyValueBytesStoreSupplier. "state-store") (Serdes/ByteArray) @@ -126,10 +139,17 @@ (get [_] (header-transformer/create)))) (defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] - (let [service-name (:app-name (ziggurat-config)) + (let [service-name (:app-name (ziggurat-config)) delay-metric-namespace "message-received-delay-histogram" - metric-namespaces [service-name topic-entity-name delay-metric-namespace] - additional-tags {:topic_name topic-entity-name}] + metric-namespaces [service-name topic-entity-name delay-metric-namespace] + additional-tags {:topic_name topic-entity-name}] + (.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) + +(defn- stream-joins-delay-metric [topic topic-entity-name oldest-processed-message-in-s stream-builder] + (let [service-name (:app-name (ziggurat-config)) + delay-metric-namespace "stream-joins-message-received-delay-histogram" + metric-namespaces [service-name topic-entity-name delay-metric-namespace] + additional-tags {:topic-name topic-entity-name :input-topic topic :app-name service-name}] (.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) (defn- header-transform-values [stream-builder] @@ -144,14 +164,14 @@ (defn- traced-handler-fn [handler-fn channels message topic-entity] (let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer) - span (as-> tracer t - (.buildSpan t "Message-Handler") - (.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) - (.withTag t (.getKey Tags/COMPONENT) "ziggurat") - (if (nil? parent-ctx) - t - (.asChildOf t parent-ctx)) - (.start t))] + span (as-> tracer t + (.buildSpan t "Message-Handler") + (.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) + (.withTag t (.getKey Tags/COMPONENT) "ziggurat") + (if (nil? parent-ctx) + t + (.asChildOf t parent-ctx)) + (.start t))] (try ((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message))) (catch Exception e @@ -160,6 +180,53 @@ (finally (.finish span))))) +(defn- join-streams + [oldest-processed-message-in-s handler-fn channels topic-entity stream-1 stream-2] + (let [topic-entity-name (name topic-entity) + topic-name-1 (:topic-name stream-1) + topic-name-2 (:topic-name stream-2) + topic-key-1 (:topic-key stream-1) + topic-key-2 (:topic-key stream-2) + cfg-1 (:cfg stream-1) + cfg-2 (:cfg stream-2) + strm-1 (->> (:stream stream-1) + (stream-joins-delay-metric topic-name-1 topic-entity-name oldest-processed-message-in-s) + (map-values #(stream-joins-log-and-report-metrics topic-name-1 topic-entity-name %))) + strm-2 (->> (:stream stream-2) + (stream-joins-delay-metric topic-name-2 topic-entity-name oldest-processed-message-in-s) + (map-values #(stream-joins-log-and-report-metrics topic-name-2 topic-entity-name %))) + join-window-ms (JoinWindows/of (Duration/ofMillis (:join-window-ms cfg-1))) + join-type (:join-type cfg-1) + value-joiner (reify ValueJoiner + (apply [_ left right] + (mw/->StreamJoinsMessage {topic-key-1 left topic-key-2 right}))) + out-strm (if cfg-1 + (case join-type + :left (.leftJoin strm-1 strm-2 value-joiner join-window-ms) + :outer (.outerJoin strm-1 strm-2 value-joiner join-window-ms) + (.join strm-1 strm-2 value-joiner join-window-ms)) + strm-1)] + {:stream out-strm + :cfg cfg-2})) + +(defn- stream-joins-topology [handler-fn {:keys [input-topics join-cfg oldest-processed-message-in-s]} topic-entity channels] + (let [builder (StreamsBuilder.) + _ (.addStateStore builder (store-supplier-builder)) + stream-map (map (fn [[topic-key topic-value] [_ cfg]] + (let [topic-name (:name topic-value)] + {:stream (.stream builder topic-name) :cfg cfg :topic-name topic-name :topic-key topic-key})) input-topics (assoc join-cfg :end nil)) + {stream :stream} (reduce (partial join-streams oldest-processed-message-in-s handler-fn channels topic-entity) stream-map)] + (->> stream + (header-transform-values) + (map-values #(traced-handler-fn handler-fn channels % topic-entity))) + (.build builder))) + +(defn- normalize-message-value [message] + (let [msg-val (:value message)] + (if (string? msg-val) + message ;; TODO: this should be JsonMessage + (assoc-in message [:value] (mw/->RegularMessage msg-val))))) + (defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels] (let [builder (StreamsBuilder.) topic-entity-name (name topic-entity) @@ -169,13 +236,25 @@ (timestamp-transform-values topic-entity-name oldest-processed-message-in-s) (header-transform-values) (map-values #(log-and-report-metrics topic-entity-name %)) + (map-values normalize-message-value) (map-values #(traced-handler-fn handler-fn channels % topic-entity))) (.build builder))) (defn- start-stream* [handler-fn stream-config topic-entity channels] - (KafkaStreams. ^Topology (topology handler-fn stream-config topic-entity channels) - ^Properties (properties stream-config) - (new TracingKafkaClientSupplier tracer))) + (let [topology-fn (case (:consumer-type stream-config) + :stream-joins stream-joins-topology + topology) + top (topology-fn handler-fn stream-config topic-entity channels) + _ (log/info (.describe top))] + (KafkaStreams. ^Topology top + ^Properties (properties stream-config) + (new TracingKafkaClientSupplier tracer)))) + +(defn- merge-consumer-type-config + [config] + (case (:consumer-type config) + :stream-joins (assoc config :consumer-type (:consumer-type config)) + (assoc config :consumer-type :default))) (defn start-streams ([stream-routes] @@ -183,10 +262,12 @@ ([stream-routes stream-configs] (reduce (fn [streams stream] (let [topic-entity (first stream) - topic-handler-fn (-> stream second :handler-fn) + snd (second stream) + topic-handler-fn (:handler-fn snd) channels (chl/get-keys-for-topic stream-routes topic-entity) stream-config (-> stream-configs (get-in [:stream-router topic-entity]) + (merge-consumer-type-config) (umap/deep-merge default-config-for-stream)) stream (start-stream* topic-handler-fn stream-config topic-entity channels)] (.start stream) diff --git a/test/ziggurat/config_test.clj b/test/ziggurat/config_test.clj index 05bd4c56..a180f3b1 100644 --- a/test/ziggurat/config_test.clj +++ b/test/ziggurat/config_test.clj @@ -1,8 +1,16 @@ (ns ziggurat.config-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest is testing]] [clonfig.core :as clonfig] [mount.core :as mount] - [ziggurat.config :refer :all]) + [ziggurat.config :refer [-get + -getIn + channel-retry-config + config config-file + config-from-env + default-config get-in-config + rabbitmq-config + statsd-config + ziggurat-config]]) (:import (java.util ArrayList))) (deftest config-from-env-test @@ -94,6 +102,15 @@ config-file config-filename] (mount/start #'config) (is (= (-> config-values-from-env :ziggurat :http-server :port) (get-in-config [:http-server :port]))) + (mount/stop)))) + (testing "returns config for key passed with default" + (let [config-filename "config.test.edn" + config-values-from-env (config-from-env config-filename) + default "test"] + (with-redefs [config-from-env (fn [_] config-values-from-env) + config-file config-filename] + (mount/start #'config) + (is (= default (get-in-config [:invalid :value] default))) (mount/stop))))) (deftest channel-retry-config-test diff --git a/test/ziggurat/middleware/default_test.clj b/test/ziggurat/middleware/default_test.clj index f5d32334..74fe1dd2 100644 --- a/test/ziggurat/middleware/default_test.clj +++ b/test/ziggurat/middleware/default_test.clj @@ -1,11 +1,12 @@ (ns ziggurat.middleware.default-test - (:require [clojure.test :refer :all] - [sentry-clj.async :as sentry] + (:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] [protobuf.core :as proto] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.fixtures :as fix] [ziggurat.metrics :as metrics] - [ziggurat.middleware.default :refer :all :as mw]) - (:import (flatland.protobuf.test Example$Photo))) + [ziggurat.middleware.default :as mw]) + (:import [flatland.protobuf.test Example$Photo] + [ziggurat.middleware.default RegularMessage StreamJoinsMessage])) (use-fixtures :once (join-fixtures [fix/mount-only-config fix/silence-logging])) @@ -21,7 +22,50 @@ handler-fn (fn [msg] (if (= msg message) (reset! handler-fn-called? true)))] - ((protobuf->hash handler-fn proto-class topic-entity-name) proto-message) + ((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->RegularMessage proto-message)) + (is (true? @handler-fn-called?)))) + (testing "deserialize a message from a stream join" + (let [handler-fn-called? (atom false) + left-message {:id 123 + :path "/path/to/left"} + right-message {:id 456 + :path "/path/to/right"} + proto-class Example$Photo + topic-entity-name "test" + left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) + right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) + handler-fn (fn [{:keys [left right]}] + (if (and (= left left-message) + (= right right-message)) + (reset! handler-fn-called? true)))] + ((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->StreamJoinsMessage {:left left-proto-message :right right-proto-message})) + (is (true? @handler-fn-called?)))) + (testing "deserialize a message from a stream join using 2 proto classes" + (let [handler-fn-called? (atom false) + left-message {:id 123 + :path "/path/to/left"} + right-message {:id 456 + :path "/path/to/right"} + proto-class Example$Photo + topic-entity-name "test" + left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) + right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) + handler-fn (fn [{:keys [left right]}] + (if (and (= left left-message) + (= right right-message)) + (reset! handler-fn-called? true)))] + ((mw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) (mw/->StreamJoinsMessage {:left left-proto-message :right right-proto-message})) + (is (true? @handler-fn-called?)))) + (testing "When an already deserialised message is passed to the function it calls the handler fn without altering it" + (let [handler-fn-called? (atom false) + message {:id 7 + :path "/photos/h2k3j4h9h23"} + proto-class Example$Photo + topic-entity-name "test" + handler-fn (fn [msg] + (if (= msg message) + (reset! handler-fn-called? true)))] + ((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->RegularMessage message)) (is (true? @handler-fn-called?)))) (testing "When deserialisation fails, it reports to sentry, publishes metrics and passes nil to handler function" (let [handler-fn-called? (atom false) @@ -32,43 +76,35 @@ (reset! handler-fn-called? true)))] (with-redefs [metrics/multi-ns-increment-count (fn [_ _ _] (reset! metric-reporter-called? true))] - ((protobuf->hash handler-fn nil topic-entity-name) nil)) + ((mw/protobuf->hash handler-fn nil topic-entity-name) (mw/->RegularMessage nil))) (is (true? @handler-fn-called?)) - (is (true? @metric-reporter-called?)))) - (testing "When an already deserialised message is passed to the function it calls the handler fn without altering it" - (let [handler-fn-called? (atom false) - message {:id 7 - :path "/photos/h2k3j4h9h23"} - proto-class Example$Photo - topic-entity-name "test" - handler-fn (fn [msg] - (if (= msg message) - (reset! handler-fn-called? true)))] - ((protobuf->hash handler-fn proto-class topic-entity-name) message) - (is (true? @handler-fn-called?))))) + (is (true? @metric-reporter-called?))))) (deftest protobuf->hash-test-alpha-and-deprecated (testing "Deprecated protobuf deserializer" - (with-redefs [ziggurat.config/ziggurat-config (fn [] {:alpha-features {:protobuf-middleware {:enabled false}}})] + (with-redefs [ziggurat-config (fn [] {:alpha-features {:protobuf-middleware {:enabled false}}})] (common-protobuf->hash-test) (testing "When alpha feature is disabled use the old deserializer function" - (let [deserialise-message-called? (atom false) - deserialise-message-deprecated-called? (atom false) - topic-entity-name "test"] - (with-redefs [mw/deserialise-message (fn [_ _ _] (reset! deserialise-message-called? true)) - mw/deserialise-message-deprecated (fn [_ _ _] (reset! deserialise-message-deprecated-called? true))] - ((protobuf->hash (constantly nil) Example$Photo topic-entity-name) nil) - (is (true? @deserialise-message-deprecated-called?)) - (is (false? @deserialise-message-called?))))))) + (let [deserialize-message-called? (atom false) + topic-entity-name "test" + message {:id 7 + :path "/photos/h2k3j4h9h23"} + proto-class Example$Photo + proto-message (proto/->bytes (proto/create Example$Photo message))] + (with-redefs [mw/deserialize-message (fn [_ _ _] (reset! deserialize-message-called? true))] + ((mw/protobuf->hash (constantly nil) proto-class topic-entity-name) (mw/->RegularMessage proto-message)) + (is (true? @deserialize-message-called?))))))) (testing "Alpha protobuf deserializer" (with-redefs [ziggurat.config/ziggurat-config (fn [] {:alpha-features {:protobuf-middleware {:enabled true}}})] (common-protobuf->hash-test) (testing "When alpha feature is enabled use the new deserializer function" - (let [deserialise-message-called? (atom false) - deserialise-message-deprecated-called? (atom false) - topic-entity-name "test"] - (with-redefs [mw/deserialise-message (fn [_ _ _] (reset! deserialise-message-called? true)) - mw/deserialise-message-deprecated (fn [_ _ _] (reset! deserialise-message-deprecated-called? true))] - ((protobuf->hash (constantly nil) Example$Photo topic-entity-name) nil) - (is (true? @deserialise-message-called?)) - (is (false? @deserialise-message-deprecated-called?)))))))) + (let [deserialize-message-called? (atom false) + topic-entity-name "test" + message {:id 7 + :path "/photos/h2k3j4h9h23"} + proto-class Example$Photo + proto-message (proto/->bytes (proto/create Example$Photo message))] + (with-redefs [mw/deserialize-message (fn [_ _ _] (reset! deserialize-message-called? true))] + + ((mw/protobuf->hash (constantly nil) proto-class topic-entity-name) (mw/->RegularMessage proto-message)) + (is (true? @deserialize-message-called?)))))))) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index dc0d714e..6527db7e 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -1,5 +1,5 @@ (ns ziggurat.streams-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] [protobuf.core :as proto] [ziggurat.streams :refer [start-streams stop-streams]] [ziggurat.fixtures :as fix] @@ -100,6 +100,83 @@ (stop-streams streams) (is (= times @message-received-count)))) +(deftest start-stream-joins-test + (testing "stream joins using inner join" + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}}) + times 1 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :consumer-type] :stream-joins) + (assoc-in [:stream-router :default :input-topics] {:topic {:name "topic"} :another-test-topic {:name "another-test-topic"}}) + (assoc-in [:stream-router :default :join-cfg] {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :inner}}) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] + (Thread/sleep 10000) ;;waiting for streams to start + (IntegrationTestUtils/produceKeyValuesSynchronously "topic" + kvs + (props) + (MockTime.)) + (IntegrationTestUtils/produceKeyValuesSynchronously "another-test-topic" + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count)))) + (testing "stream joins using left join" + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}}) + times 1 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :consumer-type] :stream-joins) + (assoc-in [:stream-router :default :input-topics] {:topic {:name "topic"} :another-test-topic {:name "another-test-topic"}}) + (assoc-in [:stream-router :default :join-cfg] {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :left}}) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] + (Thread/sleep 10000) ;;waiting for streams to start + (IntegrationTestUtils/produceKeyValuesSynchronously "topic" + kvs + (props) + (MockTime.)) + (IntegrationTestUtils/produceKeyValuesSynchronously "another-test-topic" + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count)))) + (testing "stream joins using outer join" + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}}) + times 1 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :consumer-type] :stream-joins) + (assoc-in [:stream-router :default :input-topics] {:topic {:name "topic"} :another-test-topic {:name "another-test-topic"}}) + (assoc-in [:stream-router :default :join-cfg] {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :outer}}) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] + (Thread/sleep 10000) ;;waiting for streams to start + (IntegrationTestUtils/produceKeyValuesSynchronously "topic" + kvs + (props) + (MockTime.)) + (IntegrationTestUtils/produceKeyValuesSynchronously "another-test-topic" + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count))))) + (deftest start-streams-test-with-string-serde (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count json-message)