Instantiate SparkListeners from classes listed in configurations.
JoshRosen committed Jan 20, 2015
1 parent 9c0d8f1 commit b22b379
Showing 4 changed files with 101 additions and 20 deletions.
35 changes: 35 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -379,6 +379,41 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
}
executorAllocationManager.foreach(_.start())

// Use reflection to instantiate listeners specified via the `spark.extraListeners`
// configuration or the SPARK_EXTRA_LISTENERS environment variable
try {
  val listenerClassNames: Seq[String] = {
    val fromSparkConf = conf.get("spark.extraListeners", "").split(',')
    val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',')
    (fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "")
  }
  for (className <- listenerClassNames) {
    val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]]
    val listener = try {
      // Prefer a constructor that accepts a SparkConf
      listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf)
    } catch {
      case e: NoSuchMethodException =>
        try {
          // Otherwise, fall back to a zero-argument constructor. Note that
          // Class#newInstance reports a missing zero-argument constructor as
          // an InstantiationException rather than a NoSuchMethodException.
          listenerClass.newInstance()
        } catch {
          case e: InstantiationException =>
            throw new SparkException(
              s"$listenerClass did not have a zero-argument constructor or a" +
              " single-argument constructor that accepts SparkConf" +
              " (is it a nested Scala class?)")
        }
    }
    listenerBus.addListener(listener)
    logInfo(s"Registered listener $listenerClass")
  }
} catch {
  case e: Exception =>
    try {
      // Clean up the partially-constructed context if registration fails
      stop()
    } finally {
      throw new SparkException("Exception when registering SparkListener", e)
    }
}

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()
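
For illustration (an editor's sketch, not part of this commit): a listener only needs one of
the two constructor shapes probed above, and it must be a top-level class so that reflective
instantiation does not require an enclosing instance. The name `MyMetricsListener` is
hypothetical:

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

class MyMetricsListener(conf: SparkConf) extends SparkListener {
  // Zero-argument alternative; the SparkConf constructor is preferred when both exist.
  def this() = this(new SparkConf)

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"Job ${jobEnd.jobId} completed")
  }
}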

@@ -28,8 +28,8 @@ import org.apache.spark.util.Utils
*/
private[spark] trait SparkListenerBus extends Logging {

-  // SparkListeners attached to this event bus
-  protected val sparkListeners = new ArrayBuffer[SparkListener]
+  // SparkListeners attached to this event bus. Marked `protected[spark]` for access in tests.
+  protected[spark] val sparkListeners = new ArrayBuffer[SparkListener]
     with mutable.SynchronizedBuffer[SparkListener]

def addListener(listener: SparkListener) {
@@ -21,25 +21,21 @@ import java.util.concurrent.Semaphore

import scala.collection.mutable

-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-import org.scalatest.Matchers
+import org.mockito.Mockito._
+import org.scalatest.{FunSuite, Matchers}

-import org.apache.spark.{LocalSparkContext, SparkContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties

-class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with BeforeAndAfter
-  with BeforeAndAfterAll with ResetSystemProperties {
+class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
+  with ResetSystemProperties {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

val jobCompletionTime = 1421191296660L

-  before {
-    sc = new SparkContext("local", "SparkListenerSuite")
-  }

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
@@ -127,6 +123,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("basic creation of StageInfo") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -148,6 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("basic creation of StageInfo with shuffle") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -185,6 +183,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("StageInfo with fewer tasks than partitions") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -201,6 +200,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("local metrics") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
@@ -267,6 +267,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskGettingResult() called when result fetched remotely") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveTaskEvents
sc.addSparkListener(listener)

@@ -287,6 +288,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskGettingResult() not called when result sent directly") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveTaskEvents
sc.addSparkListener(listener)

@@ -302,6 +304,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskEnd() should be called for all started tasks, even after job has been killed") {
sc = new SparkContext("local", "SparkListenerSuite")
val WAIT_TIMEOUT_MILLIS = 10000
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
@@ -356,21 +359,37 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(jobCounter2.count === 5)
}

test("registering listeners via spark.extraListeners") {
val conf = new SparkConf().setMaster("local").setAppName("test")
.set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
classOf[BasicJobCounter].getName)
sc = new SparkContext(conf)
sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x }.size should be (1)
sc.listenerBus.sparkListeners.collect {
case x: ListenerThatAcceptsSparkConf => x
}.size should be (1)
}

test("registering listeners via SPARK_EXTRA_LISTENERS") {
val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName + "," +
classOf[BasicJobCounter].getName
val conf = spy(new SparkConf().setMaster("local").setAppName("test"))
when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS)
when(conf.clone).thenReturn(conf) // so that our mock is still used
sc = new SparkContext(conf)
sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x }.size should be (1)
sc.listenerBus.sparkListeners.collect {
case x: ListenerThatAcceptsSparkConf => x
}.size should be (1)
}

/**
* Assert that the given list of numbers has an average that is greater than zero.
*/
private def checkNonZeroAvg(m: Traversable[Long], msg: String) {
assert(m.sum / m.size.toDouble > 0.0, msg)
}

-  /**
-   * A simple listener that counts the number of jobs observed.
-   */
-  private class BasicJobCounter extends SparkListener {
-    var count = 0
-    override def onJobEnd(job: SparkListenerJobEnd) = count += 1
-  }

/**
* A simple listener that saves all task infos and task metrics.
*/
@@ -423,3 +442,19 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

}

// These classes can't be declared inside the SparkListenerSuite class because we don't want
// their constructors to contain references to SparkListenerSuite:

/**
* A simple listener that counts the number of jobs observed.
*/
private class BasicJobCounter extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}

private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}
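
An aside on the "is it a nested Scala class?" hint in the error message above: a class nested
inside another class compiles to a constructor that takes the enclosing instance as a hidden
first argument, so neither constructor shape the registration code looks for exists. A small
self-contained sketch (hypothetical names) showing this via reflection:

object NestedClassDemo {
  class Outer {
    class Nested  // inner class: its constructor needs the Outer instance
  }

  def main(args: Array[String]): Unit = {
    val params = classOf[Outer#Nested].getConstructors.head.getParameterTypes.toList
    // Prints List(class NestedClassDemo$Outer): there is no zero-argument constructor
    // for reflection to call.
    println(params)
  }
}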
11 changes: 11 additions & 0 deletions docs/configuration.md
@@ -190,6 +190,17 @@ of the most common options to set are:
Logs the effective SparkConf as INFO when a SparkContext is started.
</td>
</tr>
<tr>
<td><code>spark.extraListeners</code></td>
<td>(none)</td>
<td>
A comma-separated list of classes that implement <code>SparkListener</code>; when initializing
SparkContext, instances of these classes will be created and registered with Spark's listener
bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor
will be called; otherwise, a zero-argument constructor will be called. If no valid constructor
can be found, the SparkContext creation will fail with an exception.
</td>
</tr>
</table>
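
For illustration (an editor's sketch, not part of this documentation change), setting the
property from application code could look like this; `com.example.MyMetricsListener` is a
hypothetical class on the driver classpath, while `org.apache.spark.scheduler.StatsReportListener`
is the stock listener also used in this commit's tests:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("listener-demo")
  .set("spark.extraListeners",
    "com.example.MyMetricsListener,org.apache.spark.scheduler.StatsReportListener")

// Both classes are instantiated reflectively and registered before the listener bus
// starts, so they observe events from the first job onward.
val sc = new SparkContext(conf)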

Apart from these, the following properties are also available, and may be useful in some situations:
