Skip to content

Commit

Permalink
[SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

In the PR, I propose to not perform recursive parallel listening of files in the `scanPartitions` method because it can cause a deadlock. Instead of that I propose to do `scanPartitions` in parallel for top level partitions only.

## How was this patch tested?

I extended an existing test to trigger the deadlock.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #22233 from MaxGekk/fix-recover-partitions.
  • Loading branch information
MaxGekk authored and gatorsmile committed Aug 28, 2018
1 parent 4e3f3ce commit aff8f15
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import java.util.Locale

import scala.collection.{GenMap, GenSeq}
import scala.concurrent.ExecutionContext
import scala.collection.parallel.ForkJoinTaskSupport
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
Expand All @@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
Expand All @@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
import org.apache.spark.util.ThreadUtils.parmap

// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
Expand Down Expand Up @@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand(
val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
try {
implicit val ec = ExecutionContext.fromExecutor(evalPool)
scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
spark.sessionState.conf.resolver)
spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
} finally {
evalPool.shutdown()
}
Expand Down Expand Up @@ -656,13 +654,23 @@ case class AlterTableRecoverPartitionsCommand(
spec: TablePartitionSpec,
partitionNames: Seq[String],
threshold: Int,
resolver: Resolver)(implicit ec: ExecutionContext): Seq[(TablePartitionSpec, Path)] = {
resolver: Resolver,
evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.isEmpty) {
return Seq(spec -> path)
}

val statuses = fs.listStatus(path, filter).toSeq
def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = {
val statuses = fs.listStatus(path, filter)
val statusPar: GenSeq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
// parallelize the list of partitions here, then we can have better parallelism later.
val parArray = statuses.par
parArray.tasksupport = evalTaskSupport
parArray
} else {
statuses
}
statusPar.flatMap { st =>
val name = st.getPath.getName
if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
Expand All @@ -671,7 +679,7 @@ case class AlterTableRecoverPartitionsCommand(
val value = ExternalCatalogUtils.unescapePathName(ps(1))
if (resolver(columnName, partitionNames.head)) {
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
partitionNames.drop(1), threshold, resolver)
partitionNames.drop(1), threshold, resolver, evalTaskSupport)
} else {
logWarning(
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
Expand All @@ -682,14 +690,6 @@ case class AlterTableRecoverPartitionsCommand(
Seq.empty
}
}
val result = if (partitionNames.length > 1 &&
statuses.length > threshold || partitionNames.length > 2) {
parmap(statuses)(handleStatus _)
} else {
statuses.map(handleStatus)
}

result.flatten
}

private def gatherPartitionStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,24 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
protected override def generateTable(
catalog: SessionCatalog,
name: TableIdentifier,
isDataSource: Boolean = true): CatalogTable = {
isDataSource: Boolean = true,
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
val storage =
CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
val metadata = new MetadataBuilder()
.putString("key", "value")
.build()
val schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
schema = schema.copy(
fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
provider = Some("parquet"),
partitionColumnNames = Seq("a", "b"),
partitionColumnNames = partitionCols,
createTime = 0L,
createVersion = org.apache.spark.SPARK_VERSION,
tracksPartitionsInCatalog = true)
Expand Down Expand Up @@ -176,7 +177,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
protected def generateTable(
catalog: SessionCatalog,
name: TableIdentifier,
isDataSource: Boolean = true): CatalogTable
isDataSource: Boolean = true,
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable

private val escapedIdentifier = "`(.+)`".r

Expand Down Expand Up @@ -228,8 +230,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
private def createTable(
catalog: SessionCatalog,
name: TableIdentifier,
isDataSource: Boolean = true): Unit = {
catalog.createTable(generateTable(catalog, name, isDataSource), ignoreIfExists = false)
isDataSource: Boolean = true,
partitionCols: Seq[String] = Seq("a", "b")): Unit = {
catalog.createTable(
generateTable(catalog, name, isDataSource, partitionCols), ignoreIfExists = false)
}

private def createTablePartition(
Expand Down Expand Up @@ -1131,7 +1135,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}

test("alter table: recover partition (parallel)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "0") {
testRecoverPartitions()
}
}
Expand All @@ -1144,23 +1148,32 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}

val tableIdent = TableIdentifier("tab1")
createTable(catalog, tableIdent)
val part1 = Map("a" -> "1", "b" -> "5")
createTable(catalog, tableIdent, partitionCols = Seq("a", "b", "c"))
val part1 = Map("a" -> "1", "b" -> "5", "c" -> "19")
createTablePartition(catalog, part1, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))

val part2 = Map("a" -> "2", "b" -> "6")
val part2 = Map("a" -> "2", "b" -> "6", "c" -> "31")
val root = new Path(catalog.getTableMetadata(tableIdent).location)
val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
fs.mkdirs(new Path(new Path(new Path(root, "a=1"), "b=5"), "c=19"))
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "a.csv")) // file
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "_SUCCESS")) // file

fs.mkdirs(new Path(new Path(new Path(root, "A=2"), "B=6"), "C=31"))
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "b.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "c.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), ".hiddenFile")) // file
fs.mkdirs(new Path(new Path(root, "A=2/B=6/C=31"), "_temporary"))

val parts = (10 to 100).map { a =>
val part = Map("a" -> a.toString, "b" -> "5", "c" -> "42")
fs.mkdirs(new Path(new Path(new Path(root, s"a=$a"), "b=5"), "c=42"))
fs.createNewFile(new Path(new Path(root, s"a=$a/b=5/c=42"), "a.csv")) // file
createTablePartition(catalog, part, tableIdent)
part
}

// invalid
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
Expand All @@ -1174,7 +1187,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
try {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
Set(part1, part2) ++ parts)
if (!isUsingHiveMetastore) {
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
protected override def generateTable(
catalog: SessionCatalog,
name: TableIdentifier,
isDataSource: Boolean): CatalogTable = {
isDataSource: Boolean,
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
val storage =
if (isDataSource) {
val serde = HiveSerDe.sourceToSerDe("parquet")
Expand All @@ -84,17 +85,17 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
val metadata = new MetadataBuilder()
.putString("key", "value")
.build()
val schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
schema = schema.copy(
fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
provider = if (isDataSource) Some("parquet") else Some("hive"),
partitionColumnNames = Seq("a", "b"),
partitionColumnNames = partitionCols,
createTime = 0L,
createVersion = org.apache.spark.SPARK_VERSION,
tracksPartitionsInCatalog = true)
Expand Down

0 comments on commit aff8f15

Please sign in to comment.