From 8ceae42fbb5c08a1e5d801b1bc4beceab9f03142 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 10 Sep 2015 16:37:55 -0700 Subject: [PATCH 1/6] Exclude certain local properties from being inherited such as, cough cough, the SQL execution ID. This was a problem because scala's parallel collections spawns threads as children of the existing threads, causing the execution ID to be inherited when it shouldn't be. --- .../main/scala/org/apache/spark/SparkContext.scala | 13 +++++++++++-- .../scala/org/apache/spark/sql/SQLContext.scala | 5 +++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cbfe8bf31c3d6..b059d8fff8a44 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -29,7 +29,7 @@ import java.util.UUID.randomUUID import scala.collection.JavaConverters._ import scala.collection.{Map, Set} import scala.collection.generic.Growable -import scala.collection.mutable.HashMap +import scala.collection.mutable.{HashMap, HashSet} import scala.reflect.{ClassTag, classTag} import scala.util.control.NonFatal @@ -348,10 +348,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Thread Local variable that can be used by users to pass information down the stack private val localProperties = new InheritableThreadLocal[Properties] { - override protected def childValue(parent: Properties): Properties = new Properties(parent) + override protected def childValue(parent: Properties): Properties = { + val p = new Properties(parent) + nonInheritedLocalProperties.foreach(p.remove) + p + } override protected def initialValue(): Properties = new Properties() } + /** + * Keys of local properties that should not be inherited by children threads. 
+ */ + private[spark] val nonInheritedLocalProperties: HashSet[String] = new HashSet[String] + /* ------------------------------------------------------------------------------------- * | Initialization. This code initializes the context in a manner that is exception-safe. | | All internal fields holding state are initialized here, and any error prompts the | diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 4e8414af50b44..f13f3c88ac6b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -78,6 +78,11 @@ class SQLContext(@transient val sparkContext: SparkContext) sparkContext.addSparkListener(listener) sparkContext.ui.foreach(new SQLTab(this, _)) + // Ensure query execution IDs are not inherited across the thread hierarchy, which is + // the default behavior for SparkContext local properties. Otherwise, we may confuse + // the listener as to which query is being executed. (SPARK-10548) + sparkContext.nonInheritedLocalProperties.add(SQLExecution.EXECUTION_ID_KEY) + /** * Set Spark SQL configuration properties. * From 3ec715c4e5af5fa8d5e58c7aa93d01cd09970ae7 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 10 Sep 2015 17:45:30 -0700 Subject: [PATCH 2/6] Fix remove from Properties + add tests Because java.util.Properties' remove method takes in an Any instead of a String, there were some issues with matching the key's hashCode, so removing was not successful in unit tests. Instead, this commit fixes it by manually filtering out the keys and adding them to the child thread's properties. 
--- .../scala/org/apache/spark/SparkContext.scala | 15 ++++- .../org/apache/spark/ThreadingSuite.scala | 27 +++++++++ .../sql/execution/SQLExecutionSuite.scala | 55 +++++++++++++++++++ 3 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b059d8fff8a44..bc9f7b8f09c91 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicIntege import java.util.UUID.randomUUID import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ import scala.collection.{Map, Set} import scala.collection.generic.Growable import scala.collection.mutable.{HashMap, HashSet} @@ -349,9 +350,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Thread Local variable that can be used by users to pass information down the stack private val localProperties = new InheritableThreadLocal[Properties] { override protected def childValue(parent: Properties): Properties = { - val p = new Properties(parent) - nonInheritedLocalProperties.foreach(p.remove) - p + if (nonInheritedLocalProperties.nonEmpty) { + // If there are properties that should not be inherited, filter them out + val p = new Properties + val filtered = parent.filter { case (k, _) => + !nonInheritedLocalProperties.contains(k) + } + p.putAll(filtered) + p + } else { + new Properties(parent) + } } override protected def initialValue(): Properties = new Properties() } diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index 48509f0759a3b..e004be213c190 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala 
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -154,6 +154,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { val threads = (1 to 5).map { i => new Thread() { override def run() { + // TODO: these assertion failures don't actually fail the test... sc.setLocalProperty("test", i.toString) assert(sc.getLocalProperty("test") === i.toString) sem.release() @@ -175,6 +176,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { val threads = (1 to 5).map { i => new Thread() { override def run() { + // TODO: these assertion failures don't actually fail the test... assert(sc.getLocalProperty("test") === "parent") sc.setLocalProperty("test", i.toString) assert(sc.getLocalProperty("test") === i.toString) @@ -190,6 +192,30 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { assert(sc.getLocalProperty("Foo") === null) } + test("inheritance exclusions (SPARK-10548)") { + sc = new SparkContext("local", "test") + sc.nonInheritedLocalProperties.add("do-not-inherit-me") + sc.setLocalProperty("do-inherit-me", "parent") + sc.setLocalProperty("do-not-inherit-me", "parent") + var throwable: Option[Throwable] = None + val threads = (1 to 5).map { i => + new Thread() { + override def run() { + // only the ones we intend to inherit will be passed to the children + try { + assert(sc.getLocalProperty("do-inherit-me") === "parent") + assert(sc.getLocalProperty("do-not-inherit-me") === null) + } catch { + case t: Throwable => throwable = Some(t) + } + } + } + } + threads.foreach(_.start()) + threads.foreach(_.join()) + throwable.foreach { t => throw t } + } + test("mutations to local properties should not affect submitted jobs (SPARK-6629)") { val jobStarted = new Semaphore(0) val jobEnded = new Semaphore(0) @@ -210,6 +236,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { // Create a new thread which will inherit the current thread's properties 
val thread = new Thread() { override def run(): Unit = { + // TODO: these assertion failures don't actually fail the test... assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId") // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala new file mode 100644 index 0000000000000..8df1e22ead3f9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class SQLExecutionSuite extends SharedSQLContext { + import testImplicits._ + + test("query execution IDs are not inherited across threads") { + sparkContext.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, "123") + sparkContext.setLocalProperty("do-inherit-me", "some-value") + var throwable: Option[Throwable] = None + val thread = new Thread { + override def run(): Unit = { + try { + assert(sparkContext.getLocalProperty("do-inherit-me") === "some-value") + assert(sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) === null) + } catch { + case t: Throwable => + throwable = Some(t) + } + } + } + thread.start() + thread.join() + throwable.foreach { t => throw t } + } + + // This is the end-to-end version of the previous test. + test("parallel query execution (SPARK-10548)") { + (1 to 5).foreach { i => + // Scala's parallel collections spawns new threads as children of the existing threads. + // We need to run this multiple times to ensure new threads are spawned. Without the fix + // for SPARK-10548, this usually fails on the second try. 
+ val df = sparkContext.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b") + (1 to 10).par.foreach { _ => df.count() } + } + } +} From d48c11400b11e58d6ee3a4f5d9c330dd1736f25c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 10 Sep 2015 18:19:28 -0700 Subject: [PATCH 3/6] Fix style --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bc9f7b8f09c91..58bb97a391293 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicIntege import java.util.UUID.randomUUID import scala.collection.JavaConverters._ -import scala.collection.JavaConversions._ import scala.collection.{Map, Set} import scala.collection.generic.Growable import scala.collection.mutable.{HashMap, HashSet} @@ -353,10 +352,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli if (nonInheritedLocalProperties.nonEmpty) { // If there are properties that should not be inherited, filter them out val p = new Properties - val filtered = parent.filter { case (k, _) => + val filtered = parent.asScala.filter { case (k, _) => !nonInheritedLocalProperties.contains(k) } - p.putAll(filtered) + p.putAll(filtered.asJava) p } else { new Properties(parent) From 5297f795e4e0ad5cf3bb6ff520312c2b82684f86 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 11 Sep 2015 10:55:09 -0700 Subject: [PATCH 4/6] Always clone parent properties ... to make the behavior more consistent in SQL vs non-SQL cases. 
--- .../scala/org/apache/spark/SparkContext.scala | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 58bb97a391293..9533df189f8f8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -349,17 +349,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Thread Local variable that can be used by users to pass information down the stack private val localProperties = new InheritableThreadLocal[Properties] { override protected def childValue(parent: Properties): Properties = { - if (nonInheritedLocalProperties.nonEmpty) { - // If there are properties that should not be inherited, filter them out - val p = new Properties - val filtered = parent.asScala.filter { case (k, _) => - !nonInheritedLocalProperties.contains(k) - } - p.putAll(filtered.asJava) - p - } else { - new Properties(parent) + // Note: make a clone such that changes in the parent properties aren't reflected in + // the those of the children threads, which has confusing semantics (SPARK-10564). 
+ val p = new Properties + val filtered = parent.asScala.filter { case (k, _) => + !nonInheritedLocalProperties.contains(k) } + p.putAll(filtered.asJava) + p } override protected def initialValue(): Properties = new Properties() } From 35bb6f08befdab26edf5535f27f0b6a405f142f2 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 13 Sep 2015 00:01:12 -0700 Subject: [PATCH 5/6] Limit scope of mutable set --- core/src/main/scala/org/apache/spark/SparkContext.scala | 9 +++++++-- .../src/test/scala/org/apache/spark/ThreadingSuite.scala | 2 +- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0b155a77d8d42..f4973d5f1bc09 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -361,10 +361,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli override protected def initialValue(): Properties = new Properties() } + // Keys of local properties that should not be inherited by children threads + private val nonInheritedLocalProperties: HashSet[String] = new HashSet[String] + /** - * Keys of local properties that should not be inherited by children threads. + * Mark a local property such that its values are never inherited across the thread hierarchy. */ - private[spark] val nonInheritedLocalProperties: HashSet[String] = new HashSet[String] + private[spark] def markLocalPropertyNonInherited(key: String): Unit = { + nonInheritedLocalProperties += key + } /* ------------------------------------------------------------------------------------- * | Initialization. This code initializes the context in a manner that is exception-safe. 
| diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index 17b1e30dd8212..5c1ca00e7cf3e 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -214,7 +214,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { test("inheritance exclusions (SPARK-10548)") { sc = new SparkContext("local", "test") - sc.nonInheritedLocalProperties.add("do-not-inherit-me") + sc.markLocalPropertyNonInherited("do-not-inherit-me") sc.setLocalProperty("do-inherit-me", "parent") sc.setLocalProperty("do-not-inherit-me", "parent") var throwable: Option[Throwable] = None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f13f3c88ac6b4..5d0236653040e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -81,7 +81,7 @@ class SQLContext(@transient val sparkContext: SparkContext) // Ensure query execution IDs are not inherited across the thread hierarchy, which is // the default behavior for SparkContext local properties. Otherwise, we may confuse // the listener as to which query is being executed. (SPARK-10548) - sparkContext.nonInheritedLocalProperties.add(SQLExecution.EXECUTION_ID_KEY) + sparkContext.markLocalPropertyNonInherited(SQLExecution.EXECUTION_ID_KEY) /** * Set Spark SQL configuration properties. From 984a92f84237e95aa96680773c771841cdfd16cf Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 14 Sep 2015 13:49:38 -0700 Subject: [PATCH 6/6] Simplify fix by cloning properties on inherit The fix for SPARK-10548 can be simplified by just cloning the parent properties on inherit rather than excluding specific properties from ever being inherited. 
This is safe because the child thread must be created BEFORE the parent thread runs a query. --- .../scala/org/apache/spark/SparkContext.scala | 22 +---- .../org/apache/spark/ThreadingSuite.scala | 84 ++++++------------ .../org/apache/spark/sql/SQLContext.scala | 5 -- .../sql/execution/SQLExecutionSuite.scala | 88 ++++++++++++++----- 4 files changed, 96 insertions(+), 103 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f4973d5f1bc09..ecb9b85175ae4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -29,10 +29,11 @@ import java.util.UUID.randomUUID import scala.collection.JavaConverters._ import scala.collection.{Map, Set} import scala.collection.generic.Growable -import scala.collection.mutable.{HashMap, HashSet} +import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} import scala.util.control.NonFatal +import org.apache.commons.lang.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, @@ -347,30 +348,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] var checkpointDir: Option[String] = None // Thread Local variable that can be used by users to pass information down the stack - private val localProperties = new InheritableThreadLocal[Properties] { + protected[spark] val localProperties = new InheritableThreadLocal[Properties] { override protected def childValue(parent: Properties): Properties = { // Note: make a clone such that changes in the parent properties aren't reflected in // the those of the children threads, which has confusing semantics (SPARK-10564). 
- val p = new Properties - val filtered = parent.asScala.filter { case (k, _) => - !nonInheritedLocalProperties.contains(k) - } - p.putAll(filtered.asJava) - p + SerializationUtils.clone(parent).asInstanceOf[Properties] } override protected def initialValue(): Properties = new Properties() } - // Keys of local properties that should not be inherited by children threads - private val nonInheritedLocalProperties: HashSet[String] = new HashSet[String] - - /** - * Mark a local property such that its values are never inherited across the thread hierarchy. - */ - private[spark] def markLocalPropertyNonInherited(key: String): Unit = { - nonInheritedLocalProperties += key - } - /* ------------------------------------------------------------------------------------- * | Initialization. This code initializes the context in a manner that is exception-safe. | | All internal fields holding state are initialized here, and any error prompts the | diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index 5c1ca00e7cf3e..93cba322d8521 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -152,7 +152,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { ThreadingSuiteState.runningThreads.get() + "); failing test") fail("One or more threads didn't see runningThreads = 4") } - throwable.foreach { t => throw t } + throwable.foreach { t => throw improveStackTrace(t) } } test("set local properties in different thread") { @@ -179,7 +179,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { sem.acquire(5) assert(sc.getLocalProperty("test") === null) - throwable.foreach { t => throw t } + throwable.foreach { t => throw improveStackTrace(t) } } test("set and get local properties in parent-children thread") { @@ -209,73 +209,39 @@ class ThreadingSuite extends 
SparkFunSuite with LocalSparkContext with Logging { sem.acquire(5) assert(sc.getLocalProperty("test") === "parent") assert(sc.getLocalProperty("Foo") === null) - throwable.foreach { t => throw t } + throwable.foreach { t => throw improveStackTrace(t) } } - test("inheritance exclusions (SPARK-10548)") { + test("mutation in parent local property does not affect child (SPARK-10563)") { sc = new SparkContext("local", "test") - sc.markLocalPropertyNonInherited("do-not-inherit-me") - sc.setLocalProperty("do-inherit-me", "parent") - sc.setLocalProperty("do-not-inherit-me", "parent") + val originalTestValue: String = "original-value" + var threadTestValue: String = null + sc.setLocalProperty("test", originalTestValue) var throwable: Option[Throwable] = None - val threads = (1 to 5).map { i => - new Thread() { - override def run() { - // only the ones we intend to inherit will be passed to the children - try { - assert(sc.getLocalProperty("do-inherit-me") === "parent") - assert(sc.getLocalProperty("do-not-inherit-me") === null) - } catch { - case t: Throwable => throwable = Some(t) - } - } - } - } - threads.foreach(_.start()) - threads.foreach(_.join()) - throwable.foreach { t => throw t } - } - - test("mutations to local properties should not affect submitted jobs (SPARK-6629)") { - val jobStarted = new Semaphore(0) - val jobEnded = new Semaphore(0) - @volatile var jobResult: JobResult = null - - sc = new SparkContext("local", "test") - sc.setJobGroup("originalJobGroupId", "description") - sc.addSparkListener(new SparkListener { - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - jobStarted.release() - } - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { - jobResult = jobEnd.jobResult - jobEnded.release() - } - }) - - // Create a new thread which will inherit the current thread's properties - val thread = new Thread() { + val thread = new Thread { override def run(): Unit = { - assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === 
"originalJobGroupId") - // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task try { - sc.parallelize(1 to 100).foreach { x => - Thread.sleep(100) - } + threadTestValue = sc.getLocalProperty("test") } catch { - case s: SparkException => // ignored so that we don't print noise in test logs + case t: Throwable => + throwable = Some(t) } } } + sc.setLocalProperty("test", "this-should-not-be-inherited") thread.start() - // Wait for the job to start, then mutate the original properties, which should have been - // inherited by the running job but hopefully defensively copied or snapshotted: - jobStarted.tryAcquire(10, TimeUnit.SECONDS) - sc.setJobGroup("modifiedJobGroupId", "description") - // Canceling the original job group should cancel the running job. In other words, the - // modification of the properties object should not affect the properties of running jobs - sc.cancelJobGroup("originalJobGroupId") - jobEnded.tryAcquire(10, TimeUnit.SECONDS) - assert(jobResult.isInstanceOf[JobFailed]) + thread.join() + throwable.foreach { t => throw improveStackTrace(t) } + assert(threadTestValue === originalTestValue) } + + /** + * Improve the stack trace of an error thrown from within a thread. + * Otherwise it's difficult to tell which line in the test the error came from. 
+ */ + private def improveStackTrace(t: Throwable): Throwable = { + t.setStackTrace(t.getStackTrace ++ Thread.currentThread.getStackTrace) + t + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5d0236653040e..4e8414af50b44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -78,11 +78,6 @@ class SQLContext(@transient val sparkContext: SparkContext) sparkContext.addSparkListener(listener) sparkContext.ui.foreach(new SQLTab(this, _)) - // Ensure query execution IDs are not inherited across the thread hierarchy, which is - // the default behavior for SparkContext local properties. Otherwise, we may confuse - // the listener as to which query is being executed. (SPARK-10548) - sparkContext.markLocalPropertyNonInherited(SQLExecution.EXECUTION_ID_KEY) - /** * Set Spark SQL configuration properties. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala index 8df1e22ead3f9..63639681ef80a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala @@ -17,39 +17,85 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.test.SharedSQLContext +import java.util.Properties -class SQLExecutionSuite extends SharedSQLContext { - import testImplicits._ +import scala.collection.parallel.CompositeThrowable - test("query execution IDs are not inherited across threads") { - sparkContext.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, "123") - sparkContext.setLocalProperty("do-inherit-me", "some-value") +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.sql.SQLContext + +class SQLExecutionSuite extends 
SparkFunSuite { + + test("concurrent query execution (SPARK-10548)") { + // Try to reproduce the issue with the old SparkContext + val conf = new SparkConf() + .setMaster("local[*]") + .setAppName("test") + val badSparkContext = new BadSparkContext(conf) + try { + testConcurrentQueryExecution(badSparkContext) + fail("unable to reproduce SPARK-10548") + } catch { + case e: IllegalArgumentException => + assert(e.getMessage.contains(SQLExecution.EXECUTION_ID_KEY)) + } finally { + badSparkContext.stop() + } + + // Verify that the issue is fixed with the latest SparkContext + val goodSparkContext = new SparkContext(conf) + try { + testConcurrentQueryExecution(goodSparkContext) + } finally { + goodSparkContext.stop() + } + } + + /** + * Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently. + */ + private def testConcurrentQueryExecution(sc: SparkContext): Unit = { + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + + // Initialize local properties. This is necessary for the test to pass. + sc.getLocalProperties + + // Set up a thread that executes a simple SQL query. + // Before starting the thread, mutate the execution ID in the parent. + // The child thread should not see the effect of this change. var throwable: Option[Throwable] = None - val thread = new Thread { + val child = new Thread { override def run(): Unit = { try { - assert(sparkContext.getLocalProperty("do-inherit-me") === "some-value") - assert(sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) === null) + sc.parallelize(1 to 100).map { i => (i, i) }.toDF("a", "b").collect() } catch { case t: Throwable => throwable = Some(t) } + } } - thread.start() - thread.join() - throwable.foreach { t => throw t } - } + sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, "anything") + child.start() + child.join() - // This is the end-to-end version of the previous test. 
- test("parallel query execution (SPARK-10548)") { - (1 to 5).foreach { i => - // Scala's parallel collections spawns new threads as children of the existing threads. - // We need to run this multiple times to ensure new threads are spawned. Without the fix - // for SPARK-10548, this usually fails on the second try. - val df = sparkContext.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b") - (1 to 10).par.foreach { _ => df.count() } + // The throwable is thrown from the child thread so it doesn't have a helpful stack trace + throwable.foreach { t => + t.setStackTrace(t.getStackTrace ++ Thread.currentThread.getStackTrace) + throw t } } + +} + +/** + * A bad [[SparkContext]] that does not clone the inheritable thread local properties + * when passing them to children threads. + */ +private class BadSparkContext(conf: SparkConf) extends SparkContext(conf) { + protected[spark] override val localProperties = new InheritableThreadLocal[Properties] { + override protected def childValue(parent: Properties): Properties = new Properties(parent) + override protected def initialValue(): Properties = new Properties() + } }