[SPARK-32985][SQL] Decouple bucket scan and bucket filter pruning for data source v1

### What changes were proposed in this pull request?

This is a follow-up to the discussion in #29804 (comment). Currently, in the data source v1 file scan `FileSourceScanExec`, [bucket filter pruning only takes effect with bucketed table scan](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542). This coupling is unnecessary: bucket filter pruning can also happen when bucketed table scan is disabled. Reading files with bucket hash partitioning and bucket filter pruning are two orthogonal features and do not need to be coupled together.
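
As an illustration (not part of the patch), a minimal sketch of the decoupled behavior. The table, data, and session setup below are hypothetical, and `spark.sql.sources.bucketing.autoBucketedScan.enabled` is the existing config that lets the planner drop an unnecessary bucketed scan:

```scala
// Hypothetical spark-shell session against a build that includes this change.
spark.range(0, 1000)
  .selectExpr("id AS i", "id % 13 AS j", "id % 5 AS k")
  .write
  .bucketBy(8, "j")
  .saveAsTable("bucketed_table")

// Allow the planner to disable the bucketed scan when its output partitioning is not needed.
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

// Before this change: bucket files were only pruned if the scan stayed a bucketed scan.
// After this change: even a non-bucketed FileSourceScanExec skips files whose bucket id
// cannot match the filter on the bucket column `j`.
spark.table("bucketed_table").filter("j = 3").explain()
```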

### Why are the changes needed?

This helps queries leverage bucket filter pruning to save CPU/IO by not reading unnecessary bucket files, without being tied to a bucketed table scan when task parallelism is a concern.

In addition, this resolves SPARK-33207, which asks to reduce the number of tasks launched for a simple query with a filter on the bucket column: with a bucketed scan we launch as many tasks as there are buckets, which is unnecessary.

### Does this PR introduce _any_ user-facing change?

Users will notice that queries start pruning irrelevant files when reading a bucketed table, even when bucketed scan is disabled. If the input data does not follow the Spark data source bucketing file-name convention, an exception is thrown by default and the query fails. The exception can be bypassed by setting the config `spark.sql.files.ignoreCorruptFiles` to true.
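
A sketch of the escape hatch mentioned above, shown only for illustration (whether it is needed depends on the files actually present under the table location):

```scala
// Assumption: some existing files under the bucketed table's location do not follow the
// Spark bucket file-name convention; by default the scan fails, and this existing flag
// bypasses the error as described above.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
```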

### How was this patch tested?

Added unit tests in `BucketedReadSuite.scala` so that all existing bucket filter unit tests also cover the behavior introduced by this PR.

Closes #31413 from c21/bucket-pruning.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
c21 authored and cloud-fan committed Feb 5, 2021
1 parent 989eb68 commit 76baaf7
Showing 4 changed files with 75 additions and 30 deletions.
DataSourceScanExec.scala
@@ -542,10 +542,9 @@ case class FileSourceScanExec(
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
.getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}"))
}

// TODO(SPARK-32985): Decouple bucket filter pruning and bucketed table scan
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
@@ -591,20 +590,41 @@
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

// Filter files with bucket pruning if possible
val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
val shouldProcess: Path => Boolean = optionalBucketSet match {
case Some(bucketSet) if bucketingEnabled =>
filePath => {
BucketingUtils.getBucketId(filePath.getName) match {
case Some(id) => bucketSet.get(id)
case None =>
// Do not prune the file if bucket file name is invalid
true
}
}
case _ =>
_ => true
}

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)

if (shouldProcess(filePath)) {
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
)
} else {
Seq.empty
}
}
}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
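
As a side note for readers unfamiliar with the bucket file layout, below is a self-contained sketch (not part of the patch) of the file-name based pruning that the new `shouldProcess` predicate performs; the regex is an assumption that mirrors the usual `..._<bucketId>` suffix handled by `BucketingUtils.getBucketId`:

```scala
import scala.util.matching.Regex

// Assumed convention: a trailing "_<bucketId>" before the optional extension,
// e.g. "part-00000-<uuid>_00002.c000.snappy.parquet" belongs to bucket 2.
val bucketIdPattern: Regex = """.*_(\d+)(?:\..*)?$""".r

def getBucketId(fileName: String): Option[Int] = fileName match {
  case bucketIdPattern(id) => Some(id.toInt)
  case _ => None
}

// Mirrors the shouldProcess closure above: skip a file only when its bucket id is known
// and filtered out; keep any file whose name cannot be parsed as a bucket file.
def shouldProcess(selectedBuckets: Set[Int])(fileName: String): Boolean =
  getBucketId(fileName) match {
    case Some(id) => selectedBuckets.contains(id)
    case None => true
  }

val files = Seq(
  "part-00000-abc_00002.c000.snappy.parquet",
  "part-00001-abc_00007.c000.snappy.parquet",
  "data-with-unrecognized-name.parquet")
// With buckets {2, 5} surviving the filter, only the _00002 file and the
// unrecognized file are kept for reading.
println(files.filter(shouldProcess(Set(2, 5))))
```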

DisableUnnecessaryBucketedScan.scala
@@ -98,7 +98,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
exchange.mapChildren(disableBucketWithInterestingPartition(
_, withInterestingPartition, true, withAllowedNode))
case scan: FileSourceScanExec =>
if (isBucketedScanWithoutFilter(scan)) {
if (scan.bucketedScan) {
if (!withInterestingPartition || (withExchange && withAllowedNode)) {
val nonBucketedScan = scan.copy(disableBucketedScan = true)
scan.logicalLink.foreach(nonBucketedScan.setLogicalLink)
@@ -140,20 +140,13 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
}
}

private def isBucketedScanWithoutFilter(scan: FileSourceScanExec): Boolean = {
// Do not disable bucketed table scan if it has filter pruning,
// because bucketed table scan is still useful here to save CPU/IO cost with
// only reading selected bucket files.
scan.bucketedScan && scan.optionalBucketSet.isEmpty
}

def apply(plan: SparkPlan): SparkPlan = {
lazy val hasBucketedScanWithoutFilter = plan.find {
case scan: FileSourceScanExec => isBucketedScanWithoutFilter(scan)
lazy val hasBucketedScan = plan.find {
case scan: FileSourceScanExec => scan.bucketedScan
case _ => false
}.isDefined

if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScanWithoutFilter) {
if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled || !hasBucketedScan) {
plan
} else {
disableBucketWithInterestingPartition(plan, false, false, true)
BucketedReadSuite.scala
@@ -117,9 +117,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
bucketValues: Seq[Any],
filterCondition: Column,
originalDataFrame: DataFrame): Unit = {
// This test verifies parts of the plan. Disable whole stage codegen.
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k")
// This test verifies parts of the plan. Disable whole stage codegen,
// automatically bucketed scan, and filter push down for json data source.
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "false",
SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "false") {
val bucketedDataFrame = spark.table("bucketed_table")
val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
// Limit: bucket pruning only works when the bucket column has one and only one column
assert(bucketColumnNames.length == 1)
@@ -148,19 +151,48 @@
if (invalidBuckets.nonEmpty) {
fail(s"Buckets ${invalidBuckets.mkString(",")} should have been pruned from:\n$plan")
}

withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
// Bucket pruning should still work without bucketed scan
val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
.queryExecution.executedPlan
val fileScan = getFileScan(planWithoutBucketedScan)
assert(!fileScan.bucketedScan, s"except no bucketed scan but found\n$fileScan")

val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
// Return rows should have been pruned
val bucketColumnValue = row.get(bucketColumnIndex, bucketColumnType)
val bucketId = BucketingUtils.getBucketIdFromValue(
bucketColumn, numBuckets, bucketColumnValue)
!matchedBuckets.get(bucketId)
}).collect()

if (rowsWithInvalidBuckets.nonEmpty) {
fail(s"Rows ${rowsWithInvalidBuckets.mkString(",")} should have been pruned from:\n" +
s"$planWithoutBucketedScan")
}
}
}

val expectedDataFrame = originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")
.select("i", "j", "k")
checkAnswer(
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k").select("i", "j", "k"),
expectedDataFrame)

withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
checkAnswer(
bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k").select("i", "j", "k"),
expectedDataFrame)
}
}
}

test("read partitioning bucketed tables with bucket pruning filters") {
withTable("bucketed_table") {
val numBuckets = NumBucketsForPruningDF
val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil)
// json does not support predicate push-down, and thus json is used here
df.write
.format("json")
.partitionBy("i")
DisableUnnecessaryBucketedScanSuite.scala
@@ -95,7 +95,7 @@ abstract class DisableUnnecessaryBucketedScanSuite
("SELECT i FROM t1", 0, 1),
("SELECT j FROM t1", 0, 0),
// Filter on bucketed column
("SELECT * FROM t1 WHERE i = 1", 1, 1),
("SELECT * FROM t1 WHERE i = 1", 0, 1),
// Filter on non-bucketed column
("SELECT * FROM t1 WHERE j = 1", 0, 1),
// Join with same buckets
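
As background for the expectation change above: each tuple in this suite reads, by assumption from context, as (query, expected number of bucketed scans with auto bucketed scan enabled, expected number with it disabled), so the filter query now expects zero bucketed scans once the rule may disable them. A hedged sketch of counting bucketed scans in a physical plan:

```scala
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Count FileSourceScanExec nodes that still perform a bucketed scan; with this change,
// "SELECT * FROM t1 WHERE i = 1" is expected to yield 0 such scans when the
// DisableUnnecessaryBucketedScan rule is allowed to run.
def numBucketedScans(plan: SparkPlan): Int =
  plan.collect { case scan: FileSourceScanExec if scan.bucketedScan => scan }.length
```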