Skip to content

Commit

Permalink
Merge 0ad2b2e into 236ab90
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim authored Jul 17, 2020
2 parents 236ab90 + 0ad2b2e commit 528c9f2
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 81 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ This will allow an actor to join messages from 2 topics into 1 result. To be abl

And your actor's handler function be like
```clojure
(ns my-actor
(:require [ziggurat.middleware.stream-joins :as mw))

(def handler-func
(-> main-func
(mw/protobuf->hash [com.gojek.esb.booking.BookingLogMessage com.gojek.esb.booking.BookingLogMessage] :booking)))
Expand All @@ -291,11 +294,11 @@ Your handler function will receive a message in the following format/structure
{:topic-1-key "message-from-1st-topic" :topic-2-key "message-from-2nd-topic"}
```

## Connecting to a RabbitMQ cluster (alpha feature)
## Connecting to a RabbitMQ cluster (alpha feature)
* To connect to RabbitMQ clusters add the following config to your `config.edn`
```clojure
{:ziggurat {:messaging {:constructor "ziggurat.messaging.rabbitmq-cluster-wrapper/->RabbitMQMessaging"
:rabbit-mq-connection {:hosts "g-lambda-lambda-rabbitmq-a-01,g-lambda-lambda-rabbitmq-a-02,g-lambda-lambda-rabbitmq-a-03"
:rabbit-mq-connection {:hosts "g-lambda-lambda-rabbitmq-a-01,g-lambda-lambda-rabbitmq-a-02,g-lambda-lambda-rabbitmq-a-03"
:port [5672 :int]
:prefetch-count [3 :int]
:username "guest"
Expand All @@ -304,7 +307,7 @@ Your handler function will receive a message in the following format/structure
:admin-port [15672 :int]
:ha-sync-mode "automatic"
:channel-timeout [2000 :int]}}}}
```
```
* `:hosts` is a comma separated values of RabbitMQ hostnames (dns-names OR IPs).
* `:port` specifies the port number on which the RabbitMQ nodes are running.
* `:ha-mode` is set to `"all"`.
Expand Down
24 changes: 2 additions & 22 deletions src/ziggurat/middleware/default.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[ziggurat.metrics :as metrics]
[ziggurat.sentry :refer [sentry-reporter]]))

(defn- deserialize-message
(defn deserialize-message
"This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a
Clojure PersistentHashMap.
Temporary logic for migration of services to Ziggurat V3.0
Expand All @@ -32,28 +32,8 @@
nil)))
message))

(defprotocol Deserializable
(deserialize [message proto-class topic-entity-name]))

(defrecord RegularMessage [message])
(defrecord StreamJoinsMessage [message])

(extend-type RegularMessage
Deserializable
(deserialize [this proto-class topic-entity-name]
(deserialize-message (:message this) proto-class topic-entity-name)))

(extend-type StreamJoinsMessage
Deserializable
(deserialize [this proto-class topic-entity-name]
(reduce
(fn [[k1 v1] [k2 v2]]
{k1 (deserialize-message v1 (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name)
k2 (deserialize-message v2 (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)})
(:message this))))

(defn protobuf->hash
"This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap"
[handler-fn proto-class topic-entity-name]
(fn [message]
(handler-fn (deserialize message proto-class topic-entity-name))))
(handler-fn (deserialize-message message proto-class topic-entity-name))))
21 changes: 21 additions & 0 deletions src/ziggurat/middleware/stream_joins.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
(ns ziggurat.middleware.stream-joins
(:require [ziggurat.middleware.default :as dmw]))

(defn- deserialize-stream-joins-message
"This function takes in the message(proto Byte Array) and the proto-class and deserializes the proto ByteArray into a
Clojure PersistentHashMap.
Temporary logic for migration of services to Ziggurat V3.0
If the message is of type map, the function just returns the map as it is. In older versions of Ziggurat (< 3.0) we stored
the messages in deserialized formats in RabbitMQ and those messages can be processed by this function. So we have this logic here."
[message proto-class topic-entity-name]
(reduce
(fn [[k1 v1] [k2 v2]]
{k1 (dmw/deserialize-message v1 (if (vector? proto-class) (first proto-class) proto-class) topic-entity-name)
k2 (dmw/deserialize-message v2 (if (vector? proto-class) (second proto-class) proto-class) topic-entity-name)})
message))

(defn protobuf->hash
"This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap"
[handler-fn proto-class topic-entity-name]
(fn [message]
(handler-fn (deserialize-stream-joins-message message proto-class topic-entity-name))))
13 changes: 2 additions & 11 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.header-transformer :as header-transformer]
[ziggurat.mapper :refer [mapper-func ->MessagePayload]]
[ziggurat.middleware.default :as mw]
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.util.map :as umap]
Expand All @@ -23,8 +22,7 @@
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.tag Tags]
[java.time Duration]
[ziggurat.middleware.default StreamJoinsMessage]))
[java.time Duration]))

(def default-config-for-stream
{:buffered-records-per-partition 10000
Expand Down Expand Up @@ -199,7 +197,7 @@
join-type (:join-type cfg-1)
value-joiner (reify ValueJoiner
(apply [_ left right]
(mw/->StreamJoinsMessage {topic-key-1 left topic-key-2 right})))
{topic-key-1 left topic-key-2 right}))
out-strm (if cfg-1
(case join-type
:left (.leftJoin strm-1 strm-2 value-joiner join-window-ms)
Expand All @@ -221,12 +219,6 @@
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(.build builder)))

(defn- normalize-message-value [message]
(let [msg-val (:value message)]
(if (string? msg-val)
message ;; TODO: this should be JsonMessage
(assoc-in message [:value] (mw/->RegularMessage msg-val)))))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
(let [builder (StreamsBuilder.)
topic-entity-name (name topic-entity)
Expand All @@ -236,7 +228,6 @@
(timestamp-transform-values topic-entity-name oldest-processed-message-in-s)
(header-transform-values)
(map-values #(log-and-report-metrics topic-entity-name %))
(map-values normalize-message-value)
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(.build builder)))

Expand Down
44 changes: 5 additions & 39 deletions test/ziggurat/middleware/default_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
[ziggurat.fixtures :as fix]
[ziggurat.metrics :as metrics]
[ziggurat.middleware.default :as mw])
(:import [flatland.protobuf.test Example$Photo]
[ziggurat.middleware.default RegularMessage StreamJoinsMessage]))
(:import [flatland.protobuf.test Example$Photo]))

(use-fixtures :once (join-fixtures [fix/mount-only-config
fix/silence-logging]))
Expand All @@ -21,39 +20,7 @@
handler-fn (fn [msg]
(if (= msg message)
(reset! handler-fn-called? true)))]
((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->RegularMessage proto-message))
(is (true? @handler-fn-called?))))
(testing "deserialize a message from a stream join"
(let [handler-fn-called? (atom false)
left-message {:id 123
:path "/path/to/left"}
right-message {:id 456
:path "/path/to/right"}
proto-class Example$Photo
topic-entity-name "test"
left-proto-message (proto/->bytes (proto/create Example$Photo left-message))
right-proto-message (proto/->bytes (proto/create Example$Photo right-message))
handler-fn (fn [{:keys [left right]}]
(if (and (= left left-message)
(= right right-message))
(reset! handler-fn-called? true)))]
((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->StreamJoinsMessage {:left left-proto-message :right right-proto-message}))
(is (true? @handler-fn-called?))))
(testing "deserialize a message from a stream join using 2 proto classes"
(let [handler-fn-called? (atom false)
left-message {:id 123
:path "/path/to/left"}
right-message {:id 456
:path "/path/to/right"}
proto-class Example$Photo
topic-entity-name "test"
left-proto-message (proto/->bytes (proto/create Example$Photo left-message))
right-proto-message (proto/->bytes (proto/create Example$Photo right-message))
handler-fn (fn [{:keys [left right]}]
(if (and (= left left-message)
(= right right-message))
(reset! handler-fn-called? true)))]
((mw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) (mw/->StreamJoinsMessage {:left left-proto-message :right right-proto-message}))
((mw/protobuf->hash handler-fn proto-class topic-entity-name) proto-message)
(is (true? @handler-fn-called?))))
(testing "When an already deserialised message is passed to the function it calls the handler fn without altering it"
(let [handler-fn-called? (atom false)
Expand All @@ -64,7 +31,7 @@
handler-fn (fn [msg]
(if (= msg message)
(reset! handler-fn-called? true)))]
((mw/protobuf->hash handler-fn proto-class topic-entity-name) (mw/->RegularMessage message))
((mw/protobuf->hash handler-fn proto-class topic-entity-name) message)
(is (true? @handler-fn-called?))))
(testing "When deserialisation fails, it reports to sentry, publishes metrics and passes nil to handler function"
(let [handler-fn-called? (atom false)
Expand All @@ -75,7 +42,7 @@
(reset! handler-fn-called? true)))]
(with-redefs [metrics/multi-ns-increment-count (fn [_ _ _]
(reset! metric-reporter-called? true))]
((mw/protobuf->hash handler-fn nil topic-entity-name) (mw/->RegularMessage nil)))
((mw/protobuf->hash handler-fn nil topic-entity-name) nil))
(is (true? @handler-fn-called?))
(is (true? @metric-reporter-called?))))
(testing "using the new deserializer function"
Expand All @@ -86,6 +53,5 @@
proto-class Example$Photo
proto-message (proto/->bytes (proto/create Example$Photo message))]
(with-redefs [mw/deserialize-message (fn [_ _ _] (reset! deserialize-message-called? true))]
((mw/protobuf->hash (constantly nil) proto-class topic-entity-name) (mw/->RegularMessage proto-message))
((mw/protobuf->hash (constantly nil) proto-class topic-entity-name) proto-message)
(is (true? @deserialize-message-called?))))))

43 changes: 43 additions & 0 deletions test/ziggurat/middleware/stream_joins_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
(ns ziggurat.middleware.stream-joins-test
(:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]]
[protobuf.core :as proto]
[ziggurat.fixtures :as fix]
[ziggurat.middleware.stream-joins :as sjmw])
(:import [flatland.protobuf.test Example$Photo]))

(use-fixtures :once (join-fixtures [fix/mount-only-config
fix/silence-logging]))

(deftest common-protobuf->hash-test
(testing "deserialize a message from a stream join"
(let [handler-fn-called? (atom false)
left-message {:id 123
:path "/path/to/left"}
right-message {:id 456
:path "/path/to/right"}
proto-class Example$Photo
topic-entity-name "test"
left-proto-message (proto/->bytes (proto/create Example$Photo left-message))
right-proto-message (proto/->bytes (proto/create Example$Photo right-message))
handler-fn (fn [{:keys [left right]}]
(if (and (= left left-message)
(= right right-message))
(reset! handler-fn-called? true)))]
((sjmw/protobuf->hash handler-fn proto-class topic-entity-name) {:left left-proto-message :right right-proto-message})
(is (true? @handler-fn-called?))))
(testing "deserialize a message from a stream join using 2 proto classes"
(let [handler-fn-called? (atom false)
left-message {:id 123
:path "/path/to/left"}
right-message {:id 456
:path "/path/to/right"}
proto-class Example$Photo
topic-entity-name "test"
left-proto-message (proto/->bytes (proto/create Example$Photo left-message))
right-proto-message (proto/->bytes (proto/create Example$Photo right-message))
handler-fn (fn [{:keys [left right]}]
(if (and (= left left-message)
(= right right-message))
(reset! handler-fn-called? true)))]
((sjmw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) {:left left-proto-message :right right-proto-message})
(is (true? @handler-fn-called?)))))
13 changes: 7 additions & 6 deletions test/ziggurat/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[ziggurat.fixtures :as fix]
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.middleware.default :as default-middleware]
[ziggurat.middleware.stream-joins :as stream-joins-middleware]
[ziggurat.middleware.json :as json-middleware]
[ziggurat.tracer :refer [tracer]]
[ziggurat.messaging.producer :as producer])
Expand Down Expand Up @@ -103,10 +104,10 @@
(deftest start-stream-joins-test
(testing "stream joins using inner join"
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}})
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
kvs (repeat times message-key-value)
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
handler-fn (stream-joins-middleware/protobuf->hash mapped-fn proto-class :default)
streams (start-streams {:default {:handler-fn handler-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :consumer-type] :stream-joins)
Expand All @@ -128,10 +129,10 @@
(is (= times @message-received-count))))
(testing "stream joins using left join"
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}})
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
kvs (repeat times message-key-value)
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
handler-fn (stream-joins-middleware/protobuf->hash mapped-fn proto-class :default)
streams (start-streams {:default {:handler-fn handler-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :consumer-type] :stream-joins)
Expand All @@ -153,10 +154,10 @@
(is (= times @message-received-count))))
(testing "stream joins using outer join"
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count {:topic {:id 7, :path "/photos/h2k3j4h9h23"}, :another-test-topic {:id 7, :path "/photos/h2k3j4h9h23"}})
mapped-fn (get-mapped-fn message-received-count {:topic message :another-test-topic message})
times 1
kvs (repeat times message-key-value)
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
handler-fn (stream-joins-middleware/protobuf->hash mapped-fn proto-class :default)
streams (start-streams {:default {:handler-fn handler-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :consumer-type] :stream-joins)
Expand Down

0 comments on commit 528c9f2

Please sign in to comment.