diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java index ef56b55dc9bd..6bffad2b71fe 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink; +import java.io.IOException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; @@ -25,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.util.UserCodeException; +import org.joda.time.Duration; /** * Test Flink runner. @@ -55,7 +57,18 @@ public static TestFlinkRunner create(boolean streaming) { @Override public PipelineResult run(Pipeline pipeline) { try { - return delegate.run(pipeline); + PipelineResult result = delegate.run(pipeline); + // For tests that don't call waitUntilFinish(). + try { + result.waitUntilFinish(Duration.standardSeconds(30)); + } finally { + try { + result.cancel(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return result; } catch (Throwable e) { // Special case hack to pull out assertion errors from PAssert; instead there should // probably be a better story along the lines of UserCodeException.