Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

base fork: clojure/clojure
...
head fork: clojure/clojure
  • 6 commits
  • 16 files changed
  • 0 commit comments
  • 1 contributor
243 src/clj/clojure/core.clj
View
@@ -448,6 +448,31 @@
(recur (first zs) (rest zs)))))]
(cat (concat x y) zs))))
+;;;;;;;;;;;;;;;;;;;;;;;;;;;; streams ;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+(defn stream
+ "Creates a stream of the items in coll."
+ {:tag clojure.lang.AStream}
+ [coll] (clojure.lang.RT/stream coll))
+
+(defn stream-iter
+ "Returns an iter on (stream coll). Only one iter on a stream is
+ supported at a time."
+ {:tag clojure.lang.AStream$Iter}
+ [coll] (.iter (stream coll)))
+
+(defn next!
+ "Takes a stream iter and an eos value, returns (and consumes) the next element in the stream, or eos."
+ [#^clojure.lang.AStream$Iter iter eos] (.next iter eos))
+
+(defn push-back!
+ "Takes a stream iter and pushes x onto front of stream, returns iter."
+ [#^clojure.lang.AStream$Iter iter x] (.pushBack iter x))
+
+(defn detach!
+ "Takes a stream iter and disconnects it from the underlying stream,
+ returning the stream. All further operations on the iter will fail."
+ [#^clojure.lang.AStream$Iter iter] (.detach iter))
+
;;;;;;;;;;;;;;;;at this point all the support for syntax-quote exists;;;;;;;;;;;;;;;;;;;;;;
(defmacro if-not
"Evaluates test. If logical false, evaluates and returns then expr, otherwise else expr, if supplied, else nil."
@@ -1274,6 +1299,8 @@
(. ref (touch))
(. ref (get)))
+(def *io-context* nil)
+
(defmacro sync
"transaction-flags => TBD, pass nil for now
@@ -1281,23 +1308,13 @@
exprs and any nested calls. Starts a transaction if none is already
running on this thread. Any uncaught exception will abort the
transaction and flow out of sync. The exprs may be run more than
- once, but any effects on Refs will be atomic."
+ once, but any effects on Refs will be atomic. Transactions are not
+ allowed in io! blocks - will throw IllegalStateException."
[flags-ignored-for-now & body]
- `(. clojure.lang.LockingTransaction
- (runInTransaction (fn [] ~@body))))
-
-
-(defmacro io!
- "If an io! block occurs in a transaction, throws an
- IllegalStateException, else runs body in an implicit do. If the
- first expression in body is a literal string, will use that as the
- exception message."
- [& body]
- (let [message (when (string? (first body)) (first body))
- body (if message (rest body) body)]
- `(if (clojure.lang.LockingTransaction/isRunning)
- (throw (new IllegalStateException ~(or message "I/O in transaction")))
- (do ~@body))))
+ `(if *io-context*
+ (throw (IllegalStateException. "Transaction in io!"))
+ (. clojure.lang.LockingTransaction
+ (runInTransaction (fn [] ~@body)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; fn stuff ;;;;;;;;;;;;;;;;
@@ -1380,10 +1397,55 @@
(map f (rest c1) (rest c2) (rest c3)))))
([f c1 c2 c3 & colls]
(let [step (fn step [cs]
- (when (every? seq cs)
+ (when (every? seq cs)
(lazy-cons (map first cs) (step (map rest cs)))))]
(map #(apply f %) (step (conj colls c3 c2 c1))))))
+(defn map-stream
+ "Returns a stream consisting of the result of applying f to the
+ set of first items of each coll, followed by applying f to the set
+ of second items in each coll, until any one of the colls is
+ exhausted. Any remaining items in other colls are ignored. Function
+ f should accept number-of-colls arguments."
+ ([f coll]
+ (identity (let [iter (stream-iter coll)]
+ (stream
+ #(let [x (next! iter %)]
+ (if (= % x) x (f x)))))))
+ ([f c1 c2]
+ (identity (let [s1 (stream-iter c1), s2 (stream-iter c2)]
+ (stream
+ #(let [x1 (next! s1 %), x2 (next! s2 %)]
+ (if (or (= % x1) (= % x2))
+ %
+ (f x1 x2)))))))
+ ([f c1 c2 c3]
+ (identity (let [s1 (stream-iter c1), s2 (stream-iter c2), s3 (stream-iter c3)]
+ (stream
+ #(let [x1 (next! s1 %), x2 (next! s2 %), x3 (next! s3 %)]
+ (if (or (= % x1) (= % x2) (= % x3))
+ %
+ (f x1 x2 x3)))))))
+ ([f c1 c2 c3 & colls]
+ (identity (let [iters (map stream-iter (list* c1 c2 c3 colls))]
+ (stream
+ (fn [eos]
+ (let [xs (seq (map #(next! % eos) iters))]
+ (if (some #{eos} xs)
+ eos
+ (apply f xs)))))))))
+
+(defn map
+ "Returns a lazy seq consisting of the result of applying f to the
+ set of first items of each coll, followed by applying f to the set
+ of second items in each coll, until any one of the colls is
+ exhausted. Any remaining items in other colls are ignored. Function
+ f should accept number-of-colls arguments."
+ ([f coll] (seq (map-stream f coll)))
+ ([f c1 c2] (seq (map-stream f c1 c2)))
+ ([f c1 c2 c3] (seq (map-stream f c1 c2 c3)))
+ ([f c1 c2 c3 & colls] (seq (apply map-stream f c1 c2 c3 colls))))
+
(defn mapcat
"Returns the result of applying concat to the result of applying map
to f and colls. Thus function f should return a collection."
@@ -1391,13 +1453,25 @@
(apply concat (apply map f colls)))
(defn filter
- "Returns a lazy seq of the items in coll for which
+ "Returns a stream of the items in coll for which
(pred item) returns true. pred must be free of side-effects."
[pred coll]
- (when (seq coll)
- (if (pred (first coll))
- (lazy-cons (first coll) (filter pred (rest coll)))
- (recur pred (rest coll)))))
+ (seq
+ (let [iter (stream-iter coll)]
+ (stream
+ #(let [x (next! iter %)]
+ (if (or (= % x) (pred x))
+ x
+ (recur %)))))))
+
+;(defn filter
+; "Returns a lazy seq of the items in coll for which
+; (pred item) returns true. pred must be free of side-effects."
+; [pred coll]
+; (when (seq coll)
+; (if (pred (first coll))
+; (lazy-cons (first coll) (filter pred (rest coll)))
+; (recur pred (rest coll)))))
(defn remove
"Returns a lazy seq of the items in coll for which
@@ -1633,39 +1707,6 @@
(dorun n coll)
coll))
-(defn await
- "Blocks the current thread (indefinitely!) until all actions
- dispatched thus far, from this thread or agent, to the agent(s) have
- occurred."
- [& agents]
- (io! "await in transaction"
- (when *agent*
- (throw (new Exception "Can't await in agent action")))
- (let [latch (new java.util.concurrent.CountDownLatch (count agents))
- count-down (fn [agent] (. latch (countDown)) agent)]
- (doseq [agent agents]
- (send agent count-down))
- (. latch (await)))))
-
-(defn await1 [#^clojure.lang.Agent a]
- (when (pos? (.getQueueCount a))
- (await a))
- a)
-
-(defn await-for
- "Blocks the current thread until all actions dispatched thus
- far (from this thread or agent) to the agents have occurred, or the
- timeout (in milliseconds) has elapsed. Returns nil if returning due
- to timeout, non-nil otherwise."
- [timeout-ms & agents]
- (io! "await-for in transaction"
- (when *agent*
- (throw (new Exception "Can't await in agent action")))
- (let [latch (new java.util.concurrent.CountDownLatch (count agents))
- count-down (fn [agent] (. latch (countDown)) agent)]
- (doseq [agent agents]
- (send agent count-down))
- (. latch (await timeout-ms (. java.util.concurrent.TimeUnit MILLISECONDS))))))
(defmacro dotimes
"bindings => name n
@@ -1947,6 +1988,94 @@
:else (throw (IllegalArgumentException.
"with-open only allows Symbols in bindings"))))
+
+(defmacro io!
+ "If an io! block occurs in a transaction, throws an
+ IllegalStateException, else runs body in an implicit do. If the
+ first expression in body is a literal string, will use that as the
+ exception message."
+ [& body]
+ (let [message (when (string? (first body)) (first body))
+ body (if message (rest body) body)]
+ `(if (clojure.lang.LockingTransaction/isRunning)
+ (throw (new IllegalStateException ~(or message "I/O in transaction")))
+ (binding [*io-context* true]
+ ~@body))))
+
+(def *scope* nil)
+
+(defn run-scope-actions []
+ (let [failed (= (first *scope*) :failed)
+ entries (if failed (rest *scope*) *scope*)]
+ (doseq [e entries]
+ (let [cause (first e)
+ action (second e)]
+ (when (or (= cause :exits)
+ (and (= cause :fails) failed)
+ (and (= cause :succeeds) (not failed)))
+ (action))))))
+
+(defmacro scope
+ "Creates a scope for use with when-scope."
+ [& body]
+ `(binding [*scope* (list)]
+ (try
+ ~@body
+ (catch Throwable t#
+ (set! *scope* (conj *scope* :failed))
+ (throw t#))
+ (finally
+ (run-scope-actions)))))
+
+(defmacro when-scope
+ "Causes a body of expressions to be executed at the termination of
+ the nearest dynamically enclosing scope (created with scope). If no
+ scope is in effect, throws IllegalStateException. Cause must be one of:
+
+ :exits - will run unconditionally on scope exit
+ :fails - will run only if scope exits due to an exception
+ :succeeds - will run only if scope exits normally"
+
+ [cause & body]
+ `(do
+ (when-not *scope*
+ (throw (IllegalStateException. "No scope in effect")))
+ (set! *scope* (conj *scope* [~cause (fn [] ~@body)]))))
+
+(defn await
+ "Blocks the current thread (indefinitely!) until all actions
+ dispatched thus far, from this thread or agent, to the agent(s) have
+ occurred."
+ [& agents]
+ (io! "await in transaction"
+ (when *agent*
+ (throw (new Exception "Can't await in agent action")))
+ (let [latch (new java.util.concurrent.CountDownLatch (count agents))
+ count-down (fn [agent] (. latch (countDown)) agent)]
+ (doseq [agent agents]
+ (send agent count-down))
+ (. latch (await)))))
+
+(defn await1 [#^clojure.lang.Agent a]
+ (when (pos? (.getQueueCount a))
+ (await a))
+ a)
+
+(defn await-for
+ "Blocks the current thread until all actions dispatched thus
+ far (from this thread or agent) to the agents have occurred, or the
+ timeout (in milliseconds) has elapsed. Returns nil if returning due
+ to timeout, non-nil otherwise."
+ [timeout-ms & agents]
+ (io! "await-for in transaction"
+ (when *agent*
+ (throw (new Exception "Can't await in agent action")))
+ (let [latch (new java.util.concurrent.CountDownLatch (count agents))
+ count-down (fn [agent] (. latch (countDown)) agent)]
+ (doseq [agent agents]
+ (send agent count-down))
+ (. latch (await timeout-ms (. java.util.concurrent.TimeUnit MILLISECONDS))))))
+
(defmacro doto
"Evaluates x then calls all of the methods and functions with the
value of x supplied at the from of the given arguments. The forms
@@ -2883,7 +3012,8 @@
exprs and any nested calls. Starts a transaction if none is already
running on this thread. Any uncaught exception will abort the
transaction and flow out of dosync. The exprs may be run more than
- once, but any effects on Refs will be atomic."
+ once, but any effects on Refs will be atomic. Transactions are not
+ allowed in io! blocks - will throw IllegalStateException."
[& exprs]
`(sync nil ~@exprs))
@@ -3772,3 +3902,4 @@
(load "genclass")
+
28 src/jvm/clojure/lang/APersistentVector.java
View
@@ -13,7 +13,6 @@
package clojure.lang;
import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
public abstract class APersistentVector extends AFn implements IPersistentVector, Iterable,
List,
@@ -358,18 +357,25 @@ else if(count() > v.count())
return 0;
}
-public IStream stream() throws Exception {
- final AtomicInteger ai = new AtomicInteger(0);
- return new IStream(){
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if(i < count())
- return nth(i);
- return RT.eos();
- }
- };
+public AStream stream() throws Exception {
+ return new AStream(new Src(this));
}
+ static class Src extends AFn{
+ final IPersistentVector v;
+ int i = 0;
+
+ Src(IPersistentVector v) {
+ this.v = v;
+ }
+
+ public Object invoke(Object eos) throws Exception {
+ if (i < v.count())
+ return v.nth(i++);
+ return eos;
+ }
+ }
+
static class Seq extends ASeq implements IndexedSeq, IReduce{
//todo - something more efficient
final IPersistentVector v;
13 src/jvm/clojure/lang/ASeq.java
View
@@ -12,7 +12,6 @@
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicReference;
public abstract class ASeq extends Obj implements ISeq, Collection, Streamable{
transient int _hash = -1;
@@ -177,25 +176,25 @@ public Iterator iterator(){
return new SeqIterator(this);
}
-public IStream stream() throws Exception {
- return new Stream(this);
+public AStream stream() throws Exception {
+ return new AStream(new Src(this));
}
- static class Stream implements IStream{
+static class Src extends AFn{
ISeq s;
- public Stream(ISeq s) {
+ public Src(ISeq s) {
this.s = s;
}
- synchronized public Object next() throws Exception {
+ public Object invoke(Object eos) throws Exception {
if(s != null)
{
Object ret = s.first();
s = s.rest();
return ret;
}
- return RT.eos();
+ return eos;
}
}
}
160 src/jvm/clojure/lang/AStream.java
View
@@ -0,0 +1,160 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+ * which can be found in the file epl-v10.html at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Dec 14, 2008 */
+
+package clojure.lang;
+
+final public class AStream implements Seqable, Streamable, Sequential {
+
+ static final ISeq NO_SEQ = new Cons(null, null);
+
+ ISeq seq = NO_SEQ;
+ final IFn src;
+ Cons pushed = null;
+ Iter iter = null;
+
+ public AStream(IFn src) {
+ this.src = src;
+ }
+
+ final synchronized public ISeq seq() {
+ if (seq == NO_SEQ)
+ {
+ iter();
+ seq = Seq.create(pushed,src);
+ }
+ return seq;
+ }
+
+ final synchronized public AStream stream() throws Exception {
+// if (seq == NO_SEQ)
+ return this;
+// return RT.stream(seq);
+ }
+
+ final synchronized public Iter iter() {
+ if (iter != null)
+ throw new IllegalStateException("Already iterating");
+
+ return iter = new Iter(this);
+ }
+
+ static public class Iter extends AFn{
+ final AStream s;
+
+ Iter(AStream s) {
+ this.s = s;
+ }
+
+ final public Iter pushBack(Object x) throws Exception {
+ synchronized (s)
+ {
+ if (s.iter != this)
+ throw new IllegalAccessError("Invalid iterator");
+ s.pushed = new Cons(x,s.pushed);
+ return this;
+ }
+ }
+
+ final public AStream detach() {
+ synchronized (s)
+ {
+ if (s.iter != this)
+ throw new IllegalAccessError("Invalid iterator");
+ s.iter = null;
+ return s;
+ }
+ }
+
+ final public Object next(Object eos) {
+ synchronized (s)
+ {
+ if (s.iter != this)
+ throw new IllegalAccessError("Invalid iterator");
+ if (s.pushed != null)
+ {
+ Object ret = s.pushed.first();
+ s.pushed = (Cons) s.pushed.rest();
+ return ret;
+ }
+ try
+ {
+ return s.src.invoke(eos);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public Object invoke(Object arg1) throws Exception {
+ return next(arg1);
+ }
+ }
+
+ static class Seq extends ASeq {
+ static final Object EOS = new Object();
+
+ final Object _first;
+ ISeq _rest = NO_SEQ;
+ final ISeq pushed;
+ final IFn src;
+
+ Seq(Object first, ISeq pushed, IFn src) {
+ _first = first;
+ this.pushed = pushed;
+ this.src = src;
+ }
+
+ Seq(IPersistentMap meta, Object _first, ISeq _rest) {
+ super(meta);
+ this._first = _first;
+ this._rest = _rest;
+ this.pushed = null;
+ this.src = null;
+ }
+
+ final public Object first() {
+ return _first;
+ }
+
+ final synchronized public ISeq rest() {
+ if (_rest == NO_SEQ)
+ {
+ _rest = create(pushed, src);
+ }
+ return _rest;
+ }
+
+ static Seq create(ISeq pushed, IFn src) {
+ if(pushed != null)
+ return new Seq(pushed.first(),pushed.rest(),src);
+ try
+ {
+ Object x = src.invoke(EOS);
+ if (x == EOS)
+ return null;
+ return new Seq(x, null, src);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Obj withMeta(IPersistentMap meta) {
+ rest();
+ return new Seq(meta, _first, _rest);
+ }
+
+ }
+}
342 src/jvm/clojure/lang/ArrayStream.java
View
@@ -12,183 +12,171 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.lang.reflect.Array;
-
-public class ArrayStream implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final Object[] array;
-
- public ArrayStream(Object[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
-
- static IStream createFromObject(Object array) {
- Class aclass = array.getClass().getComponentType();
- if (!aclass.isPrimitive())
- return new ArrayStream((Object[]) array);
- if (aclass == int.class)
- return new ArrayStream_int((int[]) array);
- if (aclass == long.class)
- return new ArrayStream_long((long[]) array);
- if (aclass == float.class)
- return new ArrayStream_float((float[]) array);
- if (aclass == double.class)
- return new ArrayStream_double((double[]) array);
- if (aclass == char.class)
- return new ArrayStream_char((char[]) array);
- if (aclass == byte.class)
- return new ArrayStream_byte((byte[]) array);
- if (aclass == short.class)
- return new ArrayStream_short((short[]) array);
- if (aclass == boolean.class)
- return new ArrayStream_boolean((boolean[]) array);
- throw new IllegalArgumentException(String.format("Unsupported array type %s", array));
- }
-
- static public class ArrayStream_int implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final int[] array;
-
- public ArrayStream_int(int[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_long implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final long[] array;
-
- public ArrayStream_long(long[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_float implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final float[] array;
-
- public ArrayStream_float(float[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_double implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final double[] array;
-
- public ArrayStream_double(double[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_char implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final char[] array;
-
- public ArrayStream_char(char[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_byte implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final byte[] array;
-
- public ArrayStream_byte(byte[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_short implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final short[] array;
-
- public ArrayStream_short(short[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_boolean implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final boolean[] array;
-
- public ArrayStream_boolean(boolean[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
+public class ArrayStream extends AFn{
+
+int i = 0;
+final Object[] array;
+
+public ArrayStream(Object[] array){
+ this.array = array;
+}
+
+public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+}
+
+static AStream createFromObject(Object array){
+ Class aclass = array.getClass().getComponentType();
+ if(!aclass.isPrimitive())
+ return new AStream(new ArrayStream((Object[]) array));
+ if(aclass == int.class)
+ return new AStream(new ArrayStream_int((int[]) array));
+ if(aclass == long.class)
+ return new AStream(new ArrayStream_long((long[]) array));
+ if(aclass == float.class)
+ return new AStream(new ArrayStream_float((float[]) array));
+ if(aclass == double.class)
+ return new AStream(new ArrayStream_double((double[]) array));
+ if(aclass == char.class)
+ return new AStream(new ArrayStream_char((char[]) array));
+ if(aclass == byte.class)
+ return new AStream(new ArrayStream_byte((byte[]) array));
+ if(aclass == short.class)
+ return new AStream(new ArrayStream_short((short[]) array));
+ if(aclass == boolean.class)
+ return new AStream(new ArrayStream_boolean((boolean[]) array));
+ throw new IllegalArgumentException(String.format("Unsupported array type %s", array));
+}
+
+static public class ArrayStream_int extends AFn{
+
+ int i = 0;
+ final int[] array;
+
+ public ArrayStream_int(int[] array){
+ this.array = array;
+ }
+
+ public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+ }
+}
+
+static public class ArrayStream_long extends AFn{
+
+ int i = 0;
+ final long[] array;
+
+ public ArrayStream_long(long[] array){
+ this.array = array;
+ }
+
+ public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+ }
+}
+
+static public class ArrayStream_float extends AFn{
+
+ int i = 0;
+ final float[] array;
+
+ public ArrayStream_float(float[] array){
+ this.array = array;
+ }
+
+ public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+ }
+}
+
+static public class ArrayStream_double extends AFn{
+
+ int i = 0;
+ final double[] array;
+
+ public ArrayStream_double(double[] array){
+ this.array = array;
+ }
+
+ public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+ }
+}
+
+static public class ArrayStream_char extends AFn{
+
+ int i = 0;
+ final char[] array;
+
+ public ArrayStream_char(char[] array){
+ this.array = array;
+ }
+
+ public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+ }
+}
+
+static public class ArrayStream_byte extends AFn{
+
+ int i = 0;
+ final byte[] array;
+
+ public ArrayStream_byte(byte[] array){
+ this.array = array;
+ }
+
+ public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+ }
+}
+
+static public class ArrayStream_short extends AFn{
+
+ int i = 0;
+ final short[] array;
+
+ public ArrayStream_short(short[] array){
+ this.array = array;
+ }
+
+ public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+ }
+}
+
+static public class ArrayStream_boolean extends AFn{
+
+ int i = 0;
+ final boolean[] array;
+
+ public ArrayStream_boolean(boolean[] array){
+ this.array = array;
+ }
+
+ public Object invoke(Object eos) throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return eos;
+ }
+}
}
33 src/jvm/clojure/lang/Closer.java
View
@@ -0,0 +1,33 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+ * which can be found in the file epl-v10.html at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Jan 10, 2009 */
+
+package clojure.lang;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class Closer implements Closeable{
+ ISeq closes;
+
+
+ public void close() throws IOException {
+ for(ISeq s = closes;s!=null;s = s.rest())
+ {
+ ((Closeable)s.first()).close();
+ }
+ }
+
+ public Closer register(Closeable c) {
+ closes = new Cons(c, closes);
+ return this;
+ }
+}
4 src/jvm/clojure/lang/Compiler.java
View
@@ -1,7 +1,7 @@
/**
* Copyright (c) Rich Hickey. All rights reserved.
* The use and distribution terms for this software are covered by the
- * Eclipse Public License 1.0 (http://opensource.org/licenses/cpl1.0.php)
+ * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
* which can be found in the file epl-v10.html at the root of this distribution.
* By using this software in any fashion, you are agreeing to be bound by
* the terms of this license.
@@ -3721,7 +3721,7 @@ public Expr parse(C context, Object frm) throws Exception{
ISeq body = RT.rest(RT.rest(form));
- if(context == C.EVAL)
+ if(context == C.EVAL || (context == C.EXPRESSION && isLoop))
return analyze(context, RT.list(RT.list(FN, PersistentVector.EMPTY, form)));
IPersistentMap dynamicBindings = RT.map(LOCAL_ENV, LOCAL_ENV.get(),
4 src/jvm/clojure/lang/Cons.java
View
@@ -29,11 +29,11 @@ public Cons(IPersistentMap meta, Object _first, ISeq _rest){
this._rest = _rest;
}
-public Object first(){
+final public Object first(){
return _first;
}
-public ISeq rest(){
+final public ISeq rest(){
return _rest;
}
4 src/jvm/clojure/lang/IPersistentCollection.java
View
@@ -11,12 +11,10 @@
*/
-public interface IPersistentCollection {
+public interface IPersistentCollection extends Seqable{
int count();
-ISeq seq();
-
IPersistentCollection cons(Object o);
IPersistentCollection empty();
24 src/jvm/clojure/lang/IteratorStream.java
View
@@ -14,16 +14,20 @@
import java.util.Iterator;
-public class IteratorStream implements IStream{
- final Iterator iter;
+public class IteratorStream extends AFn{
+final Iterator iter;
- public IteratorStream(Iterator iter) {
- this.iter = iter;
- }
+static public AStream create(Iterator iter){
+ return new AStream(new IteratorStream(iter));
+}
+
+IteratorStream(Iterator iter){
+ this.iter = iter;
+}
- synchronized public Object next() throws Exception {
- if(iter.hasNext())
- return iter.next();
- return RT.eos();
- }
+public Object invoke(Object eos) throws Exception{
+ if(iter.hasNext())
+ return iter.next();
+ return eos;
+}
}
2  src/jvm/clojure/lang/LispReader.java
View
@@ -712,7 +712,7 @@ else if(form instanceof IPersistentSet)
{
ret = RT.list(APPLY, HASHSET, RT.cons(CONCAT, sqExpandList(((IPersistentSet) form).seq())));
}
- else if(form instanceof ISeq)
+ else if(form instanceof ISeq || form instanceof IPersistentList)
{
ISeq seq = RT.seq(form);
ret = RT.cons(CONCAT, sqExpandList(seq));
59 src/jvm/clojure/lang/RT.java
View
@@ -243,6 +243,12 @@ static public void addURL(Object url) throws Exception{
getRootClassLoader().addURL(u);
}
+static final public IFn EMPTY_GEN = new AFn(){
+ synchronized public Object invoke(Object eos) throws Exception {
+ return eos;
+ }
+};
+
static
{
Keyword dockw = Keyword.intern(null, "doc");
@@ -447,35 +453,27 @@ static public ISeq seq(Object coll){
return null;
else if(coll instanceof ISeq)
return (ISeq) coll;
- else if(coll instanceof IPersistentCollection)
- return ((IPersistentCollection) coll).seq();
+ else if(coll instanceof Seqable)
+ return ((Seqable) coll).seq();
else
return seqFrom(coll);
}
-static public IStream stream(final Object coll) throws Exception{
+static public AStream stream(final Object coll) throws Exception{
if(coll == null)
- return EMPTY_STREAM;
- else if(coll instanceof IStream)
- return (IStream) coll;
- else if(coll instanceof Streamable)
- return ((Streamable)coll).stream();
+ return new AStream(EMPTY_GEN);
+ else if(coll instanceof Streamable)
+ return ((Streamable) coll).stream();
else if(coll instanceof Fn)
- {
- return new IStream(){
- public Object next() throws Exception {
- return ((IFn)coll).invoke();
- }
- };
- }
+ return new AStream((IFn)coll);
else if(coll instanceof Iterable)
- return new IteratorStream(((Iterable) coll).iterator());
+ return IteratorStream.create(((Iterable) coll).iterator());
else if (coll.getClass().isArray())
return ArrayStream.createFromObject(coll);
else if (coll instanceof String)
return ArrayStream.createFromObject(((String)coll).toCharArray());
-
- throw new IllegalArgumentException("Don't know how to create IStream from: " + coll.getClass().getSimpleName());
+ else
+ return new AStream(new ASeq.Src(RT.seq(coll)));
}
static ISeq seqFrom(Object coll){
@@ -705,7 +703,7 @@ else if(coll instanceof String)
return Character.valueOf(((String) coll).charAt(n));
else if(coll.getClass().isArray())
return Reflector.prepRet(Array.get(coll, n));
- else if(coll instanceof List)
+ else if(coll instanceof RandomAccess)
return ((List) coll).get(n);
else if(coll instanceof Matcher)
return ((Matcher) coll).group(n);
@@ -722,7 +720,7 @@ else if(n == 1)
else if(coll instanceof Sequential)
{
- ISeq seq = ((IPersistentCollection) coll).seq();
+ ISeq seq = RT.seq(coll);
coll = null;
for(int i = 0; i <= n && seq != null; ++i, seq = seq.rest())
{
@@ -760,7 +758,7 @@ else if(coll.getClass().isArray())
return Reflector.prepRet(Array.get(coll, n));
return notFound;
}
- else if(coll instanceof List)
+ else if(coll instanceof RandomAccess)
{
List list = (List) coll;
if(n < list.size())
@@ -785,7 +783,7 @@ else if(n == 1)
}
else if(coll instanceof Sequential)
{
- ISeq seq = ((IPersistentCollection) coll).seq();
+ ISeq seq = RT.seq(coll);
coll = null;
for(int i = 0; i <= n && seq != null; ++i, seq = seq.rest())
{
@@ -1214,7 +1212,7 @@ static public void print(Object x, Writer w) throws Exception{
}
if(x == null)
w.write("nil");
- else if(x instanceof ISeq || x instanceof IPersistentList)
+ else if(x instanceof ISeq || x instanceof IPersistentList || x instanceof AStream)
{
w.write('(');
printInnerSeq(seq(x), w);
@@ -1685,22 +1683,7 @@ static public int alength(Object xs){
return Array.getLength(xs);
}
-final static private Object EOS = new Object();
-
-final static public Object eos() {
- return EOS;
- }
-
-static public boolean isEOS(Object o){
- return o == EOS;
- }
-
-static final public IStream EMPTY_STREAM = new IStream(){
- public Object next() throws Exception {
- return eos();
- }
-};
synchronized public static DynamicClassLoader getRootClassLoader() {
if(ROOT_CLASSLOADER == null)
30 src/jvm/clojure/lang/Range.java
View
@@ -12,7 +12,7 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
public class Range extends ASeq implements IReduce, Streamable{
final int end;
@@ -63,15 +63,23 @@ public int count() {
return end - n;
}
- public IStream stream() throws Exception {
- final AtomicInteger an = new AtomicInteger(n);
- return new IStream(){
- public Object next() throws Exception {
- int i = an.getAndIncrement();
- if (i < end)
- return i;
- return RT.eos();
- }
- };
+public AStream stream() throws Exception {
+ return new AStream(new Src(n,end));
}
+
+ static class Src extends AFn{
+ int n;
+ final int end;
+
+ public Src(int n, int end) {
+ this.n = n;
+ this.end = end;
+ }
+
+ public Object invoke(Object eos) throws Exception {
+ if(n < end)
+ return n++;
+ return eos;
+ }
+ }
}
6 src/jvm/clojure/lang/IStream.java → src/jvm/clojure/lang/Seqable.java
View
@@ -8,10 +8,10 @@
* You must not remove this notice, or any other, from this software.
**/
-/* rich Dec 7, 2008 */
+/* rich Dec 14, 2008 */
package clojure.lang;
-public interface IStream {
- Object next() throws Exception;
+public interface Seqable {
+ ISeq seq();
}
64 src/jvm/clojure/lang/StreamSeq.java
View
@@ -1,64 +0,0 @@
-/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
- * which can be found in the file epl-v10.html at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-/* rich Dec 8, 2008 */
-
-package clojure.lang;
-
-public class StreamSeq extends ASeq {
- IStream stream;
- final Object _first;
- ISeq _rest;
-
- static public StreamSeq create(IStream stream) throws Exception {
- Object x = stream.next();
- if (RT.isEOS(x))
- return null;
- return new StreamSeq(x, stream);
- }
-
- StreamSeq(IPersistentMap meta, Object _first, ISeq _rest) {
- super(meta);
- this._first = _first;
- this._rest = _rest;
- this.stream = null;
- }
-
- StreamSeq(Object first, IStream stream) {
- this._first = first;
- this.stream = stream;
- }
-
-
- public Object first() {
- return _first;
- }
-
- synchronized public ISeq rest() {
- if (stream != null) {
- try {
- _rest = create(stream);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- stream = null;
- }
- return _rest;
- }
-
- public Obj withMeta(IPersistentMap meta) {
- if(meta != this.meta())
- {
- rest();
- return new StreamSeq(meta, _first, _rest);
- }
- return this;
- }
-}
2  src/jvm/clojure/lang/Streamable.java
View
@@ -13,5 +13,5 @@
package clojure.lang;
public interface Streamable {
- IStream stream() throws Exception;
+ AStream stream() throws Exception;
}

No commit comments for this range

Something went wrong with that request. Please try again.