Skip to content

Commit

Permalink
Short-circuit reduction (proof of concept, see
Browse files Browse the repository at this point in the history
  • Loading branch information
amaembo committed Sep 10, 2015
1 parent 0edaae8 commit f242767
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/main/java/javax/util/streamex/AbstractStreamEx.java
Expand Up @@ -236,6 +236,20 @@ public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryO
return stream.reduce(identity, accumulator, combiner);
}

public <U> U reduceShortCircuit(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner, Predicate<U> cancelPredicate) {
return strategy()
.newStreamEx(
StreamSupport.stream(
new CancellableReduceSpliterator<>(stream
.spliterator(), identity, accumulator,
cancelPredicate), stream.isParallel()))
.reduce(combiner).orElse(identity);
}

public T reduceShortCircuit(T identity, BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
return reduceShortCircuit(identity, combiner, combiner, cancelPredicate);
}

@Override
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
return stream.collect(supplier, accumulator, combiner);
Expand Down
@@ -0,0 +1,93 @@
/*
* Copyright 2015 Tagir Valeev
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package javax.util.streamex;

import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* @author Tagir Valeev
*/
/* package */ final class CancellableReduceSpliterator<T, A> implements Spliterator<A>, Consumer<T>, Cloneable {
private Spliterator<T> source;
private final BiFunction<A, ? super T, A> accumulator;
private final Predicate<A> cancelPredicate;
private final AtomicBoolean cancelled = new AtomicBoolean();
private A acc;

CancellableReduceSpliterator(Spliterator<T> source,
A identity,
BiFunction<A, ? super T, A> accumulator,
Predicate<A> cancelPredicate) {
this.source = source;
this.acc = identity;
this.accumulator = accumulator;
this.cancelPredicate = cancelPredicate;
}

@Override
public boolean tryAdvance(Consumer<? super A> action) {
if(source == null)
return false;
while(!cancelled.get() && source.tryAdvance(this)) {
if(cancelPredicate.test(acc)) {
cancelled.set(true);
break;
}
}
source = null;
action.accept(acc);
return true;
}

@Override
public void forEachRemaining(Consumer<? super A> action) {
tryAdvance(action);
}

@Override
public Spliterator<A> trySplit() {
Spliterator<T> prefix = source.trySplit();
if(prefix == null)
return null;
try {
@SuppressWarnings("unchecked")
CancellableReduceSpliterator<T, A> result = (CancellableReduceSpliterator<T, A>) this.clone();
result.source = prefix;
return result;
} catch (CloneNotSupportedException e) {
throw new InternalError();
}
}

@Override
public long estimateSize() {
return source == null ? 0 : Long.MAX_VALUE;
}

@Override
public int characteristics() {
return source == null ? SIZED : source.characteristics() & ORDERED;
}

@Override
public void accept(T t) {
this.acc = accumulator.apply(this.acc, t);
}
}

0 comments on commit f242767

Please sign in to comment.