From b4e8494708d79d0a2a64e86619bd5b99fcf5799d Mon Sep 17 00:00:00 2001 From: mingmxu Date: Wed, 8 Nov 2017 13:44:18 -0800 Subject: [PATCH] change `withAllowedTimestampSkew` --- .../beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java index e49e79c5b469..6ed30b611887 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java @@ -80,7 +80,8 @@ public PCollection buildBeamPipeline(PCollectionTuple inputPCollecti BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); if (windowFieldIdx != -1) { upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps - .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) + .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)) + .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE))) .setCoder(upstream.getCoder()); }