/
table.clj
381 lines (215 loc) · 9.16 KB
/
table.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
(ns dvlopt.kstreams.table

  "Kafka Streams' abstraction of tables.

   Cf. `dvlopt.kstreams.builder` for the big picture and details.


   A table can be transformed by various functions. Those functions always return a new table representing the transformation.
   Hence, a single table can be used for more than one transformation.

   A table can be re-grouped by other keys using the `map-and-group-by` function and then the values aggregated for each key,
   resulting in a new table.

   A table can also be joined with another table.


   A table is backed by a state store. As such, these options, called the standard options in this namespace, can very often
   be supplied :

     :dvlopt.kafka/deserializer.key
     :dvlopt.kafka/deserializer.value
     :dvlopt.kafka/serializer.key
     :dvlopt.kafka/serializer.value
      Cf. `dvlopt.kafka` for a description of serializers and deserializers.

     :dvlopt.kstreams.store/cache?
     :dvlopt.kstreams.store/changelog?
     :dvlopt.kstreams.store/configuration.changelog
     :dvlopt.kstreams.store/name
     :dvlopt.kstreams.store/type
      Exactly as described in `dvlopt.kstreams.store` but the type is restricted to #{:kv.in-memory :kv.regular}."

  {:author "Adam Helinsi"}

  (:require [dvlopt.kafka.-interop.java :as K.-interop.java]
            [dvlopt.kstreams.store      :as KS.store])
  (:import org.apache.kafka.streams.StreamsBuilder
           (org.apache.kafka.streams.kstream KStream
                                             KTable
                                             Materialized
                                             KGroupedTable)))
;;;;;;;;;; Transformations
(defn filter-kv

  "Returns a new table keeping only the key-values for which the given predicate returns true.

   The predicate receives the key and the value (cf. example). Keys with nil values are removed right away.

   Standard options can be provided (cf. namespace description).


   Ex. (filter-kv table
                  (fn [k v]
                    (>= (:age v)
                        18)))"

  (^KTable

   [table predicate]

   ;; No options, delegates to the full arity.
   (filter-kv table
              predicate
              nil))


  (^KTable

   [^KTable table predicate standard-options]

   ;; Clojure arguments are first converted to their Kafka Streams counterparts.
   (let [kafka-predicate (K.-interop.java/predicate predicate)
         materialized    (K.-interop.java/materialized--kv standard-options)]
     (.filter table
              kafka-predicate
              materialized))))
(defn map-values

  "Returns a new table where the value of each key-value has been mapped by `f`.

   `f` receives the key and the value (cf. example) and returns the new value.

   Standard options can be provided (cf. namespace description).


   Ex. (map-values table
                   (fn [k v]
                     (assoc v
                            :country
                            (country-from-ip (:ip v)))))"

  (^KTable

   [table f]

   ;; No options, delegates to the full arity.
   (map-values table
               f
               nil))


  (^KTable

   [^KTable table f standard-options]

   ;; Clojure arguments are first converted to their Kafka Streams counterparts.
   (let [value-mapper (K.-interop.java/value-mapper-with-key f)
         materialized (K.-interop.java/materialized--kv standard-options)]
     (.mapValues table
                 value-mapper
                 materialized))))
(defn process-values

  "Just like `dvlopt.kstreams.stream/process-value` but with a table.

   Standard options can be provided (cf. namespace description).

   In addition, the collection found under :dvlopt.kstreams.store/names in the options, if any, is forwarded as
   the state store names argument of `KTable.transformValues`."

  (^KTable

   [table processor]

   ;; No options, delegates to the full arity.
   (process-values table
                   processor
                   nil))


  (^KTable

   [^KTable table processor options]

   (.transformValues table
                     (K.-interop.java/value-transformer-with-key-supplier processor)
                     (K.-interop.java/materialized--kv options)
                     ;; The Java method takes `String...`, hence a String array is needed. `into-array` yields an
                     ;; empty array when no store names are given (nil options or missing key).
                     ^"[Ljava.lang.String;" (into-array String
                                                        (::KS.store/names options)))))
;;;;;;;;; Joins
(defn join-with-table

  "Returns a new table joining the values of both tables when they share the same key.

   `f` receives the left value and the right value (cf. example) and returns the joined value.

   A record with a nil value removes the corresponding key from the resulting table. Records with nil keys are dropped.

   Cf. `dvlopt.kstreams.builder` for requirements related to joins.

   Standard options can be provided (cf. namespace description).


   Ex. (join-with-table left-table
                        right-table
                        (fn [v-left v-right]
                          (merge v-left
                                 v-right)))"

  (^KTable

   [left-table right-table f]

   ;; No options, delegates to the full arity.
   (join-with-table left-table
                    right-table
                    f
                    nil))


  (^KTable

   [^KTable left-table ^KTable right-table f standard-options]

   ;; Clojure arguments are first converted to their Kafka Streams counterparts.
   (let [value-joiner (K.-interop.java/value-joiner f)
         materialized (K.-interop.java/materialized--kv standard-options)]
     (.join left-table
            right-table
            value-joiner
            materialized))))
