Skip to content

Commit

Permalink
[SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to…
Browse files Browse the repository at this point in the history
… create SparkContext in executors

### What changes were proposed in this pull request?

This is a follow-up of #28986.
This PR adds a config to switch allow/disallow to create `SparkContext` in executors.

- `spark.driver.allowSparkContextInExecutors`

### Why are the changes needed?

Some users or libraries actually create `SparkContext` in executors.
We shouldn't break their workloads.

### Does this PR introduce _any_ user-facing change?

Yes, users will be able to create `SparkContext` in executors with the config enabled.

### How was this patch tested?

More tests are added.

Closes #29278 from ueshin/issues/SPARK-32160/add_configs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
ueshin authored and HyukjinKwon committed Jul 31, 2020
1 parent 813532d commit 8014b0b
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 9 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ class SparkContext(config: SparkConf) extends Logging {
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()

// In order to prevent SparkContext from being created in executors.
SparkContext.assertOnDriver()
if (!config.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
// In order to prevent SparkContext from being created in executors.
SparkContext.assertOnDriver()
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having started construction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1908,4 +1908,11 @@ package object config {
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val ALLOW_SPARK_CONTEXT_IN_EXECUTORS =
ConfigBuilder("spark.driver.allowSparkContextInExecutors")
.doc("If set to true, SparkContext can be created in executors.")
.version("3.0.1")
.booleanConf
.createWithDefault(false)
}
9 changes: 9 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu

assert(error.contains("SparkContext should only be created and accessed on the driver."))
}

test("SPARK-32160: Allow to create SparkContext in executors if the config is set") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))

sc.range(0, 1).foreach { _ =>
new SparkContext(new SparkConf().setAppName("test").setMaster("local")
.set(ALLOW_SPARK_CONTEXT_IN_EXECUTORS, true)).stop()
}
}
}

object SparkContextSuite {
Expand Down
4 changes: 4 additions & 0 deletions docs/core-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ license: |
* Table of contents
{:toc}

## Upgrading from Core 3.0 to 3.1

- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.driver.allowSparkContextInExecutors` when creating `SparkContext` in executors.

## Upgrading from Core 2.4 to 3.0

- The `org.apache.spark.ExecutorPlugin` interface and related configuration has been replaced with
Expand Down
6 changes: 4 additions & 2 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
...
ValueError:...
"""
# In order to prevent SparkContext from being created in executors.
SparkContext._assert_on_driver()
if (conf is None or
conf.get("spark.driver.allowSparkContextInExecutors", "false").lower() != "true"):
# In order to prevent SparkContext from being created in executors.
SparkContext._assert_on_driver()

self._callsite = first_spark_call() or CallSite(None, None, None)
if gateway is not None and gateway.gateway_parameters.auth_token is None:
Expand Down
11 changes: 11 additions & 0 deletions python/pyspark/tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,17 @@ def test_disallow_to_create_spark_context_in_executors(self):
self.assertIn("SparkContext should only be created and accessed on the driver.",
str(context.exception))

def test_allow_to_create_spark_context_in_executors(self):
# SPARK-32160: SparkContext can be created in executors if the config is set.

def create_spark_context():
conf = SparkConf().set("spark.driver.allowSparkContextInExecutors", "true")
with SparkContext(conf=conf):
pass

with SparkContext("local-cluster[3, 1, 1024]") as sc:
sc.range(2).foreach(lambda _: create_spark_context())


class ContextTestsWithResources(unittest.TestCase):

Expand Down
12 changes: 8 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalog.Catalog
Expand Down Expand Up @@ -900,7 +901,13 @@ object SparkSession extends Logging {
* @since 2.0.0
*/
def getOrCreate(): SparkSession = synchronized {
assertOnDriver()
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }

if (!sparkConf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
assertOnDriver()
}

// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
Expand All @@ -919,9 +926,6 @@ object SparkSession extends Logging {

// No active nor global default session. Create a new one.
val sparkContext = userSuppliedContext.getOrElse {
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }

// set a random app name if not given.
if (!sparkConf.contains("spark.app.name")) {
sparkConf.setAppName(java.util.UUID.randomUUID().toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.sql

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf._
Expand Down Expand Up @@ -257,4 +258,27 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
context.stop()
}
}

test("SPARK-32160: Disallow to create SparkSession in executors") {
val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()

val error = intercept[SparkException] {
session.range(1).foreach { v =>
SparkSession.builder.master("local").getOrCreate()
()
}
}.getMessage()

assert(error.contains("SparkSession should only be created and accessed on the driver."))
}

test("SPARK-32160: Allow to create SparkSession in executors if the config is set") {
val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()

session.range(1).foreach { v =>
SparkSession.builder.master("local")
.config(ALLOW_SPARK_CONTEXT_IN_EXECUTORS.key, true).getOrCreate().stop()
()
}
}
}

0 comments on commit 8014b0b

Please sign in to comment.