diff --git a/README.md b/README.md index cbf8787c..5d9546a5 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ To start a stream (a thread that reads messages from Kafka), add this to your co [message] (println message) :success) - + (def handler-fn (-> main-fn (middleware/protobuf->hash ProtoClass :stream-id))) @@ -88,7 +88,7 @@ To start a stream (a thread that reads messages from Kafka), add this to your co (ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}}) ``` -_NOTE: this example assumes that the message is serialized in Protobuf format_ +_NOTE: this example assumes that the message is serialized in Protobuf format_ Please refer the [Middleware section](#middleware-in-ziggurat) for understanding `handler-fn` here. @@ -129,7 +129,7 @@ Please refer the [Middleware section](#middleware-in-ziggurat) for understanding [message] (println message) :success) - + (def handler-fn (-> main-fn (middleware/protobuf->hash ProtoClass :stream-id))) @@ -137,7 +137,7 @@ Please refer the [Middleware section](#middleware-in-ziggurat) for understanding (ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}} routes) ``` -_NOTE: this example assumes that the message is serialized in Protobuf format_ +_NOTE: this example assumes that the message is serialized in Protobuf format_ Ziggurat also sets up a HTTP server by default and you can pass in your own routes that it will serve. The above example demonstrates how you can pass in your own route. @@ -211,9 +211,9 @@ It can be used like this. (parse-json :stream-route-config))) ``` -Here, `message-handler-fn` calls `parse-json` with a message handler function -`actual-message-handler-function` as the first argument and the key of a stream-route -config (as defined in `config.edn`) as the second argument. +Here, `message-handler-fn` calls `parse-json` with a message handler function +`actual-message-handler-function` as the first argument and the key of a stream-route +config (as defined in `config.edn`) as the second argument. ## Publishing data to Kafka Topics in Ziggurat To enable publishing data to kafka, Ziggurat provides producing support through ziggurat.producer namespace. This namespace defines methods for publishing data to Kafka topics. The methods defined here are essentially wrapper around variants of `send` methods defined in `org.apache.kafka.clients.producer.KafkaProducer`. @@ -245,8 +245,8 @@ Tracing has been added to the following flows: 3. Produce to rabbitmq channel 4. Produce to another kafka topic -By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment) -and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables. +By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment) +and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables. To enable custom tracer, a custom tracer provider function name can be set in `:custom-provider`. The corresponding function will be executed in runtime to create a tracer. In the event of any errors while executing the custom tracer provider, a Noop tracer will be created. To enable tracing, the following config needs to be added to the `config.edn` under `:ziggurat` key. @@ -263,6 +263,34 @@ JAEGER_AGENT_HOST: "localhost" JAEGER_AGENT_PORT: 6831 ``` +## Stream Joins +This will allow an actor to join messages from 2 topics into 1 result. To be able to use stream joins just add the configuration below to your `config.edn` +```clojure +{:ziggurat {:stream-router {:stream-id { + :consumer-type :stream-joins + :input-topics {:topic-1 {} :topic-2 {}} + :join-cfg {:topic-1-and-topic-2 {:join-window-ms 5000 :join-type :inner}} +}}} +``` +* consumer-type - enables stream joins if `:stream-joins` key is provided, other possible value is `:default` which is the default actor behavior +* input-topics - a vector of topics in which you want to use for joining +* join-cfg - a vector of configurations which you define the join-window-ms and the join-type (`:inner`, `:left` or `:outer`) + +And your actor's handler function be like +```clojure +(def handler-func + (-> main-func + (mw/protobuf->hash [com.gojek.esb.booking.BookingLogMessage com.gojek.esb.booking.BookingLogMessage] :booking))) +``` + +`Please take note of the vector containing the proto classes` + +Your handler function will receive a message in the following format/structure + +```clojure +{:left "message-from-1st-topic" :right "message-from-2nd-topic"} +``` + ## Configuration All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key. @@ -344,7 +372,7 @@ All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggur ## Alpha (Experimental) Features The contract and interface for experimental features in Ziggurat can be changed as we iterate towards better designs for that feature. -For all purposes these features should be considered unstable and should only be used after understanding their risks and implementations. +For all purposes these features should be considered unstable and should only be used after understanding their risks and implementations. ### Exponential Backoff based Retries In addition to linear retries, Ziggurat users can now use exponential backoff strategy for retries. This means that the message @@ -359,11 +387,11 @@ Exponential retries can be configured as described below. :ziggurat {:stream-router {:default {:application-id "application_name"...}}} :retry {:type [:exponential :keyword] :count [10 :int] - :enable [true :bool]} + :enable [true :bool]} ``` -Exponential retries can be configured for channels too. Additionally, a user can specify a custom `queue-timeout-ms` value per channel. +Exponential retries can be configured for channels too. Additionally, a user can specify a custom `queue-timeout-ms` value per channel. Timeouts for exponential backoffs are calculated using `queue-timeout-ms`. This implies that each channel can have separate count of retries and different timeout values. @@ -373,7 +401,7 @@ and different timeout values. :retry {:type [:exponential :keyword] :count [10 :int] :queue-timeout-ms 2000 - :enable [true :bool]}}}}} + :enable [true :bool]}}}}} ``` ## Deprecation Notice diff --git a/resources/config.test.edn b/resources/config.test.edn index c6e5e2fc..5f8e6132 100644 --- a/resources/config.test.edn +++ b/resources/config.test.edn @@ -39,6 +39,9 @@ :stream-threads-count [1 :int] :origin-topic "topic" :upgrade-from "1.1" + :consumer-type :default + :input-topics {:topic {} :another-test-topic {}} + :join-cfg {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :inner}} :channels {:channel-1 {:worker-count [10 :int] :retry {:type [:linear :keyword] :count [5 :int] diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index f76231ea..c911efb7 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -65,6 +65,8 @@ (defn config-from-env [config-file] (clonfig/read-config (edn-config config-file))) +(declare config) + (defstate config :start (let [config-values-from-env (config-from-env config-file) app-name (-> config-values-from-env :ziggurat :app-name)] @@ -80,8 +82,11 @@ (let [cfg (ziggurat-config)] (get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future -(defn get-in-config [ks] - (get-in (ziggurat-config) ks)) +(defn get-in-config + ([ks] + (get-in (ziggurat-config) ks)) + ([ks default] + (get-in (ziggurat-config) ks default))) (defn channel-retry-config [topic-entity channel] (get-in (ziggurat-config) [:stream-router topic-entity :channels channel :retry])) diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj index 7567dd63..4199acac 100644 --- a/src/ziggurat/init.clj +++ b/src/ziggurat/init.clj @@ -139,16 +139,19 @@ (defn- add-shutdown-hook [actor-stop-fn modes] (.addShutdownHook (Runtime/getRuntime) - (Thread. ^Runnable #(do (stop actor-stop-fn modes) - (shutdown-agents)) + (Thread. ^Runnable #((stop actor-stop-fn modes) + (shutdown-agents)) "Shutdown-handler"))) +(declare StreamRoute) + (s/defschema StreamRoute (s/conditional #(and (seq %) (map? %)) - {s/Keyword {:handler-fn (s/pred #(fn? %)) - s/Keyword (s/pred #(fn? %))}})) + {s/Keyword {:handler-fn (s/pred #(fn? %)) + (s/optional-key :consumer-type) (s/enum :default :stream-joins) + s/Keyword (s/pred #(fn? %))}})) (defn validate-stream-routes [stream-routes modes] (when (or (empty? modes) (contains? (set modes) :stream-worker)) diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index 95f4d80e..b1bb6d95 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -95,6 +95,8 @@ (defrecord MessagePayload [message topic-entity]) +(declare message-payload-schema) + (s/defschema message-payload-schema {:message s/Any :topic-entity s/Keyword diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj index 6fd6847e..3b73ab7d 100644 --- a/src/ziggurat/messaging/consumer.clj +++ b/src/ziggurat/messaging/consumer.clj @@ -10,7 +10,7 @@ [ziggurat.mapper :as mpr] [ziggurat.messaging.connection :refer [connection]] [ziggurat.sentry :refer [sentry-reporter]] - [ziggurat.messaging.util :refer :all] + [ziggurat.messaging.util :refer [prefixed-channel-name prefixed-queue-name]] [ziggurat.metrics :as metrics])) (defn- convert-to-message-payload @@ -24,14 +24,14 @@ (s/validate mpr/message-payload-schema message) (catch Exception e (log/info "old message format read, converting to message-payload: " message) - (let [retry-count (or (:retry-count message) 0) + (let [retry-count (or (:retry-count message) 0) message-payload (mpr/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))] (assoc message-payload :retry-count retry-count))))) (defn convert-and-ack-message "De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message if `ack?` is true." - [ch {:keys [delivery-tag] :as meta} ^bytes payload ack? topic-entity] + [ch {:keys [delivery-tag]} ^bytes payload ack? topic-entity] (try (let [message (nippy/thaw payload)] (when ack? @@ -48,8 +48,8 @@ (lb/ack ch delivery-tag)) (defn process-message-from-queue [ch meta payload topic-entity processing-fn] - (let [delivery-tag (:delivery-tag meta) - message-payload (convert-and-ack-message ch meta payload false topic-entity)] + (let [delivery-tag (:delivery-tag meta) + message-payload (convert-and-ack-message ch meta payload false topic-entity)] (when message-payload (log/infof "Processing message [%s] from RabbitMQ " message-payload) (try @@ -109,13 +109,13 @@ (defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity] (lb/qos ch prefetch-count) - (let [consumer-tag (lcons/subscribe ch - queue-name - (message-handler wrapped-mapper-fn topic-entity) - {:handle-shutdown-signal-fn (fn [consumer_tag reason] - (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) - :handle-consume-ok-fn (fn [consumer_tag] - (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})])) + (lcons/subscribe ch + queue-name + (message-handler wrapped-mapper-fn topic-entity) + {:handle-shutdown-signal-fn (fn [consumer_tag reason] + (log/infof "channel closed with consumer tag: %s, reason: %s " consumer_tag, reason)) + :handle-consume-ok-fn (fn [consumer_tag] + (log/infof "consumer started for %s with consumer tag %s " queue-name consumer_tag))})) (defn start-retry-subscriber* [mapper-fn topic-entity channels] (when (get-in-config [:retry :enabled]) @@ -129,8 +129,9 @@ (defn start-channels-subscriber [channels topic-entity] (doseq [channel channels] (let [channel-key (first channel) - channel-handler-fn (second channel)] - (dotimes [_ (get-in-config [:stream-router topic-entity :channels channel-key :worker-count])] + channel-handler-fn (second channel) + worker-count (get-in-config [:stream-router topic-entity :channels channel-key :worker-count] 0)] + (dotimes [_ worker-count] (start-subscriber* (lch/open connection) 1 (prefixed-channel-name topic-entity channel-key (get-in-config [:rabbit-mq :instant :queue-name])) diff --git a/src/ziggurat/middleware/default.clj b/src/ziggurat/middleware/default.clj index b70f74f4..9b41c038 100644 --- a/src/ziggurat/middleware/default.clj +++ b/src/ziggurat/middleware/default.clj @@ -1,10 +1,9 @@ (ns ziggurat.middleware.default (:require [protobuf.impl.flatland.mapdef :as protodef] [sentry-clj.async :as sentry] - [ziggurat.config :refer [ziggurat-config]] + [ziggurat.config :refer [get-in-config ziggurat-config]] [flatland.protobuf.core :as proto] [ziggurat.metrics :as metrics] - [ziggurat.config :as config] [ziggurat.sentry :refer [sentry-reporter]])) (defn- deserialise-message @@ -32,7 +31,9 @@ (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) (metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags) nil))) - message)) + (let [{:keys [left right]} message] + {:left (deserialise-message left (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name) ;; TODO: convert proto-class into a vector only on the next version + :right (deserialise-message right (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)}))) (defn- deserialise-message-deprecated "This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a @@ -59,10 +60,12 @@ (sentry/report-error sentry-reporter e (str "Couldn't parse the message with proto - " proto-class)) (metrics/multi-ns-increment-count multi-namespaces "failed" additional-tags) nil))) - message)) + (let [{:keys [left right]} message] + {:left (deserialise-message left (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name) ;; TODO: convert proto-class into a vector only on the next version + :right (deserialise-message right (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)}))) (defn get-deserializer [] - (if (config/get-in-config [:alpha-features :protobuf-middleware :enabled]) + (if (get-in-config [:alpha-features :protobuf-middleware :enabled]) deserialise-message deserialise-message-deprecated)) @@ -70,4 +73,3 @@ "This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap" [handler-fn proto-class topic-entity-name] (fn [message] (handler-fn ((get-deserializer) message proto-class topic-entity-name)))) - diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 4df85c8c..f8a59458 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -1,7 +1,6 @@ (ns ziggurat.streams (:require [clojure.tools.logging :as log] [mount.core :as mount :refer [defstate]] - [sentry-clj.async :as sentry] [ziggurat.channel :as chl] [ziggurat.config :refer [ziggurat-config]] [ziggurat.header-transformer :as header-transformer] @@ -16,13 +15,14 @@ [org.apache.kafka.common.serialization Serdes] [org.apache.kafka.common.utils SystemTime] [org.apache.kafka.streams KafkaStreams StreamsConfig StreamsBuilder Topology] - [org.apache.kafka.streams.kstream ValueMapper TransformerSupplier ValueTransformerSupplier] + [org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier] [org.apache.kafka.streams.state.internals KeyValueStoreBuilder RocksDbKeyValueBytesStoreSupplier] [ziggurat.timestamp_transformer IngestionTimeExtractor] [io.opentracing Tracer] [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] [io.opentracing.contrib.kafka TracingKafkaUtils] - [io.opentracing.tag Tags])) + [io.opentracing.tag Tags] + [java.time Duration])) (def default-config-for-stream {:buffered-records-per-partition 10000 @@ -31,6 +31,9 @@ :oldest-processed-message-in-s 604800 :changelog-topic-replication-factor 3 :session-timeout-ms-config 60000 + :consumer-type :default + ;; :default-key-serde "org.apache.kafka.common.serialization.Serdes$StringSerde" + ;; :default-value-serde "org.apache.kafka.common.serialization.Serdes$StringSerde"}) :default-key-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde" :default-value-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"}) @@ -46,9 +49,9 @@ "Populates `key-deserializer-encoding`, `value-deserializer-encoding` and `deserializer-encoding` in `properties` only if non-nil." [properties key-deserializer-encoding value-deserializer-encoding deserializer-encoding] - (let [KEY_DESERIALIZER_ENCODING "key.deserializer.encoding" + (let [KEY_DESERIALIZER_ENCODING "key.deserializer.encoding" VALUE_DESERIALIZER_ENCODING "value.deserializer.encoding" - DESERIALIZER_ENCODING "deserializer.encoding"] + DESERIALIZER_ENCODING "deserializer.encoding"] (if (some? key-deserializer-encoding) (.put properties KEY_DESERIALIZER_ENCODING key-deserializer-encoding)) (if (some? value-deserializer-encoding) @@ -102,6 +105,16 @@ (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) message) +(defn- stream-joins-log-and-report-metrics [topic topic-entity message] + (let [service-name (:app-name (ziggurat-config)) + topic-entity-name (name topic-entity) + additional-tags {:topic-name topic-entity-name :input-topic topic :app-name service-name} + message-read-metric-namespace "stream-joins-message" + multi-namespaces [[message-read-metric-namespace]] + metric "read"] + (metrics/multi-ns-increment-count multi-namespaces metric additional-tags)) + message) + (defn store-supplier-builder [] (KeyValueStoreBuilder. (RocksDbKeyValueBytesStoreSupplier. "state-store") (Serdes/ByteArray) @@ -126,10 +139,17 @@ (get [_] (header-transformer/create)))) (defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder] - (let [service-name (:app-name (ziggurat-config)) + (let [service-name (:app-name (ziggurat-config)) delay-metric-namespace "message-received-delay-histogram" - metric-namespaces [service-name topic-entity-name delay-metric-namespace] - additional-tags {:topic_name topic-entity-name}] + metric-namespaces [service-name topic-entity-name delay-metric-namespace] + additional-tags {:topic_name topic-entity-name}] + (.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) + +(defn- stream-joins-delay-metric [topic topic-entity-name oldest-processed-message-in-s stream-builder] + (let [service-name (:app-name (ziggurat-config)) + delay-metric-namespace "stream-joins-message-received-delay-histogram" + metric-namespaces [service-name topic-entity-name delay-metric-namespace] + additional-tags {:topic-name topic-entity-name :input-topic topic :app-name service-name}] (.transform stream-builder (timestamp-transformer-supplier metric-namespaces oldest-processed-message-in-s additional-tags) (into-array [(.name (store-supplier-builder))])))) (defn- header-transform-values [stream-builder] @@ -144,14 +164,14 @@ (defn- traced-handler-fn [handler-fn channels message topic-entity] (let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer) - span (as-> tracer t - (.buildSpan t "Message-Handler") - (.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) - (.withTag t (.getKey Tags/COMPONENT) "ziggurat") - (if (nil? parent-ctx) - t - (.asChildOf t parent-ctx)) - (.start t))] + span (as-> tracer t + (.buildSpan t "Message-Handler") + (.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER) + (.withTag t (.getKey Tags/COMPONENT) "ziggurat") + (if (nil? parent-ctx) + t + (.asChildOf t parent-ctx)) + (.start t))] (try ((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message))) (catch Exception e @@ -160,6 +180,44 @@ (finally (.finish span))))) +(defn- join-streams + [oldest-processed-message-in-s handler-fn channels topic-entity stream-1 stream-2] + (let [topic-entity-name (name topic-entity) + topic-1 (:topic-name stream-1) + topic-2 (:topic-name stream-2) + cfg-1 (:cfg stream-1) + cfg-2 (:cfg stream-2) + strm-1 (->> (:stream stream-1) + (stream-joins-delay-metric topic-1 topic-entity-name oldest-processed-message-in-s) + (map-values #(stream-joins-log-and-report-metrics topic-1 topic-entity-name %))) + strm-2 (->> (:stream stream-2) + (stream-joins-delay-metric topic-2 topic-entity-name oldest-processed-message-in-s) + (map-values #(stream-joins-log-and-report-metrics topic-2 topic-entity-name %))) + join-window-ms (JoinWindows/of (Duration/ofMillis (:join-window-ms cfg-1))) + join-type (:join-type cfg-1) + value-joiner (reify ValueJoiner + (apply [_ left right] + {:left left :right right})) + out-strm (if cfg-1 + (case join-type + :left (.leftJoin strm-1 strm-2 value-joiner join-window-ms) + :outer (.outerJoin strm-1 strm-2 value-joiner join-window-ms) + (.join strm-1 strm-2 value-joiner join-window-ms)) + strm-1)] + {:stream out-strm + :cfg cfg-2})) + +(defn- stream-joins-topology [handler-fn {:keys [input-topics join-cfg oldest-processed-message-in-s]} topic-entity channels] + (let [builder (StreamsBuilder.) + topic-patterns (map (fn [[k _]] (Pattern/compile (name k))) input-topics) + _ (.addStateStore builder (store-supplier-builder)) + stream-map (map (fn [topic-pattern [_ v]] {:stream (.stream builder topic-pattern) :cfg v :topic-name topic-pattern}) topic-patterns (assoc join-cfg :end nil)) + {stream :stream} (reduce (partial join-streams oldest-processed-message-in-s handler-fn channels topic-entity) stream-map)] + (->> stream + (header-transform-values) + (map-values #(traced-handler-fn handler-fn channels % topic-entity))) + (.build builder))) + (defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels] (let [builder (StreamsBuilder.) topic-entity-name (name topic-entity) @@ -173,9 +231,26 @@ (.build builder))) (defn- start-stream* [handler-fn stream-config topic-entity channels] - (KafkaStreams. ^Topology (topology handler-fn stream-config topic-entity channels) - ^Properties (properties stream-config) - (new TracingKafkaClientSupplier tracer))) + (let [topology-fn (case (:consumer-type stream-config) + :stream-joins stream-joins-topology + topology) + top (topology-fn handler-fn stream-config topic-entity channels) + _ (log/info (.describe top))] + (KafkaStreams. ^Topology top + ^Properties (properties stream-config) + (new TracingKafkaClientSupplier tracer)))) + +(defn- merge-stream-joins-config ;; not sure about this, :consumer-type can be from the config.edn or the map from init-fn, please comment! + [config stream] + (let [new-cfg (case (:consumer-type config) + :stream-joins (assoc config :consumer-type (:consumer-type config)) + (case (:consumer-type stream) + :stream-joins (assoc config :consumer-type (:consumer-type stream)) + (assoc config :consumer-type :default)))] + (if (and (:default-key-serde stream) + (:default-value-serde stream)) + (assoc new-cfg :default-key-serde (:default-key-serde stream) :default-value-serde (:default-value-serde stream)) + new-cfg))) (defn start-streams ([stream-routes] @@ -183,10 +258,12 @@ ([stream-routes stream-configs] (reduce (fn [streams stream] (let [topic-entity (first stream) - topic-handler-fn (-> stream second :handler-fn) + snd (second stream) + topic-handler-fn (:handler-fn snd) channels (chl/get-keys-for-topic stream-routes topic-entity) stream-config (-> stream-configs (get-in [:stream-router topic-entity]) + (merge-stream-joins-config snd) (umap/deep-merge default-config-for-stream)) stream (start-stream* topic-handler-fn stream-config topic-entity channels)] (.start stream) diff --git a/test/ziggurat/middleware/default_test.clj b/test/ziggurat/middleware/default_test.clj index f5d32334..bde080ae 100644 --- a/test/ziggurat/middleware/default_test.clj +++ b/test/ziggurat/middleware/default_test.clj @@ -1,10 +1,10 @@ (ns ziggurat.middleware.default-test - (:require [clojure.test :refer :all] - [sentry-clj.async :as sentry] + (:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] [protobuf.core :as proto] + [ziggurat.config :refer [ziggurat-config]] [ziggurat.fixtures :as fix] [ziggurat.metrics :as metrics] - [ziggurat.middleware.default :refer :all :as mw]) + [ziggurat.middleware.default :as mw]) (:import (flatland.protobuf.test Example$Photo))) (use-fixtures :once (join-fixtures [fix/mount-only-config @@ -21,7 +21,39 @@ handler-fn (fn [msg] (if (= msg message) (reset! handler-fn-called? true)))] - ((protobuf->hash handler-fn proto-class topic-entity-name) proto-message) + ((mw/protobuf->hash handler-fn proto-class topic-entity-name) proto-message) + (is (true? @handler-fn-called?)))) + (testing "deserialize a message from a stream join" + (let [handler-fn-called? (atom false) + left-message {:id 123 + :path "/path/to/left"} + right-message {:id 456 + :path "/path/to/right"} + proto-class Example$Photo + topic-entity-name "test" + left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) + right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) + handler-fn (fn [{:keys [left right]}] + (if (and (= left left-message) + (= right right-message)) + (reset! handler-fn-called? true)))] + ((mw/protobuf->hash handler-fn proto-class topic-entity-name) {:left left-proto-message :right right-proto-message}) + (is (true? @handler-fn-called?)))) + (testing "deserialize a message from a stream join using 2 proto classes" + (let [handler-fn-called? (atom false) + left-message {:id 123 + :path "/path/to/left"} + right-message {:id 456 + :path "/path/to/right"} + proto-class Example$Photo + topic-entity-name "test" + left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) + right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) + handler-fn (fn [{:keys [left right]}] + (if (and (= left left-message) + (= right right-message)) + (reset! handler-fn-called? true)))] + ((mw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) {:left left-proto-message :right right-proto-message}) (is (true? @handler-fn-called?)))) (testing "When deserialisation fails, it reports to sentry, publishes metrics and passes nil to handler function" (let [handler-fn-called? (atom false) @@ -32,24 +64,13 @@ (reset! handler-fn-called? true)))] (with-redefs [metrics/multi-ns-increment-count (fn [_ _ _] (reset! metric-reporter-called? true))] - ((protobuf->hash handler-fn nil topic-entity-name) nil)) + ((mw/protobuf->hash handler-fn nil topic-entity-name) nil)) (is (true? @handler-fn-called?)) - (is (true? @metric-reporter-called?)))) - (testing "When an already deserialised message is passed to the function it calls the handler fn without altering it" - (let [handler-fn-called? (atom false) - message {:id 7 - :path "/photos/h2k3j4h9h23"} - proto-class Example$Photo - topic-entity-name "test" - handler-fn (fn [msg] - (if (= msg message) - (reset! handler-fn-called? true)))] - ((protobuf->hash handler-fn proto-class topic-entity-name) message) - (is (true? @handler-fn-called?))))) + (is (true? @metric-reporter-called?))))) (deftest protobuf->hash-test-alpha-and-deprecated (testing "Deprecated protobuf deserializer" - (with-redefs [ziggurat.config/ziggurat-config (fn [] {:alpha-features {:protobuf-middleware {:enabled false}}})] + (with-redefs [ziggurat-config (fn [] {:alpha-features {:protobuf-middleware {:enabled false}}})] (common-protobuf->hash-test) (testing "When alpha feature is disabled use the old deserializer function" (let [deserialise-message-called? (atom false) @@ -57,7 +78,7 @@ topic-entity-name "test"] (with-redefs [mw/deserialise-message (fn [_ _ _] (reset! deserialise-message-called? true)) mw/deserialise-message-deprecated (fn [_ _ _] (reset! deserialise-message-deprecated-called? true))] - ((protobuf->hash (constantly nil) Example$Photo topic-entity-name) nil) + ((mw/protobuf->hash (constantly nil) Example$Photo topic-entity-name) nil) (is (true? @deserialise-message-deprecated-called?)) (is (false? @deserialise-message-called?))))))) (testing "Alpha protobuf deserializer" @@ -69,6 +90,6 @@ topic-entity-name "test"] (with-redefs [mw/deserialise-message (fn [_ _ _] (reset! deserialise-message-called? true)) mw/deserialise-message-deprecated (fn [_ _ _] (reset! deserialise-message-deprecated-called? true))] - ((protobuf->hash (constantly nil) Example$Photo topic-entity-name) nil) + ((mw/protobuf->hash (constantly nil) Example$Photo topic-entity-name) nil) (is (true? @deserialise-message-called?)) (is (false? @deserialise-message-deprecated-called?)))))))) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index dc0d714e..ffd4db74 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -1,5 +1,5 @@ (ns ziggurat.streams-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] [protobuf.core :as proto] [ziggurat.streams :refer [start-streams stop-streams]] [ziggurat.fixtures :as fix] @@ -100,6 +100,89 @@ (stop-streams streams) (is (= times @message-received-count)))) +(deftest start-stream-joins-test + (testing "stream joins using inner join" + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count {:left {:id 7, :path "/photos/h2k3j4h9h23"}, :right {:id 7, :path "/photos/h2k3j4h9h23"}}) + times 1 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn + :consumer-type :stream-joins + :default-key-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde" + :default-value-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :input-topics] {:topic {} :another-test-topic {}}) + (assoc-in [:stream-router :default :join-cfg] {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :inner}}) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] + (Thread/sleep 10000) ;;waiting for streams to start + (IntegrationTestUtils/produceKeyValuesSynchronously "topic" + kvs + (props) + (MockTime.)) + (IntegrationTestUtils/produceKeyValuesSynchronously "another-test-topic" + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count)))) + (testing "stream joins using left join" + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count {:left {:id 7, :path "/photos/h2k3j4h9h23"}, :right {:id 7, :path "/photos/h2k3j4h9h23"}}) + times 1 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn + :consumer-type :stream-joins + :default-key-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde" + :default-value-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :input-topics] {:topic {} :another-test-topic {}}) + (assoc-in [:stream-router :default :join-cfg] {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :left}}) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] + (Thread/sleep 10000) ;;waiting for streams to start + (IntegrationTestUtils/produceKeyValuesSynchronously "topic" + kvs + (props) + (MockTime.)) + (IntegrationTestUtils/produceKeyValuesSynchronously "another-test-topic" + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count)))) + (testing "stream joins using outer join" + (let [message-received-count (atom 0) + mapped-fn (get-mapped-fn message-received-count {:left {:id 7, :path "/photos/h2k3j4h9h23"}, :right {:id 7, :path "/photos/h2k3j4h9h23"}}) + times 1 + kvs (repeat times message-key-value) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + streams (start-streams {:default {:handler-fn handler-fn + :consumer-type :stream-joins + :default-key-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde" + :default-value-serde "org.apache.kafka.common.serialization.Serdes$ByteArraySerde"}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :input-topics] {:topic {} :another-test-topic {}}) + (assoc-in [:stream-router :default :join-cfg] {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :outer}}) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] + (Thread/sleep 10000) ;;waiting for streams to start + (IntegrationTestUtils/produceKeyValuesSynchronously "topic" + kvs + (props) + (MockTime.)) + (IntegrationTestUtils/produceKeyValuesSynchronously "another-test-topic" + kvs + (props) + (MockTime.)) + (Thread/sleep 5000) ;;wating for streams to consume messages + (stop-streams streams) + (is (= times @message-received-count))))) + (deftest start-streams-test-with-string-serde (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count json-message)