
[SPARK-26601][SQL] Make broadcast-exchange thread pool configurable

## What changes were proposed in this pull request?

Currently, the thread number of the broadcast-exchange thread pool is fixed at 128, and keepAliveSeconds is also fixed at 60s:

```scala
object BroadcastExchangeExec {
  private[execution] val executionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}

/**
 * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
 * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
 */
def newDaemonCachedThreadPool(
    prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
  val threadFactory = namedThreadFactory(prefix)
  val threadPool = new ThreadPoolExecutor(
    maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
    maxThreadNumber, // maximumPoolSize: not used, because we use an unbounded LinkedBlockingQueue
    keepAliveSeconds,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable],
    threadFactory)
  threadPool.allowCoreThreadTimeOut(true)
  threadPool
}
```
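For context, here is a minimal sketch of how such a pool is typically consumed. This mirrors the pattern in `BroadcastExchangeExec` but is not the exact Spark code (Spark also installs a daemon thread factory via `namedThreadFactory`, omitted here):

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

// A cached pool as above; 128 is the hard-coded cap this patch makes configurable.
val pool = new ThreadPoolExecutor(
  128, 128, 60, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
pool.allowCoreThreadTimeOut(true) // idle threads exit after keepAliveSeconds

// Wrap the executor in an ExecutionContext and run broadcast work as a Future.
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
val job: Future[Int] = Future {
  // in Spark, this is where the relation is built and broadcast
  42
}
```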

But sometimes, if the Thread objects are not garbage-collected quickly, they may cause the server (driver) to OOM. In such a case, we need to make this thread pool configurable.
A case is described in https://issues.apache.org/jira/browse/SPARK-26601.
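With the patch, the cap can be lowered at session startup. A hedged usage sketch (the value 32 is purely illustrative): since `spark.sql.broadcastExchange.maxThreadThreshold` is a static conf, it must be set before the SparkSession is created, e.g. in spark-defaults.conf, via `--conf` on spark-submit, or on the builder:

```scala
import org.apache.spark.sql.SparkSession

// Static SQL conf: must be set before the session starts.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-exchange-thread-cap")
  .config("spark.sql.broadcastExchange.maxThreadThreshold", "32")
  .getOrCreate()
```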

## How was this patch tested?
UT

Closes #23670 from caneGuy/zhoukang/make-broadcat-config.

Authored-by: zhoukang <zhoukang199191@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala

```diff
@@ -132,6 +132,19 @@ object StaticSQLConf {
     .intConf
     .createWithDefault(1000)
 
+  val BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD =
+    buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
+      .internal()
+      .doc("The maximum degree of parallelism to fetch and broadcast the table. " +
+        "If we encounter memory issue like frequently full GC or OOM when broadcast table " +
+        "we can decrease this number in order to reduce memory usage. " +
+        "Notice the number should be carefully chosen since decreasing parallelism might " +
+        "cause longer waiting for other broadcasting. Also, increasing parallelism may " +
+        "cause memory problem.")
+      .intConf
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
+      .createWithDefault(128)
+
   val SQL_EVENT_TRUNCATE_LENGTH = buildStaticConf("spark.sql.event.truncate.length")
     .doc("Threshold of SQL length beyond which it will be truncated before adding to " +
       "event. Defaults to no truncation. If set to 0, callsite will be logged instead.")
```
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala

```diff
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPar
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.joins.HashedRelation
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.util.{SparkFatalException, ThreadUtils}
 
 /**
@@ -157,5 +157,6 @@ case class BroadcastExchangeExec(
 
 object BroadcastExchangeExec {
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
+      SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))
 }
```
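One rough way to sanity-check the cap on a driver (hypothetical; assumes the session above with the threshold set and several broadcast joins in flight) is to count live pool threads by their name prefix, which `newDaemonCachedThreadPool` formats as `broadcast-exchange-ID`:

```scala
import scala.collection.JavaConverters._

// Threads created for this pool are named "broadcast-exchange-0",
// "broadcast-exchange-1", ...; their count should never exceed
// spark.sql.broadcastExchange.maxThreadThreshold.
val live = Thread.getAllStackTraces.keySet.asScala
  .count(_.getName.startsWith("broadcast-exchange"))
println(s"live broadcast-exchange threads: $live")
```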
