
KAFKA-5932: Avoid call to fetchPrevious in FlushListeners #3978

Conversation

bbejeck (Contributor) commented Sep 27, 2017

No description provided.

bbejeck (Contributor, Author) commented Sep 27, 2017

ping @dguy @mjsax @guozhangwang

@@ -110,16 +111,20 @@ private void maybeForward(final ThreadCache.DirtyEntry entry,
     final RecordContext current = context.recordContext();
     context.setRecordContext(entry.recordContext());
     try {
+        V previous = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
mjsax (Member) commented on the diff:
Nit: previous -> oldValue
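For readers skimming the diff above, here is a self-contained sketch of the optimization this PR makes; the names (FlushForwarder, FlushListener, the in-memory map) are illustrative stand-ins rather than Kafka Streams' actual classes:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: the point of the PR is to skip the (potentially
    // expensive) lookup of the previous value unless a downstream
    // consumer asked for old values via sendOldValues.
    public class FlushForwarder<K, V> {

        interface FlushListener<K, V> {
            void apply(K key, V newValue, V oldValue);
        }

        private final Map<K, V> store = new HashMap<>();
        private final boolean sendOldValues;
        private final FlushListener<K, V> flushListener;

        FlushForwarder(final boolean sendOldValues, final FlushListener<K, V> flushListener) {
            this.sendOldValues = sendOldValues;
            this.flushListener = flushListener;
        }

        void putAndMaybeForward(final K key, final V newValue) {
            // Before this change the previous value was always fetched;
            // now the store read is skipped when nobody consumes it.
            final V oldValue = sendOldValues ? store.get(key) : null;
            store.put(key, newValue);
            if (flushListener != null) {
                flushListener.apply(key, newValue, oldValue);
            }
        }
    }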

mjsax (Member) left a review:

One nit.

bbejeck (Contributor, Author) commented Sep 27, 2017

Updated for comments.

mjsax (Member) commented Sep 27, 2017

The PR seems reasonable to me, but I don't know this part of the code very well.

dguy (Contributor) left a review:

One minor and one not-so-minor (but easy to fix) comment. It would also be good to verify that this does provide a performance boost for time-window aggregations; I suspect it should, since we avoid the RocksDB seek.

@@ -110,16 +111,20 @@ private void maybeForward(final ThreadCache.DirtyEntry entry,
     final RecordContext current = context.recordContext();
     context.setRecordContext(entry.recordContext());
     try {
+        V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
dguy (Contributor) commented on the diff:

nit: final

@@ -170,7 +171,7 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final Intern
     final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
     if (flushListener != null) {
         final AGG newValue = serdes.valueFrom(entry.newValue());
-        final AGG oldValue = fetchPrevious(rawKey, key.window());
+        final AGG oldValue = sendOldValues ? fetchPrevious(rawKey, key.window()) : null;
dguy (Contributor) commented:

Here we need to do:

final AGG oldValue = newValue == null || sendOldValues ? fetchPrevious(rawKey, key.window()) : null;

This is because SessionWindows have a dynamic time range: the window bounds change as sessions are merged, so we need to send deletes for the previous, smaller window when a merge happens. For example, with a simple count:

a@0 -> SessionKey(key=a, start=0, end=0), 1
a@5 -> SessionKey(key=a, start=0, end=0), null (delete this as it is merged)
       SessionKey(key=a, start=0, end=5), 2 (this is the new merged session)
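A compilable sketch of that condition, using illustrative stand-ins (a SessionKey record and an in-memory map) rather than Kafka's actual types:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the suggested condition: when the new value is a
    // tombstone (null), the previous session must still be fetched so
    // the delete can be forwarded downstream, even when sendOldValues
    // is false. Everything here is illustrative, not Kafka's API.
    public class SessionFlushSketch {

        record SessionKey(String key, long start, long end) {}

        private final Map<SessionKey, Long> store = new HashMap<>();
        private final boolean sendOldValues;

        SessionFlushSketch(final boolean sendOldValues) {
            this.sendOldValues = sendOldValues;
        }

        Long oldValueToForward(final SessionKey key, final Long newValue) {
            return newValue == null || sendOldValues ? store.get(key) : null;
        }
    }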

xvrl (Member) commented on the same diff hunk:

is there a reason why fetchPrevious has to rely on iterating over the store instead of just calling get() for the right key?

dguy (Contributor) replied:

There is no get() for a SessionStore. The key in the session store is a combination of the record key, start time, and end time. We only know the start time for the previous key, so we need to find the previous session with the correct start time.
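A rough sketch of why that forces a scan rather than a point lookup, assuming a hypothetical in-memory store keyed by (key, start, end):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative only: the session key embeds both start and end
    // time, so with only the start time known we must scan candidate
    // sessions instead of doing a point lookup.
    public class SessionLookupSketch {

        record SessionKey(String key, long start, long end) {}

        private final Map<SessionKey, Long> sessions = new LinkedHashMap<>();

        Long fetchPrevious(final String key, final long start) {
            for (Map.Entry<SessionKey, Long> e : sessions.entrySet()) {
                final SessionKey k = e.getKey();
                // Match on record key and start time; the end time of
                // the previous session is unknown to the caller.
                if (k.key().equals(key) && k.start() == start) {
                    return e.getValue();
                }
            }
            return null;
        }
    }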

xvrl (Member) replied:

what about for window stores, couldn't we use get() there?

bbejeck (Contributor, Author) replied:

No, it's a similar situation: there isn't a get() for a WindowStore. The key is a combination of the record key and a timestamp assigned when the record is placed in the store, and records with the same key could be stored across multiple segments. @dguy correct me if I'm mistaken here.

xvrl (Member) replied:

I thought the timestamp would uniquely define the segment in which that key is stored.

dguy (Contributor) replied Sep 29, 2017:

@xvrl there is no get() on WindowStore. We could add one, and it would work in scenarios where we don't have duplicates: the key for a WindowStore is (recordKey, timestamp, sequenceNumber), and if the store doesn't have duplicates the sequence number is always 0. If the store does have duplicates, then we don't know what the sequence number is.
Without a KIP to add a get() to WindowStore, the only thing we could do is add a bit of a hack: check whether the innermost store is a RocksDBSegmentedBytesStore and, if so, call get(..) on that. If it isn't, we'd still need to call fetch.
For the DSL this would work, as the only time we have duplicates in the WindowStore is for joins, and we disable caching for those, so they skip this code path. For the PAPI, however, we would need to always disable caching when duplicates are enabled, which we probably should do anyway as it won't work as is.
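A sketch of the composite key layout described above; the (recordKey, timestamp, sequenceNumber) encoding follows the comment, but the exact sizes and byte order are illustrative:

    import java.nio.ByteBuffer;

    // Illustrative encoding of a window-store key as
    // (recordKey bytes, 8-byte timestamp, 4-byte sequence number).
    // Without duplicates the sequence number is always 0, so a point
    // lookup would be possible; with duplicates the caller cannot
    // know which sequence number to ask for.
    public final class WindowKeySketch {

        static byte[] toStoreKey(final byte[] recordKey, final long timestamp, final int seqnum) {
            return ByteBuffer.allocate(recordKey.length + Long.BYTES + Integer.BYTES)
                    .put(recordKey)
                    .putLong(timestamp)
                    .putInt(seqnum) // always 0 when duplicates are disabled
                    .array();
        }
    }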

xvrl (Member) replied:

Thanks for the explanation @dguy, very helpful to understand where caching and sequence numbers come into play. It might be worthwhile to put this in a JIRA somewhere. I do think it would be a useful optimization to have eventually, as fetches have some setup / teardown overhead.

bbejeck (Contributor, Author) commented Sep 28, 2017

Updated this.

xvrl (Member) commented Sep 28, 2017

Looks like one of my comments got swallowed in the update: #3978 (comment)

bbejeck (Contributor, Author) commented Sep 28, 2017

@xvrl responded above

dguy (Contributor) commented Sep 29, 2017

retest this please

dguy (Contributor) left a review:

Thanks @bbejeck, LGTM. We can consider doing further WindowStore optimizations in another PR.

asfgit closed this in 36556b8 on Sep 29, 2017
dguy (Contributor) commented Sep 29, 2017

merged to trunk

bbejeck (Contributor, Author) commented Sep 29, 2017

To benchmark the changes, I modified the count test in SimpleBenchmark to this:

    input.groupByKey()
         .windowedBy(TimeWindows.of(500))
         .count(Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as("tmpStoreName")
                .withValueSerde(Serdes.Long())
                .withCachingEnabled())
         .toStream()
         .foreach(new CountDownAction(latch));

Bottom line: skipping fetchPrevious improves performance by roughly 30%.
Here is the results spreadsheet

xvrl (Member) commented Sep 29, 2017

@bbejeck thanks for the benchmark numbers. I'm assuming you ran those with default caching settings? If so, that's a pretty big improvement, given that we probably aren't flushing the cache very often; it's hard to interpret without knowing how often we flush, though. I'd be curious to see what the "raw" improvement would be if we set the cache size to zero.
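For reference, a minimal sketch of what "cache size to zero" would look like in a Streams config of this era; the application id and bootstrap servers below are placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    // Sketch: setting the record cache to zero bytes so every update
    // flushes through immediately, exposing the raw cost of the
    // fetchPrevious call on each record.
    public class CacheConfigSketch {
        public static Properties benchmarkProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fetch-previous-benchmark"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // placeholder
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            return props;
        }
    }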

bbejeck (Contributor, Author) commented Sep 29, 2017

@xvrl no problem.

Yes, I ran those benchmarks using default cache settings. I can re-run the benchmarks and set the cache size to zero and update the results.

bbejeck (Contributor, Author) commented Oct 3, 2017

@xvrl here's an updated benchmark using the same code, but with the cache size set to zero
