[BEAM-2022] fix triggering for processing time timers#2782
[BEAM-2022] fix triggering for processing time timers#2782asfgit merged 1 commit intoapache:masterfrom
Conversation
kennknowles
left a comment
There was a problem hiding this comment.
LGTM. Added a couple questions but feel free to merge, or I will when I have another minute.
| } | ||
|
|
||
| @VisibleForTesting | ||
| protected TimerSet getTimerSet(TimeDomain domain) { |
There was a problem hiding this comment.
In a quick pass, I don't see this protected being used.
There was a problem hiding this comment.
It is required for the unit test. It could be package scope, but it cannot be private.
| ComparisonChain chain = | ||
| ComparisonChain.start().compare(timerData.getTimerId(), timerId); | ||
| if (chain.result() == 0 && !timerData.getNamespace().equals(namespace)) { | ||
| // Obtaining the stringKey may be expensive; only do so if required |
There was a problem hiding this comment.
The StateNamespace is supposed to do equals and hashCode right so that you only need to obtain the stringKey() for serialization purposes. So here I think you should just compare them directly.
There was a problem hiding this comment.
There was a problem hiding this comment.
Well, hmm. I guess keep it that way since there is probably some context for it. Perhaps it was a workaround for BEAM-1022 that was recently fixed.
|
@kennknowles once this is merged and we have the internal timers in shape, any thoughts on what it takes to address BEAM-1114 (perhaps as comment on that JIRA) ? |
|
I commented on the JIRA. |
| processElement(t.getValue()); | ||
| } catch (Exception e) { | ||
| Throwables.propagateIfPossible(e); | ||
| Throwables.throwIfUnchecked(e); |
There was a problem hiding this comment.
Can 'throw new RuntimeException' be folded into throwIfUnchecked(e) ?
That way, code is simplified on the caller side.
| private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) { | ||
| this.inputWatermark = new Instant(mark.getTimestamp()); | ||
| timerInternals.fireReadyTimers(this.inputWatermark.getMillis(), | ||
| this, TimeDomain.EVENT_TIME); |
There was a problem hiding this comment.
Shouldn't this be TimeDomain.PROCESSING_TIME ?
| if (timersForKey != null) { | ||
| try { | ||
| Slice timerBytes = new Slice(CoderUtils.encodeToByteArray(timerDataCoder, timerKey)); | ||
| timersForKey.add(timerBytes); |
There was a problem hiding this comment.
Why call remove() after add() ?
Looks like add() call can be omitted.
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull requestmvn clean verify. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.
R: @kennknowles