forked from zero-one-group/geni
-
Notifications
You must be signed in to change notification settings - Fork 0
/
spark_context.clj
135 lines (111 loc) · 4.35 KB
/
spark_context.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
(ns zero-one.geni.spark-context
(:require
[zero-one.geni.defaults :as defaults]
[zero-one.geni.interop :as interop]
[zero-one.geni.rdd.unmangle :as unmangle])
(:import
(org.apache.spark.api.java JavaSparkContext)
(org.apache.spark.sql SparkSession)))
(defn java-spark-context
  "Wraps the underlying SparkContext of `spark` in a JavaSparkContext."
  [spark]
  (-> spark .sparkContext JavaSparkContext/fromSparkContext))
(defn app-name
  "Returns the application name of the (default) Spark session."
  ([] (app-name @defaults/spark))
  ([spark] (.appName (java-spark-context spark))))
(defmulti binary-files
  "Reads binary files under `path`; dispatches on whether the first
  argument is a SparkSession or a path."
  (fn [first-arg & _] (class first-arg)))

(defmethod binary-files :default
  ([path] (binary-files @defaults/spark path))
  ([path num-partitions] (binary-files @defaults/spark path num-partitions)))

(defmethod binary-files SparkSession
  ([spark path]
   (-> spark java-spark-context (.binaryFiles path)))
  ([spark path num-partitions]
   (-> spark java-spark-context (.binaryFiles path num-partitions))))
(defn broadcast
  "Broadcasts `value` to the cluster, returning a Broadcast object."
  ([value] (broadcast @defaults/spark value))
  ([spark value] (.broadcast (java-spark-context spark) value)))
(defn checkpoint-dir
  "Returns the checkpoint directory, or nil when none is set."
  ([] (checkpoint-dir @defaults/spark))
  ([spark]
   (interop/optional->nillable (.getCheckpointDir (java-spark-context spark)))))
(defn conf
  "Returns the SparkConf of the session as a Clojure map."
  ([] (conf @defaults/spark))
  ([spark] (interop/spark-conf->map (.getConf (java-spark-context spark)))))
(defn default-min-partitions
  "Returns the default minimum number of partitions for Hadoop RDDs."
  ([] (default-min-partitions @defaults/spark))
  ([spark] (.defaultMinPartitions (java-spark-context spark))))
(defn default-parallelism
  "Returns the default level of parallelism of the Spark context."
  ([] (default-parallelism @defaults/spark))
  ([spark] (.defaultParallelism (java-spark-context spark))))
(defn empty-rdd
  "Returns an RDD with no partitions and no elements."
  ([] (empty-rdd @defaults/spark))
  ([spark] (.emptyRDD (java-spark-context spark))))
(defn jars
  "Returns, as a vector, the jars added to the Spark context."
  ([] (jars @defaults/spark))
  ([spark] (vec (.jars (java-spark-context spark)))))
(defn local?
  "True when the Spark context runs in local mode."
  ([] (local? @defaults/spark))
  ([spark] (.isLocal (java-spark-context spark))))

(def is-local
  "Alias for `local?`."
  local?)
(defn local-property
  "Returns the local property `k` of the Spark context."
  ([k] (local-property @defaults/spark k))
  ([spark k] (.getLocalProperty (java-spark-context spark) k)))
(defn master
  "Returns the master URL of the Spark context."
  ([] (master @defaults/spark))
  ([spark] (.master (java-spark-context spark))))
(defn parallelise
  "Distributes the local collection `data` as an RDD.

  The optional `num-slices` argument controls the number of partitions;
  without it, Spark's default parallelism is used. Resolves the old TODO
  by exposing JavaSparkContext#parallelize(List, int)."
  ([data] (parallelise @defaults/spark data))
  ([spark data]
   (-> spark
       java-spark-context
       (.parallelize data)
       unmangle/unmangle-name))
  ([spark data num-slices]
   ;; int cast because the Java overload takes a primitive int.
   (-> spark
       java-spark-context
       (.parallelize data (int num-slices))
       unmangle/unmangle-name)))
(def parallelize
  "Alias for `parallelise`."
  parallelise)
(defn parallelise-doubles
  "Distributes `data`, coerced to doubles, as a JavaDoubleRDD.

  The optional `num-slices` argument controls the number of partitions,
  mirroring the 3-arity of `parallelise`."
  ([data] (parallelise-doubles @defaults/spark data))
  ([spark data]
   (-> spark
       java-spark-context
       (.parallelizeDoubles (clojure.core/map double data))
       unmangle/unmangle-name))
  ([spark data num-slices]
   ;; int cast because the Java overload takes a primitive int.
   (-> spark
       java-spark-context
       (.parallelizeDoubles (clojure.core/map double data) (int num-slices))
       unmangle/unmangle-name)))
(def parallelize-doubles
  "Alias for `parallelise-doubles`."
  parallelise-doubles)
(defn parallelise-pairs
  "Distributes `data`, converted to Scala Tuple2 pairs, as a JavaPairRDD.

  The optional `num-slices` argument controls the number of partitions,
  mirroring the 3-arity of `parallelise`."
  ([data] (parallelise-pairs @defaults/spark data))
  ([spark data]
   (-> spark
       java-spark-context
       (.parallelizePairs (clojure.core/map interop/->scala-tuple2 data))
       unmangle/unmangle-name))
  ([spark data num-slices]
   ;; int cast because the Java overload takes a primitive int.
   (-> spark
       java-spark-context
       (.parallelizePairs (clojure.core/map interop/->scala-tuple2 data)
                          (int num-slices))
       unmangle/unmangle-name)))
(def parallelize-pairs
  "Alias for `parallelise-pairs`."
  parallelise-pairs)
(defn persistent-rdds
  "Returns a map of the RDDs the context has marked for caching."
  ([] (persistent-rdds @defaults/spark))
  ([spark] (into {} (.getPersistentRDDs (java-spark-context spark)))))
(defn resources
  "Returns, as a map, the resources of the Spark context."
  ([] (resources @defaults/spark))
  ([spark] (into {} (.resources (java-spark-context spark)))))
(defn spark-context
  "Returns the underlying SparkContext of the session."
  ([] (spark-context @defaults/spark))
  ([spark] (.sc (java-spark-context spark))))

(def sc
  "Alias for `spark-context`."
  spark-context)
(defn spark-home
  "Returns the Spark home directory, or nil when none is set."
  ([] (spark-home @defaults/spark))
  ([spark]
   (interop/optional->nillable (.getSparkHome (java-spark-context spark)))))
(defmulti text-file
  "Reads a text file from `path` as an RDD of lines; dispatches on
  whether the first argument is a SparkSession or a path."
  (fn [first-arg & _] (class first-arg)))

(defmethod text-file :default
  ([path] (text-file @defaults/spark path))
  ([path min-partitions] (text-file @defaults/spark path min-partitions)))

(defmethod text-file SparkSession
  ([spark path]
   (.textFile (java-spark-context spark) path))
  ([spark path min-partitions]
   (.textFile (java-spark-context spark) path min-partitions)))
(defn version
  "Returns the Spark version of the context."
  ([] (version @defaults/spark))
  ([spark] (.version (java-spark-context spark))))
(defmulti whole-text-files
  "Reads whole text files under `path` as (filename, content) pairs;
  dispatches on whether the first argument is a SparkSession or a path."
  (fn [first-arg & _] (class first-arg)))

(defmethod whole-text-files :default
  ([path] (whole-text-files @defaults/spark path))
  ([path min-partitions] (whole-text-files @defaults/spark path min-partitions)))

(defmethod whole-text-files SparkSession
  ([spark path]
   (-> spark java-spark-context (.wholeTextFiles path)))
  ([spark path min-partitions]
   (-> spark java-spark-context (.wholeTextFiles path min-partitions))))
;; Broadcast
;; Reads a broadcast variable: (value broadcast-var) invokes the .value
;; method on the given object via memfn.
(def value (memfn value))