
[9/N][Emit final] Emit final for session window aggregations #12204

Merged

Conversation

guozhangwang
Contributor

  1. Add a new API for session windows to range query session window by end time (KIP related).
  2. Augment session window aggregator with emit strategy.
  3. Minor: consolidated some dup classes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang guozhangwang changed the title [9/N][Emit final] Emit final for session window aggregations [9/N WIP][Emit final] Emit final for session window aggregations May 24, 2022
Contributor Author

@guozhangwang guozhangwang left a comment

@lihaosky @mjsax This PR is not mergeable yet since I've not committed any new tests. But I'd like to solicit your feedback early, especially on the two open questions I raised above (one about the public API, one about the inefficiency due to the conservative upper/lowerRange with a null key).

@@ -39,6 +39,13 @@
*/
public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {

// TODO: javadoc; both ends are inclusive
default KeyValueIterator<Windowed<K>, AGG> findSessions(final Instant earliestSessionEndTime,
Contributor Author

This is related to 1) in the description, and the first open question: is this public API worth adding? Note that I added it to SessionStore rather than ReadOnlySessionStore, so it is not exposed via IQv1; also, I've only added this function with an Instant parameter type.

Member

I think there is no way around it. In the end, we allow users to plug in a custom session store -- thus, if they use the new emit-final feature, they will need to implement this new method. Existing code with custom session stores should not break, because existing code neither implements nor calls this new method.

If we don't make it public API, we would prevent users from passing in custom session stores in combination with the new emit-final feature, which seems too restrictive.

Member

Why did you pick Instant over long? (Wondering if long might be better, as it's more of an internal API.)
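To make the backward-compatibility argument above concrete, here is a minimal, self-contained sketch of how a default method keeps existing custom stores compiling. The interface and method body below are illustrative stand-ins, not the actual org.apache.kafka.streams.state.SessionStore code:

```java
import java.time.Instant;
import java.util.Iterator;

public class DefaultMethodDemo {
    // Hypothetical slimmed-down interface; the real SessionStore lives in
    // org.apache.kafka.streams.state and has many more members.
    interface SessionStore {
        // New method added with a default body: existing implementations
        // compile unchanged, and only fail if emit-final actually calls it.
        default Iterator<String> findSessions(final Instant earliestSessionEndTime,
                                              final Instant latestSessionEndTime) {
            throw new UnsupportedOperationException(
                "findSessions(Instant, Instant) is required for emit-final");
        }
    }

    // A pre-existing custom store that never implemented the new method.
    static class LegacyStore implements SessionStore { }

    public static void main(final String[] args) {
        final SessionStore store = new LegacyStore(); // still compiles and instantiates
        try {
            store.findSessions(Instant.EPOCH, Instant.now());
        } catch (final UnsupportedOperationException e) {
            // Only a store actually used with emit-final would hit this path.
            System.out.println("only fails when emit-final needs it");
        }
    }
}
```

The design choice here is the usual interface-evolution trade-off: a default method avoids breaking binary and source compatibility, at the cost of failing at runtime rather than compile time for stores that opt into emit-final without implementing it.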

@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {

@Override
public byte[] fetchSession(final Bytes key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
final long sessionStartTime,
Contributor Author

This is a minor fix on the param names: the old ones are simply wrong and misleading.

final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));

final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);
Contributor Author

@guozhangwang guozhangwang May 24, 2022

This is the second open question: with the current prefixed (base, i.e. time-first) session key schema, this fetchAll would effectively search [earliestEnd, +INF] because of this logic: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java#L46

This is because we translate the range query without a key inside AbstractRocksDBTimeOrderedSegmentedBytesStore using lower/upperRange instead of lower/upperRangeFixedSize: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java#L241-L242

I cannot remember why we need to do this. @lihaosky @mjsax do you remember why?
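To illustrate why a keyless range query on a time-first (prefixed) schema cannot be bounded on the key dimension, here is a simplified sketch of such a key layout. The encoding and names are hypothetical stand-ins, not the actual PrefixedSessionKeySchemas code:

```java
import java.nio.ByteBuffer;

public class TimeFirstKeyDemo {
    // Simplified time-first session key: [endTime (8 bytes)][startTime (8 bytes)][key bytes]
    static byte[] encode(final long endTime, final long startTime, final byte[] key) {
        final ByteBuffer buf = ByteBuffer.allocate(16 + key.length);
        buf.putLong(endTime);
        buf.putLong(startTime);
        buf.put(key);
        return buf.array();
    }

    // Unsigned lexicographic comparison, as RocksDB's default comparator behaves.
    static int compare(final byte[] a, final byte[] b) {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(final String[] args) {
        final byte[] k1 = encode(100L, 50L, "zzz".getBytes());
        final byte[] k2 = encode(200L, 10L, "aaa".getBytes());
        // Keys sort by end time first, regardless of the record-key bytes.
        // Without a concrete key, an upper bound must therefore cover every
        // possible key suffix for a given end time, which is why a keyless
        // range effectively degenerates to [earliestEnd, +INF] on the key part.
        System.out.println(compare(k1, k2) < 0); // true: endTime 100 < 200
    }
}
```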

Member

Not sure -- I always need to think very hard to understand (not even sure if I succeed) the fetch logic and how we compute the bounds...

But same question as above: why do we need this new method instead of calling findSessions(null, null, A, B) -- I briefly dug into the code and it seems it would do the same thing?

Contributor Author

The main reason is that for emit-final I need a range API that is based on endTime for both ends. That's also why, within its implementation, I had to use fetchAll instead of fetch here.

Member

If I read the code correctly, what fetchAll() does is correct: from my understanding, fetchAll() is implemented to find "overlapping sessions" given a lower and upper bound -- the lower bound must be no larger than the session end, and the session start must be no larger than the upper bound, for an overlap. Because the upper bound compares to the session start, and we use the "base" schema, we need to search the full "data/base part" of the store.

I guess the issue is that you actually cannot use fetchAll() at all for our purpose here? Passing in lastEndTime does not work (does it?), as it would be compared to session start times, but we want a comparison to session end times. -- Thus, I think the right solution is to also add the new findSessions() to the internal SegmentedStore and implement a proper iterator there.
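The distinction between the two query semantics discussed above can be restated with two plain predicates; this is a simplified restatement of the logic under discussion, not the actual store or iterator code:

```java
public class SessionRangeDemo {
    // Minimal stand-in for a session window [start, end].
    record Session(long start, long end) { }

    // Existing findSessions semantics: the session must OVERLAP the query
    // interval [earliestSessionEndTime, latestSessionStartTime].
    static boolean overlaps(final Session s,
                            final long earliestSessionEndTime,
                            final long latestSessionStartTime) {
        return s.end() >= earliestSessionEndTime && s.start() <= latestSessionStartTime;
    }

    // New emit-final semantics: only the session END time must fall inside
    // [earliestSessionEndTime, latestSessionEndTime], both ends inclusive.
    static boolean endInRange(final Session s,
                              final long earliestSessionEndTime,
                              final long latestSessionEndTime) {
        return s.end() >= earliestSessionEndTime && s.end() <= latestSessionEndTime;
    }

    public static void main(final String[] args) {
        final Session s = new Session(5L, 30L); // opened at t=5, ends at t=30
        // Overlap query [10, 20] matches, because the session spans the interval...
        System.out.println(overlaps(s, 10L, 20L));   // true
        // ...but an end-time range [10, 20] must NOT match: the session ends at 30.
        System.out.println(endInRange(s, 10L, 20L)); // false
    }
}
```

Because the two predicates disagree on sessions like this one, reusing the overlap-based query for emit-final would return sessions whose windows have not closed yet.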

Contributor Author

Sounds good. I would just add special handling for the lower/upper bounds, as well as the hasNext function, for this specific purpose.

@@ -91,8 +91,8 @@ public interface SegmentedBytesStore extends StateStore {
/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
* @param from the beginning of the time slot from which to search
* @param to the end of the time slot from which to search
* @param from the beginning of the time slot from which to search (inclusive)
Contributor Author

Minor javadoc improvement to remind developers.

@@ -38,7 +39,7 @@
@SuppressWarnings({"unchecked", "rawtypes"})
TimestampedTupleForwarder(final StateStore store,
final ProcessorContext<K, Change<V>> context,
final TimestampedCacheFlushListener<K, V> flushListener,
final CacheFlushListener<K, ?> flushListener,
Contributor Author

This is 3) in the description: since we can use the base class, we no longer need the duplicated TupleForwarder.

tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
}

// TODO: consolidate SessionWindow with TimeWindow to merge common functions
Contributor Author

I realized that SessionWindow, TimeWindow, and even SlidingWindow cause a lot of code duplication (e.g. here). We could consolidate them into a single class, with boolean flags indicating whether start/end are inclusive or exclusive, and thereby further reduce the duplication. Will file a JIRA for it.

}
}

agg = aggregator.apply(record.key(), record.value(), agg);
final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
store.put(sessionKey, agg);

maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
/*
Contributor Author

Will remove commented out code when removing WIP, ditto elsewhere.

@guozhangwang guozhangwang requested a review from mjsax May 24, 2022 17:42
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;

private final Time time = Time.SYSTEM;
Member

Would it not be better to pass in a Time object, so we can mock it using the TTD?

Contributor Author

Makes sense. I will do this in a follow-up PR after merging this.
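As a sketch of the suggested follow-up, injecting a clock makes interval-based emit logic testable without real sleeps. All names below (the slimmed-down Time interface, MockTime, shouldEmitFinal) are hypothetical stand-ins for the Kafka utilities, not the actual processor code:

```java
public class EmitProcessorDemo {
    // Minimal stand-in for org.apache.kafka.common.utils.Time.
    interface Time {
        long milliseconds();
    }

    // Production clock, analogous to the hard-coded Time.SYSTEM in the PR.
    static final Time SYSTEM = System::currentTimeMillis;

    // A mock clock the test can advance deterministically.
    static class MockTime implements Time {
        private long nowMs;
        MockTime(final long startMs) { this.nowMs = startMs; }
        public long milliseconds() { return nowMs; }
        void sleep(final long ms) { nowMs += ms; }
    }

    private final Time time;
    private long lastEmitMs;

    EmitProcessorDemo(final Time time) { // injected instead of hard-coding SYSTEM
        this.time = time;
        this.lastEmitMs = time.milliseconds();
    }

    // Emit final results at most once per interval (illustrative logic only).
    boolean shouldEmitFinal(final long emitIntervalMs) {
        final long now = time.milliseconds();
        if (now - lastEmitMs >= emitIntervalMs) {
            lastEmitMs = now;
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final MockTime mock = new MockTime(0L);
        final EmitProcessorDemo p = new EmitProcessorDemo(mock);
        System.out.println(p.shouldEmitFinal(1000L)); // false: no time has passed
        mock.sleep(1001L);
        System.out.println(p.shouldEmitFinal(1000L)); // true: interval elapsed
    }
}
```

This mirrors why the test below advances time with sleep(1001L): with a hard-coded system clock, the emit interval can only be crossed by really waiting.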

}

// Update the sent record timestamp to the window end time if possible
final long newTimestamp = windowedkey.key() != null ? windowedkey.window().end() : oldTimestamp;
Member

For what case could windowedkey.key() == null ? Is this even possible?

Contributor Author

This behavior was meant to be inherited from the deleted code: https://github.com/apache/kafka/pull/12204/files#diff-85c8c92d464af8eb3a60684bf929725f8fc5263353c38cacc20bee4cefe4fd9eL53. But after checking that logic, I now realize it's no longer necessary (the original PR had to do this since we could not programmatically guarantee the key is non-null, but in this change we no longer have that concern).

Will remove.

emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(),
internalProcessorContext.currentNode().name(), metrics);
emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), processorName, metrics);
emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), processorName, metrics);
Member

Thanks for all the cleanup -- though it is somewhat distracting from the actual changes.

Can we (in the future) extract refactorings/cleanups into separate PRs to simplify reviewing?

Contributor Author

Yes! My bad for mingling them together here.


null,
Long.MAX_VALUE,
endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(),
true);
Member

Not sure if I fully understand why we add this new method instead of calling findSession(null, null, A, B) ?

The code to create the iterator is different, but I am also not sure why. Is it semantically actually the same? Calling findSession(null, null, A, B) would do:

registerNewIterator(null, // same
                    null, // same
                    latestSessionStartTime, // why does your code pass Long.MAX_VALUE,
// but because we use `tailMap` instead of `subMap` below, it seems to do the same thing overall?
                    endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator(),
                    true); // same

Contributor Author

Logically, the main reason is that for emit-final we need a range query where both from and to are end times -- you can see the parameters are earliestSessionEndTime and latestSessionEndTime.

The existing functions' semantics, by contrast, are based on earliestSessionEndTime and latestSessionStartTime. That's also the reason for using Long.MAX_VALUE here.

As for the physical implementation, the main difference is not in the in-memory session store but in the RocksDB session store. I will reply there separately.

Member

Ok. I read the code of InMemorySessionStore in detail and now understand what's going on. This LGTM.

return registerNewIterator(null,
null,
Long.MAX_VALUE,
endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(),
Member

Nit: can we call subMap(earliestEndTime, true, latestEndTime, true)? It's the same thing but more "intuitive", since we always search with inclusive bounds throughout the code (otherwise, this would be the only place with an exclusive upper bound).

Contributor Author

Ack.
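The equivalence behind this nit can be checked with a plain TreeMap (a stand-in for the store's endTimeMap):

```java
import java.util.TreeMap;

public class SubMapBoundsDemo {
    public static void main(final String[] args) {
        final TreeMap<Long, String> endTimeMap = new TreeMap<>();
        endTimeMap.put(10L, "s1");
        endTimeMap.put(20L, "s2");
        endTimeMap.put(21L, "s3");

        final long earliestEndTime = 10L;
        final long latestEndTime = 20L;

        // The two-arg subMap is inclusive on fromKey and exclusive on toKey,
        // hence the "+ 1" in the PR...
        final var exclusiveStyle = endTimeMap.subMap(earliestEndTime, latestEndTime + 1);
        // ...while the four-arg overload states the inclusive bounds directly.
        final var inclusiveStyle = endTimeMap.subMap(earliestEndTime, true, latestEndTime, true);

        System.out.println(exclusiveStyle.equals(inclusiveStyle)); // true
        System.out.println(inclusiveStyle.keySet()); // [10, 20]
    }
}
```

Beyond readability, the four-arg form also sidesteps the corner case where latestEndTime is Long.MAX_VALUE and "+ 1" would overflow.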

// since subMap is exclusive on toKey, we need to plus one
return registerNewIterator(null,
null,
Long.MAX_VALUE,
Member

nit: indentation

@guozhangwang guozhangwang changed the title [9/N WIP][Emit final] Emit final for session window aggregations [9/N][Emit final] Emit final for session window aggregations Jun 24, 2022
}

private long emitRangeLowerBound() {
return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? 0L : Math.max(0L, lastEmitWindowCloseTime);
Member

nit: can this be simplified to Math.max(0L, lastEmitWindowCloseTime)? (Can also be addressed in a follow-up PR.)

Contributor Author

Ack.
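The simplification works because ConsumerRecord.NO_TIMESTAMP is -1, so the sentinel check is already subsumed by Math.max. A quick self-contained check, with the constant inlined as an assumption:

```java
public class LowerBoundDemo {
    static final long NO_TIMESTAMP = -1L; // assumed value of ConsumerRecord.NO_TIMESTAMP

    // Original form from the PR:
    static long lowerBoundVerbose(final long lastEmitWindowCloseTime) {
        return lastEmitWindowCloseTime == NO_TIMESTAMP ? 0L : Math.max(0L, lastEmitWindowCloseTime);
    }

    // Suggested simplification: Math.max(0L, -1L) already yields 0L,
    // so the explicit sentinel check is redundant.
    static long lowerBoundSimple(final long lastEmitWindowCloseTime) {
        return Math.max(0L, lastEmitWindowCloseTime);
    }

    public static void main(final String[] args) {
        for (final long t : new long[] {NO_TIMESTAMP, 0L, 42L}) {
            System.out.println(lowerBoundVerbose(t) == lowerBoundSimple(t)); // true each time
        }
    }
}
```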

final long windowCloseTime,
final long emitRangeLowerBound,
final long emitRangeUpperBound) {
final long startMs = time.milliseconds();
Member

Should we use milliseconds or nanoseconds? (I am always unsure.)

Contributor Author

Should be ms to be consistent with other metrics.

@@ -204,7 +203,8 @@ public static void writeBinary(final ByteBuffer buf,
final long endTime) {
buf.putLong(endTime);
buf.putLong(startTime);
buf.put(key.get());
if (key != null)
Member

Can key ever be null here? (nit: add {} to block)

Contributor Author

Yes, it's possible, since this is used to write the lower/upper boundaries, where keys can be null.

final Windowed<Bytes> windowedKey = TimeFirstSessionKeySchema.from(bytes);
final long endTime = windowedKey.window().end();

if (endTime <= latestSessionEndTime && endTime >= earliestSessionEndTime)
Member

nit: add {} to block

@@ -35,7 +35,7 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];

public static int keyByteLength(final Bytes key) {
return key.get().length + 2 * TIMESTAMP_SIZE;
return (key == null ? 0 : key.get().length) + 2 * TIMESTAMP_SIZE;
Member

Can key ever be null here?

Contributor Author

Yes that's possible -- see above comment, for the lower/upper bound cases.
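A minimal sketch of the null-safe length and serialization logic being discussed, using plain byte[] instead of Kafka's Bytes wrapper (names simplified from the diff):

```java
import java.nio.ByteBuffer;

public class SessionKeyBoundsDemo {
    static final int TIMESTAMP_SIZE = 8;

    // Null-safe variant from the PR: a null key contributes zero bytes,
    // which lets the same layout encode key-less time boundaries.
    static int keyByteLength(final byte[] key) {
        return (key == null ? 0 : key.length) + 2 * TIMESTAMP_SIZE;
    }

    static byte[] writeBinary(final byte[] key, final long startTime, final long endTime) {
        final ByteBuffer buf = ByteBuffer.allocate(keyByteLength(key));
        buf.putLong(endTime);
        buf.putLong(startTime);
        if (key != null) {
            buf.put(key);
        }
        return buf.array();
    }

    public static void main(final String[] args) {
        // A range boundary carries no key: just the two timestamps.
        System.out.println(writeBinary(null, 0L, 100L).length);           // 16
        System.out.println(writeBinary("k".getBytes(), 0L, 100L).length); // 17
    }
}
```

Without the null guard, both keyByteLength and writeBinary would throw a NullPointerException exactly when computing the key-less lower/upper bounds.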

Member

@mjsax mjsax left a comment

Overall LGTM.

A few nits. Also, there is some missing JavaDoc, and some code you only commented out but have not removed yet. Also OK to clean up in a follow-up PR.

@guozhangwang
Contributor Author

Thanks @mjsax, I've addressed your comments and also added test coverage. While adding tests I noticed a bug in the code and fixed it (see my comment above).

Could you please take another look?

return windowCloseTime - 1;
}

private boolean shouldRangeFetch(final long emitRangeLowerBound, final long emitRangeUpperBound) {
Contributor Author

This is one minor bug I detected in the latest commit.

final long latestSessionEndTime) {
final List<KeyValueSegment> searchSpace = segments.segments(earliestSessionEndTime, latestSessionEndTime, true);

// here we want [0, latestSE, FF] as the upper bound to cover any possible keys,
Contributor Author

This is the other minor bug I detected in the latest commit.

@@ -52,6 +53,16 @@ private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, b
if (!enableCaching) {
return inner;
}

if (!inner.persistent()) {
Contributor Author

Here's the other change I made to work around the current tricky situation: since in-memory stores are always "time ordered" as well, we strip the caching if the inner store is not persistent. cc @mjsax

Member

If we strip the caching, this applies to eager emitting too, right? -- So it would be a behavioral change? Do we want to piggy-back such a change onto this KIP? Sounds "risky"?

Contributor Author

Sounds good, I will move this logic to an earlier stage, during the topology-building phase, for the moment. Also cc @lihaosky, who will do similar things for sliding windows.

@@ -253,20 +300,29 @@ public void shouldRemoveMergedSessionsFromStateStore() {

@Test
public void shouldHandleMultipleSessionsAndMerging() {
time.sleep(1001L);
Member

Might it be easier to change the internal config of the "emit interval" and set it to zero (instead of advancing time)?

Contributor Author

Ack!

@mjsax
Member

mjsax commented Jun 28, 2022

LGTM (assuming Jenkins passes).

@@ -286,7 +286,8 @@ private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInt
builder.withLoggingDisabled();
}

if (materialized.cachingEnabled()) {
// do not enable cache if the emit final strategy is used
Contributor Author

cc @lihaosky this is what I did for session store during the topology building phase.

@guozhangwang guozhangwang merged commit ababc42 into apache:trunk Jun 29, 2022
@guozhangwang guozhangwang deleted the K13785-session-aggregation-impl branch June 29, 2022 16:22
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Jun 30, 2022
…12204)

* Add a new API for session windows to range query session window by end time (KIP related).
* Augment session window aggregator with emit strategy.
* Minor: consolidated some dup classes.
* Test: unit test on session window aggregator.

Reviewers: Guozhang Wang <wangguoz@gmail.com>