(defn left-join-with-table

  "Exactly like `join-with-table` but the join is triggered even if the right table does not contain the key yet.

   In such a case, the right value provided for the join is nil."

  (^KTable

   [left-table right-table f]

   ;; No options, delegates to the full arity.
   (left-join-with-table left-table
                         right-table
                         f
                         nil))


  (^KTable

   [^KTable left-table ^KTable right-table f options]

   ;; Clojure arguments are first converted to their Kafka Streams counterparts.
   (let [value-joiner (K.-interop.java/value-joiner f)
         materialized (K.-interop.java/materialized--kv options)]
     (.leftJoin left-table
                right-table
                value-joiner
                materialized))))
(defn outer-join-with-table

  "Exactly like `join-with-table` but the join is triggered even if the other table does not contain the key yet.

   In such a case, the value provided for the missing side of the join is nil."

  (^KTable

   [left-table right-table f]

   ;; No options, delegates to the full arity.
   (outer-join-with-table left-table
                          right-table
                          f
                          nil))


  (^KTable

   [^KTable left-table ^KTable right-table f options]

   ;; Clojure arguments are first converted to their Kafka Streams counterparts.
   (let [value-joiner (K.-interop.java/value-joiner f)
         materialized (K.-interop.java/materialized--kv options)]
     (.outerJoin left-table
                 right-table
                 value-joiner
                 materialized))))
;;;;;;;;; Aggregations
(defn map-and-group-by

  "Returns a grouped table mapping key-values and then re-grouping them based on the new keys.

   `f` receives the key and the value and returns a vector holding the new key and the new value (cf. example).

   Mapped records with nil keys are dropped.

   Because a new key is explicitly selected, the data is repartitioned.

   Cf. `dvlopt.kstreams.builder` about repartitioning.


   A map of options may be given :

     :dvlopt.kafka/deserializer.key
     :dvlopt.kafka/deserializer.value
     :dvlopt.kafka/serializer.key
     :dvlopt.kafka/serializer.value
      Cf. `dvlopt.kafka` for a description of serializers and deserializers.

     :dvlopt.kstreams/repartition-name
      Cf. `dvlopt.kstreams.builder` section \"State and repartitioning\"


   Ex. ;; Re-groups a table on countries while remembering from which user are the values.

       (map-and-group-by table
                         (fn [k v]
                           [(:country v)
                            (assoc v
                                   :user-id
                                   k)]))"

  (^KGroupedTable

   [table f]

   ;; No options, delegates to the full arity.
   (map-and-group-by table
                     f
                     nil))


  (^KGroupedTable

   [^KTable table f options]

   ;; Clojure arguments are first converted to their Kafka Streams counterparts.
   (let [kv-mapper (K.-interop.java/key-value-mapper f)
         grouped   (K.-interop.java/grouped options)]
     (.groupBy table
               kv-mapper
               grouped))))
(defn reduce-values

  "Returns a new table aggregating the values for each key of the given grouped table.

   Let us take the example provided in `map-and-group-by` where users were re-grouped by country. Let us say each user has
   an :income and we want to sum all incomes for each country.

   A seed (sum of incomes) is created for each grouped key (country). Each new value (a user with an income) is aggregated
   using (reduce-add [agg k v]) in order to increase the country sum. If the user is removed, then its income is removed from
   the country sum using (reduce-sub [agg k v]). If the value for an existing user is replaced, then both functions are called
   in undefined order for updating the country sum with the possibly new income.

   A map of options may be given as a last argument. It is handed to the same materialization helper as the standard options
   elsewhere in this namespace.


   Ex. (reduce-values grouped-table
                      (fn reduce-add [country-income country user]
                        (+ country-income
                           (:income user)))
                      (fn reduce-sub [country-income country user]
                        (- country-income
                           (:income user)))
                      (fn seed []
                        0))"

  (^KTable

   [grouped-table fn-reduce-add fn-reduce-sub fn-seed]

   ;; No options, delegates to the full arity.
   (reduce-values grouped-table
                  fn-reduce-add
                  fn-reduce-sub
                  fn-seed
                  nil))


  (^KTable

   [^KGroupedTable grouped-table f-add f-sub seed options]

   ;; Clojure arguments are first converted to their Kafka Streams counterparts, the seed function
   ;; becoming the initializer of the aggregation.
   (let [initializer  (K.-interop.java/initializer seed)
         adder        (K.-interop.java/aggregator f-add)
         subtractor   (K.-interop.java/aggregator f-sub)
         materialized (K.-interop.java/materialized--kv options)]
     (.aggregate grouped-table
                 initializer
                 adder
                 subtractor
                 materialized))))
;;;;;;;;; Misc
(defn to-stream

  "Turns a table into a stream.

   A function can be provided for selecting new keys. It receives the key and the value and returns the new key.


   Ex. ;; The new key is the length of the value.

       (to-stream table
                  (fn [k v]
                    (count v)))"

  (^KStream

   [^KTable table]

   ;; Keys are kept as they are.
   (-> table
       .toStream))


  (^KStream

   [^KTable table f]

   ;; The key-selecting function is first converted to its Kafka Streams counterpart.
   (let [key-selector (K.-interop.java/key-value-mapper--raw f)]
     (.toStream table
                key-selector))))
(defn store-name

  "Returns the name of the local underlying state store backing this table, which can be used for querying it.

   Returns nil if the table cannot be queried."

  [^KTable table]

  (-> table
      .queryableStoreName))