[SPARK-5190] Allow SparkListeners to be registered before SparkContext starts.
JoshRosen committed Jan 20, 2015
1 parent 74de94e commit 163ba19
Showing 3 changed files with 79 additions and 10 deletions.
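
The new primary constructor takes an optional sparkListeners argument, so listeners supplied by the application are attached to the listener bus before it starts releasing events. A minimal usage sketch (the anonymous listener and app name are illustrative, not part of the commit):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.SparkListener

// Any SparkListener implementation can be passed; this empty anonymous one is a placeholder.
val listener = new SparkListener {}
val conf = new SparkConf().setAppName("example").setMaster("local")
// Listeners passed to the constructor are registered before listenerBus.start(),
// so they can observe events generated while the SparkContext is initializing.
val sc = new SparkContext(conf, Seq(listener))
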
51 changes: 43 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -63,8 +63,12 @@ import org.apache.spark.util._
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
* @param sparkListeners an optional list of [[SparkListener]]s to register.
*/
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
class SparkContext(
config: SparkConf,
sparkListeners: Seq[SparkListener] = Nil
) extends Logging with ExecutorAllocationClient {

// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
@@ -89,7 +93,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* Create a SparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
*/
def this() = this(new SparkConf())
def this() = this(new SparkConf(), Nil)

/**
* Alternative constructor for binary compatibility.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
def this(config: SparkConf) = this(config, Nil)

/**
* :: DeveloperApi ::
@@ -124,19 +136,40 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes.
* @param sparkListeners an optional list of [[SparkListener]]s to register.
*/
def this(
master: String,
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map(),
preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
{
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map(),
sparkListeners: Seq[SparkListener] = Nil) = {
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
sparkListeners)
this.preferredNodeLocationData = preferredNodeLocationData
}

/**
* Alternative constructor for binary compatibility.
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes.
*/
def this(
master: String,
appName: String,
sparkHome: String,
jars: Seq[String],
environment: Map[String, String],
preferredNodeLocationData: Map[String, Set[SplitInfo]]) =
this(master, appName, sparkHome, jars, environment, preferredNodeLocationData, Nil)

// NOTE: The below constructors could be consolidated using default arguments. Due to
// Scala bug SI-8479, however, this causes the compile step to fail when generating docs.
// Until we have a good workaround for that bug the constructors remain broken out.
@@ -148,7 +181,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* @param appName A name for your application, to display on the cluster web UI.
*/
private[spark] def this(master: String, appName: String) =
this(master, appName, null, Nil, Map(), Map())
this(master, appName, null, Nil, Map(), Map(), Nil)

/**
* Alternative constructor that allows setting common Spark properties directly
@@ -158,7 +191,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* @param sparkHome Location where Spark is installed on cluster nodes.
*/
private[spark] def this(master: String, appName: String, sparkHome: String) =
this(master, appName, sparkHome, Nil, Map(), Map())
this(master, appName, sparkHome, Nil, Map(), Map(), Nil)

/**
* Alternative constructor that allows setting common Spark properties directly
@@ -170,7 +203,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
this(master, appName, sparkHome, jars, Map(), Map())
this(master, appName, sparkHome, jars, Map(), Map(), Nil)

// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")
@@ -379,6 +412,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
}
executorAllocationManager.foreach(_.start())

sparkListeners.foreach(listenerBus.addListener)

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

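Because the constructor-supplied listeners are added to listenerBus immediately before listenerBus.start() (above), they see the events held until the bus starts releasing them; listeners attached only after construction would miss those early events. A sketch of a listener that captures the environment-update event this way (class and value names are illustrative; the pattern mirrors the test added below):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

// Records the environment-update event posted during context initialization.
class EnvironmentCapture extends SparkListener {
  @volatile var environment: Option[SparkListenerEnvironmentUpdate] = None
  override def onEnvironmentUpdate(update: SparkListenerEnvironmentUpdate): Unit = {
    environment = Some(update)
  }
}

val capture = new EnvironmentCapture
val sc = new SparkContext(
  new SparkConf().setAppName("env-capture").setMaster("local"),
  Seq(capture))
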
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -38,6 +38,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
import org.apache.spark.scheduler.SparkListener

/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -104,7 +105,21 @@ class JavaSparkContext(val sc: SparkContext)
*/
def this(master: String, appName: String, sparkHome: String, jars: Array[String],
environment: JMap[String, String]) =
this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map()))
this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(), Nil))

/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes
* @param sparkListeners an optional list of [[SparkListener]]s to register.
*/
def this(master: String, appName: String, sparkHome: String, jars: Array[String],
environment: JMap[String, String], sparkListeners: Array[SparkListener]) =
this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(),
sparkListeners))

private[spark] val env = sc.env

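For Java callers, the matching JavaSparkContext constructor takes the listeners as an array. A rough usage sketch (written in Scala like the other examples here; the argument values are placeholders):

import java.util.{HashMap => JHashMap}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.scheduler.SparkListener

// Placeholder listener; any SparkListener implementation could be registered.
val listeners: Array[SparkListener] = Array(new SparkListener {})
val jsc = new JavaSparkContext(
  "local", "java-listener-example", null /* sparkHome */,
  Array.empty[String], new JHashMap[String, String](), listeners)
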
21 changes: 20 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -17,9 +17,14 @@

package org.apache.spark

import org.apache.hadoop.io.BytesWritable
import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._

import org.apache.hadoop.io.BytesWritable
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}

import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

class SparkContextSuite extends FunSuite with LocalSparkContext {

@@ -72,4 +77,18 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}

test("SparkListeners can be registered via the SparkContext constructor (SPARK-5190)") {
@volatile var gotEnvironmentUpdate: Boolean = false
val listener = new SparkListener {
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
gotEnvironmentUpdate = true
}
}
val conf = new SparkConf().setAppName("test").setMaster("local")
sc = new SparkContext(conf, Seq(listener))
eventually(timeout(10 seconds)) {
assert(gotEnvironmentUpdate === true)
}
}
}
