diff --git a/README.md b/README.md index bbc23d33..17a7c6d8 100644 --- a/README.md +++ b/README.md @@ -278,6 +278,9 @@ This will allow an actor to join messages from 2 topics into 1 result. To be abl And your actor's handler function be like ```clojure +(ns my-actor + (:require [ziggurat.middleware.stream-joins :as mw])) + (def handler-func (-> main-func (mw/protobuf->hash [com.gojek.esb.booking.BookingLogMessage com.gojek.esb.booking.BookingLogMessage] :booking))) @@ -291,11 +294,11 @@ Your handler function will receive a message in the following format/structure {:topic-1-key "message-from-1st-topic" :topic-2-key "message-from-2nd-topic"} ``` -## Connecting to a RabbitMQ cluster (alpha feature) +## Connecting to a RabbitMQ cluster (alpha feature) * To connect to RabbitMQ clusters add the following config to your `config.edn` ```clojure {:ziggurat {:messaging {:constructor "ziggurat.messaging.rabbitmq-cluster-wrapper/->RabbitMQMessaging" - :rabbit-mq-connection {:hosts "g-lambda-lambda-rabbitmq-a-01,g-lambda-lambda-rabbitmq-a-02,g-lambda-lambda-rabbitmq-a-03" + :rabbit-mq-connection {:hosts "g-lambda-lambda-rabbitmq-a-01,g-lambda-lambda-rabbitmq-a-02,g-lambda-lambda-rabbitmq-a-03" :port [5672 :int] :prefetch-count [3 :int] :username "guest" @@ -304,7 +307,7 @@ Your handler function will receive a message in the following format/structure :admin-port [15672 :int] :ha-sync-mode "automatic" :channel-timeout [2000 :int]}}}} -``` +``` * `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs). * `:port` specifies the port number on which the RabbitMQ nodes are running. * `:ha-mode` is set to `"all"`. diff --git a/src/ziggurat/middleware/default.clj b/src/ziggurat/middleware/default.clj index 1e527320..92f8eec0 100644 --- a/src/ziggurat/middleware/default.clj +++ b/src/ziggurat/middleware/default.clj @@ -5,7 +5,7 @@ [ziggurat.metrics :as metrics] [ziggurat.sentry :refer [sentry-reporter]])) -(defn- deserialize-message +(defn deserialize-message "This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a Clojure PersistentHashMap. Temporary logic for migration of services to Ziggurat V3.0 @@ -32,28 +32,8 @@ nil))) message)) -(defprotocol Deserializable - (deserialize [message proto-class topic-entity-name])) - -(defrecord RegularMessage [message]) -(defrecord StreamJoinsMessage [message]) - -(extend-type RegularMessage - Deserializable - (deserialize [this proto-class topic-entity-name] - (deserialize-message (:message this) proto-class topic-entity-name))) - -(extend-type StreamJoinsMessage - Deserializable - (deserialize [this proto-class topic-entity-name] - (reduce - (fn [[k1 v1] [k2 v2]] - {k1 (deserialize-message v1 (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name) - k2 (deserialize-message v2 (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)}) - (:message this)))) - (defn protobuf->hash "This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap" [handler-fn proto-class topic-entity-name] (fn [message] - (handler-fn (deserialize message proto-class topic-entity-name)))) + (handler-fn (deserialize-message message proto-class topic-entity-name)))) diff --git a/src/ziggurat/middleware/stream_joins.clj b/src/ziggurat/middleware/stream_joins.clj new file mode 100644 index 00000000..7a839f6d --- /dev/null +++ b/src/ziggurat/middleware/stream_joins.clj @@ -0,0 +1,21 @@ +(ns ziggurat.middleware.stream-joins + (:require [ziggurat.middleware.default :as dmw])) + +(defn- deserialize-stream-joins-message + "This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a + Clojure PersistentHashMap. + Temporary logic for migration of services to Ziggurat V3.0 + If the message is of type map, the function just returns the map as it is. In older versions of Ziggurat (< 3.0) we stored + the messages in deserialized formats in RabbitMQ and those messages can be processed by this function. So we have this logic here." + [message proto-class topic-entity-name] + (reduce + (fn [[k1 v1] [k2 v2]] + {k1 (dmw/deserialize-message v1 (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name) + k2 (dmw/deserialize-message v2 (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)}) + message)) + +(defn protobuf->hash + "This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap" + [handler-fn proto-class topic-entity-name] + (fn [message] + (handler-fn (deserialize-stream-joins-message message proto-class topic-entity-name)))) diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj index 4029be33..27554baf 100644 --- a/src/ziggurat/streams.clj +++ b/src/ziggurat/streams.clj @@ -5,7 +5,6 @@ [ziggurat.config :refer [ziggurat-config]] [ziggurat.header-transformer :as header-transformer] [ziggurat.mapper :refer [mapper-func ->MessagePayload]] - [ziggurat.middleware.default :as mw] [ziggurat.metrics :as metrics] [ziggurat.timestamp-transformer :as timestamp-transformer] [ziggurat.util.map :as umap] @@ -23,8 +22,7 @@ [io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier] [io.opentracing.contrib.kafka TracingKafkaUtils] [io.opentracing.tag Tags] - [java.time Duration] - [ziggurat.middleware.default StreamJoinsMessage])) + [java.time Duration])) (def default-config-for-stream {:buffered-records-per-partition 10000 @@ -199,7 +197,7 @@ join-type (:join-type cfg-1) value-joiner (reify ValueJoiner (apply [_ left right] - (mw/->StreamJoinsMessage {topic-key-1 left topic-key-2 right}))) + {topic-key-1 left topic-key-2 right})) out-strm (if cfg-1 (case join-type :left (.leftJoin strm-1 strm-2 value-joiner join-window-ms) @@ -221,12 +219,6 @@ (map-values #(traced-handler-fn handler-fn channels % topic-entity))) (.build builder))) -(defn- normalize-message-value [message] - (let [msg-val (:value message)] - (if (string? msg-val) - message ;; TODO: this should be JsonMessage - (assoc-in message [:value] (mw/->RegularMessage msg-val))))) - (defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels] (let [builder (StreamsBuilder.) topic-entity-name (name topic-entity) @@ -236,7 +228,6 @@ (timestamp-transform-values topic-entity-name oldest-processed-message-in-s) (header-transform-values) (map-values #(log-and-report-metrics topic-entity-name %)) - (map-values normalize-message-value) (map-values #(traced-handler-fn handler-fn channels % topic-entity))) (.build builder))) diff --git a/test/ziggurat/middleware/default_test.clj b/test/ziggurat/middleware/default_test.clj index 37f68cc1..8ca8a8ae 100644 --- a/test/ziggurat/middleware/default_test.clj +++ b/test/ziggurat/middleware/default_test.clj @@ -4,8 +4,7 @@ [ziggurat.fixtures :as fix] [ziggurat.metrics :as metrics] [ziggurat.middleware.default :as mw]) - (:import [flatland.protobuf.test Example$Photo] - [ziggurat.middleware.default RegularMessage StreamJoinsMessage])) + (:import [flatland.protobuf.test Example$Photo])) (use-fixtures :once (join-fixtures [fix/mount-only-config fix/silence-logging])) @@ -21,39 +20,7 @@ handler-fn (fn [msg] (if (= msg message) (reset! handler-fn-called? true)))] - ((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->RegularMessage proto-message)) - (is (true? @handler-fn-called?)))) - (testing "deserialize a message from a stream join" - (let [handler-fn-called? (atom false) - left-message {:id 123 - :path "/path/to/left"} - right-message {:id 456 - :path "/path/to/right"} - proto-class Example$Photo - topic-entity-name "test" - left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) - right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) - handler-fn (fn [{:keys [left right]}] - (if (and (= left left-message) - (= right right-message)) - (reset! handler-fn-called? true)))] - ((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->StreamJoinsMessage {:left left-proto-message :right right-proto-message})) - (is (true? @handler-fn-called?)))) - (testing "deserialize a message from a stream join using 2 proto classes" - (let [handler-fn-called? (atom false) - left-message {:id 123 - :path "/path/to/left"} - right-message {:id 456 - :path "/path/to/right"} - proto-class Example$Photo - topic-entity-name "test" - left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) - right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) - handler-fn (fn [{:keys [left right]}] - (if (and (= left left-message) - (= right right-message)) - (reset! handler-fn-called? true)))] - ((mw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) (mw/->StreamJoinsMessage {:left left-proto-message :right right-proto-message})) + ((mw/protobuf->hash handler-fn proto-class topic-entity-name) proto-message) (is (true? @handler-fn-called?)))) (testing "When an already deserialised message is passed to the function it calls the handler fn without altering it" (let [handler-fn-called? (atom false) @@ -64,7 +31,7 @@ handler-fn (fn [msg] (if (= msg message) (reset! handler-fn-called? true)))] - ((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->RegularMessage message)) + ((mw/protobuf->hash handler-fn proto-class topic-entity-name) message) (is (true? @handler-fn-called?)))) (testing "When deserialisation fails, it reports to sentry, publishes metrics and passes nil to handler function" (let [handler-fn-called? (atom false) @@ -75,7 +42,7 @@ (reset! handler-fn-called? true)))] (with-redefs [metrics/multi-ns-increment-count (fn [_ _ _] (reset! metric-reporter-called? true))] - ((mw/protobuf->hash handler-fn nil topic-entity-name) (mw/->RegularMessage nil))) + ((mw/protobuf->hash handler-fn nil topic-entity-name) nil)) (is (true? @handler-fn-called?)) (is (true? @metric-reporter-called?)))) (testing "using the new deserializer function" @@ -86,6 +53,5 @@ proto-class Example$Photo proto-message (proto/->bytes (proto/create Example$Photo message))] (with-redefs [mw/deserialize-message (fn [_ _ _] (reset! deserialize-message-called? true))] - ((mw/protobuf->hash (constantly nil) proto-class topic-entity-name) (mw/->RegularMessage proto-message)) + ((mw/protobuf->hash (constantly nil) proto-class topic-entity-name) proto-message) (is (true? @deserialize-message-called?)))))) - diff --git a/test/ziggurat/middleware/stream_joins_test.clj b/test/ziggurat/middleware/stream_joins_test.clj new file mode 100644 index 00000000..ac713943 --- /dev/null +++ b/test/ziggurat/middleware/stream_joins_test.clj @@ -0,0 +1,43 @@ +(ns ziggurat.middleware.stream-joins-test + (:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]] + [protobuf.core :as proto] + [ziggurat.fixtures :as fix] + [ziggurat.middleware.stream-joins :as sjmw]) + (:import [flatland.protobuf.test Example$Photo])) + +(use-fixtures :once (join-fixtures [fix/mount-only-config + fix/silence-logging])) + +(deftest common-protobuf->hash-test + (testing "deserialize a message from a stream join" + (let [handler-fn-called? (atom false) + left-message {:id 123 + :path "/path/to/left"} + right-message {:id 456 + :path "/path/to/right"} + proto-class Example$Photo + topic-entity-name "test" + left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) + right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) + handler-fn (fn [{:keys [left right]}] + (if (and (= left left-message) + (= right right-message)) + (reset! handler-fn-called? true)))] + ((sjmw/protobuf->hash handler-fn proto-class topic-entity-name) {:left left-proto-message :right right-proto-message}) + (is (true? @handler-fn-called?)))) + (testing "deserialize a message from a stream join using 2 proto classes" + (let [handler-fn-called? (atom false) + left-message {:id 123 + :path "/path/to/left"} + right-message {:id 456 + :path "/path/to/right"} + proto-class Example$Photo + topic-entity-name "test" + left-proto-message (proto/->bytes (proto/create Example$Photo left-message)) + right-proto-message (proto/->bytes (proto/create Example$Photo right-message)) + handler-fn (fn [{:keys [left right]}] + (if (and (= left left-message) + (= right right-message)) + (reset! handler-fn-called? true)))] + ((sjmw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) {:left left-proto-message :right right-proto-message}) + (is (true? @handler-fn-called?))))) diff --git a/test/ziggurat/streams_test.clj b/test/ziggurat/streams_test.clj index 6527db7e..84b73630 100644 --- a/test/ziggurat/streams_test.clj +++ b/test/ziggurat/streams_test.clj @@ -5,6 +5,7 @@ [ziggurat.fixtures :as fix] [ziggurat.config :refer [ziggurat-config]] [ziggurat.middleware.default :as default-middleware] + [ziggurat.middleware.stream-joins :as stream-joins-middleware] [ziggurat.middleware.json :as json-middleware] [ziggurat.tracer :refer [tracer]] [ziggurat.messaging.producer :as producer]) @@ -103,10 +104,10 @@ (deftest start-stream-joins-test (testing "stream joins using inner join" (let [message-received-count (atom 0) - mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}}) + mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 kvs (repeat times message-key-value) - handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + handler-fn (stream-joins-middleware/protobuf->hash mapped-fn proto-class :default) streams (start-streams {:default {:handler-fn handler-fn}} (-> (ziggurat-config) (assoc-in [:stream-router :default :consumer-type] :stream-joins) @@ -128,10 +129,10 @@ (is (= times @message-received-count)))) (testing "stream joins using left join" (let [message-received-count (atom 0) - mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}}) + mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 kvs (repeat times message-key-value) - handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + handler-fn (stream-joins-middleware/protobuf->hash mapped-fn proto-class :default) streams (start-streams {:default {:handler-fn handler-fn}} (-> (ziggurat-config) (assoc-in [:stream-router :default :consumer-type] :stream-joins) @@ -153,10 +154,10 @@ (is (= times @message-received-count)))) (testing "stream joins using outer join" (let [message-received-count (atom 0) - mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}}) + mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message}) times 1 kvs (repeat times message-key-value) - handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default) + handler-fn (stream-joins-middleware/protobuf->hash mapped-fn proto-class :default) streams (start-streams {:default {:handler-fn handler-fn}} (-> (ziggurat-config) (assoc-in [:stream-router :default :consumer-type] :stream-joins)