-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] #5433
KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] #5433
Conversation
Call for review @vvcephei |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Some minor comments. Overall LGTM.
assertOutputKeyValue(driver, 0, "XX0+YY0"); | ||
assertOutputKeyValue(driver, 1, "XX1+YY1"); | ||
assertOutputKeyValue(driver, 2, "XX2+YY2"); | ||
assertOutputKeyValue(driver, 3, "XX3+YY3"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add assertNull(driver.readOutput(output));
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good idea. Done
assertOutputKeyValue(driver, 0, "X0+YY0"); | ||
assertOutputKeyValue(driver, 1, "X1+YY1"); | ||
assertOutputKeyValue(driver, 2, "X2+YY2"); | ||
assertOutputKeyValue(driver, 3, "X3+YY3"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add assertNull(driver.readOutput(output));
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Done
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); | ||
} | ||
assertOutputKeyValue(driver, 0, null); | ||
assertOutputKeyValue(driver, 1, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
assertEquals(kv.value, value); | ||
} | ||
} | ||
private void assertOutputKeyValue(TopologyTestDriver driver, Integer expectedKey, String expectedValue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add final
to parameters / reformat one parameter per line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I forgot final. Thanks
assertOutputKeyValue(driver, 0, "X0+Y0"); | ||
assertOutputKeyValue(driver, 1, "X1+Y1"); | ||
assertOutputKeyValue(driver, 2, "X2+null"); | ||
assertOutputKeyValue(driver, 3, "X3+null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add assertNull(driver.readOutput(output));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
assertOutputKeyValue(driver, 0, "X0+YY0"); | ||
assertOutputKeyValue(driver, 1, "X1+YY1"); | ||
assertOutputKeyValue(driver, 2, "X2+YY2"); | ||
assertOutputKeyValue(driver, 3, "X3+YY3"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add assertNull(driver.readOutput(output));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); | ||
} | ||
assertOutputKeyValue(driver, 0, "X0+null"); | ||
assertOutputKeyValue(driver, 1, "X1+null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add assertNull(driver.readOutput(output));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
assertOutputKeyValue(driver, 0, "XX0+null"); | ||
assertOutputKeyValue(driver, 1, "XX1+null"); | ||
assertOutputKeyValue(driver, 2, "XX2+YY2"); | ||
assertOutputKeyValue(driver, 3, "XX3+YY3"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add assertNull(driver.readOutput(output));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], null)); | ||
} | ||
assertOutputKeyValue(driver, 1, null); | ||
assertOutputKeyValue(driver, 2, "null+YY2"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add assertNull(driver.readOutput(output));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -376,19 +326,8 @@ public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() { | |||
assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]")); | |||
} | |||
|
|||
private KeyValue<Integer, String> kv(final Integer key, final String value) { | |||
return new KeyValue<>(key, value); | |||
private void assertOutputKeyValue(TopologyTestDriver driver, Integer expectedKey, String expectedValue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Refactor: * KTableKTableOuterJoinTest * KTableKTableLeftJoinTest * KTableKTableOuterJoinTest
ead1c7a
to
b43c51f
Compare
retest this please |
There are a couple of failed tests with JDK11 and Scala 2.12, namely:
but they seem unrelated to the changes in this PR. |
Hey @h314to , I'll try to take a look tomorrow. Most likely, the failures are unrelated. Thank you for taking note of which ones failed. You can re-run them by saying: Retest this, please. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me.
Thanks, @h314to !
Retest this please. |
1 similar comment
Retest this please. |
Thanks for the PR @h314to! Merged to |
Great! Thanks. I'm working on a final PR to close this issue, but I'm running into some trouble. If I don't figure it out soon, and you're ok with that, I'll push it marked as work in progress to ask for some help. |
Sure. Sounds good! |
@h314to We got a bug report today: https://issues.apache.org/jira/browse/KAFKA-7933 -- I am wondering if this could be related to this PR? Any thoughts on this? |
* ak/trunk: (45 commits) KAFKA-7487: DumpLogSegments misreports offset mismatches (apache#5756) MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (apache#6269) KAFKA-7935: UNSUPPORTED_COMPRESSION_TYPE if ReplicaManager.getLogConfig returns None (apache#6274) KAFKA-7895: Fix stream-time reckoning for suppress (apache#6278) KAFKA-6569: Move OffsetIndex/TimeIndex logger to companion object (apache#4586) MINOR: add log indicating the suppression time (apache#6260) MINOR: Make info logs for KafkaConsumer a bit more verbose (apache#6279) KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics (apache#6265) KAFKA-7884; Docs for message.format.version should display valid values (apache#6209) MINOR: Save failed test output to build output directory MINOR: add test for StreamsSmokeTestDriver (apache#6231) MINOR: Fix bugs identified by compiler warnings (apache#6258) KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] (apache#5433) MINOR: fix bypasses in ChangeLogging stores (apache#6266) MINOR: Make MockClient#poll() more thread-safe (apache#5942) MINOR: drop dbAccessor reference on close (apache#6254) KAFKA-7811: Avoid unnecessary lock acquire when KafkaConsumer commits offsets (apache#6119) KAFKA-7916: Unify store wrapping code for clarity (apache#6255) MINOR: Add missing Alter Operation to Topic supported operations list in AclCommand KAFKA-7921: log at error level for missing source topic (apache#6262) ...
…4] (apache#5433) Reviewer: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR continues the work towards the removal of KStreamTest driver. As suggested in a previous PR,
OutputVerifier
is used instead ofMockProcessor
.Refactor: