Skip to content

Commit

Permalink
test case added, #2524
Browse files Browse the repository at this point in the history
  • Loading branch information
markheger committed Jul 29, 2020
1 parent 338fa17 commit 44cc2ca
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions test/java/src/com/ibm/streamsx/topology/test/api/WindowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,36 @@ public void testKeyedAggregate() throws Exception {
"A:1015", "B:4010", "B:4009", "B:4008", "A:1021", "C:2005",
"C:2018", "A:1024");
}

@Test
public void testKeyedAggregate_stv() throws Exception {
assumeTrue(!isEmbedded());
TStream<StockPrice> aggregate = _testKeyedAggregateStv();

completeAndValidate(aggregate, 10, "A:1000", "B:4004", "C:2013", "A:1005",
"A:1010", "B:4005", "A:1010", "C:2007", "B:4008", "C:2003",
"A:1015", "B:4010", "B:4009", "B:4008", "A:1021", "C:2005",
"C:2018", "A:1024");
}

private static TStream<StockPrice> _testKeyedAggregateStv() throws Exception {

final Topology f = newTopology("PartitionedAggregateStv");
TStream<StockPrice> source = f.constants(Arrays.asList(PRICES)).asType(StockPrice.class);

Supplier<Integer> count = f.createSubmissionParameter("count", 2);
TStream<StockPrice> aggregate = source.last(count).key(new Function<StockPrice,String>() {

private static final long serialVersionUID = 1L;

@Override
public String apply(StockPrice v) {
return v.getKey();
}}).aggregate(new AveragePrice());

return aggregate;
}

private static TStream<StockPrice> _testKeyedAggregate() throws Exception {

final Topology f = newTopology("PartitionedAggregate");
Expand Down

0 comments on commit 44cc2ca

Please sign in to comment.