KAFKA-6474: remove KStreamTestDriver #6732
Conversation
Hey @ableegoldman @abbccdda @bbejeck, do you mind taking a look at this?
Someone reported that KIP-258 broke their tests, but it turned out they were using this deprecated, internal test utility. Rather than "fix" it, we can just go ahead and finally remove it.
        PunctuationType.WALL_CLOCK_TIME,
        timestamp -> context.forward(-1, (int) timestamp)
    );
}
Re-introducing test logic that had been previously removed (see the lines below that I uncommented).
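For context, here is a minimal sketch of how such a wall-clock punctuator is typically wired up in a test Transformer's init(); the class name, generics, and transform body below are illustrative assumptions, not the exact test code:

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Hypothetical transformer: besides transforming records, it forwards a
// marker record keyed -1 on every wall-clock punctuation tick, which is
// where "-1:..."-style results in the expectations below come from.
class PunctuatingTransformer implements Transformer<Integer, Integer, KeyValue<Integer, Integer>> {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        // TopologyTestDriver fires this when the test advances wall-clock
        // time via advanceWallClockTime().
        context.schedule(
            Duration.ofMillis(1),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward(-1, (int) timestamp));
    }

    @Override
    public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
        return KeyValue.pair(key * 10, value * 10); // placeholder logic
    }

    @Override
    public void close() {}
}
```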
"200:1110 (ts: 0)", | ||
"2000:11110 (ts: 0)", | ||
"-1:2 (ts: 2)", | ||
"-1:3 (ts: 3)" |
These two extra results are from the punctuation. You can see that they were previously expected in the commented-out expectation on old line 89.
"B:0+2-2+4-4 (ts: 0)", | ||
"B:0+2-2+4-4+7 (ts: 0)", | ||
"C:0+5-5 (ts: 0)", | ||
"C:0+5-5+8 (ts: 0)"), |
Note that there are some extra intermediate states here, because TTD doesn't do caching (or rather, it flushes after each record).
Not sure if I can follow. The old code calls driver.flushState() after each driver.process(...) call.
Answered this above. Even though we flush between input records, the internal Streams code still forwards the retraction first and then the addition. Those would previously get cached together, but TTD passes them through individually.
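To make the retract-then-add sequence concrete, here is a rough sketch of the aggregation shape under discussion (the topic name, serdes, and string-building adder/subtractor are assumptions modeled on the test's mocks, not the actual test code):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class AggregateSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> table = builder.table("agg-input");
        table
            .groupBy(KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> "0",                               // initializer
                (key, value, agg) -> agg + "+" + value,  // adder: sees the new value
                (key, value, agg) -> agg + "-" + value,  // subtractor: sees the retracted old value
                Materialized.with(Serdes.String(), Serdes.String()));
        // Updating key "B" from "2" to "4" routes the old value to the
        // subtractor and the new value to the adder, so "B:0+2-2" and then
        // "B:0+2-2+4" are forwarded as two separate records. The old driver's
        // cache coalesced them into one update; TTD emits both.
    }
}
```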
"A:0+1-1 (ts: 0)", | ||
"A:0+1-1+3 (ts: 0)", | ||
"A:0+1-1+3-3 (ts: 0)", | ||
"A:0+1-1+3-3+4 (ts: 0)" |
This difference is a mystery to me. We previously expected A:0+4 (ts: 0). In other words, the processor never even saw the intermediate records. I can't figure out the mechanism for this, so there's a risk we're not testing the right thing here anymore. Help?
I'm not too (at all) familiar with either driver, but isn't the "mechanism" just that before we were only flushing after processing all three records, whereas now we're flushing on every record (and thus see these intermediates as you point out above)?
Yeah, but what seems strange to me is that the processor should still have seen all the events, and then caching should have absorbed some of the intermediate ones. Looking at the old output, though, it seems that the processor never saw those intermediate events, which indicates that there was some caching upstream of the processor. There's only a source KTable, but it's not materialized. I was thinking that it should not actually be stored in that case, but now that I'm reflecting on it again, I think that's only if you have optimizations enabled. So that would explain it.
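For reference, a sketch of how that optimization is enabled (the config constants below are from the 2.x StreamsConfig; treat the exact names as an assumption for other versions):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class OptimizationConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // With optimization enabled, a source KTable that is not explicitly
        // materialized may skip its backing store; without it, the source
        // table still gets materialized.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
    }
}
```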
@vvcephei I agree with you. Plus, there's a call to driver.flushState() (line 142 of the removed code), so indeed I suspect it was caching that was filtering out the intermediate records.
The source KTable is materialized for this case. That's why groupBy().aggregate() is only executed after the flush on the last value per key. However, with TopologyTestDriver we cannot mimic this behavior any longer. Hence, it would make sense to delete this test, as it's the same as testAggBasic.
Roger, wilco.
assertEquals(
    asList(
        "green:1 (ts: 0)",
        "green:2 (ts: 0)",
A couple of extra intermediate states here.
"1:1 (ts: 0)", | ||
"1:12 (ts: 0)", | ||
"1:2 (ts: 0)", | ||
"1: (ts: 0)", |
One extra intermediate state here.
// that in turn will cause an eviction on reducer-topic. It will flush
// key 2 as it is the only dirty entry in the cache
driver.process("tableOne", "1", "5");
assertEquals(Long.valueOf(4L), reduceResults.get("2"));
This logic isn't possible to test with TTD, so it would have to become a full integration test. But it seems like it's testing the logic of caching and repartitioning, not the logic of the KTableAggregate, so I'm proposing to remove the test.
Thoughts?
SGTM
+1
LGTM!
@mjsax maybe you can have a look at this, if you have a chance.
One high-level comment: do you think it's worth adding non-zero timestamp records to this test? Would that help us verify the order?
-final String[] expected = {"2:10 (ts: 0)", "20:110 (ts: 0)", "200:1110 (ts: 0)", "2000:11110 (ts: 0)"};
+final String[] expected = {
Could we init expected first and use its size for the assertion on L100?
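A sketch of what that suggestion amounts to (the processed list below is a hypothetical stand-in for whatever the test actually captures):

```java
import java.util.Arrays;
import java.util.List;

public class ExpectedSizeSketch {
    public static void main(final String[] args) {
        // Declare the expectation once ...
        final String[] expected = {
            "2:10 (ts: 0)",
            "20:110 (ts: 0)",
            "200:1110 (ts: 0)",
            "2000:11110 (ts: 0)"};

        // ... and derive the count assertion from its length instead of
        // hard-coding the number of records elsewhere in the test.
        final List<String> processed = Arrays.asList(expected); // stand-in for captured output
        if (processed.size() != expected.length) {
            throw new AssertionError("unexpected record count");
        }
    }
}
```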
@vvcephei Thanks for the PR. This should be covered as https://issues.apache.org/jira/browse/KAFKA-6474 -- not as a minor PR. Please also comment on the ticket -- I don't think we need to reassign it, as @h314to did the lion's share of the ticket. Just to make sure we are all on the same page.
Hey @abbccdda, thanks for the review! We certainly could add non-zero timestamps to the test, but I don't think it would help us verify the order, since TTD always just processes one record at a time, and we're already asserting a specific order in the expectation array. WDYT?
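That said, if explicit timestamps were ever wanted, ConsumerRecordFactory does support them. A self-contained sketch (the topology, topic names, and serdes here are illustrative, not from this PR):

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class TimestampedInputSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // pass-through topology

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ts-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final ConsumerRecordFactory<Integer, Integer> recordFactory =
                new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
            // The create(...) overload with a trailing long sets the record
            // timestamp explicitly: key=1, value=10, ts=5.
            driver.pipeInput(recordFactory.create("input", 1, 10, 5L));
            System.out.println(
                driver.readOutput("output", new IntegerDeserializer(), new IntegerDeserializer()));
        }
    }
}
```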
Thanks for the reminder @mjsax! I've changed the title of the PR.
Java 8 failed, Java 11 passed, but the test results were already cleaned up.

Retest this please.
Thanks for the PR @vvcephei. LGTM.
"A:0+1-1 (ts: 0)", | ||
"A:0+1-1+3 (ts: 0)", | ||
"A:0+1-1+3-3 (ts: 0)", | ||
"A:0+1-1+3-3+4 (ts: 0)" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vvcephei I agree with you plus there's a call driver.flushState();
(line 142 of the removed code) so indeed I suspect that it was caching that was filtering the intermediate records.
asList(
    "A:0+1 (ts: 0)",
    "B:0+2 (ts: 0)",
    "A:0+1-1 (ts: 0)",
This seems different to the old expected result.
Hm, I figured it's because TTD forwards every record through one at a time. Even though we flush after each change, the old code would still "coalesce" the retraction and update. Does this seem wrong to you?
Interesting. Should be fine. Still not sure if I understand why...

> Hm, I figured it's because TTD forwards every record through one at a time.

Yes. But a single input record results in an add plus a remove, so I am wondering why those are not "coalesced" in TTD, too? The caching in the stores should behave the same...

> Does this seem wrong to you?

Not necessarily. Just trying to understand why TTD behaves differently.
It seems that caching is disabled, but I am not sure why.
@vvcephei Still would like to understand this. Why does caching not take effect any longer?
@mjsax @vvcephei I ran the new commit locally and I think I get the difference here.

In TopologyTestDriver#pipeInput:

    // Process the record ...
    task.process();
    task.maybePunctuateStreamTime();
    task.commit();
    captureOutputRecords();

I.e., each record causes a commit immediately. In this case, when processing the two records from the repartition topic, each of them triggers pipeInput once and hence commits once; so the original pipeInput causes two more pipeInput calls via the repartition topic, and hence commits twice and flushes twice.

In the old KStreamTestDriver, we did not commit for records piped from the repartition topic, hence only one flush.
I feel that, generally speaking, the commit-on-every-pipeInput behavior of TopologyTestDriver is debatable, especially since we call pipeInput recursively for repartition topics, which means each of the new/old records routed via the repartition topic triggers a commit as well. I'll still merge this PR as-is, and we can discuss whether we want to change this behavior later.
Ah. That makes sense. Thanks!

Not sure if we want/need to change the behavior. Also, it would require a KIP IMHO, because people may have tests in place that rely on the current behavior... Not sure if it's worth it.
"B:0+2 (ts: 0)", | ||
"A:0+1-1 (ts: 0)", | ||
"A:0+1-1+3 (ts: 0)", | ||
"B:0+2-2 (ts: 0)", |
Same here. Why do we get more intermediate results?
Same explanation, I think.
@@ -125,23 +118,43 @@ public void testAggCoalesced() {
     final KTable<String, String> table2 = table1
         .groupBy(
             MockMapper.noOpKeyValueMapper(),
-            stringSerialzied)
+            stringSerialized)
         .aggregate(MockInitializer.STRING_INIT,
nit: move this to the next line
driver.pipeInput(recordFactory.create(topic1, "NULL", "5"));


driver.pipeInput(recordFactory.create(topic1, "B", "7"));
nit: why so many blank lines?
Ah, it was s/driver.flushState();//g :) I'll remove the blank lines.
driver.pipeInput(recordFactory.create(input, "C", "yellow"));


driver.pipeInput(recordFactory.create(input, "D", "green"));
nit: why so many blank lines?
…ls/KStreamTransformTest.java
…ls/KStreamTransformTest.java
…ls/KStreamTransformTest.java Co-Authored-By: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Hey @bbejeck or @guozhangwang, I've rebased and taken @mjsax's feedback. I think the only outstanding comment is #6732 (comment), but it seems out of scope for this PR (it's about TTD itself, not this change). I'm thinking we can just merge this and follow up on that question next week. WDYT?
cc @RichardYuSTUG since he's also interested in this ticket.
Minor comment, otherwise LGTM.
@@ -275,53 +226,17 @@ public void testCountWithInternalStore() {
    final StreamsBuilder builder = new StreamsBuilder();
    final String input = "count-test-input";

    final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
Why can't we continue to reuse a single MockProcessorSupplier?
Ah, I misread it. I thought they were all reusing the processor somehow, which I thought was risky. Since it's just the supplier, I'll move it back.
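For reference, a minimal sketch of the field-based reuse being restored (the field and class names are assumed); sharing the supplier is safe because it vends a fresh MockProcessor per topology build:

```java
import org.apache.kafka.test.MockProcessorSupplier;

public class KTableAggregateTestSketch {
    // Shared across test methods: each test builds its own topology, and
    // get() returns a new MockProcessor each time, so no processor state
    // leaks between tests.
    private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
}
```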
@guozhangwang, thanks for the review. I moved the supplier back to a field.
The implementation of KIP-258 broke the state store methods in KStreamTestDriver.
These methods were unused in this project, so the breakage was not detected.
Since this is an internal testing utility, and it was deprecated and partially removed in
favor of TopologyTestDriver, I opted to just complete the removal of the class.

Reviewers: A. Sophie Blee-Goldman <ableegoldman@gmail.com>, Boyang Chen <boyang@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>