Skip to content

Commit

Permalink
KAFKA-8410: Part 1: processor context bounds (#8414)
Browse files Browse the repository at this point in the history
Add type bounds to the ProcessorContext, which bounds the types that can be forwarded to child nodes.

Reviewers: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
vvcephei committed Apr 7, 2020
1 parent cd1e46c commit 29e08fd
Show file tree
Hide file tree
Showing 70 changed files with 184 additions and 234 deletions.
Expand Up @@ -153,12 +153,12 @@ public Processor<String, Long> get() {
}

public static class CustomMaxAggregator implements Processor<String, Long> {
ProcessorContext context;
ProcessorContext<Object, Object> context;
private KeyValueStore<String, Long> store;

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
this.context = context;
context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
Expand Down
Expand Up @@ -64,7 +64,7 @@ public interface Transformer<K, V, R> {
*
* @param context the context
*/
void init(final ProcessorContext context);
void init(final ProcessorContext<Object, Object> context);

/**
* Transform the record with the given key and value.
Expand Down
Expand Up @@ -69,7 +69,7 @@ public interface ValueTransformer<V, VR> {
* @throws IllegalStateException If store gets registered after initialization is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
void init(final ProcessorContext context);
void init(final ProcessorContext<Void, Void> context);

/**
* Transform the given value to a new value.
Expand Down
Expand Up @@ -72,7 +72,7 @@ public interface ValueTransformerWithKey<K, V, VR> {
* @throws IllegalStateException If store gets registered after initialization is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
void init(final ProcessorContext context);
void init(final ProcessorContext<Void, Void> context);

/**
* Transform the given [key and ]value to a new value.
Expand Down
Expand Up @@ -111,7 +111,7 @@ static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWi
final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new ValueTransformerWithKey<K, V, VR>() {
@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Void, Void> context) {
valueTransformer.init(context);
}

Expand Down
Expand Up @@ -46,7 +46,7 @@ public KStreamFlatTransformProcessor(final Transformer<? super KIn, ? super VIn,
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
transformer.init(context);
}
Expand Down
Expand Up @@ -39,14 +39,14 @@ public Processor<KIn, VIn> get() {
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {

private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
private ProcessorContext context;
private ProcessorContext<Object, Object> context;

KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) {
this.valueTransformer = valueTransformer;
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
this.context = context;
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -49,11 +50,11 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
valueGetter.init(context);
valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
Expand Down
Expand Up @@ -39,14 +39,14 @@ public Processor<K, V> get() {
public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {

private final ValueTransformerWithKey<K, V, R> valueTransformer;
private ProcessorContext context;
private ProcessorContext<Object, Object> context;

KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R> valueTransformer) {
this.valueTransformer = valueTransformer;
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
this.context = context;
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
Expand Down Expand Up @@ -75,11 +76,11 @@ private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>>
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
valueGetter.init(context);
valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
Expand Down Expand Up @@ -135,7 +136,7 @@ private class KTableKTableInnerJoinValueGetter implements KTableValueGetter<K, R
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Void, Void> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
Expand Down Expand Up @@ -74,11 +75,11 @@ private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
valueGetter.init(context);
valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
Expand Down Expand Up @@ -141,7 +142,7 @@ private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R>
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Void, Void> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
Expand Down Expand Up @@ -73,11 +74,11 @@ private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
valueGetter.init(context);
valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
Expand Down Expand Up @@ -136,7 +137,7 @@ private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Void, Void> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
Expand Down Expand Up @@ -72,11 +73,11 @@ private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
valueGetter.init(context);
valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
Expand Down Expand Up @@ -132,7 +133,7 @@ private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, R
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Void, Void> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
Expand Down
Expand Up @@ -135,7 +135,7 @@ private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Void, Void> context) {
parentGetter.init(context);
}

Expand Down
Expand Up @@ -104,14 +104,14 @@ public void process(final K key, final Change<V> change) {
private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {

private final KTableValueGetter<K, V> parentGetter;
private ProcessorContext context;
private ProcessorContext<Void, Void> context;

KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
this.parentGetter = parentGetter;
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Void, Void> context) {
this.context = context;
parentGetter.init(context);
}
Expand Down
Expand Up @@ -134,7 +134,7 @@ private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
}

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Void, Void> context) {
parentGetter.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
Expand Down
Expand Up @@ -21,7 +21,7 @@

public interface KTableValueGetter<K, V> {

void init(ProcessorContext context);
void init(ProcessorContext<Void, Void> context);

ValueAndTimestamp<V> get(K key);

Expand Down
Expand Up @@ -24,11 +24,11 @@
import org.apache.kafka.streams.state.internals.CacheFlushListener;

class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
private final InternalProcessorContext context;
private final ProcessorNode myNode;
private final InternalProcessorContext<Object, Object> context;
private final ProcessorNode<?, ?> myNode;

SessionCacheFlushListener(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
SessionCacheFlushListener(final ProcessorContext<Object, Object> context) {
this.context = (InternalProcessorContext<Object, Object>) context;
myNode = this.context.currentNode();
}

Expand All @@ -37,7 +37,7 @@ public void apply(final Windowed<K> key,
final V newValue,
final V oldValue,
final long timestamp) {
final ProcessorNode prev = context.currentNode();
final ProcessorNode<?, ?> prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
Expand Down
Expand Up @@ -32,13 +32,13 @@
* @param <V>
*/
class SessionTupleForwarder<K, V> {
private final ProcessorContext context;
private final ProcessorContext<Object, Object> context;
private final boolean sendOldValues;
private final boolean cachingEnabled;

@SuppressWarnings("unchecked")
SessionTupleForwarder(final StateStore store,
final ProcessorContext context,
final ProcessorContext<Object, Object> context,
final CacheFlushListener<Windowed<K>, V> flushListener,
final boolean sendOldValues) {
this.context = context;
Expand Down
Expand Up @@ -26,11 +26,11 @@
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> {
private final InternalProcessorContext context;
private final ProcessorNode myNode;
private final InternalProcessorContext<Object, Object> context;
private final ProcessorNode<?, ?> myNode;

TimestampedCacheFlushListener(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
TimestampedCacheFlushListener(final ProcessorContext<Object, Object> context) {
this.context = (InternalProcessorContext<Object, Object>) context;
myNode = this.context.currentNode();
}

Expand All @@ -39,7 +39,7 @@ public void apply(final K key,
final ValueAndTimestamp<V> newValue,
final ValueAndTimestamp<V> oldValue,
final long timestamp) {
final ProcessorNode prev = context.currentNode();
final ProcessorNode<?, ?> prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(
Expand Down
Expand Up @@ -30,13 +30,13 @@
* @param <V> the type of the value
*/
class TimestampedTupleForwarder<K, V> {
private final ProcessorContext context;
private final ProcessorContext<Object, Object> context;
private final boolean sendOldValues;
private final boolean cachingEnabled;

@SuppressWarnings("unchecked")
TimestampedTupleForwarder(final StateStore store,
final ProcessorContext context,
final ProcessorContext<Object, Object> context,
final TimestampedCacheFlushListener<K, V> flushListener,
final boolean sendOldValues) {
this.context = context;
Expand Down
Expand Up @@ -38,7 +38,7 @@ public Transformer<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> get() {
private Transformer<KIn, VIn, KeyValue<KOut, VOut>> transformer = transformerSupplier.get();

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
transformer.init(context);
}

Expand Down
Expand Up @@ -61,9 +61,10 @@ private final class KTableKTableJoinProcessor extends AbstractProcessor<KO, Chan
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
final InternalProcessorContext<Object, Object> internalProcessorContext =
(InternalProcessorContext<Object, Object>) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
Expand Down

0 comments on commit 29e08fd

Please sign in to comment.