/
accumulator.clj
134 lines (99 loc) · 3.35 KB
/
accumulator.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
(ns sparkplug.accumulator
"Functions for working with Accumulator objects which can aggregate values
across executors."
(:refer-clojure :exclude [count empty? name reset!])
(:require
[sparkplug.scala :as scala])
(:import
org.apache.spark.api.java.JavaSparkContext
(org.apache.spark.util
AccumulatorV2
CollectionAccumulator
DoubleAccumulator
LongAccumulator)))
;; ## Constructors
(defn long-accumulator
  "Build a long accumulator registered with the Spark context wrapped by
  `spark-context`. The accumulator begins at 0 and sums every value added to
  it. When `acc-name` is supplied it is passed along as the accumulator's
  name."
  ([^JavaSparkContext spark-context]
   (-> spark-context
       .sc
       .longAccumulator))
  ([^JavaSparkContext spark-context acc-name]
   (-> spark-context
       .sc
       (.longAccumulator acc-name))))
(defn double-accumulator
  "Build a double accumulator registered with the Spark context wrapped by
  `spark-context`. The accumulator begins at 0.0 and sums every value added to
  it. When `acc-name` is supplied it is passed along as the accumulator's
  name."
  ([^JavaSparkContext spark-context]
   (-> spark-context
       .sc
       .doubleAccumulator))
  ([^JavaSparkContext spark-context acc-name]
   (-> spark-context
       .sc
       (.doubleAccumulator acc-name))))
(defn collection-accumulator
  "Build a collection accumulator registered with the Spark context wrapped by
  `spark-context`. The accumulator begins as an empty list and gathers each
  value added to it into that list. When `acc-name` is supplied it is passed
  along as the accumulator's name."
  ([^JavaSparkContext spark-context]
   (-> spark-context
       .sc
       .collectionAccumulator))
  ([^JavaSparkContext spark-context acc-name]
   (-> spark-context
       .sc
       (.collectionAccumulator acc-name))))
;; ## Accumulator Methods
(defn name
  "Look up the accumulator's name. The accumulator's `name` option is passed
  through `scala/resolve-option`, so the result is whatever that helper yields
  for the option (the name, if one is set)."
  [^AccumulatorV2 acc]
  (-> acc
      .name
      scala/resolve-option))
(defn value
  "Read the value currently held by the accumulator. Only the driver may call
  this."
  [^AccumulatorV2 acc]
  (. acc value))
(defn empty?
  "Report whether the accumulator is still at its zero state, i.e. no values
  have been added to it."
  [^AccumulatorV2 acc]
  (. acc isZero))
(defn add!
  "Fold the element `v` into the value accumulated by `acc`."
  [^AccumulatorV2 acc v]
  (. acc (add v)))
(defn merge!
  "Fold the contents of accumulator `b` into accumulator `a`. The two
  accumulators are required to be of the same type."
  [^AccumulatorV2 a ^AccumulatorV2 b]
  (. a (merge b)))
(defn reset!
  "Return the accumulator to its zero (empty) state."
  [^AccumulatorV2 acc]
  (. acc reset))
;; ## Numeric Accumulators
(defn count
  "Number of values that have been added to the accumulator. Only long and
  double accumulators are supported; any other argument results in an
  `IllegalArgumentException`."
  [acc]
  (cond
    (instance? LongAccumulator acc)
    (.count ^LongAccumulator acc)

    (instance? DoubleAccumulator acc)
    (.count ^DoubleAccumulator acc)

    :else
    (let [message (str "Cannot call count on accumulator type "
                       (class acc))]
      (throw (IllegalArgumentException. message)))))
(defn sum
  "Total of all values that have been added to the accumulator. Only long and
  double accumulators are supported; any other argument results in an
  `IllegalArgumentException`."
  [acc]
  (cond
    (instance? LongAccumulator acc)
    (.sum ^LongAccumulator acc)

    (instance? DoubleAccumulator acc)
    (.sum ^DoubleAccumulator acc)

    :else
    (let [message (str "Cannot call sum on accumulator type "
                       (class acc))]
      (throw (IllegalArgumentException. message)))))
(defn avg
  "Mean of all values that have been added to the accumulator. Only long and
  double accumulators are supported; any other argument results in an
  `IllegalArgumentException`."
  [acc]
  (cond
    (instance? LongAccumulator acc)
    (.avg ^LongAccumulator acc)

    (instance? DoubleAccumulator acc)
    (.avg ^DoubleAccumulator acc)

    :else
    (let [message (str "Cannot call avg on accumulator type "
                       (class acc))]
      (throw (IllegalArgumentException. message)))))