@@ -50,9 +50,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
 assert(counter.count === 0)

-// Starting listener bus should flush all buffered events (asynchronously, hence the sleep)
+// Starting listener bus should flush all buffered events
 bus.start()
-Thread.sleep(1000)
+assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
 assert(counter.count === 5)

 // After listener bus has stopped, posting events should not increment counter
@@ -177,6 +177,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 listener.stageInfos.clear()

 rdd3.count()
+assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
Member:

Just realized that waitUntilEmpty is not enough. waitUntilEmpty only checks eventQueue.isEmpty, but even when eventQueue.isEmpty is true, there is still a chance that the listeners have not finished their work, or that memory is not yet synchronized.
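
In other words (a sketch of the shape of the check, not necessarily the exact code), the wait is roughly:

  // waitUntilEmpty only watches the queue; it says nothing about whether the
  // listener handling the last event has finished running or published its writes.
  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    val deadline = System.currentTimeMillis + timeoutMillis
    while (!eventQueue.isEmpty) {
      if (System.currentTimeMillis > deadline) {
        return false
      }
      Thread.sleep(10)
    }
    true
  }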

Contributor Author:

Good catch. A solution is to have eventQueue peek instead of take, so that we remove the event from the queue only after all listeners have finished processing it.

That said, many other places have long relied on waitUntilEmpty before this PR, which makes me wonder how often this race condition actually causes a test failure... (it's possible nevertheless)
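
A rough sketch of that idea with assumed names (not the actual LiveListenerBus code; onEvent stands in for the real per-event dispatch):

  // Leave the event at the head of the queue while listeners run, so that
  // eventQueue.isEmpty only becomes true after the event is fully processed.
  val event = eventQueue.peek()
  if (event != null) {
    listeners.foreach(_.onEvent(event))  // deliver to every listener first...
    eventQueue.poll()                    // ...then remove the event from the queue
  }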

Contributor Author:

Hm, it looks like LinkedBlockingQueue's take() is special in that it blocks until the next item is ready. There doesn't seem to be a blocking equivalent of peek... we may have to synchronize some other way.
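
One way to synchronize without a blocking peek (an assumption on my part, not necessarily what this PR does) would be to count events in and out, and have waitUntilEmpty compare the counters instead of only checking isEmpty:

  import java.util.concurrent.atomic.AtomicLong

  val postedCount = new AtomicLong(0)     // incremented in post(), before the event is enqueued
  val processedCount = new AtomicLong(0)  // incremented only after all listeners have run

  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
    val deadline = System.currentTimeMillis + timeoutMillis
    // The condition now also covers the event currently being processed, not just the queue.
    while (processedCount.get() < postedCount.get()) {
      if (System.currentTimeMillis > deadline) return false
      Thread.sleep(10)
    }
    true
  }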

Member:

Since waitUntilEmpty is only used for tests, would it be possible to move the wait logic into the SparkListener instances used in the tests? E.g.,

  import java.util.concurrent.{CountDownLatch, TimeUnit}
  import scala.collection.mutable

  class SaveStageAndTaskInfo extends SparkListener {
    val stageInfos = mutable.Map[StageInfo, Seq[(TaskInfo, TaskMetrics)]]()
    var taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
    // Released once a stage has completed, so tests can block on it instead of sleeping
    val latch = new CountDownLatch(1)

    override def onTaskEnd(task: SparkListenerTaskEnd) {
      val info = task.taskInfo
      val metrics = task.taskMetrics
      if (info != null && metrics != null) {
        taskInfoMetrics += ((info, metrics))
      }
    }

    override def onStageCompleted(stage: SparkListenerStageCompleted) {
      stageInfos(stage.stageInfo) = taskInfoMetrics
      taskInfoMetrics = mutable.Buffer.empty
      latch.countDown()
    }

    // Blocks the calling test until onStageCompleted has fired, or until the timeout expires
    def waitForCompleted(timeoutMillis: Long) {
      latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
    }
  }
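
A hypothetical usage in a test might then look like this (sketch only; sc, addSparkListener, WAIT_TIMEOUT_MILLIS, and the ShouldMatchers syntax come from the existing suite, while the job itself is made up):

  val listener = new SaveStageAndTaskInfo
  sc.addSparkListener(listener)
  sc.parallelize(1 to 100).count()                // a single-stage job, so one countDown suffices
  listener.waitForCompleted(WAIT_TIMEOUT_MILLIS)  // replaces Thread.sleep / waitUntilEmpty
  listener.stageInfos.size should be {1}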

Contributor Author:

@zsxwing we have to keep waitUntilEmpty in LiveListenerBus.scala (unfortunately) because DAGSchedulerSuite also uses it. I started a more general solution at #544. Maybe we can move the discussion there.

 listener.stageInfos.size should be {2} // Shuffle map stage + result stage
 val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
 stageInfo3.rddInfos.size should be {2} // ShuffledRDD, MapPartitionsRDD