diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6a354ed4d1486..95f702796d530 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -63,8 +63,12 @@ import org.apache.spark.util._
  *
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
+ * @param sparkListeners an optional list of [[SparkListener]]s to register.
  */
-class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
+class SparkContext(
+    config: SparkConf,
+    sparkListeners: Seq[SparkListener] = Nil
+  ) extends Logging with ExecutorAllocationClient {
 
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
@@ -89,7 +93,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Create a SparkContext that loads settings from system properties (for instance, when
    * launching with ./bin/spark-submit).
    */
-  def this() = this(new SparkConf())
+  def this() = this(new SparkConf(), Nil)
+
+  /**
+   * Alternative constructor for binary compatibility.
+   *
+   * @param config a Spark Config object describing the application configuration. Any settings in
+   *   this config overrides the default configs as well as system properties.
+   */
+  def this(config: SparkConf) = this(config, Nil)
 
   /**
    * :: DeveloperApi ::
@@ -124,6 +136,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    * @param environment Environment variables to set on worker nodes.
+   * @param sparkListeners an optional list of [[SparkListener]]s to register.
    */
   def this(
       master: String,
@@ -131,12 +144,32 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       sparkHome: String = null,
       jars: Seq[String] = Nil,
       environment: Map[String, String] = Map(),
-      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
-  {
-    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
+      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map(),
+      sparkListeners: Seq[SparkListener] = Nil) = {
+    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
+      sparkListeners)
     this.preferredNodeLocationData = preferredNodeLocationData
   }
 
+  /**
+   * Alternative constructor for binary compatibility.
+   *
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI.
+   * @param sparkHome Location where Spark is installed on cluster nodes.
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   * @param environment Environment variables to set on worker nodes.
+   */
+  def this(
+      master: String,
+      appName: String,
+      sparkHome: String,
+      jars: Seq[String],
+      environment: Map[String, String],
+      preferredNodeLocationData: Map[String, Set[SplitInfo]]) =
+    this(master, appName, sparkHome, jars, environment, preferredNodeLocationData, Nil)
+
   // NOTE: The below constructors could be consolidated using default arguments. Due to
   // Scala bug SI-8479, however, this causes the compile step to fail when generating docs.
   // Until we have a good workaround for that bug the constructors remain broken out.
@@ -148,7 +181,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @param appName A name for your application, to display on the cluster web UI.
    */
   private[spark] def this(master: String, appName: String) =
-    this(master, appName, null, Nil, Map(), Map())
+    this(master, appName, null, Nil, Map(), Map(), Nil)
 
   /**
    * Alternative constructor that allows setting common Spark properties directly
@@ -158,7 +191,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @param sparkHome Location where Spark is installed on cluster nodes.
    */
   private[spark] def this(master: String, appName: String, sparkHome: String) =
-    this(master, appName, sparkHome, Nil, Map(), Map())
+    this(master, appName, sparkHome, Nil, Map(), Map(), Nil)
 
   /**
   * Alternative constructor that allows setting common Spark properties directly
@@ -170,7 +203,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    */
   private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
-    this(master, appName, sparkHome, jars, Map(), Map())
+    this(master, appName, sparkHome, jars, Map(), Map(), Nil)
 
   // log out Spark Version in Spark driver log
   logInfo(s"Running Spark version $SPARK_VERSION")
@@ -379,6 +412,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
   executorAllocationManager.foreach(_.start())
 
+  sparkListeners.foreach(listenerBus.addListener)
+
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 97f5c9f257e09..4cbc624ad9cc0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -38,6 +38,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
+import org.apache.spark.scheduler.SparkListener
 
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -104,7 +105,21 @@ class JavaSparkContext(val sc: SparkContext)
    */
   def this(master: String, appName: String, sparkHome: String, jars: Array[String],
       environment: JMap[String, String]) =
-    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map()))
+    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(), Nil))
+
+  /**
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI
+   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   * @param environment Environment variables to set on worker nodes
+   * @param sparkListeners an optional list of [[SparkListener]]s to register.
+   */
+  def this(master: String, appName: String, sparkHome: String, jars: Array[String],
+      environment: JMap[String, String], sparkListeners: Array[SparkListener]) =
+    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(),
+      sparkListeners))
 
   private[spark] val env = sc.env
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 8b3c6871a7b39..e41610aaf5121 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -17,9 +17,14 @@
 
 package org.apache.spark
 
+import org.apache.hadoop.io.BytesWritable
 import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
 
-import org.apache.hadoop.io.BytesWritable
+import scala.concurrent.duration._
+import scala.language.{implicitConversions, postfixOps}
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}
 
 class SparkContextSuite extends FunSuite with LocalSparkContext {
 
@@ -72,4 +77,18 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     val byteArray2 = converter.convert(bytesWritable)
     assert(byteArray2.length === 0)
   }
+
+  test("SparkListeners can be registered via the SparkContext constructor (SPARK-5190)") {
+    @volatile var gotEnvironmentUpdate: Boolean = false
+    val listener = new SparkListener {
+      override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
+        gotEnvironmentUpdate = true
+      }
+    }
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    sc = new SparkContext(conf, Seq(listener))
+    eventually(timeout(10 seconds)) {
+      assert(gotEnvironmentUpdate === true)
+    }
+  }
 }
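
For reviewers, a minimal usage sketch of the constructor-based registration added above. This is not part of the patch; the object name, application name, listener body, and job are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

object ConstructorListenerExample {
  def main(args: Array[String]): Unit = {
    // A listener reacting to the application-end event; any SparkListener callback would do.
    val listener = new SparkListener {
      override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
        println(s"Application ended at ${end.time}")
      }
    }
    val conf = new SparkConf().setAppName("listener-example").setMaster("local")
    // Passing the listener to the constructor registers it before listenerBus.start(),
    // so events fired during SparkContext initialization are not missed.
    val sc = new SparkContext(conf, Seq(listener))
    try {
      sc.parallelize(1 to 100).count()
    } finally {
      sc.stop()
    }
  }
}
```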