Skip to content

MINOR: Fix ConsumerBounceTest to wait for consumer close#3334

Closed
rajinisivaram wants to merge 1 commit intoapache:trunkfrom
rajinisivaram:MINOR-more-test-cleanup
Closed

MINOR: Fix ConsumerBounceTest to wait for consumer close#3334
rajinisivaram wants to merge 1 commit intoapache:trunkfrom
rajinisivaram:MINOR-more-test-cleanup

Conversation

@rajinisivaram
Copy link
Copy Markdown
Contributor

No description provided.

@rajinisivaram
Copy link
Copy Markdown
Contributor Author

@ijuma Can you take a look (and merge if it is ok), please? Thank you!

This is a fix for the failure in https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5268/

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1638)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1536)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1514)
	at kafka.api.IntegrationTestHarness.$anonfun$tearDown$2(IntegrationTestHarness.scala:99)
	at kafka.api.IntegrationTestHarness.$anonfun$tearDown$2$adapted(IntegrationTestHarness.scala:99)

@asfbot
Copy link
Copy Markdown

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5273/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5289/
Test PASSed (JDK 7 and Scala 2.11).

Copy link
Copy Markdown
Member

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. I think the actual issue here is that we don't wait for the executor's tasks to terminate in the tearDown before we try to close the consumers. If we do that, then we should not have the issue reported here.

One of the get calls you added still seems worthwhile. The other one, I am less sure about.

// consumer1 should leave group and close immediately even though rebalance is in progress
submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
future1.get
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? It seems like waitForRebalance would do the job, no?

// consumer2 should close immediately without LeaveGroup request since there are no brokers available
submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0))
val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0))
future2.get
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need a variable and we can just call get on the result?

consumers.foreach(_.close())
} finally {
super.tearDown()
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit tricky. Maybe we should just leave as it was. The issue is that close throwing an exception is a bug and if we do the try/finally, we may obscure the original exception if super.tearDown itself throws an exception.

@rajinisivaram
Copy link
Copy Markdown
Contributor Author

Closing this PR since the test was updated in commit 031da88.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants