[SPARK-5411] Allow SparkListeners to be specified in SparkConf and loaded when creating SparkContext #4111

Closed
Changes from 5 commits
35 changes: 35 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -379,6 +379,41 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
executorAllocationManager.foreach(_.start())

// Use reflection to instantiate listeners specified via the `spark.extraListeners` configuration

Nit: Can we group this logic with listenerBus.start() into a separate method called "setupListenerBus()" and call it in the constructor? The reason is that this logic has to occur before listenerBus.start(), and grouping the two in one method better ensures that the ordering is preserved. (A rough sketch of this grouping appears just after listenerBus.start() below.)

// or the SPARK_EXTRA_LISTENERS environment variable
try {
val listenerClassNames: Seq[String] = {
val fromSparkConf = conf.get("spark.extraListeners", "").split(',')
val fromEnvVar = Option(conf.getenv("SPARK_EXTRA_LISTENERS")).getOrElse("").split(',')
Contributor


I thought we were moving away from adding new env-based config options.

Contributor Author


This functions slightly differently from the rest of our env-based configurations: the settings from this configuration are merged with the SparkConf configuration rather than overriding it or being overridden by it. My motivation was to give the execution environment a mechanism for injecting custom listeners without having to worry about them being overridden by settings in the user's SparkConf (this is necessary because we don't have a mechanism for automatically merging Spark configurations).

Contributor


I see. I think at some point it would be nice to have a better long-term solution for this kind of scenario. I added SPARK_DIST_CLASSPATH in #2982 for similar reasons, but if we have something like "spark-dist.conf" in SPARK_CONF_DIR, we could at some point merge those values with the user's configuration and avoid adding more env variables like this.

Anyway, separate discussion.

Contributor


Hm - I'm also skeptical about having an env var here. The same reasoning could be used to argue for an env var for every conf value, which we definitely don't want. @JoshRosen, at least for the use case I had in mind, we could preprocess the conf files ourselves to handle the merging (i.e. no need to put extra logic in Spark).

(fromSparkConf ++ fromEnvVar).map(_.trim).filter(_ != "")
}
for (className <- listenerClassNames) {
val listenerClass = Class.forName(className).asInstanceOf[Class[_ <: SparkListener]]
val listener = try {
listenerClass.getConstructor(classOf[SparkConf]).newInstance(conf)
} catch {


Using exceptions for what is basically branching is not really great. You can alternatively use http://docs.oracle.com/javase/7/docs/api/java/lang/Class.html#getDeclaredConstructors%28%29 to find the right constructor.


Sorry, I meant getConstructors() (not getDeclaredConstructors). (One way to do this is sketched just after the try/catch block below.)

case e: NoSuchMethodException =>
try {
listenerClass.newInstance()
} catch {
case e: NoSuchMethodException =>
throw new SparkException(
s"$listenerClass did not have a zero-argument constructor or a" +
" single-argument constructor that accepts SparkConf (is it a nested Scala class?)")
}
}
listenerBus.addListener(listener)
logInfo(s"Registered listener $listenerClass")
}
} catch {
case e: Exception =>
try {
stop()
} finally {
throw new SparkException(s"Exception when registering SparkListener", e)
}
}
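For reference, the constructor lookup the reviewers suggest above could look roughly like the following. This is an illustrative sketch only, not code from this patch: it inspects getConstructors() instead of falling back on NoSuchMethodException.

// Sketch only: choose a constructor explicitly instead of catching exceptions.
val constructors = listenerClass.getConstructors
val sparkConfCtor = constructors.find(_.getParameterTypes.toSeq == Seq(classOf[SparkConf]))
val zeroArgCtor = constructors.find(_.getParameterTypes.isEmpty)
val listener: SparkListener = (sparkConfCtor, zeroArgCtor) match {
  case (Some(ctor), _) => ctor.newInstance(conf).asInstanceOf[SparkListener]
  case (_, Some(ctor)) => ctor.newInstance().asInstanceOf[SparkListener]
  case _ => throw new SparkException(
    s"$listenerClass did not have a zero-argument constructor or a" +
    " single-argument constructor that accepts SparkConf")
}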

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()
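A rough sketch of the setupListenerBus() grouping suggested in the first review comment above (illustrative only; registerExtraListeners() is a hypothetical helper standing in for the registration logic in this diff):

// Sketch only: keep registration and startup in one method so that the
// required ordering is enforced wherever the constructor calls it.
private def setupListenerBus(): Unit = {
  registerExtraListeners()  // hypothetical helper wrapping the try block above
  listenerBus.start()       // started only after all listeners are registered
}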

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -28,8 +28,8 @@ import org.apache.spark.util.Utils
*/
private[spark] trait SparkListenerBus extends Logging {

// SparkListeners attached to this event bus
protected val sparkListeners = new ArrayBuffer[SparkListener]
// SparkListeners attached to this event bus. Marked `protected[spark]` for access in tests.
protected[spark] val sparkListeners = new ArrayBuffer[SparkListener]
with mutable.SynchronizedBuffer[SparkListener]

def addListener(listener: SparkListener) {
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -21,25 +21,21 @@ import java.util.concurrent.Semaphore

import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.Matchers

import org.apache.spark.{LocalSparkContext, SparkContext}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.ResetSystemProperties

class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with BeforeAndAfter
with BeforeAndAfterAll with ResetSystemProperties {
import org.mockito.Mockito._
import org.scalatest.{FunSuite, Matchers}

class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
with ResetSystemProperties {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

val jobCompletionTime = 1421191296660L

before {
sc = new SparkContext("local", "SparkListenerSuite")
}

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
@@ -127,6 +123,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("basic creation of StageInfo") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -148,6 +145,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("basic creation of StageInfo with shuffle") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -185,6 +183,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("StageInfo with fewer tasks than partitions") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
@@ -201,6 +200,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("local metrics") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
@@ -267,6 +267,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskGettingResult() called when result fetched remotely") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveTaskEvents
sc.addSparkListener(listener)

@@ -287,6 +288,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskGettingResult() not called when result sent directly") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveTaskEvents
sc.addSparkListener(listener)

@@ -302,6 +304,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

test("onTaskEnd() should be called for all started tasks, even after job has been killed") {
sc = new SparkContext("local", "SparkListenerSuite")
val WAIT_TIMEOUT_MILLIS = 10000
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
@@ -356,21 +359,37 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(jobCounter2.count === 5)
}

test("registering listeners via spark.extraListeners") {
val conf = new SparkConf().setMaster("local").setAppName("test")
.set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
classOf[BasicJobCounter].getName)
sc = new SparkContext(conf)
sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1)
sc.listenerBus.sparkListeners.collect {
case x: ListenerThatAcceptsSparkConf => x
}.size should be (1)
}

test("registering listeners via SPARK_EXTRA_LISTENERS") {
val SPARK_EXTRA_LISTENERS = classOf[ListenerThatAcceptsSparkConf].getName + "," +
classOf[BasicJobCounter].getName
val conf = spy(new SparkConf().setMaster("local").setAppName("test"))
when(conf.getenv("SPARK_EXTRA_LISTENERS")).thenReturn(SPARK_EXTRA_LISTENERS)
when(conf.clone).thenReturn(conf) // so that our mock is still used
sc = new SparkContext(conf)
sc.listenerBus.sparkListeners.collect { case x: BasicJobCounter => x}.size should be (1)
sc.listenerBus.sparkListeners.collect {
case x: ListenerThatAcceptsSparkConf => x
}.size should be (1)
}

/**
* Assert that the given list of numbers has an average that is greater than zero.
*/
private def checkNonZeroAvg(m: Traversable[Long], msg: String) {
assert(m.sum / m.size.toDouble > 0.0, msg)
}

/**
* A simple listener that counts the number of jobs observed.
*/
private class BasicJobCounter extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}

/**
* A simple listener that saves all task infos and task metrics.
*/
@@ -423,3 +442,19 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
}

}

// These classes can't be declared inside of the SparkListenerSuite class because we don't want
// their constructors to contain references to SparkListenerSuite:

/**
* A simple listener that counts the number of jobs observed.
*/
private class BasicJobCounter extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}

private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd) = count += 1
}
11 changes: 11 additions & 0 deletions docs/configuration.md
@@ -190,6 +190,17 @@ of the most common options to set are:
Logs the effective SparkConf as INFO when a SparkContext is started.
</td>
</tr>
<tr>
<td><code>spark.extraListeners</code></td>
<td>(none)</td>
<td>
A comma-separated list of classes that implement <code>SparkListener</code>; when initializing
SparkContext, instances of these classes will be created and registered with Spark's listener
bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor
will be called; otherwise, a zero-argument constructor will be called. If no valid constructor
can be found, the SparkContext creation will fail with an exception.
</td>
</tr>
</table>

Apart from these, the following properties are also available, and may be useful in some situations:
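To illustrate how the setting described above is meant to be used, here is a small hypothetical example (the listener class name and application name are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical listener; it must be on the driver's classpath. Because it has a
// single-argument SparkConf constructor, SparkContext passes its configuration in.
class JobEndLogger(conf: SparkConf) extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"[${conf.get("spark.app.name")}] job ${jobEnd.jobId} ended")
  }
}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("extra-listeners-example")
  .set("spark.extraListeners", classOf[JobEndLogger].getName)

// JobEndLogger is instantiated and registered before the listener bus starts,
// so it observes every job run on this context.
val sc = new SparkContext(conf)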