[SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors

### What changes were proposed in this pull request?

This PR proposes to disallow creating a `SparkContext` in executors, e.g., in UDFs.

### Why are the changes needed?

Currently, executors can create a `SparkContext`, but they shouldn't be able to:

```scala
sc.range(0, 1).foreach { _ =>
  new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
}
```
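With this change, the snippet above fails: the task throws `java.lang.IllegalStateException: SparkContext should only be created and accessed on the driver.`, which surfaces on the driver wrapped in a `SparkException`, as the new test in `SparkContextSuite` below verifies.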

### Does this PR introduce _any_ user-facing change?

Yes, users won't be able to create `SparkContext` in executors.

### How was this patch tested?

Added tests.

Closes #28986 from ueshin/issues/SPARK-32160/disallow_spark_context_in_executors.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit cfecc20)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
ueshin authored and HyukjinKwon committed Jul 9, 2020
1 parent f9d53a6 commit 86efa45
Showing 6 changed files with 67 additions and 20 deletions.
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -82,6 +82,9 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
+  // In order to prevent SparkContext from being created in executors.
+  SparkContext.assertOnDriver()
+
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
   // NOTE: this must be placed at the beginning of the SparkContext constructor.
@@ -2539,6 +2542,19 @@ object SparkContext extends Logging {
     }
   }
 
+  /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {
+      // we're accessing it during task execution, fail.
+      throw new IllegalStateException(
+        "SparkContext should only be created and accessed on the driver.")
+    }
+  }
+
   /**
    * This function may be used to get or instantiate a SparkContext and register it as a
    * singleton object. Because we can only have one active SparkContext per JVM,
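As background for the check above: `TaskContext.get` returns `null` on the driver and a `TaskContext` instance only while a task is running, which is the invariant `assertOnDriver()` relies on. A minimal standalone sketch illustrating this, not part of the commit (`TaskContextProbe` is a hypothetical name):

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Hypothetical probe, not part of this commit: shows that TaskContext.get
// is null on the driver and set inside a running task.
object TaskContextProbe {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("probe").setMaster("local[2]"))
    try {
      // Driver side: no task is running here.
      assert(TaskContext.get == null)
      sc.range(0, 2).foreach { _ =>
        // Task side: each task runs with a TaskContext set,
        // even in local mode where tasks share the driver JVM.
        assert(TaskContext.get != null)
      }
    } finally {
      sc.stop()
    }
  }
}
```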
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -950,6 +950,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
     }
   }
 
+  test("SPARK-32160: Disallow to create SparkContext in executors") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    val error = intercept[SparkException] {
+      sc.range(0, 1).foreach { _ =>
+        new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkContext should only be created and accessed on the driver."))
+  }
 }
 
 object SparkContextSuite {
14 changes: 14 additions & 0 deletions python/pyspark/context.py
@@ -38,6 +38,7 @@
 from pyspark.storagelevel import StorageLevel
 from pyspark.resource import ResourceInformation
 from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
+from pyspark.taskcontext import TaskContext
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
 from pyspark.profiler import ProfilerCollector, BasicProfiler
@@ -118,6 +119,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ...
         ValueError:...
         """
+        # In order to prevent SparkContext from being created in executors.
+        SparkContext._assert_on_driver()
+
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
             raise ValueError(
@@ -1145,6 +1149,16 @@ def resources(self):
             resources[name] = ResourceInformation(name, addrs)
         return resources
 
+    @staticmethod
+    def _assert_on_driver():
+        """
+        Called to ensure that SparkContext is created only on the Driver.
+
+        Throws an exception if a SparkContext is about to be created in executors.
+        """
+        if TaskContext.get() is not None:
+            raise Exception("SparkContext should only be created and accessed on the driver.")
+
 
 def _test():
     import atexit
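The PySpark guard mirrors the Scala one: `TaskContext.get()` returns `None` outside of task execution, so `_assert_on_driver()` only raises inside a task. Note it raises a plain `Exception` rather than a more specific error type.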
8 changes: 8 additions & 0 deletions python/pyspark/tests/test_context.py
@@ -267,6 +267,14 @@ def test_resources(self):
         resources = sc.resources
         self.assertEqual(len(resources), 0)
 
+    def test_disallow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext should not be created in executors.
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            with self.assertRaises(Exception) as context:
+                sc.range(2).foreach(lambda _: SparkContext())
+            self.assertIn("SparkContext should only be created and accessed on the driver.",
+                          str(context.exception))
+
 
 class ContextTestsWithResources(unittest.TestCase):
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

@@ -1086,7 +1086,7 @@ object SparkSession extends Logging {
   }
 
   private def assertOnDriver(): Unit = {
-    if (Utils.isTesting && TaskContext.get != null) {
+    if (TaskContext.get != null) {
       // we're accessing it during task execution, fail.
       throw new IllegalStateException(
         "SparkSession should only be created and accessed on the driver.")
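Note the `Utils.isTesting &&` condition is dropped here: the `SparkSession` assertion previously fired only under testing, and now applies everywhere, consistent with the new `SparkContext` check.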
35 changes: 16 additions & 19 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala

@@ -27,32 +27,29 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 
 class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext {
   private val random = new java.util.Random()
-  private var taskContext: TaskContext = _
-
-  override def afterAll(): Unit = try {
-    TaskContext.unset()
-  } finally {
-    super.afterAll()
-  }
 
   private def withExternalArray(inMemoryThreshold: Int, spillThreshold: Int)
                                (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = {
     sc = new SparkContext("local", "test", new SparkConf(false))
 
-    taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
     TaskContext.setTaskContext(taskContext)
 
-    val array = new ExternalAppendOnlyUnsafeRowArray(
-      taskContext.taskMemoryManager(),
-      SparkEnv.get.blockManager,
-      SparkEnv.get.serializerManager,
-      taskContext,
-      1024,
-      SparkEnv.get.memoryManager.pageSizeBytes,
-      inMemoryThreshold,
-      spillThreshold)
-    try f(array) finally {
-      array.clear()
+    try {
+      val array = new ExternalAppendOnlyUnsafeRowArray(
+        taskContext.taskMemoryManager(),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        taskContext,
+        1024,
+        SparkEnv.get.memoryManager.pageSizeBytes,
+        inMemoryThreshold,
+        spillThreshold)
+      try f(array) finally {
+        array.clear()
+      }
+    } finally {
+      TaskContext.unset()
     }
   }
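This suite previously set a fake `TaskContext` per call but only unset it in `afterAll()`; with the new assertion, a `TaskContext` left set by one test would make the next test's `new SparkContext(...)` throw, so setting and unsetting is now scoped in a `try`/`finally` around each `withExternalArray` call.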
