KAFKA-8410: Part 1: processor context bounds #8414
@@ -64,7 +64,7 @@
  *
  * @param context the context
  */
- void init(final ProcessorContext context);
+ void init(final ProcessorContext<Object, Object> context);
In the KIP, I proposed to leave the existing Processor and Transformer init methods with a raw ProcessorContext argument. That would give all users a rawtypes warning with no recourse. In this PR, I'm adding <Object, Object> bounds instead. This at least allows users to add the generics too if they want, while not changing the practical bounds at all. If this proves acceptable, I'll update the KIP.
Not sure if I understand this. Can you elaborate?
Btw: do we think we will have any path forward to let users specify the actual return type (which is currently only R and does not specify the actual key or value type), such that we would pass in ProcessorContext<K, V>? Can't remember if we discussed this on the KIP (just a side question).
Good question. Yes, it's been forever, but we did (myself, I had to look back over the KIP to figure this out when I wrote the comment).
The idea is to discuss separately how to migrate to a version of Transformer that allows bounding the output types. The ticket for this is https://issues.apache.org/jira/browse/KAFKA-8396. This KIP was already sweeping enough in its design and implementation scope that we wanted to give Transformer its own discussion. Accordingly, this KIP only has to deal with the Processor API change and the consequences that are necessary/convenient to tackle at the same time.
Part of the reason to give Transformer its own discussion is that there are other opportunities to simplify transform() use cases (see the ticket for more info), so it really would be its own conversation.
What I was talking about above is that, in this KIP, I had proposed to leave Transformer completely alone, so this would still be a raw type: void init(final ProcessorContext context). But what I realized just now is that anyone who implements the interface will get a "raw types" warning from javac, and there is nothing they can do about it but ignore it. If they try to add the equivalent <Object, Object> or <?, ?> parameters, their implementation would clash with our interface.
OTOH, if we go ahead and add the parameters as I proposed above, then at least they can also add the <Object, Object> parameters to their code to resolve the warning. They cannot bound the types any tighter than that, and neither can we, until we pick up KAFKA-8396. So it doesn't provide any additional type safety, just a way to resolve the warning without resorting to ignoring it. AFAICT, there is no way to compatibly migrate the interface to accommodate greater type safety. KAFKA-8396 will have to include deprecating something in any case.
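To make the raw-types point concrete, here is a minimal sketch using hypothetical stand-in types (`Context`, `Initializable`, `MyTransformer`, and `RecordingContext` are illustrative, not Kafka classes). It assumes the interface declares `init` with explicit `<Object, Object>` parameters, so an implementation can repeat them and compile without a rawtypes warning.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProcessorContext<K, V>; not the Kafka interface.
interface Context<K, V> {
    <K1 extends K, V1 extends V> void forward(K1 key, V1 value);
}

// The interface declares explicit <Object, Object> parameters instead of a raw Context.
interface Initializable {
    void init(Context<Object, Object> context);
}

// An implementation can repeat the same parameters, so there is no raw type to warn about.
class MyTransformer implements Initializable {
    private Context<Object, Object> context;

    @Override
    public void init(final Context<Object, Object> context) {
        this.context = context;
    }

    void emit(final Object key, final Object value) {
        // Any key/value is accepted, just as with the raw type: no added type safety.
        context.forward(key, value);
    }
}

// Records what was forwarded so the behavior can be observed.
class RecordingContext implements Context<Object, Object> {
    final List<String> forwarded = new ArrayList<>();

    @Override
    public <K1 extends Object, V1 extends Object> void forward(final K1 key, final V1 value) {
        forwarded.add(key + "=" + value);
    }
}
```

Declaring `init(final Context context)` in `MyTransformer` instead would still compile, but javac would report the raw-type warning that the comment above describes.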
Thanks for the clarification.
@@ -69,7 +69,7 @@
  * @throws IllegalStateException If store gets registered after initialization is already finished
  * @throws StreamsException if the store's change log does not contain the partition
  */
- void init(final ProcessorContext context);
+ void init(final ProcessorContext<Void, Void> context);
As proposed, Void types indicate that you can't forward anything.
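A small sketch of why Void bounds rule out forwarding, assuming a forward method whose type parameters are bounded by the context's own (as in this PR). `BoundedContext`, `CountingContext`, and `VoidBoundsDemo` are hypothetical names, not Kafka classes.

```java
// Hypothetical stand-in for ProcessorContext<K, V>; not the Kafka interface.
interface BoundedContext<K, V> {
    <K1 extends K, V1 extends V> void forward(K1 key, V1 value);
}

// Counts forward calls so the effect of the bounds can be observed.
class CountingContext<K, V> implements BoundedContext<K, V> {
    int forwards = 0;

    @Override
    public <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value) {
        forwards++;
    }
}

class VoidBoundsDemo {
    public static void main(final String[] args) {
        final CountingContext<String, Number> typed = new CountingContext<>();
        typed.forward("a", 1);        // OK: Integer is a subtype of Number
        // typed.forward("a", "b");   // does not compile: String is not a Number

        final CountingContext<Void, Void> disabled = new CountingContext<>();
        // disabled.forward("a", 1);  // does not compile: neither argument is a Void
        disabled.forward(null, null); // null is the only value that type-checks

        System.out.println(typed.forwards + " " + disabled.forwards);
    }
}
```

Because `Void` cannot be instantiated in ordinary code, the compiler rejects every meaningful forward call on a `<Void, Void>` context.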
  super.init(context);
  metrics = (StreamsMetricsImpl) context.metrics();
  droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
- valueGetter.init(context);
+ valueGetter.init(new ForwardingDisabledProcessorContext(context));
Even in this first PR, we expose some bugs. In this case, ValueGetters and ValueTransformers shouldn't be able to forward anything, but nothing previously prevented them from doing it. Now, the new bounds force us to wire in a context that has Void bounds.
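The wrapping idea can be sketched roughly as follows. All types here (`Ctx`, `SimpleCtx`, `ForwardingDisabledCtx`, `ValueGetterish`) are hypothetical stand-ins, and rejecting the call with `UnsupportedOperationException` is an assumption made for illustration; the real ForwardingDisabledProcessorContext may behave differently.

```java
import java.util.Objects;

// Hypothetical stand-in for ProcessorContext<K, V>; not the Kafka interface.
interface Ctx<K, V> {
    String taskId();
    <K1 extends K, V1 extends V> void forward(K1 key, V1 value);
}

// A plain context that permits forwarding any key and value.
class SimpleCtx implements Ctx<Object, Object> {
    private final String taskId;

    SimpleCtx(final String taskId) {
        this.taskId = taskId;
    }

    @Override
    public String taskId() {
        return taskId;
    }

    @Override
    public <K1 extends Object, V1 extends Object> void forward(final K1 key, final V1 value) {
        // A real context would hand the record to child nodes here.
    }
}

// Wraps any context and exposes it with Void bounds: non-null forwards are
// rejected by the compiler, and the remaining case is rejected at runtime.
class ForwardingDisabledCtx implements Ctx<Void, Void> {
    private final Ctx<?, ?> delegate;

    ForwardingDisabledCtx(final Ctx<?, ?> delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    @Override
    public String taskId() {
        return delegate.taskId(); // read-only calls pass through
    }

    @Override
    public <K1 extends Void, V1 extends Void> void forward(final K1 key, final V1 value) {
        // Assumption: an unchecked exception is an acceptable way to reject the call.
        throw new UnsupportedOperationException("forward() is not supported from this context");
    }
}

// A component that must not forward declares the Void-bounded context in init.
class ValueGetterish {
    String seenTaskId;

    void init(final Ctx<Void, Void> context) {
        seenTaskId = context.taskId();
    }
}
```

Passing a `SimpleCtx` directly to `ValueGetterish.init` does not compile, which is the kind of wiring mistake the new bounds surface.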
- private final InternalProcessorContext context;
- private final ProcessorNode myNode;
+ private final InternalProcessorContext<Object, Object> context;
+ private final ProcessorNode<?, ?> myNode;
You'll notice that sometimes I use wildcard (?) generic parameter bounds and other times Object. The reason is a little subtle. It's because the InternalProcessorContext has a method that takes a ProcessorNode argument. We want to let (ourselves) pass in any ProcessorNode, not just an exactly <Object, Object> one. But for all the ProcessorContext references, we pass the context in, and we have to do it untyped anyway, so the type bound doesn't matter. It results in less mind-bending code for users to declare types using the <Object, Object> bounds, and it makes no difference to them as far as the calls to forward are concerned, since the parameterized types are only upper bounds anyway.
Punchline: it doesn't really matter, but there are considerations that led me to use ? in some cases and Object in others.
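A minimal sketch of the difference, using hypothetical types (`Node`, `NodeHolder`, and `WildcardDemo` are not Kafka classes): a parameter declared with wildcards accepts a node with any type arguments, while one declared as `<Object, Object>` accepts only exactly that parameterization.

```java
// Hypothetical stand-in for ProcessorNode<K, V>.
class Node<K, V> {
    final String name;

    Node(final String name) {
        this.name = name;
    }
}

class NodeHolder {
    private Node<?, ?> current;

    // Accepts a Node with any type arguments.
    void setCurrent(final Node<?, ?> node) {
        current = node;
    }

    // Accepts only a Node declared exactly as Node<Object, Object>.
    void setExact(final Node<Object, Object> node) {
        current = node;
    }

    String currentName() {
        return current.name;
    }
}

class WildcardDemo {
    public static void main(final String[] args) {
        final NodeHolder holder = new NodeHolder();
        final Node<String, Long> typed = new Node<>("counter");

        holder.setCurrent(typed);  // OK: ? matches String and Long
        // holder.setExact(typed); // does not compile: Node<String, Long> is not a Node<Object, Object>
        holder.setExact(new Node<Object, Object>("sink"));

        System.out.println(holder.currentName());
    }
}
```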
  * @return a handle allowing cancellation of the punctuation schedule established by this method
  */
 Cancellable schedule(final Duration interval,
                      final PunctuationType type,
-                     final Punctuator callback) throws IllegalArgumentException;
+                     final Punctuator callback);
Not part of the KIP, and no difference whatsoever to the public API, but declaring that a method throws an unchecked exception is a totally ineffective way to inform users that it might get thrown, since they'll only ever see the declaration if they read the source of this file. The javadoc I added, at least, will show up in their IDE.
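As an illustration, an unchecked exception can be documented with a Javadoc `@throws` tag and left out of the method signature. `Scheduler` and its simplified `schedule(Duration)` signature are hypothetical; the real method takes more arguments and returns a Cancellable.

```java
import java.time.Duration;

// Hypothetical, simplified scheduler used only to show the documentation style.
class Scheduler {
    /**
     * Validates a punctuation interval and converts it to milliseconds.
     *
     * @param interval the time between callbacks; must be positive
     * @return the interval in milliseconds
     * @throws IllegalArgumentException if {@code interval} is zero or negative
     */
    long schedule(final Duration interval) { // no throws clause: the exception is unchecked
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("interval must be positive: " + interval);
        }
        return interval.toMillis();
    }
}
```

Callers compile identically with or without a `throws IllegalArgumentException` clause, so removing it does not change the API contract.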
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
public class ForwardingDisabledProcessorContextTest {
This test doesn't even compile now, because the compiler checks all these constraints for us. There's no way to express the things we wanted to forbid.
Sweet!
Hmm, that last commit illustrates a problem with this approach. Adding generics to a previously raw type is only a compiler warning for Java users, but it's a compiler error for Scala users, because Scala insists on having the generic arguments for any type that takes generic parameters. I think the way to fix this is to propose to add a new ProcessorContext as well as a new Processor interface. Or we can accept this level of source code incompatibility in the Scala API. For context, many changes we've made in minor versions break Scala source compatibility, since Scala generally makes it harder for libraries to implement non-breaking deprecation paths.
- public <K, V> void forward(final K key,
-                            final V value) {
+ public <K1 extends K, V1 extends V> void forward(final K1 key,
+                            final V1 value) {
nit: fix indentation (similar below)
Ah, forgot to reformat. Thanks for the catch.
Hey, thanks for the review @mjsax! I resolved the formatting problem and answered your question.
Java 11: `kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
Retest this please.
LGTM. Feel free to merge after Jenkins passes.
Filed https://issues.apache.org/jira/browse/KAFKA-9831 for the unrelated failure.
Thanks, @mjsax!
The AK PR apache/kafka#8414 introduced type parameters for the ProcessorContext. Since Scala requires generic arguments for any type that takes generic parameters, the Scala code in the examples that uses the ProcessorContext no longer compiled.
This reverts commit 29e08fd. There turned out to be more problems than expected with adding the generic parameters. Reviewers: Matthias J. Sax <matthias@confluent.io>
Add type bounds to the ProcessorContext, which bounds the types that can be forwarded to child nodes.
Committer Checklist (excluded from commit message)