
Commit 9c0d8f1

Revert "[SPARK-5190] Allow SparkListeners to be registered before SparkContext starts."

This reverts commit 163ba19.
JoshRosen committed Jan 20, 2015
1 parent 217ecc0 commit 9c0d8f1
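
Since this revert removes the constructor-based registration path, callers are back to registering listeners once the context is already up, via SparkContext.addSparkListener. Below is a minimal sketch of the post-revert pattern (the object and app names are illustrative, not from the commit). Note the race this reopens: events released while the context was still constructing, such as the initial environment update, can be missed, which is exactly the gap SPARK-5190 tried to close.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

object ListenerAfterStartup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("listener-sketch").setMaster("local")
    val sc = new SparkContext(conf)

    // Registered only after the constructor returns; the listener bus has
    // already started releasing queued events by this point, so events fired
    // during construction may never reach this listener.
    sc.addSparkListener(new SparkListener {
      override def onEnvironmentUpdate(
          environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
        println("environment update seen")
      }
    })

    sc.stop()
  }
}
```
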
Showing 3 changed files with 10 additions and 79 deletions.
core/src/main/scala/org/apache/spark/SparkContext.scala (8 additions, 43 deletions)
@@ -63,12 +63,8 @@ import org.apache.spark.util._
  *
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
- * @param sparkListeners an optional list of [[SparkListener]]s to register.
  */
-class SparkContext(
-    config: SparkConf,
-    sparkListeners: Seq[SparkListener] = Nil
-  ) extends Logging with ExecutorAllocationClient {
+class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
 
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()

@@ -93,15 +89,7 @@ class SparkContext(
    * Create a SparkContext that loads settings from system properties (for instance, when
    * launching with ./bin/spark-submit).
    */
-  def this() = this(new SparkConf(), Nil)
-
-  /**
-   * Alternative constructor for binary compatibility.
-   *
-   * @param config a Spark Config object describing the application configuration. Any settings in
-   *   this config overrides the default configs as well as system properties.
-   */
-  def this(config: SparkConf) = this(config, Nil)
+  def this() = this(new SparkConf())
 
   /**
    * :: DeveloperApi ::

@@ -136,40 +124,19 @@ class SparkContext(
    * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    * @param environment Environment variables to set on worker nodes.
-   * @param sparkListeners an optional list of [[SparkListener]]s to register.
    */
   def this(
       master: String,
       appName: String,
       sparkHome: String = null,
       jars: Seq[String] = Nil,
       environment: Map[String, String] = Map(),
-      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map(),
-      sparkListeners: Seq[SparkListener] = Nil) = {
-    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
-      sparkListeners)
+      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
+  {
+    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
     this.preferredNodeLocationData = preferredNodeLocationData
   }
 
-  /**
-   * Alternative constructor for binary compatibility.
-   *
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your application, to display on the cluster web UI.
-   * @param sparkHome Location where Spark is installed on cluster nodes.
-   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
-   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
-   * @param environment Environment variables to set on worker nodes.
-   */
-  def this(
-      master: String,
-      appName: String,
-      sparkHome: String,
-      jars: Seq[String],
-      environment: Map[String, String],
-      preferredNodeLocationData: Map[String, Set[SplitInfo]]) =
-    this(master, appName, sparkHome, jars, environment, preferredNodeLocationData, Nil)
-
   // NOTE: The below constructors could be consolidated using default arguments. Due to
   // Scala bug SI-8479, however, this causes the compile step to fail when generating docs.
   // Until we have a good workaround for that bug the constructors remain broken out.

@@ -181,7 +148,7 @@ class SparkContext(
    * @param appName A name for your application, to display on the cluster web UI.
    */
   private[spark] def this(master: String, appName: String) =
-    this(master, appName, null, Nil, Map(), Map(), Nil)
+    this(master, appName, null, Nil, Map(), Map())
 
   /**
    * Alternative constructor that allows setting common Spark properties directly

@@ -191,7 +158,7 @@ class SparkContext(
    * @param sparkHome Location where Spark is installed on cluster nodes.
    */
   private[spark] def this(master: String, appName: String, sparkHome: String) =
-    this(master, appName, sparkHome, Nil, Map(), Map(), Nil)
+    this(master, appName, sparkHome, Nil, Map(), Map())
 
   /**
    * Alternative constructor that allows setting common Spark properties directly

@@ -203,7 +170,7 @@ class SparkContext(
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    */
   private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
-    this(master, appName, sparkHome, jars, Map(), Map(), Nil)
+    this(master, appName, sparkHome, jars, Map(), Map())
 
   // log out Spark Version in Spark driver log
   logInfo(s"Running Spark version $SPARK_VERSION")

@@ -412,8 +379,6 @@ class SparkContext(
     }
   executorAllocationManager.foreach(_.start())
 
-  sparkListeners.foreach(listenerBus.addListener)
-
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()
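
For contrast, the overload removed above had let callers hand listeners to the constructor, so registration happened before listenerBus.start() released any events. A short sketch of the now-removed pattern, reconstructed from the diff (it no longer compiles once this revert lands):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.SparkListener

// Pre-revert only: the Seq[SparkListener] constructor parameter guaranteed
// the listener was registered before any buffered events were released.
val conf = new SparkConf().setAppName("ctor-listener-sketch").setMaster("local")
val listener = new SparkListener {}  // override the callbacks you need
val sc = new SparkContext(conf, Seq(listener))
```
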
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala (1 addition, 16 deletions)
@@ -38,7 +38,6 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
-import org.apache.spark.scheduler.SparkListener
 
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns

@@ -105,21 +104,7 @@ class JavaSparkContext(val sc: SparkContext)
    */
   def this(master: String, appName: String, sparkHome: String, jars: Array[String],
       environment: JMap[String, String]) =
-    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(), Nil))
-
-  /**
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your application, to display on the cluster web UI
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
-   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
-   * @param environment Environment variables to set on worker nodes
-   * @param sparkListeners an optional list of [[SparkListener]]s to register.
-   */
-  def this(master: String, appName: String, sparkHome: String, jars: Array[String],
-      environment: JMap[String, String], sparkListeners: Array[SparkListener]) =
-    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map(),
-      sparkListeners))
+    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map()))
 
   private[spark] val env = sc.env
core/src/test/scala/org/apache/spark/SparkContextSuite.scala (1 addition, 20 deletions)
@@ -17,14 +17,9 @@
 
 package org.apache.spark
 
-import org.apache.hadoop.io.BytesWritable
 import org.scalatest.FunSuite
-import org.scalatest.concurrent.Eventually._
-
-import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
-
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}
+
+import org.apache.hadoop.io.BytesWritable
 
 class SparkContextSuite extends FunSuite with LocalSparkContext {
 

@@ -77,18 +72,4 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     val byteArray2 = converter.convert(bytesWritable)
     assert(byteArray2.length === 0)
   }
-
-  test("SparkListeners can be registered via the SparkContext constructor (SPARK-5190)") {
-    @volatile var gotEnvironmentUpdate: Boolean = false
-    val listener = new SparkListener {
-      override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
-        gotEnvironmentUpdate = true
-      }
-    }
-    val conf = new SparkConf().setAppName("test").setMaster("local")
-    sc = new SparkContext(conf, Seq(listener))
-    eventually(timeout(10 seconds)) {
-      assert(gotEnvironmentUpdate === true)
-    }
-  }
 }
