forked from zero-one-group/geni
/
functions.clj
349 lines (339 loc) · 16.1 KB
/
functions.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
(ns zero-one.geni.core.functions
  ;; Every clojure.core name that this namespace redefines is excluded here so
  ;; that loading the namespace does not emit "already refers to" warnings.
  ;; `abs` joined clojure.core in Clojure 1.11; excluding a name that an older
  ;; clojure.core does not define is harmless.
  (:refer-clojure :exclude [abs
                            concat
                            flatten
                            hash
                            map
                            not
                            rand
                            reverse
                            second
                            sequence
                            struct
                            when])
  (:require
    [zero-one.geni.core.column :refer [->col-array ->column]]
    [zero-one.geni.interop :as interop]
    [zero-one.geni.utils :refer [->string-map]])
  (:import
    (org.apache.spark.sql Column functions)))
;;;; Agg Functions
(defn approx-count-distinct
  "Forwards to `functions/approx_count_distinct` with `expr` coerced by
  `->column`. The two-argument form also passes `rsd` through unchanged."
  ([expr]
   (-> expr ->column functions/approx_count_distinct))
  ([expr rsd]
   (-> expr ->column (functions/approx_count_distinct rsd))))
(defn count-distinct
  "Forwards to `functions/countDistinct`. All `exprs` are converted with
  `->col-array`; the first column is passed on its own and the remaining ones
  as a `Column` array."
  [& exprs]
  (let [columns (seq (->col-array exprs))
        leading (first columns)
        others  (next columns)]
    (functions/countDistinct leading (into-array Column others))))
