/
lambdas.clj
184 lines (150 loc) · 5.4 KB
/
lambdas.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
(ns jackdaw.streams.lambdas
"Wrappers for the Java 'lambda' functions."
{:license "BSD 3-Clause License <https://github.com/FundingCircle/jackdaw/blob/master/LICENSE>"}
(:import org.apache.kafka.streams.KeyValue
[org.apache.kafka.streams.kstream
Aggregator ForeachAction Initializer KeyValueMapper
Merger Predicate Reducer TransformerSupplier ValueJoiner
ValueMapper ValueTransformerSupplier]
[org.apache.kafka.streams.processor
Processor ProcessorSupplier StreamPartitioner]))
(defn key-value
  "Builds a Kafka Streams `KeyValue` for a single record from the first two
  elements of a sequential `[key value]` argument."
  [[key value]]
  (new KeyValue key value))
;; `Aggregator` backed by a Clojure fn. `apply` hands `aggregator-fn` the
;; current aggregate first, followed by the record as a `[key value]` vector.
(deftype FnAggregator [aggregator-fn]
  Aggregator
  (apply [_ k v acc]
    (let [record [k v]]
      (aggregator-fn acc record))))
(defn aggregator
  "Wraps `aggregator-fn` in an `FnAggregator` so it can be used as a kstream
  `Aggregator`. The fn is invoked as `(aggregator-fn aggregate [key value])`."
  ^Aggregator [aggregator-fn]
  (->FnAggregator aggregator-fn))
;; `ForeachAction` backed by a Clojure fn. `apply` calls `foreach-action-fn`
;; with a single `[key value]` vector and always returns nil, discarding
;; whatever the fn returned.
(deftype FnForeachAction [foreach-action-fn]
  ForeachAction
  (apply [_ k v]
    (let [record [k v]]
      (foreach-action-fn record)
      nil)))
(defn foreach-action
  "Wraps `foreach-action-fn` in an `FnForeachAction` so it can be used as a
  kstream `ForeachAction`. The fn receives each record as one `[key value]`
  vector; its return value is ignored."
  [foreach-action-fn]
  (->FnForeachAction foreach-action-fn))
;; `Initializer` backed by a Clojure fn: `apply` returns the result of calling
;; `initializer-fn` with no arguments.
(deftype FnInitializer [initializer-fn]
  Initializer
  (apply [_]
    (initializer-fn)))
(defn initializer
  "Wraps the zero-argument `initializer-fn` in an `FnInitializer` so it can be
  used as a kstream `Initializer`."
  ^Initializer [initializer-fn]
  (->FnInitializer initializer-fn))
;; `KeyValueMapper` backed by a Clojure fn. `apply` passes the record to
;; `key-value-mapper-fn` as a `[key value]` vector and converts the pair the
;; fn returns into a `KeyValue` via `key-value`.
(deftype FnKeyValueMapper [key-value-mapper-fn]
  KeyValueMapper
  (apply [_ k v]
    (-> [k v]
        key-value-mapper-fn
        key-value)))
(defn key-value-mapper
  "Wraps `key-value-mapper-fn` in an `FnKeyValueMapper` so it can be used as a
  kstream `KeyValueMapper`. The fn takes one `[key value]` vector and should
  return a `[key value]` pair, which is turned into a `KeyValue`."
  [key-value-mapper-fn]
  (->FnKeyValueMapper key-value-mapper-fn))
;; `KeyValueMapper` backed by a Clojure fn whose result is returned as-is.
;; `apply` passes the record to `select-key-value-mapper-fn` as a
;; `[key value]` vector; no `KeyValue` conversion is applied to the result.
(deftype FnSelectKeyValueMapper [select-key-value-mapper-fn]
  KeyValueMapper
  (apply [_ k v]
    (let [record [k v]]
      (select-key-value-mapper-fn record))))
(defn select-key-value-mapper
  "Wraps `select-key-value-mapper-fn` in an `FnSelectKeyValueMapper`, a
  kstream `KeyValueMapper` intended for use with `select-key`. The fn takes
  one `[key value]` vector and its return value is passed through unchanged."
  [select-key-value-mapper-fn]
  (->FnSelectKeyValueMapper select-key-value-mapper-fn))
