Skip to content

Commit

Permalink
include the metadata in the message-payload
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed Jul 28, 2021
1 parent 77b81ec commit e1ed887
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 95 deletions.
11 changes: 8 additions & 3 deletions src/ziggurat/header_transformer.clj
Expand Up @@ -4,9 +4,14 @@

(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer
(^void init [_ ^ProcessorContext context]
(do (set! processor-context context)
nil))
(transform [_ record-value] {:value record-value :headers (.headers processor-context)})
(set! processor-context context))
(transform [_ record-value]
(let [topic (.topic processor-context)
timestamp (.timestamp processor-context)
partition (.partition processor-context)
headers (.headers processor-context)
metadata {:topic topic :timestamp timestamp :partition partition}]
{:value record-value :headers headers :metadata metadata}))
(close [_] nil))

(defn create []
Expand Down
8 changes: 4 additions & 4 deletions src/ziggurat/mapper.clj
Expand Up @@ -13,7 +13,7 @@
(producer/publish-to-channel-instant-queue return-code message-payload))

(defn mapper-func [mapper-fn channels]
(fn [{:keys [topic-entity message] :as message-payload}]
(fn [{:keys [topic-entity] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
Expand All @@ -29,7 +29,7 @@
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
return-code (mapper-fn (dissoc message-payload :topic-entity))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
Expand All @@ -51,7 +51,7 @@
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))

(defn channel-mapper-func [mapper-fn channel]
(fn [{:keys [topic-entity message] :as message-payload}]
(fn [{:keys [topic-entity] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
Expand All @@ -68,7 +68,7 @@
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
return-code (mapper-fn (dissoc message-payload :topic-entity))
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
Expand Down
4 changes: 2 additions & 2 deletions src/ziggurat/message_payload.clj
Expand Up @@ -4,5 +4,5 @@
(defrecord MessagePayload [message topic-entity])

(defn mk-message-payload
[msg topic-entity]
{:message msg :topic-entity (name topic-entity)})
[msg topic-entity metadata]
{:message msg :topic-entity (name topic-entity) :metadata metadata})
2 changes: 1 addition & 1 deletion src/ziggurat/middleware/default.clj
Expand Up @@ -89,4 +89,4 @@
(protobuf->hash handler-fn proto-class topic-entity-name false))
([handler-fn proto-class topic-entity-name flatten-protobuf-struct?]
(fn [message]
(handler-fn (deserialize-message message proto-class topic-entity-name flatten-protobuf-struct?)))))
(handler-fn (update-in message [:message] deserialize-message proto-class topic-entity-name flatten-protobuf-struct?)))))
2 changes: 1 addition & 1 deletion src/ziggurat/middleware/json.clj
Expand Up @@ -45,4 +45,4 @@
(parse-json handler-fn topic-entity true))
([handler-fn topic-entity key-fn]
(fn [message]
(handler-fn (deserialize-json message topic-entity key-fn)))))
(handler-fn (assoc message :message (deserialize-json (:message message) topic-entity key-fn))))))
2 changes: 1 addition & 1 deletion src/ziggurat/middleware/stream_joins.clj
Expand Up @@ -18,4 +18,4 @@
"This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap"
[handler-fn proto-class topic-entity-name]
(fn [message]
(handler-fn (deserialize-stream-joins-message message proto-class topic-entity-name))))
(handler-fn (update-in message [:message] deserialize-stream-joins-message proto-class topic-entity-name))))
2 changes: 1 addition & 1 deletion src/ziggurat/streams.clj
Expand Up @@ -137,7 +137,7 @@
(.asChildOf t parent-ctx))
(.start t))]
(try
((mapper-func handler-fn channels) (mk-message-payload (:value message) topic-entity))
((mapper-func handler-fn channels) (mk-message-payload (:value message) topic-entity (:metadata message)))
(finally
(.finish span)))))

Expand Down
42 changes: 27 additions & 15 deletions test/ziggurat/header_transformer_test.clj
@@ -1,23 +1,35 @@
(ns ziggurat.header-transformer-test
(:require [clojure.test :refer :all]
[ziggurat.header-transformer :refer :all])
(:import [org.apache.kafka.streams.processor ProcessorContext]
[org.apache.kafka.common.header.internals RecordHeaders RecordHeader]))
(:require [clojure.test :refer [deftest is testing]]
[ziggurat.header-transformer :refer [create]])
(:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader]
[org.apache.kafka.streams.processor ProcessorContext]))

(deftest header-transformer-test
(testing "transforms value with passed headers"
(let [headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))
context (reify ProcessorContext
(headers [_] headers))
transformer (create)
_ (.init transformer context)
(let [headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))
topic "topic"
timestamp 1234567890
partition 1
context (reify ProcessorContext
(headers [_] headers)
(topic [_] topic)
(timestamp [_] timestamp)
(partition [_] partition))
transformer (create)
_ (.init transformer context)
transformed-val (.transform transformer "val")]
(is (= {:value "val" :headers headers} transformed-val))))
(is (= {:value "val" :headers headers :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val))))

(testing "transforms value with nil headers when not passed"
(let [context (reify ProcessorContext
(headers [_] nil))
transformer (create)
_ (.init transformer context)
(let [topic "topic"
timestamp 1234567890
partition 1
context (reify ProcessorContext
(headers [_] nil)
(topic [_] topic)
(timestamp [_] timestamp)
(partition [_] partition))
transformer (create)
_ (.init transformer context)
transformed-val (.transform transformer "val")]
(is (= {:value "val" :headers nil} transformed-val)))))
(is (= {:value "val" :headers nil :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))))
13 changes: 6 additions & 7 deletions test/ziggurat/middleware/default_test.clj
Expand Up @@ -79,16 +79,15 @@
:path "/photos/h2k3j4h9h23"}
proto-class Example$Photo
topic-entity-name "test"
proto-message (proto/->bytes (proto/create proto-class message))
proto-message {:message (proto/->bytes (proto/create proto-class message)) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}
handler-fn (fn [msg]
(when (= msg message)
(when (= (:message msg) message)
(reset! handler-fn-called? true)))]
((mw/protobuf->hash handler-fn proto-class topic-entity-name) proto-message)
(is (true? @handler-fn-called?))))
(testing "When an already deserialised message is passed to the function it calls the handler fn without altering it"
(let [handler-fn-called? (atom false)
message {:id 7
:path "/photos/h2k3j4h9h23"}
message {:message {:id 7 :path "/photos/h2k3j4h9h23"} :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}
proto-class Example$Photo
topic-entity-name "test"
handler-fn (fn [msg]
Expand All @@ -101,11 +100,11 @@
metric-reporter-called? (atom false)
topic-entity-name "test"
handler-fn (fn [msg]
(when (nil? msg)
(when (nil? (:message msg))
(reset! handler-fn-called? true)))]
(with-redefs [metrics/multi-ns-increment-count (fn [_ _ _]
(reset! metric-reporter-called? true))]
((mw/protobuf->hash handler-fn nil topic-entity-name) nil))
((mw/protobuf->hash handler-fn nil topic-entity-name) {:message nil :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}))
(is (true? @handler-fn-called?))
(is (true? @metric-reporter-called?))))
(testing "using the new deserializer function"
Expand All @@ -114,7 +113,7 @@
message {:id 7
:path "/photos/h2k3j4h9h23"}
proto-class Example$Photo
proto-message (proto/->bytes (proto/create Example$Photo message))]
proto-message {:message (proto/->bytes (proto/create Example$Photo message)) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}]
(with-redefs [mw/deserialize-message (fn [_ _ _ _] (reset! deserialize-message-called? true))]
((mw/protobuf->hash (constantly nil) proto-class topic-entity-name) proto-message)
(is (true? @deserialize-message-called?))))))
53 changes: 30 additions & 23 deletions test/ziggurat/middleware/json_test.clj
@@ -1,59 +1,66 @@
(ns ziggurat.middleware.json-test
(:require [clojure.test :refer :all]
[cheshire.core :refer [generate-string]]
[ziggurat.middleware.json :refer [parse-json]]
(:require [cheshire.core :refer [generate-string]]
[clojure.test :refer [deftest is join-fixtures testing use-fixtures]]
[ziggurat.fixtures :as fix]
[ziggurat.metrics :as metrics]))
[ziggurat.metrics :as metrics]
[ziggurat.middleware.json :refer [parse-json]]))

(use-fixtures :once (join-fixtures [fix/mount-only-config
fix/silence-logging]))

