Skip to content

Commit

Permalink
[#54] HeadTailSpliterator: tail call optimization where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
amaembo committed Jan 14, 2016
1 parent 498d30e commit 6540b17
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 9 deletions.
22 changes: 18 additions & 4 deletions src/main/java/one/util/streamex/HeadTailSpliterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* @author Tagir Valeev
*/
/*package*/ final class HeadTailSpliterator<T, U> extends AbstractSpliterator<U> {
/*package*/ final class HeadTailSpliterator<T, U> extends AbstractSpliterator<U> implements TailCallSpliterator<U> {
private final Spliterator<T> source;
private final BiFunction<? super T, ? super StreamEx<T>, ? extends Stream<U>> mapper;
private Spliterator<U> target;
Expand All @@ -44,13 +44,22 @@
public boolean tryAdvance(Consumer<? super U> action) {
if(!init())
return false;
target = traverseTail(target);
return target.tryAdvance(action);
}

@Override
public void forEachRemaining(Consumer<? super U> action) {
if(init())
target.forEachRemaining(action);
if(init()) {
Spliterator<U> t = target;
while(t instanceof TailCallSpliterator) {
t = traverseTail(t);
if(!t.tryAdvance(action))
return;
}
t.forEachRemaining(action);
target = t;
}
}

private boolean init() {
Expand All @@ -59,7 +68,7 @@ private boolean init() {
if(!source.tryAdvance(x -> first.a = x)) {
return false;
}
Stream<U> stream = mapper.apply(first.a, StreamEx.of(source));
Stream<U> stream = mapper.apply(first.a, StreamEx.of(traverseTail(source)));
this.stream = stream;
target = stream == null ? Spliterators.emptySpliterator() : stream.spliterator();
}
Expand All @@ -79,4 +88,9 @@ public long estimateSize() {
}
return target.estimateSize();
}

@Override
public Spliterator<U> tail() {
return target == null ? this : target;
}
}
8 changes: 7 additions & 1 deletion src/main/java/one/util/streamex/PairSpliterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* @author Tagir Valeev
*/
/* package */abstract class PairSpliterator<T, S extends Spliterator<T>, R, SS extends PairSpliterator<T, S, R, SS>>
implements Spliterator<R>, Cloneable {
implements TailCallSpliterator<R>, Cloneable {
static final int MODE_PAIRS = 0;
static final int MODE_MAP_FIRST = 1;
static final int MODE_MAP_LAST = 2;
Expand Down Expand Up @@ -131,6 +131,12 @@ void clear() {
this.right = sink;
}
}

@SuppressWarnings("unchecked")
@Override
public Spliterator<R> tail() {
return mode == MODE_MAP_FIRST && right == EMPTY && left == null ? (Spliterator<R>)source : this;
}

@Override
public long estimateSize() {
Expand Down
69 changes: 68 additions & 1 deletion src/main/java/one/util/streamex/StreamEx.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -77,6 +78,72 @@
* @param <T> the type of the stream elements
*/
public class StreamEx<T> extends AbstractStreamEx<T, StreamEx<T>> {

private static class PrependSpliterator<T> implements TailCallSpliterator<T> {
private final Spliterator<T> source;
private T[] prepended;
private int position;
private int characteristics;

public PrependSpliterator(Spliterator<T> source, T[] prepended) {
this.source = source;
this.prepended = prepended.length == 0 ? null : prepended;
this.characteristics = source.characteristics() & (ORDERED | SIZED| SUBSIZED);
if(((this.characteristics & SIZED) != 0) && prepended.length+source.estimateSize() < 0)
this.characteristics &= (~SIZED) & (~SUBSIZED);
}

@Override
public boolean tryAdvance(Consumer<? super T> action) {
if(prepended != null) {
action.accept(prepended[position++]);
if(position == prepended.length)
prepended = null;
return true;
}
return source.tryAdvance(action);
}

@Override
public void forEachRemaining(Consumer<? super T> action) {
if(prepended != null) {
while(position < prepended.length) {
action.accept(prepended[position++]);
}
prepended = null;
}
source.forEachRemaining(action);
}

@Override
public Spliterator<T> trySplit() {
if(prepended == null)
return source.trySplit();
Spliterator<T> prefix = Spliterators.spliterator(prepended, position, prepended.length, characteristics);
prepended = null;
return prefix;
}

@Override
public long estimateSize() {
long size = source.estimateSize();
if(prepended == null)
return size;
size = size+prepended.length-position;
return size < 0 ? Long.MAX_VALUE : size;
}

@Override
public int characteristics() {
return characteristics;
}

@Override
public Spliterator<T> tail() {
return prepended == null ? source : this;
}
}

StreamEx(Stream<T> stream) {
super(stream);
}
Expand Down Expand Up @@ -956,7 +1023,7 @@ public StreamEx<T> append(Collection<? extends T> collection) {
public final StreamEx<T> prepend(T... values) {
if (values.length == 0)
return this;
return prepend(Stream.of(values));
return supply(delegate(new PrependSpliterator<>(stream.spliterator(), values)));
}

/**
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/one/util/streamex/StreamExInternals.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.DoublePredicate;
Expand Down Expand Up @@ -893,6 +894,10 @@ public Object[] toArray() {
return arr;
}
}

static interface TailCallSpliterator<T> extends Spliterator<T> {
Spliterator<T> tail();
}

static <T> T copy(T src, T dest, int size) {
System.arraycopy(src, 0, dest, 0, size);
Expand Down Expand Up @@ -951,6 +956,16 @@ static <A> Predicate<A> finished(Collector<?, A, ?> collector) {
return ((CancellableCollector<?, A, ?>) collector).finished();
return null;
}

static <T> Spliterator<T> traverseTail(Spliterator<T> spltr) {
Spliterator<T> current = spltr;
while(current instanceof TailCallSpliterator) {
Spliterator<T> next = ((TailCallSpliterator<T>)current).tail();
if(next == current) break;
current = next;
}
return current;
}

@SuppressWarnings("unchecked")
static <T> T none() {
Expand Down
16 changes: 13 additions & 3 deletions src/test/java/one/util/streamex/StreamExTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,16 @@ public void testHeadTail() {
assertEquals(asList(1, 2, 3, 4, 5, 5, 4, 3, 2, 1), mirror(StreamEx.of(1, 2, 3, 4, 5)).toList());
}

@Test
public void testHeadTailTCO() {
assertEquals(200010000, (int) scanLeft(StreamEx.iterate(1, x -> x + 1), Integer::sum).limit(20000).reduce(
(a, b) -> b).get());
assertEquals(19999, takeWhile(StreamEx.iterate(1, x -> x + 1), x -> x < 20000).count());
assertTrue(takeWhile(StreamEx.iterate(1, x -> x + 1), x -> x < 20000).has(19999));
assertEquals(20000, takeWhileClosed(StreamEx.iterate(1, x -> x + 1), x -> x < 20000).count());
assertTrue(takeWhileClosed(StreamEx.iterate(1, x -> x + 1), x -> x < 20000).has(20000));
}

// Returns either the first stream element matching the predicate or just the first element if nothing matches
private static <T> T firstMatchingOrFirst(StreamEx<T> stream, Predicate<T> predicate) {
return stream.headTail((head, tail) -> tail.prepend(head).filter(predicate).append(head)).findFirst().get();
Expand All @@ -1622,18 +1632,18 @@ private static <T> StreamEx<T> cycle(StreamEx<T> input) {
return input.headTail((head, tail) -> cycle(tail.append(head)).prepend(head));
}

// Creates lazy scanLeft stream
// Creates lazy scanLeft stream (TCO)
private static <T> StreamEx<T> scanLeft(StreamEx<T> input, BinaryOperator<T> operator) {
return input.headTail((head, tail) -> scanLeft(tail.mapFirst(cur -> operator.apply(head, cur)), operator)
.prepend(head));
}

// takeWhile intermediate op implementation
// takeWhile intermediate op implementation (TCO)
private static <T> StreamEx<T> takeWhile(StreamEx<T> input, Predicate<T> predicate) {
return input.headTail((head, tail) -> predicate.test(head) ? takeWhile(tail, predicate).prepend(head) : null );
}

// takeWhileClosed intermediate op implementation
// takeWhileClosed intermediate op implementation (TCO)
private static <T> StreamEx<T> takeWhileClosed(StreamEx<T> input, Predicate<T> predicate) {
return input.headTail((head, tail) -> predicate.test(head) ? takeWhileClosed(tail, predicate).prepend(head)
: Stream.of(head));
Expand Down

0 comments on commit 6540b17

Please sign in to comment.