From 072386d00948657174a4ddf77b8120566397ddf3 Mon Sep 17 00:00:00 2001 From: Michael Angelo Calimlim Date: Wed, 28 Jul 2021 17:37:44 +0800 Subject: [PATCH] include the metadata in the message-payload --- src/ziggurat/header_transformer.clj | 11 ++- src/ziggurat/mapper.clj | 8 +- src/ziggurat/message_payload.clj | 4 +- src/ziggurat/middleware/default.clj | 2 +- src/ziggurat/middleware/json.clj | 2 +- src/ziggurat/middleware/stream_joins.clj | 2 +- src/ziggurat/streams.clj | 2 +- test/ziggurat/header_transformer_test.clj | 42 +++++---- test/ziggurat/middleware/default_test.clj | 13 ++- test/ziggurat/middleware/json_test.clj | 53 ++++++----- .../ziggurat/middleware/stream_joins_test.clj | 22 +++-- test/ziggurat/streams_test.clj | 88 +++++++++---------- 12 files changed, 139 insertions(+), 110 deletions(-) diff --git a/src/ziggurat/header_transformer.clj b/src/ziggurat/header_transformer.clj index d527d9c4..e05c5b53 100644 --- a/src/ziggurat/header_transformer.clj +++ b/src/ziggurat/header_transformer.clj @@ -4,9 +4,14 @@ (deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer (^void init [_ ^ProcessorContext context] - (do (set! processor-context context) - nil)) - (transform [_ record-value] {:value record-value :headers (.headers processor-context)}) + (set! processor-context context)) + (transform [_ record-value] + (let [topic (.topic processor-context) + timestamp (.timestamp processor-context) + partition (.partition processor-context) + headers (.headers processor-context) + metadata {:topic topic :timestamp timestamp :partition partition}] + {:value record-value :headers headers :metadata metadata})) (close [_] nil)) (defn create [] diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj index c82fc33e..0ed457b2 100644 --- a/src/ziggurat/mapper.clj +++ b/src/ziggurat/mapper.clj @@ -13,7 +13,7 @@ (producer/publish-to-channel-instant-queue return-code message-payload)) (defn mapper-func [mapper-fn channels] - (fn [{:keys [topic-entity message] :as message-payload}] + (fn [{:keys [topic-entity] :as message-payload}] (let [service-name (:app-name (ziggurat-config)) topic-entity-name (name topic-entity) new-relic-transaction-name (str topic-entity-name ".handler-fn") @@ -29,7 +29,7 @@ (nr/with-tracing "job" new-relic-transaction-name (try (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn message) + return-code (mapper-fn (dissoc message-payload :topic-entity)) end-time (.toEpochMilli (Instant/now)) time-val (- end-time start-time) execution-time-namespace "handler-fn-execution-time" @@ -51,7 +51,7 @@ (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))) (defn channel-mapper-func [mapper-fn channel] - (fn [{:keys [topic-entity message] :as message-payload}] + (fn [{:keys [topic-entity] :as message-payload}] (let [service-name (:app-name (ziggurat-config)) topic-entity-name (name topic-entity) channel-name (name channel) @@ -68,7 +68,7 @@ (nr/with-tracing "job" metric-namespace (try (let [start-time (.toEpochMilli (Instant/now)) - return-code (mapper-fn message) + return-code (mapper-fn (dissoc message-payload :topic-entity)) end-time (.toEpochMilli (Instant/now)) time-val (- end-time start-time) execution-time-namespace "execution-time" diff --git a/src/ziggurat/message_payload.clj b/src/ziggurat/message_payload.clj index 5bbd19e8..f45e54ca 100644 --- a/src/ziggurat/message_payload.clj +++ b/src/ziggurat/message_payload.clj @@ -4,5 +4,5 @@ (defrecord MessagePayload [message topic-entity]) (defn mk-message-payload - [msg topic-entity] - {:message msg :topic-entity (name topic-entity)}) \ No newline at end of file + [msg topic-entity metadata] + {:message msg :topic-entity (name topic-entity) :metadata metadata}) diff --git a/src/ziggurat/middleware/default.clj b/src/ziggurat/middleware/default.clj index b60b2c3f..7d44f85c 100644 --- a/src/ziggurat/middleware/default.clj +++ b/src/ziggurat/middleware/default.clj @@ -89,4 +89,4 @@ (protobuf->hash handler-fn proto-class topic-entity-name false)) ([handler-fn proto-class topic-entity-name flatten-protobuf-struct?] (fn [message] - (handler-fn (deserialize-message message proto-class topic-entity-name flatten-protobuf-struct?))))) + (handler-fn (assoc message :message (deserialize-message (:message message) proto-class topic-entity-name flatten-protobuf-struct?)))))) diff --git a/src/ziggurat/middleware/json.clj b/src/ziggurat/middleware/json.clj index 9f7f19ce..ec045452 100644 --- a/src/ziggurat/middleware/json.clj +++ b/src/ziggurat/middleware/json.clj @@ -45,4 +45,4 @@ (parse-json handler-fn topic-entity true)) ([handler-fn topic-entity key-fn] (fn [message] - (handler-fn (deserialize-json message topic-entity key-fn))))) + (handler-fn (assoc message :message (deserialize-json (:message message) topic-entity key-fn)))))) diff --git a/src/ziggurat/middleware/stream_joins.clj b/src/ziggurat/middleware/stream_joins.clj index 7a839f6d..7190c60a 100644 --- a/src/ziggurat/middleware/stream_joins.clj +++ b/src/ziggurat/middleware/stream_joins.clj @@ -18,4 +18,4 @@ "This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap" [handler-fn proto-class topic-entity-name] (fn [message] - (handler-fn (deserialize-stream-joins-message message proto-class topic-entity-name)))) + (handler-fn (assoc message :message (deserialize-stream-joins-message (:message message) proto-class topic-entity-name))))) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 1217f5c7..3c3190a1 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -137,7 +137,7 @@ (.asChildOf t parent-ctx)) (.start t))] (try - ((mapper-func handler-fn channels) (mk-message-payload (:value message) topic-entity)) + ((mapper-func handler-fn channels) (mk-message-payload (:value message) topic-entity (:metadata message))) (finally (.finish span))))) diff --git a/test/ziggurat/header_transformer_test.clj b/test/ziggurat/header_transformer_test.clj index 6d831409..e3c192b5 100644 --- a/test/ziggurat/header_transformer_test.clj +++ b/test/ziggurat/header_transformer_test.clj @@ -1,23 +1,35 @@ (ns ziggurat.header-transformer-test - (:require [clojure.test :refer :all] - [ziggurat.header-transformer :refer :all]) - (:import [org.apache.kafka.streams.processor ProcessorContext] - [org.apache.kafka.common.header.internals RecordHeaders RecordHeader])) + (:require [clojure.test :refer [deftest is testing]] + [ziggurat.header-transformer :refer [create]]) + (:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader] + [org.apache.kafka.streams.processor ProcessorContext])) (deftest header-transformer-test (testing "transforms value with passed headers" - (let [headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value"))))) - context (reify ProcessorContext - (headers [_] headers)) - transformer (create) - _ (.init transformer context) + (let [headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value"))))) + topic "topic" + timestamp 1234567890 + partition 1 + context (reify ProcessorContext + (headers [_] headers) + (topic [_] topic) + (timestamp [_] timestamp) + (partition [_] partition)) + transformer (create) + _ (.init transformer context) transformed-val (.transform transformer "val")] - (is (= {:value "val" :headers headers} transformed-val)))) + (is (= {:value "val" :headers headers :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))) (testing "transforms value with nil headers when not passed" - (let [context (reify ProcessorContext - (headers [_] nil)) - transformer (create) - _ (.init transformer context) + (let [topic "topic" + timestamp 1234567890 + partition 1 + context (reify ProcessorContext + (headers [_] nil) + (topic [_] topic) + (timestamp [_] timestamp) + (partition [_] partition)) + transformer (create) + _ (.init transformer context) transformed-val (.transform transformer "val")] - (is (= {:value "val" :headers nil} transformed-val))))) + (is (= {:value "val" :headers nil :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val))))) diff --git a/test/ziggurat/middleware/default_test.clj b/test/ziggurat/middleware/default_test.clj index f6f32cac..7296f6f4 100644 --- a/test/ziggurat/middleware/default_test.clj +++ b/test/ziggurat/middleware/default_test.clj @@ -79,16 +79,15 @@ :path "/photos/h2k3j4h9h23"} proto-class Example$Photo topic-entity-name "test" - proto-message (proto/->bytes (proto/create proto-class message)) + proto-message {:message (proto/->bytes (proto/create proto-class message)) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}} handler-fn (fn [msg] - (when (= msg message) + (when (= (:message msg) message) (reset! handler-fn-called? true)))] ((mw/protobuf->hash handler-fn proto-class topic-entity-name) proto-message) (is (true? @handler-fn-called?)))) (testing "When an already deserialised message is passed to the function it calls the handler fn without altering it" (let [handler-fn-called? (atom false) - message {:id 7 - :path "/photos/h2k3j4h9h23"} + message {:message {:id 7 :path "/photos/h2k3j4h9h23"} :metadata {:topic "topic" :timestamp 1234567890 :partition 1}} proto-class Example$Photo topic-entity-name "test" handler-fn (fn [msg] @@ -101,11 +100,11 @@ metric-reporter-called? (atom false) topic-entity-name "test" handler-fn (fn [msg] - (when (nil? msg) + (when (nil? (:message msg)) (reset! handler-fn-called? true)))] (with-redefs [metrics/multi-ns-increment-count (fn [_ _ _] (reset! metric-reporter-called? true))] - ((mw/protobuf->hash handler-fn nil topic-entity-name) nil)) + ((mw/protobuf->hash handler-fn nil topic-entity-name) {:message nil :metadata {:topic "topic" :timestamp 1234567890 :partition 1}})) (is (true? @handler-fn-called?)) (is (true? @metric-reporter-called?)))) (testing "using the new deserializer function" @@ -114,7 +113,7 @@ message {:id 7 :path "/photos/h2k3j4h9h23"} proto-class Example$Photo - proto-message (proto/->bytes (proto/create Example$Photo message))] + proto-message {:message (proto/->bytes (proto/create Example$Photo message)) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}] (with-redefs [mw/deserialize-message (fn [_ _ _ _] (reset! deserialize-message-called? true))] ((mw/protobuf->hash (constantly nil) proto-class topic-entity-name) proto-message) (is (true? @deserialize-message-called?)))))) diff --git a/test/ziggurat/middleware/json_test.clj b/test/ziggurat/middleware/json_test.clj index 994e2b57..032b2910 100644 --- a/test/ziggurat/middleware/json_test.clj +++ b/test/ziggurat/middleware/json_test.clj @@ -1,9 +1,9 @@ (ns ziggurat.middleware.json-test - (:require [clojure.test :refer :all] - [cheshire.core :refer [generate-string]] - [ziggurat.middleware.json :refer [parse-json]] + (:require [cheshire.core :refer [generate-string]] + [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] [ziggurat.fixtures :as fix] - [ziggurat.metrics :as metrics])) + [ziggurat.metrics :as metrics] + [ziggurat.middleware.json :refer [parse-json]])) (use-fixtures :once (join-fixtures [fix/mount-only-config fix/silence-logging])) @@ -11,49 +11,56 @@ (deftest parse-json-test (testing "Given a handler function (without passing key-fn), parse-json should call that function on after deserializing the string to JSON object." (let [handler-fn-called? (atom false) - message {:a "A" + json-message {:a "A" :b "B"} topic-entity-name "test" - handler-fn (fn [msg] - (if (= msg message) + handler-fn (fn [{:keys [message metadata]}] + (when (and (= json-message message) + (= (:topic metadata) "topic") + (= (:timestamp metadata) 1234567890) + (= (:partition metadata) 1)) (reset! handler-fn-called? true)))] - ((parse-json handler-fn topic-entity-name) (generate-string message)) + ((parse-json handler-fn topic-entity-name) {:message (generate-string json-message) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}) (is (true? @handler-fn-called?)))) (testing "Given a handler function and key-fn as false, parse-json should call that function on after deserializing the string without coercing the keys to keywords." (let [handler-fn-called? (atom false) - message {:a "A" - :b "B"} - expected-output {"a" "A" "b" "B"} + json-message {"a" "A" "b" "B"} topic-entity-name "test" - handler-fn (fn [msg] - (if (= msg expected-output) + handler-fn (fn [{:keys [message metadata]}] + (when (and (= json-message message) + (= (:topic metadata) "topic") + (= (:timestamp metadata) 1234567890) + (= (:partition metadata) 1)) (reset! handler-fn-called? true)))] - ((parse-json handler-fn topic-entity-name false) (generate-string message)) + ((parse-json handler-fn topic-entity-name false) {:message (generate-string json-message) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}) (is (true? @handler-fn-called?)))) (testing "Given a handler function and a key-fn, parse-json should call that function after deserializing the string by applying key-fn to keys." (let [handler-fn-called? (atom false) key-fn (fn [k] (str k "-modified")) - message {"a" "A" + json-message {"a" "A" "b" "B"} - expected-output {"a-modified" "A" "b-modified" "B"} topic-entity-name "test" - handler-fn (fn [msg] - (if (= msg expected-output) + handler-fn (fn [{:keys [message metadata]}] + (is (= {"a-modified" "A", "b-modified" "B"} message)) + (when (and (= {"a-modified" "A", "b-modified" "B"} message) + (= (:topic metadata) "topic") + (= (:timestamp metadata) 1234567890) + (= (:partition metadata) 1)) (reset! handler-fn-called? true)))] - ((parse-json handler-fn topic-entity-name key-fn) (generate-string message)) + ((parse-json handler-fn topic-entity-name key-fn) {:message (generate-string json-message) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}) (is (true? @handler-fn-called?)))) (testing "Should report metrics when JSON deserialization fails" (let [handler-fn-called? (atom false) metric-reporter-called? (atom false) topic-entity-name "test" - message "{\"foo\":\"bar" - handler-fn (fn [msg] - (if (nil? msg) + json-message "{\"foo\":\"bar" + handler-fn (fn [{:keys [message _]}] + (when (nil? message) (reset! handler-fn-called? true)))] (with-redefs [metrics/multi-ns-increment-count (fn [_ _ _] (reset! metric-reporter-called? true))] - ((parse-json handler-fn topic-entity-name true) message)) + ((parse-json handler-fn topic-entity-name true) {:message json-message :metadata {:topic "topic" :timestamp 1234567890 :partition 1}})) (is (true? @handler-fn-called?)) (is (true? @metric-reporter-called?))))) diff --git a/test/ziggurat/middleware/stream_joins_test.clj b/test/ziggurat/middleware/stream_joins_test.clj index 46de4cc2..16fd24fa 100644 --- a/test/ziggurat/middleware/stream_joins_test.clj +++ b/test/ziggurat/middleware/stream_joins_test.clj @@ -19,11 +19,14 @@ topic-entity-name "test" left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) - handler-fn (fn [{:keys [left right]}] - (if (and (= left left-message) - (= right right-message)) + handler-fn (fn [{:keys [message metadata]}] + (when (and (= (:left message) left-message) + (= (:right message) right-message) + (= (:topic metadata) "topic") + (= (:timestamp metadata) 1234567890) + (= (:partition metadata) 1)) (reset! handler-fn-called? true)))] - ((sjmw/protobuf->hash handler-fn proto-class topic-entity-name) {:left left-proto-message :right right-proto-message}) + ((sjmw/protobuf->hash handler-fn proto-class topic-entity-name) {:message {:left left-proto-message :right right-proto-message} :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}) (is (true? @handler-fn-called?)))) (testing "deserialize a message from a stream join using 2 proto classes" (let [handler-fn-called? (atom false) @@ -35,9 +38,12 @@ topic-entity-name "test" left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) - handler-fn (fn [{:keys [left right]}] - (if (and (= left left-message) - (= right right-message)) + handler-fn (fn [{:keys [message metadata]}] + (when (and (= (:left message) left-message) + (= (:right message) right-message) + (= (:topic metadata) "topic") + (= (:timestamp metadata) 1234567890) + (= (:partition metadata) 1)) (reset! handler-fn-called? true)))] - ((sjmw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) {:left left-proto-message :right right-proto-message}) + ((sjmw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) {:message {:left left-proto-message :right right-proto-message} :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}) (is (true? @handler-fn-called?))))) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 98925947..ee5961a9 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -56,9 +56,9 @@ (get-mapped-fn message-received-count message)) ([message-received-count expected-message] (fn [message-from-kafka] - (when (= expected-message message-from-kafka) - (swap! message-received-count inc)) - :success))) + (when (= expected-message (:message message-from-kafka)) + (swap! message-received-count inc) + :success)))) (defn- poll-to-check-if-running ([stream] @@ -119,8 +119,8 @@ handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) _ (mount/start)] (mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}} - (ziggurat-config))) - :stop (fn [] (stop-streams #'ziggurat.streams/stream))}})) + (ziggurat-config))) + :stop (fn [] (stop-streams #'ziggurat.streams/stream))}})) (poll-to-check-if-running ziggurat.streams/stream) (stop-stream :default) (is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING))) @@ -131,8 +131,8 @@ handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) _ (mount/start)] (mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}} - (ziggurat-config))) - :stop (fn [] (stop-streams ziggurat.streams/stream))}})) + (ziggurat-config))) + :stop (fn [] (stop-streams ziggurat.streams/stream))}})) (poll-to-check-if-running ziggurat.streams/stream) (stop-stream :default) (is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING)) @@ -146,8 +146,8 @@ handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) _ (mount/start)] (mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}} - (ziggurat-config))) - :stop (fn [] (stop-streams ziggurat.streams/stream))}})) + (ziggurat-config))) + :stop (fn [] (stop-streams ziggurat.streams/stream))}})) (poll-to-check-if-running ziggurat.streams/stream) (stop-stream :default) (is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING)) @@ -161,15 +161,16 @@ (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count) handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) - _ (mount/start)] - (mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn} - :using-string-serde {:handler-fn handler-fn}} - (-> (ziggurat-config) - (assoc-in [:stream-router :default :application-id] (rand-application-id)) - (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor) - (assoc-in [:stream-router :using-string-serde :application-id] (rand-application-id)) - (assoc-in [:stream-router :using-string-serde :changelog-topic-replication-factor] changelog-topic-replication-factor)))) - :stop (fn [] (stop-streams ziggurat.streams/stream))}})) + _ (mount/start) + stream-routes {:default {:handler-fn handler-fn} + :using-string-serde {:handler-fn handler-fn}}] + (mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams stream-routes + (-> (ziggurat-config) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor) + (assoc-in [:stream-router :using-string-serde :application-id] (rand-application-id)) + (assoc-in [:stream-router :using-string-serde :changelog-topic-replication-factor] changelog-topic-replication-factor)))) + :stop (fn [] (stop-streams ziggurat.streams/stream))}})) (poll-to-check-if-running ziggurat.streams/stream) (stop-stream :default) (is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING)) @@ -181,34 +182,34 @@ handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) _ (mount/start)] (mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}} - (ziggurat-config))) - :stop (fn [] (stop-streams ziggurat.streams/stream))}})) + (ziggurat-config))) + :stop (fn [] (stop-streams ziggurat.streams/stream))}})) (poll-to-check-if-running ziggurat.streams/stream) (stop-stream :default) (is (not= (.state (get ziggurat.streams/stream :default)) KafkaStreams$State/RUNNING)) (stop-stream :default)) (deftest stop-invalid-stream-test - (let [is-close-called (atom 0) - mapped-fn (get-mapped-fn is-close-called) - handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) - _ (mount/start)] + (let [is-close-called (atom 0) + mapped-fn (get-mapped-fn is-close-called) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + _ (mount/start)] (mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}} - (ziggurat-config))) - :stop (fn [] (stop-streams ziggurat.streams/stream))}}) + (ziggurat-config))) + :stop (fn [] (stop-streams ziggurat.streams/stream))}}) (poll-to-check-if-running ziggurat.streams/stream) (with-redefs [ziggurat.streams/close-stream (fn [] (swap! is-close-called inc))] (stop-stream :invalid-topic-entity) (is (= @is-close-called 0))))) (deftest start-invalid-stream-test - (let [is-close-called (atom 0) - mapped-fn (get-mapped-fn is-close-called) - handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) - _ (mount/start)] + (let [is-close-called (atom 0) + mapped-fn (get-mapped-fn is-close-called) + handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + _ (mount/start)] (mount/start-with-states {#'ziggurat.streams/stream {:start (fn [] (start-streams {:default {:handler-fn handler-fn}} - (ziggurat-config))) - :stop (fn [] (stop-streams ziggurat.streams/stream))}})) + (ziggurat-config))) + :stop (fn [] (stop-streams ziggurat.streams/stream))}})) (let [is-close-called (atom 0)] (with-redefs [mount/start-with-states (fn [] (swap! is-close-called inc))] (start-stream :invalid-topic-entity) @@ -218,7 +219,7 @@ (testing "stream joins using inner join" (let [orig-config (ziggurat-config)] (with-redefs [ziggurat-config (fn [] (-> orig-config - (assoc-in [:alpha-features :stream-joins] true)))] + (assoc-in [:alpha-features :stream-joins] true)))] (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 @@ -248,7 +249,7 @@ (testing "stream joins using left join" (let [orig-config (ziggurat-config)] (with-redefs [ziggurat-config (fn [] (-> orig-config - (assoc-in [:alpha-features :stream-joins] true)))] + (assoc-in [:alpha-features :stream-joins] true)))] (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 @@ -278,7 +279,7 @@ (testing "stream joins using outer join" (let [orig-config (ziggurat-config)] (with-redefs [ziggurat-config (fn [] (-> orig-config - (assoc-in [:alpha-features :stream-joins] true)))] + (assoc-in [:alpha-features :stream-joins] true)))] (let [message-received-count (atom 0) mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 @@ -308,15 +309,15 @@ (testing "stream-joins should not start if :alpha-features for stream-joins is `false`" (let [original-config (ziggurat-config)] (with-redefs [ziggurat-config (fn [] (-> original-config - (assoc-in [:alpha-features :stream-joins] false)))] + (assoc-in [:alpha-features :stream-joins] false)))] (let [handler-fn (constantly nil) - streams (start-streams {:default {:handler-fn handler-fn}} - (-> (ziggurat-config) - (assoc-in [:stream-router :default :consumer-type] :stream-joins) - (assoc-in [:stream-router :default :input-topics] {:topic {:name "topic"} :another-test-topic {:name "another-test-topic"}}) - (assoc-in [:stream-router :default :join-cfg] {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :outer}}) - (assoc-in [:stream-router :default :application-id] (rand-application-id)) - (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] + streams (start-streams {:default {:handler-fn handler-fn}} + (-> (ziggurat-config) + (assoc-in [:stream-router :default :consumer-type] :stream-joins) + (assoc-in [:stream-router :default :input-topics] {:topic {:name "topic"} :another-test-topic {:name "another-test-topic"}}) + (assoc-in [:stream-router :default :join-cfg] {:topic-and-another-test-topic {:join-window-ms 5000 :join-type :outer}}) + (assoc-in [:stream-router :default :application-id] (rand-application-id)) + (assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))] (is (empty? streams))))))) (deftest start-streams-test-with-string-serde @@ -467,4 +468,3 @@ (testing "should return REPLACE_THREAD" (let [r (handle-uncaught-exception :replace-thread t)] (is (= r StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/REPLACE_THREAD)))))) -