-
Notifications
You must be signed in to change notification settings - Fork 64
/
batch_proto_deserializer.clj
19 lines (17 loc) · 1.08 KB
/
batch_proto_deserializer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(ns ziggurat.middleware.batch.batch-proto-deserializer
(:require [ziggurat.middleware.default :refer [deserialize-message]]))
(defn- deserialize-key-and-value
  "Builds a function that takes a message and returns it with its `:key` and
  `:value` entries replaced by the results of `deserialize-message`, called
  with the payload, the matching proto class and `(name topic-entity)`.
  A nil key or value is not passed to `deserialize-message`; the
  corresponding entry in the returned message is nil."
  [key-proto-class value-proto-class topic-entity]
  (fn [message]
    ;; `some->` short-circuits on nil, so absent payloads are never deserialized.
    (let [decoded-key   (some-> (:key message)
                                (deserialize-message key-proto-class (name topic-entity)))
          decoded-value (some-> (:value message)
                                (deserialize-message value-proto-class (name topic-entity)))]
      (assoc message
             :key decoded-key
             :value decoded-value))))
(defn deserialize-batch-of-proto-messages
  "Middleware for batch handlers. Returns a function that takes a sequence of
  proto messages, maps each one through the key/value deserializer built from
  `key-proto-class`, `value-proto-class` and `topic-entity`, and passes the
  resulting lazy sequence of de-serialized messages to `handler-fn`.
  The return value is whatever `handler-fn` returns."
  [handler-fn key-proto-class value-proto-class topic-entity]
  ;; The per-message deserializer depends only on the outer arguments, so it is
  ;; created once and shared by every batch.
  (let [decode-message (deserialize-key-and-value key-proto-class value-proto-class topic-entity)]
    (fn [batch-message]
      (->> batch-message
           (map decode-message)
           handler-fn))))