
KAFKA-4730: Streams does not have an in-memory windowed store #6239

Merged
merged 11 commits into apache:trunk on Feb 21, 2019

Conversation

ableegoldman
Contributor

Implemented an in-memory window store allowing for range queries. A finite retention period defines how long records will be kept, i.e. the window of time available for fetching, and the grace period defines the window within which late-arriving data may still be written to the store.

Unit tests were written to test the functionality of the window store, including its insert/update/delete and fetch operations. Single-record, all-records, and range fetches were tested, for both time ranges and key ranges. The logging and metrics for late-arriving (dropped) records were tested, as well as the ability to restore from a changelog.
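
For context, a minimal usage sketch of the new store (the factory signature follows KIP-428; the store name here is hypothetical):

    import java.time.Duration;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    public class InMemoryWindowStoreExample {
        public static WindowBytesStoreSupplier example() {
            // Assumed factory from this PR / KIP-428. The retention period must
            // cover window size + grace period so that records which are late
            // but still within grace can be written.
            return Stores.inMemoryWindowStore(
                    "my-in-memory-windows",  // hypothetical store name
                    Duration.ofMinutes(10),  // retention period
                    Duration.ofMinutes(5),   // window size
                    false);                  // retainDuplicates
            // The supplier can then back a windowed aggregation via Materialized.as(...).
        }
    }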

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei vvcephei left a comment

Hi @ableegoldman,

Thanks for this PR!

I've made a first pass on API-level concerns. I haven't looked at the implementation in detail, yet.

It looks really good overall; I just had a few concerns pop out.

Thanks,
-John

@@ -166,7 +222,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long windowSize,
final boolean retainDuplicates) {
if (numSegments < 2) {
throw new IllegalArgumentException("numSegments cannot must smaller than 2");
throw new IllegalArgumentException("numSegments cannot be smaller than 2");
Contributor

Oof. Thanks.

import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKey;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;

public class InMemoryWindowStore<K, V> implements WindowStore<K, V> {
Contributor

If I read the code correctly, K is always Bytes and V is always byte[]. IMHO, it would be better to just use these types directly.

Contributor Author

I agree, although the in-memory KV store just uses generic K, V. Is there any particular reason for that, or would we want to change that as well?

Member

InMemoryKeyValue uses generics because this allows users to use it with any type (for the case that they don't want to use it with caching/logging wrappers), which seems to be ok. There is no reason to restrict the type IMHO.

For RocksDB it's different, because RocksDB does need to serialize -- thus, removing generics simplifies the code as the type must be plain bytes.

Contributor

Actually, InMemoryKeyValue also doesn't need the generics. It's constructed in exactly one place, in Stores:

    public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
        Objects.requireNonNull(name, "name cannot be null");
        return new KeyValueBytesStoreSupplier() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public KeyValueStore<Bytes, byte[]> get() {
                return new InMemoryKeyValueStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
            }

            @Override
            public String metricsScope() {
                return "in-memory-state";
            }
        };
    }

Thus, we could (and should) go ahead and inline those types as well.

The generics are a historical artifact, if I understand the history: the lowest-level store used to be generic, but at some point we decided to pin it down to just storing bytes, and we left the actual generic parameters in place.

@@ -48,7 +48,7 @@
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKey;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;

-public class InMemoryWindowStore<K, V> implements WindowStore<K, V> {
+public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowStore<K, V> {
Contributor Author

Temporary fix; will likely make the switch from generics to <Bytes, byte[]> in a separate PR.

Contributor

👍 Maybe we should just consider that to be within the scope of https://issues.apache.org/jira/browse/KAFKA-7918. (If you agree, can you leave a comment on the ticket to that effect?)


@Override
public boolean retainDuplicates() {
return false;
Contributor Author

The current implementation does not support duplicates, will add this

Contributor

Yes, we need this for a full solution. The solution should be exactly the same as in the Rocks store (using seqnum for time windows).
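
For reference, a hedged sketch of that seqnum approach (maybeAddSeqnum is a hypothetical helper for illustration, not code from the Rocks store):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.Bytes;

    // When duplicates are retained, append a fresh sequence number to each key
    // so that identical (key, windowStartTime) pairs remain distinct entries.
    private int seqnum = 0;

    private Bytes maybeAddSeqnum(final Bytes key) {
        if (!retainDuplicates) {
            return key;
        }
        seqnum = (seqnum + 1) & 0x7FFFFFFF; // keep it non-negative on wraparound
        final ByteBuffer buf = ByteBuffer.allocate(key.get().length + Integer.BYTES);
        buf.put(key.get()).putInt(seqnum);
        return Bytes.wrap(buf.array());
    }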

@bbejeck bbejeck left a comment

Thanks for this PR @ableegoldman, sorry for jumping in a little late.
I've made an initial pass, and overall it looks good. I have a couple of minor comments.

Also, I'm wondering if we should have an integration test for this as well.

final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();

for (final Entry<Long, NavigableMap<K, V>> segmentMapEntry : this.segmentMap.entrySet()) {
for (final Entry<K, V> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
Contributor

minor - no test coverage for lines 221 - 225 in the unit test

}

@Override
public Long peekNextKey() {
Contributor

minor - peekNextKey not covered in the unit test

@bbejeck
Contributor

bbejeck commented Feb 12, 2019

Failure unrelated - kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
retest this please

private final long retentionPeriod;
private final long windowSize;

private final NavigableMap<Long, NavigableMap<K, V>> segmentMap;
Contributor

This is currently a mapping windowStartTime -> key -> value, which is fine, but I wonder if it would simplify the code at all to use the equivalent mapping:
(windowStartTime, key) -> value by making it a NavigableMap<Windowed<K>, V>.

You'd have to provide a Comparator to the TreeMap constructor to determine order based on timestamp first, then on key.
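
For illustration, a minimal sketch of such a comparator (assuming K extends Comparable<K>, as the updated class declaration above requires):

    import java.util.Comparator;
    import java.util.NavigableMap;
    import java.util.TreeMap;
    import org.apache.kafka.streams.kstream.Windowed;

    // Order entries by window start time first, then by key, so that the single
    // (windowStartTime, key) -> value map stays sorted for range queries.
    final Comparator<Windowed<K>> byTimeThenKey =
            Comparator.<Windowed<K>>comparingLong(w -> w.window().start())
                      .thenComparing(Windowed::key);
    final NavigableMap<Windowed<K>, V> segmentMap = new TreeMap<>(byTimeThenKey);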

@@ -226,8 +226,9 @@ <h2>
application instance, but must fit into the available local disk
space.</li>
<li>RocksDB settings can be fine-tuned, see
<a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li>
<li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.PersistentKeyValueFactory.html">store variants</a>:
Contributor Author

This link is broken, so I updated it to where I think it was intended to point; please check whether it makes sense.

Contributor

+1.

Contributor

Another doc change we need to make is in docs/ops.html, under

 <h5><a id="kafka_streams_store_monitoring" href="#kafka_streams_store_monitoring">State Store Metrics</a></h5>

we need to add the store-scope: <li><code>in-memory-window-state</code></li>.

Contributor

And in docs/streams/upgrade-guide.html we can add a new section

<h3><a id="streams_api_changes_230" href="#streams_api_changes_220">Streams API changes in 2.3.0</a></h3>

and add a one-line paragraph saying that we added a new interface in Stores that provides a built-in in-memory window store, etc etc.

@ableegoldman
Contributor Author

With the current implementation, fetch and, in particular, all() are inefficient as they construct new objects for each record. This has the advantage of providing a snapshot, but that might not be worth the cost since we make no guarantees about it anyway. We can refactor things to improve efficiency, but that is probably worth a separate JIRA/PR after this one is merged?

@guozhangwang guozhangwang left a comment

Thanks @ableegoldman. Made a pass over the PR.


@Deprecated
@Override
public int segments() {
return (int) retentionPeriod;
Contributor

I'd suggest we just throw an exception in this function instead of returning retentionPeriod here, since this is a deprecated API and should not be called anymore inside the Streams library. Returning a garbage value instead of throwing an exception is vulnerable to future bugs that call this function by mistake.
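
A sketch of that suggestion (illustrative only, not the final code):

    @Deprecated
    @Override
    public int segments() {
        // Deprecated API: fail fast rather than return a garbage value that a
        // future caller might silently rely on.
        throw new IllegalStateException("segments() is deprecated and should not be called");
    }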

// In-memory window store is not *really* segmented, so just say size is 1 ms
@Override
public long segmentIntervalMs() {
return 1;
Contributor

Hmm.. if a user decides to have a caching layer on top of this in-memory window store (which would be a weird usage, but we cannot forbid it) then the SegmentedCacheFunction would use 1 ms segments, causing millions of segments. Would that be okay?

Contributor Author

I'm not sure I follow; SegmentedCacheFunction only seems to use this for comparison, so why would that be a problem?

Contributor

I think you're right: it is only logical and does not physically impact the data layout. With a long segmentId we should be fine with the range of segment ids even at a 1 ms interval.


private void removeExpiredSegments() {
final long minLiveTime = this.context.streamTime() - this.retentionPeriod;
final NavigableMap<Long, NavigableMap<WrappedK<K>, V>> expiredSegments = this.segmentMap.headMap(minLiveTime, true);
Contributor

Why not just map.headMap(...).clear(); ?

Contributor Author

Good point
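
i.e., roughly (a sketch of the simplification, reusing the fields from the snippet above):

    private void removeExpiredSegments() {
        final long minLiveTime = this.context.streamTime() - this.retentionPeriod;
        // headMap(minLiveTime, true) is a live view of the expired segments;
        // clearing the view removes them from the underlying segmentMap in one call.
        this.segmentMap.headMap(minLiveTime, true).clear();
    }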

} else {
if (value != null) {
this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>());
this.segmentMap.get(windowStartTimestamp).put(new WrappedK<>(key, seqnum), value);
Contributor

If retainDuplicates is false, could we avoid storing the sequence suffix at all, to save 4 bytes per record?

Contributor Author

I think this will be more straightforward after we remove the generics, since we can just tack on the seqnum Bytes or not.

Contributor

Sounds good.

}
}

private KeyValue<Windowed<K>, V> getWindowedKeyValue(final K key, final long startTimestamp, final V value) {
Contributor

Not a comment for this PR: when we remove the generics, this function can be replaced with WindowStoreIteratorWrapper since the inner iterator would always be bytes.

}
}

private static class InMemoryWindowStoreIterator<V> implements WindowStoreIterator<V> {
Contributor

I think we can consolidate these two iterators and the InMemoryKeyValueStore#InMemoryKeyValueIterator into a single wrapper iterator (note that WindowStoreIterator<V> is just a KeyValueIterator<Long, V>) that takes an iterator as its inner. Then, for supporting peekNextKey, we do not really need to rely on the list iterator's previous; we just need to maintain the next element and update it whenever next() is called and the element on hold is returned.

And when we remove the generics we can even consolidate all of them in WindowStoreIteratorWrapper as well.
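
A minimal sketch of that "hold the next element" idea (generic and illustrative, not tied to this PR's types):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Wraps any iterator and supports peeking by holding on to the next element.
    class PeekingIterator<T> implements Iterator<T> {
        private final Iterator<T> inner;
        private T held; // the element on hold, or null if none fetched yet

        PeekingIterator(final Iterator<T> inner) {
            this.inner = inner;
        }

        // Fetch the next element without consuming it (basis for peekNextKey).
        T peek() {
            if (held == null) {
                if (!inner.hasNext()) {
                    throw new NoSuchElementException();
                }
                held = inner.next();
            }
            return held;
        }

        @Override
        public boolean hasNext() {
            return held != null || inner.hasNext();
        }

        @Override
        public T next() {
            final T result = peek(); // ensure an element is on hold
            held = null;             // release it
            return result;
        }
    }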

Contributor Author

The problem with using the WindowStoreIteratorWrapper here is that it expects the key Bytes to actually be a windowed key, i.e. to contain the timestamp. However, with the in-memory version we want to avoid having to wrap the timestamp with the key, so the iterator can't just extract it from the Bytes.

Contributor

When we remove the generics and have bytes in InMemoryWindowStore, when we return the inner ListIterator aren't we going to have each key be concatenated key-bytes followed by timestamp-bytes anyway?

Contributor Author

Currently the key Bytes are wrapped up with the timestamp into a windowed type directly in the fetch call, whereas the WindowStoreIteratorWrapper is given an iterator over Bytes and does the extraction of key+timestamp -> windowed when accessed. We could rewrite the in-memory store as a single map with key+timestamp bytes as the key, but with the current implementation wrapping the key with the timestamp is a waste of space, since we already map from each timestamp to a kv store.

Contributor Author

John actually raised that as a possibility to improve clarity. I wasn't entirely convinced that it would make things easier to read, but if you both agree, I can be. Certainly I agree it would be good to share the iterator code.

Contributor

In retrospect, I'm not sure I'm convinced either ;)

One nice aspect of your current design is that when we need to drop a window, we can do it in one operation.

Contributor

Okay, I think I'm convinced. We should just leave it as is in this PR at least, and after it is merged I will think about it twice when reviewing the other PR for inlining the byte stores.

@Override
public V fetch(final K key, final long windowStartTimestamp) {
removeExpiredSegments();
if (windowStartTimestamp <= this.context.streamTime() - this.retentionPeriod)
Contributor

After the cleanup, can we just check if there's still a bucket in the current map? (If windowStartTimestamp <= this.context.streamTime() - this.retentionPeriod, then it should already have been cleared in removeExpiredSegments, right?)

Contributor Author

Ack
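
For concreteness, a sketch of the simplified single-record fetch under the map-of-maps layout shown earlier (ignoring duplicate support):

    @Override
    public V fetch(final K key, final long windowStartTimestamp) {
        removeExpiredSegments();
        // Any segment older than streamTime - retentionPeriod was just cleared,
        // so a plain lookup suffices; no explicit timestamp check is needed.
        final NavigableMap<K, V> segment = this.segmentMap.get(windowStartTimestamp);
        return segment == null ? null : segment.get(key);
    }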

@guozhangwang
Contributor

Jenkins failures seem consistent but should be irrelevant to this PR; maybe you need to rebase on the latest trunk?

Also, I'm wondering if making a 1 ms bucket is most efficient: for hopping windows that may be okay, since the window start timestamps should be sparse across the time span; but if it is a sliding window then we will end up with tons of buckets, and hence headMap etc. could be very costly. In that case maybe having a wider bucket like 100 ms is a better trade-off. WDYT @ableegoldman @vvcephei @bbejeck @mjsax ?

@ableegoldman
Contributor Author

Also could be refactored as a map: key -> (map: timestamp -> value), maybe that makes more sense?

@guozhangwang
Contributor

Also could be refactored as a map: key -> (map: timestamp -> value), maybe that makes more sense?

Thinking about it a bit more, I think the sliding window is relatively rare compared with hopping windows, plus we may want to have a totally different implementation for the stream-stream join moving forward anyway. So sharding on the time space may still make more sense than sharding on the key space.

@ableegoldman
Contributor Author

+1, makes sense to have a different implementation for stream-stream joins in the future; we can split the duplicates support out as well then.

@vvcephei vvcephei left a comment

Sorry, I had a pending comment I never submitted.

To answer your question about potentially optimizing performance, I agree we can consider that a separate scope. Correctness (and ergonomics) generally trumps performance, but if it's shockingly bad, we should consider refactoring it.

Of course, we won't know if it's poor performance or not without benchmarking it.

So I think we're justified in writing it so it behaves well (which I think you've done), and filing a Jira to do performance testing and consider optimizations.

@vvcephei
Contributor

To weigh in on the latter question, it's not immediately obvious to me if it's better to use bigger buckets, even for large numbers of windows. It could actually be worse, as it adds overhead to other operations.

Even if we have a million windows in the map, it's still only log_2(1M)~=20 hops to the head, or any other window. Intuitively, it seems like this would be fast.

Conversely, if we were to "segment" this store, we still need to uniquely identify windows, so we'd wind up with a map: segmentId -> windowId -> key -> value, or maybe segmentId -> (key, windowId) -> value. There are still the same number of elements in the whole data structure, though, so while finding the right segment would require fewer hops, finding the right key within a window is still the same number (e.g., instead of log_2(1M), it's log_2(10,000) + log_2(100) = log_2(1M)). (I think)

So, it doesn't optimize seeks to specific keys, only seeks to whole segments/windows, which I think is only for expiration (headMap, as you mentioned). This should be comparatively rare.

Plus, it's also a performance optimization, which IMHO, we should back up with a benchmark or some other performance evaluation before adding complexity to the code.

@ableegoldman
Contributor Author

Both checks failed (although one indicates 0 failed); results not available.

Retest this, please

@vvcephei vvcephei left a comment

The changes LGTM. I think there are a few hanging conversations, though.

"expired-window-record-drop",
Sensor.RecordingLevel.INFO
);
addInvocationRateAndCount(
Contributor

👍 I agree. We've seen some profiling results that suggest the widespread string literals from these metrics actually do cause memory pressure.

@guozhangwang
Contributor

LGTM modulo the one comment about filing a follow-up PR inlining the metrics name constants.

We still need to wait for the KIP to be officially accepted before merging the PR.

@ableegoldman
Contributor Author

JDK 8 test failed this time but JDK 11 passed, so both checks have passed at some point. KIP has also been accepted so we are ready to merge (I filed a JIRA Re: string constants hard-coded in metrics)

@guozhangwang guozhangwang left a comment

There is a minor typo in the docs, I've pushed a commit to your branch to fix it.

@@ -68,6 +68,11 @@ <h1>Upgrade Guide and API Changes</h1>
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>

<h3><a id="streams_api_changes_230" href="#streams_api_changes_220">Streams API changes in 2.3.0</a></h3>
Contributor

nit: this should be 230 not 220 (my previous comment had a typo here)

@guozhangwang guozhangwang merged commit ff603c6 into apache:trunk Feb 21, 2019
Pengxiaolong pushed a commit to Pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…#6239)

Implemented an in-memory window store allowing for range queries. A finite retention period defines how long records will be kept, i.e. the window of time available for fetching, and the grace period defines the window within which late-arriving data may still be written to the store.

Unit tests were written to test the functionality of the window store, including its insert/update/delete and fetch operations. Single-record, all-records, and range fetches were tested, for both time ranges and key ranges. The logging and metrics for late-arriving (dropped) records were tested, as well as the ability to restore from a changelog.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>