
[BEAM-2140] Fix SplittableDoFn ValidatesRunner tests in Flink Streaming Runner #4639

Closed

Conversation


aljoscha commented Feb 8, 2018

This is a new and improved version of #4348. The gist of the changes is this:

  • We now block closing the DoFnOperator while there are processing-time timers remaining (sketched in the snippet below).
  • We don't use StatefulDoFnRunner when executing an SDF because this will start dropping timers when the input watermark of the SDF goes to +Inf. We don't want those timers to be dropped because they are what keeps SDF execution "alive".
  • We use ProcessFnRunner when executing an SDF. For this, I had to change how we wrap KeyedWorkItems in WindowedValues before sending them to the SDF operator because ProcessFnRunner likes its input to be in the GlobalWindow while normal DoFnOperator input should be in a window as assigned.
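
As a rough illustration of the first point, the blocking in close() could look something like the sketch below. This is not the actual DoFnOperator code; the timer-count helper and the waiting mechanism are assumptions:

// Hypothetical sketch; numPendingProcessingTimeTimers() is an assumed helper.
@Override
public void close() throws Exception {
  // Flink would otherwise drop pending processing-time timers on shutdown,
  // which would cut SDF execution short. Block until they have all fired.
  while (numPendingProcessingTimeTimers() > 0) {
    Thread.sleep(100); // let pending timers fire as their deadlines pass
  }
  super.close();
}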

We don't yet have SDF support for the Flink Batch Runner because that's a completely different code path. An initial version of this should not be too hard, though.


aljoscha commented Feb 8, 2018

Run Flink ValidatesRunner

aljoscha force-pushed the fix-flink-splittable-dofn-take-2 branch 2 times, most recently from 4ad6ba7 to 22ad573 on February 8, 2018 at 12:53

aljoscha commented Feb 8, 2018

Run Flink ValidatesRunner


aljoscha commented Feb 8, 2018

rerun tests please


aljoscha commented Feb 8, 2018

retest this please


aljoscha commented Feb 8, 2018

This still blocks operator shutdown in close(), but there is currently no other way of doing this with Flink.


aljoscha commented Feb 8, 2018

retest this please


jkff commented Feb 8, 2018

Thank you! Will take a look today. To clarify: will this run correctly (including fault tolerance, watermarks, late data dropping by downstream operators and everything) in the common case of an SDF emitting an infinite amount of output? I.e. are there any limitations to what a Flink operator can do in close()?

(SDF ValidatesRunner tests, naturally, have to terminate, so they test only terminating cases...)


aljoscha commented Feb 9, 2018

Yes, I believe this works correctly with watermarking. It will not work with fault tolerance, though, because of a very peculiar "feature" of Flink: if any of the operators of a topology shut down, checkpointing will not work anymore. However, in practice this is not a problem, since sources of a streaming topology are not supposed to shut down, so we will never reach close() in real-world use cases.

The issue this PR fixes only comes up in tests because we have bounded sources in a streaming pipeline, meaning we actually see a close() happening.

Also, there are two operators in Flink that already follow the same pattern.

I'm not saying that I like this, to be honest, I'm just saying that this is the way we currently have to do it.


jkff commented Feb 10, 2018

if any of the operators of a topology shut down checkpointing will not work anymore

Ouch, that is very unfortunate, and it means that Flink will not be able to fault-tolerantly run an SDF (or a stateful/timerful DoFn, for that matter) that runs forever per element (e.g. reading from Kafka). This is the intended use case of SDF: e.g. to read one Kafka topic, create a "bounded" source containing one element (the name of the topic), followed by an SDF that reads that topic. The "bounded" source emits the single element and immediately completes, but the SDF runs perpetually.

This on its own might seem silly ("why not express reading from Kafka as a root operator?"), but SDF affords the flexibility that you now can also easily read a collection of Kafka topics, bounded or not (e.g. receive over a Kafka topic a stream of names of new Kafka topics to read). Or more interestingly: you can create a "bounded" source containing a set of filepatterns, and apply to it two SDFs: one (filepattern -> new files as they appear), the other (file -> new records as the file grows).
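
For illustration, that pattern might be spelled roughly like this in Beam Java (ReadKafkaTopicFn is a hypothetical splittable DoFn, not an existing transform):

// Sketch only: ReadKafkaTopicFn is a hypothetical splittable DoFn.
p.apply(Create.of("my-topic"))             // "bounded" source with a single element
 .apply(ParDo.of(new ReadKafkaTopicFn())); // SDF that reads the topic perpetually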

It also seems problematic that close() is rarely exercised in Flink pipelines. That means this codepath is probably buggy in ways we don't know; it would be awkward if it started playing a central role in most streaming pipelines as the various unbounded sources are transitioned to SDF.

Am I understanding correctly that, if the upstream operators didn't terminate, then you wouldn't need to put the processing into close()? Maybe that's the way to fix this - have Flink in streaming mode treat bounded sources as unbounded, in the sense that once they emit their elements, instead of terminating, they just hang forever pretending that they might have more data?

@aljoscha

Yes, I know quite well how SDF IOs are supposed to work. 😃

However, I think that in properly unbounded pipelines the "discovery source" will also stay online forever. (I think of SDF IOs like this: "Discovery Source/DoFn" -> "Reader SDF", where for Kafka the discovery source would listen for new partitions on the topics and forward them as they come online, for Kinesis it would listen for new shards and for files it would listen for files)

Side note: I think the Beam Kafka IO currently does not discover new partitions when a pipeline is already running, which is a bit of a problem. Having the discovery source stay always online would solve this. Not sure about the Kinesis IO. Flink has a different problem: all parallel instances of a source query for new Kafka partitions/Kinesis shards, and they have a deterministic way of assigning them to the instances (basically modulo). This, however, will DDoS Kafka/Kinesis, which can also lead to problems. SDF IOs would solve this because there is only one part that does the periodic querying for new shards/partitions. Also, the Flink file source has worked like an SDF IO for quite a while now, but we have never generalised the pattern the way we want to with SDF IOs in Beam.

On your last point: we currently can't do this because then our tests would not terminate and also because "bounded streaming pipelines" (which sounds very strange) would not terminate.


jkff commented Feb 13, 2018

I would like to abolish the ill-defined concepts of sources and sinks, instead having everything be just transforms (and IOs be simply an informal name for transforms used for a particular purpose) - and SDF is part of that vision :) If we need to watch for new Kafka partitions - there needs to be a splittable or stateful DoFn to do that (applied to a PCollection of topics), before the partition-reading SDF. If anything is designated as a "source", it can no longer be placed anywhere except at the very root of the pipeline, and can no longer be data-dependent.

This is also in line with the Fn API where the concept of sources doesn't exist either (or, currently exists on life support but is destined to be deleted) - instead it only has a primitive "impulse" that generates a single empty value to kick off everything else.
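
For reference, an Impulse-rooted pipeline has roughly this shape (Impulse is the real Beam primitive; the downstream DoFns here are hypothetical):

// Impulse emits a single empty byte[] element to kick everything off.
p.apply(Impulse.create())
 .apply(ParDo.of(new GenerateTopicsFn()))  // hypothetical: fan out to work items
 .apply(ParDo.of(new ReadTopicFn()));      // hypothetical SDF doing the reading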

Do we have any other options? Would it be difficult to make Flink work properly if some operators at the root shut down but others continue working potentially forever?

@aljoscha

Yes, unfortunately I agree with you, @jkff.

The option I see is to never let a Flink streaming pipeline shut down. PipelineResult.waitUntilFinish() would never return, and a user would always have to manually cancel the job. Is this what Dataflow is doing? The reason we currently can't do this is that tests wouldn't finish. We would have to modify TestFlinkRunner to listen for completed PAsserts and shut down the pipeline once all of them have succeeded. I started work in this direction a while ago but couldn't continue because Flink doesn't have good support for asynchronous job submission and listening on accumulators; I was basically trying to do this by sending low-level Akka messages to the JobManager. The good news is that Flink 1.5 is reworking how a client communicates with the cluster, which should make this part easier.

What do you think?

@kennknowles

I just dropped in on the last comment and it made me sad. In the earlier days of Dataflow we did just what you describe - watch counters for PAsserts and cancel when they are all passing or when a single error message comes over the logging channel - and it was complex and flaky. As far as I know we never actually got the bugs out of it, but just rewrote it to not work that way once we started shutting down the pipeline on the +Inf watermark.

Flink might be less flaky if listening on accumulators is reliable and exact (never overcounts or undercounts), and it does seem like this could work if done right from the start. But it is nicer not to have to? The Flink operator doesn't have to shut down to signal that it is "done" in the Beam sense, maybe? It could "hang forever pretending it has more data" but with a +Inf watermark, and the PipelineResult could still have some way of returning when watermarks are +Inf globally; it could also own canceling the job.


jkff commented Feb 15, 2018

@aljoscha Is there a compelling reason for why Flink becomes unable to checkpoint once an operator shuts down? That seems like an implementation limitation, rather than a theoretical one - from the point of view of the Beam model there's nothing wrong with having a PTransform be cleaned up after processing a PCollection whose watermark has reached infinity, while downstream transforms still have something to do, potentially forever.

Maybe if we analyze why Flink currently behaves in this way, we can collectively come up with a way to fix Flink not to :)

@aljoscha

@kennknowles The way you describe the new Dataflow mode of operation is actually how the Flink Runner currently works: when sources get the +Inf watermark they shut down. This shutting down, in turn, propagates through the Flink operators, because operators shut down when all their input operators have shut down. This, however, leads to what @jkff (correctly) criticised above: we have to block shutdown of an operator in close() if there are pending processing-time timers, because Flink does not consider them data that must not be dropped. And it leads to the other problem that Flink will not do checkpointing if any of the operators are shut down.

@jkff This is not an inherent design problem, it's just that no-one has yet considered it pressing enough to fix. I consider it very pressing and am pushing for fixing this in Flink 1.6, but that won't help us here. Btw, the code that prevents checkpointing if any tasks/operators are down is here: https://github.com/apache/flink/blob/f9dd19b584fca5932594392b0afc9f1d0eec7f1a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L453. This is quite simple to fix; the complicated part is how we deal with tasks that were already shut down when we are recovering from a failure. That's not rocket science either, though.

None of those problems can be fixed in this PR, I'm afraid. But we could merge it to at least enable test coverage for SDF on the Flink Runner.


@jkff left a comment


Thanks. I think I agree that this is certainly much better than not having SDF support at all :) But probably can't be used for long-running production jobs yet, due to fault tolerance issues.

What's the shortest path to making this usable in production in some way, so that Flink runner users can use Watch and derived transforms, and, more importantly, unblock streaming Fn API jobs?

Another option I can see: at pipeline construction time, fuse Create.of() + a chain of ParDo's + a splittable ParDo into a single Flink operator that can control its own shutdown rather than being forced by the shutdown of Create.of(). Not sure if this is easier than fixing Flink itself :)

// This is our last chance to block shutdown of this operator while
// there are still remaining processing-time timers. Flink will ignore pending
// processing-time timers when upstream operators have shut down and will also
// shut down this operator with pending processing-time timers.
@jkff

Please add a comment here describing the limitations of this approach (basically, typical cases of an unbounded SDF applied to a small bounded PCollection will not be fault-tolerant) and maybe link to the underlying Flink JIRA issue if there is one filed? If there isn't, would be good to file one for Flink; and either way, file one for Beam explaining that the support is limited.

Also would be good to log something.

@aljoscha

The Flink issue about checkpointing is https://issues.apache.org/jira/browse/FLINK-2491. It is not the reason for the special timer-related code here, though, so this isn't the right place to mention it. The reason is that processing-time timers in Flink are not treated as data that must not be dropped, and this is currently not seen as an issue by most people. (I'm working on getting that changed, but it's more of an ideological than an implementation issue.)


aljoscha force-pushed the fix-flink-splittable-dofn-take-2 branch from 22ad573 to c69db46 on February 22, 2018 at 13:27
@aljoscha

Run Flink ValidatesRunner

@aljoscha

@jkff I added a commit for https://issues.apache.org/jira/browse/BEAM-3727 on top but I'm very happy to remove this and open another PR for it since this is big enough as it is. Just thought it would be good for this to go in together.

@aljoscha

Flink ValidatesRunner tests currently failing: https://issues.apache.org/jira/browse/BEAM-3728


jkff commented Feb 22, 2018

Thanks! LGTM - the comments and the filed JIRAs make sense. Next step would be to merge this, and try running a pipeline that e.g. keeps writing files to a directory using GenerateSequence + ParDo(create a file) and keeps FileIO.match().continuously()-ing them and deleting each file after say 15 minutes (can be done using a GBK on the filename as a key + a trigger), and see that the pipeline can comfortably run for a while. If it's fine, then declare victory and mark Flink as supporting SDF in the compatibility matrix :)
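
A rough sketch of that test pipeline, with the file-writing and cleanup DoFns left hypothetical (the GBK-plus-trigger cleanup jkff describes is simplified into a single DoFn here):

// Sketch under assumptions: WriteOneFileFn and DeleteOldFilesFn are hypothetical.
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
 .apply(ParDo.of(new WriteOneFileFn("/tmp/watch-test")));  // write one file per element

p.apply(FileIO.match()
        .filepattern("/tmp/watch-test/*")
        .continuously(Duration.standardSeconds(30), Watch.Growth.never()))
 .apply(ParDo.of(new DeleteOldFilesFn(Duration.standardMinutes(15))));  // simplified cleanup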

SplittableDoFnOperator is only used for executing ProcessFn, which does not use event-time timers. However, StatefulDoFnRunner does use event-time timers for state cleanup, so this change makes sure that they don't end up being forwarded to the ProcessFn.

It can happen that the input operation finishes while we still have pending processing-time timers, for example from processing a Splittable DoFn. This change makes sure that we block as long as we have pending timers.

This change also makes sure that we forward a +Inf watermark in close(). We have to do this because it can happen that we get a +Inf watermark on input while we still have active watermark holds (which will get resolved when all pending timers are gone). With this change we make sure to send a +Inf watermark downstream once everything is resolved.

We need this to be able to instantiate with the constructor in the Flink Runner.

For this to work, we need to also change how we wrap values in KeyedWorkItems, because ProcessFnRunner expects them to be in the GlobalWindow.

This adds an option that controls whether or not to shut down sources when the +Inf watermark is reached. The reason for this is https://issues.apache.org/jira/browse/FLINK-2491, which causes checkpointing to stop once some source is shut down.
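
For reference, the option described in the last commit might be used roughly like this (the exact option name on FlinkPipelineOptions is an assumption on my part, not confirmed by this thread):

// Hypothetical usage; the setter name is an assumption.
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
// Keep sources alive after the +Inf watermark so that checkpointing keeps
// working (see FLINK-2491).
options.setShutdownSourcesOnFinalWatermark(false);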
aljoscha force-pushed the fix-flink-splittable-dofn-take-2 branch from c69db46 to 8fa9367 on February 23, 2018 at 07:17
@aljoscha

Run Flink ValidatesRunner

@aljoscha

@jkff You mean create the files and delete them inside the same pipeline? I'll do that.

asfgit pushed a commit that referenced this pull request Feb 23, 2018
@aljoscha

Merged

aljoscha closed this Feb 23, 2018
aljoscha deleted the fix-flink-splittable-dofn-take-2 branch on February 23, 2018 at 11:01