-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward timestamps to internal topics #2590
KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward timestamps to internal topics #2590
Conversation
…xtracted timestamps to internal topics
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good. Left a few comments, though
driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER); | ||
driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER); | ||
driver.process(INPUT_TOPIC_1, "key3", "value3@3000", STRING_SERIALIZER, STRING_SERIALIZER); | ||
assertNextOutputRecordTimestamp(OUTPUT_TOPIC_1, "key1", "value1", 1000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like these could be replaced with:
assertThat(driver.readOutput(...), equalTo(new ProducerRecord<>(topic, null, timestamp, key, value)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes the test more readable. Also, there are already similar methods with different signatures: assertNextOutputRecord
. I can replace it if you think that will make the code shorter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, i understand there are other assertNextOutputRecord
methods. I'd probably replace them too (not in this PR!) . I feel using assertThat
is better in this case as we are just comparing the ProducerRecord
. I don't see the need to have a method with multiple assertions when it can all be done in a single assertion.
@@ -298,6 +309,15 @@ private void assertNextOutputRecord(String topic, String key, String value) { | |||
assertNull(record.partition()); | |||
} | |||
|
|||
private void assertNextOutputRecordTimestamp(String topic, String key, String value, Long timestamp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be removed if replaced with assertThat
as mentioned above
* A processor that forwards messages with modified values (without custom timestamp information) to each child, if | ||
* the value is in ".*@[0-9]+" format. | ||
*/ | ||
protected static class ValueTimestampProcessor extends AbstractProcessor<String, String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't clear to me why we need this class. We can test the timestamp extraction without it, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to test that the extracted timestamp is forwarded with the record produced. We need to remove the custom timestamp information from the value, otherwise, it will be extracted from the value again.
…ng extracted timestamps to internal topics
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
…ng extracted timestamps to internal topics
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
…ng extracted timestamps to internal topics
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @hrafzali. LGTM
@guozhangwang can you please merge this? |
|
||
@Override | ||
public void punctuate(long streamTime) { | ||
context().forward(Long.toString(streamTime), "punctuate"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this punctuate function necessary? Since we do not schedule
in the init functions I think this punctuation will never be triggered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I removed the punctuate function.
… forwarding extracted timestamps to internal topics
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merged to trunk. Thanks @hrafzali !
This resolves the issue in the ProcessorTopologyTestDriver that the extracted timestamp is not forwarded with the produced record to the internal topics.
JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4789
The contribution is my original work and I license the work to the project under the project's open source license.
@guozhangwang @dguy