Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2] #4986

Merged
merged 3 commits into from
Jun 13, 2018

Conversation

h314to
Copy link
Contributor

@h314to h314to commented May 9, 2018

This PR is a further step towards the complete replacement of KStreamTestDriver with TopologyTestDriver.

  • Add task, processorTopology, and globalTopology access to TopologyTestDriverAccessor
  • Add condition to prevent NPE in ProcessorContextImpl
  • Refactor:
    • KTableFilterTest
    • KTableSourceTest
    • KTableMapValuesTest
    • KTableImplTest.

edit: To simplify the review process some straightforward changes were moved to another PR.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertEquals;

public class KTableAggregateTest {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of tests in this class fail with TopologyTestDriver. My guess is that this is due to some problem with caching. Using TopologyTestDriver, regardless of the value I set for StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, I get the same results out of MockProcessor.processed as if the cache was 0. In fact, setting the KStreamTestDriver cache to zero makes it yield the exact same results as TopologyTestDriver. I'll keep looking into it. If you have any tip about what might be going wrong please let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure, but seems to be a bug in TopologyTestDriver related to ThreadCache -- KStreamTestDriver creates a cache and uses it as member variable and passes it into store.init(...) -- the new TopologyTestDriver also creates a cache and passes it into GlobalProcessorContextImpl and the created StreamTask -- maybe, internally the cache is not forwarded correctly such that it is not used when stores are initialized.

I would need to do a deeper analysis why the cache is not forwarded to the stores correctly. Hope the pointes help. If not sufficient, please let us know and we can dig deeper into it from our side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tips! I'll look into it a bit longer, to see if I can find out what's wrong. If I hit a dead end I'll let you know.

@h314to
Copy link
Contributor Author

h314to commented May 10, 2018

Rebased on current trunk to fix merge conficts.

* @param builder builder for the topology to be tested
* @param config the configuration for the topology
*/
TopologyTestDriver(final InternalTopologyBuilder builder,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this constructor after rebasing, since recent commits made it no longer necessary.

@vvcephei
Copy link
Contributor

Hey @h314to , Thanks for this latest PR! I'll take a look as soon as I have a chance.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this! Couple of comments.

One more thing: parts of the changes seem to be straight forward -- can we extract them into a separate PR (again ;)) to make reviewing easier and reduce turn around time? The extracted PR could get merged quite quickly I believe.

Thanks a lot!

super(topology, config, initialWallClockTimeMs);
}

/** Get the processor context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move to next line ("weird" formatting)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is wierd. Changed.

return context;
}

/** Identify the source node for a given topic
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @return the processor context
*/
public ProcessorContext getProcessorContext(final String topicName) {
ProcessorContext context = task.context();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

return names;
}

