forked from zero-one-group/geni
-
Notifications
You must be signed in to change notification settings - Fork 0
/
clojure_idioms.clj
131 lines (114 loc) · 4.75 KB
/
clojure_idioms.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
(ns zero-one.geni.core.clojure-idioms
(:refer-clojure :exclude [=
boolean
byte
case
cond
condp
dec
double
even?
float
if
inc
int
keys
long
merge
merge-with
neg?
odd?
pos?
rand-nth
remove
rename-keys
select-keys
short
str
vals
zero?
zipmap])
(:require
[zero-one.geni.core.column :as column]
[zero-one.geni.core.dataset :as dataset]
[zero-one.geni.core.polymorphic :as polymorphic]
[zero-one.geni.core.functions :as sql]
[zero-one.geni.interop :as interop])
(:import
(org.apache.spark.sql functions)))
;; Collections
(defn remove
  "Dataset counterpart of `clojure.core/remove`.

  Coerces `expr` with `column/->column`, casts it to boolean, negates it with
  Spark's `functions/not`, and keeps only the rows of `dataframe` for which
  that negated column passes `.filter`."
  [dataframe expr]
  (let [predicate (.cast (column/->column expr) "boolean")
        negated   (functions/not predicate)]
    (.filter dataframe negated)))
(defn rand-nth
  "Dataset counterpart of `clojure.core/rand-nth`.

  Counts the rows of `dataframe`, samples a fraction of `min(1.0, 10 / count)`
  of them via `dataset/sample`, limits the sample to one row and returns
  whatever `dataset/head` yields for it."
  [dataframe]
  (let [n-rows   (.count dataframe)
        ;; Aim for roughly ten sampled rows, capped at the whole dataset.
        fraction (min 1.0 (/ 10.0 n-rows))
        sampled  (dataset/sample dataframe fraction)]
    (dataset/head (dataset/limit sampled 1))))
;; Arithmetic
(defn inc
  "Column counterpart of `clojure.core/inc`: coerces `expr` with
  `column/->column` and adds 1 to it via `column/+`."
  [expr]
  (-> expr
      column/->column
      (column/+ 1)))
(defn dec
  "Column counterpart of `clojure.core/dec`: coerces `expr` with
  `column/->column` and subtracts 1 from it via `column/-`."
  [expr]
  (-> expr
      column/->column
      (column/- 1)))
;; Casting
(defn short
  "Coerces `expr` with `column/->column` and casts it to the `short` type
  through `column/cast`."
  [expr]
  (-> expr
      column/->column
      (column/cast "short")))
(defn int
  "Coerces `expr` with `column/->column` and casts it to the `int` type
  through `column/cast`."
  [expr]
  (-> expr
      column/->column
      (column/cast "int")))
(defn long
  "Coerces `expr` with `column/->column` and casts it to the `long` type
  through `column/cast`."
  [expr]
  (-> expr
      column/->column
      (column/cast "long")))
(defn float
  "Coerces `expr` with `column/->column` and casts it to the `float` type
  through `column/cast`."
  [expr]
  (-> expr
      column/->column
      (column/cast "float")))
(defn double
  "Coerces `expr` with `column/->column` and casts it to the `double` type
  through `column/cast`."
  [expr]
  (-> expr
      column/->column
      (column/cast "double")))
(defn boolean
  "Coerces `expr` with `column/->column` and casts it to the `boolean` type
  through `column/cast`."
  [expr]
  (-> expr
      column/->column
      (column/cast "boolean")))
(defn byte
  "Coerces `expr` with `column/->column` and casts it to the `byte` type
  through `column/cast`."
  [expr]
  (-> expr
      column/->column
      (column/cast "byte")))
(defn str
  "Coerces `expr` with `column/->column` and casts it to the `string` type
  through `column/cast`. Unlike `clojure.core/str` it takes exactly one
  argument."
  [expr]
  (-> expr
      column/->column
      (column/cast "string")))
