Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT: KIP-328: suppression operator #5337

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b786a6e
sketch
vvcephei Jun 26, 2018
1cdac1d
sketching
vvcephei Jul 2, 2018
a57b8a0
sketching
vvcephei Jul 2, 2018
c09a2da
sketching
vvcephei Jul 3, 2018
1bab0e1
sketching
vvcephei Jul 3, 2018
a98a11b
naive time-based suppression
vvcephei Jul 5, 2018
404b493
wip
vvcephei Jul 5, 2018
36beb81
interediate suppression seems to be in order
vvcephei Jul 5, 2018
0856b88
lateness suppression
vvcephei Jul 5, 2018
a1ac5c4
WIP: still working on integration test
vvcephei Jul 5, 2018
68089a4
finish integration tests
vvcephei Jul 6, 2018
526f04d
adding final results feature. Realized there's a problem with the pro…
vvcephei Jul 6, 2018
3ca5e16
fixed boundary condition so final results works
vvcephei Jul 9, 2018
d616356
integration test for final results
vvcephei Jul 9, 2018
4c7c3f4
refactor to bufferConfig
vvcephei Jul 9, 2018
0fc090a
cleanup
vvcephei Jul 9, 2018
c075d2d
move suppression allowed lateness to window definition
vvcephei Jul 9, 2018
f02b0b5
super messy, but sorts keys by time in the buffer to correct the algo…
vvcephei Jul 10, 2018
81e961d
fix test
vvcephei Jul 10, 2018
4504917
simplify test
vvcephei Jul 10, 2018
9f0698b
use window close to ... close the window
vvcephei Jul 10, 2018
a3d6df3
lateness is handled by windowed computation now
vvcephei Jul 10, 2018
4e1a0df
get tests running
vvcephei Jul 10, 2018
1b5708c
make sessions respect close time
vvcephei Jul 10, 2018
6ffa42e
ong
vvcephei Jul 10, 2018
d036075
rename close -> grace
vvcephei Jul 11, 2018
9aa6fb8
add retention to joined
vvcephei Jul 11, 2018
8b26035
deprecate until
vvcephei Jul 11, 2018
fd93d3f
add type parameters back to Suppress
vvcephei Jul 11, 2018
318f917
tweaks
vvcephei Jul 12, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.kafka.streams.kstream;

import java.time.Duration;

/**
 * Static argument-validation helpers shared by the classes in this package.
 */
public final class ApiUtils {

    /** Not instantiable: this class only hosts static helpers. */
    private ApiUtils() {
    }

