Skip to content

Commit

Permalink
Add Collector for collect to Collectors2 and refactor merge operation…
Browse files Browse the repository at this point in the history
…s for collections and partitions into reusable methods.

Signed-off-by: Donald Raab <Donald.Raab@gs.com>
  • Loading branch information
Donald Raab authored and Donald Raab committed Jun 15, 2016
1 parent 4120379 commit c7dbe5a
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Iterator;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collector;

Expand Down Expand Up @@ -655,11 +656,7 @@ private Collectors2()
collection.add(each);
}
},
(collection1, collection2) ->
{
collection1.addAll(collection2);
return collection1;
},
Collectors2.mergeCollections(),
EMPTY_CHARACTERISTICS);
}

Expand All @@ -677,11 +674,7 @@ private Collectors2()
collection.add(each);
}
},
(collection1, collection2) ->
{
collection1.addAll(collection2);
return collection1;
},
Collectors2.mergeCollections(),
EMPTY_CHARACTERISTICS);
}

Expand All @@ -696,11 +689,7 @@ private Collectors2()
collection.add(each);
}
},
(collection1, collection2) ->
{
collection1.addAll(collection2);
return collection1;
},
Collectors2.mergeCollections(),
EMPTY_CHARACTERISTICS);
}

Expand All @@ -718,11 +707,7 @@ private Collectors2()
collection.add(each);
}
},
(collection1, collection2) ->
{
collection1.addAll(collection2);
return collection1;
},
Collectors2.mergeCollections(),
EMPTY_CHARACTERISTICS);
}

Expand All @@ -737,12 +722,7 @@ private Collectors2()
MutableCollection<T> bucket = predicate.accept(each) ? partition.getSelected() : partition.getRejected();
bucket.add(each);
},
(partition1, partition2) ->
{
partition1.getSelected().addAll(partition2.getSelected());
partition1.getRejected().addAll(partition2.getRejected());
return partition1;
},
Collectors2.mergePartitions(),
EMPTY_CHARACTERISTICS);
}

Expand All @@ -759,13 +739,49 @@ private Collectors2()
predicate.accept(each, parameter) ? partition.getSelected() : partition.getRejected();
bucket.add(each);
},
(partition1, partition2) ->
{
partition1.getSelected().addAll(partition2.getSelected());
partition1.getRejected().addAll(partition2.getRejected());
return partition1;
},
Collectors2.mergePartitions(),
EMPTY_CHARACTERISTICS);
}

public static <T, V, R extends Collection<V>> Collector<T, ?, R> collect(
Function<? super T, ? extends V> function, Supplier<R> supplier)
{
return Collector.of(
supplier,
(collection, each) -> collection.add(function.valueOf(each)),
Collectors2.mergeCollections(),
EMPTY_CHARACTERISTICS);
}

public static <T, P, V, R extends Collection<V>> Collector<T, ?, R> collectWith(
Function2<? super T, ? super P, ? extends V> function,
P parameter,
Supplier<R> supplier)
{
return Collector.of(
supplier,
(collection, each) -> collection.add(function.value(each, parameter)),
Collectors2.mergeCollections(),
EMPTY_CHARACTERISTICS);
}

private static <T, R extends Collection<T>> BinaryOperator<R> mergeCollections()
{
return (collection1, collection2) ->
{
collection1.addAll(collection2);
return collection1;
};
}

