Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void prepareEnvironment() throws InterruptedException {

@After
public void cleanup() throws InterruptedException {
CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
CLUSTER.deleteAllTopicsAndWait(120000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate why this fixed the test?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100 percent sure what's the race condition here, and why it fixes the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be frank, I am not sure about the root-cause. Parameterized StreamStreamJoinIntegrationTest tests are passing for cacheEnabled=true and intermittently failing for parameter cacheEnabled=false.

I thought we may not be cleaning some intermediate topics/change-log/? topics between parameterized runs. So I tried deleting all topics including offsets topics. With this I am not able to reproduce the failures.

I may be totally wrong. I'll spend some more time to understand the tests and debug the issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkreddy Were you be able to reproduce the issue locally before this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am able to reproduce the failures on my machine.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we are using the same application id twice in StreamStreamJoinIntegartionTest

STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");

This might be the root case -- deleting all topics would solve the issue, too, as it prevent to start with a corrupted state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seems that TableTableJoinIntegrationTest has a similar issue...

Copy link
Contributor Author

@omkreddy omkreddy Jul 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.. Yes, I have observed failures with TableTableJoinIntegrationTest too. Shall I update the appId?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I can think of is that the changelog topic of the joining streams's materialized stores were not deleted, and hence upon starting on the next test case, the store would be pre-populated and hence cause join result to be wrong (note that the state dir is a @Rule which means it will get cleaned up on each test case, and in the next run it will not be the same directory path).

And the reason that it does not always fail is that we have to hit it that the two tests (with and without caching) consecutively.

So think with your fix in 2.0 and trunk, it is Okay to leave the appID as is for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just ran some more tests locally to confirm my suspicion.

}

private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,24 @@ public void deleteTopicsAndWait(final long timeoutMs, final String... topics) th
}
}

/**
* Deletes all topics and blocks until all topics got deleted.
*
* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
*/
public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
final Set<String> topics = new HashSet<>(JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
} catch (final UnknownTopicOrPartitionException e) { }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the exception be logged ?

}

if (timeoutMs > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should timeout be re-calculated considering the for loop above ?

TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
}
}

public void deleteAndRecreateTopics(final String... topics) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
createTopics(topics);
Expand All @@ -295,6 +313,10 @@ private TopicsDeletedCondition(final String... topics) {
Collections.addAll(deletedTopics, topics);
}

public TopicsDeletedCondition(final Set<String> topics) {
deletedTopics.addAll(topics);
}

@Override
public boolean conditionMet() {
final Set<String> allTopics = new HashSet<>(
Expand Down