[BEAM-10940] Add sdf initiated checkpoint support to portable Flink. #13105
Conversation
Run Java Flink PortableValidatesRunner Streaming
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Python_PVR_Flink PreCommit
SDF + side input tests fail on validateParDo: Lines 231 to 237 in ddcb600
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Batch
It seems like we only have main input for SplittableParDo transform:
@lukecwik Have we added the logic to populate the side input into the transform proto?
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Batch
I can verify that the pipeline proto is correct before fusion. The validation error happens when fusion returns the new pipeline proto. It seems like something is wrong in the pipeline fuser.
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Portable_Python PreCommit
Run Java Flink PortableValidatesRunner Streaming
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Java PreCommit
Run Python_PVR_Flink PreCommit
Run Java Flink PortableValidatesRunner Streaming
    }
  },
  testFilter: {
    // TODO(BEAM-10016)
    excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
    excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testPairWithIndexWindowedTimestampedBounded'
Maybe add a comment why this one is excluded?
I haven't explored the failure yet, but the test fails for the same reason (wrong results) in both batch and streaming. I filed a JIRA to track this: https://issues.apache.org/jira/browse/BEAM-11141
I figured out the test failure and enabled the test in the latest commit.
@@ -1315,7 +1316,7 @@ void processPendingProcessingTimeTimers() {
   keyedStateBackend.setCurrentKey(internalTimer.getKey());
   TimerData timer = internalTimer.getNamespace();
   checkInvokeStartBundle();
-  fireTimer(timer);
+  fireTimerInternal((ByteBuffer) internalTimer.getKey(), timer);
I'm assuming this was a bug?
I think so. The only difference is that fireTimerInternal will grab the lock and set the key for the state backend, which is helpful when state is needed. Another potential bug around here is that the Flink operator will clean up global state first and then fire the timer, which may lead to data loss if the firing timer needs to retrieve that state.
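To illustrate why this matters, here is a minimal sketch in plain Java. The `KeyedBackend` class is a hypothetical stand-in for Flink's keyed state backend (not the actual Flink API): state is addressed by whatever key is "current" at access time, so firing a keyed timer without first setting the timer's key silently reads another key's state.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedTimerDemo {
  // Hypothetical stand-in for a keyed state backend: reads and writes go to
  // whichever key is "current" at the time of the access.
  static class KeyedBackend {
    private final Map<String, Integer> state = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) { currentKey = key; }
    void put(int value) { state.put(currentKey, value); }
    Integer get() { return state.get(currentKey); }
  }

  // Mirrors the fireTimerInternal fix: set the timer's key first, then fire.
  static Integer fireTimerInternal(KeyedBackend backend, String timerKey) {
    backend.setCurrentKey(timerKey); // without this, we read the stale key's state
    return backend.get();
  }

  public static void main(String[] args) {
    KeyedBackend backend = new KeyedBackend();
    backend.setCurrentKey("a");
    backend.put(1);
    backend.setCurrentKey("b");
    backend.put(2);
    // A timer registered for key "a" fires while the current key is still "b".
    Integer withoutKeySet = backend.get();                 // wrong: reads key "b"
    Integer withKeySet = fireTimerInternal(backend, "a");  // right: reads key "a"
    System.out.println(withoutKeySet + " " + withKeySet);  // → 2 1
  }
}
```

The fix in the diff above follows the same pattern: route the firing through a method that pins the timer's own key before touching state.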
/** Holds the watermark when there is an sdf timer. */
private void onNewSdfTimer(TimerData newTimer) {
  Preconditions.checkState(
      StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
  Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
  keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
}
Wouldn't it make sense to integrate this check with the timerUsesOutputTimestamp method?
We could rename the mentioned method to onFiredTimer and include the checks for output watermark holds in there.
Sounds good.
Sorry, I want to revisit the idea of having onFiredTimer here. I think it's a good idea to have onFiredTimer for firing timers. But the functions onNewSdfTimer and onNewEventTimer are about setting the watermark hold when registering timers. Unlike an event timer, an SDF timer must have an output timestamp for controlling the watermark hold; this is important for SDF execution. That's why we have a check instead of an if block here.
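The hold/release pairing discussed here can be sketched with a minimal watermark-hold counter in plain Java. The method names mirror addWatermarkHoldUsage / removeWatermarkHoldUsage / minWatermarkHoldMs, but this is a hypothetical stand-in, not the real FlinkStateInternals: each registered SDF timer's output timestamp adds a hold, each removal releases one, and the output watermark may only advance up to the smallest outstanding hold.

```java
import java.util.TreeMap;

public class WatermarkHolds {
  // Multiset of held output timestamps (millis): key = timestamp, value = count.
  private final TreeMap<Long, Integer> holds = new TreeMap<>();

  // Called when a timer carrying an output timestamp is registered.
  public void addWatermarkHoldUsage(long outputTimestampMs) {
    holds.merge(outputTimestampMs, 1, Integer::sum);
  }

  // Called when that timer is removed or has fired.
  public void removeWatermarkHoldUsage(long outputTimestampMs) {
    holds.computeIfPresent(outputTimestampMs, (ts, n) -> n == 1 ? null : n - 1);
  }

  // The output watermark may not advance past the smallest outstanding hold.
  public long minWatermarkHoldMs() {
    return holds.isEmpty() ? Long.MAX_VALUE : holds.firstKey();
  }

  public static void main(String[] args) {
    WatermarkHolds h = new WatermarkHolds();
    h.addWatermarkHoldUsage(5_000L); // SDF timer scheduled; residual resumes at 5s
    h.addWatermarkHoldUsage(9_000L);
    System.out.println(h.minWatermarkHoldMs()); // → 5000: watermark held at 5s
    h.removeWatermarkHoldUsage(5_000L);         // first timer fired
    System.out.println(h.minWatermarkHoldMs()); // → 9000
  }
}
```

This also shows why the check (rather than an if) is defensible: a registered SDF timer without an output timestamp would never add a hold and the watermark could overtake its residual.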
private void onRemovedSdfTimer(TimerData removedTimer) {
  Preconditions.checkState(
      StateAndTimerBundleCheckpointHandler.isSdfTimer(removedTimer.getTimerId()));
  Preconditions.checkState(timerUsesOutputTimestamp(removedTimer));
  // Remove the watermark hold which is set for this sdf timer.
  keyedStateInternals.removeWatermarkHoldUsage(removedTimer.getOutputTimestamp());
}
Wouldn't it make sense to integrate this check with the timerUsesOutputTimestamp method?
We could rename the mentioned method to onFiredTimer and include the checks for output watermark holds in there.
if (StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) {
  onNewSdfTimer(timer);
}
Could be simplified by having a generic call here to onFiredTimer. See above.
if (timer.getDomain() == TimeDomain.EVENT_TIME) {
  onRemovedEventTimer(timer);
} else if (StateAndTimerBundleCheckpointHandler.isSdfTimer(timer.getTimerId())) {
  onRemovedSdfTimer(timer);
Could be simplified by having a generic call here to onFiredTimer. See above.
}
/** A {@link StateInternals} for keeping {@link DelayedBundleApplication}s as states. */
class SdfFlinkStateInternals implements StateInternals {
Maybe we should move these out of this class.
I want to keep SdfFlinkStateInternals and SdfFlinkTimerInternals as inner classes so that they can access getKeyedStateBackend(), timerInternals, and stateInternals from the outer class.
 * StateAndTimerBundleCheckpointHandler} to handle {@link
 * org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication}.
 */
class SdfFlinkTimerInternalsFactory implements TimerInternalsFactory<InputT> {
Maybe we should move these out of this class.
// For an SDF, we know that the input element should be
// KV<KV<element, KV<restriction, watermarkState>>, size>. We are going to use the element
// as the key.
if (!(((KvCoder) valueCoder).getKeyCoder() instanceof KvCoder)) {
  throw new IllegalStateException(
      String.format(
          Locale.ENGLISH,
          "The element coder for splittable DoFn '%s' must be KVCoder(KvCoder, DoubleCoder) but is: %s",
          inputPCollectionId,
          valueCoder.getClass().getSimpleName()));
}
keyCoder = ((KvCoder) ((KvCoder) valueCoder).getKeyCoder()).getKeyCoder();
keySelector = new SdfByteBufferKeySelector(keyCoder);
}
Will SDFs ever support stateful operations? If so, this wouldn't work anymore because keys are not guaranteed to be processed on the same operator instance.
At least for now we don't support SDFs using user state and timers. cc: @robertwb
Could you explain more about "because keys are not guaranteed to be processed on the same operator instance"? Is it because we check stateful DoFn first?
It's because we partition based on the element instead of on the key. This wouldn't work with stateful operations where we expect all keys to land on the same partition.
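A small illustration of this concern in plain Java (the element strings, hash-based partitioner, and parallelism are all arbitrary stand-ins): when records are partitioned by the whole input element rather than by the user key, two records that share a user key but carry different restrictions may hash to different operator instances, which would break keyed user state.

```java
public class PartitionDemo {
  // A generic hash partitioner: records with equal partition keys always
  // land on the same operator instance.
  static int partitionFor(Object partitionKey, int parallelism) {
    return Math.floorMod(partitionKey.hashCode(), parallelism);
  }

  public static void main(String[] args) {
    int parallelism = 4;
    // Two hypothetical SDF input elements sharing user key "k1" but with
    // different restrictions (e.g. different byte ranges of the same file).
    String elementA = "k1|restriction[0,100)";
    String elementB = "k1|restriction[100,200)";

    // SDF-style partitioning: by the whole element -> may differ per restriction.
    int byElementA = partitionFor(elementA, parallelism);
    int byElementB = partitionFor(elementB, parallelism);

    // Stateful-DoFn-style partitioning: by the user key -> always identical.
    int byKeyA = partitionFor("k1", parallelism);
    int byKeyB = partitionFor("k1", parallelism);

    System.out.println("by element: " + byElementA + " vs " + byElementB);
    System.out.println("by key:     " + byKeyA + " vs " + byKeyB);
  }
}
```

Only key-based partitioning guarantees that all state for "k1" is co-located, which is why element-based keying is fine here exactly as long as SDFs cannot use user state and timers.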
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Java Flink PortableValidatesRunner Streaming
    outputTimestamp = Math.min(outputTimestamp, outputWatermark.getSeconds() * 1000);
  }
} else {
  outputTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
Is it a good idea to rely on TimerInternals.getCurrentInputWatermark() to not hold back the watermark so much? @mxm
You could use keyedStateInternals.minWatermarkHoldMs().
Thanks, Max! I just double-checked the contract we have:
beam/model/fn-execution/src/main/proto/beam_fn_api.proto
Lines 236 to 243 in 7eb0e95
// The map is keyed by the local output name of the PTransform. Each
// value represents a lower bound on the timestamps of elements that
// are produced by this PTransform into each of its output PCollections
// when invoked with this application.
//
// If there is no watermark reported from RestrictionTracker, the runner will
// use MIN_TIMESTAMP by default.
map<string, google.protobuf.Timestamp> output_watermarks = 4;
Let's stick with MIN_TIMESTAMP for now.
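The contract quoted above can be sketched as a small helper in plain Java. Everything here is hypothetical illustration: `resolveOutputTimestamp` is not a Beam API, the sentinel stands in for `BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()`, and nanos from the proto `Timestamp` are ignored for brevity. It takes the minimum over the reported per-output watermarks (seconds converted to millis, as in the snippet above) and falls back to MIN_TIMESTAMP when the RestrictionTracker reported none.

```java
import java.util.Map;

public class OutputTimestampDemo {
  // Stand-in sentinel for BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
  // not necessarily Beam's exact constant.
  static final long TIMESTAMP_MIN_VALUE_MS = Long.MIN_VALUE / 1000;

  // outputWatermarksSeconds: local output name -> reported watermark in
  // whole seconds (mirroring google.protobuf.Timestamp.getSeconds()).
  static long resolveOutputTimestamp(Map<String, Long> outputWatermarksSeconds) {
    if (outputWatermarksSeconds == null || outputWatermarksSeconds.isEmpty()) {
      // No watermark reported from the RestrictionTracker:
      // the runner uses MIN_TIMESTAMP by default.
      return TIMESTAMP_MIN_VALUE_MS;
    }
    long outputTimestamp = Long.MAX_VALUE;
    for (long seconds : outputWatermarksSeconds.values()) {
      // Lower bound across all outputs, converted to millis.
      outputTimestamp = Math.min(outputTimestamp, seconds * 1000);
    }
    return outputTimestamp;
  }

  public static void main(String[] args) {
    System.out.println(resolveOutputTimestamp(Map.of("out", 30L))); // → 30000
    System.out.println(resolveOutputTimestamp(Map.of()) == TIMESTAMP_MIN_VALUE_MS);
  }
}
```

MIN_TIMESTAMP is the conservative choice: it can hold the watermark back further than necessary, but it can never let the watermark overtake a residual's output.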
@mxm Would you like to take another pass on this PR?
@@ -502,6 +690,8 @@ public void close() throws Exception {
   processWatermark1(Watermark.MAX_WATERMARK);
   while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
     invokeFinishBundle();
+    // Sleep for 5s to wait for any timer to be fired.
+    Thread.sleep(5000);
This shouldn't be necessary. All timers will be drained on close(), at least in super.close().
From my testing, I found that if we don't free the occupation here, DoFnOperator.onProcessingTime will never get a chance to be executed.
Kindly pinged : )
This is a bug then. All processing timers should be fired on close(). It would be great to get rid of the sleep, which can have unexpected side effects, e.g. when operators are chained in Flink.
This was not a bug before we supported SDF-initiated checkpoints, because a normal processing-time timer doesn't hold the watermark. Now we set processing-time timers to reschedule SDF residuals, and those timers hold the watermark back. When entering ExecutableStageDoFnOperator.close(), we will always stay in the while loop until all SDF processing timers fire and their watermark holds are removed. So the comment // Sleep for 5s to wait for any timer to be fired. is not precise; it should be // Sleep for 5s to wait for any SDF timer to be fired.
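The drain loop being discussed can be reduced to a schematic simulation in plain Java (not the Flink operator API; the timer/hold bookkeeping is a stand-in): close() keeps finishing bundles until every SDF watermark hold is released, and only then can the output watermark reach MAX.

```java
public class CloseDrainDemo {
  static final long MAX_WATERMARK = Long.MAX_VALUE;

  // Simulated state: number of SDF processing-time timers whose watermark
  // holds have not yet been released.
  private int pendingSdfTimers;

  CloseDrainDemo(int pendingSdfTimers) {
    this.pendingSdfTimers = pendingSdfTimers;
  }

  // In this simulation, each finished bundle fires one pending SDF timer.
  void invokeFinishBundle() {
    if (pendingSdfTimers > 0) {
      pendingSdfTimers--;
    }
  }

  long getCurrentOutputWatermark() {
    // While any hold remains, the output watermark cannot reach MAX.
    return pendingSdfTimers > 0 ? 0L : MAX_WATERMARK;
  }

  // Mirrors the loop in ExecutableStageDoFnOperator.close(): the operator
  // stays here until all SDF timers fire and release their holds.
  int close() {
    int iterations = 0;
    while (getCurrentOutputWatermark() < MAX_WATERMARK) {
      invokeFinishBundle();
      iterations++;
      // (The real code sleeps here to give pending SDF timers a chance to
      // fire; see the discussion about eventually removing that sleep.)
    }
    return iterations;
  }

  public static void main(String[] args) {
    System.out.println(new CloseDrainDemo(3).close()); // → 3
  }
}
```

The sketch also shows why the loop terminates only if timers can still fire during close(): if nothing ever decrements the pending count, the loop spins forever, which is the deadlock described below.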
I found that Thread.sleep(5s) is not a correct solution. The reason it worked occasionally is that the bundle was small enough and I got lucky on a race condition. I think we should figure out a way to fire the SDF processing-time timer when a bundle is closed, within the life cycle of one ExecutableStageDoFnOperator.

Please correct me if I'm understanding it wrong:

- Flink starts all operators at the same time and closes an operator when its input watermark reaches MAX_TIMESTAMP; or does it close operators in reverse topological order, with close() being a blocking call?
- Processing-time timers will no longer be fired by the system once operator.close() is invoked.
- The assumption around ExecutableStageDoFnOperator is that there is only one bundle executing inside one operator. When the output watermark advances to MAX_TIMESTAMP, we consider this bundle completed.

With support for SDF-initiated checkpoints, we do need several bundles invoked within one ExecutableStageDoFnOperator life cycle, which means we either:

- Enable Flink to fire processing-time timers after Operator.close() is invoked -- this may not be preferable.
- Or we try to close the bundle before we reach Operator.close().
- Or we manually drain SDF timers, sacrificing the ability of resumeDelay(). For example, the user may want to reschedule the SDF residuals in 5 minutes, but we would have to fire them now.

Do you have any ideas/suggestions? Thanks for your help!
Double-checking the implementation of DoFnOperator and ExecutableStageDoFnOperator, we already invoke finishBundle when reaching 1000 input elements or 1s of processing time by default.

The real problem for SDF is that it's natural for an SDF to read from Impulse and execute as a high fan-out DoFn. With the current structure, once Impulse finishes, close() of the SDF operator will be called, but meanwhile no more processing-time timers can be registered. Simply draining timers from the operator itself is not ideal.

Is it possible for us to change something here? For example, should the operator wait for the global watermark to advance to MAX_TIMESTAMP before finishing? Or should the task invoke operator.close() when the global watermark advances to MAX_TIMESTAMP?
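The default bundle-finish policy mentioned above (finish after 1000 input elements or after 1s of processing time, whichever comes first) can be sketched like this. All names here are hypothetical; in the real runner these thresholds come from pipeline options such as the max bundle size and max bundle time.

```java
public class BundleFinishPolicy {
  private final int maxBundleSize;    // e.g. 1000 elements by default
  private final long maxBundleTimeMs; // e.g. 1000 ms by default
  private int elementCount;
  private long bundleStartMs;

  BundleFinishPolicy(int maxBundleSize, long maxBundleTimeMs) {
    this.maxBundleSize = maxBundleSize;
    this.maxBundleTimeMs = maxBundleTimeMs;
  }

  // Called when a new bundle begins; nowMs is the current processing time.
  void startBundle(long nowMs) {
    elementCount = 0;
    bundleStartMs = nowMs;
  }

  // Called per input element; returns true when the bundle should be
  // finished after this element (size cap or time cap reached).
  boolean onElement(long nowMs) {
    elementCount++;
    return elementCount >= maxBundleSize
        || nowMs - bundleStartMs >= maxBundleTimeMs;
  }

  public static void main(String[] args) {
    BundleFinishPolicy policy = new BundleFinishPolicy(1000, 1000);
    policy.startBundle(0);
    boolean finish = false;
    for (int i = 0; i < 1000; i++) {
      finish = policy.onElement(10); // all elements arrive within 1s
    }
    System.out.println(finish); // → true: hit the 1000-element cap
  }
}
```

Neither cap helps the SDF case described above, though: after Impulse finishes there are no more input elements, so only the timer path can trigger further bundles.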
What you describe happens when you enable checkpointing or when you pass in --shutdownSourcesAfterIdleMs. The operator doesn't shut down then and continues to process timers. The reason we do not enable this by default is that we want to cleanly shut down pipelines during tests. Once an operator has been shut down, the entire job cannot be checkpointed anymore.
The new workaround looks good to me. We do the same in the super.close() method.
Thanks for the explanation! It seems like this problem has gotten attention from Flink: https://issues.apache.org/jira/browse/FLINK-18647.

I'm still confused about when operator.close() will be called. It seems like it happens under several conditions:

- When checkpointing happens.
- When the source operator closes successfully, the next downstream operator's close() is called.

Do I understand it correctly?
@@ -502,6 +692,10 @@ public void close() throws Exception {
   processWatermark1(Watermark.MAX_WATERMARK);
   while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
     invokeFinishBundle();
+    if (hasSdfProcessFn) {
+      // Sleep for 5s to wait for any SDF timer to be fired.
+      Thread.sleep(5000);
Perhaps add a TODO here, since we may want to eventually get rid of the sleep?
Thanks! Filed here: https://issues.apache.org/jira/browse/BEAM-11210
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
Run Java Flink PortableValidatesRunner Batch
Run Java Flink PortableValidatesRunner Streaming
I'm going to merge this PR. Thanks for your help!
My pleasure! |
This PR added the SDF self-checkpoint support to portable Flink by using timers and state, where:

- In batch, we use InMemoryTimerInternals and InMemoryStateInternals to set the timer and persist residual elements.
- In streaming, we use SdfFlinkTimerInternals as the wrapper of FlinkTimerInternals and SdfFlinkStateInternals as the wrapper of FlinkStateInternals for one specific key.

For more design details, please refer to this doc
r: @robertwb @mxm
cc: @ibzib @lukecwik