Include metadata in the message-payload (#234)
* include the metadata in the message-payload

* Added metadata as a separate key in message payload and remove kafka headers from rabbitmq message headers

* Restoring headers specific changes in rabbitmq producer and mapper

* Fixing tests in default_test and consumer_test

* Added tests to verify that metadata is provided in the message payload

* Modifying tests to take care of the new structure of the message payload

* adds an assertion for retry-count

* fixes linting

* updates CHANGELOG and ziggurat version to 4.0.0

* Increase sleep before closing stream as well as the join window ms

Co-authored-by: Michael Angelo Calimlim <macalimlim@gmail.com>
Co-authored-by: prateek.khatri <prateek.khatri@gojek.com>
Co-authored-by: shubhang.balkundi <2932-shubhang.balkundi@users.noreply.source.golabs.io>
Co-authored-by: Anmol Vijaywargiya <anmol.vijaywargiya@go-jek.com>
5 people committed Aug 10, 2021
1 parent c4c6d81 commit 9fc470d
Showing 18 changed files with 273 additions and 241 deletions.
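
For readers migrating to 4.0.0, the new handler contract described in the commit message can be sketched as follows. This is a minimal illustration, not code from the commit: `handle-order` and the sample payload values are hypothetical, and only the `:message` / `:metadata` keys and the `:topic`, `:partition`, `:timestamp` metadata fields come from the change itself.

```clojure
;; Hypothetical stream handler written against the 4.0.0 payload shape.
;; The handler now receives a map rather than the bare message.
(defn handle-order
  [{:keys [message metadata]}]
  (let [{:keys [topic partition timestamp]} metadata]
    ;; The Kafka coordinates are available next to the message itself.
    (println "processing" message "from" topic partition timestamp)
    ;; :success is one of the return codes the mapper already understands.
    :success))

;; Sample payload (values are made up for illustration).
(handle-order {:message  {:order-id 42}
               :metadata {:topic "orders" :partition 0 :timestamp 1628553600000}})
```

A 3.x handler that took the message directly would need to destructure `:message` from its argument in the same way.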
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

 All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).

+## 4.0.0
+
+- The Kafka metadata `{:topic <string> :partition <int> :timestamp <long>}` is now exposed to the stream handler function along with the message itself
+- The stream handler receives a map containing two keys, `:message` and `:metadata`
+
 ## 3.15.0
 - Includes a `StreamsUncaughtExceptionHandler` which shuts down the client in case of an uncaught exception.
 - Introduces a new stream-route config `:stream-thread-exception-response` which lets user control the behaviour of `StreamsUncaughtExceptionHandler`.
2 changes: 1 addition & 1 deletion project.clj
@@ -2,7 +2,7 @@
 (cemerick.pomegranate.aether/register-wagon-factory!
  "http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

-(defproject tech.gojek/ziggurat "3.15.0"
+(defproject tech.gojek/ziggurat "4.0.0"
   :description "A stream processing framework to build stateless applications on kafka"
   :url "https://github.com/gojektech/ziggurat"
   :license {:name "Apache License, Version 2.0"
11 changes: 8 additions & 3 deletions src/ziggurat/header_transformer.clj
@@ -4,9 +4,14 @@

 (deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer
   (^void init [_ ^ProcessorContext context]
-    (do (set! processor-context context)
-        nil))
-  (transform [_ record-value] {:value record-value :headers (.headers processor-context)})
+    (set! processor-context context))
+  (transform [_ record-value]
+    (let [topic (.topic processor-context)
+          timestamp (.timestamp processor-context)
+          partition (.partition processor-context)
+          headers (.headers processor-context)
+          metadata {:topic topic :timestamp timestamp :partition partition}]
+      {:value record-value :headers headers :metadata metadata}))
   (close [_] nil))

 (defn create []
25 changes: 17 additions & 8 deletions src/ziggurat/mapper.clj
@@ -14,8 +14,15 @@
       (throw (ex-info "Invalid mapper return code" {:code return-code})))
     (producer/publish-to-channel-instant-queue return-code message-payload))

-(defn mapper-func [mapper-fn channels]
-  (fn [{:keys [topic-entity message] :as message-payload}]
+(defn- create-user-payload
+  [message-payload]
+  (-> message-payload
+      (dissoc :headers)
+      (dissoc :retry-count)
+      (dissoc :topic-entity)))
+
+(defn mapper-func [user-handler-fn channels]
+  (fn [{:keys [topic-entity] :as message-payload}]
     (let [service-name (:app-name (ziggurat-config))
           topic-entity-name (name topic-entity)
           new-relic-transaction-name (str topic-entity-name ".handler-fn")
@@ -27,11 +34,12 @@
           retry-metric "retry"
           skip-metric "skip"
           failure-metric "failure"
-          multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
+          multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
+          user-payload (create-user-payload message-payload)]
       (nr/with-tracing "job" new-relic-transaction-name
         (try
           (let [start-time (.toEpochMilli (Instant/now))
-                return-code (mapper-fn message)
+                return-code (user-handler-fn user-payload)
                 end-time (.toEpochMilli (Instant/now))
                 time-val (- end-time start-time)
                 execution-time-namespace "handler-fn-execution-time"
@@ -52,8 +60,8 @@
             (report-error e (str "Actor execution failed for " topic-entity-name))
             (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))

-(defn channel-mapper-func [mapper-fn channel]
-  (fn [{:keys [topic-entity message] :as message-payload}]
+(defn channel-mapper-func [user-handler-fn channel]
+  (fn [{:keys [topic-entity] :as message-payload}]
     (let [service-name (:app-name (ziggurat-config))
           topic-entity-name (name topic-entity)
           channel-name (name channel)
@@ -66,11 +74,12 @@
           retry-metric "retry"
           skip-metric "skip"
           failure-metric "failure"
-          multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]]
+          multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
+          user-payload (create-user-payload message-payload)]
       (nr/with-tracing "job" metric-namespace
         (try
           (let [start-time (.toEpochMilli (Instant/now))
-                return-code (mapper-fn message)
+                return-code (user-handler-fn user-payload)
                 end-time (.toEpochMilli (Instant/now))
                 time-val (- end-time start-time)
                 execution-time-namespace "execution-time"
8 changes: 0 additions & 8 deletions src/ziggurat/message_payload.clj
@@ -2,11 +2,3 @@
   (:require [schema.core :as s]))

 (defrecord MessagePayload [message topic-entity])
-
-(declare message-payload-schema)
-
-(s/defschema message-payload-schema
-  {:message s/Any
-   :topic-entity s/Keyword
-   (s/optional-key :retry-count) s/Int
-   (s/optional-key :headers) s/Any})
20 changes: 1 addition & 19 deletions src/ziggurat/messaging/consumer.clj
@@ -1,13 +1,10 @@
 (ns ziggurat.messaging.consumer
   (:require [ziggurat.mapper :as mpr]
-            [ziggurat.message-payload :as mp]
             [clojure.tools.logging :as log]
             [langohr.basic :as lb]
             [langohr.channel :as lch]
             [langohr.consumers :as lcons]
             [ziggurat.kafka-consumer.consumer-handler :as ch]
-            [schema.core :as s]
-            [sentry-clj.async :as sentry]
             [taoensso.nippy :as nippy]
             [ziggurat.config :refer [get-in-config]]
             [ziggurat.messaging.connection :refer [connection]]
@@ -16,21 +13,6 @@
             [ziggurat.metrics :as metrics]
             [ziggurat.util.error :refer [report-error]]))

-(defn- convert-to-message-payload
-  "This function is used for migration from Ziggurat Version 2.x to 3.x. It checks if the message is a message payload or a message(pushed by Ziggurat version < 3.0.0) and converts messages to
-  message-payload to pass onto the mapper-fn.
-  If the `:retry-count` key is absent in the `message`, then it puts `0` as the value for `:retry-count` in `MessagePayload`.
-  It also converts the topic-entity into a keyword while constructing MessagePayload."
-  [message topic-entity]
-  (try
-    (s/validate mp/message-payload-schema message)
-    (catch Exception e
-      (log/info "old message format read, converting to message-payload: " message)
-      (let [retry-count (or (:retry-count message) 0)
-            message-payload (mp/->MessagePayload (dissoc message :retry-count) (keyword topic-entity))]
-        (assoc message-payload :retry-count retry-count)))))
-
 (defn convert-and-ack-message
   "De-serializes the message payload (`payload`) using `nippy/thaw` and converts it to `MessagePayload`. Acks the message
   if `ack?` is true."
@@ -39,7 +21,7 @@
     (let [message (nippy/thaw payload)]
       (when ack?
         (lb/ack ch delivery-tag))
-      (convert-to-message-payload message topic-entity))
+      message)
     (catch Exception e
       (lb/reject ch delivery-tag false)
       (report-error e "Error while decoding message")
4 changes: 2 additions & 2 deletions src/ziggurat/middleware/default.clj
@@ -81,5 +81,5 @@
   ([handler-fn proto-class topic-entity-name]
    (protobuf->hash handler-fn proto-class topic-entity-name false))
   ([handler-fn proto-class topic-entity-name flatten-protobuf-struct?]
-   (fn [message]
-     (handler-fn (deserialize-message message proto-class topic-entity-name flatten-protobuf-struct?)))))
+   (fn [payload]
+     (handler-fn (update-in payload [:message] deserialize-message proto-class topic-entity-name flatten-protobuf-struct?)))))
4 changes: 2 additions & 2 deletions src/ziggurat/middleware/json.clj
@@ -46,5 +46,5 @@
   ([handler-fn topic-entity]
    (parse-json handler-fn topic-entity true))
   ([handler-fn topic-entity key-fn]
-   (fn [message]
-     (handler-fn (deserialize-json message topic-entity key-fn)))))
+   (fn [payload]
+     (handler-fn (assoc payload :message (deserialize-json (:message payload) topic-entity key-fn))))))
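
The middleware changes in this commit share one pattern: the wrapper now receives the whole payload map and transforms only its `:message` key, leaving `:metadata` intact. A custom middleware written for 3.x would presumably need the same adjustment. The sketch below assumes a hypothetical `wrap-upper-case` middleware that is not part of Ziggurat; only the payload shape is taken from the diff.

```clojure
(require '[clojure.string :as str])

;; Hypothetical middleware, for illustration only.
;; 3.x style would have been: (fn [message] (handler-fn (str/upper-case message)))
(defn wrap-upper-case
  [handler-fn]
  (fn [payload]
    ;; Transform just the :message value; every other key passes through.
    (handler-fn (update payload :message str/upper-case))))

;; Using `identity` as the handler makes the transformed payload visible.
((wrap-upper-case identity)
 {:message "hello" :metadata {:topic "t" :partition 0 :timestamp 1}})
```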
2 changes: 1 addition & 1 deletion src/ziggurat/middleware/stream_joins.clj
@@ -18,4 +18,4 @@
   "This is a middleware function that takes in a message (Proto ByteArray or PersistentHashMap) and calls the handler-fn with the deserialized PersistentHashMap"
   [handler-fn proto-class topic-entity-name]
   (fn [message]
-    (handler-fn (deserialize-stream-joins-message message proto-class topic-entity-name))))
+    (handler-fn (update-in message [:message] deserialize-stream-joins-message proto-class topic-entity-name))))
4 changes: 3 additions & 1 deletion src/ziggurat/streams.clj
@@ -137,7 +137,9 @@
                (.asChildOf t parent-ctx))
              (.start t))]
     (try
-      ((mapper-func handler-fn channels) (assoc (->MessagePayload (:value message) topic-entity) :headers (:headers message)))
+      ((mapper-func handler-fn channels) (-> (->MessagePayload (:value message) topic-entity)
+                                             (assoc :headers (:headers message))
+                                             (assoc :metadata (:metadata message))))
       (finally
         (.finish span)))))

42 changes: 27 additions & 15 deletions test/ziggurat/header_transformer_test.clj
@@ -1,23 +1,35 @@
 (ns ziggurat.header-transformer-test
-  (:require [clojure.test :refer :all]
-            [ziggurat.header-transformer :refer :all])
-  (:import [org.apache.kafka.streams.processor ProcessorContext]
-           [org.apache.kafka.common.header.internals RecordHeaders RecordHeader]))
+  (:require [clojure.test :refer [deftest is testing]]
+            [ziggurat.header-transformer :refer [create]])
+  (:import [org.apache.kafka.common.header.internals RecordHeaders RecordHeader]
+           [org.apache.kafka.streams.processor ProcessorContext]))

 (deftest header-transformer-test
   (testing "transforms value with passed headers"
-    (let [headers (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))
-          context (reify ProcessorContext
-                    (headers [_] headers))
-          transformer (create)
-          _ (.init transformer context)
+    (let [headers         (RecordHeaders. (list (RecordHeader. "key" (byte-array (map byte "value")))))
+          topic           "topic"
+          timestamp       1234567890
+          partition       1
+          context         (reify ProcessorContext
+                            (headers [_] headers)
+                            (topic [_] topic)
+                            (timestamp [_] timestamp)
+                            (partition [_] partition))
+          transformer     (create)
+          _               (.init transformer context)
           transformed-val (.transform transformer "val")]
-      (is (= {:value "val" :headers headers} transformed-val))))
+      (is (= {:value "val" :headers headers :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val))))

   (testing "transforms value with nil headers when not passed"
-    (let [context (reify ProcessorContext
-                    (headers [_] nil))
-          transformer (create)
-          _ (.init transformer context)
+    (let [topic           "topic"
+          timestamp       1234567890
+          partition       1
+          context         (reify ProcessorContext
+                            (headers [_] nil)
+                            (topic [_] topic)
+                            (timestamp [_] timestamp)
+                            (partition [_] partition))
+          transformer     (create)
+          _               (.init transformer context)
           transformed-val (.transform transformer "val")]
-      (is (= {:value "val" :headers nil} transformed-val)))))
+      (is (= {:value "val" :headers nil :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))))