KAFKA-6521: Use timestamped stores for KTables (#6667)
Reviewers: John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
mjsax committed May 12, 2019
1 parent 5236a3e commit 8649717
Showing 44 changed files with 1,055 additions and 252 deletions.
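
Every diff below applies the same mechanical change: processors that previously read and wrote a plain KeyValueStore or WindowStore now go through a TimestampedKeyValueStore or TimestampedWindowStore, which pairs each value with the timestamp of the record that produced it in a ValueAndTimestamp wrapper. As a reading aid, here is a minimal sketch of the before/after access pattern; the class name, types, and update logic are invented placeholders, not code from this commit:

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.TimestampedKeyValueStore;
    import org.apache.kafka.streams.state.ValueAndTimestamp;

    import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

    // Before: Long old = store.get(key); store.put(key, newValue);  -- the timestamp was lost.
    // After: the value travels together with its record timestamp.
    class TimestampedStoreSketch {
        void readModifyWrite(final ProcessorContext context,
                             final TimestampedKeyValueStore<String, Long> store,
                             final String key,
                             final long delta) {
            // get() now returns ValueAndTimestamp<V>; getValueOrNull unwraps it null-safely.
            final ValueAndTimestamp<Long> oldAndTimestamp = store.get(key);
            final Long oldValue = getValueOrNull(oldAndTimestamp);

            final long newValue = (oldValue == null ? 0L : oldValue) + delta;

            // put() persists the new value together with the current record's timestamp.
            store.put(key, ValueAndTimestamp.make(newValue, context.timestamp()));
        }
    }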

@@ -185,7 +185,7 @@ private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V,
                                           final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
         return aggregateBuilder.build(
             functionName,
-            new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+            new TimestampedKeyValueStoreMaterializer<>(materializedInternal).materialize(),
             aggregateSupplier,
             materializedInternal.queryableStoreName(),
             materializedInternal.keySerde(),

@@ -88,7 +88,7 @@ private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggre
         final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
             funcName,
             new ProcessorParameters<>(aggregateSupplier, funcName),
-            new KeyValueStoreMaterializer<>(materialized).materialize()
+            new TimestampedKeyValueStoreMaterializer<>(materialized).materialize()
         );
 
         // now the repartition node must be the parent of the StateProcessorNode

KStreamAggregate.java

@@ -22,17 +22,19 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
     private final String storeName;
     private final Initializer<T> initializer;
     private final Aggregator<? super K, ? super V, T> aggregator;
 
-
     private boolean sendOldValues = false;
 
     KStreamAggregate(final String storeName, final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator) {
@@ -51,22 +53,25 @@ public void enableSendingOldValues() {
         sendOldValues = true;
     }
 
-    private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
-
-        private KeyValueStore<K, T> store;
+    private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedKeyValueStore<K, T> store;
         private StreamsMetricsImpl metrics;
-        private TupleForwarder<K, T> tupleForwarder;
+        private TimestampedTupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
-            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+            store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                store,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
 
         @Override
         public void process(final K key, final V value) {
             // If the key or value is null we don't need to proceed
@@ -79,7 +84,8 @@ key, value, context().topic(), context().partition(), context().offset()
                 return;
             }
 
-            T oldAgg = store.get(key);
+            final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
+            T oldAgg = getValueOrNull(oldAggAndTimestamp);
 
             if (oldAgg == null) {
                 oldAgg = initializer.apply();
@@ -91,14 +97,13 @@ key, value, context().topic(), context().partition(), context().offset()
             newAgg = aggregator.apply(key, value, newAgg);
 
             // update the store with the new value
-            store.put(key, newAgg);
+            store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
             tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
     }
 
     @Override
     public KTableValueGetterSupplier<K, T> view() {
-
         return new KTableValueGetterSupplier<K, T>() {
 
             public KTableValueGetter<K, T> get() {
@@ -112,23 +117,22 @@ public String[] storeNames() {
         };
     }
 
-    private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
-
-        private KeyValueStore<K, T> store;
+    private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
+        private TimestampedKeyValueStore<K, T> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
+            store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
         @Override
         public T get(final K key) {
-            return store.get(key);
+            return getValueOrNull(store.get(key));
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
 }
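
A note on the null handling introduced above: ValueAndTimestamp.make and getValueOrNull are deliberately null-safe, so a tombstone (null value) is never wrapped and a missing entry reads back as a plain null. For reference, their behavior is equivalent to the following simplified paraphrase (the real class is org.apache.kafka.streams.state.ValueAndTimestamp; this sketch only mirrors its semantics):

    // Simplified paraphrase of the two ValueAndTimestamp helpers used in this commit.
    final class ValueAndTimestampSketch<V> {
        private final V value;
        private final long timestamp;

        private ValueAndTimestampSketch(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        // A null value yields a null wrapper, so deletes stay plain nulls in the store.
        static <V> ValueAndTimestampSketch<V> make(final V value, final long timestamp) {
            return value == null ? null : new ValueAndTimestampSketch<>(value, timestamp);
        }

        // Unwraps null-safely: a missing entry reads as a plain null value.
        static <V> V getValueOrNull(final ValueAndTimestampSketch<V> valueAndTimestamp) {
            return valueAndTimestamp == null ? null : valueAndTimestamp.value;
        }
    }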

KStreamReduce.java

@@ -21,10 +21,13 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class);
 
@@ -48,22 +51,25 @@ public void enableSendingOldValues() {
         sendOldValues = true;
     }
 
-    private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
-
-        private KeyValueStore<K, V> store;
-        private TupleForwarder<K, V> tupleForwarder;
+    private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
+        private TimestampedKeyValueStore<K, V> store;
+        private TimestampedTupleForwarder<K, V> tupleForwarder;
         private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
+            store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                store,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
 
         @Override
         public void process(final K key, final V value) {
             // If the key or value is null we don't need to proceed
@@ -76,7 +82,8 @@ key, value, context().topic(), context().partition(), context().offset()
                 return;
             }
 
-            final V oldAgg = store.get(key);
+            final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
+            final V oldAgg = getValueOrNull(oldAggAndTimestamp);
             V newAgg = oldAgg;
 
             // try to add the new value
@@ -87,14 +94,13 @@ key, value, context().topic(), context().partition(), context().offset()
             }
 
             // update the store with the new value
-            store.put(key, newAgg);
+            store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
             tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
     }
 
     @Override
     public KTableValueGetterSupplier<K, V> view() {
-
         return new KTableValueGetterSupplier<K, V>() {
 
             public KTableValueGetter<K, V> get() {
@@ -108,24 +114,23 @@ public String[] storeNames() {
         };
     }
 
-    private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
-
-        private KeyValueStore<K, V> store;
+    private class KStreamReduceValueGetter implements KTableValueGetter<K, V> {
+        private TimestampedKeyValueStore<K, V> store;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+            store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
         @Override
         public V get(final K key) {
-            return store.get(key);
+            return getValueOrNull(store.get(key));
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
 }

KStreamWindowAggregate.java

@@ -29,12 +29,15 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -69,10 +72,10 @@ public void enableSendingOldValues() {
         sendOldValues = true;
     }
 
-    private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
-
-        private WindowStore<K, Agg> windowStore;
-        private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
+    private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
@@ -83,13 +86,14 @@ private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
         public void init(final ProcessorContext context) {
             super.init(context);
             internalProcessorContext = (InternalProcessorContext) context;
-
             metrics = (StreamsMetricsImpl) context.metrics();
-
             lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
-
-            windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                windowStore,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
         @Override
@@ -115,7 +119,8 @@ value, context().topic(), context().partition(), context().offset()
                 final Long windowStart = entry.getKey();
                 final long windowEnd = entry.getValue().end();
                 if (windowEnd > closeTime) {
-                    Agg oldAgg = windowStore.fetch(key, windowStart);
+                    final ValueAndTimestamp<Agg> oldAggAndTimestamp = windowStore.fetch(key, windowStart);
+                    Agg oldAgg = getValueOrNull(oldAggAndTimestamp);
 
                     if (oldAgg == null) {
                         oldAgg = initializer.apply();
@@ -124,7 +129,7 @@ value, context().topic(), context().partition(), context().offset()
                     final Agg newAgg = aggregator.apply(key, value, oldAgg);
 
                     // update the store with the new value
-                    windowStore.put(key, newAgg, windowStart);
+                    windowStore.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()), windowStart);
                     tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
                 } else {
                     log.debug(
@@ -154,7 +159,6 @@ value, context().topic(), context().partition(), context().offset()
 
     @Override
     public KTableValueGetterSupplier<Windowed<K>, Agg> view() {
-
         return new KTableValueGetterSupplier<Windowed<K>, Agg>() {
 
             public KTableValueGetter<Windowed<K>, Agg> get() {
@@ -168,27 +172,25 @@ public String[] storeNames() {
         };
     }
 
-    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
-
-        private WindowStore<K, Agg> windowStore;
+    private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Windowed<K>, Agg> {
+        private TimestampedWindowStore<K, Agg> windowStore;
 
         @SuppressWarnings("unchecked")
         @Override
        public void init(final ProcessorContext context) {
-            windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
         }
 
         @SuppressWarnings("unchecked")
         @Override
         public Agg get(final Windowed<K> windowedKey) {
             final K key = windowedKey.key();
             final W window = (W) windowedKey.window();
 
-            return windowStore.fetch(key, window.start());
+            return getValueOrNull(windowStore.fetch(key, window.start()));
         }
 
         @Override
-        public void close() {
-        }
+        public void close() {}
     }
 }
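
The windowed processor above follows the same read-modify-write pattern, addressed by (key, window start) instead of key alone. A minimal illustration against a TimestampedWindowStore; the class name, types, and counting logic are placeholders, not code from this commit:

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.TimestampedWindowStore;
    import org.apache.kafka.streams.state.ValueAndTimestamp;

    import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

    class WindowedCountSketch {
        void countInWindow(final ProcessorContext context,
                           final TimestampedWindowStore<String, Long> windowStore,
                           final String key,
                           final long windowStart) {
            // fetch() addresses one window of one key and now returns the value
            // together with the timestamp of the record that last updated it.
            final ValueAndTimestamp<Long> oldAndTimestamp = windowStore.fetch(key, windowStart);
            final Long oldCount = getValueOrNull(oldAndTimestamp);

            final long newCount = (oldCount == null ? 0L : oldCount) + 1L;

            // put() stores the new aggregate plus the current record's timestamp
            // under the same (key, windowStart) coordinate.
            windowStore.put(key, ValueAndTimestamp.make(newCount, context.timestamp()), windowStart);
        }
    }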

KTableAggregate.java

@@ -22,7 +22,10 @@
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
 public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T> {
 
@@ -54,15 +57,19 @@ public Processor<K, Change<V>> get() {
     }
 
     private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
-        private KeyValueStore<K, T> store;
-        private TupleForwarder<K, T> tupleForwarder;
+        private TimestampedKeyValueStore<K, T> store;
+        private TimestampedTupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues);
+            store = (TimestampedKeyValueStore<K, T>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                store,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
         }
 
         /**
@@ -75,7 +82,8 @@ public void process(final K key, final Change<V> value) {
                 throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
             }
 
-            final T oldAgg = store.get(key);
+            final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
+            final T oldAgg = getValueOrNull(oldAggAndTimestamp);
             final T intermediateAgg;
 
             // first try to remove the old value
@@ -101,7 +109,7 @@ public void process(final K key, final Change<V> value) {
             }
 
             // update the store with the new value
-            store.put(key, newAgg);
+            store.put(key, ValueAndTimestamp.make(newAgg, context().timestamp()));
             tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
 