[BEAM-2140] Execute Splittable DoFn directly in Flink Runner #3480
aljoscha wants to merge 1 commit into apache:master from
Conversation
Before, we were using ProcessFn. This was causing problems with the Flink Runner for two reasons: 1. StatefulDoFnRunner is in the processing path, which means processing-time timers are dropped when the watermark reaches +Inf. 2. When a pipeline shuts down (for example, when bounded sources shut down), Flink drops any outstanding processing-time timers, meaning that any remaining restrictions will not be processed. The fix for 1. is to execute the splittable DoFn directly, thereby bypassing the late-data/timer-dropping logic. The fix for 2. builds on the fix for 1. and also introduces a "last resort" event-time timer that fires at +Inf and makes sure that any remaining restrictions are exhausted.
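The "last resort" drain-at-+Inf behavior described above can be sketched with a simplified model in plain Java (the queue-backed restriction, `processRestriction`, and the batch size are illustrative stand-ins, not the actual Beam/Flink implementation):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model: a restriction is a queue of pending work. When the "last
// resort" event-time timer fires at the final watermark, we keep invoking
// processing until the restriction is exhausted, instead of relying on
// processing-time timers that Flink drops at shutdown.
public class DrainOnFinalWatermark {
  static final long MAX_WATERMARK = Long.MAX_VALUE; // stands in for +Inf

  // Processes up to batchSize items and reports whether the restriction is done.
  static boolean processRestriction(Queue<Integer> restriction, int batchSize) {
    for (int i = 0; i < batchSize && !restriction.isEmpty(); i++) {
      restriction.poll(); // emitting the output element is omitted here
    }
    return restriction.isEmpty();
  }

  // Called when the "last resort" timer fires; returns how many rounds it took.
  static int onEventTime(long timestamp, Queue<Integer> restriction, int batchSize) {
    int rounds = 0;
    if (timestamp == MAX_WATERMARK) {
      boolean restrictionExhausted;
      do {
        restrictionExhausted = processRestriction(restriction, batchSize);
        rounds++;
      } while (!restrictionExhausted);
    }
    return rounds;
  }

  public static void main(String[] args) {
    Queue<Integer> restriction = new ArrayDeque<>();
    for (int i = 0; i < 10; i++) restriction.add(i);
    int rounds = onEventTime(MAX_WATERMARK, restriction, 3);
    System.out.println(rounds + " " + restriction.size()); // prints "4 0"
  }
}
```

Note the do/while: this is also the shape of the loop discussed in the review below, and it only terminates if the restriction is in fact finite.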
|
Run Flink ValidatesRunner |
|
Thanks! I'm gonna be out for the whole week though, with no connectivity, so maybe @kennknowles or @tgroh can take a look meanwhile? |
|
I'll take a peek |
|
@jkff / @kennknowles, any chance you can take a look at this shortly? This would be a great addition to the Flink runner! |
|
Sorry, forgot about this PR while on vacation. Looking. |
|
Run Flink ValidatesRunner |
restrictionExhausted =
    processRestriction(elementState, restrictionState, holdState, elementAndRestriction);

} while (!restrictionExhausted);
What if it is never exhausted, e.g. this is an SDF reading from pubsub forever?
Then we block here forever, which is not good. However, this is the last chance we have of executing the splittable DoFn. If we get the +Inf watermark, this means that the upstream operators have shut down and that our operator will also be shut down by Flink once all incoming elements have been processed (Flink will shut down an operator if it has no more live upstream operators; it will not block shutdown on outstanding timers).
This is also the main reason why we cannot simply reuse ProcessFn as is.
Hm, this sounds like a serious issue, because SDF is specifically intended to support infinite outputs per element. Do you think it's feasible to fix Flink to support this (e.g. optionally block shutdown on outstanding timers for an operator)?
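One hedged sketch of what "optionally block shutdown on outstanding timers" might mean (no such option exists in Flink; the class and method names below are purely illustrative): the operator only reports itself as finished once upstream is done and no timers remain registered.

```java
// Hypothetical model of an operator shutdown gate that waits for timers.
public class ShutdownGate {
  private int pendingTimers;

  void registerTimer() { pendingTimers++; }   // a timer was set
  void fireTimer() { pendingTimers--; }       // a timer fired (or was deleted)

  // The operator may only shut down when upstream has finished AND no
  // outstanding timers could still produce output.
  boolean canShutDown(boolean upstreamFinished) {
    return upstreamFinished && pendingTimers == 0;
  }
}
```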
|
|
// Initialize state (element and restriction)

WindowedValue<KV<InputT, RestrictionT>> windowedValue =
Hm, I'm not super happy about copy-pasting code of ProcessFn here. Why can't we just use the processFn, like https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java does?
Yes, I'm also not happy about this. The main reason for copying is so that I can set that last-resort event-time timer that allows me to block shutting down when our upstream operators have shut down.
I'll think about this; maybe I can set the timer outside of the ProcessFn, I probably just have to make sure to set the right state namespace on the timer.
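To picture the namespace concern: a timer set from outside ProcessFn has to carry the same state namespace (typically derived from the window) that ProcessFn's own timers use, or the firing won't resolve to the matching state. A simplified, hypothetical model in plain Java (not Flink's actual timer-service API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model: each timer carries a state namespace so that, when it
// fires, processing can be routed back to the right window's state.
public class NamespacedTimers {
  static final class Timer {
    final String namespace; // identifies per-window state
    final long timestamp;
    final Runnable callback;
    Timer(String namespace, long timestamp, Runnable callback) {
      this.namespace = namespace;
      this.timestamp = timestamp;
      this.callback = callback;
    }
  }

  private final List<Timer> timers = new ArrayList<>();

  void setTimer(String namespace, long timestamp, Runnable callback) {
    timers.add(new Timer(namespace, timestamp, callback));
  }

  // Fire every timer at or before the watermark; the namespace is what lets
  // the callback find the state it was set against.
  int advanceWatermarkTo(long watermark) {
    int fired = 0;
    Iterator<Timer> it = timers.iterator();
    while (it.hasNext()) {
      Timer t = it.next();
      if (t.timestamp <= watermark) {
        t.callback.run();
        it.remove();
        fired++;
      }
    }
    return fired;
  }
}
```

A "last resort" timer in this model would be `setTimer(window, Long.MAX_VALUE, drainCallback)`: it only fires once the +Inf watermark arrives, and the namespace ensures the drain sees the same element/restriction state ProcessFn wrote.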
|
Hmm just had another thought. What if, when applying an SDF with unbounded
output per element to a bounded collection, Flink runner does some magic
and just makes sure the watermark of that collection never goes to
infinity?.. I don't know what form this could take and it seems hacky, but
it could work. Do we even care at all about watermarks of bounded
collections in a streaming pipeline? I guess normally they advance in one
shot from -inf to +inf, and in case they are used to produce an unbounded
collection, we could just make them stay at -inf.
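The watermark-holding idea above could look roughly like this as a minimal sketch (a hypothetical helper, not an existing Flink/Beam API): the runner forwards the input watermark unchanged, except that a bounded input's final +Inf watermark is held back (e.g. at -Inf, as suggested) while unbounded restrictions are still pending.

```java
// Hypothetical sketch: clamp the forwarded watermark so that downstream of an
// SDF with unbounded output never observes "end of time" while work remains.
public class WatermarkClamp {
  static final long MAX_WATERMARK = Long.MAX_VALUE; // stands in for +Inf
  static final long MIN_WATERMARK = Long.MIN_VALUE; // stands in for -Inf

  // Holds the output watermark at `hold` while restrictions remain, otherwise
  // forwards the input watermark as-is.
  static long outputWatermark(long inputWatermark, boolean restrictionsPending, long hold) {
    if (restrictionsPending && inputWatermark == MAX_WATERMARK) {
      return hold; // e.g. MIN_WATERMARK, to "stay at -inf" as suggested above
    }
    return inputWatermark;
  }
}
```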
|
|
Closing this for now since the implementation does not work. I have another one that is simpler but also doesn't work, because there is currently no way of getting around the fact that Flink will shut down the timer service before calling |
R: @jkff Not sure if we want to fix it like this or maybe adapt ProcessFn and remove StatefulDoFnRunner from the processing path.