Skip to content

Commit

Permalink
[#202] Make StreamEx.ofTree and friends stackoverflow-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
amaembo committed Oct 17, 2019
1 parent 7bceec2 commit b6871f1
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 55 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Expand Up @@ -2,6 +2,9 @@

Check also [MIGRATION.md](MIGRATION.md) for possible compatibility problems.

### 0.7.1
* [#202] Added: `StreamEx/EntryStream.ofTreeFast`; `ofTree` is now stack overflow safe.

### 0.7.0
* [#193] Removed optimizations which rely on internal implementation details of Stream API (unwrap IteratorSpliterator;
do not delegate to Stream::close when unnecessary)
Expand Down
4 changes: 2 additions & 2 deletions CHEATSHEET.md
Expand Up @@ -57,8 +57,8 @@ Stream of `Map` entries or explicit key-value pairs | `EntryStream.of()`
Zip two arrays or lists | `any.zip()`
Split `CharSequence` with regexp | `StreamEx.split()`
Stream of `List` subLists of fixed length | `StreamEx.ofSubLists()`
Stream of all elements of tree-like structure | `StreamEx.ofTree()`
Stream of all elements of tree-like structure tracking the elements depth | `EntryStream.ofTree()`
Stream of all elements of tree-like structure | `StreamEx.ofTree()/ofTreeFast()`
Stream of all elements of tree-like structure tracking the elements depth | `EntryStream.ofTree()/ofTreeFast()`
Stream of all possible pairs of array or `List` elements | `StreamEx/EntryStream.ofPairs()`
Stream of all possible tuples of given length of `Collection` elements | `StreamEx.cartesianPower()`
Stream of all possible tuples of given `Collection` of collections | `StreamEx.cartesianProduct()`
Expand Down
57 changes: 56 additions & 1 deletion src/main/java/one/util/streamex/EntryStream.java
Expand Up @@ -2146,9 +2146,35 @@ public static <T> EntryStream<T, T> ofPairs(T[] array) {
* @since 0.5.2
* @see StreamEx#ofTree(Object, Function)
* @see #ofTree(Object, Class, BiFunction)
* @see #ofTreeFast(Object, BiFunction)
*/
public static <T> EntryStream<Integer, T> ofTree(T root, BiFunction<Integer, T, Stream<T>> mapper) {
TreeSpliterator<T, Entry<Integer, T>> spliterator = new TreeSpliterator.Depth<>(root, mapper);
TreeSpliterator<T, Entry<Integer, T>> spliterator = new TreeSpliterator.Depth<>(root, mapper, true);
return new EntryStream<>(spliterator, StreamContext.SEQUENTIAL.onClose(spliterator::close));
}

/**
* Return a new {@link EntryStream} containing all the nodes of tree-like
* data structure in entry values along with the corresponding tree depths
* in entry keys, in depth-first order.
*
* <p>
* This method behaves in the same way as {@link #ofTree(Object, BiFunction)},
* except it may work faster at the additional cost of stack consumption
* (max two frames per tree level), so a {@link StackOverflowError} is possible
* for the deep trees.
*
* @param <T> the type of tree nodes
* @param root root node of the tree
* @param mapper a non-interfering, stateless function to apply to each tree
* node and its depth which returns null for leaf nodes or stream of
* direct children for non-leaf nodes.
* @return the new sequential ordered {@code EntryStream}
* @since 0.7.1
* @see #ofTree(Object, BiFunction)
*/
public static <T> EntryStream<Integer, T> ofTreeFast(T root, BiFunction<Integer, T, Stream<T>> mapper) {
TreeSpliterator<T, Entry<Integer, T>> spliterator = new TreeSpliterator.Depth<>(root, mapper, false);
return new EntryStream<>(spliterator, StreamContext.SEQUENTIAL.onClose(spliterator::close));
}

Expand Down Expand Up @@ -2181,13 +2207,42 @@ public static <T> EntryStream<Integer, T> ofTree(T root, BiFunction<Integer, T,
* @since 0.5.2
* @see StreamEx#ofTree(Object, Class, Function)
* @see #ofTree(Object, BiFunction)
* @see #ofTreeFast(Object, Class, BiFunction)
*/
@SuppressWarnings("unchecked")
public static <T, TT extends T> EntryStream<Integer, T> ofTree(T root, Class<TT> collectionClass,
BiFunction<Integer, TT, Stream<T>> mapper) {
return ofTree(root, (d, t) -> collectionClass.isInstance(t) ? mapper.apply(d, (TT) t) : null);
}

/**
* Return a new {@link EntryStream} containing all the nodes of tree-like
* data structure in entry values along with the corresponding tree depths
* in entry keys, in depth-first order.
*
* <p>
* This method behaves in the same way as {@link #ofTree(Object, Class, BiFunction)},
* except it may work faster at the additional cost of stack consumption
* (max two frames per tree level), so a {@link StackOverflowError} is possible
* for the deep trees.
*
* @param <T> the base type of tree nodes
* @param <TT> the sub-type of composite tree nodes which may have children
* @param root root node of the tree
* @param collectionClass a class representing the composite tree node
* @param mapper a non-interfering, stateless function to apply to each
* composite tree node and its depth which returns stream of direct
* children. May return null if the given node has no children.
* @return the new sequential ordered stream
* @since 0.7.1
* @see #ofTree(Object, Class, BiFunction)
*/
@SuppressWarnings("unchecked")
public static <T, TT extends T> EntryStream<Integer, T> ofTreeFast(T root, Class<TT> collectionClass,
BiFunction<Integer, TT, Stream<T>> mapper) {
return ofTreeFast(root, (d, t) -> collectionClass.isInstance(t) ? mapper.apply(d, (TT) t) : null);
}

/**
* Returns an infinite sequential unordered {@code EntryStream} where each
* entry key is generated by the provided {@code keySupplier} and each entry value is generated by the provided
Expand Down
54 changes: 53 additions & 1 deletion src/main/java/one/util/streamex/StreamEx.java
Expand Up @@ -2969,9 +2969,34 @@ public static <U, V, T> StreamEx<T> zip(U[] first, V[] second, BiFunction<? supe
* @since 0.2.2
* @see EntryStream#ofTree(Object, BiFunction)
* @see #ofTree(Object, Class, Function)
* @see #ofTreeFast(Object, Function)
*/
public static <T> StreamEx<T> ofTree(T root, Function<T, Stream<T>> mapper) {
TreeSpliterator<T, T> spliterator = new TreeSpliterator.Plain<>(root, mapper);
TreeSpliterator<T, T> spliterator = new TreeSpliterator.Plain<>(root, mapper, true);
return new StreamEx<>(spliterator, StreamContext.SEQUENTIAL.onClose(spliterator::close));
}

/**
* Return a new {@link StreamEx} containing all the nodes of tree-like data
* structure in depth-first order.
*
* <p>
* This method behaves in the same way as {@link #ofTree(Object, Function)},
* except it may work faster at the additional cost of stack consumption
* (max two frames per tree level), so a {@link StackOverflowError} is possible
* for the deep trees.
*
* @param <T> the type of tree nodes
* @param root root node of the tree
* @param mapper a non-interfering, stateless function to apply to each tree
* node which returns null for leaf nodes or stream of direct
* children for non-leaf nodes.
* @return the new sequential ordered stream
* @since 0.7.1
* @see #ofTree(Object, Function)
*/
public static <T> StreamEx<T> ofTreeFast(T root, Function<T, Stream<T>> mapper) {
TreeSpliterator<T, T> spliterator = new TreeSpliterator.Plain<>(root, mapper, false);
return new StreamEx<>(spliterator, StreamContext.SEQUENTIAL.onClose(spliterator::close));
}

Expand All @@ -2998,12 +3023,39 @@ public static <T> StreamEx<T> ofTree(T root, Function<T, Stream<T>> mapper) {
* @since 0.2.2
* @see EntryStream#ofTree(Object, Class, BiFunction)
* @see #ofTree(Object, Function)
* @see #ofTreeFast(Object, Class, Function)
*/
@SuppressWarnings("unchecked")
public static <T, TT extends T> StreamEx<T> ofTree(T root, Class<TT> collectionClass, Function<TT, Stream<T>> mapper) {
return ofTree(root, t -> collectionClass.isInstance(t) ? mapper.apply((TT) t) : null);
}

/**
* Return a new {@link StreamEx} containing all the nodes of tree-like data
* structure in depth-first order.
*
* <p>
* This method behaves in the same way as {@link #ofTree(Object, Class, Function)},
* except it may work faster at the additional cost of stack consumption
* (max two frames per tree level), so a {@link StackOverflowError} is possible
* for the deep trees.
*
* @param <T> the base type of tree nodes
* @param <TT> the sub-type of composite tree nodes which may have children
* @param root root node of the tree
* @param collectionClass a class representing the composite tree node
* @param mapper a non-interfering, stateless function to apply to each
* composite tree node which returns stream of direct children. May
* return null if the given node has no children.
* @return the new sequential ordered stream
* @since 0.7.1
* @see #ofTree(Object, Class, Function)
*/
@SuppressWarnings("unchecked")
public static <T, TT extends T> StreamEx<T> ofTreeFast(T root, Class<TT> collectionClass, Function<TT, Stream<T>> mapper) {
return ofTreeFast(root, t -> collectionClass.isInstance(t) ? mapper.apply((TT) t) : null);
}

/**
* Returns a new {@code StreamEx} which consists of non-overlapping sublists
* of given source list having the specified length (the last sublist may be
Expand Down
20 changes: 15 additions & 5 deletions src/main/java/one/util/streamex/TreeSpliterator.java
Expand Up @@ -35,10 +35,12 @@
T cur;
List<PairBox<Spliterator<T>, Stream<T>>> spliterators;
private Runnable closeHandler = null;
final boolean stackFriendly;
long size = Long.MAX_VALUE;

TreeSpliterator(T root) {
TreeSpliterator(T root, boolean stackFriendly) {
this.cur = root;
this.stackFriendly = stackFriendly;
}

boolean advance() {
Expand Down Expand Up @@ -168,8 +170,8 @@ public void accept(T t) {
static class Plain<T> extends TreeSpliterator<T, T> {
private final Function<T, Stream<T>> mapper;

Plain(T root, Function<T, Stream<T>> mapper) {
super(root);
Plain(T root, Function<T, Stream<T>> mapper, boolean stackFriendly) {
super(root, stackFriendly);
this.mapper = mapper;
}

Expand All @@ -184,6 +186,10 @@ public boolean tryAdvance(Consumer<? super T> action) {

@Override
public void forEachRemaining(Consumer<? super T> action) {
if (stackFriendly) {
super.forEachRemaining(action);
return;
}
Acceptor<T> acceptor = new Acceptor<>(action, mapper);
if(spliterators != null) {
for(int i=spliterators.size()-1; i>=0; i--) {
Expand Down Expand Up @@ -236,8 +242,8 @@ public void accept(T t) {
static class Depth<T> extends TreeSpliterator<T, Entry<Integer, T>> {
private final BiFunction<Integer, T, Stream<T>> mapper;

Depth(T root, BiFunction<Integer, T, Stream<T>> mapper) {
super(root);
Depth(T root, BiFunction<Integer, T, Stream<T>> mapper, boolean stackFriendly) {
super(root, stackFriendly);
this.mapper = mapper;
}

Expand All @@ -253,6 +259,10 @@ public boolean tryAdvance(Consumer<? super Entry<Integer, T>> action) {

@Override
public void forEachRemaining(Consumer<? super Entry<Integer, T>> action) {
if (stackFriendly) {
super.forEachRemaining(action);
return;
}
DepthAcceptor<T> acceptor = new DepthAcceptor<>(action, mapper, 0);
if(spliterators != null) {
for(int i=spliterators.size()-1; i>=0; i--) {
Expand Down
104 changes: 65 additions & 39 deletions src/test/java/one/util/streamex/EntryStreamTest.java
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -653,46 +654,71 @@ public void testDistinctKeysValues() {

@Test
public void testOfTree() {
entryStream(() -> EntryStream.ofTree("a", (Integer depth, String str) -> null), supplier -> checkAsString(
"0->a", supplier.get()));

List<Object> input = Arrays.asList("aa", null, asList(asList("bbbb", "cc", null, asList()), "ddd", Arrays
.asList("e"), asList("fff")), "ggg");
@SuppressWarnings("unchecked")
Supplier<Stream<Entry<Integer, Object>>> base = () -> EntryStream.ofTree(input, List.class, (depth, l) -> l
.stream());
entryStream(base, supplier -> assertEquals("{1=[aa, ggg], 2=[ddd], 3=[bbbb, cc, e, fff]}", supplier.get()
.selectValues(String.class).grouping(TreeMap::new).toString()));

Set<Integer> set = new HashSet<>();
try(EntryStream<Integer, String> stream = EntryStream.ofTree("", (Integer depth, String str) -> depth >= 3 ? null : Stream.of("a", "b")
.map(str::concat).onClose(() -> set.add(depth)))) {
assertEquals(15, stream.count());
}
assertEquals(StreamEx.of(0, 1, 2).toSet(), set);
boolean catched = false;
try(EntryStream<Integer, String> stream = EntryStream.ofTree("", (Integer depth, String str) -> depth >= 3 ? null : Stream.of("a", "b")
.map(str::concat).onClose(() -> {throw new IllegalArgumentException(String.valueOf(depth));}))) {
stream.count();
}
catch(IllegalArgumentException iae) {
catched = true;
assertEquals("2", iae.getMessage());
assertEquals(2, iae.getSuppressed().length);
assertEquals("1", iae.getSuppressed()[0].getMessage());
assertEquals("0", iae.getSuppressed()[1].getMessage());
for (boolean fast : new boolean[]{true, false}) {
entryStream(() -> ofTree("a", (Integer depth, String str) -> null, fast), supplier -> checkAsString(
"0->a", supplier.get()));

List<Object> input = Arrays.asList("aa", null, asList(asList("bbbb", "cc", null, asList()), "ddd", Arrays
.asList("e"), asList("fff")), "ggg");
@SuppressWarnings("unchecked")
Supplier<Stream<Entry<Integer, Object>>> base =
() -> ofTree(input, List.class, (depth, l) -> l.stream(), fast);
entryStream(base, supplier -> assertEquals("{1=[aa, ggg], 2=[ddd], 3=[bbbb, cc, e, fff]}", supplier.get()
.selectValues(String.class).grouping(TreeMap::new).toString()));

Set<Integer> set = new HashSet<>();
try(EntryStream<Integer, String> stream = ofTree("", (Integer depth, String str) -> depth >= 3 ? null : Stream.of("a", "b")
.map(str::concat).onClose(() -> set.add(depth)), fast)) {
assertEquals(15, stream.count());
}
assertEquals(StreamEx.of(0, 1, 2).toSet(), set);
boolean catched = false;
try(EntryStream<Integer, String> stream = ofTree("", (Integer depth, String str) -> depth >= 4 ? null : Stream.of("a", "b")
.map(str::concat).onClose(() -> {throw new IllegalArgumentException(String.valueOf(depth));}), fast)) {
stream.count();
}
catch(IllegalArgumentException iae) {
catched = true;
assertEquals("3", iae.getMessage());
assertEquals(asList("3", "2", "1", "0"), StreamEx.<Throwable>ofTree(iae, ex -> StreamEx.of(ex.getSuppressed()))
.map(Throwable::getMessage).toList());
}
assertTrue(catched);

entryStream(() -> ofTree("", (Integer depth, String str) -> depth >= 3 ? null : Stream.of("a", "b")
.map(str::concat), fast), supplier -> {
assertEquals(asList("", "a", "aa", "aaa", "aab", "ab", "aba", "abb", "b", "ba", "baa", "bab", "bb", "bba",
"bbb"), supplier.get().values().toList());
assertTrue(supplier.get().values().has("bbb"));
assertFalse(supplier.get().values().has("ccc"));
assertEquals(asList("a", "b", "aa", "ab", "ba", "bb", "aaa", "aab", "aba", "abb", "baa", "bab", "bba",
"bbb"), supplier.get().sorted(Entry.comparingByKey()).values().without("").toList());
});
}
assertTrue(catched);

entryStream(() -> EntryStream.ofTree("", (Integer depth, String str) -> depth >= 3 ? null : Stream.of("a", "b")
.map(str::concat)), supplier -> {
assertEquals(asList("", "a", "aa", "aaa", "aab", "ab", "aba", "abb", "b", "ba", "baa", "bab", "bb", "bba",
"bbb"), supplier.get().values().toList());
assertTrue(supplier.get().values().has("bbb"));
assertFalse(supplier.get().values().has("ccc"));
assertEquals(asList("a", "b", "aa", "ab", "ba", "bb", "aaa", "aab", "aba", "abb", "baa", "bab", "bba",
"bbb"), supplier.get().sorted(Entry.comparingByKey()).values().without("").toList());
});
}

private static <T> EntryStream<Integer, T> ofTree(T root, BiFunction<Integer, T, Stream<T>> mapper, boolean fast) {
return fast ? EntryStream.ofTreeFast(root, mapper) : EntryStream.ofTree(root, mapper);
}

private static <T, TT extends T> EntryStream<Integer, T> ofTree(T root, Class<TT> collectionClass,
BiFunction<Integer, TT, Stream<T>> mapper,
boolean fast) {
return fast ? EntryStream.ofTreeFast(root, collectionClass, mapper) : EntryStream
.ofTree(root, collectionClass, mapper);
}

@Test
public void testOfTreeDeep() {
List<Integer> numbers = EntryStream.ofTree(1, (d, n) -> n >= 10000 ? null : StreamEx.of(n + 1))
.values().toList();
assertEquals(IntStreamEx.rangeClosed(1, 10000).boxed().toList(), numbers);
assertThrows(StackOverflowError.class,
() -> EntryStream.ofTreeFast(1, (d, n) -> n >= 10000 ? null : StreamEx.of(n + 1))
.values().toList());
assertThrows(StackOverflowError.class,
() -> EntryStream.ofTreeFast(1, Integer.class, (d, n) -> n >= 10000 ? null : StreamEx.of(n + 1))
.values().toList());
}

@Test
Expand Down
15 changes: 13 additions & 2 deletions src/test/java/one/util/streamex/StreamExTest.java
Expand Up @@ -90,8 +90,8 @@ public void testCreate() {

assertEquals(asList("a", "b"), StreamEx.of(asList("a", "b").spliterator()).toList());
assertEquals(asList("a", "b"), StreamEx.of(asList("a", "b").iterator()).toList());
assertEquals(asList(), StreamEx.of(asList().iterator()).toList());
assertEquals(asList(), StreamEx.of(asList().iterator()).parallel().toList());
assertEquals(asList(), StreamEx.of(Collections.emptyIterator()).toList());
assertEquals(asList(), StreamEx.of(Collections.emptyIterator()).parallel().toList());
assertEquals(asList("a", "b"), StreamEx.of(new Vector<>(asList("a", "b")).elements()).toList());

assertEquals(asList("a", "b", "c", "d"), StreamEx.ofReversed(asList("d", "c", "b", "a")).toList());
Expand Down Expand Up @@ -1013,6 +1013,17 @@ public void testOfTree() {
String::valueOf) : null).parallel().count());
}

@Test
public void testOfTreeDeep() {
List<Integer> numbers = StreamEx.ofTree(1, n -> n >= 10000 ? null : StreamEx.of(n + 1))
.toList();
assertEquals(IntStreamEx.rangeClosed(1, 10000).boxed().toList(), numbers);
assertThrows(StackOverflowError.class,
() -> StreamEx.ofTreeFast(1, Integer.class, n -> n >= 10000 ? null : StreamEx.of(n + 1)).toList());
assertThrows(StackOverflowError.class,
() -> StreamEx.ofTreeFast(1, n -> n >= 10000 ? null : StreamEx.of(n + 1)).toList());
}

@Test
public void testOfTreeClose() {
CompositeNode r = CompositeNode.createTestData();
Expand Down

0 comments on commit b6871f1

Please sign in to comment.