KAFKA-8410: Migrating stateful operators to new Processor API #10507
Sorry it took so long for me to pick this up. Just a quick first batch of review comments.
   * calling {@code forward()} (and some other methods) would result in a runtime exception.
   *
   * @param context processor context
   * @param record record that failed deserialization
   * @param exception the actual exception
   */
-  DeserializationHandlerResponse handle(final ProcessorContext context,
+  DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
Oh, man. I overlooked this in the KIP, and we can't just change this in-place, as it will break any subclasses.
What we need to do is deprecate this method and introduce a new one with a default implementation that calls back here. We can update the KIP with this change, since it's a simple oversight and follows established patterns for migrating interfaces.
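The deprecate-and-delegate pattern described above could look roughly like the following sketch. All type names here (`LegacyContext`, `TypedContext`, `Handler`, `Response`) are hypothetical stand-ins, not the actual Kafka Streams types, and the adapter is an assumption about how a new context might be bridged to the old signature.

```java
import java.util.Objects;

public class HandlerMigrationSketch {

    enum Response { CONTINUE, FAIL }

    // Hypothetical stand-in for the old, untyped context.
    interface LegacyContext {
        String taskId();
    }

    // Hypothetical stand-in for the new, typed context.
    interface TypedContext<K, V> {
        String taskId();
    }

    interface Handler {
        // Old method: left in place and deprecated, so existing implementations still compile.
        @Deprecated
        Response handle(LegacyContext context, String record, Exception exception);

        // New method: the default implementation adapts the context and calls back
        // to the old method, so implementers can migrate at their own pace.
        default Response handle(TypedContext<?, ?> context, String record, Exception exception) {
            Objects.requireNonNull(context, "context");
            LegacyContext adapted = () -> context.taskId();
            return handle(adapted, record, exception);
        }
    }

    // An implementation written against the old method only; it is not modified.
    static class ExistingHandler implements Handler {
        @Override
        public Response handle(LegacyContext context, String record, Exception exception) {
            return Response.CONTINUE;
        }
    }

    static Response demo() {
        Handler handler = new ExistingHandler();
        TypedContext<String, Long> context = () -> "0_1";
        // Resolves to the new overload, which delegates to the old one.
        return handler.handle(context, "bad-bytes", new IllegalStateException("corrupt"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints CONTINUE
    }
}
```

Because the new overload has a default body, `ExistingHandler` keeps working unchanged, which is the compatibility property the in-place signature change would have broken.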
-  import org.apache.kafka.streams.processor.ProcessorContext;
-  import org.apache.kafka.streams.processor.ProcessorSupplier;
   import org.apache.kafka.streams.processor.StreamPartitioner;
   import org.apache.kafka.streams.processor.TopicNameExtractor;
+  import org.apache.kafka.streams.processor.api.ProcessorContext;
+  import org.apache.kafka.streams.processor.api.ProcessorSupplier;
   import org.apache.kafka.streams.state.KeyValueStore;
Hey @jeqo , based on my understanding of the KIP, nothing in this interface should have changed. Was this changeset intentional?
I skimmed over this interface and changes are in line breaks of comments and renaming of type parameters. In the interest of good reviews, I would not do those changes in this PR but rather open a separate PR for this interface. However, I might have missed an important part. @jeqo Could you clarify?
Regarding the comments, we usually add a break after each sentence.
@@ -16,6 +16,7 @@
 */
package org.apache.kafka.streams.kstream;

import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
Also, the same question here: do we need any changes to this interface?
 * @param <V1> first value type
 * @param <V2> second value type
 * @param <VR> joined value type
Also here: it doesn't seem strictly necessary to rename the generic parameters as part of this PR.
Specifically, funny story: these params used to be called `V` and `V1`, and we renamed them to `V1` and `V2` because we thought it made more sense :)
* Lay the groundwork for migrating KTable Processors to the new PAPI.
* Migrate the KTableFilter processor to prove that the groundwork works.

This is an effort to help break up #10507 into multiple PRs.

Reviewers: Boyang Chen <boyang@apache.org>
@jeqo I started to review the PR but haven't finished yet.
Could you please rebase the PR because it has some conflicts?
I think you should undo the changes to `KStream`. AFAIS they are not required for this PR and pollute the PR a lot.
Please look carefully at my comments in `CogroupedStreamAggregateBuilder`. I am not sure whether I am missing something there or whether there is a bug.
@@ -32,7 +32,7 @@
   private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);

   @Override
-  public DeserializationHandlerResponse handle(final ProcessorContext context,
+  public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
Do we also need to deprecate this method and add a new one? Technically, this is a public API class that can be extended.
@@ -32,7 +32,7 @@
   private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);

   @Override
-  public DeserializationHandlerResponse handle(final ProcessorContext context,
+  public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
Do we also need to deprecate this method and add a new one? Technically, this is a public API class that can be extended.
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
In KAFKA-10787 we agreed on the import order `kafka`, `org.apache.kafka`, `com`, `net`, `org`, `java`, `javax`, and then static imports. Additionally, there should be an empty line between import blocks.
Note that PR #10428 introduces a check and a formatter for this.
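To make the agreed order concrete, here is a minimal sketch that groups import lines by the prefixes listed above and separates groups with an empty line. It is not the check or formatter from PR #10428; the class and method names are hypothetical, and two details are assumptions that the comment does not specify: imports within a group are sorted alphabetically, and imports matching none of the listed prefixes are placed after `javax` and before static imports.

```java
import java.util.ArrayList;
import java.util.List;

public class ImportOrderSketch {

    // Group prefixes in the order quoted in the review comment.
    // The first matching prefix wins, so "org.apache.kafka." is tested before "org.".
    private static final List<String> GROUPS = List.of(
        "kafka.", "org.apache.kafka.", "com.", "net.", "org.", "java.", "javax.");

    static int groupRank(String importLine) {
        String line = importLine.trim();
        if (line.startsWith("import static ")) {
            return GROUPS.size() + 1; // static imports come last
        }
        String name = line.startsWith("import ") ? line.substring("import ".length()) : line;
        for (int i = 0; i < GROUPS.size(); i++) {
            if (name.startsWith(GROUPS.get(i))) {
                return i;
            }
        }
        return GROUPS.size(); // assumption: unlisted prefixes go after javax
    }

    static List<String> sortImports(List<String> importLines) {
        List<String> sorted = new ArrayList<>(importLines);
        // Order by group first, then alphabetically within a group (assumption).
        sorted.sort((a, b) -> {
            int byGroup = Integer.compare(groupRank(a), groupRank(b));
            return byGroup != 0 ? byGroup : a.compareTo(b);
        });
        List<String> result = new ArrayList<>();
        int previousRank = -1;
        for (String line : sorted) {
            int rank = groupRank(line);
            if (previousRank != -1 && rank != previousRank) {
                result.add(""); // empty line between import blocks
            }
            result.add(line);
            previousRank = rank;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
            "import java.util.Set;",
            "import org.slf4j.Logger;",
            "import static java.util.Objects.requireNonNull;",
            "import org.apache.kafka.streams.state.KeyValueStore;",
            "import java.util.Objects;");
        sortImports(input).forEach(System.out::println);
    }
}
```

For the sample input, the `org.apache.kafka` import comes first, followed by the `org` block, the `java` block, and finally the static import, each separated by an empty line.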
@cadonna The sooner you merge the PR, the sooner I can start applying the formatter to the streams module. 😃
   boolean stateCreated = false;
   int counter = 0;
   for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-      final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
+      final KStreamAggregateProcessorSupplier<K, K, ?, ?> parentProcessor =
Shouldn't this be `KStreamAggregateProcessorSupplier<K, ?, K, ?>`? The positions of the parameters `KOut` and `VIn` on `KStreamAggregateProcessorSupplier` changed with respect to `KStreamAggProcessorSupplier`.
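The concern can be illustrated with a small sketch. Both interfaces below are hypothetical stand-ins that only mirror the parameter ordering implied by the comment (old: key in, key out, value in, value out; new: `KIn`, `VIn`, `KOut`, `VOut`); they are not the actual Kafka Streams classes.

```java
public class TypeParameterOrderSketch {

    // Hypothetical old ordering: key in, key out, value in, value out.
    interface OldOrderSupplier<KIn, KOut, VIn, VOut> {
        KOut outputKey(KIn key);
        VOut aggregate(VIn value);
    }

    // Hypothetical new ordering: key in, value in, key out, value out.
    interface NewOrderSupplier<KIn, VIn, KOut, VOut> {
        KOut outputKey(KIn key);
        VOut aggregate(VIn value);
    }

    static final class CountOld implements OldOrderSupplier<String, String, Long, Long> {
        @Override public String outputKey(String key) { return key; }
        @Override public Long aggregate(Long value) { return value + 1; }
    }

    static final class CountNew implements NewOrderSupplier<String, Long, String, Long> {
        @Override public String outputKey(String key) { return key; }
        @Override public Long aggregate(Long value) { return value + 1; }
    }

    static String demo() {
        // Old ordering: the two key types occupy the first and second slots.
        OldOrderSupplier<String, String, ?, ?> oldStyle = new CountOld();

        // New ordering: the key types occupy the first and third slots.
        NewOrderSupplier<String, ?, String, ?> newStyle = new CountNew();

        // Carrying the old wildcard pattern over unchanged would put the key type
        // into the VIn slot, and the assignment would be rejected by the compiler:
        // NewOrderSupplier<String, String, ?, ?> wrong = new CountNew();

        // Both calls return String because KOut is bound to String in each declaration.
        return oldStyle.outputKey("user-1") + "/" + newStyle.outputKey("user-1");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints user-1/user-1
    }
}
```

When type parameters are reordered, every wildcard usage such as `<K, K, ?, ?>` has to be revisited, since the same textual pattern now constrains different parameters.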
   boolean stateCreated = false;
   int counter = 0;
   for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-      final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
-          (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
+      final KStreamWindowAggregate<K, K, VOut, W> parentProcessor =
Shouldn't this be `KStreamWindowAggregate<K, VOut, K, W>`? Here I am not sure if I am missing something since the type parameter positions did not change. Why is the type parameter for `V` in `KStreamWindowAggregate` `K` and not `?`?
import java.util.Objects;
import java.util.Set;
See my comment above about import order.
new KStreamAggregate<>(materializedInternal.storeName(),
                       aggregateBuilder.countInitializer,
                       aggregateBuilder.countAggregator),
- new KStreamAggregate<>(materializedInternal.storeName(),
-                        aggregateBuilder.countInitializer,
-                        aggregateBuilder.countAggregator),
+ new KStreamAggregate<>(
+     materializedInternal.storeName(),
+     aggregateBuilder.countInitializer,
+     aggregateBuilder.countAggregator
+ ),
or
- new KStreamAggregate<>(materializedInternal.storeName(),
-                        aggregateBuilder.countInitializer,
-                        aggregateBuilder.countAggregator),
+ new KStreamAggregate<>(
+     materializedInternal.storeName(),
+     aggregateBuilder.countInitializer,
+     aggregateBuilder.countAggregator),
Continuation of #10381. Migration of Kafka Streams stateful operators (KTable, KStream aggregations, joins).