forked from FundingCircle/jackdaw
-
Notifications
You must be signed in to change notification settings - Fork 1
/
serde.clj
131 lines (113 loc) · 3.7 KB
/
serde.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
(ns jackdaw.test.serde
(:require
[clojure.tools.logging :as log]
[jackdaw.serdes.edn :as edn-serde]
[jackdaw.serdes.json :as json-serde])
(:import
(org.apache.kafka.clients.consumer ConsumerRecord)
(org.apache.kafka.common.serialization Deserializer Serdes Serializer
ByteArraySerializer
ByteArrayDeserializer)
(org.apache.kafka.common.errors SerializationException)))
;; Reflection warnings are switched off for this namespace: the serde interop
;; calls below (`.serializer`, `.serialize`, `.deserializer`, `.deserialize`)
;; carry no type hints.
(set! *warn-on-reflection* false)
;; Access to various serdes
(defn resolver
  "Returns `topic-config` with its `:key-serde` and `:value-serde` entries
  replaced by serde instances.

  The current entries are used as lookup keys; the recognised keys are
  `:edn`, `:json`, `:long` and `:string`. Any other value (including a
  missing entry) resolves to nil."
  [topic-config]
  (let [known-serdes {:edn (edn-serde/serde)
                      :json (json-serde/serde)
                      :long (Serdes/Long)
                      :string (Serdes/String)}
        key-serde (get known-serdes (:key-serde topic-config))
        value-serde (get known-serdes (:value-serde topic-config))]
    (assoc topic-config
           :key-serde key-serde
           :value-serde value-serde)))
;; Serialization/Deserialization
;;
;; Using a byte-array-serde allows us to use a single consumer to consume
;; from all topics. The test-machine knows how to further deserialize
;; the topic-info based on the topic-config supplied by the test author.
(def byte-array-serde
  "Map with a byte-array serde under both `:key-serde` and `:value-serde`."
  {:key-serde (Serdes/ByteArray)
   :value-serde (Serdes/ByteArray)})

(def byte-array-serializer
  "A `ByteArraySerializer` instance."
  (new ByteArraySerializer))

(def byte-array-deserializer
  "A `ByteArrayDeserializer` instance."
  (new ByteArrayDeserializer))
(defn serialize-key
  "Serializes the key `k` for a topic.

  The second argument is a topic map; its `:topic-name` and `:key-serde`
  entries are used. The serde's serializer is obtained and its `serialize`
  method is called with the topic name and `k`.

  Returns nil when `k` is nil, without touching the serde. Only nil counts
  as absent: a `false` key is handed to the serializer instead of being
  silently dropped."
  [k {topic-name :topic-name
      key-serde :key-serde}]
  ;; `some?` rather than plain truthiness, so that `false` is still serialized.
  (when (some? k)
    (-> (.serializer key-serde)
        (.serialize topic-name k))))
(defn serialize-value
  "Serializes the value `v` for a topic.

  The second argument is a topic map; its `:topic-name` and `:value-serde`
  entries are used. The serde's serializer is obtained and its `serialize`
  method is called with the topic name and `v`.

  Returns nil when `v` is nil, without touching the serde. Only nil counts
  as absent: a `false` value is handed to the serializer instead of being
  silently turned into nil."
  [v {topic-name :topic-name
      value-serde :value-serde}]
  ;; `some?` rather than plain truthiness, so that `false` is still serialized.
  (when (some? v)
    (-> (.serializer value-serde)
        (.serialize topic-name v))))
(defn serializer
  "Returns a function that serializes a message for `topic`.

  The returned function takes a record map and returns it with `:key` and
  `:value` replaced by the results of `serialize-key` and `serialize-value`."
  [topic]
  (fn [record]
    (let [serialized-key (serialize-key (:key record) topic)
          serialized-value (serialize-value (:value record) topic)]
      (assoc record
             :key serialized-key
             :value serialized-value))))
(defn deserialize-key
  "Deserializes the key `k` using the topic map's `:key-serde`.

  The serde's deserializer is called with the topic's `:topic-name` and `k`.
  Returns nil when `k` is falsey."
  [k {topic-name :topic-name
      key-serde :key-serde}]
  (when k
    (let [key-deserializer (.deserializer key-serde)]
      (.deserialize key-deserializer topic-name k))))
(defn deserialize-value
  "Deserializes the value `v` using the topic map's `:value-serde`.

  The serde's deserializer is called with the topic's `:topic-name` and `v`.
  Returns nil when `v` is falsey."
  [v {topic-name :topic-name
      value-serde :value-serde}]
  (when v
    (let [value-deserializer (.deserializer value-serde)]
      (.deserialize value-deserializer topic-name v))))
(defn deserializer
  "Returns a function that deserializes a message for `topic`.

  The returned function takes a map `m` and builds a fresh map with:
  `:topic` set to the topic's `:topic-name`, `:key` and `:value` run through
  `deserialize-key` / `deserialize-value`, and `:partition`, `:offset` and
  `:headers` copied from `m`, defaulting to 0, 0 and {} when missing."
  [topic]
  (fn [m]
    (let [decoded-key (deserialize-key (get m :key) topic)
          decoded-value (deserialize-value (get m :value) topic)]
      {:topic (get topic :topic-name)
       :key decoded-key
       :value decoded-value
       :partition (get m :partition 0)
       :offset (get m :offset 0)
       :headers (get m :headers {})})))
(defn deserializers
  "Builds a map from topic name to the deserializer function for that topic.

  `topic-config` is iterated as key/value pairs; the key is ignored and each
  value is a topic map supplying `:topic-name`."
  [topic-config]
  (into {}
        (for [[_ topic] topic-config]
          [(:topic-name topic) (deserializer topic)])))
(defn serializers
  "Builds a map from topic name to the serializer function for that topic.

  `topic-config` is iterated as key/value pairs; the key is ignored and each
  value is a topic map supplying `:topic-name`."
  [topic-config]
  (into {}
        (for [[_ topic] topic-config]
          [(:topic-name topic) (serializer topic)])))
(defn serde-map
  "Returns a map with the per-topic serializer functions under `:serializers`
  and the per-topic deserializer functions under `:deserializers`, both
  built from `topic-config`."
  [topic-config]
  (let [by-topic-serializers (serializers topic-config)
        by-topic-deserializers (deserializers topic-config)]
    {:serializers by-topic-serializers
     :deserializers by-topic-deserializers}))
(defn apply-serializers
  "Serializes the message `m` with the serializer registered for its topic.

  `(:topic m)` is a topic map; its `:topic-name` is the lookup key into
  `serializers`. Throws IllegalArgumentException when no serializer is
  registered under that name."
  [serializers m]
  (let [topic-name (:topic-name (:topic m))]
    (if-some [serialize-fn (get serializers topic-name)]
      (serialize-fn m)
      (throw (IllegalArgumentException.
              (str "Message refers to unknown topic: " topic-name))))))
(defn apply-deserializers
  "Deserializes the record `m` with the deserializer registered for its topic.

  `(:topic m)` is used directly as the lookup key into `deserializers`.
  Throws IllegalArgumentException when no deserializer is registered under
  that key."
  [deserializers m]
  (let [topic-name (:topic m)]
    (if-some [deserialize-fn (get deserializers topic-name)]
      (deserialize-fn m)
      (throw (IllegalArgumentException.
              (str "Record comes from unknown topic: " topic-name))))))