-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[Hot Fix #469] Fix flaky test in SparkListenerSuite #516
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Merged build triggered. |
|
Merged build started. |
|
Merged build finished. All automated tests passed. |
|
All automated tests passed. |
|
Thanks. I've merged this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realize that waitUntilEmpty is not enough. In waitUntilEmpty, it only checks eventQueue.isEmpty. But when eventQueue.isEmpty is true, there is still a chance that listeners do not finish their jobs or the memory is not synchronized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @andrewor14
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. A solution is to have eventQueue peek instead of take, such that we remove the event from the queue only after all listeners have finished processing it.
Though many other places have long been relying on waitUntilEmpty before this PR. It makes me wonder how likely this race condition actually causes a test failure... (nevertheless it's possible)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, looks like LinkedBlockingQueue's take() is special in that it waits for the next item to be ready. It seems there isn't an equivalent for peek... we may have to synchronize some other way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since waitUntilEmpty is only for test, is it possible that moving such wait logic to the SparkListener instances used in tests? E.g.,
import java.util.concurrent.{CountDownLatch, TimeUnit}
class SaveStageAndTaskInfo extends SparkListener {
val stageInfos = mutable.Map[StageInfo, Seq[(TaskInfo, TaskMetrics)]]()
var taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
val latch = new CountDownLatch(1)
override def onTaskEnd(task: SparkListenerTaskEnd) {
val info = task.taskInfo
val metrics = task.taskMetrics
if (info != null && metrics != null) {
taskInfoMetrics += ((info, metrics))
}
}
override def onStageCompleted(stage: SparkListenerStageCompleted) {
stageInfos(stage.stageInfo) = taskInfoMetrics
taskInfoMetrics = mutable.Buffer.empty
latch.countDown()
}
def waitForCompleted(timeoutMillis: Long) {
latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
}
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The two modified tests may fail if the race condition does not bid in our favor... Author: Andrew Or <andrewor14@gmail.com> Closes #516 from andrewor14/stage-info-test-fix and squashes the following commits: b4b6100 [Andrew Or] Add/replace missing waitUntilEmpty() calls to listener bus (cherry picked from commit 4b2bab1) Signed-off-by: Reynold Xin <rxin@apache.org>
Original poster is @zsxwing, who reported this bug in #516. Much of SparkListenerSuite relies on LiveListenerBus's `waitUntilEmpty()` method. As the name suggests, this waits until the event queue is empty. However, the following race condition could happen: (1) We dequeue an event (2) The queue is empty, we return true (even though the event has not been processed) (3) The test asserts something assuming that all listeners have finished executing (and fails) (4) The listeners receive and process the event This PR makes (1) and (4) atomic by synchronizing around it. To do that, however, we must avoid using `eventQueue.take`, which is blocking and will cause a deadlock if we synchronize around it. As a workaround, we use the non-blocking `eventQueue.poll` + a semaphore to provide the same semantics. This has been a possible race condition for a long time, but for some reason we've never run into it. Author: Andrew Or <andrewor14@gmail.com> Closes #544 from andrewor14/stage-info-test-fix and squashes the following commits: 3cbe40c [Andrew Or] Merge github.com:apache/spark into stage-info-test-fix 56dbbcb [Andrew Or] Check if event is actually added before releasing semaphore eb486ae [Andrew Or] Synchronize accesses to the LiveListenerBus' event queue
Original poster is @zsxwing, who reported this bug in #516. Much of SparkListenerSuite relies on LiveListenerBus's `waitUntilEmpty()` method. As the name suggests, this waits until the event queue is empty. However, the following race condition could happen: (1) We dequeue an event (2) The queue is empty, we return true (even though the event has not been processed) (3) The test asserts something assuming that all listeners have finished executing (and fails) (4) The listeners receive and process the event This PR makes (1) and (4) atomic by synchronizing around it. To do that, however, we must avoid using `eventQueue.take`, which is blocking and will cause a deadlock if we synchronize around it. As a workaround, we use the non-blocking `eventQueue.poll` + a semaphore to provide the same semantics. This has been a possible race condition for a long time, but for some reason we've never run into it. Author: Andrew Or <andrewor14@gmail.com> Closes #544 from andrewor14/stage-info-test-fix and squashes the following commits: 3cbe40c [Andrew Or] Merge github.com:apache/spark into stage-info-test-fix 56dbbcb [Andrew Or] Check if event is actually added before releasing semaphore eb486ae [Andrew Or] Synchronize accesses to the LiveListenerBus' event queue (cherry picked from commit ee6f7e2) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
modified SparkPluginBuild.scala to use https protocol for accessing gith... We cannot build Spark behind a proxy although we execute sbt with -Dhttp(s).proxyHost -Dhttp(s).proxyPort -Dhttp(s).proxyUser -Dhttp(s).proxyPassword options. It's because of using git protocol to clone junit_xml_listener.git. I could build after modifying SparkPluginBuild.scala. I reported this issue to JIRA. https://spark-project.atlassian.net/browse/SPARK-1046
The two modified tests may fail if the race condition does not bid in our favor... Author: Andrew Or <andrewor14@gmail.com> Closes apache#516 from andrewor14/stage-info-test-fix and squashes the following commits: b4b6100 [Andrew Or] Add/replace missing waitUntilEmpty() calls to listener bus
Original poster is @zsxwing, who reported this bug in apache#516. Much of SparkListenerSuite relies on LiveListenerBus's `waitUntilEmpty()` method. As the name suggests, this waits until the event queue is empty. However, the following race condition could happen: (1) We dequeue an event (2) The queue is empty, we return true (even though the event has not been processed) (3) The test asserts something assuming that all listeners have finished executing (and fails) (4) The listeners receive and process the event This PR makes (1) and (4) atomic by synchronizing around it. To do that, however, we must avoid using `eventQueue.take`, which is blocking and will cause a deadlock if we synchronize around it. As a workaround, we use the non-blocking `eventQueue.poll` + a semaphore to provide the same semantics. This has been a possible race condition for a long time, but for some reason we've never run into it. Author: Andrew Or <andrewor14@gmail.com> Closes apache#544 from andrewor14/stage-info-test-fix and squashes the following commits: 3cbe40c [Andrew Or] Merge github.com:apache/spark into stage-info-test-fix 56dbbcb [Andrew Or] Check if event is actually added before releasing semaphore eb486ae [Andrew Or] Synchronize accesses to the LiveListenerBus' event queue
modified SparkPluginBuild.scala to use https protocol for accessing gith... We cannot build Spark behind a proxy although we execute sbt with -Dhttp(s).proxyHost -Dhttp(s).proxyPort -Dhttp(s).proxyUser -Dhttp(s).proxyPassword options. It's because of using git protocol to clone junit_xml_listener.git. I could build after modifying SparkPluginBuild.scala. I reported this issue to JIRA. https://spark-project.atlassian.net/browse/SPARK-1046 (cherry picked from commit 3d5c03e) Signed-off-by: Patrick Wendell <pwendell@gmail.com>
This reverts commit acd5e29.
The two modified tests may fail if the race condition does not bid in our favor...