From 4435db7b5344c5efd00fcd46aa94bc328a4b031c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 14 Sep 2015 14:17:40 -0700 Subject: [PATCH] Clone properties only for SQL for backward compatibility --- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++++-- core/src/test/scala/org/apache/spark/ThreadingSuite.scala | 1 + .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 6 ++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3748f3c526eca..8a12f7e38d2bb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -351,8 +351,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli protected[spark] val localProperties = new InheritableThreadLocal[Properties] { override protected def childValue(parent: Properties): Properties = { // Note: make a clone such that changes in the parent properties aren't reflected in - // the those of the children threads, which has confusing semantics (SPARK-10564). - SerializationUtils.clone(parent).asInstanceOf[Properties] + // those of the children threads, which has confusing semantics (SPARK-10563).
+ if (conf.get("spark.localProperties.clone", "false").toBoolean) { + SerializationUtils.clone(parent).asInstanceOf[Properties] + } else { + new Properties(parent) + } } override protected def initialValue(): Properties = new Properties() } diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index 93cba322d8521..9dfadc35f1352 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -214,6 +214,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { test("mutation in parent local property does not affect child (SPARK-10563)") { sc = new SparkContext("local", "test") + sc.conf.set("spark.localProperties.clone", "true") val originalTestValue: String = "original-value" var threadTestValue: String = null sc.setLocalProperty("test", originalTestValue) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a1eea09e0477b..57a487a426b00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -78,6 +78,12 @@ class SQLContext(@transient val sparkContext: SparkContext) sparkContext.addSparkListener(listener) sparkContext.ui.foreach(new SQLTab(this, _)) + // Execution IDs go through SparkContext's local properties, which are not safe to use with + // fork join pools by default. In particular, even after a child thread is spawned, if the + // parent sets a property the value may be reflected in the child. This leads to undefined + // consequences such as SPARK-10548, so we should just clone the properties instead to be safe. + sparkContext.conf.set("spark.localProperties.clone", "true") + /** * Set Spark SQL configuration properties. *