[SPARK-4180] [Core] Prevent creation of multiple active SparkContexts #3121

Closed · wants to merge 18 commits
Changes from 5 commits
103 changes: 81 additions & 22 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -57,6 +57,9 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
* creating a new one. This limitation will eventually be removed; see SPARK-2243 for more details.
Contributor: I'd say "may eventually be removed".
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
@@ -179,6 +182,30 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")

// This is placed after the configuration validation so that common configuration errors, like
// forgetting to pass a master url or app name, don't prevent subsequent SparkContexts from being
// constructed.
SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
SparkContext.activeSparkContextCreationSite.foreach { creationSite =>
val errMsg = "Only one SparkContext may be active in this JVM (see SPARK-2243)."
val errDetails = if (SparkContext.activeSparkContextIsFullyConstructed) {
s"The currently active SparkContext was created at:\n${creationSite.longForm}"
} else {
s"Another SparkContext is either being constructed or threw an exception from its" +
" constructor; please restart your JVM in order to create a new SparkContext." +
s"The current SparkContext was created at:\n${creationSite.longForm}"
}
val exception = new SparkException(s"$errMsg $errDetails")
if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) {
Contributor: Here I'd prefer something more concise, e.g. spark.driver.disallowMultipleContexts.

Contributor (Author): Yeah, I picked a somewhat verbose name because I wanted to reserve the better names in case we later add a public configuration that does something similar.

Contributor (Author): I'll switch it to the nicer one, though.

Contributor (Author): Actually, I think I'll call it allowMultipleContexts to avoid double negation.

logWarning("Multiple SparkContext error detection is disabled!", exception)
Contributor: More like "Multiple SparkContexts detected in the same JVM!". We're not trying to warn people that they disabled the config.

} else {
throw exception
}
}
SparkContext.activeSparkContextCreationSite = Some(Utils.getCallSite())
SparkContext.activeSparkContextIsFullyConstructed = false
}

Contributor: Can you put this chunk of code in SparkContext.verifyUniqueConstruction or something?

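A rough sketch of how that extraction might look, assuming the helper lives in the SparkContext companion object and keeps the config name used in this revision (the method name verifyUniqueConstruction is only the reviewer's suggestion, not the final API):

```scala
// Illustrative sketch only, not the code in this PR: pull the duplicate-context
// check out of the constructor into a companion-object helper.
private[spark] def verifyUniqueConstruction(conf: SparkConf): Unit = {
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    activeSparkContextCreationSite.foreach { creationSite =>
      val exception = new SparkException(
        "Only one SparkContext may be active in this JVM (see SPARK-2243). " +
        s"The existing SparkContext was created at:\n${creationSite.longForm}")
      if (conf.getBoolean("spark.driver.disableMultipleSparkContextsErrorChecking", false)) {
        // Warn but allow construction to proceed when the debug escape hatch is set.
        logWarning("Multiple SparkContexts detected in the same JVM!", exception)
      } else {
        throw exception
      }
    }
    // Record this constructor call as the (not yet fully constructed) active context.
    activeSparkContextCreationSite = Some(Utils.getCallSite())
    activeSparkContextIsFullyConstructed = false
  }
}
```
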
val jars: Seq[String] =
conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

@@ -1071,27 +1098,31 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {

/** Shut down the SparkContext. */
def stop() {
postApplicationEnd()
ui.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
dagScheduler = null
if (dagSchedulerCopy != null) {
env.metricsSystem.report()
metadataCleaner.cancel()
env.actorSystem.stop(heartbeatReceiver)
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
listenerBus.stop()
eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
} else {
logInfo("SparkContext already stopped")
SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
SparkContext.activeSparkContextCreationSite = None
SparkContext.activeSparkContextIsFullyConstructed = false
Contributor: Should we set these at the end of sc.stop()? What if dagScheduler.stop() throws an exception?

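One possible shape for addressing that concern, sketched here only for illustration (not the code in this PR): clear the companion-object bookkeeping in a finally block so it is reset even if a shutdown step throws.

```scala
// Sketch: ensure the active-context state is cleared even if dagScheduler.stop()
// or another shutdown step throws an exception.
def stop() {
  try {
    postApplicationEnd()
    ui.foreach(_.stop())
    // ... the rest of the existing shutdown logic (dagScheduler, env, listenerBus, ...) ...
  } finally {
    SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      SparkContext.activeSparkContextCreationSite = None
      SparkContext.activeSparkContextIsFullyConstructed = false
    }
  }
}
```
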
postApplicationEnd()
ui.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
dagScheduler = null
if (dagSchedulerCopy != null) {
env.metricsSystem.report()
metadataCleaner.cancel()
env.actorSystem.stop(heartbeatReceiver)
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
listenerBus.stop()
eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
} else {
logInfo("SparkContext already stopped")
}
}
}

@@ -1157,7 +1188,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
if (dagScheduler == null) {
throw new SparkException("SparkContext has been shutdown")
}
val callSite = getCallSite
val callSite = Utils.getCallSite()
Contributor: Why this change?

Contributor (Author): Doh, good catch! This was a mistake.

val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
@@ -1380,6 +1411,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
private[spark] def cleanup(cleanupTime: Long) {
persistentRdds.clearOldValues(cleanupTime)
}

SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
SparkContext.activeSparkContextIsFullyConstructed = true
}
}

/**
@@ -1388,6 +1423,30 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
*/
object SparkContext extends Logging {

/**
* Lock that prevents multiple threads from being in the SparkContext constructor at the same
* time.
*/
private[spark] val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()

/**
* Records the creation site of the last SparkContext to successfully enter the constructor.
* This may be an active SparkContext, or a SparkContext that is currently under construction.
*
* Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
*/
private[spark] var activeSparkContextCreationSite: Option[CallSite] = None

/**
* Tracks whether `activeSparkContextCreationSite` refers to a fully-constructed SparkContext
* or a partially-constructed one that is either still executing its constructor or threw
* an exception from its constructor. This is used to enable better error-reporting when
* SparkContext construction fails due to existing contexts.
*
* Access to this field should be guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
*/
private[spark] var activeSparkContextIsFullyConstructed: Boolean = false
Contributor: Personal preference, but the name is a little long. I'd prefer activeContextIsFullyConstructed, leaving out "Spark".

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -46,6 +46,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
*
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
* creating a new one. This limitation will eventually be removed; see SPARK-2243 for more details.
*/
class JavaSparkContext(val sc: SparkContext)
extends JavaSparkContextVarargsWorkaround with Closeable {
48 changes: 46 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -21,9 +21,53 @@ import org.scalatest.FunSuite

import org.apache.hadoop.io.BytesWritable

class SparkContextSuite extends FunSuite {
//Regression test for SPARK-3121
class SparkContextSuite extends FunSuite with LocalSparkContext {

test("Only one SparkContext may be active at a time") {
// Regression test for SPARK-4180
val conf = new SparkConf().setAppName("test").setMaster("local")
sc = new SparkContext(conf)
// A SparkContext is already running, so we shouldn't be able to create a second one
intercept[SparkException] { new SparkContext(conf) }
// After stopping the running context, we should be able to create a new one
resetSparkContext()
sc = new SparkContext(conf)
}

test("Can still construct a new SparkContext after failing due to missing app name or master") {
val missingMaster = new SparkConf()
val missingAppName = missingMaster.clone.setMaster("local")
val validConf = missingAppName.clone.setAppName("test")
// We shouldn't be able to construct SparkContexts because these are invalid configurations
intercept[SparkException] { new SparkContext(missingMaster) }
intercept[SparkException] { new SparkContext(missingAppName) }
// Even though those earlier calls failed, we should still be able to create a new SparkContext
sc = new SparkContext(validConf)
}

test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
val propertyName = "spark.driver.disableMultipleSparkContextsErrorChecking"
val originalPropertyValue = System.getProperty(propertyName)
var secondSparkContext: SparkContext = null
try {
System.setProperty(propertyName, "true")
val conf = new SparkConf().setAppName("test").setMaster("local")
sc = new SparkContext(conf)
secondSparkContext = new SparkContext(conf)
} finally {
if (secondSparkContext != null) {
secondSparkContext.stop()
}
if (originalPropertyValue != null) {
System.setProperty(propertyName, originalPropertyValue)
} else {
System.clearProperty(propertyName)
}
}
}

test("BytesWritable implicit conversion is correct") {
// Regression test for SPARK-3121
val bytesWritable = new BytesWritable()
val inputArray = (1 to 10).map(_.toByte).toArray
bytesWritable.set(inputArray, 0, 10)
2 changes: 2 additions & 0 deletions docs/programming-guide.md
@@ -117,6 +117,8 @@ The first thing a Spark program must do is to create a [SparkContext](api/scala/
how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object
that contains information about your application.

Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one.

{% highlight scala %}
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -487,20 +487,20 @@ class StreamingContext private[streaming] (
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
// Warn (but not fail) if context is stopped twice,
// or context is stopped before starting
if (state == Initialized) {
logWarning("StreamingContext has not been started yet")
return
}
if (state == Stopped) {
logWarning("StreamingContext has already been stopped")
return
} // no need to throw an exception as its okay to stop twice
scheduler.stop(stopGracefully)
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
if (stopSparkContext) sc.stop()
} else {
Contributor: I like this change, but I would like it as part of a separate JIRA + PR tied to streaming so that it's easier to track. Could you issue a new PR (with a unit test)? I can merge that quickly and unblock this.

// Even if the streaming context has not been started, we still need to stop the SparkContext:
if (stopSparkContext) sc.stop()
if (state == Initialized) {
logWarning("StreamingContext has not been started yet")
} else {
scheduler.stop(stopGracefully)
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
}
}
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
state = Stopped
}
}