From 758c791a9e987fa7fb7264b4f4f2c4107f3dd452 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 24 Nov 2017 18:31:51 +0100 Subject: [PATCH 1/2] [BEAM-3244] Ensure execution of teardown method on Flink's DoFnOperator --- .../translation/wrappers/streaming/DoFnOperator.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index d203ffb67dc5..fcee0549706d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -361,6 +361,16 @@ public void onProcessingTime(long timestamp) throws Exception { SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); } + @Override + public void dispose() throws Exception { + try { + super.dispose(); + checkFinishBundleTimer.cancel(true); + } finally { + doFnInvoker.invokeTeardown(); + } + } + @Override public void close() throws Exception { super.close(); @@ -379,8 +389,6 @@ public void close() throws Exception { } } } - checkFinishBundleTimer.cancel(true); - doFnInvoker.invokeTeardown(); } private long getPushbackWatermarkHold() { From b88e7175531569bac1fdf6c611d3951dcaae8f8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Wed, 29 Nov 2017 09:50:06 +0100 Subject: [PATCH 2/2] [BEAM-3244] Enable PardoLifecycleTest for the Flink runner --- runners/flink/pom.xml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index a6ab44bcea5f..7840c328c9a3 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -87,10 +87,6 @@ test - - - org.apache.beam.sdk.transforms.ParDoLifecycleTest - org.apache.beam.sdk.testing.ValidatesRunner org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,