private static <T, R extends PartitionMutableCollection<T>> BinaryOperator<R> mergePartitions()
{
return (partition1, partition2) ->
{
partition1.getSelected().addAll(partition2.getSelected());
partition1.getRejected().addAll(partition2.getRejected());
return partition1;
};
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.collections.api.partition.bag.PartitionMutableBag;
import org.eclipse.collections.api.partition.list.PartitionMutableList;
import org.eclipse.collections.api.partition.set.PartitionMutableSet;
import org.eclipse.collections.impl.block.factory.Functions;
import org.eclipse.collections.impl.block.factory.IntegerPredicates;
import org.eclipse.collections.impl.block.factory.Predicates2;
import org.eclipse.collections.impl.factory.Bags;
Expand All @@ -39,7 +40,7 @@ public class Collectors2Test
{
public static final Interval SMALL_INTERVAL = Interval.oneTo(5);
public static final Interval LARGE_INTERVAL = Interval.oneTo(20000);
public static final int HALF_SIZE = LARGE_INTERVAL.size() / 2;
public static final Integer HALF_SIZE = Integer.valueOf(LARGE_INTERVAL.size() / 2);
private final List<Integer> smallData = new ArrayList<Integer>(SMALL_INTERVAL);
private final List<Integer> bigData = new ArrayList<Integer>(LARGE_INTERVAL);

Expand Down Expand Up @@ -1389,17 +1390,20 @@ public void partitionParallel()
@Test
public void partitionWith()
{
PartitionMutableList<Integer> expectedList = LARGE_INTERVAL.toList().partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableList<Integer> expectedList = LARGE_INTERVAL.toList()
.partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableList<Integer> actualList = this.bigData.stream()
.collect(Collectors2.partitionWith(Predicates2.greaterThan(), HALF_SIZE, PartitionFastList::new));
Assert.assertEquals(expectedList.getSelected(), actualList.getSelected());
Assert.assertEquals(expectedList.getRejected(), actualList.getRejected());
PartitionMutableSet<Integer> expectedSet = LARGE_INTERVAL.toSet().partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableSet<Integer> expectedSet = LARGE_INTERVAL.toSet()
.partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableSet<Integer> actualSet = this.bigData.stream()
.collect(Collectors2.partitionWith(Predicates2.greaterThan(), HALF_SIZE, PartitionUnifiedSet::new));
Assert.assertEquals(expectedSet.getSelected(), actualSet.getSelected());
Assert.assertEquals(expectedSet.getRejected(), actualSet.getRejected());
PartitionMutableBag<Integer> expectedBag = LARGE_INTERVAL.toBag().partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableBag<Integer> expectedBag = LARGE_INTERVAL.toBag()
.partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableBag<Integer> actualBag = this.bigData.stream()
.collect(Collectors2.partitionWith(Predicates2.greaterThan(), HALF_SIZE, PartitionHashBag::new));
Assert.assertEquals(expectedBag.getSelected(), actualBag.getSelected());
Expand All @@ -1409,20 +1413,97 @@ public void partitionWith()
@Test
public void partitionWithParallel()
{
PartitionMutableList<Integer> expectedList = LARGE_INTERVAL.toList().partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableList<Integer> expectedList = LARGE_INTERVAL.toList()
.partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableList<Integer> actualList = this.bigData.parallelStream()
.collect(Collectors2.partitionWith(Predicates2.greaterThan(), HALF_SIZE, PartitionFastList::new));
Assert.assertEquals(expectedList.getSelected(), actualList.getSelected());
Assert.assertEquals(expectedList.getRejected(), actualList.getRejected());
PartitionMutableSet<Integer> expectedSet = LARGE_INTERVAL.toSet().partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableSet<Integer> expectedSet = LARGE_INTERVAL.toSet()
.partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableSet<Integer> actualSet = this.bigData.parallelStream()
.collect(Collectors2.partitionWith(Predicates2.greaterThan(), HALF_SIZE, PartitionUnifiedSet::new));
Assert.assertEquals(expectedSet.getSelected(), actualSet.getSelected());
Assert.assertEquals(expectedSet.getRejected(), actualSet.getRejected());
PartitionMutableBag<Integer> expectedBag = LARGE_INTERVAL.toBag().partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableBag<Integer> expectedBag = LARGE_INTERVAL.toBag()
.partitionWith(Predicates2.greaterThan(), HALF_SIZE);
PartitionMutableBag<Integer> actualBag = this.bigData.parallelStream()
.collect(Collectors2.partitionWith(Predicates2.greaterThan(), HALF_SIZE, PartitionHashBag::new));
Assert.assertEquals(expectedBag.getSelected(), actualBag.getSelected());
Assert.assertEquals(expectedBag.getRejected(), actualBag.getRejected());
}

@Test
public void collect()
{
Assert.assertEquals(
LARGE_INTERVAL.toList().collect(Functions.getToString()),
this.bigData.stream().collect(Collectors2.collect(Functions.getToString(), Lists.mutable::empty))
);
Assert.assertEquals(
LARGE_INTERVAL.toSet().collect(Functions.getToString()),
this.bigData.stream().collect(Collectors2.collect(Functions.getToString(), Sets.mutable::empty))
);
Assert.assertEquals(
LARGE_INTERVAL.toBag().collect(Functions.getToString()),
this.bigData.stream().collect(Collectors2.collect(Functions.getToString(), Bags.mutable::empty))
);
}

@Test
public void collectParallel()
{
Assert.assertEquals(
LARGE_INTERVAL.toList().collect(Functions.getToString()),
this.bigData.parallelStream().collect(Collectors2.collect(Functions.getToString(), Lists.mutable::empty))
);
Assert.assertEquals(
LARGE_INTERVAL.toSet().collect(Functions.getToString()),
this.bigData.parallelStream().collect(Collectors2.collect(Functions.getToString(), Sets.mutable::empty))
);
Assert.assertEquals(
LARGE_INTERVAL.toBag().collect(Functions.getToString()),
this.bigData.parallelStream().collect(Collectors2.collect(Functions.getToString(), Bags.mutable::empty))
);
}

@Test
public void collectWith()
{
Assert.assertEquals(
LARGE_INTERVAL.toList().collectWith(Integer::sum, Integer.valueOf(10)),
this.bigData.stream()
.collect(Collectors2.collectWith(Integer::sum, Integer.valueOf(10), Lists.mutable::empty))
);
Assert.assertEquals(
LARGE_INTERVAL.toSet().collectWith(Integer::sum, Integer.valueOf(10)),
this.bigData.stream()
.collect(Collectors2.collectWith(Integer::sum, Integer.valueOf(10), Sets.mutable::empty))
);
Assert.assertEquals(
LARGE_INTERVAL.toBag().collectWith(Integer::sum, Integer.valueOf(10)),
this.bigData.stream()
.collect(Collectors2.collectWith(Integer::sum, Integer.valueOf(10), Bags.mutable::empty))
);
}

@Test
public void collectWithParallel()
{
Assert.assertEquals(
LARGE_INTERVAL.toList().collectWith(Integer::sum, Integer.valueOf(10)),
this.bigData.parallelStream()
.collect(Collectors2.collectWith(Integer::sum, Integer.valueOf(10), Lists.mutable::empty))
);
Assert.assertEquals(
LARGE_INTERVAL.toSet().collectWith(Integer::sum, Integer.valueOf(10)),
this.bigData.parallelStream()
.collect(Collectors2.collectWith(Integer::sum, Integer.valueOf(10), Sets.mutable::empty))
);
Assert.assertEquals(
LARGE_INTERVAL.toBag().collectWith(Integer::sum, Integer.valueOf(10)),
this.bigData.parallelStream()
.collect(Collectors2.collectWith(Integer::sum, Integer.valueOf(10), Bags.mutable::empty))
);
}
}

0 comments on commit c7dbe5a

Please sign in to comment.