From d965e29fe24a49712a22c9c5b141aedb470b2ba1 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Tue, 19 Apr 2016 09:20:30 +0200 Subject: [PATCH] [BEAM-207] Flink test flake in ReadSourceStreamingITCase --- .../flink/translation/wrappers/SourceInputFormat.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index dc11c77e83ad..3a885f13786f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -46,8 +46,8 @@ public class SourceInputFormat implements InputFormat> private transient PipelineOptions options; private final SerializedPipelineOptions serializedOptions; - private transient BoundedSource.BoundedReader reader = null; - private boolean inputAvailable = true; + private transient BoundedSource.BoundedReader reader; + private boolean inputAvailable = false; public SourceInputFormat(BoundedSource initialSource, PipelineOptions options) { this.initialSource = initialSource; @@ -135,6 +135,8 @@ public T nextRecord(T t) throws IOException { @Override public void close() throws IOException { - reader.close(); + if (reader != null) { + reader.close(); + } } }