-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5233 KIP-138: Change punctuate semantics #3055
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
retest this please |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
if (processorContext.currentNode() != null) { | ||
throw new IllegalStateException(String.format("%s Current node is not null", logPrefix)); | ||
} | ||
|
||
updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node); | ||
|
||
log.trace("{} Punctuating processor {} with timestamp {}", logPrefix, node.name(), timestamp); | ||
log.trace("{} Punctuating processor {} with timestamp {} {}", logPrefix, node.name(), timestamp, type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: "with timestamp {} and type {}"
Did a first pass over interfaces and depreciated functions, looks good. Haven't covered tests yet. FYI. @mihbor any chance you could rebase to latest trunk? Thanks. |
@mihbor a key thing that seems to be missing seems to be that if no events arrive, the system timer will never trigger. So we have again the issue that in absence of events, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @enothereska about missing improvements on when to punctuate.
@@ -82,6 +83,7 @@ | |||
R transform(final K key, final V value); | |||
|
|||
/** | |||
* <b>Deprecated as of 0.11.0.0</b> - <i>Please use {@link Punctuator)} functional interface instead.</i> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add @Deprecated
Java annotation, and also use JavaDoc tag @Deprecated
@@ -44,11 +44,13 @@ | |||
void process(K key, V value); | |||
|
|||
/** | |||
* <b>Deprecated as of 0.11.0.0</b> - <i>Please use {@link Punctuator)} functional interface instead.</i> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use JavaDoc tag @Deprecated
instead
* schedule a periodic call called a punctuation to {@link Punctuator#punctuate(long)}. | ||
* | ||
* @param interval the time interval between punctuations | ||
* @param type one of:<ul>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: markup, two >>
I would also prefer to have this explanation in the body and not in the parameter list.
* in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use. | ||
* <b>NOTE:</b> Only advanced if messages arrive</li> | ||
* <li>{@link PunctuationType#SYSTEM_TIME} - uses the system time (aka wall-clock time), | ||
* which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
advanced at the polling interval
: not sure if this is true.
@Override | ||
public <K, V> void forward(K key, V value, int childIndex) { | ||
throw new UnsupportedOperationException(); | ||
throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the cleanup! Appreciate it!
task.schedule(interval); | ||
schedule(interval, PunctuationType.STREAM_TIME, new Punctuator() { | ||
@Override | ||
public void punctuate(long timestamp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add final
@@ -122,8 +125,19 @@ public void commit() { | |||
} | |||
|
|||
@Override | |||
public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add final
to all parameters
@@ -422,12 +446,14 @@ int numBuffered() { | |||
boolean maybePunctuate() { | |||
final long timestamp = partitionGroup.timestamp(); | |||
|
|||
boolean punctuated = systemTimePunctuationQueue.mayPunctuate(System.currentTimeMillis(), PunctuationType.SYSTEM_TIME, this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should not call System.currentTimeMillis()
directly, but use the tasks Time
object -- otherwise, we cannot test anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With regard to high level comment, we should on do this call here anyway.
Oh, I see your point about when to punctuate, thanks. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Not sure about metrics. Should there be one punctuate time sensor or one for stream time and a separate one for system time punctuations? |
My naive implementation of cancellable was quite badly broken as it wasn't pointing at the latest PunctuationSchedule object, but always only the first one. About to commit something more involved but functional. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some nits. Overall LGTM.
@@ -28,8 +30,8 @@ | |||
* This is a stateful record-by-record operation, i.e, {@link #transform(Object, Object)} is invoked individually for | |||
* each record of a stream and can access and modify a state that is available beyond a single call of | |||
* {@link #transform(Object, Object)} (cf. {@link KeyValueMapper} for stateless record transformation). | |||
* Additionally, the interface can be called in regular intervals based on the processing progress | |||
* (cf. {@link #punctuate(long)}. | |||
* Additionally, this Transformer can {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: {@code Transformer}
@@ -28,8 +30,8 @@ | |||
* This is a stateful record-by-record operation, i.e, {@link #transform(Object)} is invoked individually for each | |||
* record of a stream and can access and modify a state that is available beyond a single call of | |||
* {@link #transform(Object)} (cf. {@link ValueMapper} for stateless value transformation). | |||
* Additionally, the interface can be called in regular intervals based on the processing progress | |||
* (cf. {@link #punctuate(long)}. | |||
* Additionally, this ValueTransformer can {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: {@code ValueTransformer}
@@ -82,6 +84,7 @@ | |||
R transform(final K key, final V value); | |||
|
|||
/** | |||
* @deprecated As of 0.11.1.0 please use {@link Punctuator} functional interface instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove As of 0.11.1.0
-- we also don't know yet what the next version number will be. If it's not 0.11.1.0 it would be wrong and I am pretty sure we will miss to update it.
@@ -82,6 +84,7 @@ | |||
VR transform(final V value); | |||
|
|||
/** | |||
* @deprecated As of 0.11.1.0 please use {@link Punctuator} functional interface instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above
@@ -44,11 +45,13 @@ | |||
void process(K key, V value); | |||
|
|||
/** | |||
* @deprecated As of 0.11.1.0 please use {@link Punctuator} functional interface instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above.
Cancellable schedule(long interval, PunctuationType type, Punctuator callback); | ||
|
||
/** | ||
* @deprecated As of 0.11.1.0 please use {@link #schedule(long, PunctuationType, Punctuator)} instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above
@@ -88,11 +88,35 @@ | |||
|
|||
/** | |||
* Schedules a periodic operation for processors. A processor may call this method during | |||
* {@link Processor#init(ProcessorContext) initialization} or processing | |||
* {@link Processor#process(Object, Object)} to | |||
* schedule a periodic call called a punctuation to {@link Punctuator#punctuate(long)}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: a periodic call—called a punctuation—to
package org.apache.kafka.streams.processor; | ||
|
||
/** | ||
* A functional interface used as an argument to {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super-nit: add .
at the end
Sure, I'll change it. I'm also not a fan of premature optimisation and I think the impact would be negligible, but I didn't want to depart from the current practice for handling delegates in that class. |
rebased |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Thanks @mihbor for your efforts on this KIP! Merged to trunk |
Thanks for this work, @mihbor I added trunk dep to my code base, I found the interface change self-describing, even near the end of a 10 hour day coding. My code was: and became: It was with great relief I saw all of our tests magically flip from failure to pass. I requested a hotfix to KIP-138 for 0.11.0.0. How's that for validation? |
Thanks for your kind comment :-) |
Cherry-picker: Dave Thomas <david.thomas@ticketmaster.com> Add revisionId to gradle-props. KAFKA-5233; KIP-138: Change punctuate semantics Implementation for KIP-138: Change punctuate semantics Author: Michal Borowiecki <michal.borowiecki@openbet.com> Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Eno Thereska <eno.thereska@gmail.com>, Damian Guy <damian.guy@gmail.com> Closes apache#3055 from mihbor/KIP-138
Implementation for KIP-138: Change punctuate semantics