New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13785: [7/N][Emit final] emit final for sliding window #12135
Conversation
enum StrategyType { | ||
ON_WINDOW_CLOSE, | ||
ON_WINDOW_UPDATE | ||
ON_WINDOW_UPDATE(0, new WindowUpdateStrategy()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I augmented the enum with a code so that users can translate between the type to the actual strategy easily --- previously in code one has to use a switch to map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did not go over the testing code yet.
return this.strategy; | ||
} | ||
|
||
StrategyType(final int code, final EmitStrategy strategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you using int
here? Seems cleaner to use short
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because Java's default type is int -- i.e. line 36/37 above would take the value 0/1 as int. So we basically need to either do the conversion in each line of 36/37 above, or just do the conversion once here.
I followed our other enums like Errors
to do the conversion here.
static { | ||
for (final StrategyType type : StrategyType.values()) { | ||
if (TYPE_TO_STRATEGY.put(type.code(), type.strategy()) != null) | ||
throw new IllegalStateException("Code " + type.code() + " for type " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never seen anything like this before -- is it best practice to have a guard like this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was following the enum Errors
as well to add this guard.
); | ||
timeTracker.setEmitInterval(emitInterval); | ||
} else { | ||
tupleForwarder = new TimestampedTupleForwarder<>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we init the tupleForwarder
only for "emit on change" -- don't we also need the forwarder for "emit final" (and just use it differently)?
In the original code, we setup TimestampedTupleForwarder
w/ or w/o the TimestampedCacheFlushListener
for both cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the emit on final
case, the tuple forwarder would not be used (you can see that in the old code, we have a separate constructor for it in emit on final
which does not pass in the cache at all) since we always rely on the logic to scan/emit to downstream that does not call the tuple forwarder. I think it's cleaner to not construct the object at all for this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EDIT: actually you're right! :) The maybeForward should still be called in emit final
case.
while (windowToEmit.hasNext()) { | ||
emittedCount++; | ||
final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next(); | ||
tupleForwarder.maybeForward( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we use tupleForwarded
for the "emit final" case here, but it seems it was not initialized (cf my commend above on init()
method)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack!
.withHeaders(record.headers())); | ||
} | ||
emittedRecordsSensor.record(emittedCount); | ||
emitFinalLatencySensor.record(time.milliseconds() - startMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the definition for this sensor? Wondering about the semantics given the current implemenation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It measures each time when emit final is triggered, how long it took to scan the store and emit the record for downstream processing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we do depth-first processing, it includes the time of downstream processing of the emitted record, right? Is this intentional and easy to understand for users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I was concerned about this too.. but in another thought I think in most cases this processor should be the last of the sub-topologies, plus the main goal is to see if a single record's processing, that including the emitting procedure, could be taking too long to become a problem, hence just measuring the store scan time is not sufficient anyways.
internalProcessorContext.addProcessorMetadataKeyValue(storeName, closeTime); | ||
} | ||
|
||
abstract protected void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I though fetchAndEmit
would take care of "emit final" -- what is this method about?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The instantiated maybeForwardFinalResult
for time / sliding windows has a slight different validation check before calling fetchAndEmit
.
.withTimestamp(newTimestamp)); | ||
} | ||
|
||
protected boolean shouldEmitFinal(final long closeTime) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this method is called as a first step inside the implementation of maybeForwardFinalResult
-- thus, I am wondering if we should not make it private and call inside maybeMeasureEmitFinalLatency
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, yes I think that's a good point --- we are double measuring the emitFinalLatencySensor
both here and inside the fetchAndEmit
, and I think we should only keep the latter. Will update.
|
||
abstract protected void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime); | ||
|
||
protected void maybeMeasureEmitFinalLatency(final Record<KIn, VIn> record, final long windowCloseTime) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Should we make this method final
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed this function since we already measure emitFinalLatencySensor
inside the fetchAndEmit
function.
} | ||
|
||
final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? | ||
0L : lastEmitWindowCloseTime - windows.timeDifferenceMs(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be:
final long emitRangeLowerBoundInclusive = Math.max(0L, lastEmitWindowCloseTime - windows.timeDifferenceMs());
Otherwise, if lastEmitWindowCloseTime < windows.timeDifferenceMs()
the result could still be negative?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far it seems lastEmitWindowCloseTime
should always be no smaller than window size in either sliding or time windows, but when there's bugs it's possible that the read value from the processor metadata is small. I will update it accordingly.
// Because we only get here when emitRangeUpperBoundInclusive > 0 which means closeTime > windows.size() | ||
// Since we set lastEmitCloseTime to closeTime before storing to processor metadata | ||
// lastEmitCloseTime - windows.size() is always > 0 | ||
// Set emitRangeLowerBoundInclusive to -1L if not set so that when we fetchAll, we fetch from 0L | ||
final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? | ||
-1L : lastEmitWindowCloseTime - windows.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code up to here is basically identical, except the use of window.size()
vs window. timeDifferenceMs()
-- would it be worth to unify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made some refactoring, LMK what do you think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did not fully review test code -- it's very time consuming...
Overall LGTM. Few more minor comments. Feel free to merge.
if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { | ||
return; | ||
} else if (tupleForwarder == null) { | ||
throw new IllegalStateException("Emit strategy type is " + emitStrategy.type() + " but flush listener is not initialized."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tupleForwarded
should never be null
?
.withHeaders(record.headers())); | ||
} | ||
emittedRecordsSensor.record(emittedCount); | ||
emitFinalLatencySensor.record(time.milliseconds() - startMs); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we do depth-first processing, it includes the time of downstream processing of the emitted record, right? Is this intentional and easy to understand for users?
|
||
private boolean sendOldValues = false; | ||
|
||
public KStreamSlidingWindowAggregate(final SlidingWindows windows, | ||
final String storeName, | ||
final Initializer<VAgg> initializer, | ||
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) { | ||
this(windows, storeName, EmitStrategy.onWindowUpdate(), initializer, aggregator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: would it be better to not have this constructor with a default emit strategy, but force the caller to pick on explicitly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I replied in the other comment that this is now only used in cogroup
which do not have emit-on-final yet, but I guess we can always just call it explicitly.
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), | ||
recordMetadata.topic(), | ||
recordMetadata.partition(), | ||
recordMetadata.offset(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fixing the formatting!
protected long emitRangeUpperBound(final long windowCloseTime) { | ||
// Sliding window's start and end timestamps are inclusive, so | ||
// we should minus 1 for the inclusive closed window-end upper bound | ||
return windowCloseTime - windows.timeDifferenceMs() - 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe add a comment and explain why we don't need a guard for a negative result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can actually be negative, and we would skip the range fetching in that case.
29)), | ||
actual | ||
); | ||
} | ||
} | ||
|
||
@Test | ||
public void testJoin() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question as above: why do we have join()-test in the class?
import static org.junit.Assert.assertTrue; | ||
|
||
@RunWith(Parameterized.class) | ||
public class TimeOrderedCachingPersistentWindowStoreTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side comment: Might have been better to add this test in a separate PR?
|
||
final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class); | ||
// Nothing happens | ||
new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems redundant to test the happy path?
|
||
@Test | ||
public void shouldNotReturnDuplicatesInRanges() { | ||
final StreamsBuilder builder = new StreamsBuilder(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to test this using StreamsBuilder
? Can't we call the store methods directly?
} | ||
|
||
@Test | ||
public void shouldForwardOldValuesWhenDisabled() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...WhenEnabled
-- what does it refer to? (seems same applies to shouldForwardOldValuesWhenEnabled
?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be shouldNot..
will update.
Merged to trunk. |
This is a copy PR of #12037.
Committer Checklist (excluded from commit message)