/
streams.clj
335 lines (274 loc) · 10.6 KB
/
streams.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
(ns jackdaw.streams
"Kafka streams protocols."
{:license "BSD 3-Clause License <https://github.com/FundingCircle/jackdaw/blob/master/LICENSE>"}
(:refer-clojure :exclude [count map merge reduce group-by filter peek])
(:require [clojure.string :as str]
[jackdaw.streams.interop :as interop]
[jackdaw.streams.protocols :as p])
(:import org.apache.kafka.streams.KafkaStreams
org.apache.kafka.streams.StreamsBuilder
org.apache.kafka.streams.KafkaStreams$State
org.apache.kafka.streams.Topology))
;; StreamsBuilder
(defn kstream
"Creates a KStream that will consume messages from the specified topic."
([streams-builder topic-config]
{:pre [(map? topic-config)]}
(p/kstream streams-builder topic-config))
([streams-builder topic-config topic-pattern]
{:pre [(map? topic-config)]}
(p/kstream streams-builder topic-config topic-pattern)))
(defn kstreams
"Creates a KStream that will consume messages from the specified topics."
[streams-builder topic-configs]
(p/kstreams streams-builder topic-configs))
(defn ktable
"Creates a KTable that will consist of data from the specified topic."
([streams-builder topic-config]
(p/ktable streams-builder topic-config))
([streams-builder topic-config store-name]
(p/ktable streams-builder topic-config store-name)))
(defn global-ktable
"Creates a GlobalKTable that will consist of data from the specified
topic."
[streams-builder topic-config]
(p/global-ktable streams-builder topic-config))
(defn source-topics
"Gets the names of source topics for the topology."
[streams-builder]
(p/source-topics streams-builder))
(defn streams-builder*
"Returns the underlying KStreamBuilder."
[streams-builder]
(p/streams-builder* streams-builder))
;; IKStreamBase
(defn left-join
"Creates a KStream from the result of calling `value-joiner-fn` with
each element in the KStream and the value in the KTable with the same
key."
([kstream ktable value-joiner-fn]
(p/left-join kstream ktable value-joiner-fn))
([kstream ktable value-joiner-fn this-topic-config other-topic-config]
(p/left-join kstream ktable value-joiner-fn this-topic-config other-topic-config)))
(defn filter
[kstream predicate-fn]
(p/filter kstream predicate-fn))
(defn filter-not
"Creates a KStream that consists of all elements that do not satisfy a
predicate."
[kstream predicate-fn]
(p/filter-not kstream predicate-fn))
(defn group-by
"Groups the records of this KStream/KTable using the key-value-mapper-fn."
([ktable key-value-mapper-fn]
(p/group-by ktable key-value-mapper-fn))
([ktable key-value-mapper-fn topic-config]
(p/group-by ktable key-value-mapper-fn topic-config)))
(defn peek
"Performs the action defined by `peek-fn` on each element of the input
KStream, returning that stream untransformed."
[kstream peek-fn]
(p/peek kstream peek-fn))
(defn map-values
"Creates a KStream that is the result of calling `value-mapper-fn` on each
element of the input stream."
[kstream value-mapper-fn]
(p/map-values kstream value-mapper-fn))
(defn print!
"Prints the elements of the stream to *out*."
[kstream]
(p/print! kstream))
(defn through
"Materializes a stream to a topic, and returns a new KStream that will
consume messages from the topic."
[kstream topic-config]
(p/through kstream topic-config))
(defn to
"Materializes a stream to a topic."
[kstream topic-config]
(p/to! kstream topic-config))
;; IKStream
(defn branch
"Returns a list of KStreams, one for each of the `predicate-fns`
provided."
[kstream predicate-fns]
(p/branch kstream predicate-fns))
(defn flat-map
"Creates a KStream that will consist of the concatenation of messages
returned by calling `key-value-mapper-fn` on each key/value pair in the
input stream."
[kstream key-value-mapper-fn]
(p/flat-map kstream key-value-mapper-fn))
(defn flat-map-values
"Creates a KStream that will consist of the concatenation of the values
returned by calling `value-mapper-fn` on each value in the input stream."
[kstream value-mapper-fn]
(p/flat-map-values kstream value-mapper-fn))
(defn for-each!
"Performs an action on each element of KStream."
[kstream foreach-fn]
(p/for-each! kstream foreach-fn))
(defn group-by-key
"Groups records with the same key into a KGroupedStream."
([kstream]
(p/group-by-key kstream))
([kstream topic-config]
(p/group-by-key kstream topic-config)))
(defn join-windowed
"Combines the values of two streams that share the same key using a
windowed inner join."
([kstream other-kstream value-joiner-fn windows]
(p/join-windowed kstream other-kstream value-joiner-fn windows))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
(p/join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)))
(defn left-join-windowed
"Combines the values of two streams that share the same key using a
windowed left join."
([kstream other-kstream value-joiner-fn windows]
(p/left-join-windowed kstream other-kstream value-joiner-fn windows))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
(p/left-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)))
(defn map
"Creates a KStream that consists of the result of applying
`key-value-mapper-fn` to each key/value pair in the input stream."
[kstream key-value-mapper-fn]
(p/map kstream key-value-mapper-fn))
(defn outer-join-windowed
"Combines the values of two streams that share the same key using a
windowed outer join."
([kstream other-kstream value-joiner-fn windows]
(p/outer-join-windowed kstream other-kstream value-joiner-fn windows))
([kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
(p/outer-join-windowed kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config)))
(defn process!
"Applies `processor-fn` to each item in the input stream."
[kstream processor-fn state-store-names]
(p/process! kstream processor-fn state-store-names))
(defn select-key
"Create a new key from the current key and value.
`select-key-value-mapper-fn` should be a function that takes a key-value
pair, and returns the value of the new key. Here is example multiplies each
key by 10:
```(fn [[k v]] (* 10 k))```"
[kstream select-key-value-mapper-fn]
(p/select-key kstream select-key-value-mapper-fn))
(defn transform
"Creates a KStream that consists of the results of applying the transformer
to each key/value in the input stream."
([kstream transformer-supplier-fn]
(p/transform kstream transformer-supplier-fn))
([kstream transformer-supplier-fn state-store-names]
(p/transform kstream transformer-supplier-fn state-store-names)))
(defn transform-values
"Creates a KStream that consists of the results of applying the transformer
to each value in the input stream."
([kstream value-transformer-supplier-fn]
(p/transform-values kstream value-transformer-supplier-fn))
([kstream value-transformer-supplier-fn state-store-names]
(p/transform-values kstream value-transformer-supplier-fn state-store-names)))
(defn join-global
[kstream global-ktable kv-mapper joiner]
(p/join-global kstream global-ktable kv-mapper joiner))
(defn left-join-global
[kstream global-ktable kv-mapper joiner]
(p/left-join-global kstream global-ktable kv-mapper joiner))
(defn merge
[kstream other]
(p/merge kstream other))
(defn kstream*
"Returns the underlying KStream object."
[kstream]
(p/kstream* kstream))
;; IKTable
(defn join
"Combines the values of the two KTables that share the same key using an
inner join."
[ktable other-ktable value-joiner-fn]
(p/join ktable other-ktable value-joiner-fn))
(defn outer-join
"Combines the values of two KTables that share the same key using an outer
join."
[ktable other-ktable value-joiner-fn]
(p/outer-join ktable other-ktable value-joiner-fn))
(defn to-kstream
"Converts a KTable to a KStream."
([ktable]
(p/to-kstream ktable))
([ktable key-value-mapper-fn]
(p/to-kstream ktable key-value-mapper-fn)))
(defn suppress
"Suppress some updates from this changelog stream"
[ktable suppressed]
(p/suppress ktable suppressed))
(defn ktable*
"Returns the underlying KTable object."
[ktable]
(p/ktable* ktable))
;; IKGroupedBase
(defn aggregate
"Aggregates values by key into a new KTable."
([kgrouped initializer-fn adder-fn]
(p/aggregate kgrouped initializer-fn adder-fn))
([kgrouped initializer-fn aggregator-fn subtractor-fn-or-topic-config]
(p/aggregate kgrouped initializer-fn aggregator-fn subtractor-fn-or-topic-config))
([kgrouped initializer-fn adder-fn subtractor-or-merger-fn topic-config]
(p/aggregate kgrouped initializer-fn adder-fn subtractor-or-merger-fn topic-config)))
(defn count
"Counts the number of records by key into a new KTable."
([kgrouped]
(p/count kgrouped))
([kgrouped name]
(p/count kgrouped name)))
(defn reduce
"Combines values of a stream by key into a new KTable."
([kgrouped adder-fn subtractor-fn topic-config]
(p/reduce kgrouped adder-fn subtractor-fn topic-config))
([kgrouped reducer-fn subtractor-fn-or-topic-config]
(p/reduce kgrouped reducer-fn subtractor-fn-or-topic-config))
([kgrouped reducer-fn]
(p/reduce kgrouped reducer-fn)))
;; IKGroupedTable
(defn kgroupedtable*
"Returns the underlying KGroupedTable object."
[kgroupedtable]
(p/kgroupedtable* kgroupedtable))
;; IKGroupedStream
(defn window-by-time
"Windows the KStream"
([kgroupedstream window]
(p/windowed-by-time kgroupedstream window)))
(defn window-by-session
"Windows the KStream"
([kgroupedstream window]
(p/windowed-by-session kgroupedstream window)))
(defn kgroupedstream*
"Returns the underlying KGroupedStream object."
([kgroupedstream]
(p/kgroupedstream* kgroupedstream)))
;; IGlobalKTable
(defn global-ktable*
"Returns the underlying GlobalKTable"
[globalktable]
(p/global-ktable* globalktable))
(defn streams-builder
[]
(interop/streams-builder))
(defn kafka-streams
"Makes a Kafka Streams object."
([builder opts]
(let [props (java.util.Properties.)]
(.putAll props opts)
(KafkaStreams. ^Topology (.build (streams-builder* builder))
^java.util.Properties props))))
(defn start
"Starts processing."
[kafka-streams]
(.start ^KafkaStreams kafka-streams))
(defn close
"Stops the kafka streams."
[kafka-streams]
(.close ^KafkaStreams kafka-streams))
(defn state->keyword [^KafkaStreams$State state]
(-> state .name str/lower-case (str/replace #"_" "-") keyword))
(defn state [^KafkaStreams k-streams]
(-> k-streams .state state->keyword))