KAFKA-5362: Add Streams EOS system test with repartitioning topic #3310
Conversation
Call for review @bbejeck @enothereska @dguy @guozhangwang
Triggered branch builder at https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/320/
Branch builder failed before starting the test... retriggered at https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/324/
Force-pushed 71afa21 to 435d821 (compare)
Failing system test was due to a broker bug that got fixed. Rebased and rerunning the system test now: https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/326/
System test passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/328/
if (withRepartitioning) {
    final KStream<String, Integer> repartitionedData = data.through("repartition");

    repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"));
While checking the `SmokeTestUtil` code: some of the static variables are not used any more and hence can be removed.
    final KGroupedStream<String, Integer> groupedData2 = repartitionedData.groupByKey();
    // max
    groupedData2
nit: `groupedData2` -> `groupedDataAfterRepartition`
private static boolean receivedAllRecords(final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> receivedRecords,
                                          final Map<TopicPartition, Long> committedOffsets,
                                          final boolean withRepartitioning) {
    if (receivedRecords == null) {
I think this would never happen now, since the passed-in `recordPerTopicPerPartition` is always initialized.
    // repartition -> max, cnt

    final Map<String, List<String>> inputToOutputTopics = new HashMap<>();
    {
This bracket seems unnecessary?
I used it to scope the initialization for `inputToOutputTopics` -- this makes the code more readable IMHO, as you can easily see where the initialization code "ends".
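A minimal sketch of this pattern (the topic names and the helper variable `outputs` are illustrative, not taken from the PR):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ScopedInitExample {
    public static void main(final String[] args) {
        final Map<String, List<String>> inputToOutputTopics = new HashMap<>();
        {
            // the bare block limits `outputs` to the initialization code
            final List<String> outputs = Arrays.asList("max", "cnt");
            inputToOutputTopics.put("repartition", outputs);
        }
        // `outputs` is out of scope here, so it is easy to see where
        // the initialization "ends"
        System.out.println(inputToOutputTopics);
    }
}
```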
    final TopicPartition outputTp = new TopicPartition(outputTopic, partition);
    final List<ConsumerRecord<byte[], byte[]>> records = received.get(outputTp);
    if (records == null || records.size() < offset) {
This check `records.size() < offset` seems a bit sketchy to me. Basically we are assuming that the input topic `data`'s starting offset is always 0 and that there are no "holes" in the topic partitions, so that the offset indicates the number of records we can read from the input topic.

Maybe a more robust way to do that would be (see the sketch after this list):

- read the input topics `data` and optionally `repartition`, based on `withRepartitioning`; stop when the current record's offset is equal to or larger than the committed offset, and remember the number of records;
- read the output topics (again optionally `repartition`) from the beginning to the end (use `seekTo`), and check that the number of records is the same as the number of records read from the input.

Then we do not need to truncate, and in verification we also do not need to check list sizes again, since they are already checked here.
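A rough sketch of the suggested counting step (a hedged illustration only; the consumer setup and the name `countRecordsUpTo` are assumptions, not code from the PR):

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class VerificationSketch {

    // Count the records of one input partition up to (excluding) the committed
    // offset, instead of assuming that offsets start at 0 and have no holes.
    static long countRecordsUpTo(final KafkaConsumer<byte[], byte[]> consumer,
                                 final TopicPartition tp,
                                 final long committedOffset) {
        final List<TopicPartition> tps = Collections.singletonList(tp);
        consumer.assign(tps);
        consumer.seekToBeginning(tps);

        long count = 0;
        while (true) {
            final ConsumerRecords<byte[], byte[]> batch = consumer.poll(1000);
            for (final ConsumerRecord<byte[], byte[]> record : batch.records(tp)) {
                if (record.offset() >= committedOffset) {
                    return count;  // stop at the committed offset; no truncation needed
                }
                count++;
            }
            if (batch.isEmpty()) {
                return count;  // nothing more to read
            }
        }
    }
    // The output topics would then be read from the beginning to the end and
    // their record count compared against the value returned here.
}
```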
Force-pushed 435d821 to 5866817 (compare)
Updated. Retriggered branch builder: https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/329/
Branch builder failed. Note the new run at: https://jenkins.confluent.io/job/system-test-kafka-branch-builder-2/330/
Retest this please.
                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + value + "> but was <" + receivedKey + "," + receivedValue + ">");
                }
            }
        } catch (final NullPointerException e) {
We should check that `inputRecords.hasNext` is false after the for loop, or find some other way to make sure that the paired lists have the same size.
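For instance (a minimal, self-contained sketch; the iterator setup is invented for illustration):

```java
import java.util.Arrays;
import java.util.Iterator;

public class PairingCheck {
    public static void main(final String[] args) {
        final Iterator<String> inputRecords = Arrays.asList("a", "b", "c").iterator();
        final Iterator<String> outputRecords = Arrays.asList("a", "b").iterator();

        while (outputRecords.hasNext()) {
            inputRecords.next();   // pair up records (value comparison elided)
            outputRecords.next();
        }
        // the suggested check: no unpaired input records may be left over
        if (inputRecords.hasNext()) {
            throw new RuntimeException("pairing lists do not have the same size");
        }
    }
}
```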
Ditto elsewhere.
@@ -383,7 +403,84 @@ private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte
    sum += value;
nit: on the first value seen for a key do `sum = value`, else `sum += value`
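A hedged reading of this nit (the map name `sumPerKey` is assumed for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class SumNitExample {
    public static void main(final String[] args) {
        final Map<String, Long> sumPerKey = new HashMap<>();  // assumed name
        final String key = "key";
        for (final long value : new long[] {1, 2, 3}) {
            Long sum = sumPerKey.get(key);
            if (sum == null) {
                sum = value;   // first value seen for this key
            } else {
                sum += value;  // accumulate afterwards
            }
            sumPerKey.put(key, sum);
        }
        System.out.println(sumPerKey);  // {key=6}
    }
}
```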
    Integer max = currentMinPerKey.get(key);
    if (max == null) {
        max = value;
    }
nit: on the first value seen for a key do `max = value`, else `max = Math.max(max, value)`.
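A hedged reading, reusing the variables from the diff above in a runnable shell:

```java
import java.util.HashMap;
import java.util.Map;

public class MaxNitExample {
    public static void main(final String[] args) {
        final Map<String, Integer> currentMinPerKey = new HashMap<>();  // name from the diff
        final String key = "k";
        final int value = 42;

        Integer max = currentMinPerKey.get(key);
        if (max == null) {
            max = value;                 // first value seen for this key
        } else {
            max = Math.max(max, value);  // otherwise keep the maximum
        }
        currentMinPerKey.put(key, max);
        System.out.println(currentMinPerKey);  // {k=42}
    }
}
```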
            }
        }
    } catch (final NullPointerException e) {
        System.err.println(inputPerTopicPerPartition);
Could we print both the input and output partition lists, and also print them as `Error found: input is ... output is ...`?
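A minimal sketch of the suggested message (the name `outputPerTopicPerPartition` is an assumption):

```java
import java.util.HashMap;
import java.util.Map;

public class ErrorPrintExample {
    public static void main(final String[] args) {
        final Map<String, String> inputPerTopicPerPartition = new HashMap<>();
        final Map<String, String> outputPerTopicPerPartition = new HashMap<>();  // assumed name

        // print both sides in one message so failures are easier to diagnose
        System.err.println("Error found: input is " + inputPerTopicPerPartition
                + " output is " + outputPerTopicPerPartition);
    }
}
```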
Ditto elsewhere.
Just thinking about this twice, an NPE should not happen here anymore as we reworked the code. IMHO, we can just remove the whole try-catch block. WDYT?
LGTM!
What is the status of this?
I'm putting this on hold until #3375 is merged, since we also saw a couple of local test failures with it and suspected they are due to the same root cause.
Force-pushed 0c5d9db to 7ee5752 (compare)
Updated. Branch builder passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/937/
LGTM! Merged to trunk and 0.11.0.
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #3310 from mjsax/kafka-5362-add-eos-system-tests-for-streams-api
(cherry picked from commit 2265834)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>