Remove 'spark.driver.allowMultipleContexts' to discourage multiple creation of SparkContexts
HyukjinKwon committed Dec 14, 2018
1 parent c107b4e commit ecc4144
Showing 6 changed files with 27 additions and 68 deletions.
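A minimal sketch of the driver-side behavior this commit enforces; the object name, app names, and `local` master are illustrative, not part of the change:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object SecondContextDemo {
  def main(args: Array[String]): Unit = {
    // The constructor no longer reads "spark.driver.allowMultipleContexts",
    // so a second SparkContext always fails while the first is active.
    val sc = new SparkContext(new SparkConf().setAppName("first").setMaster("local"))
    try {
      new SparkContext(new SparkConf().setAppName("second").setMaster("local"))
    } catch {
      case e: SparkException =>
        // Message begins: "Only one SparkContext should be running in this JVM (see SPARK-2243)."
        println(e.getMessage)
    } finally {
      sc.stop()
    }
  }
}
```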
66 changes: 22 additions & 44 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -64,8 +64,8 @@ import org.apache.spark.util.logging.DriverLogger
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
* creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
* Only one SparkContext should be active per JVM. You must `stop()` the active SparkContext before
* creating a new one.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
@@ -75,18 +75,10 @@ class SparkContext(config: SparkConf) extends Logging {
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()

// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
private val allowMultipleContexts: Boolean =
config.getBoolean("spark.driver.allowMultipleContexts", false)
if (allowMultipleContexts) {
logWarning("'spark.driver.allowMultipleContexts' is deprecated as of Spark 3.0.0, and " +
"creation of multiple SparkContexts will be disallowed afterward.")
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having started construction.
// NOTE: this must be placed at the beginning of the SparkContext constructor.
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
SparkContext.markPartiallyConstructed(this)

val startTime = System.currentTimeMillis()

@@ -2396,7 +2388,7 @@ class SparkContext(config: SparkConf) extends Logging {
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
SparkContext.setActiveContext(this, allowMultipleContexts)
SparkContext.setActiveContext(this)
}

/**
@@ -2413,43 +2405,35 @@ object SparkContext extends Logging {
private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()

/**
* The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`.
* The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`.
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
* Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`.
*/
private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)

/**
* Points to a partially-constructed SparkContext if some thread is in the SparkContext
* Points to a partially-constructed SparkContext if another thread is in the SparkContext
* constructor, or `None` if no SparkContext is being constructed.
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
* Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`.
*/
private var contextBeingConstructed: Option[SparkContext] = None

/**
* Called to ensure that no other SparkContext is running in this JVM.
*
* Throws an exception if a running context is detected and logs a warning if another thread is
* constructing a SparkContext. This warning is necessary because the current locking scheme
* constructing a SparkContext. This warning is necessary because the current locking scheme
* prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
private def assertNoOtherContextIsRunning(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
private def assertNoOtherContextIsRunning(sc: SparkContext): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
" To ignore this error, set spark.driver.allowMultipleContexts = true. " +
val errMsg = "Only one SparkContext should be running in this JVM (see SPARK-2243)." +
s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
val exception = new SparkException(errMsg)
if (allowMultipleContexts) {
logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
} else {
throw exception
}
throw new SparkException(errMsg)
}

contextBeingConstructed.filter(_ ne sc).foreach { otherContext =>
@@ -2458,7 +2442,7 @@ object SparkContext extends Logging {
val otherContextCreationSite =
Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
" constructor). This may indicate an error, since only one SparkContext may be" +
" constructor). This may indicate an error, since only one SparkContext should be" +
" running in this JVM (see SPARK-2243)." +
s" The other SparkContext was created at:\n$otherContextCreationSite"
logWarning(warnMsg)
@@ -2471,8 +2455,6 @@
* singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
*
* @note This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
* @param config `SparkConfig` that will be used for initialisation of the `SparkContext`
* @return current `SparkContext` (or a new one if it wasn't created before the function call)
*/
@@ -2481,7 +2463,7 @@
// from assertNoOtherContextIsRunning within setActiveContext
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (activeContext.get() == null) {
setActiveContext(new SparkContext(config), allowMultipleContexts = false)
setActiveContext(new SparkContext(config))
} else {
if (config.getAll.nonEmpty) {
logWarning("Using an existing SparkContext; some configuration may not take effect.")
@@ -2505,7 +2487,7 @@
def getOrCreate(): SparkContext = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (activeContext.get() == null) {
setActiveContext(new SparkContext(), allowMultipleContexts = false)
setActiveContext(new SparkContext())
}
activeContext.get()
}
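With the escape hatch removed, `getOrCreate` (retained in the hunks above) is the supported way for independent call sites to share the single active context. A short sketch; the object name and conf values are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SharedContextDemo {
  def main(args: Array[String]): Unit = {
    val sc1 = SparkContext.getOrCreate(new SparkConf().setAppName("shared").setMaster("local"))
    // A later call returns the existing instance instead of constructing a second one;
    // passing a non-empty conf at this point only logs "Using an existing SparkContext;
    // some configuration may not take effect."
    val sc2 = SparkContext.getOrCreate()
    assert(sc1 eq sc2)
    sc1.stop()
  }
}
```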
@@ -2520,16 +2502,14 @@ object SparkContext extends Logging {

/**
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
* running. Throws an exception if a running context is detected and logs a warning if another
* thread is constructing a SparkContext. This warning is necessary because the current locking
* running. Throws an exception if a running context is detected and logs a warning if another
* thread is constructing a SparkContext. This warning is necessary because the current locking
* scheme prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
private[spark] def markPartiallyConstructed(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
private[spark] def markPartiallyConstructed(sc: SparkContext): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
assertNoOtherContextIsRunning(sc)
contextBeingConstructed = Some(sc)
}
}
@@ -2538,18 +2518,16 @@ object SparkContext extends Logging {
* Called at the end of the SparkContext constructor to ensure that no other SparkContext has
* raced with this constructor and started.
*/
private[spark] def setActiveContext(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
private[spark] def setActiveContext(sc: SparkContext): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
assertNoOtherContextIsRunning(sc)
contextBeingConstructed = None
activeContext.set(sc)
}
}

/**
* Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
* Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
* also called in unit tests to prevent a flood of warnings from test suites that don't / can't
* properly clean up their SparkContexts.
*/
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -40,8 +40,8 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD}
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
*
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
* creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
* Only one SparkContext should be active per JVM. You must `stop()` the active SparkContext before
* creating a new one.
*/
class JavaSparkContext(val sc: SparkContext) extends Closeable {

19 changes: 1 addition & 18 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -44,7 +44,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
test("Only one SparkContext may be active at a time") {
// Regression test for SPARK-4180
val conf = new SparkConf().setAppName("test").setMaster("local")
.set("spark.driver.allowMultipleContexts", "false")
sc = new SparkContext(conf)
val envBefore = SparkEnv.get
// A SparkContext is already running, so we shouldn't be able to create a second one
@@ -58,7 +57,7 @@
}

test("Can still construct a new SparkContext after failing to construct a previous one") {
val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false")
val conf = new SparkConf()
// This is an invalid configuration (no app name or master URL)
intercept[SparkException] {
new SparkContext(conf)
@@ -67,18 +66,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
sc = new SparkContext(conf.setMaster("local").setAppName("test"))
}

test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
var secondSparkContext: SparkContext = null
try {
val conf = new SparkConf().setAppName("test").setMaster("local")
.set("spark.driver.allowMultipleContexts", "true")
sc = new SparkContext(conf)
secondSparkContext = new SparkContext(conf)
} finally {
Option(secondSparkContext).foreach(_.stop())
}
}

test("Test getOrCreate") {
var sc2: SparkContext = null
SparkContext.clearActiveContext()
@@ -92,10 +79,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
assert(sc === sc2)
assert(sc eq sc2)

// Try creating second context to confirm that it's still possible, if desired
sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
.set("spark.driver.allowMultipleContexts", "true"))

sc2.stop()
}

3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -25,8 +25,7 @@ import org.apache.spark.util.AccumulatorV2

class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
test("launch of backend and scheduler") {
val conf = new SparkConf().setMaster("myclusterManager").
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
val conf = new SparkConf().setMaster("myclusterManager").setAppName("testcm")
sc = new SparkContext(conf)
// check if the scheduler components are created and initialized
sc.schedulerBackend match {
2 changes: 1 addition & 1 deletion docs/rdd-programming-guide.md
@@ -138,7 +138,7 @@ The first thing a Spark program must do is to create a [SparkContext](api/scala/
how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object
that contains information about your application.

Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one.
Only one SparkContext should be active per JVM. You must `stop()` the active SparkContext before creating a new one.

{% highlight scala %}
val conf = new SparkConf().setAppName(appName).setMaster(master)
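As a companion to the rule the guide states here, a sketch of the stop-then-recreate sequence it requires; the object and app names are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StopThenRecreateDemo {
  def main(args: Array[String]): Unit = {
    val first = new SparkContext(new SparkConf().setAppName("first").setMaster("local"))
    first.stop() // stop the active context before creating a new one
    val second = new SparkContext(new SparkConf().setAppName("second").setMaster("local"))
    second.stop()
  }
}
```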
1 change: 0 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -263,7 +263,6 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
.setMaster("local[*]")
.setAppName("test")
.set("spark.ui.enabled", "false")
.set("spark.driver.allowMultipleContexts", "true")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
