Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink;

import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
Expand All @@ -25,6 +26,7 @@
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.util.UserCodeException;
import org.joda.time.Duration;

/**
* Test Flink runner.
Expand Down Expand Up @@ -55,7 +57,18 @@ public static TestFlinkRunner create(boolean streaming) {
@Override
public PipelineResult run(Pipeline pipeline) {
try {
return delegate.run(pipeline);
PipelineResult result = delegate.run(pipeline);
// For tests that don't call waitUntilFinish().
try {
result.waitUntilFinish(Duration.standardSeconds(30));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this duration?

Also the nested tries and catches and exception propagation make my head hurt a little. I'm not sure off the top of my head if there's any other way to do it.

You should also consider only calling cancel if the job has not already terminated.

Also it turns out the FlinkRunner doesn't support cancel

} finally {
try {
result.cancel();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return result;
} catch (Throwable e) {
// Special case hack to pull out assertion errors from PAssert; instead there should
// probably be a better story along the lines of UserCodeException.
Expand Down