From 28b6ac28a9abc2d0a1b5df8d3e0d7ee911cf0108 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 26 Sep 2017 18:01:44 +0800
Subject: [PATCH] Resolve conflict with master brought in by SPARK-18838

---
 .../spark/internal/config/package.scala      | 10 +++
 .../spark/scheduler/AsyncEventQueue.scala    | 87 ++++++++++++-------
 .../spark/scheduler/SparkListenerSuite.scala | 42 +++++++++
 3 files changed, 110 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 44a2815b81a73..94d26dc01a3d8 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -177,6 +177,16 @@ package object config {
       .intConf
       .createWithDefault(128)
 
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_OPEN_HOLD =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.openHold")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_IDLE_CAPACITY =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.idleCapacity")
+      .doubleConf
+      .createWithDefault(0.6)
+
   // This property sets the root namespace for metrics reporting
   private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
     .stringConf
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 8605e1da161c7..3abaa25f86a63 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -20,6 +20,8 @@ package org.apache.spark.scheduler
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
+import scala.util.control.Breaks._
+
 import com.codahale.metrics.{Gauge, Timer}
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -42,8 +44,13 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
 
   // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
   // it's perpetually being added to more quickly than it's being drained.
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
-    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+  private val EVENT_QUEUE_CAPACITY = conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+  // Controls whether a full event queue drops new events or holds the posting thread.
+  private val openHoldMode = conf.get(LISTENER_BUS_EVENT_QUEUE_OPEN_HOLD)
+  private lazy val idleCapacity = conf.get(LISTENER_BUS_EVENT_QUEUE_IDLE_CAPACITY)
+  private val isFull = new AtomicBoolean(false)
 
   // Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
   // this allows that method to return only when the events in the queue have been fully
@@ -89,6 +96,13 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
           super.postToAll(next)
         } finally {
           ctx.stop()
+          isFull.synchronized {
+            if (openHoldMode && isFull.get &&
+                eventQueue.remainingCapacity().toDouble / EVENT_QUEUE_CAPACITY >= idleCapacity) {
+              isFull.notifyAll()
+              isFull.compareAndSet(true, false)
+            }
+          }
         }
         eventCount.decrementAndGet()
         next = eventQueue.take()
@@ -139,34 +153,49 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
     }
 
     eventCount.incrementAndGet()
-    if (eventQueue.offer(event)) {
-      return
-    }
-    eventCount.decrementAndGet()
-    droppedEvents.inc()
-    droppedEventsCounter.incrementAndGet()
-    if (logDroppedEvent.compareAndSet(false, true)) {
-      // Only log the following message once to avoid duplicated annoying logs.
-      logError(s"Dropping event from queue $name. " +
-        "This likely means one of the listeners is too slow and cannot keep up with " +
-        "the rate at which tasks are being started by the scheduler.")
-    }
-    logTrace(s"Dropping event $event")
-
-    val droppedCount = droppedEventsCounter.get
-    if (droppedCount > 0) {
-      // Don't log too frequently
-      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
-        // There may be multiple threads trying to decrease droppedEventsCounter.
-        // Use "compareAndSet" to make sure only one thread can win.
-        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
-        // then that thread will update it.
-        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
-          val prevLastReportTimestamp = lastReportTimestamp
-          lastReportTimestamp = System.currentTimeMillis()
-          val previous = new java.util.Date(prevLastReportTimestamp)
-          logWarning(s"Dropped $droppedEvents events from $name since $previous.")
+    breakable {
+      while (true) {
+        isFull.synchronized {
+          if (eventQueue.offer(event)) {
+            return
+          }
+
+          if (openHoldMode) {
+            // In hold mode, wait here until at least idleCapacity of the queue is free again.
+            logWarning(s"Holding event $event because the event queue is full.")
+            isFull.compareAndSet(false, true)
+            isFull.wait()
+          } else {
+            eventCount.decrementAndGet()
+            droppedEvents.inc()
+            droppedEventsCounter.incrementAndGet()
+            if (logDroppedEvent.compareAndSet(false, true)) {
+              // Only log the following message once to avoid duplicated annoying logs.
+              logError(s"Dropping event from queue $name. " +
" + + "This likely means one of the listeners is too slow and cannot keep up with " + + "the rate at which tasks are being started by the scheduler.") + } + logTrace(s"Dropping event $event") + + val droppedCount = droppedEventsCounter.get + if (droppedCount > 0) { + // Don't log too frequently + if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { + // There may be multiple threads trying to decrease droppedEventsCounter. + // Use "compareAndSet" to make sure only one thread can win. + // And if another thread is increasing droppedEventsCounter, "compareAndSet" will + // fail and then that thread will update it. + if (droppedEventsCounter.compareAndSet(droppedCount, 0)) { + val prevLastReportTimestamp = lastReportTimestamp + lastReportTimestamp = System.currentTimeMillis() + val previous = new java.util.Date(prevLastReportTimestamp) + logWarning(s"Dropped $droppedEvents events from $name since $previous.") + } + } + } + break + } } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index d061c7845f4a6..f2c6691333560 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.Matchers import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_CAPACITY +import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_OPEN_HOLD import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{ResetSystemProperties, RpcUtils} @@ -480,6 +481,47 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(bus.findListenersByClass[BasicJobCounter]().isEmpty) } + test("SPARK-21560: hold event while queue is full") { + val conf = new SparkConf() + // Set the queue capacity to 5, the number of events(50) is much larger than the queue size + conf.set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5) + + def postEvent(bus: LiveListenerBus, counter: BasicJobCounter): Unit = { + bus.addToSharedQueue(counter) + bus.start(mockSparkContext, mockMetricsSystem) + + val thread = new Thread(bus.toString) { + override def run() { + (1 to 50).foreach { + _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) + } + } + } + thread.start() + thread.join() + bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + } + + // Test for normal mode + val counter = new BasicJobCounter + val bus = new LiveListenerBus(conf) + postEvent(bus, counter) + assert(bus.metrics.numEventsPosted.getCount === 50) + // Some of events will be dropped when queue is full + assert(counter.count != 50) + bus.stop() + + // Test for hold mode + conf.set(LISTENER_BUS_EVENT_QUEUE_OPEN_HOLD, true) + val hold_counter = new BasicJobCounter + val hold_bus = new LiveListenerBus(conf) + postEvent(hold_bus, hold_counter) + assert(hold_bus.metrics.numEventsPosted.getCount === 50) + // No events will be dropped when queue is full + assert(hold_counter.count === 50) + hold_bus.stop() + } + /** * Assert that the given list of numbers has an average that is greater than zero. */