10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -177,6 +177,16 @@ package object config {
.intConf
.createWithDefault(128)

private[spark] val LISTENER_BUS_EVENT_QUEUE_OPEN_HOLD =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.openHold")
.booleanConf
.createWithDefault(false)

private[spark] val LISTENER_BUS_EVENT_QUEUE_IDLE_CAPACITY =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.idleCapacity")
.doubleConf
.createWithDefault(0.6)

// This property sets the root namespace for metrics reporting
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
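Usage note: a minimal sketch of how an application would enable these two settings (the keys match the ConfigBuilder entries above; SparkConf.set is the standard API, and 0.6 simply restates the default):

import org.apache.spark.SparkConf

// Enable hold mode so post() blocks instead of dropping events when the
// listener bus queue fills up, and wake held posting threads once 60% of
// the queue capacity is free again.
val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.openHold", "true")
  .set("spark.scheduler.listenerbus.eventqueue.idleCapacity", "0.6")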
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -20,6 +20,8 @@ package org.apache.spark.scheduler
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.control.Breaks._

import com.codahale.metrics.{Gauge, Timer}

import org.apache.spark.{SparkConf, SparkContext}
@@ -42,8 +44,13 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi

// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
private val EVENT_QUEUE_CAPACITY = conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

// Control the strategy used when the event queue is full (hold vs. drop)
private val openHoldMode = conf.get(LISTENER_BUS_EVENT_QUEUE_OPEN_HOLD)
private lazy val idleCapacity = conf.get(LISTENER_BUS_EVENT_QUEUE_IDLE_CAPACITY)
private val isFull = new AtomicBoolean(false)

// Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
// this allows that method to return only when the events in the queue have been fully
@@ -89,6 +96,13 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
super.postToAll(next)
} finally {
ctx.stop()
// Wake up any posting threads held in post() once enough of the queue has drained.
isFull.synchronized {
  if (openHoldMode && isFull.get &&
      eventQueue.remainingCapacity().toDouble / EVENT_QUEUE_CAPACITY >= idleCapacity) {
    isFull.notifyAll()
    isFull.compareAndSet(true, false)
  }
}
}
eventCount.decrementAndGet()
next = eventQueue.take()
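Worth spelling out, since it is easy to get wrong: remainingCapacity() and EVENT_QUEUE_CAPACITY are both Ints, so the wake-up ratio above must be computed in floating point; integer division would truncate to 0 for any queue that is not completely empty and the threshold would never be reached. A standalone sketch with illustrative numbers (not taken from the patch):

// Hypothetical values: capacity 10000, idleCapacity threshold 0.6.
val capacity = 10000
val remaining = 6500  // free slots after the dispatcher has drained some events

// Integer division: 6500 / 10000 == 0, which never reaches 0.6.
// Converting to Double first yields the intended ratio.
val shouldWake = remaining.toDouble / capacity >= 0.6  // 0.65 >= 0.6 => true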
@@ -139,34 +153,49 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
}

eventCount.incrementAndGet()
if (eventQueue.offer(event)) {
return
}

eventCount.decrementAndGet()
droppedEvents.inc()
droppedEventsCounter.incrementAndGet()
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError(s"Dropping event from queue $name. " +
"This likely means one of the listeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
logTrace(s"Dropping event $event")

val droppedCount = droppedEventsCounter.get
if (droppedCount > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
val previous = new java.util.Date(prevLastReportTimestamp)
logWarning(s"Dropped $droppedEvents events from $name since $previous.")
breakable {
while (true) {
isFull.synchronized {
if (eventQueue.offer(event)) {
return
}

if (openHoldMode) {
  // While hold mode is enabled, the posting thread waits here until the
  // queue's idle-capacity threshold is reached again.
  logWarning(s"Holding event $event because there is no remaining room in the event queue.")
  isFull.compareAndSet(false, true)
  isFull.wait()
} else {
eventCount.decrementAndGet()
droppedEvents.inc()
droppedEventsCounter.incrementAndGet()
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError(s"Dropping event from queue $name. " +
"This likely means one of the listeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
logTrace(s"Dropping event $event")

val droppedCount = droppedEventsCounter.get
if (droppedCount > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will
// fail and then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
val previous = new java.util.Date(prevLastReportTimestamp)
logWarning(s"Dropped $droppedEvents events from $name since $previous.")
}
}
}
break
}
}
}
}
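For reviewers unfamiliar with the monitor handshake above, it reduces to the following self-contained sketch (names and sizes are illustrative; the real code synchronizes on the isFull flag and re-offers the event after each wake-up):

import java.util.concurrent.LinkedBlockingQueue

object HoldModeSketch {
  private val queue = new LinkedBlockingQueue[Int](5)
  private val lock = new Object

  // Producer side: block on the monitor instead of dropping when the queue is full.
  def post(event: Int): Unit = lock.synchronized {
    while (!queue.offer(event)) {  // re-check after every wake-up
      lock.wait()                  // releases the monitor while waiting
    }
  }

  // Consumer side: drain one event, then wake any held producers.
  def drainOne(): Int = {
    val event = queue.take()
    lock.synchronized { lock.notifyAll() }
    event
  }

  def main(args: Array[String]): Unit = {
    val consumer = new Thread {
      override def run(): Unit = (1 to 20).foreach(_ => drainOne())
    }
    consumer.start()
    (1 to 20).foreach(post)  // would overflow the 5-slot queue without holding
    consumer.join()
  }
}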
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.Matchers
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_CAPACITY
import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_OPEN_HOLD
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

@@ -480,6 +481,47 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
}

test("SPARK-21560: hold event while queue is full") {
val conf = new SparkConf()
// Set the queue capacity to 5; the number of posted events (50) is much larger than the capacity
conf.set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)

def postEvent(bus: LiveListenerBus, counter: BasicJobCounter): Unit = {
bus.addToSharedQueue(counter)
bus.start(mockSparkContext, mockMetricsSystem)

val thread = new Thread(bus.toString) {
override def run(): Unit = {
  (1 to 50).foreach { _ =>
    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
  }
}
}
thread.start()
thread.join()
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
}

// Test for normal mode
val counter = new BasicJobCounter
val bus = new LiveListenerBus(conf)
postEvent(bus, counter)
assert(bus.metrics.numEventsPosted.getCount === 50)
// Some events will be dropped because the queue is full
assert(counter.count != 50)
bus.stop()

// Test for hold mode
conf.set(LISTENER_BUS_EVENT_QUEUE_OPEN_HOLD, true)
val holdCounter = new BasicJobCounter
val holdBus = new LiveListenerBus(conf)
postEvent(holdBus, holdCounter)
assert(holdBus.metrics.numEventsPosted.getCount === 50)
// No events should be dropped while the queue is full; they are held instead
assert(holdCounter.count === 50)
holdBus.stop()
}
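One design consequence worth noting for reviewers: with openHold enabled, back-pressure lands on whichever thread calls post(), which for scheduler events is the scheduler's own event-posting path, so a persistently slow listener now stalls event producers instead of losing events. The idleCapacity threshold exists to avoid a notify storm: held threads are only woken once a meaningful fraction of the queue has drained, not on every dequeued event.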

/**
* Assert that the given list of numbers has an average that is greater than zero.
*/