diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6a354ed4d1486..79310c28cdbd2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -379,6 +379,41 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
   executorAllocationManager.foreach(_.start())
 
+  // Use reflection to instantiate listeners specified via the `spark.extraListeners` configuration
+  // or the SPARK_EXTRA_LISTENERS environment variable
+  try {
+    val listenerClassNames: Seq[String] = {
+      val fromSparkConf = conf.get("spark.extraListeners", "").split(',')
+      val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',')
+      (fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "")
+    }
+    for (className <- listenerClassNames) {
+      val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]]
+      val listener = try {
+        listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf)
+      } catch {
+        case e: NoSuchMethodException =>
+          try {
+            listenerClass.newInstance()
+          } catch {
+            case e: InstantiationException =>
+              throw new SparkException(
+                s"$listenerClass did not have a zero-argument constructor or a" +
+                " single-argument constructor that accepts SparkConf (is it a nested Scala class?)")
+          }
+      }
+      listenerBus.addListener(listener)
+      logInfo(s"Registered listener $listenerClass")
+    }
+  } catch {
+    case e: Exception =>
+      try {
+        stop()
+      } finally {
+        throw new SparkException(s"Exception when registering SparkListener", e)
+      }
+  }
+
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()
 
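For intuition, here is a minimal, self-contained sketch of the constructor-resolution fallback used above. The `Config`, `WithConf`, and `NoArg` classes are hypothetical stand-ins for `SparkConf` and user listeners, not Spark code; note that `Class#newInstance()` signals a missing zero-argument constructor with `InstantiationException`, not `NoSuchMethodException`, which is why the inner catch above is on `InstantiationException`:

```scala
// Prefer a single-argument constructor taking the config object; fall back to a
// zero-argument constructor, mirroring the lookup order in the patch above.
class Config
class WithConf(conf: Config) { override def toString = "constructed with Config" }
class NoArg { override def toString = "constructed with no arguments" }

object ConstructorFallbackSketch {
  def instantiate(className: String, conf: Config): Any = {
    val clazz = Class.forName(className)
    try {
      // Preferred: single-argument constructor that accepts the config object
      clazz.getConstructor(classOf[Config]).newInstance(conf)
    } catch {
      // getConstructor throws NoSuchMethodException if no such constructor exists
      case _: NoSuchMethodException => clazz.newInstance()
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Config
    println(instantiate("WithConf", conf)) // uses the Config constructor
    println(instantiate("NoArg", conf))    // falls back to the zero-arg constructor
  }
}
```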
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e700c6af542f4..57840a81ec342 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -28,8 +28,8 @@ import org.apache.spark.util.Utils
  */
 private[spark] trait SparkListenerBus extends Logging {
 
-  // SparkListeners attached to this event bus
-  protected val sparkListeners = new ArrayBuffer[SparkListener]
+  // SparkListeners attached to this event bus. Marked `protected[spark]` for access in tests.
+  protected[spark] val sparkListeners = new ArrayBuffer[SparkListener]
     with mutable.SynchronizedBuffer[SparkListener]
 
   def addListener(listener: SparkListener) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 0fb1bdd30d975..ec9edbc3b5986 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -21,25 +21,21 @@ import java.util.concurrent.Semaphore
 
 import scala.collection.mutable
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-import org.scalatest.Matchers
-
-import org.apache.spark.{LocalSparkContext, SparkContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
 
-class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with BeforeAndAfter
-  with BeforeAndAfterAll with ResetSystemProperties {
+import org.mockito.Mockito._
+import org.scalatest.{FunSuite, Matchers}
+
+class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
+  with ResetSystemProperties {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
   val jobCompletionTime = 1421191296660L
 
-  before {
-    sc = new SparkContext("local", "SparkListenerSuite")
-  }
-
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus
@@ -127,6 +123,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("basic creation of StageInfo") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -148,6 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("basic creation of StageInfo with shuffle") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -185,6 +183,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("StageInfo with fewer tasks than partitions") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -201,6 +200,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("local metrics") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
@@ -267,6 +267,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("onTaskGettingResult() called when result fetched remotely") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
@@ -287,6 +288,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("onTaskGettingResult() not called when result sent directly") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
@@ -302,6 +304,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
   }
 
   test("onTaskEnd() should be called for all started tasks, even after job has been killed") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val WAIT_TIMEOUT_MILLIS = 10000
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
@@ -356,6 +359,30 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     assert(jobCounter2.count === 5)
   }
 
+  test("registering listeners via spark.extraListeners") {
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+      .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
+        classOf[BasicJobCounter].getName)
+    sc = new SparkContext(conf)
+    sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x }.size should be (1)
+    sc.listenerBus.sparkListeners.collect {
+      case x: ListenerThatAcceptsSparkConf => x
+    }.size should be (1)
+  }
+
+  test("registering listeners via SPARK_EXTRA_LISTENERS") {
+    val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName + "," +
+      classOf[BasicJobCounter].getName
+    val conf = spy(new SparkConf().setMaster("local").setAppName("test"))
+    when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS)
+    when(conf.clone).thenReturn(conf) // so that our mock is still used
+    sc = new SparkContext(conf)
+    sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x }.size should be (1)
+    sc.listenerBus.sparkListeners.collect {
+      case x: ListenerThatAcceptsSparkConf => x
+    }.size should be (1)
+  }
+
   /**
    * Assert that the given list of numbers has an average that is greater than zero.
    */
@@ -363,14 +390,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     assert(m.sum / m.size.toDouble > 0.0, msg)
   }
 
-  /**
-   * A simple listener that counts the number of jobs observed.
-   */
-  private class BasicJobCounter extends SparkListener {
-    var count = 0
-    override def onJobEnd(job: SparkListenerJobEnd) = count += 1
-  }
-
   /**
    * A simple listener that saves all task infos and task metrics.
    */
@@ -423,3 +442,19 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     }
   }
 }
+
+// These classes can't be declared inside of the SparkListenerSuite class because we don't want
+// their constructors to contain references to SparkListenerSuite:
+
+/**
+ * A simple listener that counts the number of jobs observed.
+ */
+private class BasicJobCounter extends SparkListener {
+  var count = 0
+  override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+}
+
+private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
+  var count = 0
+  override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index efbab4085317a..4a4527d38d8ce 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -190,6 +190,17 @@ of the most common options to set are:
     Logs the effective SparkConf as INFO when a SparkContext is started.
   </td>
 </tr>
+<tr>
+  <td><code>spark.extraListeners</code></td>
+  <td>(none)</td>
+  <td>
+    A comma-separated list of classes that implement <code>SparkListener</code>; when initializing
+    SparkContext, instances of these classes will be created and registered with Spark's listener
+    bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor
+    will be called; otherwise, a zero-argument constructor will be called. If no valid constructor
+    can be found, the SparkContext creation will fail with an exception.
+  </td>
+</tr>
 </table>
 
 Apart from these, the following properties are also available, and may be useful in some situations:
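For context, here is a minimal sketch of how an application could use the new setting once this patch is applied. The `JobAuditListener` class and the example app are hypothetical illustrations, not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// A hypothetical user-defined listener. It must be a top-level class with either a
// zero-argument constructor or a single constructor that accepts a SparkConf.
class JobAuditListener(conf: SparkConf) extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"[${conf.get("spark.app.name")}] job ${jobStart.jobId} started")
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
}

object ExtraListenersExample {
  def main(args: Array[String]): Unit = {
    // Registering through configuration (rather than sc.addSparkListener) attaches the
    // listener before listenerBus.start(), so no early events are missed.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("extra-listeners-example")
      .set("spark.extraListeners", classOf[JobAuditListener].getName)
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count() // runs one job, triggering both callbacks
    sc.stop()
  }
}
```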