;; Predicates
(defn =
  "Column counterpart of `clojure.core/=` for exactly two operands: both
  sides are coerced with `column/->column` and compared with `column/===`."
  [l-expr r-expr]
  (let [lhs (column/->column l-expr)
        rhs (column/->column r-expr)]
    (column/=== lhs rhs)))
(defn zero?
  "Column counterpart of `clojure.core/zero?`: compares the coerced `expr`
  with 0 using `column/===`."
  [expr]
  (-> expr
      column/->column
      (column/=== 0)))
(defn pos?
  "Column counterpart of `clojure.core/pos?`: builds `0 < expr` with
  `column/<` after coercing `expr` through `column/->column`."
  [expr]
  ;; Thread-last keeps 0 as the left operand of the comparison.
  (->> expr
       column/->column
       (column/< 0)))
(defn neg?
  "Column counterpart of `clojure.core/neg?`: builds `expr < 0` with
  `column/<` after coercing `expr` through `column/->column`."
  [expr]
  (-> expr
      column/->column
      (column/< 0)))
(defn even?
  "Column counterpart of `clojure.core/even?`: takes the coerced `expr`
  modulo 2 with `column/mod` and compares the result with 0 via
  `column/===`."
  [expr]
  (-> expr
      column/->column
      (column/mod 2)
      (column/=== 0)))
(defn odd?
  "Column counterpart of `clojure.core/odd?`.

  Coerces `expr` with `column/->column`, takes it modulo 2 with `column/mod`
  and checks that the absolute value of the remainder equals 1.

  The absolute value matters: Spark's modulo is a remainder that keeps the
  sign of the dividend, so a negative odd value such as -3 yields -1 rather
  than 1. Comparing the raw remainder with 1 would therefore report every
  negative odd number as not odd."
  [expr]
  (-> expr
      column/->column
      (column/mod 2)
      functions/abs
      (column/=== 1)))
;; Map Operations
;; Counterpart of `clojure.core/keys` for this namespace: a plain alias of
;; `sql/map-keys` from zero-one.geni.core.functions.
;; NOTE(review): presumably returns the keys of a map-typed column - confirm
;; against the definition of `map-keys`.
(def keys sql/map-keys)
(defn merge
  "Counterpart of `clojure.core/merge`: folds `expr` and every element of `ms`
  together, left to right, with `sql/map-concat`. When `ms` is empty, `expr`
  is returned untouched."
  [expr & ms]
  (reduce (fn [merged m] (sql/map-concat merged m))
          expr
          ms))
;; Counterpart of `clojure.core/merge-with` for this namespace: a plain alias
;; of `sql/map-zip-with` from zero-one.geni.core.functions.
;; NOTE(review): argument order and arity are whatever `map-zip-with` defines
;; and may differ from `clojure.core/merge-with` - confirm before relying on it.
(def merge-with sql/map-zip-with)
(defn- rename-cols
  "Builds the candidate columns used to rename the key `k`.

  For every `[old new]` entry of `kmap` it emits a `sql/when` column that
  yields `new` when `k` equals `old`; the original key column is appended
  last so it can act as the fallback. Returns a lazy sequence of columns."
  [k kmap]
  (let [renames (for [[from to] kmap]
                  (sql/when (.equalTo (column/->column k)
                                      (column/->column from))
                            (column/->column to)))]
    (concat renames [(column/->column k)])))
(defn rename-keys
  "Counterpart of `clojure.set/rename-keys`.

  Passes `expr` to `sql/transform-keys` with a two-argument function that
  replaces each key by the first non-null candidate produced by
  `rename-cols`: the matching new name from `kmap`, or the key itself when
  nothing matches. The second argument of that function is ignored."
  [expr kmap]
  (letfn [(renamed-key [k _]
            (-> (rename-cols k kmap)
                column/->col-array
                functions/coalesce))]
    (sql/transform-keys expr renamed-key)))