/** Get a processor by name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertEquals;

public class KTableAggregateTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure, but seems to be a bug in TopologyTestDriver related to ThreadCache -- KStreamTestDriver creates a cache and uses it as member variable and passes it into store.init(...) -- the new TopologyTestDriver also creates a cache and passes it into GlobalProcessorContextImpl and the created StreamTask -- maybe, internally the cache is not forwarded correctly such that it is not used when stores are initialized.

I would need to do a deeper analysis why the cache is not forwarded to the stores correctly. Hope the pointes help. If not sufficient, please let us know and we can dig deeper into it from our side.


// two state store should be created
assertEquals(2, driver.allStateStores().size());
// two state store should be created
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fix typo store[s] ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) fixed.

assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
// contains the corresponding repartition source / sink nodes
assertTrue(driver.getAllProcessorNames().contains("KSTREAM-SINK-0000000003"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should rewrite this using Topology#describe() instead of hooking into the test driver?

Actually same thought about driver.getAllStateStores() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. allProcessorNames is only used in this test, and its functionally is easily replicated by using Topology#describe().

In the case of allStateStores, it provides a bit more functionality than allProcessorNames since it returns a Map<String, StateStore>. It is also part of the public API. Getting all processor names could be done using TopologyWrapper to access the InternalTopologyBuilder, so it could in principle be removed. However, it could be useful for end users' testing, which do not have access to TopologyWrapper, so it might be nice to keep it around.


processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering, if we should get rid of MockProcessor and update the test setup to pipe the result into a topic instead. Than we would use OutputVerifier instead of dealing with all those concatenated Strings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds way cleaner. If it's ok I would like to implement that in a subsequent PR, to avoid cramming even more changes into this one.

Copy link
Contributor Author

@h314to h314to left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! I've made the changes you recommended. I'm also splitting the straightforward changes (those which for which all the tests in a class do not require using TopologyTestDriverWrapper) into a new PR.

super(topology, config, initialWallClockTimeMs);
}

/** Get the processor context
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is wierd. Changed.

return context;
}

/** Identify the source node for a given topic
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return names;
}

/** Get a processor by name
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertEquals;

public class KTableAggregateTest {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tips! I'll look into it a bit longer, to see if I can find out what's wrong. If I hit a dead end I'll let you know.


// two state store should be created
assertEquals(2, driver.allStateStores().size());
// two state store should be created
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) fixed.

assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
// contains the corresponding repartition source / sink nodes
assertTrue(driver.getAllProcessorNames().contains("KSTREAM-SINK-0000000003"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. allProcessorNames is only used in this test, and its functionally is easily replicated by using Topology#describe().

In the case of allStateStores, it provides a bit more functionality than allProcessorNames since it returns a Map<String, StateStore>. It is also part of the public API. Getting all processor names could be done using TopologyWrapper to access the InternalTopologyBuilder, so it could in principle be removed. However, it could be useful for end users' testing, which do not have access to TopologyWrapper, so it might be nice to keep it around.


processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds way cleaner. If it's ok I would like to implement that in a subsequent PR, to avoid cramming even more changes into this one.

@mjsax
Copy link
Member

mjsax commented May 21, 2018

Meta comment: as it is easier to review smaller PRs, it might be worth the exclude some classes from this PR and tackle them individually via multiple PRs:

  • KTableAggregateTest to figure out the caching issue
  • don't introduce allProcessorNames() but use Topology#describe() -- exclude all classes that use allProcessorNames() and don't introduce allProcessorNames() in the first place
  • MockProcessor and OutputVerifier (this might be a follow up OR as the classes seem to overlap with second bullet point).

Whatever works best :) Let us know.

@h314to
Copy link
Contributor Author

h314to commented May 22, 2018

Yes, smaller PR are simpler to review. In order to reduce the size of this one I reverted the changes to the following classes, which will be tackled in subsequent PRs:

  • Reverted changes to KTableAggregateTest.
  • allProcessorNames is gone with my previous commit and there is no further need for it
  • Reverted changes to KTableKTableInnerJoinTest, KTableKTableOuterJoinTest, and KTableKTableLeftJoinTest. These are the ones MockProcessor is used more extensively, and thus, the ones which would benefit the most from being rewritten with OutputVerifier.

Copy link
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes look right to me. Just one question...

@@ -71,7 +71,7 @@ public StateStore getStateStore(final String name) {
return global;
}

if (!currentNode().stateStores.contains(name)) {
if (currentNode.stateStores != null && !currentNode().stateStores.contains(name)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this case a little confusing. currentNode().stateStores == null would seem to imply also that we don't have access to the store, or any store for that matter. Why do we allow this case to pass though?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a bit messy. I'm going to change the way we get the processor context, and then we won't need this. Since there are already a few conflicts I'm also going to squash and rebase on current trunk.

@h314to
Copy link
Contributor Author

h314to commented May 23, 2018

I rebased on trunk and squashed. New changes are in a separate commit to ease review. I simplified the way the current node is set in getProcessorContext which allowed me to tidy up TopologyTestDriverWrapper a bit.

I also needed to fix a few calls to ConsumerRecordFactory#create because the recently added record header support (KAFKA-6850) made calls with null value ambiguous.

* Refactor:
  -KTableFilterTest.java
  -KTableImplTest.java
  -KTableMapValuesTest.java
  -KTableSourceTest.java

* Add access to task, processorTopology, and globalTopology in TopologyTestDriver via TopologyTestDriverWrapper
* Remove unnecessary constructor in TopologyTestDriver
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. Couple of minor comments.

/**
* Get the processor context
*
* @param processorName used to search for a processor connected to this StateStore, which is set as current node
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't understand the comment. What StateStore does it refer to?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is because this function is only for init the getters, which requires the state stores is connected, hence accessible in init to the node.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's it. Thanks for clearing it up @guozhangwang . I tried to make it clearer while addressing your other comments.


// two state store should be created
assertEquals(2, driver.allStateStores().size());
// two state stores should be created
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove comment -- clear from the code itself (my believe is, if a test needs comments you need to rewrite the test :) )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done

// two state stores should be created
assertEquals(2, driver.allStateStores().size());
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
// two state stores should be created
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: as above; remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

driver.setUp(builder, stateDir, null, null);
driver.setTime(0L);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
// two state stores should be created
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once more

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}

private TopologyDescription.Node getProcessor(final Topology topology, final String processorName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used once -- would suggest to inline into assertTopologyContainsProcessor.

Additionally, the check for assertNotNull is not self-expressive. As the return type of assertTopologyContainsProcessor is void I would suggest to just call return (instead of return node;) if the node was found and replace return null;/assertNotNull with throw new AssertionError(...)

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 from me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that makes it much clearer. I changed it.


// three state store should be created, one for source, one for aggregate and one for reduce
assertEquals(3, driver.allStateStores().size());
// three state stores should be created, one for source, one for aggregate and one for reduce
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
// contains the corresponding repartition source / sink nodes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove and rename test to shouldCreateSourceAndSinkNodesForRepartitioningTopic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @h314to I have two meta comments regarding the newly added TopologyTestDriverWrapper:

The getProcessorContext function (I think its name should be setCurrentNodeForProcessorContext to be more accurate) is only used for initializing the value getters in our unit tests. I think we should not follow this pattern in unit test, i.e. enforcing ourselves to remember the processor name that creates the state store and set the processor node when calling init. Instead, in our internal code we call internalTopologyBuilder.connectProcessorAndStateStores to make sure the referenced node can also access to that store in its value getter. We could follow this pattern instead. I left a detailed comment as an example in KTableFilterTest.

Please let us know once you addressed @mjsax and my comments so that we can try complete removing the KStreamBuilder before 2.0

}
}

private TopologyDescription.Node getProcessor(final Topology topology, final String processorName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 from me.

KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();

driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should connect the state stores with this request processor instead of enforcing us to remember which processor to set while initializing. E.g. here we can do (note I removed the procNames):

private void doTestValueGetter(final StreamsBuilder builder,
                                   final KTableImpl<String, Integer, Integer> table2,
                                   final KTableImpl<String, Integer, Integer> table3,
                                   final String topic1) {

        final Topology topology = builder.build();

        KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
        KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
        InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
        topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());

        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {

            KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
            KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();

            getter2.init(driver.getProcessorContext(table2.name));
            getter3.init(driver.getProcessorContext(table3.name));

            // same below

Ditto elsewhere for initializing the getters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way is so much better. I wasn't happy with my implementation, but couldn't find a cleaner way to do it. Thank you!

* @param processorName used to search for a processor connected to this StateStore, which is set as current node
* @return the processor context
*/
public ProcessorContext getProcessorContext(final String processorName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better name as "setCurrentNodeInProcessorContext"? And then in java docs mention that it returns the processor context with current node set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Changed the name and cleaned up the docs a bit.

/**
* Get the processor context
*
* @param processorName used to search for a processor connected to this StateStore, which is set as current node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is because this function is only for init the getters, which requires the state stores is connected, hence accessible in init to the node.

* @param name the name to search for
* @return the processor matching the search name
*/
public ProcessorNode getProcessor(final String name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check globalTopology as well before give up and throw?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should. Added the global topology check.

 * Inline getProcessor in assertTopologyContainsProcessor in KTableImplTest
 * Rename KTableImplTest#testRepartition to shouldCreateSourceAndSinkNodesForRepartitioningTopic
 * Rename TopologyTestDriverWrapper#getProcessorContext to setCurrentNodeForProcessorContext
 * Search for processor in global topology before throwing
 * Change getters init in KTableFilterTest, KTableImplTest, KTableMapValuesTest, KTableSourceTest
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Thanks for the PR @h314to!

@guozhangwang guozhangwang merged commit de4f4f5 into apache:trunk Jun 13, 2018
guozhangwang pushed a commit that referenced this pull request Jun 13, 2018
…2] (#4986)

* KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2]

* Refactor:
  -KTableFilterTest.java
  -KTableImplTest.java
  -KTableMapValuesTest.java
  -KTableSourceTest.java

* Add access to task, processorTopology, and globalTopology in TopologyTestDriver via TopologyTestDriverWrapper
* Remove unnecessary constructor in TopologyTestDriver

* Change how TopologyTestDriverWrapper#getProcessorContext sets the current node

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Copy link
Contributor

Merged to trunk, and cherry-picked to 2.0.

@guozhangwang
Copy link
Contributor

We still have a bunch test classes that uses KStreamTestDriver. Since the 2.0 code freeze is today I think the final PR for completely getting rid of them will only go into trunk then.

@h314to Please ping us once you have the final PR to remove KStreamTestDriver ready.

@h314to h314to deleted the fix/KAFKA-6474-part2 branch June 15, 2018 13:41
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
…2] (apache#4986)

* KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2]

* Refactor:
  -KTableFilterTest.java
  -KTableImplTest.java
  -KTableMapValuesTest.java
  -KTableSourceTest.java

* Add access to task, processorTopology, and globalTopology in TopologyTestDriver via TopologyTestDriverWrapper
* Remove unnecessary constructor in TopologyTestDriver

* Change how TopologyTestDriverWrapper#getProcessorContext sets the current node

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants