Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

include the metadata in the message-payload #234

Merged
merged 10 commits into from
Aug 10, 2021
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).

## 4.0.0

- The kafka-metadata is now exposed to the stream handler function along with the message itself`{:topic <string> :partition <int> :timestamp <long>}`
- The stream handler receives a map containing two keys `:message` and `:metadata`

## 3.15.0
- Includes a `StreamsUncaughtExceptionHandler` which shuts down the client in case of an uncaught exception.
- Introduces a new stream-route config `:stream-thread-exception-response` which lets user control the behaviour of `StreamsUncaughtExceptionHandler`.
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "3.15.0"
(defproject tech.gojek/ziggurat "4.0.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
11 changes: 8 additions & 3 deletions src/ziggurat/header_transformer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer
(^void init [_ ^ProcessorContext context]
(do (set! processor-context context)
nil))
(transform [_ record-value] {:value record-value :headers (.headers processor-context)})
(set! processor-context context))
(transform [_ record-value]
(let [topic (.topic processor-context)
timestamp (.timestamp processor-context)
partition (.partition processor-context)
headers (.headers processor-context)
metadata {:topic topic :timestamp timestamp :partition partition}]
{:value record-value :headers headers :metadata metadata}))
(close [_] nil))

(defn create []
Expand Down
25 changes: 17 additions & 8 deletions src/ziggurat/mapper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
(throw (ex-info "Invalid mapper return code" {:code return-code})))
(producer/publish-to-channel-instant-queue return-code message-payload))

(defn mapper-func [mapper-fn channels]
(fn [{:keys [topic-entity message] :as message-payload}]
(defn- create-user-payload
[message-payload]
(-> message-payload
(dissoc :headers)
(dissoc :retry-count)
(dissoc :topic-entity)))

(defn mapper-func [user-handler-fn channels]
(fn [{:keys [topic-entity] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
new-relic-transaction-name (str topic-entity-name ".handler-fn")
Expand All @@ -27,11 +34,12 @@
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
(nr/with-tracing "job" new-relic-transaction-name
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "handler-fn-execution-time"
Expand All @@ -52,8 +60,8 @@
(report-error e (str "Actor execution failed for " topic-entity-name))
(metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))

(defn channel-mapper-func [mapper-fn channel]
(fn [{:keys [topic-entity message] :as message-payload}]
(defn channel-mapper-func [user-handler-fn channel]
(fn [{:keys [topic-entity] :as message-payload}]
(let [service-name (:app-name (ziggurat-config))
topic-entity-name (name topic-entity)
channel-name (name channel)
Expand All @@ -66,11 +74,12 @@
retry-metric "retry"
skip-metric "skip"
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
(nr/with-tracing "job" metric-namespace
(try
(let [start-time (.toEpochMilli (Instant/now))
return-code (mapper-fn message)
return-code (user-handler-fn user-payload)
end-time (.toEpochMilli (Instant/now))
time-val (- end-time start-time)
execution-time-namespace "execution-time"
Expand Down
8 changes: 0 additions & 8 deletions src/ziggurat/message_payload.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,3 @@
(:require [schema.core :as s]))

(defrecord MessagePayload [message topic-entity])

(declare message-payload-schema)

(s/defschema message-payload-schema
{:message s/Any
:topic-entity s/Keyword
(s/optional-key :retry-count) s/Int
(s/optional-key :headers) s/Any})
20 changes: 1 addition & 19 deletions src/ziggurat/messaging/consumer.clj
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
(ns ziggurat.messaging.consumer
(:require [ziggurat.mapper :as mpr]
[ziggurat.message-payload :as mp]
[clojure.tools.logging :as log]
[langohr.basic :as lb]
[langohr.channel :as lch]
[langohr.consumers :as lcons]
[ziggurat.kafka-consumer.consumer-handler :as ch]
[schema.core :as s]
[sentry-clj.async :as sentry]
[taoensso.nippy :as nippy]
[ziggurat.config :refer [get-in-config]]
[ziggurat.messaging.connection :refer [connection]]
Expand All @@ -16,21 +13,6 @@
[ziggurat.metrics :as metrics]
[ziggurat.util.error :refer [report-error]]))

(defn- convert-to-message-payload
"This function is used for migration from Ziggurat Version 2.x to 3.x. It checks if the message is a message payload or a message(pushed by Ziggurat version < 3.0.0) and converts messages to
message-payload to pass onto the mapper-fn.

If the `:retry-count` key is absent in the `message`, then it puts `0` as the value for `:retry-count` in `MessagePayload`.
It also converts the topic-entity into a keyword while constructing MessagePayload."
[message topic-entity]
(try
(s/validate mp/message-payload-schema message)
(catch Exception e
(log/info "old message format read, converting to message-payload: " message)
(let [retry-count (or (:retry-count message) 0)
message-payload (mp/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))]
(assoc message-payload :retry-count retry-count)))))

(defn convert-and-ack-message
"De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
if `ack?` is true."
Expand All @@ -39,7 +21,7 @@
(let [message (nippy/thaw payload)]
(when ack?
(lb/ack ch delivery-tag))
(convert-to-message-payload message topic-entity))
message)
(catch Exception e
(lb/reject ch delivery-tag false)
(report-error e "Error while decoding message")
Expand Down
4 changes: 2 additions & 2 deletions src/ziggurat/middleware/default.clj
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,5 @@
([handler-fn proto-class topic-entity-name]
(protobuf->hash handler-fn proto-class topic-entity-name false))
([handler-fn proto-class topic-entity-name flatten-protobuf-struct?]
(fn [message]
(handler-fn (deserialize-message message proto-class topic-entity-name flatten-protobuf-struct?)))))
(fn [payload]
(handler-fn (update-in payload [:message] deserialize-message proto-class topic-entity-name flatten-protobuf-struct?)))))
4 changes: 2 additions & 2 deletions src/ziggurat/middleware/json.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@
([handler-fn topic-entity]
(parse-json handler-fn topic-entity true))
([handler-fn topic-entity key-fn]
(fn [message]
(handler-fn (deserialize-json message topic-entity key-fn)))))
(fn [payload]
(handler-fn (assoc payload :message (deserialize-json (:message payload) topic-entity key-fn))))))
2 changes: 1 addition & 1 deletion src/ziggurat/middleware/stream_joins.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
"This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap"
[handler-fn proto-class topic-entity-name]
(fn [message]
(handler-fn (deserialize-stream-joins-message message proto-class topic-entity-name))))
(handler-fn (update-in message [:message] deserialize-stream-joins-message proto-class topic-entity-name))))
4 changes: 3 additions & 1 deletion src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@
(.asChildOf t parent-ctx))
(.start t))]
(try
((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message)))
((mapper-func handler-fn channels) (-> (->MessagePayload (:value message) topic-entity)
(assoc :headers (:headers message))
(assoc :metadata (:metadata message))))
(finally
(.finish span)))))

Expand Down
42 changes: 27 additions & 15 deletions test/ziggurat/header_transformer_test.clj
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
(ns ziggurat.header-transformer-test
(:require [clojure.test :refer :all]
[ziggurat.header-transformer :refer :all])
(:import [org.apache.kafka.streams.processor ProcessorContext]
[org.apache.kafka.common.header.internals RecordHeaders RecordHeader]))
(:require [clojure.test :refer [deftest is testing]]
[ziggurat.header-transformer :refer [create]])
(:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader]
[org.apache.kafka.streams.processor ProcessorContext]))

(deftest header-transformer-test
(testing "transforms value with passed headers"
(let [headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))
context (reify ProcessorContext
(headers [_] headers))
transformer (create)
_ (.init transformer context)
(let [headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))
topic "topic"
timestamp 1234567890
partition 1
context (reify ProcessorContext
(headers [_] headers)
(topic [_] topic)
(timestamp [_] timestamp)
(partition [_] partition))
transformer (create)
_ (.init transformer context)
transformed-val (.transform transformer "val")]
(is (= {:value "val" :headers headers} transformed-val))))
(is (= {:value "val" :headers headers :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val))))

(testing "transforms value with nil headers when not passed"
(let [context (reify ProcessorContext
(headers [_] nil))
transformer (create)
_ (.init transformer context)
(let [topic "topic"
timestamp 1234567890
partition 1
context (reify ProcessorContext
(headers [_] nil)
(topic [_] topic)
(timestamp [_] timestamp)
(partition [_] partition))
transformer (create)
_ (.init transformer context)
transformed-val (.transform transformer "val")]
(is (= {:value "val" :headers nil} transformed-val)))))
(is (= {:value "val" :headers nil :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))))