# [SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions

## What changes were proposed in this pull request?

Don't leave the thread pool running after the `AlterTableRecoverPartitionsCommand` DDL command completes: create the `ForkJoinPool` per invocation and shut it down in a `finally` block once the partition scan finishes, instead of holding it in a `@transient lazy val` that is never shut down.
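
The core of the fix is a scoping pattern: previously each command instance created an eight-thread `ForkJoinPool` that nothing ever shut down. Below is a minimal, self-contained sketch of the corrected pattern (assuming Scala 2.12, where parallel collections and `ForkJoinTaskSupport` ship with the standard library; `scan` and its body are hypothetical stand-ins for the real partition walk):

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object PoolPerInvocation {
  def scan(paths: Seq[String]): Seq[String] = {
    // Scope the pool to one invocation instead of caching it in a lazy val.
    val pool = new ForkJoinPool(8)
    try {
      val par = paths.par
      // Route the parallel collection's tasks through this private pool
      // rather than the global default pool.
      par.tasksupport = new ForkJoinTaskSupport(pool)
      par.map(_.toUpperCase).seq // stand-in for the real per-directory work
    } finally {
      pool.shutdown() // always release the worker threads, even on failure
    }
  }

  def main(args: Array[String]): Unit =
    println(scan(Seq("year=2017", "month=06")))
}
```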

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #18216 from srowen/SPARK-20920.

(cherry picked from commit 7b7c85e)
Signed-off-by: Sean Owen <sowen@cloudera.com>
srowen committed Jun 13, 2017
1 parent 03cc18b commit 58a8a37
Showing 1 changed file with 13 additions and 8 deletions.
```diff
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -34,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -508,8 +507,15 @@ case class AlterTableRecoverPartitionsCommand(
     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
-      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
+
+    val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+    val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
+      try {
+        scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
+          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
+      } finally {
+        evalPool.shutdown()
+      }
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
@@ -530,8 +536,6 @@
     Seq.empty[Row]
   }
 
-  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
-
   private def scanPartitions(
       spark: SparkSession,
       fs: FileSystem,
@@ -540,7 +544,8 @@
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
       threshold: Int,
-      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
+      resolver: Resolver,
+      evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
@@ -564,7 +569,7 @@
         val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-            partitionNames.drop(1), threshold, resolver)
+            partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
           logWarning(
             s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
```
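
Two details of the patch are worth noting. First, `.seq` forces the `GenSeq` result back to an ordinary `Seq` before the pool is shut down, so later code works on a plain sequential collection rather than one whose task support is backed by a dead pool. Second, the pool now comes from `ThreadUtils.newForkJoinPool`, Spark's helper for building a `ForkJoinPool` with named worker threads, which makes a leaked pool easy to identify in a thread dump. A rough sketch of such a helper follows; the real implementation inside `ThreadUtils` may differ, so treat the internals here as an assumption:

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory

object NamedForkJoinPool {
  // Build a ForkJoinPool whose workers carry a recognizable name prefix,
  // e.g. "AlterTableRecoverPartitionsCommand-0", so stray threads from a
  // leaked pool stand out in jstack output.
  def apply(prefix: String, maxThreads: Int): ForkJoinPool = {
    val factory = new ForkJoinWorkerThreadFactory {
      override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
        // Anonymous subclass because ForkJoinWorkerThread's ctor is protected.
        val t = new ForkJoinWorkerThread(pool) {}
        t.setName(prefix + "-" + t.getPoolIndex)
        t
      }
    }
    new ForkJoinPool(maxThreads, factory, null /* default exception handling */, false)
  }
}
```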
