-
Notifications
You must be signed in to change notification settings - Fork 6
/
thurber.clj
266 lines (217 loc) · 8.76 KB
/
thurber.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
(ns thurber
(:require [camel-snake-kebab.core :as csk]
[clojure.data.json :as json]
[clojure.string :as str]
[clojure.walk :as walk]
[taoensso.nippy :as nippy]
[clojure.tools.logging :as log])
(:import (org.apache.beam.sdk.transforms PTransform Create ParDo DoFn$ProcessContext Count SerializableFunction Combine SerializableBiFunction DoFn$OnTimerContext GroupByKey)
(java.util Map)
(thurber.java TDoFn TCoder TOptions TSerializableFunction TProxy TCombine TSerializableBiFunction TDoFn_Stateful)
(org.apache.beam.sdk.values PCollection KV PCollectionView TupleTag TupleTagList PCollectionTuple)
(org.apache.beam.sdk Pipeline)
(org.apache.beam.sdk.options PipelineOptionsFactory PipelineOptions)
(clojure.lang MapEntry)
(org.apache.beam.sdk.transforms.windowing BoundedWindow)
(org.apache.beam.sdk.coders KvCoder CustomCoder)
(java.io DataInputStream InputStream DataOutputStream OutputStream)
(org.apache.beam.sdk.state ValueState Timer)))
;; --
;; CustomCoder proxy that serializes values with nippy: encode freezes the
;; value onto the supplied output stream, decode thaws one value back from
;; the supplied input stream.
(def ^:private nippy-impl
  (proxy [CustomCoder] []
    (encode [value ^OutputStream sink]
      (let [data-out (DataOutputStream. sink)]
        (nippy/freeze-to-out! data-out value)))
    (decode [^InputStream source]
      (let [data-in (DataInputStream. source)]
        (nippy/thaw-from-in! data-in)))))