;; `KeyValueMapper` backed by a Clojure fn that yields several records.
;; `apply` passes the record to `key-value-flatmapper-fn` as a `[key value]`
;; vector and eagerly converts every pair in the returned collection into a
;; `KeyValue`, returning them in a vector.
(deftype FnKeyValueFlatMapper [key-value-flatmapper-fn]
  KeyValueMapper
  (apply [_ k v]
    (->> [k v]
         key-value-flatmapper-fn
         (mapv key-value))))
(defn key-value-flatmapper
  "Wraps `key-value-flatmapper-fn` in an `FnKeyValueFlatMapper`, a kstream
  `KeyValueMapper` for use with .flatMap.

  `key-value-flatmapper-fn` takes a single `[key value]` argument and returns
  a collection of `[key value]` pairs; each pair becomes a `KeyValue`."
  [key-value-flatmapper-fn]
  (->FnKeyValueFlatMapper key-value-flatmapper-fn))
;; `Merger` backed by a Clojure fn. `apply` forwards the aggregate key and the
;; two aggregates to `merger-fn` as three positional arguments.
(deftype FnMerger [merger-fn]
  Merger
  (apply [_ k left right]
    (merger-fn k left right)))
(defn merger
  "Wraps `merger-fn` in an `FnMerger` so it can be used as a kstream `Merger`
  (which merges together two SessionWindows aggregate values). The fn is
  invoked as `(merger-fn agg-key aggregate1 aggregate2)`."
  ^Merger [merger-fn]
  (->FnMerger merger-fn))
;; `Predicate` backed by a Clojure fn. `test` passes the record to
;; `predicate-fn` as a `[key value]` vector and coerces the result with
;; `boolean`, so any truthy/falsey return value is accepted.
(deftype FnPredicate [predicate-fn]
  Predicate
  (test [_ k v]
    (-> [k v]
        predicate-fn
        boolean)))
(defn predicate
  "Wraps `predicate-fn` in an `FnPredicate` so it can be used as a kstream
  `Predicate`. The fn takes one `[key value]` vector; its result is coerced
  to a boolean."
  [predicate-fn]
  (->FnPredicate predicate-fn))
;; `Reducer` backed by a Clojure fn: `apply` forwards both values to
;; `reducer-fn` in the order received and returns its result.
(deftype FnReducer [reducer-fn]
  Reducer
  (apply [_ left right]
    (reducer-fn left right)))
(defn reducer
  "Wraps the two-argument `reducer-fn` in an `FnReducer` so it can be used as
  a kstream `Reducer`."
  ^Reducer [reducer-fn]
  (->FnReducer reducer-fn))
;; `ValueJoiner` backed by a Clojure fn: `apply` forwards both values to
;; `value-joiner-fn` in the order received and returns its result.
(deftype FnValueJoiner [value-joiner-fn]
  ValueJoiner
  (apply [_ left right]
    (value-joiner-fn left right)))
(defn value-joiner
  "Wraps the two-argument `value-joiner-fn` in an `FnValueJoiner` so it can be
  used as a kstream `ValueJoiner`."
  [value-joiner-fn]
  (->FnValueJoiner value-joiner-fn))
;; `ValueMapper` backed by a Clojure fn: `apply` returns the result of calling
;; `value-mapper-fn` on the record value.
(deftype FnValueMapper [value-mapper-fn]
  ValueMapper
  (apply [_ v]
    (value-mapper-fn v)))
(defn value-mapper
  "Wraps the one-argument `value-mapper-fn` in an `FnValueMapper` so it can be
  used as a kstream `ValueMapper`."
  [value-mapper-fn]
  (->FnValueMapper value-mapper-fn))
;; `StreamPartitioner` backed by a Clojure fn. `partition` calls
;; `(stream-partitioner-fn topic-name key val partition-count)`.
;;
;; The interface method is declared to return `Integer`, whereas ordinary
;; Clojure integer arithmetic (e.g. `(mod (hash k) n)`) produces a `Long`,
;; which is expected to fail the cast on return. A non-nil result is therefore
;; coerced with `int`, in the same way `FnPredicate` coerces with `boolean`.
;; nil is passed through untouched; per the Kafka Streams Javadoc a null
;; partition asks for the default partitioning logic.
(deftype FnStreamPartitioner [stream-partitioner-fn]
  StreamPartitioner
  (partition [_ topic-name k v partition-count]
    (let [chosen (stream-partitioner-fn topic-name k v partition-count)]
      (when (some? chosen)
        (int chosen)))))
