[SPARK-30556][SQL] Copy sparkContext.localProperties to child thread in SubqueryExec.executionContext

### What changes were proposed in this pull request?
In `org.apache.spark.sql.execution.SubqueryExec#relationFuture`, make a copy of `org.apache.spark.SparkContext#localProperties` and pass it to the sub-execution thread in `org.apache.spark.sql.execution.SubqueryExec#executionContext`.

### Why are the changes needed?
Local properties set via `sparkContext` are not available as `TaskContext` properties when jobs execute on thread pools whose idle threads are reused.

Explanation:
When `SubqueryExec` runs, its `relationFuture` is evaluated on a separate thread. Those threads inherit `localProperties` from `sparkContext` because they are child threads.
The threads are created in the `executionContext` (thread pools), and each thread pool keeps idle threads alive for a default `keepAliveSeconds` of 60 seconds.
When an idle thread is reused for a subsequent query, it does not re-inherit the thread-local properties from the Spark context (thread properties are inherited only at thread creation), so it ends up with stale or missing properties. As a result, task-set properties go missing when the child thread forwards them via `sparkContext.runJob`/`submitJob`.
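
The gap is easy to reproduce without Spark. The sketch below (plain JVM code; the demo object and property values are illustrative) shows that an `InheritableThreadLocal` — the mechanism behind `sparkContext.localProperties` — is copied into a worker only when the worker thread is constructed, so a reused pool thread keeps the stale value:

```scala
import java.util.concurrent.Executors

object InheritanceGapDemo {
  // Stand-in for SparkContext.localProperties: inherited by child threads,
  // but only at the moment the child thread is constructed.
  val prop = new InheritableThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1) // a single, reusable worker

    prop.set("first-query")
    pool.submit(new Runnable {
      override def run(): Unit = println(s"task 1 sees: ${prop.get}") // "first-query"
    }).get()

    prop.set("second-query")
    pool.submit(new Runnable {
      // Still prints "first-query": the worker was created during task 1 and
      // is reused here, so it never re-inherits the caller's new value.
      override def run(): Unit = println(s"task 2 sees: ${prop.get}")
    }).get()

    pool.shutdown()
  }
}
```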

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added a unit test.

Closes #27267 from ajithme/subquerylocalprop.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
ajithme authored and dongjoon-hyun committed Jan 23, 2020
1 parent eccae13 commit bbab2bb
Showing 4 changed files with 69 additions and 7 deletions.
StaticSQLConf.scala

@@ -145,9 +145,17 @@ object StaticSQLConf {
         "cause longer waiting for other broadcasting. Also, increasing parallelism may " +
         "cause memory problem.")
       .intConf
-      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in [0,128].")
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
       .createWithDefault(128)
+
+  val SUBQUERY_MAX_THREAD_THRESHOLD =
+    buildStaticConf("spark.sql.subquery.maxThreadThreshold")
+      .internal()
+      .doc("The maximum degree of parallelism to execute the subquery.")
+      .intConf
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
+      .createWithDefault(16)
 
   val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
     .doc("Threshold of SQL length beyond which it will be truncated before adding to " +
       "event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
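
Note that `spark.sql.subquery.maxThreadThreshold` is a static (and internal) configuration, so it must be set before the `SparkSession` comes up — for example through `spark-submit --conf`, or at build time as in this sketch (the app name and value are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Static SQL confs are fixed for the lifetime of the shared SQL state;
// calling spark.conf.set(...) for this key after startup would fail.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("subquery-pool-sizing")
  .config("spark.sql.subquery.maxThreadThreshold", "8") // must be in (0,128]
  .getOrCreate()
```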
SQLExecution.scala

@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.concurrent.{ExecutionContext, Future}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.SparkSession
@@ -164,4 +166,20 @@ object SQLExecution {
       }
     }
   }
+
+  /**
+   * Wrap passed function to ensure necessary thread-local variables like
+   * SparkContext local properties are forwarded to execution thread
+   */
+  def withThreadLocalCaptured[T](
+      sparkSession: SparkSession, exec: ExecutionContext)(body: => T): Future[T] = {
+    val activeSession = sparkSession
+    val sc = sparkSession.sparkContext
+    val localProps = Utils.cloneProperties(sc.getLocalProperties)
+    Future {
+      SparkSession.setActiveSession(activeSession)
+      sc.setLocalProperties(localProps)
+      body
+    }(exec)
+  }
 }
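
The new helper is a capture-and-restore pattern: read the thread-locals on the submitting thread, then re-install them on whichever pooled thread ends up running the body. Stripped of Spark specifics, the same idea looks roughly like this (the `CaptureRestore` object and `localProps` field are illustrative stand-ins, not Spark API):

```scala
import java.util.Properties
import scala.concurrent.{ExecutionContext, Future}

object CaptureRestore {
  // Hypothetical stand-in for SparkContext.localProperties.
  private val localProps = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
  }

  def withCaptured[T](body: => T)(implicit ec: ExecutionContext): Future[T] = {
    // Capture on the caller's thread, before the task is scheduled...
    val captured = localProps.get().clone().asInstanceOf[Properties]
    Future {
      // ...and restore on the (possibly reused) worker before running the body.
      localProps.set(captured)
      body
    }
  }
}
```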
basicPhysicalOperators.scala

@@ -31,8 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{LongType, StructType}
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
 /** Physical plan for Project. */
@@ -749,7 +750,9 @@ case class SubqueryExec(name: String, child: SparkPlan)
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    Future {
+    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
+      sqlContext.sparkSession,
+      SubqueryExec.executionContext) {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
@@ -764,7 +767,7 @@ case class SubqueryExec(name: String, child: SparkPlan)
         SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
         rows
       }
-    }(SubqueryExec.executionContext)
+    }
   }
 
   protected override def doCanonicalize(): SparkPlan = {
@@ -788,7 +791,8 @@ case class SubqueryExec(name: String, child: SparkPlan)
 
 object SubqueryExec {
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+    ThreadUtils.newDaemonCachedThreadPool("subquery",
+      SQLConf.get.getConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD)))
 }
 
 /**
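
For context, `ThreadUtils.newDaemonCachedThreadPool` behaves like a bounded pool of daemon workers whose idle threads expire after the 60-second keep-alive mentioned above. A rough equivalent in plain `java.util.concurrent` terms — an approximation for illustration, not Spark's actual source — would be:

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Up to `maxThreads` daemon workers; idle workers are reclaimed after 60s.
// A worker that is idle but not yet reclaimed is exactly the thread that gets
// reused with stale local properties.
def daemonCachedPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }
  val pool = new ThreadPoolExecutor(
    maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable](), factory)
  pool.allowCoreThreadTimeOut(true) // let idle "core" workers expire as well
  pool
}
```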
ExecutorSideSQLConfSuite.scala

@@ -19,9 +19,9 @@ package org.apache.spark.sql.internal
 
 import org.scalatest.Assertions._
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.{SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -125,6 +125,38 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
     val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
     assert(e.getCause.isInstanceOf[NoSuchElementException])
   }
+
+  test("SPARK-30556 propagate local properties to subquery execution thread") {
+    withSQLConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD.key -> "1") {
+      withTempView("l", "m", "n") {
+        Seq(true).toDF().createOrReplaceTempView("l")
+        val confKey = "spark.sql.y"
+
+        def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
+          Seq(true)
+            .toDF()
+            .mapPartitions { _ =>
+              TaskContext.get.getLocalProperty(confKey) == confValue match {
+                case true => Iterator(true)
+                case false => Iterator.empty
+              }
+            }
+        }
+
+        // set local configuration and assert
+        val confValue1 = "e"
+        createDataframe(confKey, confValue1).createOrReplaceTempView("m")
+        spark.sparkContext.setLocalProperty(confKey, confValue1)
+        assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM m)").collect().size == 1)
+
+        // change the conf value and assert again
+        val confValue2 = "f"
+        createDataframe(confKey, confValue2).createOrReplaceTempView("n")
+        spark.sparkContext.setLocalProperty(confKey, confValue2)
+        assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM n)").collect().size == 1)
+      }
+    }
+  }
 }
 
 case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {
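
Pinning `SUBQUERY_MAX_THREAD_THRESHOLD` to 1 forces the second subquery onto the very thread that served the first one, so the test fails without the fix. It relies on the pipeline the fix restores: properties set via `setLocalProperty` on the submitting thread ride along with the job and surface in each task through `TaskContext`. A minimal sketch of that flow (e.g. in `spark-shell`), assuming a plain local `SparkContext`:

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("local-props-demo"))
sc.setLocalProperty("spark.sql.y", "e")

// Local properties on the submitting thread are shipped with the job and are
// visible to every task via TaskContext.
val seen = sc.parallelize(1 to 2, numSlices = 2)
  .map(_ => TaskContext.get().getLocalProperty("spark.sql.y"))
  .collect()
assert(seen.forall(_ == "e"))
sc.stop()
```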