(defn select-keys
  "Counterpart of `clojure.core/select-keys`.

  Passes `expr` to `sql/map-filter` with a two-argument predicate that tests
  the key with `.isin` against `ks`, converted to a Scala sequence by
  `interop/->scala-seq`. The second argument of the predicate is ignored."
  [expr ks]
  (letfn [(wanted-key? [k _]
            (.isin k (interop/->scala-seq ks)))]
    (sql/map-filter expr wanted-key?)))
;; Counterpart of `clojure.core/vals` for this namespace: a plain alias of
;; `sql/map-values` from zero-one.geni.core.functions.
;; NOTE(review): presumably returns the values of a map-typed column - confirm
;; against the definition of `map-values`.
(def vals sql/map-values)
;; Counterpart of `clojure.core/zipmap` for this namespace: a plain alias of
;; `sql/map-from-arrays` from zero-one.geni.core.functions.
;; NOTE(review): presumably builds a map column from a keys array and a values
;; array - confirm against the definition of `map-from-arrays`.
(def zipmap sql/map-from-arrays)
;; Common Macros
;; Binds a var named `if` in this namespace to `sql/when`, so consumers can
;; reach it through a namespace-qualified or aliased symbol.
;; NOTE(review): `if` is a Clojure special form, so an unqualified `(if ...)`
;; presumably still compiles as the special form rather than calling this
;; var - confirm how callers are expected to reference it.
(def if sql/when)
(defn cond
  "Column counterpart of `clojure.core/cond`, implemented as a function.

  `clauses` is a flat sequence of predicate/result pairs. Each pair becomes
  `(sql/when pred then)`, except a pair whose predicate is the keyword
  `:else`, which becomes the unconditional column `(column/->column then)`.
  The resulting columns are combined with `polymorphic/coalesce`, so the
  first clause producing a non-null value wins.

  A trailing clause without a partner is ignored."
  [& clauses]
  (let [whenned-cols (map (fn [[pred then]]
                            ;; `if` is a special form, so it is usable here even
                            ;; though this namespace defines its own `if` var.
                            ;; Only the branch that is needed gets built.
                            (if (identical? :else pred)
                              (column/->column then)
                              (sql/when pred then)))
                          (partition 2 clauses))]
    (apply polymorphic/coalesce whenned-cols)))
(defn condp
  "Column counterpart of `clojure.core/condp`, implemented as a function.

  `clauses` is a flat sequence of test/result pairs, optionally followed by a
  single default. Each pair becomes a `sql/when` column guarded by
  `(pred test expr)`, with both operands coerced through `column/->column`.
  The branches plus the default are combined with `polymorphic/coalesce`.
  With an even number of clauses the default is `(column/->column nil)`."
  [pred expr & clauses]
  (let [fallback (when (clojure.core/odd? (count clauses))
                   (last clauses))
        ;; `partition` drops the unpaired trailing default, if there is one.
        branches (for [[test-expr then] (partition 2 clauses)]
                   (sql/when (pred (column/->column test-expr)
                                   (column/->column expr))
                             then))
        columns  (concat branches [(column/->column fallback)])]
    (apply polymorphic/coalesce columns)))
(defn case
  "Column counterpart of `clojure.core/case`, implemented as a function.

  `clauses` is a flat sequence of match/result pairs, optionally followed by a
  single default. Each pair becomes a `sql/when` column guarded by
  `(column/=== match expr)`. The branches plus the default are combined with
  `polymorphic/coalesce`. With an even number of clauses the default is
  `(column/->column nil)`."
  [expr & clauses]
  (let [fallback (when (clojure.core/odd? (count clauses))
                   (last clauses))
        ;; `partition` drops the unpaired trailing default, if there is one.
        arms     (map (fn [[match then]]
                        (sql/when (column/=== match expr) then))
                      (partition 2 clauses))
        columns  (concat arms [(column/->column fallback)])]
    (apply polymorphic/coalesce columns)))