Skip to content

Commit

Permalink
Merge pull request #2528 from markheger/java-win-def
Browse files Browse the repository at this point in the history
Resolve issue Java Window definition
  • Loading branch information
markheger committed Jul 29, 2020
2 parents 2d19403 + 44cc2ca commit a63b253
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public <J, U> TStream<J> joinInternal(TStream<U> xstream,
public <U> TWindow<T,U> key(Function<? super T, ? extends U> keyGetter) {
if (keyGetter == null)
throw new NullPointerException();
return new WindowDefinition<T,U>(stream, policy, config, timeUnit, keyGetter, null);
return new WindowDefinition<T,U>(stream, policy, config, timeUnit, keyGetter, supplierConfig);
}
@Override
public TWindow<T, T> key() {
Expand Down
29 changes: 29 additions & 0 deletions test/java/src/com/ibm/streamsx/topology/test/api/WindowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,36 @@ public void testKeyedAggregate() throws Exception {
"A:1015", "B:4010", "B:4009", "B:4008", "A:1021", "C:2005",
"C:2018", "A:1024");
}

@Test
public void testKeyedAggregate_stv() throws Exception {
assumeTrue(!isEmbedded());
TStream<StockPrice> aggregate = _testKeyedAggregateStv();

completeAndValidate(aggregate, 10, "A:1000", "B:4004", "C:2013", "A:1005",
"A:1010", "B:4005", "A:1010", "C:2007", "B:4008", "C:2003",
"A:1015", "B:4010", "B:4009", "B:4008", "A:1021", "C:2005",
"C:2018", "A:1024");
}

private static TStream<StockPrice> _testKeyedAggregateStv() throws Exception {

final Topology f = newTopology("PartitionedAggregateStv");
TStream<StockPrice> source = f.constants(Arrays.asList(PRICES)).asType(StockPrice.class);

Supplier<Integer> count = f.createSubmissionParameter("count", 2);
TStream<StockPrice> aggregate = source.last(count).key(new Function<StockPrice,String>() {

private static final long serialVersionUID = 1L;

@Override
public String apply(StockPrice v) {
return v.getKey();
}}).aggregate(new AveragePrice());

return aggregate;
}

private static TStream<StockPrice> _testKeyedAggregate() throws Exception {

final Topology f = newTopology("PartitionedAggregate");
Expand Down

0 comments on commit a63b253

Please sign in to comment.