Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4730: Streams does not have an in-memory windowed store #6239

Merged
merged 11 commits into from Feb 21, 2019
1 change: 1 addition & 0 deletions docs/ops.html
Expand Up @@ -1728,6 +1728,7 @@ <h5><a id="kafka_streams_store_monitoring" href="#kafka_streams_store_monitoring
<ul>
<li><code>in-memory-state</code></li>
<li><code>in-memory-lru-state</code></li>
<li><code>in-memory-window-state</code></li>
<li><code>rocksdb-state</code> (for RocksDB backed key-value store)</li>
<li><code>rocksdb-window-state</code> (for RocksDB backed window store)</li>
<li><code>rocksdb-session-state</code> (for RocksDB backed session store)</li>
Expand Down
4 changes: 3 additions & 1 deletion docs/streams/developer-guide/processor-api.html
Expand Up @@ -227,7 +227,7 @@ <h2>
space.</li>
<li>RocksDB settings can be fine-tuned, see
<a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB configuration</span></a>.</li>
<li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.PersistentKeyValueFactory.html">store variants</a>:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This link is broken so I updated it to where I think it was intended to point, please check if it makes sense or not

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another doc change we need to make is in docs/ops.html, under

 <h5><a id="kafka_streams_store_monitoring" href="#kafka_streams_store_monitoring">State Store Metrics</a></h5>

we need to add the store-scope: <li><code>in-memory-window-state</code></li>.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in docs/streams/upgrade-guide.html we can add a new section

<h3><a id="streams_api_changes_230" href="#streams_api_changes_220">Streams API changes in 2.3.0</a></h3>

and add a one-line paragraph saying that we added a new interface in Stores that provides a built-in in-memory window store, etc etc.

<li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore-java.lang.String-">store variants</a>:
ableegoldman marked this conversation as resolved.
Show resolved Hide resolved
time window key-value store, session window key-value store.</li>
</ul>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating a persistent key-value store:</span>
Expand Down Expand Up @@ -258,6 +258,8 @@ <h2>
<li>Useful when application instances run in an environment where local
disk space is either not available or local disk space is wiped
in-between app instance restarts.</li>
<li>Available <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-">store variants</a>:
time window key-value store</li>
</ul>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Creating an in-memory key-value store:</span>
<span class="c1">// here, we create a `KeyValueStore&lt;String, Long&gt;` named &quot;inmemory-counts&quot;.</span>
Expand Down
5 changes: 5 additions & 0 deletions docs/streams/upgrade-guide.html
Expand Up @@ -68,6 +68,11 @@ <h1>Upgrade Guide and API Changes</h1>
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>

<h3><a id="streams_api_changes_230" href="#streams_api_changes_230">Streams API changes in 2.3.0</a></h3>
<p>
As of 2.3.0 Streams now offers an in-memory version of the window store, in addition to the persistent one based on RocksDB. The new public interface <code>inMemoryWindowStore()</code> is added to Stores that provides a built-in in-memory window store.
</p>

<h3><a id="streams_api_changes_220" href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
<p>
We've simplified the <code>KafkaStreams#state</code> transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transit from <code>CREATED</code> to <code>RUNNING</code>, and then to <code>REBALANCING</code> to get the first
Expand Down
40 changes: 39 additions & 1 deletion streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
Expand Down Expand Up @@ -143,6 +144,43 @@ public String metricsScope() {
};
}

/**
* Create an in-memory {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @param windowSize size of the windows (cannot be negative)
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);

Objects.requireNonNull(name, "name cannot be null");
if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSize + "], retention=[" + retentionPeriod + "]");
}

return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates);
}

/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
Expand All @@ -166,7 +204,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long windowSize,
final boolean retainDuplicates) {
if (numSegments < 2) {
throw new IllegalArgumentException("numSegments cannot must smaller than 2");
throw new IllegalArgumentException("numSegments cannot be smaller than 2");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof. Thanks.

}

final long legacySegmentInterval = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
Expand Down
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

public class InMemoryWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
private final String name;
private final long retentionPeriod;
private final long windowSize;
private final boolean retainDuplicates;

public InMemoryWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates) {
this.name = name;
this.retentionPeriod = retentionPeriod;
this.windowSize = windowSize;
this.retainDuplicates = retainDuplicates;
}

@Override
public String name() {
return name;
}

@Override
public WindowStore<Bytes, byte[]> get() {
return new InMemoryWindowStore<>(name,
Serdes.Bytes(),
Serdes.ByteArray(),
retentionPeriod,
windowSize,
retainDuplicates,
metricsScope());
}

@Override
public String metricsScope() {
return "in-memory-window-state";
}

@Deprecated
@Override
public int segments() {
throw new IllegalStateException("Segments is deprecated and should not be called");
}

@Override
public long retentionPeriod() {
return retentionPeriod;
}


@Override
public long windowSize() {
return windowSize;
}

// In-memory window store is not *really* segmented, so just say size is 1 ms
@Override
public long segmentIntervalMs() {
return 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. if users decides to have a caching layer on top of this in-memory window store (which would be a weird usage, but we cannot forbid it) then the SegmentedCacheFunction would use 1ms as segments, causing millions of segments. Would that be okay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow, SegmentedCacheFunction only seems to use this for comparison, why would that be a problem?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, it is only logical, not physically impacting the data layout. With a long segmentId we should be fine with the range of segment ids with 1ms interval only.

}

@Override
public boolean retainDuplicates() {
return retainDuplicates;
}
}