Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: clojure/clojure
...
head fork: clojure/clojure
  • 4 commits
  • 9 files changed
  • 0 commit comments
  • 2 contributors
Commits on May 05, 2012
@richhickey richhickey better vector iterators f8856dc
Commits on May 07, 2012
@richhickey richhickey added parallel fold 4876c14
@stuarthalloway stuarthalloway build-time depedency for jsr166
Signed-off-by: Rich Hickey <richhickey@gmail.com>
ce2d1d3
Commits on May 08, 2012
@richhickey richhickey added reducers 89e5dce
View
2  .gitignore
@@ -3,3 +3,5 @@ target
clojure.iws
clojure.ipr
nbproject/private/
+maven-classpath
+maven-classpath.properties
View
7 antsetup.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+mvn -q dependency:build-classpath -Dmdep.outputFile=maven-classpath
+cat <<EOF >maven-classpath.properties
+maven.compile.classpath=`cat maven-classpath`
+EOF
+echo "Wrote maven-classpath.properties for standalone ant use"
View
4 build.xml
@@ -17,6 +17,7 @@
<property name="build" location="${target}/classes"/>
<property name="test-classes" location="${target}/test-classes"/>
<property name="dist" location="dist"/>
+ <property file="maven-classpath.properties"/>
<!-- Get the version string out of the POM -->
<xmlproperty file="pom.xml" prefix="pom"/>
@@ -43,7 +44,7 @@
<target name="compile-clojure"
description="Compile Clojure sources.">
<java classname="clojure.lang.Compile"
- classpath="${build}:${cljsrc}"
+ classpath="${maven.compile.classpath}:${build}:${cljsrc}"
failonerror="true"
fork="true">
<sysproperty key="clojure.compile.path" value="${build}"/>
@@ -52,6 +53,7 @@
<!-- <sysproperty key="clojure.compile.warn-on-reflection" value="true"/> -->
<arg value="clojure.core"/>
<arg value="clojure.core.protocols"/>
+ <arg value="clojure.core.reducers"/>
<arg value="clojure.main"/>
<arg value="clojure.set"/>
<arg value="clojure.xml"/>
View
12 pom.xml
@@ -38,6 +38,15 @@
<url>git@github.com:clojure/clojure.git</url>
</scm>
+ <dependencies>
+ <dependency>
+ <groupId>org.codehaus.jsr166-mirror</groupId>
+ <artifactId>jsr166y</artifactId>
+ <version>1.7.0</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
<build>
<resources>
<resource>
@@ -72,7 +81,8 @@
</goals>
<configuration>
<target>
- <ant target="compile-clojure" />
+ <property name="maven.compile.classpath" refid="maven.compile.classpath"/>
+ <ant target="compile-clojure"/>
</target>
</configuration>
</execution>
View
4 readme.txt
@@ -13,8 +13,10 @@ Getting Started: http://dev.clojure.org/display/doc/Getting+Started
To run: java -cp clojure-${VERSION}.jar clojure.main
-To build locally with Ant: ant
+To build locally with Ant:
+ One-time setup: ./antsetup.sh
+ To build: ant
Maven 2 build instructions:
View
322 src/clj/clojure/core/reducers.clj
@@ -0,0 +1,322 @@
+; Copyright (c) Rich Hickey. All rights reserved.
+; The use and distribution terms for this software are covered by the
+; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+; which can be found in the file epl-v10.html at the root of this distribution.
+; By using this software in any fashion, you are agreeing to be bound by
+; the terms of this license.
+; You must not remove this notice, or any other, from this software.
+
+(ns ^{:doc
+ "A library for reduction and parallel folding. Alpha and subject
+ to change. Note that fold and its derivatives require
+ jsr166y.jar for fork/join support. See Clojure's pom.xml for the
+ dependency info."
+ :author "Rich Hickey"}
+ clojure.core.reducers
+ (:refer-clojure :exclude [reduce map filter remove take take-while drop flatten])
+ (:require [clojure.walk :as walk]))
+
+(alias 'core 'clojure.core)
+(set! *warn-on-reflection* true)
+
+;;;;;;;;;;;;;; some fj stuff ;;;;;;;;;;
+;;todo - dynamic java 7+ detection and use
+
+(def pool (delay (jsr166y.ForkJoinPool.)))
+
+(defn fjtask [^Callable f]
+ (jsr166y.ForkJoinTask/adapt f))
+
+(defn- fjinvoke [f]
+ (if (jsr166y.ForkJoinTask/inForkJoinPool)
+ (f)
+ (.invoke ^jsr166y.ForkJoinPool @pool ^jsr166y.ForkJoinTask (fjtask f))))
+
+(defn- fjfork [task] (.fork ^jsr166y.ForkJoinTask task))
+
+(defn- fjjoin [task] (.join ^jsr166y.ForkJoinTask task))
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+(defn reduce
+ "Like core/reduce except:
+ When init is not provided, (f) is used.
+ Maps are reduced with reduce-kv"
+ ([f coll] (reduce f (f) coll))
+ ([f init coll]
+ (if (instance? java.util.Map coll)
+ (clojure.core.protocols/kv-reduce coll f init)
+ (clojure.core.protocols/coll-reduce coll f init))))
+
+(defprotocol CollFold
+ (coll-fold [coll n combinef reducef]))
+
+(defn fold
+ "Reduces a collection using a (potentially parallel) reduce-combine
+ strategy. The collection is partitioned into groups of approximately
+ n (default 512), each of which is reduced with reducef (with a seed
+ value obtained by calling (combinef) with no arguments). The results
+ of these reductions are then reduced with combinef (default
+ reducef). combinef must be associative, and, when called with no
+ arguments, (combinef) must produce its identity element. These
+ operations may be performed in parallel, but the results will
+ preserve order."
+ {:added "1.5"}
+ ([reducef coll] (fold reducef reducef coll))
+ ([combinef reducef coll] (fold 512 combinef reducef coll))
+ ([n combinef reducef coll]
+ (coll-fold coll n combinef reducef)))
+
+(defn reducer
+ "Given a reducible collection, and a transformation function xf,
+ returns a reducible collection, where any supplied reducing
+ fn will be transformed by xf. xf is a function of reducing fn to
+ reducing fn."
+ {:added "1.5"}
+ ([coll xf]
+ (reify
+ clojure.core.protocols/CollReduce
+ (coll-reduce [this f1]
+ (clojure.core.protocols/coll-reduce this f1 (f1)))
+ (coll-reduce [_ f1 init]
+ (clojure.core.protocols/coll-reduce coll (xf f1) init)))))
+
+(defn folder
+ "Given a foldable collection, and a transformation function xf,
+ returns a foldable collection, where any supplied reducing
+ fn will be transformed by xf. xf is a function of reducing fn to
+ reducing fn."
+ {:added "1.5"}
+ ([coll xf]
+ (reify
+ clojure.core.protocols/CollReduce
+ (coll-reduce [_ f1]
+ (clojure.core.protocols/coll-reduce coll (xf f1) (f1)))
+ (coll-reduce [_ f1 init]
+ (clojure.core.protocols/coll-reduce coll (xf f1) init))
+
+ CollFold
+ (coll-fold [_ n combinef reducef]
+ (coll-fold coll n combinef (xf reducef))))))
+
+(defn- do-curried
+ [name doc meta args body]
+ (let [cargs (vec (butlast args))]
+ `(defn ~name ~doc ~meta
+ (~cargs (fn [x#] (~name ~@cargs x#)))
+ (~args ~@body))))
+
+(defmacro ^:private defcurried
+ "Builds another arity of the fn that returns a fn awaiting the last
+ param"
+ [name doc meta args & body]
+ (do-curried name doc meta args body))
+
+(defn- do-rfn [f1 k fkv]
+ `(fn
+ ([] (~f1))
+ ~(clojure.walk/postwalk
+ #(if (sequential? %)
+ ((if (vector? %) vec identity)
+ (core/remove #{k} %))
+ %)
+ fkv)
+ ~fkv))
+
+(defmacro ^:private rfn
+ "Builds 3-arity reducing fn given names of wrapped fn and key, and k/v impl."
+ [[f1 k] fkv]
+ (do-rfn f1 k fkv))
+
+(defcurried map
+ "Applies f to every value in the reduction of coll. Foldable."
+ {:added "1.5"}
+ [f coll]
+ (folder coll
+ (fn [f1]
+ (rfn [f1 k]
+ ([ret k v]
+ (f1 ret (f k v)))))))
+
+(defcurried filter
+ "Retains values in the reduction of coll for which (pred val)
+ returns logical true. Foldable."
+ {:added "1.5"}
+ [pred coll]
+ (folder coll
+ (fn [f1]
+ (rfn [f1 k]
+ ([ret k v]
+ (if (pred k v)
+ (f1 ret k v)
+ ret))))))
+
+(defcurried remove
+ "Removes values in the reduction of coll for which (pred val)
+ returns logical true. Foldable."
+ {:added "1.5"}
+ [pred coll]
+ (filter (complement pred) coll))
+
+(defcurried take-while
+ "Ends the reduction of coll when (pred val) returns logical false."
+ {:added "1.5"}
+ [pred coll]
+ (reducer coll
+ (fn [f1]
+ (rfn [f1 k]
+ ([ret k v]
+ (if (pred k v)
+ (f1 ret k v)
+ (reduced ret)))))))
+
+(defcurried take
+ "Ends the reduction of coll after consuming n values."
+ {:added "1.5"}
+ [n coll]
+ (reducer coll
+ (fn [f1]
+ (let [cnt (atom n)]
+ (rfn [f1 k]
+ ([ret k v]
+ (swap! cnt dec)
+ (if (neg? @cnt)
+ (reduced ret)
+ (f1 ret k v))))))))
+
+(defcurried drop
+ "Elides the first n values from the reduction of coll."
+ {:added "1.5"}
+ [n coll]
+ (reducer coll
+ (fn [f1]
+ (let [cnt (atom n)]
+ (rfn [f1 k]
+ ([ret k v]
+ (swap! cnt dec)
+ (if (neg? @cnt)
+ (f1 ret k v)
+ ret)))))))
+
+(defcurried flatten
+ "Takes any nested combination of sequential things (lists, vectors,
+ etc.) and returns their contents as a single, flat foldable
+ collection."
+ {:added "1.5"}
+ [coll]
+ (let [rf (fn [f1]
+ (fn
+ ([] (f1))
+ ([ret v]
+ (if (sequential? v)
+ (clojure.core.protocols/coll-reduce (flatten v) f1 ret)
+ (f1 ret v)))))]
+ (reify
+ clojure.core.protocols/CollReduce
+ (coll-reduce [this f1] (clojure.core.protocols/coll-reduce this f1 (f1)))
+ (coll-reduce [_ f1 init] (clojure.core.protocols/coll-reduce coll (rf f1) init))
+
+ CollFold
+ (coll-fold [_ n combinef reducef] (coll-fold coll n combinef (rf reducef))))))
+
+;;do not construct this directly, use cat
+(deftype Cat [cnt left right]
+ clojure.lang.Counted
+ (count [_] cnt)
+
+ clojure.lang.Seqable
+ (seq [_] (concat (seq left) (seq right)))
+
+ clojure.core.protocols/CollReduce
+ (coll-reduce [this f1] (clojure.core.protocols/coll-reduce this f1 (f1)))
+ (coll-reduce
+ [_ f1 init]
+ (clojure.core.protocols/coll-reduce
+ right f1
+ (clojure.core.protocols/coll-reduce left f1 init)))
+
+ CollFold
+ (coll-fold
+ [_ n combinef reducef]
+ (fjinvoke
+ (fn []
+ (let [rt (fjfork (fjtask #(coll-fold right n combinef reducef)))]
+ (combinef
+ (coll-fold left n combinef reducef)
+ (fjjoin rt)))))))
+
+(defn cat
+ "A high-performance combining fn that yields the catenation of the
+ reduced values. The result is reducible, foldable, seqable and
+ counted, providing the identity collections are reducible, seqable
+ and counted. The single argument version will build a combining fn
+ with the supplied identity constructor. Tests for identity
+ with (zero? (count x)). See also foldcat."
+ {:added "1.5"}
+ ([] (java.util.ArrayList.))
+ ([ctor]
+ (fn
+ ([] (ctor))
+ ([left right] (cat left right))))
+ ([left right]
+ (cond
+ (zero? (count left)) right
+ (zero? (count right)) left
+ :else
+ (Cat. (+ (count left) (count right)) left right))))
+
+(defn append!
+ ".adds x to acc and returns acc"
+ {:added "1.5"}
+ [^java.util.Collection acc x]
+ (doto acc (.add x)))
+
+(defn foldcat
+ "Equivalent to (fold cat append! coll)"
+ {:added "1.5"}
+ [coll]
+ (fold cat append! coll))
+
+(defn monoid
+ "Builds a combining fn out of the supplied operator and identity
+ constructor. op must be associative and ctor called with no args
+ must return an identity value for it."
+ {:added "1.5"}
+ [op ctor]
+ (fn m
+ ([] (ctor))
+ ([a b] (op a b))))
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; fold impls ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+(defn- foldvec
+ [v n combinef reducef]
+ (cond
+ (empty? v) (combinef)
+ (<= (count v) n) (reduce reducef (combinef) v)
+ :else
+ (let [split (quot (count v) 2)
+ v1 (subvec v 0 split)
+ v2 (subvec v split (count v))
+ fc (fn [child] #(foldvec child n combinef reducef))]
+ (fjinvoke
+ #(let [f1 (fc v1)
+ t2 (fjtask (fc v2))]
+ (fjfork t2)
+ (combinef (f1) (fjjoin t2)))))))
+
+(extend-protocol CollFold
+ Object
+ (coll-fold
+ [coll n combinef reducef]
+ ;;can't fold, single reduce
+ (reduce reducef (combinef) coll))
+
+ clojure.lang.IPersistentVector
+ (coll-fold
+ [v n combinef reducef]
+ (foldvec v n combinef reducef))
+
+ clojure.lang.PersistentHashMap
+ (coll-fold
+ [m n combinef reducef]
+ (.fold m n combinef reducef fjinvoke fjtask fjfork fjjoin)))
+
View
2  src/jvm/clojure/lang/APersistentVector.java
@@ -520,6 +520,8 @@ public SubVector(IPersistentMap meta, IPersistentVector v, int start, int end){
this.end = end;
}
+ public Iterator iterator(){return ((PersistentVector)v).rangedIterator(start,end);}
+
public Object nth(int i){
if(start + i >= end)
throw new IndexOutOfBoundsException();
View
75 src/jvm/clojure/lang/PersistentHashMap.java
@@ -11,9 +11,8 @@
package clojure.lang;
import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
/*
@@ -188,6 +187,22 @@ public Object kvreduce(IFn f, Object init){
return init;
}
+public Object fold(long n, final IFn combinef, final IFn reducef,
+ IFn fjinvoke, final IFn fjtask, final IFn fjfork, final IFn fjjoin){
+ //we are ignoring n for now
+ Callable top = new Callable(){
+ public Object call() throws Exception{
+ Object ret = combinef.invoke();
+ if(root != null)
+ ret = combinef.invoke(ret, root.fold(combinef,reducef,fjtask,fjfork,fjjoin));
+ return hasNull?
+ combinef.invoke(ret,reducef.invoke(combinef.invoke(),null,nullValue))
+ :ret;
+ }
+ };
+ return fjinvoke.invoke(top);
+}
+
public int count(){
return count;
}
@@ -324,6 +339,7 @@ void ensureEditable(){
public Object kvreduce(IFn f, Object init);
+ Object fold(IFn combinef, IFn reducef, IFn fjtask, IFn fjfork, IFn fjjoin);
}
final static class ArrayNode implements INode{
@@ -395,6 +411,52 @@ public Object kvreduce(IFn f, Object init){
return init;
}
+ public Object fold(final IFn combinef, final IFn reducef,
+ final IFn fjtask, final IFn fjfork, final IFn fjjoin){
+ List<Callable> tasks = new ArrayList();
+ for(final INode node : array){
+ if(node != null){
+ tasks.add(new Callable(){
+ public Object call() throws Exception{
+ return node.fold(combinef, reducef, fjtask, fjfork, fjjoin);
+ }
+ });
+ }
+ }
+
+ return foldTasks(tasks,combinef,fjtask,fjfork,fjjoin);
+ }
+
+ static public Object foldTasks(List<Callable> tasks, final IFn combinef,
+ final IFn fjtask, final IFn fjfork, final IFn fjjoin){
+
+ if(tasks.isEmpty())
+ return combinef.invoke();
+
+ if(tasks.size() == 1){
+ Object ret = null;
+ try
+ {
+ return tasks.get(0).call();
+ }
+ catch(Exception e)
+ {
+ //aargh
+ }
+ }
+
+ List<Callable> t1 = tasks.subList(0,tasks.size()/2);
+ final List<Callable> t2 = tasks.subList(tasks.size()/2, tasks.size());
+
+ Object forked = fjfork.invoke(fjtask.invoke(new Callable() {
+ public Object call() throws Exception{
+ return foldTasks(t2,combinef,fjtask,fjfork,fjjoin);
+ }
+ }));
+
+ return combinef.invoke(foldTasks(t1,combinef,fjtask,fjfork,fjjoin),fjjoin.invoke(forked));
+ }
+
private ArrayNode ensureEditable(AtomicReference<Thread> edit){
if(this.edit == edit)
@@ -629,6 +691,9 @@ public Object kvreduce(IFn f, Object init){
return NodeSeq.kvreduce(array,f,init);
}
+ public Object fold(IFn combinef, IFn reducef, IFn fjtask, IFn fjfork, IFn fjjoin){
+ return NodeSeq.kvreduce(array, reducef, combinef.invoke());
+ }
private BitmapIndexedNode ensureEditable(AtomicReference<Thread> edit){
if(this.edit == edit)
@@ -818,6 +883,10 @@ public Object kvreduce(IFn f, Object init){
return NodeSeq.kvreduce(array,f,init);
}
+ public Object fold(IFn combinef, IFn reducef, IFn fjtask, IFn fjfork, IFn fjjoin){
+ return NodeSeq.kvreduce(array, reducef, combinef.invoke());
+ }
+
public int findIndex(Object key){
for(int i = 0; i < 2*count; i+=2)
{
View
27 src/jvm/clojure/lang/PersistentVector.java
@@ -13,6 +13,7 @@
package clojure.lang;
import java.io.Serializable;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -232,6 +233,32 @@ public ISeq seq(){
return chunkedSeq();
}
+Iterator rangedIterator(final int start, final int end){
+ return new Iterator(){
+ int i = start;
+ int base = i - (i%32);
+ Object[] array = (start < count())?arrayFor(i):null;
+
+ public boolean hasNext(){
+ return i < end;
+ }
+
+ public Object next(){
+ if(i-base == 32){
+ array = arrayFor(i);
+ base += 32;
+ }
+ return array[i++ & 0x01f];
+ }
+
+ public void remove(){
+ throw new UnsupportedOperationException();
+ }
+ };
+}
+
+public Iterator iterator(){return rangedIterator(0,count());}
+
public Object kvreduce(IFn f, Object init){
int step = 0;
for(int i=0;i<cnt;i+=step){

No commit comments for this range

Something went wrong with that request. Please try again.