(deftest parse-json-test
(testing "Given a handler function (without passing key-fn), parse-json should call that function on after deserializing the string to JSON object."
(let [handler-fn-called? (atom false)
message {:a "A"
json-message {:a "A"
:b "B"}
topic-entity-name "test"
handler-fn (fn [msg]
(if (= msg message)
handler-fn (fn [{:keys [message metadata]}]
(when (and (= json-message message)
(= (:topic metadata) "topic")
(= (:timestamp metadata) 1234567890)
(= (:partition metadata) 1))
(reset! handler-fn-called? true)))]
((parse-json handler-fn topic-entity-name) (generate-string message))
((parse-json handler-fn topic-entity-name) {:message (generate-string json-message) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}})
(is (true? @handler-fn-called?))))
(testing "Given a handler function and key-fn as false, parse-json should call that function on after
deserializing the string without coercing the keys to keywords."
(let [handler-fn-called? (atom false)
message {:a "A"
:b "B"}
expected-output {"a" "A" "b" "B"}
json-message {"a" "A" "b" "B"}
topic-entity-name "test"
handler-fn (fn [msg]
(if (= msg expected-output)
handler-fn (fn [{:keys [message metadata]}]
(when (and (= json-message message)
(= (:topic metadata) "topic")
(= (:timestamp metadata) 1234567890)
(= (:partition metadata) 1))
(reset! handler-fn-called? true)))]
((parse-json handler-fn topic-entity-name false) (generate-string message))
((parse-json handler-fn topic-entity-name false) {:message (generate-string json-message) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}})
(is (true? @handler-fn-called?))))
(testing "Given a handler function and a key-fn, parse-json should call that function after
deserializing the string by applying key-fn to keys."
(let [handler-fn-called? (atom false)
key-fn (fn [k] (str k "-modified"))
message {"a" "A"
json-message {"a" "A"
"b" "B"}
expected-output {"a-modified" "A" "b-modified" "B"}
topic-entity-name "test"
handler-fn (fn [msg]
(if (= msg expected-output)
handler-fn (fn [{:keys [message metadata]}]
(is (= {"a-modified" "A", "b-modified" "B"} message))
(when (and (= {"a-modified" "A", "b-modified" "B"} message)
(= (:topic metadata) "topic")
(= (:timestamp metadata) 1234567890)
(= (:partition metadata) 1))
(reset! handler-fn-called? true)))]
((parse-json handler-fn topic-entity-name key-fn) (generate-string message))
((parse-json handler-fn topic-entity-name key-fn) {:message (generate-string json-message) :metadata {:topic "topic" :timestamp 1234567890 :partition 1}})
(is (true? @handler-fn-called?))))
(testing "Should report metrics when JSON deserialization fails"
(let [handler-fn-called? (atom false)
metric-reporter-called? (atom false)
topic-entity-name "test"
message "{\"foo\":\"bar"
handler-fn (fn [msg]
(if (nil? msg)
json-message "{\"foo\":\"bar"
handler-fn (fn [{:keys [message _]}]
(when (nil? message)
(reset! handler-fn-called? true)))]
(with-redefs [metrics/multi-ns-increment-count (fn [_ _ _]
(reset! metric-reporter-called? true))]
((parse-json handler-fn topic-entity-name true) message))
((parse-json handler-fn topic-entity-name true) {:message json-message :metadata {:topic "topic" :timestamp 1234567890 :partition 1}}))
(is (true? @handler-fn-called?))
(is (true? @metric-reporter-called?)))))
22 changes: 14 additions & 8 deletions test/ziggurat/middleware/stream_joins_test.clj
Expand Up @@ -19,11 +19,14 @@
topic-entity-name "test"
left-proto-message (proto/->bytes (proto/create Example$Photo left-message))
right-proto-message (proto/->bytes (proto/create Example$Photo right-message))
handler-fn (fn [{:keys [left right]}]
(if (and (= left left-message)
(= right right-message))
handler-fn (fn [{:keys [message metadata]}]
(when (and (= (:left message) left-message)
(= (:right message) right-message)
(= (:topic metadata) "topic")
(= (:timestamp metadata) 1234567890)
(= (:partition metadata) 1))
(reset! handler-fn-called? true)))]
((sjmw/protobuf->hash handler-fn proto-class topic-entity-name) {:left left-proto-message :right right-proto-message})
((sjmw/protobuf->hash handler-fn proto-class topic-entity-name) {:message {:left left-proto-message :right right-proto-message} :metadata {:topic "topic" :timestamp 1234567890 :partition 1}})
(is (true? @handler-fn-called?))))
(testing "deserialize a message from a stream join using 2 proto classes"
(let [handler-fn-called? (atom false)
Expand All @@ -35,9 +38,12 @@
topic-entity-name "test"
left-proto-message (proto/->bytes (proto/create Example$Photo left-message))
right-proto-message (proto/->bytes (proto/create Example$Photo right-message))
handler-fn (fn [{:keys [left right]}]
(if (and (= left left-message)
(= right right-message))
handler-fn (fn [{:keys [message metadata]}]
(when (and (= (:left message) left-message)
(= (:right message) right-message)
(= (:topic metadata) "topic")
(= (:timestamp metadata) 1234567890)
(= (:partition metadata) 1))
(reset! handler-fn-called? true)))]
((sjmw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) {:left left-proto-message :right right-proto-message})
((sjmw/protobuf->hash handler-fn [proto-class proto-class] topic-entity-name) {:message {:left left-proto-message :right right-proto-message} :metadata {:topic "topic" :timestamp 1234567890 :partition 1}})
(is (true? @handler-fn-called?)))))

0 comments on commit e1ed887

Please sign in to comment.