diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index bfae241de6e93..533a3e40677c2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -3541,7 +3541,22 @@ public void onTimer() {} UsesTimersInParDo.class, DataflowPortabilityApiUnsupported.class }) - public void testOutputTimestampDefault() throws Exception { + public void testOutputTimestampDefaultBounded() throws Exception { + runTestOutputTimestampDefault(false); + } + + @Test + @Category({ + ValidatesRunner.class, + UsesUnboundedPCollections.class, + UsesTimersInParDo.class, + DataflowPortabilityApiUnsupported.class + }) + public void testOutputTimestampDefaultUnbounded() throws Exception { + runTestOutputTimestampDefault(true); + } + + public void runTestOutputTimestampDefault(boolean useStreaming) throws Exception { final String timerId = "foo"; DoFn, Long> fn1 = new DoFn, Long>() { @@ -3566,7 +3581,7 @@ public void onTimer(@Timestamp Instant timestamp, OutputReceiver o) { PCollection output = pipeline .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 1L), new Instant(3)))) - .setIsBoundedInternal(IsBounded.UNBOUNDED) + .setIsBoundedInternal(useStreaming ? IsBounded.UNBOUNDED : IsBounded.BOUNDED) .apply("first", ParDo.of(fn1)); PAssert.that(output).containsInAnyOrder(new Instant(8).getMillis()); // result output