KAFKA-6473: Add MockProcessorContext to public test-utils #4736
TODO: I still need to write HTML docs for this, but I wanted to kick off the reviews of the implementation and test.
\cc @bbejeck
Ah, thanks for adding reviewers @mjsax. The tests failed checkstyle because I have single-line methods. I'll fix it tomorrow.
Couple of initial comments. Some can be applied multiple times, but I did not mark all places in the code to avoid review noise.
/**
* This is a mock of {@link ProcessorContext} provided for authors of {@link Processor},
* {@link org.apache.kafka.streams.kstream.Transformer}, and {@link org.apache.kafka.streams.kstream.ValueTransformer}.
Nit: either import those cases or change to {@link org.apache.kafka.streams.kstream.Transformer Transformer} to get rid of the long package name in the rendered JavaDocs. Applies multiple times.
* {@link org.apache.kafka.streams.kstream.Transformer}, and {@link org.apache.kafka.streams.kstream.ValueTransformer}.
* <p>
* The tests for this class ({@link org.apache.kafka.streams.MockProcessorContextTest}) include several behavioral
* tests that serve as example usage.
I would omit this (not the test, just the sentence) and put examples into the web docs.
* <p>
* Note that this class does not take any automated actions (such as firing scheduled punctuators).
* It simply captures any data it witnesses.
* If you require more automated tests, we recommend wrapping your {@link Processor} in a dummy
dummy -> minimal source-processor-sink?
* It simply captures any data it witnesses.
* If you require more automated tests, we recommend wrapping your {@link Processor} in a dummy
* {@link org.apache.kafka.streams.Topology} and using the {@link org.apache.kafka.streams.TopologyTestDriver}.
* <p>
nit: no new paragraph required
@SuppressWarnings("JavadocReference")
@InterfaceStability.Evolving
public class MockProcessorContext implements ProcessorContext {
    // Immutable fields ================================================
those are all final -- comment can be omitted.
}

@Override public <FK, FV> void forward(final FK key, final FV value, final int childIndex) {
    captured.add(new Capture(childIndex, castKV(key, value)));
Was deprecated recently. Should we throw similar to schedule()?
@Override
public <FK, FV> void forward(final FK key, final FV value, final String childName) {
    captured.add(new Capture(childName, castKV(key, value)));
Was deprecated recently. Should we throw similar to schedule()?
}

@SuppressWarnings("unchecked")
private <FK, FV> KeyValue castKV(final FK key, final FV value) { return new KeyValue(key, value);}
does this add much value?
Oh, haha. It did when I was actually casting ;) I'm removing it.
* {@link StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG} will set the {@link ProcessorContext#valueSerde()} value.
* All properties will be available via {@link ProcessorContext#appConfigs()} and {@link ProcessorContext#appConfigsWithPrefix(String)}.
*/
public MockProcessorContext(final Properties config) {
I think we could add a constructor with zero parameters, too?
/**
* Behavioral test demonstrating the use of the context for capturing forwarded values
*/
@Test public void testForward() {
nit: rename test -> shouldCaptureOutputRecords?
The Jenkins builds are failing due to a findBugs warning. You can run ./gradlew findbugsMain findbugsTest -x test to see the details.
import java.util.Properties;

/**
* This is a mock of {@link ProcessorContext} provided for authors of {@link Processor},
nit: maybe for users to test their Processor, Transformer, and ValueTransformer implementations?
I like that better.
* Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
* Most unit tests using this mock won't need to know the taskId,
* and most unit tests should be able to get by with the
* {@link InMemoryKeyValueStore}, so the stateDir won't matter.
I'd suggest, when editing https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html, stating clearly how to switch from an in-memory store to a persistent store when moving from unit test code to production code.
good idea. Maybe I'll include a link to that page in the javadoc?
Sounds good.
This link might break in the future. Do you really want to put it in?
I think it's nice to link to it from here. We can add the stablest link we're aware of (I'm not sure what /11/ is for). If it does break, it will at least clue the reader in that there is more documentation out there, and they can search until they find it.
I have had this exact thing happen to me before, and found useful information that way.
But I understand it's also setting us up to have a broken link in our docs (since there's no process in place to verify it resolves), which doesn't make us look great if/when it breaks.
I don't feel strongly either way.
I see the following problem: either we link to a stable version like 11 or the "latest" version. However, for the first case, we need to update the link each release (and I am pretty sure we will forget at some point and thus link to the wrong version). For the second case, the link "changes" dynamically, and will also link to the wrong version. Thus, I would prefer to not put links to the docs into the JavaDoc. It's a pain to maintain them.
Agreed. I've removed it.
public List<KeyValue> forwarded(final String childName) {
    final LinkedList<KeyValue> result = new LinkedList<>();
    for (final CapturedForward capture : capturedForwards) {
        if (capture.childName != null && capture.childName.equals(childName)) {
Hmm.. is this correct? If forward(kv) is called without childName or childIndex, it means sending to all children. So should this be capture.childName == null || ...? Ditto above in line 414.
ah, yeah, that makes more sense. I was thinking in terms of reflecting exactly what calls to forward(...) there were, but it's probably more useful to mimic the regular forwarding logic.
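For readers following the thread, the semantics agreed on here could be sketched roughly as below. This is only an illustration: `SketchCapturedForward` and `SketchForwardingContext` are hypothetical stand-ins written for this note, not the classes in the PR, and keys and values are fixed to `String` and `long` to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the PR's CapturedForward (not the real class).
final class SketchCapturedForward {
    final String childName; // null means the record was forwarded to all children
    final String key;
    final long value;

    SketchCapturedForward(final String childName, final String key, final long value) {
        this.childName = childName;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical, simplified context that only records forwards.
final class SketchForwardingContext {
    private final List<SketchCapturedForward> capturedForwards = new ArrayList<>();

    // Broadcast forward: no child name is recorded.
    void forward(final String key, final long value) {
        capturedForwards.add(new SketchCapturedForward(null, key, value));
    }

    // Targeted forward to a single named child.
    void forward(final String key, final long value, final String childName) {
        capturedForwards.add(new SketchCapturedForward(childName, key, value));
    }

    // Everything the named child would have received: targeted forwards plus broadcasts.
    List<SketchCapturedForward> forwarded(final String childName) {
        final List<SketchCapturedForward> result = new ArrayList<>();
        for (final SketchCapturedForward capture : capturedForwards) {
            if (capture.childName == null || capture.childName.equals(childName)) {
                result.add(capture);
            }
        }
        return result;
    }
}
```

With this filter, a broadcast `forward(key, value)` shows up in the list for every child name, which mirrors how a real topology would deliver it.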
@Override
public void process(final String key, final Long value) {
    final To child = count % 2 == 0 ? To.child("george") : To.child("pete");
    this.context().forward(key + value, key.length() + value, child);
maybe add some data that are forward(key + value) directly, and verify they are also returned below in forwarded?
I think you mean adding something like:
this.context().forward(key, value);
and verify it gets forwarded to all children?
yup.
    this.kv = kv;
}

private CapturedForward(final Integer childIndex, final KeyValue kv) {
If we are already deprecating the API with childIndex, should we simply throw an unsupported exception in the deprecated function and remove this field here as well? WDYT?
final Iterator<KeyValue> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next(), new KeyValue<>("foo5", 8L));
assertEquals(forwarded.next(), new KeyValue<>("barbaz50", 56L));
Assert.assertFalse(forwarded.hasNext());
Since this is testing forwarding to child nodes by name and we have this test above, do we need to have it again here?
We don't strictly need it, but these tests are partially intended to demonstrate example usage, so I wrote it in "scenario style".
assertEquals(forwarded.next(), new KeyValue<>("foo5", 8L));
assertEquals(forwarded.next(), new KeyValue<>("barbaz50", 56L));
Assert.assertFalse(forwarded.hasNext());
}
same as above
Ok, I think I've addressed all the concerns so far. I've started on the documentation, but I'm calling it a night for now.
LGTM!
Thanks for the patch @vvcephei! LGTM
Mostly nits.
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
</div>
nice rework of this part. Where is the new section about the MockProcessorContext? Did I miss it?
No, I hadn't added it yet. It's pushed now. I decided to put it alongside the PAPI docs.
Not sure about this. To use the MockProcessorContext users need to add the dependency -- this is explained in the "Testing" section and we should not explain it twice. If MockProcessorContext is in PAPI section, people might be confused as the class is not available to them by default. It would be useful to link to the "Testing" section from PAPI guide though.
Not sure what others think. \cc @bbejeck @guozhangwang @dguy @miguno
Yep, as discussed "above", there is a link, although I'd still appreciate others' thoughts about the structure of the docs anyway.
About the docs, I think we should concentrate most of the content in documentation/streams/developer-guide/testing, and in all other sections like PAPI we can just add a note and refer to this link when necessary.
    }
}

private final List<CapturedPunctuator> punctuators = new LinkedList<>();
nit: move to top of class ?
public static class CapturedForward {
    /*Nullable*/ private final String childName;
    private final long timestamp;
    private final KeyValue kv;
nit: kv -> keyValue (throughout the whole class) -- IMHO, we should avoid abbreviations to improve code readability
    }
}

private List<CapturedForward> capturedForwards = new LinkedList<>();
nit: top of class
private List<CapturedForward> capturedForwards = new LinkedList<>();

private boolean committed = false;
nit: top of class
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().kv(), new KeyValue<>("foo5", 8L));
assertEquals(forwarded.next().kv(), new KeyValue<>("barbaz50", 56L));
Assert.assertFalse(forwarded.hasNext());
nit: use static imports to get rid of Assert.
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().kv(), new KeyValue<>("foo5", 8L));
assertEquals(forwarded.next().kv(), new KeyValue<>("barbaz50", 56L));
Assert.assertFalse(forwarded.hasNext());
nit: use static imports to get rid of Assert.
public void shouldCaptureApplicationAndRecordMetadata() {
    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
nit: remove -- not used
I can't; it's required.
Can you point out where it would fail? Unclear to me atm.
ah, I commented on the wrong usage. See my stacktraces comment on the MockProcessorContext constructor.
private final StreamsConfig config;
private final File stateDir; // default: null

// settable record metadata ================================================
explain why Integer, Long is used instead of int, long
so that they throw an exception when gotten before setting, which is always a test bug since the real context always sets them first. Otherwise, they'll return -1, leading to longer test debugging cycles.
assertEquals(context.applicationId(), "testFullConstructor");
assertEquals(context.taskId(), new TaskId(1, 1));
assertEquals(context.taskId(), new TaskId(1, 1));
duplicate line
oops!
Couple more nits on the docs.
@@ -40,6 +40,7 @@ <h1>Developer Guide for Kafka Streams</h1>
<div class="toctree-wrapper compound">
<ul>
<li class="toctree-l1"><a class="reference internal" href="write-streams.html">Writing a Streams Application</a></li>
<li class="toctree-l1"><a class="reference internal" href="testing.html">Testing a Streams Application</a></li>
Why do you add this here? It's already linked in L48
Did you see this comment?
Oh! My bad. I overlooked it and thought it was missing. Sorry.
processorUnderTest.process("key", "value");

final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().kv(), new KeyValue<>(..., ...));
<> -> &lt;&gt;
huh. That was an oversight on my part, but it actually renders correctly, and IDEA didn't warn about it. Maybe since <> isn't valid HTML, it just interprets it as text...
Anyway, I'm escaping it now.
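As an aside for anyone pasting Java snippets into the HTML docs, the escaping discussed here can be done mechanically. The helper below is a hypothetical sketch written for this note (it is not part of Kafka or its docs tooling) and only covers the three characters that matter inside a `<pre>` block.

```java
// Hypothetical helper; the class and method names are illustrative only.
final class SketchHtmlEscaper {
    private SketchHtmlEscaper() {
    }

    // Escapes a code snippet so it can be placed inside <pre>...</pre>.
    static String escapeForPre(final String code) {
        // Replace '&' first so the entities produced below are not escaped a second time.
        return code.replace("&", "&amp;")
                   .replace("<", "&lt;")
                   .replace(">", "&gt;");
    }
}
```

Applied to the line under review, the diamond operator in `new KeyValue<>(..., ...)` becomes `&lt;&gt;`.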
For this reason, we provide a <code>MockProcessorContext</code> in our <code>test-utils</code> module.
</p>
<p>
Instructions on importing the <code>test-utils</code> module are <a href="testing.html">here</a>.
I see, you just mention it here. Might be fine. Still not sure if it might be better to put both into "Testing" section.
Yeah, that's where I started to put it, but after some writing and thinking about it, I think it's better to organize it by theme, with liberal hyperlinks. By theme, I mean: "I am using the Streams DSL, and now I need to test it" or "I am using the PAPI, and now I need to test it", versus " I need to write some tests".
In other words, while the boundaries are obviously blurry, I think your PAPI user is a different persona from your DSL user. It's better to structure documentation around personas, to keep users on task while they're learning the system. I think the "write a streams app" page is long enough to justify splitting testing into a separate page, but it should probably link to it at the bottom, like "Next, you'll want to test your topology: ".
That said, I do think we are missing some links. I'd propose to add:
- The testing page should mention the MockProcessorContext and link here.
- The "write a streams app" page should probably link to the testing page.
One final justification: the thing that got me thinking about organizing the docs this way is that unit testing a Processor is literally not "testing a streams app", since a processor is not an app, so it felt weird to put it under that heading.
As with all things, I'm open to adopting your suggestion, but that was my rationale...
In other words, while the boundaries are obviously blurry, I think your PAPI user is a different persona from your DSL user.
Agreed.
Adding more cross links is always good, IMHO. Let's see what others think.
<p>
To begin with, instantiate your processor and initialize it with the mock context:
<pre>
final Processor processorUnderTest = ...;
Should we remove final? It's our internal code style but is not required here IMHO.
It's a question of style either way. I'd prefer to set a good example.
processorUnderTest.init(context);
</pre>
If you need to pass configuration to your processor or set the default serdes, you can create the mock with
config:
nit: wrap line ?
It won't wrap in the rendered text, and this keeps the html lines shorter and therefore more readable for us.
// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
context.resetForwards();

assertEquals(context.forwarded().size(), 0);
guess we can omit this. It's a test of resetForward that people would not put into their code when testing their own processor
Actually, I was trying to plant the seed: "ah, I can reset, and then the list is empty, so I can probably process more stuff and verify more forwards...". That was the spirit of the behavioral tests. But if you think: 1) you'd rather not have behavioral tests, 2) people won't read the tests to learn how to use it, or 3) the behavioral test doesn't express what I was trying to express, I'm happy to change it.
Fair enough.
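To make the "longer scenario" idea concrete, here is a rough, self-contained sketch. Everything in it is hypothetical: `SketchCapturingContext` and `SketchUpperCaseProcessor` are stand-ins invented for this note and do not use the real `MockProcessorContext` API; only the reset-then-continue pattern is the point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical minimal context: captures forwarded values and whether a commit was requested.
final class SketchCapturingContext {
    private final List<String> forwarded = new ArrayList<>();
    private boolean committed = false;

    void forward(final String value) {
        forwarded.add(value);
    }

    void commit() {
        committed = true;
    }

    List<String> forwarded() {
        return new ArrayList<>(forwarded);
    }

    boolean committed() {
        return committed;
    }

    void resetForwards() {
        forwarded.clear();
    }

    void resetCommit() {
        committed = false;
    }
}

// Hypothetical processor under test: forwards the upper-cased input and requests a commit.
final class SketchUpperCaseProcessor {
    private final SketchCapturingContext context;

    SketchUpperCaseProcessor(final SketchCapturingContext context) {
        this.context = context;
    }

    void process(final String value) {
        context.forward(value.toUpperCase(Locale.ROOT));
        context.commit();
    }
}

final class SketchResetScenario {
    private SketchResetScenario() {
    }

    // Runs two phases and returns only what was captured during the second one.
    static List<String> run() {
        final SketchCapturingContext context = new SketchCapturingContext();
        final SketchUpperCaseProcessor processor = new SketchUpperCaseProcessor(context);

        processor.process("foo"); // phase one: assertions on "FOO" would go here

        context.resetForwards();  // clear captures so phase two starts from a clean slate
        context.resetCommit();

        processor.process("bar"); // phase two
        return context.forwarded();
    }
}
```

After the reset, the second phase can be verified in isolation without tracking how many records the first phase produced.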
// commit captures can also be reset.
context.resetCommit();

assertFalse(context.committed());
as above; can be removed IMHO
Thanks for the thorough review, @mjsax! I've addressed your comments.
Thanks for the update. Some more follow up questions.
processorUnderTest.process("key", "value");

final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().keyValue(), new KeyValue<>(..., ...));
<> must be updated to &lt;&gt; AFAIK.
private String topic;
private Integer partition;
private Long offset;
private Long timestamp;
I understand why. But other contributors might not (and I might forget why in the future and want to change it...). It's not obvious from the code and thus should be explained with a comment, IMHO.
Alternatively, I could check for null in the getters and throw an exception explaining that the fields must be initialized before use. This would document the situation for us, as well as explain it more clearly for users.
Sounds good. Would be helpful for users to understand. The exception message should explain what they need to do to avoid the exception.
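A minimal sketch of the approach just agreed on, assuming nothing about the final implementation: `SketchRecordMetadata`, its setters, and the exception messages are hypothetical and exist only to show the null check in the getters.

```java
// Hypothetical holder for settable record metadata; not the real MockProcessorContext.
final class SketchRecordMetadata {
    // Boxed types so that "never set" (null) is distinguishable from any real value.
    private Integer partition;
    private Long offset;

    void setPartition(final int partition) {
        this.partition = partition;
    }

    void setOffset(final long offset) {
        this.offset = offset;
    }

    int partition() {
        if (partition == null) {
            // The message tells the test author what to do, as requested in the review.
            throw new IllegalStateException("Partition must be set before use via setPartition().");
        }
        return partition;
    }

    long offset() {
        if (offset == null) {
            throw new IllegalStateException("Offset must be set before use via setOffset().");
        }
        return offset;
    }
}
```

Compared with returning -1 for an unset field, failing in the getter points straight at the missing setup call in the test.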
new Properties() {
    {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "");
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
Where does it fail if we don't specify them? In KS they are required, because we pass the configs into a StreamsConfig -- but we don't use StreamsConfig (if I did not miss this) thus I think we can simplify the code here and just pass in empty Properties
public class MockProcessorContextTest {
    /**
     * Behavioral test demonstrating the use of the context for capturing forwarded values
Just a personal preference. We can also leave it for this case -- in general we might not want to put JavaDocs into tests.
Ok, @mjsax I've either responded to your latest comments or made changes. Please let me know what you think now. I think the question about the structure of the docs is particularly important. I explained my thoughts here: #4736 (comment). I've taken the liberty of adding the links I proposed. If we don't like what I did, I can take them back out. As I've mentioned elsewhere, I think one of the big strengths of this project is ease-of-use, especially for folks getting off the ground, so it's super important for the docs to cater to them. Not saying that the approach I've taken is better by this metric, just affirming that that's what I'm going for. I'm super open to more discussion about the best way to document this feature.
Hmm... I think I'll also add a test to the WordCountProcessorDemo...
WordCountProcessorDemo -> maybe different PR (it's easier to review short PRs and the other class is not related to this work). PR looks good overall. Let's figure out the doc question, and I'll do one more pass after we decide on this.
Ah, I added the example before I saw your last comment @mjsax. I recognize it's easier to review smaller PRs. FWIW, writing that example led me down a different code path and exposed a shortcoming of my implementation. Namely, if folks register a state store with logging enabled, the context needs to supply a RecordCollector. I opted to supply a no-op collector. An alternative I considered: throw an exception explaining they should disable logging in state stores for testing. I think no-op will offer a better experience.
retest this please
Made another pass over the PR. As for the docs, I'd prefer we concentrate on the testing section and let other sections just refer to it.
That is a good question. And we have actually seen people encountering this in their tests. I think it is valuable to test that changelog records are successfully sent, and hence I'd suggest we add an internal mock record collector (we do not need to expose it in the public API), which will be returned in |
@guozhangwang Thanks for the review. I'm not opposed to adding it, but just to play devil's advocate for a sec... As written the MockProcessorContext is for testing Processors, but a mocked RecordCollector would be for testing the state store, no? I believe it's an important distinction, since as written the mock is not fully capable of managing all kinds of state stores, since it doesn't wire in a StateManager. We decided that processor implementations should be agnostic to the state store implementation, so it should be sufficient for testing to provide an in-memory store. If anything, I think this is an argument to throw WDYT? Have I missed the point? |
I see your point about the mock record collector now, good point. To rephrase it: We should claim to users that MockProcessorContext is only for testing your I think this is a better philosophy, and for that I think it's appropriate to throw UnsupportedException indicating that this function should never be called. |
@mjsax : I'm going to move the MPC docs into the testing document and link to it from the PAPI doc. I've just pushed a changeset containing the exception instead of a no-op recordCollector. |
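To make the trade-off discussed above concrete, here is a minimal, self-contained sketch of the two options (a no-op collector versus one that throws). Everything in it is a hypothetical stand-in invented for illustration: `Collector`, `NoOpCollector`, `ThrowingCollector`, `LoggingStore`, and the exception message are not Kafka's actual `RecordCollector` types or wording.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; NOT Kafka's real classes.
final class CollectorSketch {

    /** Simplified, hypothetical collector interface. */
    interface Collector {
        void send(String topic, String key, String value);
    }

    /** Option 1: silently drop changelog records. */
    static final class NoOpCollector implements Collector {
        @Override
        public void send(String topic, String key, String value) {
            // Intentionally does nothing: the test never notices the write.
        }
    }

    /** Option 2: fail fast and tell the test author what to change. */
    static final class ThrowingCollector implements Collector {
        @Override
        public void send(String topic, String key, String value) {
            throw new UnsupportedOperationException(
                "This mock context does not collect records; "
                + "disable change-logging on state stores used in processor tests.");
        }
    }

    /** Hypothetical store that writes to a changelog only when logging is enabled. */
    static final class LoggingStore {
        private final Map<String, String> data = new HashMap<>();
        private final Collector collector;
        private final boolean loggingEnabled;

        LoggingStore(Collector collector, boolean loggingEnabled) {
            this.collector = collector;
            this.loggingEnabled = loggingEnabled;
        }

        void put(String key, String value) {
            data.put(key, value);
            if (loggingEnabled) {
                collector.send("changelog", key, value);
            }
        }
    }

    /** Returns true if a put() completes, false if the collector rejects it. */
    static boolean putSucceeds(Collector collector, boolean loggingEnabled) {
        try {
            new LoggingStore(collector, loggingEnabled).put("k", "v");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(putSucceeds(new NoOpCollector(), true));      // true
        System.out.println(putSucceeds(new ThrowingCollector(), true));  // false
        System.out.println(putSucceeds(new ThrowingCollector(), false)); // true
    }
}
```

With the throwing variant, a store configured with logging fails at the first write, while a store with logging disabled is unaffected. That matches the stance taken in this thread that the mock is meant for testing processors, not state store internals.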
Sorry for the late response, but I'm in favor of moving the MPC into testing docs and linking to it from PAPI docs. |
Some nits.
LGTM.
|
||
@Override | ||
public String topic() { | ||
if (topic == null) |
nit. Add { } to block (we always use them). Same below.
ah, auto-format must have stripped them at some point. I'll check my settings.
processor.process("barbaz", 50L); | ||
|
||
final Iterator<CapturedForward> forwarded = context.forwarded().iterator(); | ||
assertEquals(forwarded.next().keyValue(), new KeyValue<>("foo5", 8L)); |
assertX has expected value as first parameter -- we should switch both to avoid confusing error messages.
Applied to whole class.
gah! I will switch them all. :(
sorry about that.
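For readers wondering why the argument order matters, the sketch below reproduces the effect with a small hypothetical helper, `failureMessage`, that follows the (expected, actual) convention and the familiar "expected:<...> but was:<...>" wording. It is a stand-in for illustration, not JUnit's own code, and the values 8 and 7 are made up.

```java
import java.util.Objects;

final class AssertOrderSketch {

    /**
     * Hypothetical helper that mimics the (expected, actual) convention.
     * Returns null when the values match, otherwise a failure message.
     */
    static String failureMessage(Object expected, Object actual) {
        if (Objects.equals(expected, actual)) {
            return null;
        }
        return "expected:<" + expected + "> but was:<" + actual + ">";
    }

    public static void main(String[] args) {
        long expected = 8L;          // what the test author wants
        long producedByCode = 7L;    // pretend the code under test is buggy

        // Arguments swapped: the message blames the wrong value.
        System.out.println(failureMessage(producedByCode, expected));
        // prints: expected:<7> but was:<8>

        // Correct order: the message matches the author's intent.
        System.out.println(failureMessage(expected, producedByCode));
        // prints: expected:<8> but was:<7>
    }
}
```

The comparison itself passes or fails identically in either order; only the failure message becomes misleading when the arguments are swapped.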
retest this please |
* add source dependency on test-utils
* discovered a likely scenario in which test code would need MPC to implement RecordCollector.Supplier, so added that.
resolved those last issues and rebased. I'll make sure the tests pass and then ping for final reviews. |
Hey @guozhangwang & @dguy , I think we're ready to merge this. Do you all mind taking a look? |
Merged to |
We are adding a public testing utility to make it easier to unit test Processor implementations.
See KIP-267 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils).
The testing for this change is in this commit. There are behavioral and unit tests.
Committer Checklist (excluded from commit message)