KAFKA-8410: Part 1: processor context bounds #8414

Merged 3 commits on Apr 7, 2020
@@ -153,12 +153,12 @@ public Processor<String, Long> get() {
}

public static class CustomMaxAggregator implements Processor<String, Long> {
- ProcessorContext context;
+ ProcessorContext<Object, Object> context;
private KeyValueStore<String, Long> store;

@SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
this.context = context;
context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
@@ -64,7 +64,7 @@ public interface Transformer<K, V, R> {
*
* @param context the context
*/
- void init(final ProcessorContext context);
+ void init(final ProcessorContext<Object, Object> context);
Contributor Author:
In the KIP, I proposed to leave the existing Processor and Transformer init methods with a raw ProcessorContext argument. That would give all users a rawtypes warning with no recourse. Instead, in this PR, I'm adding <Object, Object> bounds. This at least allows users to add the generics themselves if they desire, while not changing the practical bounds at all. If this proves acceptable, I'll update the KIP.
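For illustration, here is what a hypothetical user implementation could look like under the proposed signature (the class name and logic are invented for this sketch; it assumes the <Object, Object>-bounded init from this PR):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical user code. Because the interface method is no longer raw, the
// implementation can spell out the generics and compiles without a rawtypes
// warning, even though the practical bound is unchanged.
public class UppercaseTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext<Object, Object> context;

    @Override
    public void init(final ProcessorContext<Object, Object> context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        return KeyValue.pair(key, value == null ? null : value.toUpperCase());
    }

    @Override
    public void close() {}
}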

Member:
Not sure if I understand this. Can you elaborate?

Btw: do we think we will have any path forward to let users specify the actual return type (currently only R is specified, not the actual key or value type), such that we would pass in ProcessorContext<K, V>? Can't remember if we discussed this in the KIP (just a side question).

Contributor Author:
Good question. Yes, it's been forever, but we did (I had to look back over the KIP myself to figure this out when I wrote the comment).

The idea is to discuss separately how to migrate to a version of Transformer that allows bounding the output types. The ticket for this is https://issues.apache.org/jira/browse/KAFKA-8396. This KIP was already sweeping enough in its design and implementation scope that we wanted to give Transformer its own discussion. Accordingly, this KIP only has to deal with the Processor API change and the consequences that are necessary/convenient to tackle at the same time.

Part of the reason to give Transformer its own discussion is that there are other opportunities to simplify transform() use cases (see the ticket for more info), so it really would be its own conversation.

What I was talking about above is that, in this KIP, I proposed to leave Transformer completely alone, so this would still be a raw type: void init(final ProcessorContext context). But what I realized just now is that anyone who implements the interface will get a "raw types" warning from javac, and there is nothing they can do about it but ignore it. If they try to add the equivalent <Object, Object> or <?, ?> parameters, their implementation would clash with our interface.

OTOH, if we go ahead and add the parameters as I proposed above, then at least they can also add the <Object, Object> parameters to their code to resolve the warning. They cannot bound the types more tightly than that, and neither can we, until we pick up KAFKA-8396. So it doesn't provide any additional type safety, just a way to resolve the warning without resorting to ignoring it. AFAICT, there is no way to compatibly migrate the interface to accommodate greater type safety; KAFKA-8396 will have to include deprecating something in any case.
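A compressed illustration of the clash described above (the interface and class names are invented for the sketch, and it assumes ProcessorContext carries the type parameters this PR introduces):

import org.apache.kafka.streams.processor.ProcessorContext;

// Invented types, just to show why users cannot fix the warning on their side.
@SuppressWarnings("rawtypes")
interface RawInitExample {
    // If we kept the raw signature, every implementer sees a rawtypes warning here.
    void init(ProcessorContext context);
}

@SuppressWarnings("rawtypes")
class RawInitImpl implements RawInitExample {
    // The user cannot tighten the parameter to ProcessorContext<Object, Object>:
    //     public void init(final ProcessorContext<Object, Object> context) { ... }
    // has the same erasure as the interface method but does not override it, so
    // javac reports a name clash. The only option is to keep the raw type and
    // ignore or suppress the warning.
    @Override
    public void init(final ProcessorContext context) { }
}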

Member:
Thanks for the clarification.


/**
* Transform the record with the given key and value.
@@ -69,7 +69,7 @@ public interface ValueTransformer<V, VR> {
* @throws IllegalStateException If store gets registered after initialization is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- void init(final ProcessorContext context);
+ void init(final ProcessorContext<Void, Void> context);
Contributor Author:
As proposed, Void types indicate that you can't forward anything.
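To make that concrete, here is a toy sketch (it assumes forward's arguments are bounded by the context's type parameters, as proposed in this PR):

import org.apache.kafka.streams.processor.ProcessorContext;

// Toy sketch: with ProcessorContext<Void, Void>, there is no real key or value a
// caller can hand to forward(), so forwarding is ruled out at compile time.
class VoidBoundsExample {
    static void demo(final ProcessorContext<Void, Void> context) {
        // context.forward("key", 42L);   // does not compile: String/Long are not Void
        // The only value assignable to Void is null, so no real record can be forwarded.
    }
}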


/**
* Transform the given value to a new value.
@@ -72,7 +72,7 @@ public interface ValueTransformerWithKey<K, V, VR> {
* @throws IllegalStateException If store gets registered after initialization is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- void init(final ProcessorContext context);
+ void init(final ProcessorContext<Void, Void> context);

/**
* Transform the given [key and ]value to a new value.
@@ -111,7 +111,7 @@ static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWi
final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new ValueTransformerWithKey<K, V, VR>() {
@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
valueTransformer.init(context);
}

@@ -46,7 +46,7 @@ public KStreamFlatTransformProcessor(final Transformer<? super KIn, ? super VIn,
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
transformer.init(context);
}
@@ -39,14 +39,14 @@ public Processor<KIn, VIn> get() {
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {

private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
- private ProcessorContext context;
+ private ProcessorContext<Object, Object> context;

KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) {
this.valueTransformer = valueTransformer;
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
this.context = context;
}
@@ -21,6 +21,7 @@
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
+ import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,11 +50,11 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(context);
+ valueGetter.init(new ForwardingDisabledProcessorContext(context));
Contributor Author:
Even in this first PR, we expose some bugs. In this case, ValueGetters and ValueTransformers shouldn't be able to forward anything, but nothing previously prevented them from doing it. Now, the new bounds force us to wire in a context that has Void bounds.
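Roughly, the idea behind the wrapper is the following (illustrative sketch only, not the actual ForwardingDisabledProcessorContext implementation; the real class lives in org.apache.kafka.streams.processor.internals and implements the full context surface, which is why it satisfies the new <Void, Void> bound on the getters):

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;

// Illustrative sketch of a forwarding-disabled context wrapper.
class ForwardingDisabledSketch {
    private final ProcessorContext<Object, Object> delegate;

    ForwardingDisabledSketch(final ProcessorContext<Object, Object> delegate) {
        this.delegate = delegate;
    }

    // The one capability this wrapper removes: any attempt to forward fails fast.
    public void forward(final Object key, final Object value) {
        throw new StreamsException("ProcessorContext#forward() is not supported from this context");
    }

    // Everything else (taskId(), metrics(), getStateStore(), schedule(), ...)
    // simply delegates to the wrapped context, for example:
    public TaskId taskId() {
        return delegate.taskId();
    }
}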

}

@Override
@@ -39,14 +39,14 @@ public Processor<K, V> get() {
public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {

private final ValueTransformerWithKey<K, V, R> valueTransformer;
- private ProcessorContext context;
+ private ProcessorContext<Object, Object> context;

KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R> valueTransformer) {
this.valueTransformer = valueTransformer;
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
this.context = context;
}
@@ -23,6 +23,7 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
+ import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -75,11 +76,11 @@ private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>>
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(context);
+ valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
@@ -135,7 +136,7 @@ private class KTableKTableInnerJoinValueGetter implements KTableValueGetter<K, R
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@@ -22,6 +22,7 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
+ import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -74,11 +75,11 @@ private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(context);
+ valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
@@ -141,7 +142,7 @@ private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R>
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@@ -22,6 +22,7 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
+ import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -73,11 +74,11 @@ private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(context);
+ valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
@@ -136,7 +137,7 @@ private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@@ -22,6 +22,7 @@
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
+ import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -72,11 +73,11 @@ private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(context);
+ valueGetter.init(new ForwardingDisabledProcessorContext(context));
}

@Override
@@ -132,7 +133,7 @@ private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, R
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@@ -135,7 +135,7 @@ private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
parentGetter.init(context);
}

@@ -104,14 +104,14 @@ public void process(final K key, final Change<V> change) {
private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {

private final KTableValueGetter<K, V> parentGetter;
- private ProcessorContext context;
+ private ProcessorContext<Void, Void> context;

KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
this.parentGetter = parentGetter;
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
this.context = context;
parentGetter.init(context);
}
@@ -134,7 +134,7 @@ private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
}

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Void, Void> context) {
parentGetter.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
@@ -21,7 +21,7 @@

public interface KTableValueGetter<K, V> {

- void init(ProcessorContext context);
+ void init(ProcessorContext<Void, Void> context);

ValueAndTimestamp<V> get(K key);

@@ -24,11 +24,11 @@
import org.apache.kafka.streams.state.internals.CacheFlushListener;

class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
- private final InternalProcessorContext context;
- private final ProcessorNode myNode;
+ private final InternalProcessorContext<Object, Object> context;
+ private final ProcessorNode<?, ?> myNode;
Contributor Author:
You'll notice that sometimes I use wildcard (?) generic parameter bounds and other times Object. The reason is a little subtle: InternalProcessorContext has a method that takes a ProcessorNode argument, and we want to let (ourselves) pass in any ProcessorNode, not just an exactly <Object, Object> one. But for all the ProcessorContext references, we pass the context in, and we have to do it untyped anyway, so the type bound doesn't matter. It results in less mind-bending code for users to declare types using the <Object, Object> bounds, and it makes no difference to them as far as calls to forward are concerned, since the parameterized types are only upper bounds anyway.

Punchline: it doesn't really matter, but those are the considerations that led me to use ? in some cases and Object in others.
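As a toy example of that distinction (assuming ProcessorNode keeps the two type parameters shown in this diff; method names are invented):

import org.apache.kafka.streams.processor.internals.ProcessorNode;

// Toy example of wildcard vs. Object bounds. A wildcard parameter accepts any
// parameterization; an exact <Object, Object> parameter does not.
class WildcardVsObjectExample {
    static void acceptAny(final ProcessorNode<?, ?> node) { }

    static void acceptExact(final ProcessorNode<Object, Object> node) { }

    static void demo(final ProcessorNode<String, Long> typedNode) {
        acceptAny(typedNode);       // fine: ? matches any type arguments
        // acceptExact(typedNode);  // does not compile: generics are invariant, so a
        //                          // ProcessorNode<String, Long> is not a
        //                          // ProcessorNode<Object, Object>
    }
}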


- SessionCacheFlushListener(final ProcessorContext context) {
- this.context = (InternalProcessorContext) context;
+ SessionCacheFlushListener(final ProcessorContext<Object, Object> context) {
+ this.context = (InternalProcessorContext<Object, Object>) context;
myNode = this.context.currentNode();
}

@@ -37,7 +37,7 @@ public void apply(final Windowed<K> key,
final V newValue,
final V oldValue,
final long timestamp) {
- final ProcessorNode prev = context.currentNode();
+ final ProcessorNode<?, ?> prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
@@ -32,13 +32,13 @@
* @param <V>
*/
class SessionTupleForwarder<K, V> {
- private final ProcessorContext context;
+ private final ProcessorContext<Object, Object> context;
private final boolean sendOldValues;
private final boolean cachingEnabled;

@SuppressWarnings("unchecked")
SessionTupleForwarder(final StateStore store,
- final ProcessorContext context,
+ final ProcessorContext<Object, Object> context,
final CacheFlushListener<Windowed<K>, V> flushListener,
final boolean sendOldValues) {
this.context = context;
@@ -26,11 +26,11 @@
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

class TimestampedCacheFlushListener<K, V> implements CacheFlushListener<K, ValueAndTimestamp<V>> {
- private final InternalProcessorContext context;
- private final ProcessorNode myNode;
+ private final InternalProcessorContext<Object, Object> context;
+ private final ProcessorNode<?, ?> myNode;

- TimestampedCacheFlushListener(final ProcessorContext context) {
- this.context = (InternalProcessorContext) context;
+ TimestampedCacheFlushListener(final ProcessorContext<Object, Object> context) {
+ this.context = (InternalProcessorContext<Object, Object>) context;
myNode = this.context.currentNode();
}

@@ -39,7 +39,7 @@ public void apply(final K key,
final ValueAndTimestamp<V> newValue,
final ValueAndTimestamp<V> oldValue,
final long timestamp) {
- final ProcessorNode prev = context.currentNode();
+ final ProcessorNode<?, ?> prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(
@@ -30,13 +30,13 @@
* @param <V> the type of the value
*/
class TimestampedTupleForwarder<K, V> {
- private final ProcessorContext context;
+ private final ProcessorContext<Object, Object> context;
private final boolean sendOldValues;
private final boolean cachingEnabled;

@SuppressWarnings("unchecked")
TimestampedTupleForwarder(final StateStore store,
- final ProcessorContext context,
+ final ProcessorContext<Object, Object> context,
final TimestampedCacheFlushListener<K, V> flushListener,
final boolean sendOldValues) {
this.context = context;
@@ -38,7 +38,7 @@ public Transformer<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> get() {
private Transformer<KIn, VIn, KeyValue<KOut, VOut>> transformer = transformerSupplier.get();

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
transformer.init(context);
}

@@ -61,9 +61,10 @@ private final class KTableKTableJoinProcessor extends AbstractProcessor<KO, Chan
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;

@Override
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<Object, Object> context) {
super.init(context);
- final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+ final InternalProcessorContext<Object, Object> internalProcessorContext =
+ (InternalProcessorContext<Object, Object>) context;
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),