Skip to content

Commit

Permalink
[SPARK-10548] [SPARK-10563] [SQL] Fix concurrent SQL executions / bra…
Browse files Browse the repository at this point in the history
…nch-1.5

*Note: this is for branch-1.5 only*

This is the same as #8710 but affects only SQL. The more general fix for SPARK-10563 is considered  risky to backport into a maintenance release, so it is disabled by default and enabled only in SQL.

Author: Andrew Or <andrew@databricks.com>

Closes #8721 from andrewor14/concurrent-sql-executions-1.5 and squashes the following commits:

3b9b462 [Andrew Or] Merge branch 'branch-1.5' of github.com:apache/spark into concurrent-sql-executions-1.5
4435db7 [Andrew Or] Clone properties only for SQL for backward compatibility
0b7e5ab [Andrew Or] Clone parent local properties on inherit
  • Loading branch information
Andrew Or committed Sep 15, 2015
1 parent 7286c2b commit 997be78
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 43 deletions.
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
import scala.util.control.NonFatal

import org.apache.commons.lang.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
Expand Down Expand Up @@ -347,8 +348,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] var checkpointDir: Option[String] = None

// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = {
// Note: make a clone such that changes in the parent properties aren't reflected in
// the those of the children threads, which has confusing semantics (SPARK-10563).
if (conf.get("spark.localProperties.clone", "false").toBoolean) {
SerializationUtils.clone(parent).asInstanceOf[Properties]
} else {
new Properties(parent)
}
}
override protected def initialValue(): Properties = new Properties()
}

Expand Down
66 changes: 25 additions & 41 deletions core/src/test/scala/org/apache/spark/ThreadingSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
}.start()
}
sem.acquire(2)
throwable.foreach { t => throw t }
throwable.foreach { t => throw improveStackTrace(t) }
if (ThreadingSuiteState.failed.get()) {
logError("Waited 1 second without seeing runningThreads = 4 (it was " +
ThreadingSuiteState.runningThreads.get() + "); failing test")
Expand Down Expand Up @@ -178,7 +178,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())

sem.acquire(5)
throwable.foreach { t => throw t }
throwable.foreach { t => throw improveStackTrace(t) }
assert(sc.getLocalProperty("test") === null)
}

Expand Down Expand Up @@ -207,58 +207,42 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())

sem.acquire(5)
throwable.foreach { t => throw t }
throwable.foreach { t => throw improveStackTrace(t) }
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
}

test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
val jobStarted = new Semaphore(0)
val jobEnded = new Semaphore(0)
@volatile var jobResult: JobResult = null
var throwable: Option[Throwable] = None

test("mutation in parent local property does not affect child (SPARK-10563)") {
sc = new SparkContext("local", "test")
sc.setJobGroup("originalJobGroupId", "description")
sc.addSparkListener(new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
jobStarted.release()
}
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
jobResult = jobEnd.jobResult
jobEnded.release()
}
})

// Create a new thread which will inherit the current thread's properties
val thread = new Thread() {
sc.conf.set("spark.localProperties.clone", "true")
val originalTestValue: String = "original-value"
var threadTestValue: String = null
sc.setLocalProperty("test", originalTestValue)
var throwable: Option[Throwable] = None
val thread = new Thread {
override def run(): Unit = {
try {
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
// Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
try {
sc.parallelize(1 to 100).foreach { x =>
Thread.sleep(100)
}
} catch {
case s: SparkException => // ignored so that we don't print noise in test logs
}
threadTestValue = sc.getLocalProperty("test")
} catch {
case t: Throwable =>
throwable = Some(t)
}
}
}
sc.setLocalProperty("test", "this-should-not-be-inherited")
thread.start()
// Wait for the job to start, then mutate the original properties, which should have been
// inherited by the running job but hopefully defensively copied or snapshotted:
jobStarted.tryAcquire(10, TimeUnit.SECONDS)
sc.setJobGroup("modifiedJobGroupId", "description")
// Canceling the original job group should cancel the running job. In other words, the
// modification of the properties object should not affect the properties of running jobs
sc.cancelJobGroup("originalJobGroupId")
jobEnded.tryAcquire(10, TimeUnit.SECONDS)
throwable.foreach { t => throw t }
assert(jobResult.isInstanceOf[JobFailed])
thread.join()
throwable.foreach { t => throw improveStackTrace(t) }
assert(threadTestValue === originalTestValue)
}

/**
* Improve the stack trace of an error thrown from within a thread.
* Otherwise it's difficult to tell which line in the test the error came from.
*/
private def improveStackTrace(t: Throwable): Throwable = {
t.setStackTrace(t.getStackTrace ++ Thread.currentThread.getStackTrace)
t
}

}
6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
sparkContext.addSparkListener(listener)
sparkContext.ui.foreach(new SQLTab(this, _))

// Execution IDs go through SparkContext's local properties, which are not safe to use with
// fork join pools by default. In particular, even after a child thread is spawned, if the
// parent sets a property the value may be reflected in the child. This leads to undefined
// consequences such as SPARK-10548, so we should just clone the properties instead to be safe.
sparkContext.conf.set("spark.localProperties.clone", "true")

/**
* Set Spark SQL configuration properties.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import java.util.Properties

import scala.collection.parallel.CompositeThrowable

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.SQLContext

class SQLExecutionSuite extends SparkFunSuite {

test("concurrent query execution (SPARK-10548)") {
// Try to reproduce the issue with the old SparkContext
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("test")
val badSparkContext = new BadSparkContext(conf)
try {
testConcurrentQueryExecution(badSparkContext)
fail("unable to reproduce SPARK-10548")
} catch {
case e: IllegalArgumentException =>
assert(e.getMessage.contains(SQLExecution.EXECUTION_ID_KEY))
} finally {
badSparkContext.stop()
}

// Verify that the issue is fixed with the latest SparkContext
val goodSparkContext = new SparkContext(conf)
try {
testConcurrentQueryExecution(goodSparkContext)
} finally {
goodSparkContext.stop()
}
}

/**
* Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently.
*/
private def testConcurrentQueryExecution(sc: SparkContext): Unit = {
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Initialize local properties. This is necessary for the test to pass.
sc.getLocalProperties

// Set up a thread that runs executes a simple SQL query.
// Before starting the thread, mutate the execution ID in the parent.
// The child thread should not see the effect of this change.
var throwable: Option[Throwable] = None
val child = new Thread {
override def run(): Unit = {
try {
sc.parallelize(1 to 100).map { i => (i, i) }.toDF("a", "b").collect()
} catch {
case t: Throwable =>
throwable = Some(t)
}

}
}
sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, "anything")
child.start()
child.join()

// The throwable is thrown from the child thread so it doesn't have a helpful stack trace
throwable.foreach { t =>
t.setStackTrace(t.getStackTrace ++ Thread.currentThread.getStackTrace)
throw t
}
}

}

/**
* A bad [[SparkContext]] that does not clone the inheritable thread local properties
* when passing them to children threads.
*/
private class BadSparkContext(conf: SparkConf) extends SparkContext(conf) {
protected[spark] override val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
override protected def initialValue(): Properties = new Properties()
}
}

0 comments on commit 997be78

Please sign in to comment.