
[SPARK-32859][SQL] Introduce physical rule to decide bucketing dynamically #29804

Closed
wants to merge 15 commits into from

Conversation

Contributor

@c21 c21 commented Sep 18, 2020

What changes were proposed in this pull request?

This PR adds support for deciding dynamically, based on the actual query plan, whether to use a bucketed table scan. Currently bucketing is enabled by default (spark.sql.sources.bucketing.enabled=true), so for every bucketed table in the query plan we use a bucketed table scan (all input files for a bucket are read by the same task). The drawback is that when the bucketed table scan provides no benefit (e.g. no join/group-by in the query), it needlessly restricts the number of tasks to the number of buckets, which can hurt parallelism.

The feature adds a physical plan rule right after EnsureRequirements:

The rule traverses the plan nodes. For every operator that has an "interesting partition" (i.e., requires ClusteredDistribution or HashClusteredDistribution), check whether the operator's sub-plan contains an Exchange together with a bucketed table scan (allowing only certain operators in the plan, i.e. Scan/Filter/Project/Sort/PartialAgg/etc.; see details in DisableUnnecessaryBucketedScan.disableBucketWithInterestingPartition). If yes, disable the bucketed table scan in the sub-plan. In addition, disable the bucketed table scan if there is another operator with an interesting partition along the sub-plan.

The algorithm works because, if there is a shuffle between the bucketed table scan and the operator with the interesting partition, the scan's output partitioning will be destroyed by the shuffle operator in the middle, so the bucketed table scan is certainly unnecessary.

The idea of "interesting partition" is inspired by "interesting order" in the paper "Access Path Selection in a Relational Database Management System" (http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf), after discussion with @cloud-fan .
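As a rough sketch, the "interesting partition" check can be written directly against an operator's required child distribution (the helper name follows the PR; the enclosing object is only for illustration):

```scala
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
import org.apache.spark.sql.execution.SparkPlan

object InterestingPartitionSketch {
  // An operator has an "interesting partition" if it requires any child to be
  // clustered on some keys, e.g. join, aggregate, window.
  def hasInterestingPartition(plan: SparkPlan): Boolean = {
    plan.requiredChildDistribution.exists {
      case _: ClusteredDistribution | _: HashClusteredDistribution => true
      case _ => false
    }
  }
}
```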

Why are the changes needed?

To avoid unnecessary bucketed scans in queries. This is also a prerequisite for #29625 (deciding bucketed sorted scan dynamically will be added later in that PR).

Does this PR introduce any user-facing change?

A new config spark.sql.sources.bucketing.autoBucketedScan.enabled is introduced, set to false by default (the rule is disabled by default as it can regress cached bucketed table queries; see discussion in #29804 (comment)). Users can opt in or out by enabling or disabling the config. As we found in production, some users rely on the assumption that # of tasks == # of buckets when reading a bucketed table, to precisely control the number of tasks. This is a bad assumption, but it does happen on our side, so the config lets them opt out of the feature.
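A minimal opt-in/opt-out sketch, assuming an existing SparkSession named spark (the rule only runs when bucketing itself is enabled):

```scala
// Opt in: both configs must be true for the rule to take effect.
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

// Opt out, e.g. to keep relying on "# of tasks == # of buckets".
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "false")
```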

How was this patch tested?

Added unit tests in DisableUnnecessaryBucketedScanSuite.scala


c21 commented Sep 18, 2020

cc @cloud-fan and @sameeragarwal if you guys have time to take a look, thanks.


SparkQA commented Sep 18, 2020

Test build #128873 has finished for PR 29804 at commit a7193cf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PlanBucketing(conf: SQLConf) extends Rule[SparkPlan]


maropu commented Sep 19, 2020

Looks like an interesting idea, @c21! I'll check this later.

@@ -165,6 +166,7 @@ case class FileSourceScanExec(
partitionFilters: Seq[Expression],
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
optionalDynamicDecideBucketing: Option[Boolean],
Member

nit: could you add this param in metadata for better explain output?

Contributor

Why does it need to be Option[Boolean]? I'd expect a simple disableBucketScan: Boolean = false

Contributor Author

@maropu - sure, updated. Added a new field DisableBucketedScan for explain. This will be printed out when the bucketed scan is disabled. In addition, changed to not print the other field SelectedBucketsCount if this is not a bucketed scan.

@cloud-fan - sure, updated. Was just trying to keep consistent with other bucketing parameters here, but an Option is not necessary.

@@ -2764,6 +2764,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DYNAMIC_DECIDE_BUCKETING_ENABLED =
buildConf("spark.sql.sources.dynamic.decide.bucketing.enabled")
Member

How about adding this param under spark.sql.sources.bucketing.xxx? spark.sql.sources.bucketing.enableAutoBucketScan or something?

Contributor

+1

Contributor Author

@maropu , @cloud-fan - sure, updated. Changed the config name to spark.sql.sources.bucketing.autoBucketedScan.enabled, since most of our configs end with .enabled.
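Combining the rename with the conf fragment quoted later on this page, the final entry plausibly looks like the following (the doc string is partly assumed):

```scala
val AUTO_BUCKETED_SCAN_ENABLED =
  buildConf("spark.sql.sources.bucketing.autoBucketedScan.enabled")
    .doc("When true, decide whether to do bucketed scan on input tables based on query plan " +
      "automatically. Note when 'spark.sql.sources.bucketing.enabled' is set to " +
      "false, this configuration does not take any effect.")
    .version("3.1.0")
    .booleanConf
    .createWithDefault(false)
```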


SparkQA commented Sep 19, 2020

Test build #128883 has finished for PR 29804 at commit 7ffcdbc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

import org.apache.spark.sql.internal.SQLConf

/**
* Plans bucketing dynamically based on actual physical query plan.
cloud-fan (Contributor) commented Sep 22, 2020

bucketing -> bucket scan, or bucketed table scan, just use a consistent terminology.

Contributor Author

@cloud-fan - updated.

* the paper "Access Path Selection in a Relational Database Management System"
* (http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf).
*/
case class PlanBucketing(conf: SQLConf) extends Rule[SparkPlan] {
Contributor

nit: DisableUnnecessaryBucketScan

Contributor Author

@cloud-fan - updated.


val newPlan = plan.transformUp {
case p if hasInterestingPartition(p) =>
hasPlanWithInterestingPartition = true
Contributor

This looks tricky and fragile. We shouldn't call transformUp and update a global state. Can we write a recursive method to do bottom-up tree traverse manually? using TreeNode.mapChildren.

Contributor Author

@cloud-fan - sure, I feel the same way, sorry for the laziness. I updated it to use TreeNode.mapChildren with only one round of pre-order traversal of the plan (disableBucketWithInterestingPartition). Does it look better? Thanks.
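A simplified sketch of that traversal's shape (the merged method threads more state; a fuller version appears later in this thread):

```scala
// Pre-order traversal via TreeNode.mapChildren: the decision context is
// threaded through parameters instead of a mutable flag captured by transformUp.
private def disableBucketWithInterestingPartition(
    plan: SparkPlan,
    withInterestingPartition: Boolean): SparkPlan = plan match {
  case p if hasInterestingPartition(p) =>
    // Propagate the flag down into the whole sub-plan of this operator.
    p.mapChildren(disableBucketWithInterestingPartition(_, withInterestingPartition = true))
  case other =>
    other.mapChildren(disableBucketWithInterestingPartition(_, withInterestingPartition))
}
```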

* scan if needed.
*
* For all operators which [[hasInterestingPartition]] (i.e., require [[ClusteredDistribution]]
* or [[HashClusteredDistribution]]), check if the sub-plan for operator has [[Exchange]] and
Contributor

> check if the sub-plan for operator has [[Exchange]]

Can we make it more fine-grained? It's possible that the exchange and the bucket scan are not in the same lineage:

         node with interesting partition
                         |
                     binary node
                    /           \
              Exchange      Bucket Scan

Contributor

The above query is not valid; I just put it here as an example.

Contributor Author

@cloud-fan - sorry if I misunderstand anything, but I have a check to only allow certain UnaryExecNode operators in the sub-plan, so the single-lineage property is guaranteed. I updated the comment to call it out explicitly.

@c21 c21 left a comment

Thanks @cloud-fan for review. Addressed all comments.


@@ -339,7 +343,7 @@ case class FileSourceScanExec(
location.getClass.getSimpleName +
Utils.buildLocationMetadata(location.rootPaths, maxMetadataValueLength)
val metadata =
Map(
HashMap(
Member

nit: HashMap -> mutable.HashMap for readability.

Contributor Author

@maropu - updated. Was just following other code in the same file.

} getOrElse {
spec.numBuckets
if (bucketedScan) {
relation.bucketSpec.map { spec =>
Member

map -> foreach

Contributor Author

@maropu - just for my own education, why does it matter? Updated anyway.

Member

Yea, I remember the previous discussion: https://issues.apache.org/jira/browse/SPARK-16694

Contributor

Yea, please only use map when you care about the return value. foreach is better if you just want to do some computation when the Option is Some.

Contributor Author

@cloud-fan , @maropu - I changed the code during iterations. The current change just adds an if (bucketedScan) { ... } else { ... } on top of the original code, where we still need to use map since it returns a value.
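A tiny standalone illustration of the map-vs-foreach point on Option:

```scala
val spec: Option[Int] = Some(8)

// map: the transformed value is consumed, so the returned Option matters.
val numBuckets: Int = spec.map(_ * 2).getOrElse(1)

// foreach: side effect only; it returns Unit and states the intent clearly.
spec.foreach(n => println(s"numBuckets = $n"))
```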

*
* The idea of [[hasInterestingPartition]] is inspired from "interesting order" in
* the paper "Access Path Selection in a Relational Database Management System"
* (http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf).
Member

nit: could we use a link to the ACM page instead? https://dl.acm.org/doi/10.1145/582095.582099

Contributor Author

@maropu - sure, updated. Was just following the reference in CBO join reorder. Updated the link there as well.

// Do not disable bucketed table scan if it has filter pruning,
// because bucketed table scan is still useful here to save CPU/IO cost with
// only reading selected bucket files.
scan.bucketedScan && scan.optionalBucketSet.isEmpty
Member

What if a scan operator reads most buckets? e.g., 999 of 1000 buckets. We select bucket scans even in this case?

Contributor Author

@maropu - this is a good question, and I think it is somewhat out of scope for this PR and needs more thought later. We don't have a cost model to decide between (bucket filter + bucketed scan) and (normal filter + non-bucketed scan). It can depend on the number of buckets, the size of filtered buckets, the CPU cost of the filter, etc.

Contributor

I'm fine with it for now. Technically I think filter by bucket ID and bucketed scan don't need to be coupled. We can always filter files by bucket id, and then do bucketed scan or not according to this rule.

Member

Yea, okay. Could you file jira later, @c21 ?


}

def apply(plan: SparkPlan): SparkPlan = {
if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled) {
Member

If a plan doesn't have any bucket spec, we do nothing in this rule? Could we filter out the case at the beginning?

Contributor Author

@maropu - sure, updated.
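
A sketch of the suggested early exit (helper names from this PR; exact shape assumed):

```scala
def apply(plan: SparkPlan): SparkPlan = {
  // Skip the rule entirely when it is disabled, or when the plan has no
  // bucketed scan that could be disabled in the first place.
  lazy val hasBucketedScanWithoutFilter = plan.find {
    case scan: FileSourceScanExec => isBucketedScanWithoutFilter(scan)
    case _ => false
  }.isDefined

  if (!conf.bucketingEnabled || !conf.autoBucketedScanEnabled ||
      !hasBucketedScanWithoutFilter) {
    plan
  } else {
    // (parameter list simplified; see the fuller traversal sketch later)
    disableBucketWithInterestingPartition(plan, withInterestingPartition = false)
  }
}
```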

private def isAllowedUnaryExecNode(plan: SparkPlan): Boolean = {
plan match {
case _: SortExec | _: Exchange | _: ProjectExec | _: FilterExec |
_: FileSourceScanExec => true
Member

_: FileSourceScanExec

Is the case only allowed if FileSourceScanExec has a relation with bucket specs?

Contributor Author

@maropu - FileSourceScanExec should be pattern-matched earlier in disableBucketWithInterestingPartition, before the isAllowedUnaryExecNode check. But thanks for the catch; we don't need Exchange and FileSourceScanExec here anyway, removed both.

case p if hasInterestingPartition(p) =>
// Operators with interesting partition, propagates `withInterestingPartition` as true
// to its children.
p.mapChildren(disableBucketWithInterestingPartition(_, true, false))
Member

Just a question; reading the PR description, I first thought this rule finds exchange plan nodes (inserted by EnsureRequirements) that have bucket scans, and then disables the scans if the exchanges make the bucketed read meaningless. What I imagined is like this;

  private def disableBucketWithInterestingPartition(...): SparkPlan = {
    plan match {
      case exchange: Exchange if isAllowedUnaryExecNode(exchange.child) =>
        val newPlan = (disable bucket scan if the scan is meaningless)
        exchange.withNewChildren(newPlan:: Nil)

      case o =>
        o.mapChildren(disableBucketWithInterestingPartition(...))
    }
  }

But, the current code looks more general. Any reason to propagate required distributions in a top-down manner?

Contributor Author

@maropu - this is a good question. I think only matching Exchange would currently do the same job, and either way is fine with me. InterestingPartition, and later InterestingOrder (for bucketed sorted scan), look more general, and we can extend them later.

One interesting extension I can think of: if the bucketed scan parallelism is too low (too few buckets), we may decide not to do a bucketed scan for a join, trading the extra shuffle cost for better query runtime (in this case, there's no Exchange before the join).


SparkQA commented Sep 23, 2020

Test build #128998 has finished for PR 29804 at commit 6a8c7a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 23, 2020

Test build #128999 has finished for PR 29804 at commit a15f864.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21 c21 left a comment

Thanks @maropu for review. Addressed comments.


} getOrElse {
metadata
} else if (disableBucketedScan) {
metadata += ("DisableBucketedScan" -> "true")
Contributor

Do we need it? It's kind of the reason why there is no bucket scan in this node. The reason can be: 1. the table is not bucketed; 2. the bucket column is not read; 3. the planner decides to disable it as it has no benefit.

If we do need the reason, we should make it complete. Let's not just put the disableBucketedScan flag there.

Contributor

At least we don't need to do it in this PR.

Contributor Author

@cloud-fan - how about I update the explain with a BucketedScan: [NON_BUCKETED, BUCKETED_COLUMNS_NOT_READ, DISABLED] in a followup PR? cc @maropu .

Member

> It's kind of the reason why there is no bucket scan in this node. The reason can be: 1. the table is not bucketed. 2. the bucket column is not read. 3. the planner decides to disable it as it has no benefits.

My comment meant that users need to be able to see why bucket scans are disabled, as Wenchen pointed out above. Anyway, the followup looks okay.

Contributor Author

Added a TODO comment for the followup JIRA - https://issues.apache.org/jira/browse/SPARK-32986 .

case _: SortExec | _: ProjectExec | _: FilterExec => true
case partialAgg: BaseAggregateExec =>
val modes = partialAgg.aggregateExpressions.map(_.mode)
modes.nonEmpty && modes.forall(mode => mode == Partial || mode == PartialMerge)
Contributor

instead of checking the mode, shall we just check requiredChildDistributionExpressions?

Contributor Author

@cloud-fan - sure, updated. Also thanks for the catch; I think requiredChildDistributionExpressions covers all cases, since we can have a partial aggregate without any aggregate expressions (e.g. SELECT i FROM t GROUP BY i).
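A sketch of the resulting whitelist check, combining the cases visible in this thread with the suggested test (the exact final form may differ):

```scala
private def isAllowedUnaryExecNode(plan: SparkPlan): Boolean = plan match {
  case _: SortExec | _: ProjectExec | _: FilterExec => true
  case partialAgg: BaseAggregateExec =>
    // A partial aggregate imposes no child distribution requirement yet,
    // so it does not invalidate the bucketed scan's output partitioning.
    partialAgg.requiredChildDistributionExpressions.isEmpty
  case _ => false
}
```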


SparkQA commented Sep 23, 2020

Test build #129008 has finished for PR 29804 at commit b45ee8b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

// and [[Exchange]] in the plan.
scan.copy(disableBucketedScan = true)
case o =>
if (isAllowedUnaryExecNode(o)) {
Contributor

Let's make the rule clear. I think there are 2 cases when doing the traversal here:

  1. When we are looking for node with interesting partition, we don't need a whitelist, we can just go through any node. If we hit a file scan, disable its bucketed scan.
  2. When we hit a node with interesting partition, and walk through its sub-plan, we need a whitelist and stop earlier if we hit a node outside of the whitelist.

I'd expect to see code like

if (withInterestingPartition) {
  if (isAllowedUnaryExecNode(o)) {
    ...
  } else {
    // stop traversing down here.
    o
  }
} else {
   o.mapChildren(disableBucketWithInterestingPartition(_, false, false))
}

Contributor Author

@cloud-fan - as discussed offline, we still need to traverse the sub-plan of a node with an interesting partition, even though that node itself cannot disable the bucketed scan (e.g. multiple-join cases).

Updated the code to disable the bucketed table scan if (a condensed sketch follows):

  1. The sub-plan from root to the bucketed table scan contains only nodes with interesting partition, exchanges, and whitelisted single-child nodes, or
  2. The sub-plan from root to the bucketed table scan contains no node with interesting partition.
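
A condensed sketch of that traversal, reusing the helpers sketched earlier in this thread (state names follow the PR; details simplified):

```scala
private def disableBucketWithInterestingPartition(
    plan: SparkPlan,
    withInterestingPartition: Boolean,
    withExchange: Boolean,
    withAllowedNode: Boolean): SparkPlan = plan match {
  case p if hasInterestingPartition(p) =>
    // This operator starts a new path of interest; reset the exchange flag.
    p.mapChildren(disableBucketWithInterestingPartition(_, true, false, true))
  case exchange: Exchange =>
    // An exchange is allowed along the path; remember we crossed one.
    exchange.mapChildren(disableBucketWithInterestingPartition(
      _, withInterestingPartition, true, withAllowedNode))
  case scan: FileSourceScanExec if isBucketedScanWithoutFilter(scan) =>
    // Disable when no interesting partition exists along the path (case 2),
    // or when the path crosses an exchange through allowed nodes (case 1).
    if (!withInterestingPartition || (withExchange && withAllowedNode)) {
      scan.copy(disableBucketedScan = true)
    } else {
      scan
    }
  case o =>
    o.mapChildren(disableBucketWithInterestingPartition(
      _, withInterestingPartition, withExchange,
      withAllowedNode && isAllowedUnaryExecNode(o)))
}
```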


SparkQA commented Sep 23, 2020

Test build #129021 has finished for PR 29804 at commit 9aa0266.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Sep 23, 2020

Test build #129044 has started for PR 29804 at commit 26c4338.

("SELECT SUM(i) FROM t1 GROUP BY i", 1, 1),
("SELECT SUM(i) FROM t1 GROUP BY j", 0, 1),
("SELECT * FROM t1 WHERE i = 1", 1, 1),
("SELECT * FROM t1 WHERE j = 1", 0, 1),
Member

I left two comments about the test;

  • Could you add more test cases, e.g., multiple join cases, multiple bucket column cases, ...?
  • Could you split this single test unit into multiple ones with meaningful test titles? e.g., test("SPARK-32859: disable unnecessary bucketed table scan based on query plan - multiple join test")

Contributor Author

@maropu - sure. Sorry about missing the comment; somehow it didn't show up on the GitHub page. Added and split the tests into 4 (basic, multiple joins, multiple bucketed columns, other operators/cases). Thanks.


maropu commented Sep 30, 2020

> I am fine with disabling the feature by default, and let user opt-in case by case. As normally for SQL users (not dataframe), the cache query should not be very popular. WDYT? @maropu . thanks.

Ah, I see, I missed that case. The suggestion from @viirya makes sense, so disabling it by default looks okay to me. Could you drop .internal accordingly, if users need to care about the added option?


c21 commented Sep 30, 2020

> Could you drop .internal accordingly if users need to care about the added option?

@maropu - sure, removed internal().


SparkQA commented Sep 30, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33884/


SparkQA commented Sep 30, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33884/


SparkQA commented Sep 30, 2020

Test build #129267 has finished for PR 29804 at commit f2ceacd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


c21 commented Sep 30, 2020

@viirya - wondering if you have any other comments?

("SELECT i FROM t1", 0, 1),
("SELECT j FROM t1", 0, 0),
// Filter on bucketed column
("SELECT * FROM t1 WHERE i = 1", 1, 1),
Member

Isn't this case 1, "no [[hasInterestingPartition]] operator"? Why is there one bucket scan?

Contributor Author

If there's a filter on the bucketed column, we can use bucket filter push-down to read only the targeted bucket files, so we decide not to disable the bucketed scan. See DisableUnnecessaryBucketedScan.isBucketedScanWithoutFilter(); I already added a comment there.

Contributor Author

Just FYI, relevant discussion before - #29804 (comment) .

Member

Oh, I see.

}

test("SPARK-32859: disable unnecessary bucketed table scan - basic test") {
withTable("t1", "t2", "t3", "t4") {
Member

I think there is no table "t4"?

Contributor Author

@viirya - yes, this is a typo, will remove.

Contributor Author

removed.


SparkQA commented Oct 1, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33911/


SparkQA commented Oct 1, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33911/


c21 commented Oct 1, 2020

Addressed all comments and updated the PR description to reflect the latest status. cc @maropu thanks.


SparkQA commented Oct 1, 2020

Test build #129295 has finished for PR 29804 at commit b29f688.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


maropu commented Oct 1, 2020

Okay, it looks good as a first step for this feature. Thanks for the valuable work, @c21! I'll merge this. If @cloud-fan leaves more comments, please address them in a followup.

@maropu maropu closed this in d6f3138 Oct 2, 2020

maropu commented Oct 2, 2020

Thanks, @c21 @viirya ! Merged to master.


c21 commented Oct 2, 2020

Thank you @maropu , @viirya and @cloud-fan for all discussion and help!

@c21 c21 deleted the bucket-rule branch October 2, 2020 15:38
"false, this configuration does not take any effect.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
Contributor Author

@cloud-fan - thanks for pointing it out. Created https://issues.apache.org/jira/browse/SPARK-33075 for a followup, cc @viirya in case there's any other regression from enabling auto bucketed scan besides cached queries.

cloud-fan pushed a commit that referenced this pull request Nov 2, 2020
### What changes were proposed in this pull request?

As a followup to the comment in #29804 (comment), here we add the physical plan rule DisableUnnecessaryBucketedScan into AQE AdaptiveSparkPlanExec.queryStagePreparationRules, to make auto bucketed scan work with AQE (a config sketch follows the list below).

The change is mostly in:
* `AdaptiveSparkPlanExec.scala`: add physical plan rule `DisableUnnecessaryBucketedScan`
* `DisableUnnecessaryBucketedScan.scala`: propagate the logical plan link for the file source scan exec operator; otherwise we lose the logical plan link information when AQE is enabled, and will get an exception [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L176) (for example, for the query `SELECT * FROM bucketed_table` with AQE enabled).
* `DisableUnnecessaryBucketedScanSuite.scala`: add new test suite for AQE enabled - `DisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE`, and changed some of tests to use `AdaptiveSparkPlanHelper.find/collect`, to make the plan verification work when AQE enabled.
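
A minimal sketch of exercising the two features together (config keys from the PRs; assuming an existing SparkSession named spark and the bucketed_table from the example above):

```scala
// With this followup, DisableUnnecessaryBucketedScan also runs inside AQE's
// query-stage preparation, so the scan below no longer launches one task
// per bucket when bucketing gives no benefit.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")
spark.sql("SELECT * FROM bucketed_table").collect()
```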

### Why are the changes needed?

It's reasonable to allow disabling unnecessary bucketed scans when AQE is enabled; this helps optimize such queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `DisableUnnecessaryBucketedScanSuite`.

Closes #30200 from c21/auto-bucket-aqe.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Feb 5, 2021
… data source v1

### What changes were proposed in this pull request?

As a followup from the discussion in #29804 (comment) . Currently in the data source v1 file scan `FileSourceScanExec`, [bucket filter pruning only takes effect with a bucketed table scan](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L542 ). However this is unnecessary: bucket filter pruning can also happen when the bucketed table scan is disabled. Reading files with bucket hash partitioning and bucket filter pruning are two orthogonal features and do not need to be coupled.

### Why are the changes needed?

This helps queries leverage bucket filter pruning to save CPU/IO by not reading unnecessary bucket files, without being bound to a bucketed table scan when task parallelism is a concern.

In addition, this also resolves the issue of reducing the number of tasks launched for a simple query with a bucket column filter (SPARK-33207), because with a bucketed scan we launch as many tasks as there are buckets, which is unnecessary.

### Does this PR introduce _any_ user-facing change?

Users will notice queries start pruning irrelevant files when reading a bucketed table with bucketing disabled. If the input data does not follow the Spark data source bucketing convention, an exception will be thrown by default and the query will fail. The exception can be bypassed by setting the config `spark.sql.files.ignoreCorruptFiles` to true.

### How was this patch tested?

Added unit test in `BucketedReadSuite.scala` to make all existing unit tests for bucket filter work with this PR.

Closes #31413 from c21/bucket-pruning.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun pushed a commit that referenced this pull request Aug 12, 2021
…e v1

### What changes were proposed in this pull request?

As a followup from the discussion in #29804 (comment) , currently the query plan for the data source v1 scan operator (`FileSourceScanExec`) has no information to indicate whether the table is read as a bucketed table or not, and if not, what the reason is. Adding this info to the `FileSourceScanExec` physical query plan output helps users and developers understand the query plan more easily, without spending a lot of time debugging why a table is not read as a bucketed table.

### Why are the changes needed?

Help users and developers debug query plan for bucketed table.

### Does this PR introduce _any_ user-facing change?

The added `Bucketed` information in the physical query plan when reading a bucketed table.
Note that when reading a non-bucketed table, the query plan stays the same and nothing changes.

Example:

```
Seq((1, 2), (2, 3)).toDF("i", "j").write.bucketBy(8, "i").saveAsTable("t1")
Seq(2, 3).toDF("i").write.bucketBy(8, "i").saveAsTable("t2")
val df1 = spark.table("t1")
val df2 = spark.table("t2")
df1.join(df2, df1("i") === df2("i"))
```

```
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [i#20], [i#24], Inner
   :- Sort [i#20 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(i#20)
   :     +- FileScan parquet default.t1[i#20,j#21] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#20)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
   +- Sort [i#24 ASC NULLS FIRST], false, 0
      +- Filter isnotnull(i#24)
         +- FileScan parquet default.t2[i#24] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#24)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, SelectedBucketsCount: 8 out of 8
```

### How was this patch tested?

Added unit test in `ExplainSuite.scala`.

Closes #33698 from c21/scan-v1.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>