;; forked from zero-one-group/geni
;; dataset_creation.clj
(ns zero-one.geni.core.dataset-creation
(:require
[zero-one.geni.defaults]
[zero-one.geni.interop :as interop])
(:import
(org.apache.spark.sql.types ArrayType DataTypes)
(org.apache.spark.ml.linalg VectorUDT
DenseVector
SparseVector)))
;; Default SparkSession holder from zero-one.geni.defaults; it is deref'd
;; (@default-spark) by the single-arity dataset constructors below —
;; presumably a delay/atom so the session is created lazily. TODO confirm.
(def default-spark zero-one.geni.defaults/spark)
;; Lookup table from keyword type aliases to Spark SQL DataType instances.
;; Several aliases map to the same type (:bool/:boolean, :int/:integer);
;; :vector maps to the Spark ML VectorUDT, and both :nil and a nil key
;; resolve to NullType. Used by struct-field to build schemas.
(def data-type->spark-type
{:bool DataTypes/BooleanType
:boolean DataTypes/BooleanType
:byte DataTypes/ByteType
:date DataTypes/DateType
:double DataTypes/DoubleType
:float DataTypes/FloatType
:int DataTypes/IntegerType
:integer DataTypes/IntegerType
:long DataTypes/LongType
:nil DataTypes/NullType
:short DataTypes/ShortType
:string DataTypes/StringType
:timestamp DataTypes/TimestampType
:vector (VectorUDT.)
nil DataTypes/NullType})
(defn struct-field
  "Builds a Spark StructField named by `col-name` (keyword or string).
  The Spark type is looked up from `data-type` via data-type->spark-type."
  [col-name data-type nullable]
  (DataTypes/createStructField (name col-name)
                               (data-type->spark-type data-type)
                               nullable))
(defn struct-type
  "Builds a Spark StructType from the given StructFields."
  [& struct-fields]
  (DataTypes/createStructType struct-fields))
(defn create-dataframe
  "Creates a Spark DataFrame from `rows` and `schema`.
  The single-session arity derefs and uses the default SparkSession."
  ([rows schema]
   (create-dataframe @default-spark rows schema))
  ([spark rows schema]
   (.createDataFrame spark rows schema)))
;; Lookup table from Java classes to Spark SQL DataType instances, used by
;; infer-spark-type to infer a schema from sample values. Both ML vector
;; classes map to VectorUDT; a nil key (i.e. (type nil)) maps to NullType.
;; Types not listed here fall back to BinaryType in infer-spark-type.
(def java-type->spark-type
{java.lang.Boolean DataTypes/BooleanType
java.lang.Byte DataTypes/ByteType
java.lang.Double DataTypes/DoubleType
java.lang.Float DataTypes/FloatType
java.lang.Integer DataTypes/IntegerType
java.lang.Long DataTypes/LongType
java.lang.Short DataTypes/ShortType
java.lang.String DataTypes/StringType
java.sql.Timestamp DataTypes/TimestampType
java.util.Date DataTypes/DateType
DenseVector (VectorUDT.)
SparseVector (VectorUDT.)
nil DataTypes/NullType})
(defn infer-spark-type
  "Infers the Spark DataType of `value`. A collection becomes an ArrayType
  whose element type is inferred from its first element; anything else is
  looked up by class in java-type->spark-type, defaulting to BinaryType."
  [value]
  (if (coll? value)
    (ArrayType. (infer-spark-type (first value)) true)
    (java-type->spark-type (type value) DataTypes/BinaryType)))
(defn infer-struct-field
  "Builds a nullable StructField named `col-name` whose Spark type is
  inferred from the sample `value`."
  [col-name value]
  (DataTypes/createStructField col-name (infer-spark-type value) true))
(defn infer-schema
  "Builds a StructType by inferring one StructField per column-name /
  sample-value pair."
  [col-names values]
  (->> (mapv infer-struct-field col-names values)
       (DataTypes/createStructType)))
(defn first-non-nil
  "Returns the first element of `values` that is not nil, or nil if none.
  Uses nil? rather than truthiness: the previous (filter identity ...) also
  skipped `false`, so a boolean column whose leading values were false (or
  false followed by nils) would be sampled wrongly during schema inference."
  [values]
  (first (remove nil? values)))
(defn transpose
  "Transposes a seq of rows into a seq of columns.
  Guards the empty case explicitly: (apply map list []) evaluates to
  (map list), which in Clojure 1.7+ is a transducer rather than a seq,
  so an empty table would previously return a non-seq value."
  [xs]
  (if (seq xs)
    (apply map list xs)
    ()))
(defn table->dataset
  "Creates a Spark DataFrame from `table` (a seq of rows) and `col-names`.
  The schema is inferred from the first non-nil value of each column.
  The two-arg arity derefs and uses the default SparkSession."
  ([table col-names]
   (table->dataset @default-spark table col-names))
  ([spark table col-names]
   (let [sample-values (map first-non-nil (transpose table))
         schema        (infer-schema (map name col-names) sample-values)
         spark-rows    (interop/->java-list (map interop/->spark-row table))]
     (.createDataFrame spark spark-rows schema))))
(defn map->dataset
  "Creates a Spark DataFrame from a map of column name -> seq of values.
  The single-arg arity derefs and uses the default SparkSession."
  ([map-of-values]
   (map->dataset @default-spark map-of-values))
  ([spark map-of-values]
   (table->dataset spark
                   (transpose (vals map-of-values))
                   (keys map-of-values))))
(defn conj-record
  "For every column already present in `map-of-values`, appends `record`'s
  value for that column onto the column's value collection. Columns missing
  from `record` get nil appended; keys only in `record` are ignored."
  [map-of-values record]
  (reduce (fn [columns col-name]
            (update columns col-name conj (get record col-name)))
          map-of-values
          (keys map-of-values)))
(defn records->dataset
  "Creates a Spark DataFrame from a seq of record maps. Column names are the
  distinct keys across all records in first-seen order; records missing a
  key contribute nil for that column (via conj-record).
  The single-arg arity derefs and uses the default SparkSession.
  Idiom fix: one-level key concatenation uses (mapcat keys ...) instead of
  the previous flatten over a seq of key seqs, and `distinct` no longer
  needs its clojure.core qualification."
  ([records]
   (records->dataset @default-spark records))
  ([spark records]
   (let [col-names     (distinct (mapcat keys records))
         map-of-values (reduce conj-record
                               (zipmap col-names (repeat []))
                               records)]
     (map->dataset spark map-of-values))))