[SPARK-18588][Tests]Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite #16109
Conversation
```diff
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt

   private val offsetFetchAttemptIntervalMs =
-    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "100").toLong
```
Increased this value because, without sleeping long enough between retries, it's easy to hit the same error again while a topic is being deleted.
Is 100ms enough, or should this be increased to 1 second?
Increased the default value to 1 second.
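For reference, these retry settings can also be overridden per query through the Kafka source options. A minimal sketch, assuming an existing SparkSession named `spark`, the spark-sql-kafka-0-10 package on the classpath, and placeholder broker/topic names:

```scala
// Hypothetical query that overrides the retry settings discussed above.
// "fetchOffset.numRetries" and "fetchOffset.retryIntervalMs" are the option
// names KafkaSource reads via sourceOptions.getOrElse(...).
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
  .option("subscribe", "failOnDataLoss-0")              // placeholder topic
  .option("fetchOffset.numRetries", "3")
  .option("fetchOffset.retryIntervalMs", "1000")        // 1 second, matching the new default
  .load()
```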
```scala
private def verifyTopicDeletion(
    zkUtils: ZkUtils,
    topic: String,
    numPartitions: Int,
    servers: Seq[KafkaServer]) {
  import ZkUtils._
  val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
  def isDeleted(): Boolean = {
```
Renamed this method to assertTopicDeleted and moved it out of verifyTopicDeletion. I also rewrote it so that it tells us which check fails, as sketched below.
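A minimal sketch of the idea, assuming Kafka's old ZkUtils API; only the ZooKeeper check from this PR is shown, and broker-side checks would follow the same pattern:

```scala
import kafka.utils.ZkUtils

// Each condition gets its own assert with a message, so a failure reports
// exactly which check did not hold instead of returning a bare `false`.
def assertTopicDeleted(zkUtils: ZkUtils, topic: String): Unit = {
  assert(
    !zkUtils.getAllTopics().contains(topic),
    s"topic $topic still exists on zookeeper")
  // ...similar asserts for broker-side replica and log state, each with its own message
}
```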
```diff
@@ -331,7 +351,7 @@ class KafkaTestUtils extends Logging {
       case _ =>
         false
     }
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(60.seconds)) {
```
Increased this timeout, as 10 seconds is sometimes not enough.
Not related to this PR.
Looks like the retry interval is not enough. Found the following logs:
I will increase the interval.
The NPE without a stack trace is probably caused by the HotSpot optimization described at http://jawspeak.com/2010/05/26/hotspot-caused-exceptions-to-lose-their-stack-traces-in-production-and-the-fix/
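As an aside, not part of this PR: if the full trace is needed while debugging locally, the usual workaround is to disable that HotSpot optimization in the forked test JVM. A sketch for an sbt build, using the standard HotSpot flag:

```scala
// build.sbt fragment: keep full stack traces for hot exception sites by
// disabling HotSpot's fast-throw optimization in the test JVM.
fork in Test := true                                      // javaOptions only apply to forked JVMs
javaOptions in Test += "-XX:-OmitStackTraceInFastThrow"
```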
```scala
    }
  }
} else if (record.offset < offset) {
  // This should not happen. If it does happen, then we probably misunderstand Kafka internal
```
```scala
assert(record.offset == offset,
  s"The fetched data has a different offset: expected $offset but was ${record.offset}")
record
// In general, Kafka uses the specified offset as the start point, and tries to fetch the next
```
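For context on the assert above, here is a minimal standalone sketch of the seek-then-poll pattern it guards, using the plain Kafka 0.10 consumer API; broker, topic, and offset values are placeholders:

```scala
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder broker
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val tp = new TopicPartition("failOnDataLoss-0", 0)  // placeholder topic/partition
val requestedOffset = 42L                           // placeholder offset

consumer.assign(Collections.singletonList(tp))
consumer.seek(tp, requestedOffset)

// Kafka starts fetching at the requested offset, so the first returned record
// should carry exactly that offset; the PR turns a silent mismatch into an assert.
val first = consumer.poll(512).records(tp).asScala.headOption
first.foreach { record =>
  assert(record.offset == requestedOffset,
    s"The fetched data has a different offset: expected $requestedOffset but was ${record.offset}")
}
consumer.close()
```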
retest this please
```scala
    !zkUtils.getAllTopics().contains(topic),
    s"topic $topic still exists on zookeeper")
}

private def verifyTopicDeletion(
```
It's hard to differentiate the semantics of verifyTopicDeletion and assertTopicDeleted. How about renaming them to verifyTopicDeletionWithRetries and verifyTopicDeletion?
Done
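With that renaming, the structure is roughly the sketch below, as it would sit inside KafkaTestUtils; this is an outline, not the exact patch, and verifyTopicDeletion here is the non-retrying, assert-based check referenced above:

```scala
import kafka.server.KafkaServer
import kafka.utils.ZkUtils
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Retries the assert-based check until it passes or the 60-second timeout expires.
private def verifyTopicDeletionWithRetries(
    zkUtils: ZkUtils,
    topic: String,
    numPartitions: Int,
    servers: Seq[KafkaServer]): Unit = {
  eventually(timeout(60.seconds)) {
    // verifyTopicDeletion fails with a message naming the failing condition;
    // `eventually` keeps retrying and reports that message on timeout.
    verifyTopicDeletion(zkUtils, topic, numPartitions, servers)
  }
}
```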
```scala
override def createSparkSession(): TestSparkSession = {
  // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
  new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context",
    sparkConf.set("spark.sql.testkey", "true")))
```
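For context on the `local[2,3]` master string (an aside, not part of the patch): the second number is the number of allowed task failures in local mode, which is what lets a transient NPE from `poll` be retried instead of failing the whole suite. A tiny sketch with illustrative conf and app name:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "local[2,3]" = 2 worker threads, and each task may be attempted up to 3 times.
// In plain "local[2]" mode a single task failure fails the job, so the transient
// NPE from Kafka's poll() during topic deletion would kill the test.
val sc = new SparkContext(
  new SparkConf()
    .setMaster("local[2,3]")
    .setAppName("kafka-flaky-test-sketch"))  // illustrative app name
```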
What is this key for?
It's only used in SQLConfSuite. Removed it from this test.
Overall LGTM, just a few nits that would be good to fix.
LGTM. Merging this to master and 2.1 |
[SPARK-18588][Tests] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite

## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.0000790851666665 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
	at java.util.ArrayList.addAll(ArrayList.java:577)
	at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
	at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
	at ...
```

## How was this patch tested?

Tested in #16048 by running many times.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16109 from zsxwing/fix-kafka-flaky-test.

(cherry picked from commit edc87e1)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
What changes were proposed in this pull request?
Fixed the failures quoted in the commit message above: the eventually timeout while waiting for topic deletion, and the NullPointerException thrown from KafkaConsumer.poll.
How was this patch tested?
Tested in #16048 by running many times.