[FLINK-3257] Add Exactly-Once Processing Guarantees for Iterative DataStream Jobs #1668
Conversation
Thanks Paris, I like the idea; it's a correct modification of the original algorithm that makes it much easier to implement on Flink, for the price of buffering more records. I have some comments on the implementation:
I agree that we definitely need to spill the buffered records to disk or use managed memory for this. This can be similar to the BarrierBuffer logic. We can combine this with checkpointing to an output stream so that we never have to materialize the full state.
Thanks for going through it Gyula! I agree with the spill buffer logic. I am only confused a bit by the output stream part (the other half of the problem): is there something already I can use? I haven't checked recent changes. How does this work if we use the in-memory backend, for example? The blob with all the messages will anyway be packaged and sent within the stateHandle to the job manager in a single message (potentially exceeding the limits), even if we use a stream API, or?
Great idea @senorcarbone, I also really like it :-) I agree with @gyfora on where to include the logic; for the spillable state we can only use what is currently available.
af8c095 to d3aa206
Thanks @tillrohrmann for the feedback! One more thing I missed pointing out is the open question of what should happen when the iteration timeout occurs. Regarding the state snapshotting, should I switch to using the ListState abstraction? We anyway have to revisit the iteration implementation due to other pending problems such as backpressure deadlocks (see FLINK-2470). I guess this PR is just a first step towards improving them further.
I can't really add anything to the timeout question :D As for the snapshotting, I would go with the ListState as that potentially provides a very efficient implementation for this logging phase (it is actually designed to handle this). Then we don't have to worry about spilling there either.
You can find an alternative version using the ListState in the latest update. I am (ab)using the PartitionedState to store the ListState under the same key, as @gyfora suggested, since it is the only way to obtain the nice representations at the moment. It would be nice to have them available for operator state snapshots as well - @aljoscha have you thought about it? When there is free time (after the release) it would be nice to see what @aljoscha and @StephanEwen think of the two takes as well. No hurries, just take a look when you have time! The two annoying issues I noticed during testing, which we need to check soon (probably after this PR), are the following:
 * Checkpoints all operator states of the current StreamTask.
 * Thread-safety must be handled outside the scope of this function
 */
protected boolean checkpointStatesInternal(final long checkpointId, long timestamp) throws Exception {
What about naming this as in the comments, drawStateSnapshot? That it is internal is more or less communicated by the fact that it is a protected method.
Regarding the JavaDocs:
- The idiomatic style is to have a short description and then a blank line for more details (the first line will be displayed as a summary in the IDE etc.); info about the returned boolean and the params is missing.
- The "of the current StreamTask" part is clear from context.
- The thread-safety part should be more explicit, for instance: "The caller has to make sure to call this method in scope of the task's checkpoint lock".
When rebasing we have to double check that nothing changed in this method when calling from triggerCheckpoint etc.
The core idea of this is very good, and the illustration is very nice. After an offline chat with @senorcarbone, we concluded that a remaining problem is currently the way it integrates with the timeout-based termination detection. Which brings us to the point that we should (in my opinion) change the way that loops terminate: it should probably be based on end-of-stream events, to make it deterministic and not susceptible to delays. The question now is, does it make sense to do the termination change first and base this on top of it, or to merge this irrespective of that...
Thanks @StephanEwen and @uce for looking into it! I really appreciate it. How about the following:
Sounds like a reasonable approach Paris! Just ping here after you addressed 1. and 2.
Ok, good to know @uce! Let me get back to it in a couple of weeks and make it right; right now it is a bit impossible to find time.
Hey! Good to be back :) Let's fix this properly, as @StephanEwen recommended, now that there is some time. The FT update for loops will be rebased on top of the loop termination fix.
0509478 to d00d15d
Ok, so I am progressing this a bit independently from the termination work, and then we rebase onto whichever PR is merged first. I just changed everything and rebased to the current master. Some notable changes:
Whenever you find time, go ahead and check it out. It passes my super-strict test, which is a good thing. :)
d00d15d to 05157f8
Hey again, @StephanEwen @uce. The changes are just a few and straightforward, so I really encourage you to skim them at your earliest convenience. Thanks!
I think the PR looks pretty good, and it sounds fair to address termination in a later PR, as this will still greatly improve the current guarantees without making the backpressure/termination problems any worse. +1 from me
Exactly, these two issues do not depend on each other. No doubt loop FT is the first thing that can enable iterations in a production deployment, so I would merge that first. Thank you again @gyfora for looking into it :)
Thanks for the reminder, I went over the code today. The code looks mostly good, but here are some thoughts:
Since loops did not support any fault tolerance guarantees, I guess this does improve recovery behavior. But as long as the loops can either deadlock or drop data, the hard guarantees are in the end still a bit weak. So that leaves me a bit ambivalent about what to do with this pull request.
To suggest some way to fix the guarantees: To my mind, the crux lies in the way that the feedback channel is implemented - a simple blocking queue just does not cut it for that case. To make this proper, I think we need to do the following:
These are some good points @StephanEwen, thanks for checking it.
What do you think?
@senorcarbone I agree, let's fix the multiple checkpoints issue and do the rest in FLIP-15. The other operators have a pretty simple way of doing this:
Dealing with in-flight data probably means that you need to open a ListState for each checkpoint that arrives and add the feedback values to each state, until that particular checkpoint's barrier comes back through the feedback channel. I think that should be sufficient.
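A minimal sketch of that idea, under assumptions of my own (the class and method names are hypothetical, records are simplified to Strings, and this is not the PR's actual code): one ListState per pending checkpoint, appended to until that checkpoint's barrier returns through the feedback channel.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;

import java.util.HashMap;
import java.util.Map;

class PendingCheckpointLogs {

    private final OperatorStateStore stateStore;
    private final Map<Long, ListState<String>> openLogs = new HashMap<>();

    PendingCheckpointLogs(OperatorStateStore stateStore) {
        this.stateStore = stateStore;
    }

    // called when the barrier for checkpointId arrives from the runtime
    void openLog(long checkpointId) throws Exception {
        ListState<String> log = stateStore.getListState(
                new ListStateDescriptor<>("feedback-log-" + checkpointId, String.class));
        openLogs.put(checkpointId, log);
    }

    // called for every record coming back through the feedback channel
    void logFeedbackRecord(String record) throws Exception {
        for (ListState<String> log : openLogs.values()) {
            log.add(record); // the record is in-flight for every still-pending checkpoint
        }
    }

    // called when the barrier for checkpointId returns through the feedback channel
    void closeLog(long checkpointId) {
        openLogs.remove(checkpointId); // its log is complete; nothing more to append
    }
}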
agreed @StephanEwen! I will do that.
EDIT 1: @StephanEwen Please correct me if I am wrong regarding the second point. I am just not very familiar with async snapshotting for the RocksDB backend yet.
EDIT 2: @StephanEwen could you check my question above when you can?
126b526 to 43ba4f6
The last update implements a variant of what @StephanEwen proposes to support concurrency under asynchronous snapshots (i.e. in RocksDB). We have put some more thought into this offline too, thanks to @gyfora! The idea is that, instead of appending records to the state of every pending checkpoint, the upstream log is kept in slices, one per checkpoint, so each record is only written once.
Before I polish this we need to close a memory leak: log slices that are registered in the operator state backend can currently not be unregistered again.
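To make the slice idea concrete, here is a rough sketch under my own assumptions (hypothetical class and state names, records simplified to Strings; not the PR's code): each feedback record goes into the currently open slice only, and slices that are no longer needed are cleared.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;

import java.util.TreeMap;

class SlicedUpstreamLog {

    private final OperatorStateStore stateStore;
    // slice id (= the checkpoint that opened it) -> records logged while that slice was current
    private final TreeMap<Long, ListState<String>> slices = new TreeMap<>();

    SlicedUpstreamLog(OperatorStateStore stateStore) {
        this.stateStore = stateStore;
    }

    // a new checkpoint barrier opens a new slice; records are only ever appended to the newest one
    void startSlice(long checkpointId) throws Exception {
        slices.put(checkpointId, stateStore.getListState(
                new ListStateDescriptor<>("slice-" + checkpointId, String.class)));
    }

    // each feedback record is written exactly once, to the current slice
    void append(String record) throws Exception {
        if (!slices.isEmpty()) {
            slices.lastEntry().getValue().add(record);
        }
    }

    // once no pending checkpoint needs a slice anymore it can be dropped; with the state
    // backends of the time it could only be cleared, not unregistered (the leak mentioned above)
    void dropSlice(long sliceId) {
        ListState<String> slice = slices.remove(sliceId);
        if (slice != null) {
            slice.clear();
        }
    }
}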
Very interested in this work. It sounds like there are a few loose ends and then some cleanup before it might be ready for merge; @senorcarbone or @StephanEwen, is there anything that can be supported by someone else? Would love to help wherever possible.
Hey @addisonj.
I have just rebased to the current master and tests pass locally. Unregistering state in the OperatorStateStore is a very tiny fix.
Hey. Any update/opinion/something anyone?
Thanks Paris, I added some minor comments inline.
I like the current approach with the slices for the different checkpoints; too bad we can't completely delete states registered in the backends.
The only problem I see with the current implementation is that it will hold the elements in the list states in memory before the checkpoint, but this should not be a serious limitation.
@Override
public void open() throws Exception {
    super.open();
why override if nothing changes?
@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<IN>> output) {
    super.setup(containingTask, config, output);
same
}

private void progressLog() {
    while (!currentIterator.hasNext() && ++indx < wrappedIterators.size()) {
You do a lot of checking against !currentIterator.hasNext(); you could probably have it only in progressLog and call that without the if in the other places (maybe rename it to progressIfNecessary).
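A possible rendering of that suggestion, as a self-contained sketch (the wrapper class and the element type are illustrative, not the PR's actual code):

import java.util.Iterator;
import java.util.List;

class ConcatenatingIterator<T> implements Iterator<T> {

    private final List<Iterator<T>> wrappedIterators;
    private Iterator<T> currentIterator;
    private int indx = 0;

    ConcatenatingIterator(List<Iterator<T>> wrappedIterators) {
        this.wrappedIterators = wrappedIterators;
        this.currentIterator = wrappedIterators.get(0);
    }

    // the hasNext() check lives in one place; every caller just invokes this first
    private void progressIfNecessary() {
        while (!currentIterator.hasNext() && ++indx < wrappedIterators.size()) {
            currentIterator = wrappedIterators.get(indx);
        }
    }

    @Override
    public boolean hasNext() {
        progressIfNecessary();
        return currentIterator.hasNext();
    }

    @Override
    public T next() {
        progressIfNecessary();
        return currentIterator.next();
    }
}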
}

public void clearLog() throws Exception {
    for (String outputLogs : getOperatorStateBackend().getRegisteredStateNames()) {
It's kind of bad that we can't remove the states completely, and that we keep iterating over them when replaying the log...
this.lock = getCheckpointLock();
getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration()));
operatorChain = new OperatorChain<>(this);
this.upstreamLogger = (UpstreamLogger<IN>) operatorChain.getHeadOperator();
if this is the same UpstreamLogger instance that you pass 2 lines above then why not use that? :)
it's another instance; that is why I was fetching it back there.
The OperatorChain basically deserialises and sets up another instance through the configuration.
        output.emitWatermark(new Watermark(Long.MAX_VALUE));
    }
}

synchronized (lock) {
    //emit in-flight events in the upstream log upon recovery
    for (StreamRecord<IN> rec : upstreamLogger.getReplayLog()) {
Maybe it would make sense to put this in a while loop and check "running" as well, to cancel early if the job is cancelled during replay
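A sketch of what that could look like (the identifiers follow the hunk above; the running flag and the loop body are assumptions on my part, not the PR's actual code):

synchronized (lock) {
    // emit in-flight events in the upstream log upon recovery, but stop early on cancellation
    Iterator<StreamRecord<IN>> replay = upstreamLogger.getReplayLog().iterator();
    while (running && replay.hasNext()) {
        // forward the logged record (the original loop body is not shown in the hunk)
        output.collect(replay.next());
    }
}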
I think the algorithm is good. For the implementation, I have some suggestions:
List State Checkpoints
Storing the feedback channel data in a ListState means that each element in the list is tracked by metadata in the checkpoint (the list elements are the granularity of redistribution). Since we probably do not need fine-grained redistribution, each operator can simply create one entry in the list (which has all the feedback elements). That would make checkpoint metadata much smaller and more robust.
Ideally, we actually do not checkpoint to an actual list at all, but simply write to raw operator state directly (then we are also not memory/rpc-payload-size constrained). The operator state should have the "list redistribution pattern".
@StefanRRichter Can you share a few pointers on how to write to raw operator state from a task / operator?
Upstream Logger
Can we solve the access to the ListState or the OperatorState in general slightly differently than by creating a mock operator (UpstreamLogger)? It works, but I feel it is a workaround due to the fact that an operator is missing.
We could simply give the IterationHead always an operator (that forwards input and feedback elements). That operator then checkpoints the feedback data. As a nice extra, this would eliminate the case that a task can have null as the head operator (a case that is always a bit nasty to work around).
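A small sketch of the coarse-entry variant, under assumptions of my own (hypothetical names, records simplified to Strings; not the PR's code): the whole buffered log is stored as a single list element, so the checkpoint metadata tracks one entry per operator instance rather than one per record.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.ArrayList;
import java.util.List;

class CoarseGrainedLog {

    private final ListState<ArrayList<String>> logState;
    private final ArrayList<String> buffer = new ArrayList<>();

    CoarseGrainedLog(OperatorStateStore stateStore) throws Exception {
        this.logState = stateStore.getListState(new ListStateDescriptor<>(
                "upstream-log", TypeInformation.of(new TypeHint<ArrayList<String>>() {})));
    }

    void add(String record) {
        buffer.add(record);
    }

    void snapshot() throws Exception {
        logState.clear();
        logState.add(new ArrayList<>(buffer)); // a single list element holds all feedback records
    }

    List<String> restore() throws Exception {
        ArrayList<String> restored = new ArrayList<>();
        Iterable<ArrayList<String>> chunks = logState.get();
        if (chunks != null) {
            for (ArrayList<String> chunk : chunks) {
                restored.addAll(chunk);
            }
        }
        return restored;
    }
}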
synchronized (getCheckpointLock()) {
    if (isRunning()) {
        dataChannel.put(new Either.Right(new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions)));
        getEnvironment().acknowledgeCheckpoint(checkpointMetaData.getCheckpointId(), checkpointMetrics);
Can the IterationTailTask contain operators as well, or is it always a task without operators? If it has operators, we cannot immediately acknowledge here, but need to delegate to the superclass checkpoint method instead.
Hm, not really. I cannot think of a possible usage of loaded operators in IterationSink tasks. Perhaps structured window iterations will involve these special operators in the future (if we decide to keep them anyway), but I can only see the loop tracking logic being implemented in the Heads.
Thanks for the review @gyfora and @StephanEwen, these are very good points. @StephanEwen, it makes sense not to index/keep metadata for individual records in the log slices; it is extra overhead. Writing raw operator state also makes sense to balance the serialisation overhead, so I will do that once @StefanRRichter gives me some pointers; that would be great. Any redistribution of the checkpoint slices would violate causality, so I hope the "list redistribution pattern" actually keeps the set of registered operator states per instance intact. The garbage collection issue still remains, but maybe (if @StefanRRichter approves) I can add a way to unregister states from the OperatorStateStore. I can also add preconfigured operators (not that they will be reused anywhere). It is cleaner, but I really need to see how I can get full control there.
For raw operator state, override the snapshot hook and write to the raw operator state output there. For restoring, override the corresponding state initialization hook and read the raw state partitions back.
sweet! thanks @StefanRRichter
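For illustration, a minimal sketch of how that pointer could be followed, assuming the StateSnapshotContext / StateInitializationContext hooks and raw operator state streams available around that time; the class and the String-based serialization are my own simplifications, not the PR's code, and the element-processing side is left out.

import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

import java.util.ArrayDeque;
import java.util.Deque;

abstract class RawLogOperator extends AbstractStreamOperator<String> {

    // the upstream log, filled by the operator's processing logic (not shown)
    protected final Deque<String> upstreamLog = new ArrayDeque<>();

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // one raw partition holds the whole log, so there is no per-record checkpoint metadata
        OperatorStateCheckpointOutputStream out = context.getRawOperatorStateOutput();
        out.startNewPartition();
        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
        view.writeInt(upstreamLog.size());
        for (String record : upstreamLog) {
            view.writeUTF(record);
        }
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        // on restore, read every raw partition back into the in-memory log
        for (StatePartitionStreamProvider provider : context.getRawOperatorStateInputs()) {
            DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(provider.getStream());
            int count = view.readInt();
            for (int i = 0; i < count; i++) {
                upstreamLog.add(view.readUTF());
            }
        }
    }
}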
Hi. Do we have any updates on this? :)
Well... major runtime changes are coming with FLIP-6, 15 and 16, so I would suggest you watch those. Loop FT will be included in one of these along with other loop redesign features.
Hi, are there any updates on this feature?
Hi, are there any updates on this feature please? Loop fault tolerance enables lots of very cool use cases and it would be great to see this incorporated.
I'm closing this as "Abandoned", since there is no more activity and the code base has moved on quite a bit. I think the feature is still very much needed; the Jira issue remains open, and there is also a new FLIP about it: https://cwiki.apache.org/confluence/display/FLINK/FLIP-16%3A+Loop+Fault+Tolerance
Motivation and Algorithm
This is a first version of the adapted snapshot algorithm to support iterations. It is correct and works in practice... well, as long as memory capacity is enough for its logging requirements, but I am working on that, hopefully with a little feedback from you. Before we go into the implementation details, let me briefly describe the new algorithm.
Algorithm
Our existing checkpoint algorithm has a very fluid and straightforward protocol. It just makes sure that all checkpoint barriers are aligned in each operator so that all records before the barriers (the pre-shot) are processed before taking a snapshot. Since waiting indefinitely for all records in transit within a cycle of an execution graph can violate termination (a crucial liveness property), we have to... save any unprocessed records for later and get done with the snapshot. In this take of the algorithm on Flink we assign that role to the Iteration Head. The steps in which this version differs from the vanilla algorithm are simply the following:
(1) An Iteration Head receives a barrier from the system runtime (as before) and switches to logging mode: every record it then receives from its Iteration Sink is buffered in its own operator state (the log) and not forwarded further until it goes back to normal mode.
(2) Eventually, the Iteration Head receives a barrier back from its Iteration Sink. At that point it takes its snapshot (the log included) and switches back to normal mode.
(3) When the Iteration Head starts/restarts, it looks at its initial operator state (the log) and flushes any records that are possibly pending.
Example
This is just a very simple example topology. We have a single source S and a mapper M within an iteration with a head H and tail T. This is the bare minimum we need for now to check how this works.
In the diagram below you can see the sequence of possible events, containing both barrier and record transmissions. For completeness I included the Runtime as a separate entity; in our case this is the Checkpoint Coordinator, which periodically initialises checkpointing in all tasks without inputs.
The point that this diagram tries to make is the following:
Record R1 (or any record received in normal mode) gets forwarded to M, the only consumer of H, before the checkpoint barrier. On the other hand, R2 is not forwarded to M until H has finished snapshotting (or within the same atomic block anyway). In case of a failure, R2 is not lost but rather saved for later.
A very brief description of a consistent/correct snapshot in our context could be summed up in the following sentence: every record in transit at the moment of the snapshot is reflected exactly once, either in the state of an operator that already processed it or in the upstream log of the Iteration Head.
This algorithm guarantees this property and, as explained earlier, it also terminates.
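To tie the steps and the example together, here is a compact sketch of the head's two modes. It is one possible reading of the steps above (hypothetical names, with the snapshot and forwarding plumbing reduced to stubs), not the code of this PR.

import java.util.ArrayDeque;
import java.util.Deque;

class IterationHeadSketch<T> {

    private final Deque<T> log = new ArrayDeque<>(); // the upstream log (operator state)
    private boolean loggingMode = false;

    // (1) barrier from the runtime: forward it downstream and switch to logging mode
    void onRuntimeBarrier(long checkpointId) {
        forwardBarrier(checkpointId);
        loggingMode = true;
    }

    // records coming back from the Iteration Sink
    void onFeedbackRecord(T record) {
        if (loggingMode) {
            log.add(record);          // buffered in operator state, not forwarded yet (like R2)
        } else {
            forward(record);          // normal mode: forward immediately (like R1)
        }
    }

    // (2) the same barrier returns from the Iteration Sink: snapshot (log included),
    // then go back to normal mode and flush what was held back
    void onFeedbackBarrier(long checkpointId) {
        snapshotState(checkpointId);
        loggingMode = false;
        while (!log.isEmpty()) {
            forward(log.poll());
        }
    }

    // (3) on (re)start: whatever is in the restored log is flushed first
    void onRestore(Iterable<T> restoredLog) {
        for (T record : restoredLog) {
            forward(record);
        }
    }

    // plumbing assumed to exist in the real task
    void forward(T record) { }
    void forwardBarrier(long checkpointId) { }
    void snapshotState(long checkpointId) { }
}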
Current Implementation Details
In the current prototype I tried to keep changes to a minimum until we agree on the pending issues (see below), so there is plenty of room for improvement, engineering-wise. The important changes are the following:
- Changes in the StreamGraph construction.
- The triggerCheckpoint function of StreamTask was split (please do not freak out). That was necessary in order to extract the checkpointing logic from the barrier handling logic (which should obviously be decoupled in the updated algorithm). The actual state snapshotting now happens inside checkpointStatesInternal.
- StreamIterationTail has to process and forward Either<StreamRecord<IN>, CheckpointBarrier> objects to StreamIterationHead, which takes the necessary steps depending on the event, according to the algorithm. An internal task abstraction ForwardingOneInputStreamTask was introduced for forwarding barriers to the tail.
- StreamIterationHead uses a simple UpstreamLogger operator internally for its logging needs, nothing special about it.
- A StreamIterationCheckpointingITCase was added that checks exactly-once guarantees, loops included. More tests will follow upon demand.
I am sure we can improve this implementation but let's focus on the non-trivial problems first.
Open/Pending Issues
During stress-testing I found the following issues that we need to discuss hoping that some of you are maybe eager to propose something.
- Records can be dropped in transit from the StreamIterationTail to the Head. Apparently the tail offers records to the head with a specified timeout on a blocking queue of size 1 (!?). What is going on there? It makes sense for the Head to time out on reading, but why does the StreamIterationTail throw away events like there is no tomorrow? Since at-most-once processing guarantees are not what we want, I just commented out the offer part until I understand why it is there. I think it is safe (deadlock-wise) to simply put records to the queue, right? I can also fire a new JIRA issue reporting this if needed. (See the sketch after this list for the offer-vs-put difference.)
- How to spill the upstream log (reuse the BufferSpiller?), or/and restrict iterative jobs to an out-of-core backend and forbid the in-memory one. The upstream log size can very easily pass the boundaries of the allowed in-memory state (e.g. when millions of records in transit are snapshotted). Any ideas on that are welcome!
Thanks a lot for looking into this. Really looking forward to your suggestions.
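To illustrate the first point, a toy contrast between the two hand-off styles (plain java.util.concurrent types, not the actual task code): an offer with a timeout silently drops the record when the consumer does not poll in time, whereas a plain put blocks until there is room, so nothing is lost.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class FeedbackQueueExample {

    private final BlockingQueue<String> dataChannel = new ArrayBlockingQueue<>(1);

    void lossyHandOff(String record, long timeoutMillis) throws InterruptedException {
        if (!dataChannel.offer(record, timeoutMillis, TimeUnit.MILLISECONDS)) {
            // the record is simply dropped here: at-most-once behaviour
        }
    }

    void losslessHandOff(String record) throws InterruptedException {
        dataChannel.put(record); // blocks until the consumer takes the previous element
    }
}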