(defn grouping
  "Forwards to `functions/grouping` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/grouping))

(defn grouping-id
  "Forwards to `functions/grouping_id`. `exprs` are converted with
  `->col-array` and then wrapped by `interop/->scala-seq`."
  [& exprs]
  (-> exprs ->col-array interop/->scala-seq functions/grouping_id))
;;;; Collection Functions
(defn aggregate
  "Forwards to `functions/aggregate`. `expr` and `init` are coerced by
  `->column`; `merge-fn` is wrapped with `interop/->scala-function2` and
  `finish-fn` with `interop/->scala-function1`. The three-argument form uses
  `identity` as `finish-fn`."
  ([expr init merge-fn]
   (aggregate expr init merge-fn identity))
  ([expr init merge-fn finish-fn]
   (let [column (->column expr)
         zero   (->column init)
         merger (interop/->scala-function2 merge-fn)
         finish (interop/->scala-function1 finish-fn)]
     (functions/aggregate column zero merger finish))))
(defn array-contains
  "Forwards to `functions/array_contains`; `expr` is coerced by `->column`
  and `value` is passed through unchanged."
  [expr value]
  (-> expr ->column (functions/array_contains value)))

(defn array-distinct
  "Forwards to `functions/array_distinct` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/array_distinct))

(defn array-except
  "Forwards to `functions/array_except`; both arguments are coerced by
  `->column`."
  [left right]
  (let [lhs (->column left)
        rhs (->column right)]
    (functions/array_except lhs rhs)))

(defn array-intersect
  "Forwards to `functions/array_intersect`; both arguments are coerced by
  `->column`."
  [left right]
  (let [lhs (->column left)
        rhs (->column right)]
    (functions/array_intersect lhs rhs)))
(defn array-join
  "Forwards to `functions/array_join` with `expr` coerced by `->column`.
  `delimiter` and, when given, `null-replacement` are passed through
  unchanged."
  ([expr delimiter]
   (-> expr ->column (functions/array_join delimiter)))
  ([expr delimiter null-replacement]
   (-> expr ->column (functions/array_join delimiter null-replacement))))
(defn array-max
  "Forwards to `functions/array_max` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/array_max))

(defn array-min
  "Forwards to `functions/array_min` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/array_min))

(defn array-position
  "Forwards to `functions/array_position`; `expr` is coerced by `->column`
  and `value` is passed through unchanged."
  [expr value]
  (-> expr ->column (functions/array_position value)))

(defn array-remove
  "Forwards to `functions/array_remove`; `expr` is coerced by `->column`
  and `element` is passed through unchanged."
  [expr element]
  (-> expr ->column (functions/array_remove element)))
(defn array-repeat
  "Forwards to `functions/array_repeat` with `left` coerced by `->column`.
  When `right` satisfies `nat-int?` it is passed as-is; otherwise it is
  coerced by `->column` as well."
  [left right]
  (let [element (->column left)]
    (if (nat-int? right)
      (functions/array_repeat element right)
      (functions/array_repeat element (->column right)))))
(defn array-sort
  "Forwards to `functions/array_sort` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/array_sort))

(defn array-union
  "Forwards to `functions/array_union`; both arguments are coerced by
  `->column`."
  [left right]
  (let [lhs (->column left)
        rhs (->column right)]
    (functions/array_union lhs rhs)))

(defn arrays-overlap
  "Forwards to `functions/arrays_overlap`; both arguments are coerced by
  `->column`."
  [left right]
  (let [lhs (->column left)
        rhs (->column right)]
    (functions/arrays_overlap lhs rhs)))

(defn arrays-zip
  "Forwards to `functions/arrays_zip` with `exprs` converted by
  `->col-array`."
  [& exprs]
  (-> exprs ->col-array functions/arrays_zip))
(defn collect-list
  "Forwards to `functions/collect_list` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/collect_list))

(defn collect-set
  "Forwards to `functions/collect_set` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/collect_set))

(defn concat
  "Forwards to `functions/concat` with `exprs` converted by `->col-array`."
  [& exprs]
  (-> exprs ->col-array functions/concat))

(defn exists
  "Forwards to `functions/exists`; `expr` is coerced by `->column` and
  `predicate` is wrapped with `interop/->scala-function1`."
  [expr predicate]
  (let [column (->column expr)
        pred   (interop/->scala-function1 predicate)]
    (functions/exists column pred)))
(defn explode
  "Forwards to `functions/explode` with `expr` coerced by `->column`."
  [expr]
  (functions/explode (->column expr)))

(defn explode-outer
  "Forwards to `functions/explode_outer` with `expr` coerced by `->column`.

  This used to be a plain alias of `explode`, so the outer variant was never
  actually invoked; it now calls Spark's dedicated `explode_outer`."
  [expr]
  (functions/explode_outer (->column expr)))
(defn element-at
  "Forwards to `functions/element_at` with `expr` coerced by `->column`.

  `value` is the index or key to look up. Numbers and characters are coerced
  with `clojure.core/int`, exactly as before. Any other value (for example a
  string key) is passed through untouched instead of failing inside the
  `int` coercion."
  [expr value]
  (let [lookup (if (or (number? value) (char? value))
                 (clojure.core/int value)
                 value)]
    (functions/element_at (->column expr) lookup)))
(defn flatten
  "Forwards to `functions/flatten` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/flatten))

(defn forall
  "Forwards to `functions/forall`; `expr` is coerced by `->column` and
  `predicate` is wrapped with `interop/->scala-function1`."
  [expr predicate]
  (let [column (->column expr)
        pred   (interop/->scala-function1 predicate)]
    (functions/forall column pred)))
(defn from-csv
  "Forwards to `functions/from_csv`; `expr` and `schema` are coerced by
  `->column`. Without `options` an empty map literal is passed; otherwise
  `options` is converted with `->string-map`."
  ([expr schema]
   (let [column     (->column expr)
         schema-col (->column schema)]
     (functions/from_csv column schema-col {})))
  ([expr schema options]
   (let [column     (->column expr)
         schema-col (->column schema)
         opts       (->string-map options)]
     (functions/from_csv column schema-col opts))))

(defn from-json
  "Forwards to `functions/from_json`; `expr` and `schema` are coerced by
  `->column`. Without `options` an empty map literal is passed; otherwise
  `options` is converted with `->string-map`."
  ([expr schema]
   (let [column     (->column expr)
         schema-col (->column schema)]
     (functions/from_json column schema-col {})))
  ([expr schema options]
   (let [column     (->column expr)
         schema-col (->column schema)
         opts       (->string-map options)]
     (functions/from_json column schema-col opts))))
(defn map-concat
  "Forwards to `functions/map_concat` with `exprs` converted by
  `->col-array`."
  [& exprs]
  (-> exprs ->col-array functions/map_concat))

(defn map-entries
  "Forwards to `functions/map_entries` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/map_entries))

(defn map-filter
  "Forwards to `functions/map_filter`; `expr` is coerced by `->column` and
  `predicate` is wrapped with `interop/->scala-function2`."
  [expr predicate]
  (let [column (->column expr)
        pred   (interop/->scala-function2 predicate)]
    (functions/map_filter column pred)))

(defn map-from-entries
  "Forwards to `functions/map_from_entries` with `expr` coerced by
  `->column`."
  [expr]
  (-> expr ->column functions/map_from_entries))

(defn map-keys
  "Forwards to `functions/map_keys` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/map_keys))

(defn map-values
  "Forwards to `functions/map_values` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/map_values))

(defn map-zip-with
  "Forwards to `functions/map_zip_with`; `left` and `right` are coerced by
  `->column` and `merge-fn` is wrapped with `interop/->scala-function3`."
  [left right merge-fn]
  (let [lhs    (->column left)
        rhs    (->column right)
        merger (interop/->scala-function3 merge-fn)]
    (functions/map_zip_with lhs rhs merger)))
(defn posexplode
  "Forwards to `functions/posexplode` with `expr` coerced by `->column`."
  [expr]
  (functions/posexplode (->column expr)))

(defn posexplode-outer
  "Forwards to `functions/posexplode_outer` with `expr` coerced by `->column`.

  This used to be a plain alias of `posexplode`, so the outer variant was
  never actually invoked; it now calls Spark's dedicated `posexplode_outer`."
  [expr]
  (functions/posexplode_outer (->column expr)))
(defn reverse
  "Forwards to `functions/reverse` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/reverse))

(defn schema-of-csv
  "Forwards to `functions/schema_of_csv` with `expr` coerced by `->column`.
  When `options` is given it is converted with `->string-map`."
  ([expr]
   (-> expr ->column functions/schema_of_csv))
  ([expr options]
   (let [column (->column expr)
         opts   (->string-map options)]
     (functions/schema_of_csv column opts))))

(defn schema-of-json
  "Forwards to `functions/schema_of_json` with `expr` coerced by `->column`.
  When `options` is given it is converted with `->string-map`."
  ([expr]
   (-> expr ->column functions/schema_of_json))
  ([expr options]
   (let [column (->column expr)
         opts   (->string-map options)]
     (functions/schema_of_json column opts))))
(defn sequence
  "Forwards to `functions/sequence`; `start`, `stop` and `step` are each
  coerced by `->column`."
  [start stop step]
  (let [from (->column start)
        to   (->column stop)
        by   (->column step)]
    (functions/sequence from to by)))

(defn size
  "Forwards to `functions/size` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/size))

(defn slice
  "Forwards to `functions/slice`; `expr` is coerced by `->column`, `start`
  and `length` are passed through unchanged."
  [expr start length]
  (-> expr ->column (functions/slice start length)))

(defn sort-array
  "Forwards to `functions/sort_array` with `expr` coerced by `->column`.
  The two-argument form also passes `asc` through unchanged."
  ([expr]
   (-> expr ->column functions/sort_array))
  ([expr asc]
   (-> expr ->column (functions/sort_array asc))))
(defn to-csv
  "Forwards to `functions/to_csv` with `expr` coerced by `->column`.
  Without `options` an empty map literal is passed; otherwise `options` is
  converted with `->string-map`."
  ([expr]
   (-> expr ->column (functions/to_csv {})))
  ([expr options]
   (let [column (->column expr)
         opts   (->string-map options)]
     (functions/to_csv column opts))))
(defn transform
  "Forwards to `functions/transform`; `expr` is coerced by `->column` and
  `xform-fn` is wrapped with `interop/->scala-function1`."
  [expr xform-fn]
  (let [column (->column expr)
        xform  (interop/->scala-function1 xform-fn)]
    (functions/transform column xform)))

(defn transform-keys
  "Forwards to `functions/transform_keys`; `expr` is coerced by `->column`
  and `key-fn` is wrapped with `interop/->scala-function2`."
  [expr key-fn]
  (let [column (->column expr)
        xform  (interop/->scala-function2 key-fn)]
    (functions/transform_keys column xform)))

(defn transform-values
  "Forwards to `functions/transform_values`; `expr` is coerced by `->column`
  and `key-fn` is wrapped with `interop/->scala-function2`."
  [expr key-fn]
  (let [column (->column expr)
        xform  (interop/->scala-function2 key-fn)]
    (functions/transform_values column xform)))

(defn zip-with
  "Forwards to `functions/zip_with`; `left` and `right` are coerced by
  `->column` and `merge-fn` is wrapped with `interop/->scala-function2`."
  [left right merge-fn]
  (let [lhs    (->column left)
        rhs    (->column right)
        merger (interop/->scala-function2 merge-fn)]
    (functions/zip_with lhs rhs merger)))
;;;; Date and Time Functions
;; Each wrapper below coerces its column argument(s) with `->column` and
;; forwards to the matching static method on org.apache.spark.sql.functions.
;; Non-column arguments are passed through unchanged.

(defn add-months [expr months]
  (-> expr ->column (functions/add_months months)))

(defn current-date []
  (functions/current_date))

(defn current-timestamp []
  (functions/current_timestamp))

(defn date-add [expr days]
  (-> expr ->column (functions/date_add days)))

(defn date-format [expr date-fmt]
  (-> expr ->column (functions/date_format date-fmt)))

(defn date-sub [expr days]
  (-> expr ->column (functions/date_sub days)))

;; Note the argument order: the format comes first, the column second.
(defn date-trunc [fmt expr]
  (->> expr ->column (functions/date_trunc fmt)))

(defn datediff [l-expr r-expr]
  (let [lhs (->column l-expr)
        rhs (->column r-expr)]
    (functions/datediff lhs rhs)))

(def date-diff
  "Alias of `datediff`."
  datediff)
;; Each wrapper below coerces its column argument(s) with `->column` and
;; forwards to the matching static method on org.apache.spark.sql.functions.
;; Non-column arguments are passed through unchanged.

(defn day-of-month [expr]
  (-> expr ->column functions/dayofmonth))

(defn day-of-week [expr]
  (-> expr ->column functions/dayofweek))

(defn day-of-year [expr]
  (-> expr ->column functions/dayofyear))

(defn from-unixtime
  ([expr]
   (-> expr ->column functions/from_unixtime))
  ([expr fmt]
   (-> expr ->column (functions/from_unixtime fmt))))

(defn hour [expr]
  (-> expr ->column functions/hour))

(defn last-day [expr]
  (-> expr ->column functions/last_day))

(defn minute [expr]
  (-> expr ->column functions/minute))

(defn month [expr]
  (-> expr ->column functions/month))

(defn months-between [l-expr r-expr]
  (let [lhs (->column l-expr)
        rhs (->column r-expr)]
    (functions/months_between lhs rhs)))

(defn next-day [expr day-of-week]
  (-> expr ->column (functions/next_day day-of-week)))

(defn quarter [expr]
  (-> expr ->column functions/quarter))

(defn second [expr]
  (-> expr ->column functions/second))
(defn to-date
  "Forwards to `functions/to_date` with `expr` coerced by `->column`.
  The two-argument form also passes `date-format` through unchanged."
  ([expr]
   (-> expr ->column functions/to_date))
  ([expr date-format]
   (-> expr ->column (functions/to_date date-format))))

(def ->date-col
  "Alias of `to-date`."
  to-date)

(defn to-timestamp
  "Forwards to `functions/to_timestamp` with `expr` coerced by `->column`.
  The two-argument form also passes `date-format` through unchanged."
  ([expr]
   (-> expr ->column functions/to_timestamp))
  ([expr date-format]
   (-> expr ->column (functions/to_timestamp date-format))))

(def ->timestamp-col
  "Alias of `to-timestamp`."
  to-timestamp)
(defn to-utc-timestamp
  "Timestamp conversion with an optional source time zone.

  `[expr]`    - kept for backward compatibility: forwards to
                `functions/to_timestamp` with `expr` coerced by `->column`.
                It does NOT call `to_utc_timestamp`, so no time zone is
                involved.
  `[expr tz]` - forwards to `functions/to_utc_timestamp` with `expr` coerced
                by `->column`; `tz` is passed through unchanged. Spark
                declares overloads taking a String or a Column for it."
  ([expr]
   (functions/to_timestamp (->column expr)))
  ([expr tz]
   (functions/to_utc_timestamp (->column expr) tz)))

(def ->utc-timestamp
  "Alias of `to-utc-timestamp`."
  to-utc-timestamp)
(defn unix-timestamp
  "Forwards to `functions/unix_timestamp`. With no arguments the zero-arg
  overload is called; otherwise `expr` is coerced by `->column` and
  `pattern`, when given, is passed through unchanged."
  ([]
   (functions/unix_timestamp))
  ([expr]
   (-> expr ->column functions/unix_timestamp))
  ([expr pattern]
   (-> expr ->column (functions/unix_timestamp pattern))))

(defn time-window
  "Forwards to `functions/window` with `time-expr` coerced by `->column`.
  `duration`, `slide` and `start` are passed through unchanged when given."
  ([time-expr duration]
   (-> time-expr ->column (functions/window duration)))
  ([time-expr duration slide]
   (-> time-expr ->column (functions/window duration slide)))
  ([time-expr duration slide start]
   (-> time-expr ->column (functions/window duration slide start))))

(defn week-of-year
  "Forwards to `functions/weekofyear` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/weekofyear))

(defn year
  "Forwards to `functions/year` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/year))
;;;; Maths Functions
;; Each wrapper below coerces its column argument(s) with `->column` and
;; forwards to the matching static method on org.apache.spark.sql.functions.
;; Non-column arguments are passed through unchanged.

(def pi
  "Literal column built from `Math/PI` via `functions/lit`."
  (functions/lit Math/PI))

(defn abs [expr]
  (-> expr ->column functions/abs))

(defn acos [expr]
  (-> expr ->column functions/acos))

(defn asin [expr]
  (-> expr ->column functions/asin))

(defn atan [expr]
  (-> expr ->column functions/atan))

(defn atan2 [expr-x expr-y]
  (let [x-col (->column expr-x)
        y-col (->column expr-y)]
    (functions/atan2 x-col y-col)))

(defn bin [expr]
  (-> expr ->column functions/bin))

(defn bround [expr]
  (-> expr ->column functions/bround))

(defn cbrt [expr]
  (-> expr ->column functions/cbrt))

(def cube-root
  "Alias of `cbrt`."
  cbrt)

(defn ceil [expr]
  (-> expr ->column functions/ceil))

(defn conv [expr from-base to-base]
  (-> expr ->column (functions/conv from-base to-base)))

(defn cos [expr]
  (-> expr ->column functions/cos))

(defn cosh [expr]
  (-> expr ->column functions/cosh))

(defn degrees [expr]
  (-> expr ->column functions/degrees))

(defn exp [expr]
  (-> expr ->column functions/exp))

(defn expm1 [expr]
  (-> expr ->column functions/expm1))

(defn factorial [expr]
  (-> expr ->column functions/factorial))

(defn floor [expr]
  (-> expr ->column functions/floor))

(defn hex [expr]
  (-> expr ->column functions/hex))

(defn hypot [left-expr right-expr]
  (let [lhs (->column left-expr)
        rhs (->column right-expr)]
    (functions/hypot lhs rhs)))
;; Each wrapper below coerces its column argument(s) with `->column` and
;; forwards to the matching static method on org.apache.spark.sql.functions.
;; Non-column arguments are passed through unchanged.

(defn log [expr]
  (-> expr ->column functions/log))

(defn log10 [expr]
  (-> expr ->column functions/log10))

(defn log1p [expr]
  (-> expr ->column functions/log1p))

(defn log2 [expr]
  (-> expr ->column functions/log2))

(defn pmod [left-expr right-expr]
  (let [lhs (->column left-expr)
        rhs (->column right-expr)]
    (functions/pmod lhs rhs)))

(defn pow [base exponent]
  (let [base-col (->column base)
        exp-col  (->column exponent)]
    (functions/pow base-col exp-col)))

(def **
  "Alias of `pow`."
  pow)

(defn radians [expr]
  (-> expr ->column functions/radians))

(defn rint [expr]
  (-> expr ->column functions/rint))

(defn round [expr]
  (-> expr ->column functions/round))

(defn shift-left [expr num-bits]
  (-> expr ->column (functions/shiftLeft num-bits)))

(defn shift-right [expr num-bits]
  (-> expr ->column (functions/shiftRight num-bits)))

(defn shift-right-unsigned [expr num-bits]
  (-> expr ->column (functions/shiftRightUnsigned num-bits)))

(defn signum [expr]
  (-> expr ->column functions/signum))

(def sign
  "Alias of `signum`."
  signum)

(defn sin [expr]
  (-> expr ->column functions/sin))

(defn sinh [expr]
  (-> expr ->column functions/sinh))

;; Not a Spark function: multiplies the coerced column by itself through the
;; Column `.multiply` method.
(defn sqr [expr]
  (let [lhs (->column expr)
        rhs (->column expr)]
    (.multiply lhs rhs)))

(defn sqrt [expr]
  (-> expr ->column functions/sqrt))

(defn tan [expr]
  (-> expr ->column functions/tan))

(defn tanh [expr]
  (-> expr ->column functions/tanh))

(defn unhex [expr]
  (-> expr ->column functions/unhex))
;;;; Misc Functions
(defn crc32
  "Forwards to `functions/crc32` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/crc32))

(defn hash
  "Forwards to `functions/hash` with `exprs` converted by `->col-array`."
  [& exprs]
  (-> exprs ->col-array functions/hash))

(defn md5
  "Forwards to `functions/md5` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/md5))

(defn sha1
  "Forwards to `functions/sha1` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/sha1))

(defn sha2
  "Forwards to `functions/sha2`; `expr` is coerced by `->column` and
  `n-bits` is passed through unchanged."
  [expr n-bits]
  (-> expr ->column (functions/sha2 n-bits)))

(defn xxhash64
  "Forwards to `functions/xxhash64` with `exprs` converted by `->col-array`."
  [& exprs]
  (-> exprs ->col-array functions/xxhash64))
;;;; Non-Agg Functions
;; Each wrapper below forwards to the matching static method on
;; org.apache.spark.sql.functions. Single expressions are coerced with
;; `->column`, variadic expressions with `->col-array`; every other argument
;; is passed through unchanged.

(defn array [& exprs]
  (-> exprs ->col-array functions/array))

(defn bitwise-not [expr]
  (-> expr ->column functions/bitwiseNOT))

;; `dataframe` is handed to Spark as-is (no column coercion).
(defn broadcast [dataframe]
  (functions/broadcast dataframe))

;; `s` is handed to `functions/expr` as-is.
(defn expr [s]
  (functions/expr s))

(defn greatest [& exprs]
  (-> exprs ->col-array functions/greatest))

(defn input-file-name []
  (functions/input_file_name))

(defn least [& exprs]
  (-> exprs ->col-array functions/least))

(defn map [& exprs]
  (-> exprs ->col-array functions/map))

(defn map-from-arrays [key-expr val-expr]
  (let [key-col (->column key-expr)
        val-col (->column val-expr)]
    (functions/map_from_arrays key-col val-col)))

(defn monotonically-increasing-id []
  (functions/monotonically_increasing_id))

(defn nanvl [left-expr right-expr]
  (let [lhs (->column left-expr)
        rhs (->column right-expr)]
    (functions/nanvl lhs rhs)))

(defn negate [expr]
  (-> expr ->column functions/negate))

(defn not [expr]
  (-> expr ->column functions/not))

(def !
  "Alias of `not`."
  not)

(defn randn
  ([]
   (functions/randn))
  ([seed]
   (functions/randn seed)))

(defn rand
  ([]
   (functions/rand))
  ([seed]
   (functions/rand seed)))
(defn spark-partition-id
  "Forwards to the zero-argument `functions/spark_partition_id`.

  The Java method is spelled with underscores here, like every other wrapper
  in this namespace, rather than relying on the compiler munging a hyphenated
  symbol into the same name."
  []
  (functions/spark_partition_id))
(defn struct
  "Forwards to `functions/struct` with `exprs` converted by `->col-array`."
  [& exprs]
  (-> exprs ->col-array functions/struct))

(defn when
  "Forwards to `functions/when` with `condition` and `if-expr` coerced by
  `->column`. The three-argument form additionally calls `.otherwise` on that
  result with `else-expr` coerced by `->column`."
  ([condition if-expr]
   (let [pred  (->column condition)
         value (->column if-expr)]
     (functions/when pred value)))
  ([condition if-expr else-expr]
   (.otherwise (when condition if-expr) (->column else-expr))))
;;;; Partition Transform Functions
;(defn bucket [num-buckets expr] (functions/bucket num-buckets (->column expr)))
;(defn days [expr] (functions/days (->column expr)))
;(defn hours [expr] (functions/hours (->column expr)))
;(defn months [expr] (functions/months (->column expr)))
;(defn years [expr] (functions/years (->column expr)))
;;;; String Functions
;; Each wrapper below forwards to the matching static method on
;; org.apache.spark.sql.functions. Single expressions are coerced with
;; `->column`, variadic expressions with `->col-array`; every other argument
;; is passed through unchanged.

(defn ascii [expr]
  (-> expr ->column functions/ascii))

(defn base64 [expr]
  (-> expr ->column functions/base64))

;; The separator comes first, followed by the column array.
(defn concat-ws [sep & exprs]
  (->> exprs ->col-array (functions/concat_ws sep)))

(defn decode [expr charset]
  (-> expr ->column (functions/decode charset)))

(defn encode [expr charset]
  (-> expr ->column (functions/encode charset)))

(defn format-number [expr decimal-places]
  (-> expr ->column (functions/format_number decimal-places)))

;; The format string comes first, followed by the column array.
(defn format-string [fmt & exprs]
  (->> exprs ->col-array (functions/format_string fmt)))

(defn initcap [expr]
  (-> expr ->column functions/initcap))

(defn instr [expr substr]
  (-> expr ->column (functions/instr substr)))

(defn length [expr]
  (-> expr ->column functions/length))

(defn levenshtein [left-expr right-expr]
  (let [lhs (->column left-expr)
        rhs (->column right-expr)]
    (functions/levenshtein lhs rhs)))

;; Note the argument order: the substring comes first, the column second.
(defn locate [substr expr]
  (->> expr ->column (functions/locate substr)))

(defn lower [expr]
  (-> expr ->column functions/lower))

(defn lpad [expr length pad]
  (-> expr ->column (functions/lpad length pad)))

(defn ltrim [expr]
  (-> expr ->column functions/ltrim))

;; Every argument of `overlay`, including `pos` and `len`, is coerced with
;; `->column`.
(defn overlay
  ([src rep pos]
   (let [src-col (->column src)
         rep-col (->column rep)
         pos-col (->column pos)]
     (functions/overlay src-col rep-col pos-col)))
  ([src rep pos len]
   (let [src-col (->column src)
         rep-col (->column rep)
         pos-col (->column pos)
         len-col (->column len)]
     (functions/overlay src-col rep-col pos-col len-col))))

(defn regexp-extract [expr regex idx]
  (-> expr ->column (functions/regexp_extract regex idx)))

;; The pattern and the replacement are coerced with `->column` as well.
(defn regexp-replace [expr pattern-expr replacement-expr]
  (let [target      (->column expr)
        pattern     (->column pattern-expr)
        replacement (->column replacement-expr)]
    (functions/regexp_replace target pattern replacement)))

(defn rpad [expr length pad]
  (-> expr ->column (functions/rpad length pad)))

(defn rtrim [expr]
  (-> expr ->column functions/rtrim))

(defn soundex [expr]
  (-> expr ->column functions/soundex))

(defn split [expr pattern]
  (-> expr ->column (functions/split pattern)))

(defn substring [expr pos len]
  (-> expr ->column (functions/substring pos len)))
(defn substring-index
  "Forwards to `functions/substring_index`; `expr` is coerced by `->column`,
  `delim` and `cnt` are passed through unchanged.

  The Java method is spelled with underscores here, like every other wrapper
  in this namespace, rather than relying on the compiler munging a hyphenated
  symbol into the same name."
  [expr delim cnt]
  (functions/substring_index (->column expr) delim cnt))
(defn translate
  "Forwards to `functions/translate`; `expr` is coerced by `->column`,
  `match` and `replacement` are passed through unchanged."
  [expr match replacement]
  (-> expr ->column (functions/translate match replacement)))
(defn trim
  "Forwards to `functions/trim` with `expr` coerced by `->column`.

  `[expr]`             - calls the single-column overload, mirroring the
                         arity offered by `ltrim` and `rtrim`.
  `[expr trim-string]` - additionally passes `trim-string` through
                         unchanged (the original behaviour)."
  ([expr]
   (functions/trim (->column expr)))
  ([expr trim-string]
   (functions/trim (->column expr) trim-string)))
(defn unbase64
  "Forwards to `functions/unbase64` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/unbase64))

(defn upper
  "Forwards to `functions/upper` with `expr` coerced by `->column`."
  [expr]
  (-> expr ->column functions/upper))
;;;; Window Functions
;; Each wrapper below forwards to the matching static method on
;; org.apache.spark.sql.functions. Column expressions are coerced with
;; `->column`; every other argument is passed through unchanged.

(defn cume-dist []
  (functions/cume_dist))

(defn dense-rank []
  (functions/dense_rank))

(defn lag
  ([expr offset]
   (-> expr ->column (functions/lag offset)))
  ([expr offset default]
   (-> expr ->column (functions/lag offset default))))

(defn lead
  ([expr offset]
   (-> expr ->column (functions/lead offset)))
  ([expr offset default]
   (-> expr ->column (functions/lead offset default))))

(defn ntile [n]
  (functions/ntile n))

(defn percent-rank []
  (functions/percent_rank))

(defn rank []
  (functions/rank))

(defn row-number []
  (functions/row_number))
;;;; Stats Functions
;; Each wrapper below coerces its column argument(s) with `->column` and
;; forwards to the named static method on org.apache.spark.sql.functions.

;; `covar` calls `covar_samp`.
(defn covar [l-expr r-expr]
  (let [lhs (->column l-expr)
        rhs (->column r-expr)]
    (functions/covar_samp lhs rhs)))

(def covar-samp
  "Alias of `covar`."
  covar)

(defn covar-pop [l-expr r-expr]
  (let [lhs (->column l-expr)
        rhs (->column r-expr)]
    (functions/covar_pop lhs rhs)))

(defn kurtosis [expr]
  (-> expr ->column functions/kurtosis))

(defn skewness [expr]
  (-> expr ->column functions/skewness))

(defn stddev [expr]
  (-> expr ->column functions/stddev))

(def stddev-samp
  "Alias of `stddev`."
  stddev)

(def std
  "Alias of `stddev`."
  stddev)

(defn stddev-pop [expr]
  (-> expr ->column functions/stddev_pop))

(defn sum-distinct [expr]
  (-> expr ->column functions/sumDistinct))

(defn var-pop [expr]
  (-> expr ->column functions/var_pop))

(defn variance [expr]
  (-> expr ->column functions/variance))

(def var-samp
  "Alias of `variance`."
  variance)