From cf1ea054cedf29ef3bb804410eb56f17f8ff1b37 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 5 Feb 2017 11:22:00 +0200 Subject: [PATCH] [BEAM-1392] DoFn teardown not called on empty partitions --- .../beam/runners/spark/translation/SparkProcessContext.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 9957bf330650..60c9d4d620b7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -62,8 +62,10 @@ Iterable processPartition( // skip if partition is empty. if (!partition.hasNext()) { + DoFnInvokers.invokerFor(doFn).invokeTeardown(); return Lists.newArrayList(); } + // call startBundle() before beginning to process the partition. doFnRunner.startBundle(); // process the partition; finishBundle() is called from within the output iterator.