-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-1981] Fix ApexTimerInternals serialization error. #2734
Conversation
|
* @param <K> | ||
*/ | ||
@DefaultSerializer(JavaSerializer.class) | ||
public class ApexTimerInternals<K> implements TimerInternals, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could keep this package private if you leave it in the same directory, just to keep the API totally hidden from users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved the class.
@DefaultSerializer(JavaSerializer.class) | ||
public class ApexTimerInternals<K> implements TimerInternals, Serializable { | ||
|
||
private Map<Slice, Set<Slice>> activeTimers = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may want a NavigableSet
in case many windows become active on a key (I think this would require the watermark to stall for a bit).
Given the way you implement this - with the impetus for processing coming separately and just saving the timer data here - could you easily support cancel in the same manner as the direct runner? https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L223
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean deleteTimer(..)? Yes, that would work in the same manner as I see it in the direct runner. Will take that up later though.
@Override | ||
public void setTimer(StateNamespace namespace, String timerId, Instant target, | ||
TimeDomain timeDomain) { | ||
TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand this is just a move of existing code. But here you enqueue timers for all time domains and below treat them all as event time timers. I think either you should make two sets/queues or else throw unsupported for processing time timers.
Is it correct that something like watermark update tuples will continue to cause the operator to inquire about ready timers? In that case the two queue solution is easy and it is actually OK to use Instant.now()
since there's no distributed clock problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the timers are currently processed when the input watermark is updated. Having separate sets will make sense because the processing timers need to be looked at irrespective of watermark change. I would prefer to address that as part of the other JIRA though. This here is to fix the serialization issue.
public class ApexTimerInternalsTest { | ||
|
||
@Test | ||
public void testSerialization() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest one or two smoke tests here for different time domains, since actually our validation suite doesn't really exercise this code much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added test for event time timers.
d85da48
to
f37d954
Compare
@kennknowles addressed comments. Processing time timer changes will be addressed as part of BEAM-2022, after this is merged. Want to get both fixed for the release. |
LGTM. Definitely an improvement and unblocks things. Merging. |
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.
R: @kennknowles @dhalperi