
[SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors. #28986

Closed
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,6 +83,9 @@ class SparkContext(config: SparkConf) extends Logging {
  // The call site where this SparkContext was constructed.
  private val creationSite: CallSite = Utils.getCallSite()

  // In order to prevent SparkContext from being created in executors.
  SparkContext.assertOnDriver()

  // In order to prevent multiple SparkContexts from being active at the same time, mark this
  // context as having started construction.
  // NOTE: this must be placed at the beginning of the SparkContext constructor.
@@ -2554,6 +2557,19 @@ object SparkContext extends Logging {
}
}

  /**
   * Called to ensure that SparkContext is created or accessed only on the Driver.
   *
   * Throws an exception if a SparkContext is about to be created in executors.
   */
  private def assertOnDriver(): Unit = {
    if (TaskContext.get != null) {

Contributor:
Have you tested this under local mode?

Member Author:
Under local mode:

scala> sc.range(0, 1).foreach { _ => new SparkContext(new SparkConf().setAppName("test").setMaster("local")) }
java.lang.IllegalStateException: SparkContext should only be created and accessed on the driver.
...

Before this patch:

scala> sc.range(0, 1).foreach { _ => new SparkContext(new SparkConf().setAppName("test").setMaster("local")) }
org.apache.spark.SparkException: Only one SparkContext should be running in this JVM (see SPARK-2243). The currently running SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
...

Although the exception is different, it fails either way.

I think the new error message is more reasonable.
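
To illustrate what the check relies on: TaskContext.get() returns None on the driver and a TaskContext instance inside a running task, which is how the new assertion tells driver code from executor code. A small PySpark sketch (illustration only, not part of this patch; the app name is arbitrary):

from pyspark import SparkContext, TaskContext

sc = SparkContext("local[2]", "taskcontext-demo")
print(TaskContext.get() is None)  # True: this line runs on the driver
# Inside a task, TaskContext.get() returns a TaskContext instance instead.
print(sc.range(1).map(lambda _: TaskContext.get() is None).collect())  # [False]
sc.stop()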

Member:
@ueshin could you submit a follow-up PR to add a conf? In Spark 3.0, turn it off by default; in Spark 3.1, turn it on by default. Could you also add it to the migration guide?

Member:
Does this actually affect any legitimate use case that would otherwise work? This should be more of a fail-fast for things that would already fail.

Member:
This is a user error. However, users may use a Spark library in executors where the library itself calls SparkContext.getOrCreate. We should still let their workloads run if they worked in previous releases.
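
To make the scenario concrete, here is a hypothetical sketch (not taken from any real library) of the pattern being described: a helper that calls SparkContext.getOrCreate() and happens to run inside a task on an executor, which this patch now rejects.

from pyspark import SparkContext

def library_helper(value):
    # Hypothetical library code: getOrCreate() is harmless on the driver, but
    # when this runs inside a task it now fails fast with
    # "SparkContext should only be created and accessed on the driver."
    SparkContext.getOrCreate()
    return value

sc = SparkContext("local[2]", "library-demo")
sc.parallelize([1, 2, 3]).map(library_helper).collect()  # raises after this patch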

Member:
I see, but would that ever succeed, using the SparkContext that was created? I'm trying to work out what library would do this and not fail. Is it, perhaps, some boilerplate initialization code that runs on both the driver and the executors and creates a SparkContext that is never actually used on the executor, but now fails fast?

I get it, if that's the use case. A release note and/or hidden config to disable it might be an OK workaround.

Alternatively, if the argument is that this is not uncommon, maybe we shouldn't do this at all and should revert it.

Contributor:
BTW, I had been discussing this offline with @ueshin and @HyukjinKwon. One use case that previously (and admittedly unfortunately) relied on the ability to create SparkContexts in the executors is MLflow's mlflow.pyfunc.spark_udf API (see docs & usage example), which provides a unified interface for scoring models trained in arbitrary ML frameworks (tensorflow, scikit-learn, pyspark.ml) on Spark DataFrames as a pandas UDF. Most ML frameworks (e.g. sklearn) are single-node frameworks that operate on pandas dataframes, so applying them via a pandas UDF works well. For a pyspark.ml model to be applied via a pandas UDF, however, we need to convert the input pandas series to a Spark DataFrame on the executors, which requires a SparkContext on the executors.

I'll dig in to see if there's a way to keep the MLflow use case working with this change (e.g. pyspark.ml models may ultimately perform inference via a Spark UDF, so maybe we can extract the underlying UDF from the model and return that from mlflow.pyfunc.spark_udf?), but otherwise I agree that we may want to revert this change.
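
For context, a rough sketch of that pattern (an illustration only, not MLflow's actual implementation; the names score and features are made up): a pandas UDF that rebuilds a Spark DataFrame on the executor so a pyspark.ml model could score it, which needs a SparkContext on the executor and therefore trips the new assertion.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def score(features: pd.Series) -> pd.Series:
    # Runs on an executor: getOrCreate() previously could end up creating a
    # SparkContext there; with this patch it fails fast instead.
    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(features.to_frame("features"))
    # ... apply a pyspark.ml model to sdf and collect the predictions ...
    return pd.Series([0.0] * len(features))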

Member Author:
I submitted a PR to add configs #29278.
Could you take a look at it when you are available?

Member (@HyukjinKwon, Jul 30, 2020):
I think it's a bit odd to have two separate configurations for Scala and Python. I don't think we would specifically allow this in PySpark in the future even if it worked. I think we will discourage this behaviour anyway, whether it is in Scala or Python, and eventually deprecate/remove both configurations together. In which case would we disable it on the Scala side but enable it on the Python side?

Member:
> In which case would we disable it on the Scala side but enable it on the Python side?

Like how MLflow uses it today. It's pretty hard to make it work correctly on the Scala side, but it's working in Python today.

Member:
I see, but I guess we should discourage/deprecate/remove the behaviour in the end, am I correct? It's a bit weird (to end users) to have two configurations controlling the same behaviour. And I think it's unlikely that it would be disabled in Scala but enabled in Python specifically. We could just enable it on both sides.

Member Author:
I pushed a commit to use a single config.
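
For reference, a minimal sketch of how the resulting escape hatch would be used; the config name spark.executor.allowSparkContext is an assumption based on the follow-up #29278, not something defined in this diff.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("legacy-executor-context")
        .setMaster("local-cluster[3, 1, 1024]")
        # Assumed flag from the follow-up: opt back in to creating
        # SparkContext in executors for workloads that relied on it.
        .set("spark.executor.allowSparkContext", "true"))
sc = SparkContext(conf=conf)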

      // we're accessing it during task execution, fail.
      throw new IllegalStateException(
        "SparkContext should only be created and accessed on the driver.")
    }
  }

  /**
   * This function may be used to get or instantiate a SparkContext and register it as a
   * singleton object. Because we can only have one active SparkContext per JVM,
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -934,6 +934,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}
}

test("SPARK-32160: Disallow to create SparkContext in executors") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))

val error = intercept[SparkException] {
sc.range(0, 1).foreach { _ =>
new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
}
}.getMessage()

assert(error.contains("SparkContext should only be created and accessed on the driver."))
}
}

object SparkContextSuite {
14 changes: 14 additions & 0 deletions python/pyspark/context.py
@@ -38,6 +38,7 @@
from pyspark.storagelevel import StorageLevel
from pyspark.resource.information import ResourceInformation
from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.taskcontext import TaskContext
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
from pyspark.profiler import ProfilerCollector, BasicProfiler
@@ -118,6 +119,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
            ...
        ValueError:...
        """
        # In order to prevent SparkContext from being created in executors.
        SparkContext._assert_on_driver()

        self._callsite = first_spark_call() or CallSite(None, None, None)
        if gateway is not None and gateway.gateway_parameters.auth_token is None:
            raise ValueError(
@@ -1145,6 +1149,16 @@ def resources(self):
            resources[name] = ResourceInformation(name, addrs)
        return resources

    @staticmethod
    def _assert_on_driver():
        """
        Called to ensure that SparkContext is created only on the Driver.

        Throws an exception if a SparkContext is about to be created in executors.
        """
        if TaskContext.get() is not None:
            raise Exception("SparkContext should only be created and accessed on the driver.")


def _test():
    import atexit
8 changes: 8 additions & 0 deletions python/pyspark/tests/test_context.py
@@ -267,6 +267,14 @@ def test_resources(self):
resources = sc.resources
self.assertEqual(len(resources), 0)

    def test_disallow_to_create_spark_context_in_executors(self):
        # SPARK-32160: SparkContext should not be created in executors.
        with SparkContext("local-cluster[3, 1, 1024]") as sc:
            with self.assertRaises(Exception) as context:
                sc.range(2).foreach(lambda _: SparkContext())
            self.assertIn("SparkContext should only be created and accessed on the driver.",
                          str(context.exception))


class ContextTestsWithResources(unittest.TestCase):

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1087,7 +1087,7 @@ object SparkSession extends Logging {
  }

  private def assertOnDriver(): Unit = {
-    if (Utils.isTesting && TaskContext.get != null) {
+    if (TaskContext.get != null) {
      // we're accessing it during task execution, fail.
      throw new IllegalStateException(
        "SparkSession should only be created and accessed on the driver.")
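
A hedged sketch of what removing the Utils.isTesting guard above implies: SparkSession.builder.getOrCreate() invoked inside a task should now fail fast on a regular cluster as well, not only when spark.testing is set (on the PySpark side, the SparkContext check fires first).

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local-cluster[2, 1, 1024]")
         .appName("assert-on-driver-demo")
         .getOrCreate())

def bad_map(_):
    # Runs on an executor; expected to raise
    # "SparkContext should only be created and accessed on the driver."
    SparkSession.builder.getOrCreate()
    return 0

spark.sparkContext.range(1).map(bad_map).collect()  # expected to fail fast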