;; Public nippy coder: a TCoder constructed from the #'nippy-impl var
;; (the var itself, not its current value, is handed to TCoder).
(def nippy
(TCoder. #'nippy-impl))
;; KvCoder that uses the nippy coder for both the key and the value.
(def nippy-kv (KvCoder/of nippy nippy))
;; nippy encodes MapEntry values as vectors by default, but we want them to
;; remain MapEntry instances after thaw:
;; Freeze a MapEntry under the custom id :thurber/map-entry by writing its
;; key and value as a two-element vector.
(nippy/extend-freeze
  MapEntry :thurber/map-entry
  [entry data-output]
  (let [pair [(nth entry 0 nil) (nth entry 1 nil)]]
    (nippy/freeze-to-out! data-output pair)))
;; Thaw the two-element payload written for :thurber/map-entry and rebuild
;; a MapEntry from its first and second items.
(nippy/extend-thaw
  :thurber/map-entry
  [data-input]
  (let [pair (nippy/thaw-from-in! data-input)
        k (nth pair 0 nil)
        v (nth pair 1 nil)]
    (MapEntry/create k v)))
;; --
;; Clojure thread bindings are more expensive than needed for hot
;; code; ThreadLocals are faster, so we use them for thread bindings instead.
;; Thread-local holders. Nothing in this file sets them; presumably the
;; Java-side classes (TDoFn, TProxy, ...) populate them -- TODO confirm.
(def ^:private ^ThreadLocal tl-context (ThreadLocal.))
(def ^:private ^ThreadLocal tl-proxy-args (ThreadLocal.))

(defn ^PipelineOptions *pipeline-options
  "Returns the :pipeline-options entry of the current thread's context."
  []
  (-> tl-context .get :pipeline-options))

(defn ^DoFn$ProcessContext *process-context
  "Returns the :process-context entry of the current thread's context."
  []
  (-> tl-context .get :process-context))

(defn ^BoundedWindow *element-window
  "Returns the :element-window entry of the current thread's context."
  []
  (-> tl-context .get :element-window))

(defn ^ValueState *value-state
  "Returns the :value-state entry of the current thread's context."
  []
  (-> tl-context .get :value-state))

(defn ^Timer *event-timer
  "Returns the :event-timer entry of the current thread's context."
  []
  (-> tl-context .get :event-timer))

(defn ^DoFn$OnTimerContext *timer-context
  "Returns the :timer-context entry of the current thread's context."
  []
  (-> tl-context .get :timer-context))

(defn ^"[Ljava.lang.Object;" *proxy-args
  "Returns whatever is currently stored in the proxy-args thread-local."
  []
  (.get tl-proxy-args))
;; --
(defn proxy-with-signature*
  "Creates a TProxy for `proxy-var` with signature `sig`; the remaining
  `args` are packed into an Object array and handed to TProxy/create."
  [proxy-var sig & args]
  (let [arg-array (into-array Object args)]
    (TProxy/create proxy-var sig arg-array)))
(defn proxy*
  "Creates a TProxy for `proxy-var` with a nil signature, forwarding `args`
  to proxy-with-signature*.

  `args` is spread with `apply` so each argument becomes its own element of
  the Object array built by proxy-with-signature*. Passing the rest-seq
  positionally would instead yield a one-element array holding the whole
  seq, which disagrees with calling proxy-with-signature* directly."
  [proxy-var & args]
  (apply proxy-with-signature* proxy-var nil args))
;; --
(defn ->beam-args
  "Turns the map `m` into a lazy seq of \"--key=value\" strings.

  Keys are converted to camelCase. Map values are written as JSON, other
  collections are comma-joined, and anything else is stringified with
  double quotes backslash-escaped."
  [m]
  (letfn [(render-value [v]
            (cond
              (map? v) (json/write-str v)
              (coll? v) (str/join "," v)
              :else (str/escape (str v) {\" "\\\""})))
          (render-flag [[k v]]
            (format "--%s=%s"
                    (name (csk/->camelCase k))
                    (render-value v)))]
    (map render-flag m)))
(defn ^PipelineOptions create-options
  "Builds pipeline options via PipelineOptionsFactory/fromArgs.

  `opts` may be a map (converted with ->beam-args), another collection of
  argument strings, or anything else, which is passed through untouched.
  `as` is the options interface to view the result as; defaults to TOptions."
  ([]
   (create-options [] TOptions))
  ([opts]
   (create-options opts TOptions))
  ([opts as]
   (let [args (cond
                (map? opts) (into-array String (->beam-args opts))
                (coll? opts) (into-array String opts)
                :else opts)]
     (.as (PipelineOptionsFactory/fromArgs args) as))))
(defn ^Pipeline create-pipeline
  "Creates a Pipeline. With no arguments uses Pipeline/create as-is; with
  `opts`, uses it directly when it is already a PipelineOptions, otherwise
  converts it with create-options first."
  ([]
   (Pipeline/create))
  ([opts]
   (Pipeline/create (if (instance? PipelineOptions opts)
                      opts
                      (create-options opts)))))
(defn get-custom-config
  "Returns the custom config as a Clojure map with keywordized keys.

  `obj` is either a Pipeline (its options are used) or an object that can
  be viewed `.as` TOptions."
  [obj]
  (if-not (instance? Pipeline obj)
    (let [^TOptions t-opts (.as obj TOptions)
          raw-config (.getCustomConfig t-opts)]
      (walk/keywordize-keys (into {} raw-config)))
    (recur (.getOptions obj))))
;; --
(defn- var->name
  "Returns the display name for var `v`: its :th/name metadata when present,
  otherwise the var's own symbol name as a string.

  The :th/name lookup used to be evaluated and thrown away, because only
  the last form of a function body is returned; both lookups now live in a
  single `or` so the metadata override actually takes effect."
  [v]
  (let [m (meta v)]
    (or (:th/name m)
        (-> m :name name))))
(defn ^PTransform partial*
  "Describes a transform with pre-bound parameters as a map of
  :th/name, :th/xform and :th/params.

  When the first argument is a string it is the name, the next argument is
  the xform and the rest are params. Otherwise the first argument is the
  xform, all remaining arguments are params, and the name is derived from
  the xform via var->name."
  [fn-var-or-name & args]
  (let [named? (string? fn-var-or-name)
        label (if named?
                fn-var-or-name
                (format "partial*:%s" (var->name fn-var-or-name)))
        xform (if named? (first args) fn-var-or-name)
        params (if named? (rest args) args)]
    {:th/name label
     :th/xform xform
     :th/params params}))
(defn- filter-impl
  "Applies `pred-fn` to all of `args`; returns the last argument when the
  predicate result is truthy, nil otherwise."
  [pred-fn & args]
  (if (apply pred-fn args)
    (last args)
    nil))
(defn ^PTransform filter*
  "Describes a filtering transform as a map whose :th/xform is #'filter-impl.

  When the first argument is a string it is used as the name and the
  remaining arguments become the params unchanged. Otherwise the first
  argument is the predicate: it is conj'ed onto the remaining arguments to
  form the params, and the name is derived from it via var->name."
  [pred-var-or-name & args]
  (let [named? (string? pred-var-or-name)]
    {:th/name (if named?
                pred-var-or-name
                (format "filter*:%s" (var->name pred-var-or-name)))
     :th/xform #'filter-impl
     :th/params (if named?
                  args
                  (conj args pred-var-or-name))}))
(defn ^SerializableFunction simple*
  "Wraps `fn-var` and any extra `args` in a TSerializableFunction."
  [fn-var & args]
  (new TSerializableFunction fn-var args))
(defn ^SerializableBiFunction simple-bi*
  "Wraps `fn-var` and any extra `args` in a TSerializableBiFunction."
  [fn-var & args]
  (new TSerializableBiFunction fn-var args))
;; --
(defn- ^TCoder ->explicit-coder*
  "Resolves the coder requested by the normalized transform `nxf`.

  Returns nil when `nxf` has no truthy :th/coder, the coder of `prev` when
  :th/coder is :th/inherit, and the :th/coder value itself otherwise."
  [prev nxf]
  (let [declared (:th/coder nxf)]
    (cond
      (not declared) nil
      (= declared :th/inherit) (.getCoder prev)
      :else declared)))
(defn- ->pardo
  "Builds a ParDo transform around `xf`.

  A TDoFn_Stateful is used when `stateful?` or `timer-fn` is truthy, a
  plain TDoFn otherwise. Any TupleTag instances found in `params` become
  output tags (the first one as the main tag, the rest as additional tags);
  any PCollectionView instances found in `params` become side inputs."
  [xf params timer-params stateful? timer-fn]
  (let [of-class (fn [klass] (filterv #(instance? klass %) params))
        tags (of-class TupleTag)
        views (of-class PCollectionView)
        do-fn (if (or stateful? timer-fn)
                (TDoFn_Stateful. xf timer-fn (object-array params) (object-array timer-params))
                (TDoFn. xf (object-array params)))]
    (cond-> (ParDo/of do-fn)
      (not-empty tags)
      (.withOutputTags ^TupleTag (first tags)
                       (reduce (fn [^TupleTagList tag-list ^TupleTag extra]
                                 (.and tag-list extra))
                               (TupleTagList/empty)
                               (rest tags)))

      (not-empty views)
      (.withSideInputs ^Iterable views))))
(defn- set-coder!
  "Sets `coder` on a PCollection, or on every PCollection contained in a
  PCollectionTuple (returning the tuple). Any other input is left alone
  and nil is returned."
  [pcoll-or-tuple coder]
  (condp instance? pcoll-or-tuple
    PCollection
    (.setCoder ^PCollection pcoll-or-tuple coder)

    PCollectionTuple
    (do
      (doseq [member (.values (.getAll ^PCollectionTuple pcoll-or-tuple))]
        (.setCoder ^PCollection member coder))
      pcoll-or-tuple)

    nil))
(defn- ->normal-xf*
  "Normalizes a transform spec into a map carrying at least :th/xform.

  `xf` may be:
    - a PTransform: wrapped as {:th/xform xf} and merged with `override`;
    - a map: its :th/xform is normalized recursively, with the map's other
      keys (then `override`) taking precedence over inner settings;
    - a var: defaults (:th/name from var->name, :th/coder nippy) are merged
      with the var's :th/* metadata and `override`, and :th/xform becomes a
      ParDo built by ->pardo.

  Any other input yields nil.

  :th/timer-fn is now included in the metadata keys picked from a var. It
  was already read when building the ParDo but was never selected from the
  var's metadata, so a timer fn declared there was silently dropped."
  ([xf] (->normal-xf* xf {}))
  ([xf override]
   (cond
     (instance? PTransform xf)
     (merge {:th/xform xf} override)

     ;; note: maps may nest.
     (map? xf)
     (->normal-xf* (:th/xform xf) (merge (dissoc xf :th/xform) override))

     (var? xf)
     (let [normal (merge {:th/name (var->name xf) :th/coder nippy}
                         (select-keys (meta xf)
                                      [:th/name :th/coder :th/params
                                       :th/timer-params :th/stateful :th/timer-fn])
                         override)]
       (assoc normal :th/xform (->pardo xf
                                        (:th/params normal)
                                        (:th/timer-params normal)
                                        (:th/stateful normal)
                                        (:th/timer-fn normal)))))))
(defn ^PCollection apply!
  "Apply transforms to an input (Pipeline, PCollection, PBegin ...)"
  [input & xfs]
  (letfn [(step [acc xf]
            (let [nxf (->normal-xf* xf)
                  label (:th/name nxf)
                  xform (:th/xform nxf)
                  ;; Take care here. The result may commonly be a PCollection
                  ;; but can also be a PCollectionTuple or PCollectionView, eg.
                  result (if label
                           (.apply acc label xform)
                           (.apply acc xform))]
              ;; The coder is resolved against the *input* of this step, so
              ;; :th/inherit picks up the coder of the upstream value.
              (when-let [coder (->explicit-coder* acc nxf)]
                (set-coder! result coder))
              result))]
    (reduce step input xfs)))
(defn ^PTransform comp*
  "Composes transforms into a single PTransform whose expansion applies
  them in order with apply!. When the first argument is a string it is
  used as the transform's name and excluded from the composed steps."
  [& [xf-or-name :as xfs]]
  (let [named? (string? xf-or-name)
        label (when named? xf-or-name)
        steps (if named? (rest xfs) xfs)]
    (proxy [PTransform] [label]
      (expand [^PCollection pc]
        (apply apply! pc steps)))))
;; --
(defn ^PTransform create
  "Returns a Create transform over `coll`, encoded with the nippy coder.

  Maps go through the Map overload of Create/of; everything else is turned
  into a seq and goes through the Iterable overload.

  `(seq coll)` is nil for an empty (or nil) collection, which would hand a
  null Iterable to Create/of; an empty vector is substituted in that case
  so an empty input yields an empty Create instead of a failure."
  [coll]
  (if (map? coll)
    (-> (Create/of ^Map coll) (.withCoder nippy))
    (let [^Iterable items (or (seq coll) [])]
      (-> (Create/of items) (.withCoder nippy)))))
;; --
;; Protocol implemented by Clojure-side combiners (see def-combiner).
;; Values satisfying it are wrapped in TCombine by combiner*.
;; NOTE(review): the method names mirror Beam's Combine.CombineFn
;; (createAccumulator / addInput / mergeAccumulators / extractOutput);
;; presumably they carry the same contracts -- confirm against TCombine.
(defprotocol CombineFn
;; Takes only the combiner itself.
(create-accumulator [this])
;; Takes an accumulator and one input.
(add-input [this acc input])
;; Takes a collection of accumulators.
(merge-accumulators [this acc-coll])
;; Takes an accumulator.
(extract-output [this acc]))
(defmacro def-combiner
  "Expands to a `reify` of the CombineFn protocol whose method
  implementations are the forms in `body`."
  [& body]
  `(reify CombineFn ~@body))
(defn- combiner*
  "Picks the combiner wrapper for `xf-var` from the value it holds:
  a TCombine when the value satisfies CombineFn, a serializable
  bi-function (simple-bi*) when it is a plain fn, nil otherwise."
  [xf-var]
  (let [target @xf-var]
    (if (satisfies? CombineFn target)
      (TCombine. xf-var)
      (when (fn? target)
        (simple-bi* xf-var)))))
(defn combine-globally
  "Returns a transform map that combines globally using the combiner
  held by `xf-var`; the name is derived from the var."
  [xf-var]
  (let [label (var->name xf-var)
        combiner (combiner* xf-var)]
    {:th/name label
     :th/xform (Combine/globally combiner)}))
(defn combine-per-key
  "Returns a transform map that combines per key using the combiner
  held by `xf-var`; the name is derived from the var."
  [xf-var]
  (let [label (var->name xf-var)
        combiner (combiner* xf-var)]
    {:th/name label
     :th/xform (Combine/perKey combiner)}))
;; --
;; Builds a Beam KV from `seg`. With no functions, `seg` is both key and
;; value; with `key-fn`, the key is (key-fn seg); with `val-fn` as well, the
;; value is (val-fn seg). The var's metadata declares nippy-kv as its coder.
(defn ^{:th/coder nippy-kv} ->kv
  ([seg]
   (KV/of seg seg))
  ([key-fn seg]
   (let [k (key-fn seg)]
     (KV/of k seg)))
  ([key-fn val-fn seg]
   (let [k (key-fn seg)
         v (val-fn seg)]
     (KV/of k v))))
;; --
(defn kv->clj
  "Converts a Beam KV into a Clojure MapEntry with the same key and value."
  [^KV kv]
  (let [k (.getKey kv)
        v (.getValue kv)]
    (MapEntry/create k v)))
;; --
(defn log-elem*
  "Logs `elem` through clojure.tools.logging at `level` (:info when no
  level is given). Returns whatever the logging call returns."
  ([elem]
   (log-elem* :info elem))
  ([level elem]
   (log/logp level elem)))
;; --
(defn count-per-key
  "Returns a composite transform that applies Count/perKey and then
  converts each resulting KV with kv->clj. `xf-name` names the composite
  and defaults to \"count-per-key\"."
  ([]
   (count-per-key "count-per-key"))
  ([xf-name]
   (let [counter (Count/perKey)]
     (comp* xf-name counter #'kv->clj))))