/
protocols.clj
259 lines (207 loc) · 8.41 KB
/
protocols.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
(ns jackdaw.streams.protocols
"Kafka streams protocols."
{:license "BSD 3-Clause License <https://github.com/FundingCircle/jackdaw/blob/master/LICENSE>"}
(:refer-clojure :exclude [count map merge reduce group-by filter peek])
(:import org.apache.kafka.streams.KafkaStreams
org.apache.kafka.streams.StreamsBuilder))
(defprotocol IStreamsBuilder
(kstream
[topology-builder topic-config]
[topology-builder topic-config topic-pattern]
"Creates a KStream that will consume messages from the specified topic.")
(kstreams
[topology-builder topic-configs]
"Creates a KStream that will consume messages from the specified topics.")
(ktable
[topology-builder topic-config]
[topology-builder topic-config store-name]
"Creates a KTable that will consist of data from the specified topic.")
(global-ktable
[topology-builder topic-config]
"Creates a GlobalKTable that will consist of data from the specified
topic.")
(source-topics
[topology-builder]
"Gets the names of source topics for the topology.")
(streams-builder*
[streams-builder]
"Returns the underlying KStreamBuilder."))
(defprotocol IKStreamBase
"Methods common to KStream and KTable."
(left-join
[kstream-or-ktable ktable value-joiner-fn]
[kstream-or-ktable ktable value-joiner-fn this-topic-config other-topic-config]
"Creates a KStream from the result of calling `value-joiner-fn` with
each element in the KStream and the value in the KTable with the same
key.")
(filter
[kstream-or-ktable predicate-fn]
"Creates a KStream that consists of all elements that satisfy a
predicate.")
(filter-not
[kstream-or-ktable predicate-fn]
"Creates a KStream that consists of all elements that do not satisfy a
predicate.")
(group-by
[ktable-or-ktable key-value-mapper-fn]
[ktable-or-ktable key-value-mapper-fn topic-config]
"Groups the records of this KStream/KTable using the key-value-mapper-fn.")
(peek
[kstream-or-ktable peek-fn]
"Performs `peek-fn` on each element of the input stream.")
(map-values
[kstream-or-ktable value-mapper-fn]
"Creates a KStream that is the result of calling `value-mapper-fn` on each
element of the input stream.")
(write-as-text!
[kstream-or-ktable file-path]
[kstream-or-ktable file-path topic-config]
"Writes the elements of a stream to a file at the given path."))
(defprotocol IKStream
"A KStream is an abstraction of a stream of key-value pairs."
(branch
[kstream predicate-fns]
"Returns a list of KStreams, one for each of the `predicate-fns`
provided.")
(flat-map
[kstream key-value-mapper-fn]
"Creates a KStream that will consist of the concatenation of messages
returned by calling `key-value-mapper-fn` on each key/value pair in the
input stream.")
(flat-map-values
[kstream value-mapper-fn]
"Creates a KStream that will consist of the concatenation of the values
returned by calling `value-mapper-fn` on each value in the input stream.")
(for-each!
[kstream foreach-fn]
"Performs an action on each element of KStream.")
(print!
[kstream]
"Prints the elements of the stream to *out*.")
(through
[kstream topic-config]
"Materializes a stream to a topic, and returns a new KStream that will
consume messages from the topic. Messages in the new topic will be partitioned
based on the output of the optional partition function that represents StreamPartitioner class")
(to!
[kstream topic-config]
"Materializes a stream to a topic.")
(group-by-key
[kstream]
[kstream topic-config]
"Groups records with the same key into a KGroupedStream.")
(join-windowed
[kstream other-kstream value-joiner-fn windows]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
"Combines the values of two streams that share the same key using a
windowed inner join.")
(left-join-windowed
[kstream other-kstream value-joiner-fn windows]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
"Combines the values of two streams that share the same key using a
windowed left join.")
(map
[kstream key-value-mapper-fn]
"Creates a KStream that consists of the result of applying
`key-value-mapper-fn` to each key/value pair in the input stream.")
(merge
[kstream other]
"Creates a KStream that has the records from both streams.")
(outer-join-windowed
[kstream other-kstream value-joiner-fn windows]
[kstream other-kstream value-joiner-fn windows this-topic-config other-topic-config]
"Combines the values of two streams that share the same key using a
windowed outer join.")
(process!
[kstream processor-fn state-store-names]
"Applies `processor-fn` to each item in the input stream.")
(select-key
[kstream select-key-value-mapper-fn]
"Create a new key from the current key and value.
`select-key-value-mapper-fn` should be a function that takes a key-value
pair, and returns the value of the new key. Here is example multiplies each
key by 10:
```(fn [[k v]] (* 10 k))```")
(transform
[kstream transformer-supplier-fn]
[kstream transformer-supplier-fn state-store-names]
"Creates a KStream that consists of the results of applying the transformer
to each key/value in the input stream.")
(transform-values
[kstream value-transformer-supplier-fn]
[kstream value-transformer-supplier-fn state-store-names]
"Creates a KStream that consists of the results of applying the transformer
to each value in the input stream.")
(join-global
[kstream global-ktable kv-mapper joiner])
(left-join-global
[kstream global-ktable kv-mapper joiner])
(kstream*
[kstream]
"Returns the underlying KStream object."))
(defprotocol IKTable
"A Ktable is an abstraction of a changlog stream."
(join
[ktable other-ktable value-joiner-fn]
"Combines the values of the two KTables that share the same key using an
inner join.")
(outer-join
[ktable other-ktable value-joiner-fn]
"Combines the values of two KTables that share the same key using an outer
join.")
(to-kstream
[ktable]
[ktable key-value-mapper-fn]
"Converts a KTable to a KStream.")
(suppress
[ktable {:keys [max-records max-bytes until-time-limit-ms]}]
"Suppress some updates from this changelog stream.
You can either specify `max-records` or `max-bytes`. If an empty map is
passed, the suppress will be unbounded. If `until-time-limit-ms` is set,
this will override the `TimeWindow` interval. Note that when relying on the
configured `TimeWindow` the default `grace` period is `24h - window-size`.")
(ktable*
[ktable]
"Returns the underlying KTable object."))
(defprotocol IKGroupedBase
"Methods shared between `IKGroupedTable` and `IKGroupedStream`."
(aggregate
[kgrouped initializer-fn adder-fn subtractor-or-merger-fn topic-config]
[kgrouped initializer-fn aggregator-fn subtractor-fn-or-topic-config]
[kgrouped initializer-fn aggregator-fn]
"Aggregates values by key into a new KTable.")
(count
[kgrouped]
[kgrouped topic-config]
"Counts the number of records by key into a new KTable.")
(reduce
[kgrouped adder-fn subtractor-fn topic-config]
[kgrouped reducer-fn subtractor-fn-or-topic-config]
[kgrouped reducer-fn]
"Combines values of a stream by key into a new KTable."))
(defprotocol IKGroupedTable
"KGroupedTable is an abstraction of a grouped changelog stream."
(kgroupedtable*
[kgroupedtable]
"Returns the underlying KGroupedTable object."))
(defprotocol IKGroupedStream
"KGroupedStream is an abstraction of a grouped stream."
(windowed-by-time [kgroupedstream window])
(windowed-by-session [kgroupedstream window])
(kgroupedstream*
[kgroupedstream]
"Returns the underlying KGroupedStream object."))
(defprotocol ITimeWindowedKStream
"ITimeWindowedKStream is an abstraction of a time windowed stream."
(time-windowed-kstream*
[ktime-windowed-kstream]
"Returns the underlying TimeWindowedKStream object."))
(defprotocol ISessionWindowedKStream
"ISessionWindowedKStream is an abstraction of a session windowed stream."
(session-windowed-kstream*
[ksession-windowed-kstream]
"Returns the underlying SessionWindowedKStream object."))
(defprotocol IGlobalKTable
(global-ktable*
[globalktable]
"Returns the underlying GlobalKTable"))