New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-1674] Fix Flink State GC #2217
Conversation
We now set the GC timer for window.maxTimestamp() + 1 to ensure that a user timer set for window.maxTimestamp() still has all state. This also adds tests for late data dropping and state GC specifically for the Flink DoFnOperator.
Refer to this link for build results (access rights to CI server needed): |
Run Flink RunnableOnService |
Refer to this link for build results (access rights to CI server needed): |
Sorry, a few days ago I have some private matters. Now see it. I had a comment here. |
doFnRunner.onTimer(timerId, window, timestamp, timeDomain); | ||
} | ||
// a timer can never be late because we don't allow setting timers after GC time | ||
doFnRunner.onTimer(timerId, window, timestamp, timeDomain); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A event timer can never be late, but a process timer may be late. We need drop the late processTimer here. What do you think? @aljoscha
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, you mean a processing-time timer that fires for a window that was already garbage collected?
I'll add this again. Thanks for spotting this!
@JingsongLi Don't worry. 😃 |
@JingsongLi I pushed another commit for the processing-time timer thing. |
This is good~ |
Run Flink RunnableOnService |
} | ||
|
||
/** | ||
* A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadoc still thinks it is in the other file
} | ||
|
||
/** | ||
* A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it in two places now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the original one to the Flink Runner because I'm changing the GC time to window.max + 1
, which seemed like a Flink-specific thing. Do you think I should simply leave it as is and change to the +1
behaviour for the default implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think you should just change the common implementation to do a +1. I bet most runners leveraging it might have similar troubles, and likely the same solution will work a lot of the time. It should be clearly documented that it can only be used if timers are delivered in order.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Since it is release blocking, and the weekend already for many timezones, let's merge and tidy up later. |
Properly deal with late processing-time timers Introduce Flink-specific state GC implementations Move GC timer checking to StatefulDoFnRunner.CleanupTimer
Filed BEAM-1689 to do the cleanup. |
Thanks! 😃 |
stateCleaner.clearForWindow(window); | ||
// There should invoke the onWindowExpiration of DoFn | ||
} else { | ||
if (isEventTimer || !dropLateData(window)) { | ||
// An event-time timer can never be late because we don't allow setting timers after GC time. | ||
// Ot can happen that a processing-time time fires for a late window, we need to ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
time fires -> timer fires
This is a proper solution, as discussed in the Jira issue. If we merge this we can drop #2215. (Thanks for quickly providing that PR, though!)
R: @kennknowles