Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4648: Improve test coverage StreamTask #2451

Closed
wants to merge 3 commits into from

Conversation

dguy
Copy link
Contributor

@dguy dguy commented Jan 26, 2017

Provide test coverage for exception paths in: schedule(), closeTopology(), and punctuate()

@dguy
Copy link
Contributor Author

dguy commented Jan 26, 2017

@dguy dguy changed the title KAFKA-4640: Improve test coverage StreamTask KAFKA-4648: Improve test coverage StreamTask Jan 26, 2017
@asfbot
Copy link

asfbot commented Jan 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1259/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jan 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1257/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jan 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1257/
Test PASSed (JDK 7 and Scala 2.10).

@enothereska
Copy link
Contributor

LGTM

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM

@Test(expected = IllegalStateException.class)
public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception {
((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor);
task.punctuate(processor, 10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap with try-catch instead of expected annotation -- more than one line test.

@SuppressWarnings("unchecked")
@Test
public void shouldCloseAllProcessorNodesWhenExceptionsRaised() throws Exception {
final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unify "create task" code with shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseTopology -- it's almost the same and both test cases can use the same topology structure.

@dguy
Copy link
Contributor Author

dguy commented Feb 7, 2017

resolved merge conflicts and addressed feedback. @guozhangwang can you please merge?

@asfbot
Copy link

asfbot commented Feb 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1534/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Feb 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1531/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Feb 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1531/
Test PASSed (JDK 8 and Scala 2.12).

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments, otherwise LGTM.

final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);

task = new StreamTask(taskId00, applicationId, partitions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just initialize the task in setUp with the testCache so that we do not need to explicit re-initialize it here? If not we need to at least close the task before re-initialize it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs the topology that is constructed just above, so no.

@SuppressWarnings("unchecked")
@Test
public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseTopology() throws Exception {
task = createTaskThatThrowsExceptionOnClose();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we close the task first before re-initialize it to another StreamTask? Ditto below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't really matter. It is not like the topology has created any state stores, but if it makes you feel better...

final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processor, source1, source2);
final Map<String, SourceNode> sourceNodes
= Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
final ProcessorTopology topology = new ProcessorTopology(processorNodes,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we reuse the class's own private topology here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No - the topology is different.

@dguy
Copy link
Contributor Author

dguy commented Feb 8, 2017

updated

@asfbot
Copy link

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1554/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1551/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1551/
Test FAILed (JDK 8 and Scala 2.12).

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged to trunk.

@asfgit asfgit closed this in b621703 Feb 8, 2017
@dguy dguy deleted the kafka-4640 branch February 17, 2017 00:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants