Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 353 lines (308 sloc) 10.563 kb
89e5dce @richhickey added reducers
richhickey authored
1 ; Copyright (c) Rich Hickey. All rights reserved.
2 ; The use and distribution terms for this software are covered by the
3 ; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
4 ; which can be found in the file epl-v10.html at the root of this distribution.
5 ; By using this software in any fashion, you are agreeing to be bound by
6 ; the terms of this license.
7 ; You must not remove this notice, or any other, from this software.
8
9 (ns ^{:doc
10 "A library for reduction and parallel folding. Alpha and subject
fd96484 @richhickey doc fix for Java 7 support
richhickey authored
11 to change. Note that fold and its derivatives require Java 7+ or
12 Java 6 + jsr166y.jar for fork/join support. See Clojure's pom.xml for the
89e5dce @richhickey added reducers
richhickey authored
13 dependency info."
14 :author "Rich Hickey"}
15 clojure.core.reducers
16 (:refer-clojure :exclude [reduce map filter remove take take-while drop flatten])
17 (:require [clojure.walk :as walk]))
18
19 (alias 'core 'clojure.core)
20 (set! *warn-on-reflection* true)
21
22 ;;;;;;;;;;;;;; some fj stuff ;;;;;;;;;;
23
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
24 (defmacro ^:private compile-if
25 "Evaluate `exp` and if it returns logical true and doesn't error, expand to
26 `then`. Else expand to `else`.
89e5dce @richhickey added reducers
richhickey authored
27
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
28 (compile-if (Class/forName \"java.util.concurrent.ForkJoinTask\")
29 (do-cool-stuff-with-fork-join)
30 (fall-back-to-executor-services))"
31 [exp then else]
32 (if (try (eval exp)
33 (catch Throwable _ false))
34 `(do ~then)
35 `(do ~else)))
89e5dce @richhickey added reducers
richhickey authored
36
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
37 (compile-if
38 (Class/forName "java.util.concurrent.ForkJoinTask")
39 ;; We're running a JDK 7+
40 (do
41 (def pool (delay (java.util.concurrent.ForkJoinPool.)))
89e5dce @richhickey added reducers
richhickey authored
42
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
43 (defn fjtask [^Callable f]
44 (java.util.concurrent.ForkJoinTask/adapt f))
89e5dce @richhickey added reducers
richhickey authored
45
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
46 (defn- fjinvoke [f]
47 (if (java.util.concurrent.ForkJoinTask/inForkJoinPool)
48 (f)
49 (.invoke ^java.util.concurrent.ForkJoinPool @pool ^java.util.concurrent.ForkJoinTask (fjtask f))))
50
51 (defn- fjfork [task] (.fork ^java.util.concurrent.ForkJoinTask task))
52
53 (defn- fjjoin [task] (.join ^java.util.concurrent.ForkJoinTask task)))
fd96484 @richhickey doc fix for Java 7 support
richhickey authored
54 ;; We're running a JDK <7
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
55 (do
56 (def pool (delay (jsr166y.ForkJoinPool.)))
57
58 (defn fjtask [^Callable f]
59 (jsr166y.ForkJoinTask/adapt f))
60
61 (defn- fjinvoke [f]
62 (if (jsr166y.ForkJoinTask/inForkJoinPool)
63 (f)
64 (.invoke ^jsr166y.ForkJoinPool @pool ^jsr166y.ForkJoinTask (fjtask f))))
65
66 (defn- fjfork [task] (.fork ^jsr166y.ForkJoinTask task))
67
68 (defn- fjjoin [task] (.join ^jsr166y.ForkJoinTask task))))
89e5dce @richhickey added reducers
richhickey authored
69 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
70
71 (defn reduce
72 "Like core/reduce except:
73 When init is not provided, (f) is used.
74 Maps are reduced with reduce-kv"
75 ([f coll] (reduce f (f) coll))
76 ([f init coll]
77 (if (instance? java.util.Map coll)
78 (clojure.core.protocols/kv-reduce coll f init)
79 (clojure.core.protocols/coll-reduce coll f init))))
80
81 (defprotocol CollFold
82 (coll-fold [coll n combinef reducef]))
83
84 (defn fold
85 "Reduces a collection using a (potentially parallel) reduce-combine
86 strategy. The collection is partitioned into groups of approximately
87 n (default 512), each of which is reduced with reducef (with a seed
88 value obtained by calling (combinef) with no arguments). The results
89 of these reductions are then reduced with combinef (default
90 reducef). combinef must be associative, and, when called with no
91 arguments, (combinef) must produce its identity element. These
92 operations may be performed in parallel, but the results will
93 preserve order."
94 {:added "1.5"}
95 ([reducef coll] (fold reducef reducef coll))
96 ([combinef reducef coll] (fold 512 combinef reducef coll))
97 ([n combinef reducef coll]
98 (coll-fold coll n combinef reducef)))
99
100 (defn reducer
101 "Given a reducible collection, and a transformation function xf,
102 returns a reducible collection, where any supplied reducing
103 fn will be transformed by xf. xf is a function of reducing fn to
104 reducing fn."
105 {:added "1.5"}
106 ([coll xf]
107 (reify
108 clojure.core.protocols/CollReduce
109 (coll-reduce [this f1]
110 (clojure.core.protocols/coll-reduce this f1 (f1)))
111 (coll-reduce [_ f1 init]
112 (clojure.core.protocols/coll-reduce coll (xf f1) init)))))
113
114 (defn folder
115 "Given a foldable collection, and a transformation function xf,
116 returns a foldable collection, where any supplied reducing
117 fn will be transformed by xf. xf is a function of reducing fn to
118 reducing fn."
119 {:added "1.5"}
120 ([coll xf]
121 (reify
122 clojure.core.protocols/CollReduce
123 (coll-reduce [_ f1]
124 (clojure.core.protocols/coll-reduce coll (xf f1) (f1)))
125 (coll-reduce [_ f1 init]
126 (clojure.core.protocols/coll-reduce coll (xf f1) init))
127
128 CollFold
129 (coll-fold [_ n combinef reducef]
130 (coll-fold coll n combinef (xf reducef))))))
131
132 (defn- do-curried
133 [name doc meta args body]
134 (let [cargs (vec (butlast args))]
135 `(defn ~name ~doc ~meta
136 (~cargs (fn [x#] (~name ~@cargs x#)))
137 (~args ~@body))))
138
139 (defmacro ^:private defcurried
140 "Builds another arity of the fn that returns a fn awaiting the last
141 param"
142 [name doc meta args & body]
143 (do-curried name doc meta args body))
144
145 (defn- do-rfn [f1 k fkv]
146 `(fn
147 ([] (~f1))
148 ~(clojure.walk/postwalk
149 #(if (sequential? %)
150 ((if (vector? %) vec identity)
151 (core/remove #{k} %))
152 %)
153 fkv)
154 ~fkv))
155
156 (defmacro ^:private rfn
157 "Builds 3-arity reducing fn given names of wrapped fn and key, and k/v impl."
158 [[f1 k] fkv]
159 (do-rfn f1 k fkv))
160
161 (defcurried map
162 "Applies f to every value in the reduction of coll. Foldable."
163 {:added "1.5"}
164 [f coll]
165 (folder coll
166 (fn [f1]
167 (rfn [f1 k]
168 ([ret k v]
169 (f1 ret (f k v)))))))
170
171 (defcurried filter
172 "Retains values in the reduction of coll for which (pred val)
173 returns logical true. Foldable."
174 {:added "1.5"}
175 [pred coll]
176 (folder coll
177 (fn [f1]
178 (rfn [f1 k]
179 ([ret k v]
180 (if (pred k v)
181 (f1 ret k v)
182 ret))))))
183
184 (defcurried remove
185 "Removes values in the reduction of coll for which (pred val)
186 returns logical true. Foldable."
187 {:added "1.5"}
188 [pred coll]
189 (filter (complement pred) coll))
190
191 (defcurried take-while
192 "Ends the reduction of coll when (pred val) returns logical false."
193 {:added "1.5"}
194 [pred coll]
195 (reducer coll
196 (fn [f1]
197 (rfn [f1 k]
198 ([ret k v]
199 (if (pred k v)
200 (f1 ret k v)
201 (reduced ret)))))))
202
203 (defcurried take
204 "Ends the reduction of coll after consuming n values."
205 {:added "1.5"}
206 [n coll]
207 (reducer coll
208 (fn [f1]
209 (let [cnt (atom n)]
210 (rfn [f1 k]
211 ([ret k v]
212 (swap! cnt dec)
213 (if (neg? @cnt)
214 (reduced ret)
215 (f1 ret k v))))))))
216
217 (defcurried drop
218 "Elides the first n values from the reduction of coll."
219 {:added "1.5"}
220 [n coll]
221 (reducer coll
222 (fn [f1]
223 (let [cnt (atom n)]
224 (rfn [f1 k]
225 ([ret k v]
226 (swap! cnt dec)
227 (if (neg? @cnt)
228 (f1 ret k v)
229 ret)))))))
230
231 (defcurried flatten
232 "Takes any nested combination of sequential things (lists, vectors,
233 etc.) and returns their contents as a single, flat foldable
234 collection."
235 {:added "1.5"}
236 [coll]
237 (let [rf (fn [f1]
238 (fn
239 ([] (f1))
240 ([ret v]
241 (if (sequential? v)
242 (clojure.core.protocols/coll-reduce (flatten v) f1 ret)
243 (f1 ret v)))))]
244 (reify
245 clojure.core.protocols/CollReduce
246 (coll-reduce [this f1] (clojure.core.protocols/coll-reduce this f1 (f1)))
247 (coll-reduce [_ f1 init] (clojure.core.protocols/coll-reduce coll (rf f1) init))
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
248
89e5dce @richhickey added reducers
richhickey authored
249 CollFold
250 (coll-fold [_ n combinef reducef] (coll-fold coll n combinef (rf reducef))))))
251
252 ;;do not construct this directly, use cat
253 (deftype Cat [cnt left right]
254 clojure.lang.Counted
255 (count [_] cnt)
256
257 clojure.lang.Seqable
258 (seq [_] (concat (seq left) (seq right)))
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
259
89e5dce @richhickey added reducers
richhickey authored
260 clojure.core.protocols/CollReduce
261 (coll-reduce [this f1] (clojure.core.protocols/coll-reduce this f1 (f1)))
262 (coll-reduce
263 [_ f1 init]
264 (clojure.core.protocols/coll-reduce
265 right f1
266 (clojure.core.protocols/coll-reduce left f1 init)))
267
268 CollFold
269 (coll-fold
270 [_ n combinef reducef]
271 (fjinvoke
272 (fn []
273 (let [rt (fjfork (fjtask #(coll-fold right n combinef reducef)))]
274 (combinef
275 (coll-fold left n combinef reducef)
276 (fjjoin rt)))))))
277
278 (defn cat
279 "A high-performance combining fn that yields the catenation of the
280 reduced values. The result is reducible, foldable, seqable and
281 counted, providing the identity collections are reducible, seqable
282 and counted. The single argument version will build a combining fn
283 with the supplied identity constructor. Tests for identity
284 with (zero? (count x)). See also foldcat."
285 {:added "1.5"}
286 ([] (java.util.ArrayList.))
287 ([ctor]
288 (fn
289 ([] (ctor))
290 ([left right] (cat left right))))
291 ([left right]
292 (cond
293 (zero? (count left)) right
294 (zero? (count right)) left
295 :else
296 (Cat. (+ (count left) (count right)) left right))))
297
298 (defn append!
299 ".adds x to acc and returns acc"
300 {:added "1.5"}
301 [^java.util.Collection acc x]
302 (doto acc (.add x)))
303
304 (defn foldcat
305 "Equivalent to (fold cat append! coll)"
306 {:added "1.5"}
307 [coll]
308 (fold cat append! coll))
309
310 (defn monoid
311 "Builds a combining fn out of the supplied operator and identity
312 constructor. op must be associative and ctor called with no args
313 must return an identity value for it."
314 {:added "1.5"}
315 [op ctor]
316 (fn m
317 ([] (ctor))
318 ([a b] (op a b))))
319
320 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; fold impls ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
321 (defn- foldvec
322 [v n combinef reducef]
323 (cond
324 (empty? v) (combinef)
325 (<= (count v) n) (reduce reducef (combinef) v)
326 :else
327 (let [split (quot (count v) 2)
328 v1 (subvec v 0 split)
329 v2 (subvec v split (count v))
330 fc (fn [child] #(foldvec child n combinef reducef))]
331 (fjinvoke
332 #(let [f1 (fc v1)
333 t2 (fjtask (fc v2))]
334 (fjfork t2)
335 (combinef (f1) (fjjoin t2)))))))
336
337 (extend-protocol CollFold
338 Object
339 (coll-fold
340 [coll n combinef reducef]
341 ;;can't fold, single reduce
342 (reduce reducef (combinef) coll))
41ff918 @tsdh Compile-time dispatch usage of jsr166y vs. bundled FJ.
tsdh authored
343
89e5dce @richhickey added reducers
richhickey authored
344 clojure.lang.IPersistentVector
345 (coll-fold
346 [v n combinef reducef]
347 (foldvec v n combinef reducef))
348
349 clojure.lang.PersistentHashMap
350 (coll-fold
351 [m n combinef reducef]
352 (.fold m n combinef reducef fjinvoke fjtask fjfork fjjoin)))
Something went wrong with that request. Please try again.