(defn stream-partitioner
  "Wraps `stream-partitioner-fn` in an `FnStreamPartitioner` so it can be used
  as a kstream `StreamPartitioner`. Returns nil when `stream-partitioner-fn`
  is nil (or false), so an absent partitioner can be passed straight through.
  The fn is invoked as `(f topic-name key val partition-count)`."
  [stream-partitioner-fn]
  (when-let [partition-fn stream-partitioner-fn]
    (->FnStreamPartitioner partition-fn)))
;; `Processor` backed by a Clojure fn. `context` must support `reset!` and
;; `deref` (the `processor` constructor fn passes `(atom nil)`): `init` stores
;; the processor context it is given, and `process` derefs it to call
;; `(processor-fn context key message)`. `close` does nothing.
(deftype FnProcessor [context processor-fn]
  Processor
  (init [_ processor-context]
    (reset! context processor-context))
  (process [_ k msg]
    (let [ctx @context]
      (processor-fn ctx k msg)))
  (close [_]))
(defn processor
  "Packages up a Clojure fn as a kstream processor.

  Returns a new `FnProcessor` holding its own, initially nil, context atom.
  Once the processor has been initialised, `processor-fn` is invoked as
  `(processor-fn context key message)` for every record."
  ;; The docstring must precede the parameter vector; placed after it, the
  ;; string is just a discarded body expression and never becomes :doc.
  [processor-fn]
  (FnProcessor. (atom nil) processor-fn))
;; `ProcessorSupplier` backed by a Clojure processor fn.
;;
;; `get` builds a brand-new processor (with its own context atom) on every
;; call. The Kafka Streams `ProcessorSupplier` contract requires a new
;; instance per call; handing out one shared instance would let separate
;; `init` calls overwrite each other's context in the shared atom.
;;
;; For backward compatibility with code that constructs this type directly
;; with an already-built `Processor`, such an instance is still returned
;; unchanged.
(deftype FnProcessorSupplier [processor-supplier-fn]
  ProcessorSupplier
  (get [_]
    (if (instance? Processor processor-supplier-fn)
      processor-supplier-fn
      (processor processor-supplier-fn))))

(defn processor-supplier
  "Packages up a Clojure fn in a kstream processor supplier.

  `processor-fn` is invoked as `(processor-fn context key message)`. Each call
  to the supplier's `get` wraps it in a fresh processor, so no processor
  state (the stored context) is shared between the instances handed out."
  [processor-fn]
  (FnProcessorSupplier. processor-fn))
;; `TransformerSupplier` backed by a Clojure fn: every `get` calls
;; `transformer-supplier-fn` with no arguments and returns what it produces.
(deftype FnTransformerSupplier [transformer-supplier-fn]
  TransformerSupplier
  (get [_]
    (transformer-supplier-fn)))
(defn transformer-supplier
  "Wraps the zero-argument `transformer-supplier-fn` in an
  `FnTransformerSupplier` so it can be used as a kstream
  `TransformerSupplier`. The fn is called once per `get`."
  [transformer-supplier-fn]
  (->FnTransformerSupplier transformer-supplier-fn))
;; `ValueTransformerSupplier` backed by a Clojure fn: every `get` calls
;; `value-transformer-supplier-fn` with no arguments and returns what it
;; produces.
(deftype FnValueTransformerSupplier [value-transformer-supplier-fn]
  ValueTransformerSupplier
  (get [_]
    (value-transformer-supplier-fn)))
(defn value-transformer-supplier
  "Wraps the zero-argument `value-transformer-supplier-fn` in an
  `FnValueTransformerSupplier` so it can be used as a kstream
  `ValueTransformerSupplier`. The fn is called once per `get`."
  [value-transformer-supplier-fn]
  (->FnValueTransformerSupplier value-transformer-supplier-fn))