    /**
     * Checks that {@code duration} can be converted to a {@code long} number of milliseconds.
     *
     * @param duration the duration to check
     * @param message  the message of the exception thrown when the conversion overflows
     * @return the very same {@code duration} instance, so the call can be used inline
     * @throws IllegalArgumentException if {@link Duration#toMillis()} throws an
     *                                  {@link ArithmeticException}; that exception is kept as the cause
     */
    public static Duration validateMillisecondDuration(final Duration duration, final String message) {
        try {
            // Only the overflow check matters; the converted value itself is deliberately discarded.
            //noinspection ResultOfMethodCallIgnored
            duration.toMillis();
        } catch (final ArithmeticException overflow) {
            throw new IllegalArgumentException(message, overflow);
        }
        return duration;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

import java.util.Map;

Expand Down Expand Up @@ -137,8 +139,10 @@ public long size() {
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
* @deprecated since 2.1. Use {@link Joined#withRetention(java.time.Duration)} to configure the retention instead.
*/
@Override
@Deprecated
public JoinWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < size()) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
Expand Down
22 changes: 22 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import org.apache.kafka.common.serialization.Serde;

import java.time.Duration;
import java.util.Objects;

/**
* The {@code Joined} class represents optional params that can be passed to
* {@link KStream#join}, {@link KStream#leftJoin}, and {@link KStream#outerJoin} operations.
Expand All @@ -27,6 +30,7 @@ public class Joined<K, V, VO> {
private Serde<K> keySerde;
private Serde<V> valueSerde;
private Serde<VO> otherValueSerde;
private Duration retention;

private Joined(final Serde<K> keySerde,
final Serde<V> valueSerde,
Expand Down Expand Up @@ -132,6 +136,20 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
return this;
}

/**
 * Set how long events are retained for the join.
 * An event can only take part in a join while it is still retained.
 *
 * @param retention the retention time; must be non-null and expressible in milliseconds
 * @return this {@code Joined} instance, to allow call chaining
 * @throws NullPointerException     if {@code retention} is {@code null}
 * @throws IllegalArgumentException if {@code retention} cannot be expressed in milliseconds
 */
public Joined<K, V, VO> withRetention(final Duration retention) {
    final Duration checkedForNull = Objects.requireNonNull(retention, "Retention must not be null");
    // The validator hands back the instance it was given, so its result can be stored directly.
    this.retention = ApiUtils.validateMillisecondDuration(
        checkedForNull,
        "Retention must be expressible in milliseconds"
    );
    return this;
}

/**
 * Accessor for the key serde.
 *
 * @return the key {@link Serde} currently stored in this instance
 */
public Serde<K> keySerde() {
return keySerde;
}
Expand All @@ -143,4 +161,8 @@ public Serde<V> valueSerde() {
/**
 * Accessor for the serde of the other (joined) side's values.
 *
 * @return the other-value {@link Serde} currently stored in this instance
 */
public Serde<VO> otherValueSerde() {
return otherValueSerde;
}

/**
 * Accessor for the retention time configured through {@link #withRetention(Duration)}.
 *
 * @return the stored retention.
 *         NOTE(review): the field has no visible initializer, so this is presumably {@code null}
 *         until {@code withRetention} has been called; confirm against the constructor.
 */
public Duration retention() {
return retention;
}
}
10 changes: 10 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,16 @@ <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? ex
*/
<KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);

/**
 * Suppress some updates from this changelog stream, determined by the supplied {@link Suppress}.
 * <p>
 * This controls what updates downstream table and stream operations will receive.
 * A {@link Suppress} instance is obtained from one of its static factories, e.g.
 * {@link Suppress#emitFinalResultsOnly(Suppress.BufferConfig)} or
 * {@link Suppress#intermediateEvents(Suppress.IntermediateSuppression)}.
 *
 * @param suppress Configuration object determining what, if any, updates to suppress.
 * @return A new KTable with the desired suppress characteristics.
 */
KTable<K, V> suppress(final Suppress<K, V> suppress);

/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type), with default serializers, deserializers, and state store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class Materialized<K, V, S extends StateStore> {
protected boolean loggingEnabled = true;
protected boolean cachingEnabled = true;
protected Map<String, String> topicConfig = new HashMap<>();
protected Duration retention;

private Materialized(final StoreSupplier<S> storeSupplier) {
this.storeSupplier = storeSupplier;
Expand Down Expand Up @@ -222,4 +224,18 @@ public Materialized<K, V, S> withCachingDisabled() {
return this;
}

/**
 * Configure retention period for window and session stores. Ignored for key/value stores.
 * <p>
 * Overridden by pre-configured store suppliers
 * ({@link Materialized#as(SessionBytesStoreSupplier)} or {@link Materialized#as(WindowBytesStoreSupplier)}).
 *
 * @param retention the retention period; must be non-null and expressible in milliseconds
 * @return itself
 * @throws NullPointerException     if {@code retention} is {@code null}
 * @throws IllegalArgumentException if {@code retention} cannot be expressed in milliseconds
 */
public Materialized<K, V, S> withRetention(final Duration retention) {
    final Duration checkedForNull = Objects.requireNonNull(retention, "Retention must not be null");
    // The validator hands back the instance it was given, so its result can be stored directly.
    this.retention = ApiUtils.validateMillisecondDuration(
        checkedForNull,
        "Retention must be expressible in milliseconds"
    );
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.SessionStore;

import java.time.Duration;

/**
* {@code SessionWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
* It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
Expand All @@ -36,7 +38,7 @@
* They have no fixed time boundaries, rather the size of the window is determined by the records.
* Please see {@link SessionWindows} for more details.
* <p>
* {@link SessionWindows} are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
* New events are added to {@link SessionWindows} until their grace period ends (see {@link SessionWindows#grace(Duration)}).
*
* Furthermore, updates are sent downstream into a windowed {@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;

import java.time.Duration;
import java.util.Objects;

import static org.apache.kafka.streams.kstream.ApiUtils.validateMillisecondDuration;

/**
* A session based window specification used for aggregating events into sessions.
* <p>
Expand Down Expand Up @@ -67,10 +71,13 @@ public final class SessionWindows {

private final long gapMs;
private final long maintainDurationMs;
private final Duration grace;


private SessionWindows(final long gapMs, final long maintainDurationMs) {
private SessionWindows(final long gapMs, final long maintainDurationMs, final Duration grace) {
this.gapMs = gapMs;
this.maintainDurationMs = maintainDurationMs;
this.grace = grace;
}

/**
Expand All @@ -86,7 +93,7 @@ public static SessionWindows with(final long inactivityGapMs) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
final long oneDayMs = 24 * 60 * 60_000L;
return new SessionWindows(inactivityGapMs, oneDayMs);
return new SessionWindows(inactivityGapMs, oneDayMs, null);
}

/**
Expand All @@ -95,13 +102,45 @@ public static SessionWindows with(final long inactivityGapMs) {
*
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than window gap
*
* @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)}
* or directly configure the retention in a store supplier and use
* {@link Materialized#as(SessionBytesStoreSupplier)}.
*/
@Deprecated
public SessionWindows until(final long durationMs) throws IllegalArgumentException {
    // The retention must cover at least one inactivity gap.
    if (durationMs < gapMs) {
        throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
    }

    // Carry the configured grace period over to the new instance. Passing null here would
    // silently discard a grace period set earlier in the call chain, e.g. with(..).grace(..).until(..).
    return new SessionWindows(gapMs, durationMs, grace);
}

/**
 * Reject late events that arrive more than {@code afterWindowEnd}
 * after the end of their window.
 * <p>
 * Note that new events may change the boundaries of session windows, so aggressive
 * close times can lead to surprising results in which a too-late event is rejected and then
 * a subsequent event moves the window boundary forward.
 *
 * @param afterWindowEnd The grace period to admit late-arriving events to a window.
 * @return a new {@code SessionWindows} with this instance's gap and retention and the given grace period
 * @throws NullPointerException     if {@code afterWindowEnd} is {@code null}
 * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or cannot be expressed in milliseconds
 */
public SessionWindows grace(final Duration afterWindowEnd) {
    // Fail with a descriptive message instead of a bare NPE from isNegative().
    Objects.requireNonNull(afterWindowEnd, "Grace period must not be null");
    if (afterWindowEnd.isNegative()) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }

    return new SessionWindows(
        gapMs,
        maintainDurationMs,
        validateMillisecondDuration(afterWindowEnd, "Grace period must be expressible in milliseconds")
    );
}

/**
 * Returns the grace period of these windows.
 *
 * @return the explicitly configured grace period, or, when none has been set,
 *         the value of {@code maintainMs()} converted to a {@link Duration}
 */
public Duration grace() {
    if (grace == null) {
        // No explicit grace period: fall back to the retention time.
        return Duration.ofMillis(maintainMs());
    }
    return grace;
}

/**
Expand Down Expand Up @@ -130,11 +169,13 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) return false;
final SessionWindows that = (SessionWindows) o;
return gapMs == that.gapMs &&
maintainDurationMs == that.maintainDurationMs;
maintainDurationMs == that.maintainDurationMs &&
Objects.equals(grace, that.grace);
}

@Override
public int hashCode() {
return Objects.hash(gapMs, maintainDurationMs);

return Objects.hash(gapMs, maintainDurationMs, grace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.SuppressImpl.BufferConfigImpl;
import org.apache.kafka.streams.kstream.SuppressImpl.IntermediateSuppressionImpl;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.time.Duration;

/**
 * Describes which updates of a {@link KTable} changelog stream are suppressed
 * (see {@link KTable#suppress(Suppress)}).
 * <p>
 * Instances are created through the static factories {@link #emitFinalResultsOnly(BufferConfig)}
 * and {@link #intermediateEvents(IntermediateSuppression)}.
 *
 * @param <K> key type of the table being suppressed
 * @param <V> value type of the table being suppressed
 */
@SuppressWarnings("unused") // complaint about the top-level generic parameters, which actually are necessary
public interface Suppress<K, V> {

    /**
     * Derives a time value from a record and the context it is processed in.
     * NOTE(review): the unit and meaning of the returned {@code long} are not visible here; confirm with the implementation.
     */
    interface TimeDefinition<K, V> {
        long time(ProcessorContext context, K k, V v);
    }

    /**
     * Strategies that can be selected through {@link BufferConfig#bufferFullStrategy(BufferFullStrategy)}.
     * {@link #EMIT} may produce intermediate results, which is why
     * {@link Suppress#emitFinalResultsOnly(BufferConfig)} rejects it.
     */
    enum BufferFullStrategy {
        EMIT,
        SPILL_TO_DISK,
        SHUT_DOWN
    }

    /**
     * Buffer settings for a suppression. Each {@code with...} factory creates a fresh
     * {@link BufferConfigImpl} and applies the corresponding instance-level setter to it.
     */
    interface BufferConfig<K, V> {

        /** Creates a new config and applies {@link #bufferKeys(long)} to it. */
        static <K, V> BufferConfig<K, V> withBufferKeys(final long numberOfKeysToRemember) {
            return new BufferConfigImpl<K, V>().bufferKeys(numberOfKeysToRemember);
        }

        BufferConfig<K, V> bufferKeys(final long numberOfKeysToRemember);

        /** Creates a new config and applies {@link #bufferBytes(long, Serializer, Serializer)} to it. */
        static <K, V> BufferConfig<K, V> withBufferBytes(final long bytesToUseForSuppressionStorage,
                                                         final Serializer<K> keySerializer,
                                                         final Serializer<V> valueSerializer) {
            return new BufferConfigImpl<K, V>()
                .bufferBytes(bytesToUseForSuppressionStorage, keySerializer, valueSerializer);
        }

        BufferConfig<K, V> bufferBytes(final long bytesToUseForSuppressionStorage,
                                       final Serializer<K> keySerializer,
                                       final Serializer<V> valueSerializer);

        /** Creates a new config and applies {@link #bufferFullStrategy(BufferFullStrategy)} to it. */
        static <K, V> BufferConfig<K, V> withBufferFullStrategy(final BufferFullStrategy bufferFullStrategy) {
            final BufferConfig<K, V> freshConfig = new BufferConfigImpl<>();
            return freshConfig.bufferFullStrategy(bufferFullStrategy);
        }

        BufferConfig<K, V> bufferFullStrategy(final BufferFullStrategy bufferFullStrategy);
    }

    /**
     * Settings for suppressing intermediate events. Each {@code with...} factory creates a fresh
     * {@link IntermediateSuppressionImpl} and applies the corresponding instance-level setter to it.
     */
    interface IntermediateSuppression<K, V> {

        /** Creates a new instance and applies {@link #emitAfter(Duration)} to it. */
        static <K, V> IntermediateSuppression<K, V> withEmitAfter(final Duration timeToWaitForMoreEvents) {
            return new IntermediateSuppressionImpl<K, V>().emitAfter(timeToWaitForMoreEvents);
        }

        IntermediateSuppression<K, V> emitAfter(final Duration timeToWaitForMoreEvents);

        /** Creates a new instance and applies {@link #bufferConfig(BufferConfig)} to it. */
        static <K, V> IntermediateSuppression<K, V> withBufferConfig(final BufferConfig<K, V> bufferConfig) {
            return new IntermediateSuppressionImpl<K, V>().bufferConfig(bufferConfig);
        }

        IntermediateSuppression<K, V> bufferConfig(final BufferConfig<K, V> bufferConfig);
    }

    /**
     * Creates a suppression for windowed keys from the given buffer configuration.
     *
     * @param bufferConfig buffer settings; it is cast to {@link BufferConfigImpl}, so it has to be
     *                     an instance produced by the {@link BufferConfig} factories
     * @return a new {@link SuppressImpl} built from {@code bufferConfig}
     * @throws IllegalArgumentException if the configured strategy is {@link BufferFullStrategy#EMIT},
     *                                  because that strategy may produce intermediate results
     */
    static <K extends Windowed, V> Suppress<K, V> emitFinalResultsOnly(final BufferConfig<K, V> bufferConfig) {
        final BufferConfigImpl<K, V> configImpl = (BufferConfigImpl<K, V>) bufferConfig;
        if (configImpl.getBufferFullStrategy() == BufferFullStrategy.EMIT) {
            throw new IllegalArgumentException(
                "The EMIT strategy may produce intermediate results. "
                    + "Select either SHUT_DOWN or SPILL_TO_DISK"
            );
        }

        return new SuppressImpl<>(bufferConfig);
    }

    /**
     * Creates a suppression of intermediate events.
     *
     * @param intermediateSuppression the settings to build the suppression from
     * @return a new {@link SuppressImpl} built from {@code intermediateSuppression}
     */
    static <K, V> Suppress<K, V> intermediateEvents(final IntermediateSuppression<K, V> intermediateSuppression) {
        return new SuppressImpl<>(intermediateSuppression);
    }
}