
[SPARK-34331][SQL] Speed up DS v2 metadata col resolution #31440

Closed
wants to merge 1 commit into master from cloud-fan/metadata-col

Conversation

cloud-fan (Contributor) commented Feb 2, 2021

What changes were proposed in this pull request?

This is a follow-up of #28027.

#28027 added a DS v2 API that lets data sources produce metadata/hidden columns, which are only visible when explicitly selected. The way we integrate this API into Spark is:

  1. The v2 relation gets normal output and metadata output from the data source, and the metadata output is excluded from the plan output by default.
  2. Column resolution can resolve `UnresolvedAttribute`s against metadata columns, even if the child plan doesn't output them (see the sketch just after this list).
  3. An analyzer rule searches the query plan for nodes with missing inputs. If such a node is found, it transforms the sub-plan of that node and updates the v2 relation to include the metadata output.
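
As a rough illustration of the "only visible when explicitly selected" behavior (the catalog/table/column names here are hypothetical, except `index`, the metadata column defined by the test `InMemoryTable` later in this PR):

```scala
// The metadata column does not appear in the default output:
spark.table("testcat.ns.t").printSchema()          // no "index" field

// An explicit reference, however, resolves against the metadata output:
spark.table("testcat.ns.t").select("id", "index")  // "index" is a metadata column
```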

The analyzer rule in step 3 brings a perf regression for queries that do not read v2 tables at all. The rule calculates `QueryPlan.inputSet` (which builds an `AttributeSet` from the outputs of all children) and `QueryPlan.missingInput` (which does a set exclusion and creates a new `AttributeSet`) for every plan node in the query plan. In our benchmark, TPCDS query compilation time increases by more than 10%.

This PR proposes a simple way to improve it: we add a special metadata entry to the metadata attribute, which allows us to quickly check whether a plan needs metadata columns added. Instead of calculating `QueryPlan.missingInput`, we just scan the references of the plan and check whether any attribute carries the special metadata entry.
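
A minimal sketch of the fast check, assuming names that appear elsewhere in this PR (`METADATA_COL_ATTR_KEY` and `MetadataColumnHelper` are in the diff); the rule body is paraphrased, not the literal patch:

```scala
// Attributes created from metadata columns carry a boolean flag in their
// attribute metadata (see the asStruct snippet reviewed below).
implicit class MetadataColumnHelper(attr: Attribute) {
  def isMetadataCol: Boolean =
    attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
      attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
}

// The rule can now bail out after a cheap scan of a node's expression
// references, instead of building inputSet/missingInput AttributeSets:
def metadataColRefs(node: LogicalPlan): Seq[Attribute] =
  node.expressions.flatMap(_.references).filter(_.isMetadataCol)
```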

This PR also fixes one bug: we should not change the final output schema of the plan if metadata columns are only used in operators like filter, sort, etc.
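
A sketch of the fix, mirroring the `addMetadataCol` snippets reviewed later in this thread: after adding metadata columns under a node, wrap the result in a `Project` over the node's original output, so the top-level schema is unchanged:

```scala
if (metaCols.isEmpty) {
  node
} else {
  val newNode = addMetadataCol(node)
  // Project away the extra metadata columns: operators below (filter,
  // sort, ...) can still see them, but the plan's output schema stays
  // exactly what the user asked for.
  Project(node.output, newNode)
}
```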

Why are the changes needed?

Fix perf regression in SQL query compilation, and fix a bug.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Run `org.apache.spark.sql.TPCDSQuerySuite`. Before this PR, `AddMetadataColumns` ranked fourth among analyzer/optimizer rules by total running time:

```
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 407641
Total time: 47.257239779 seconds

Rule                                  Effective Time / Total Time                     Effective Runs / Total Runs

OptimizeSubqueries                      4157690003 / 8485444626                         49 / 2778
Analyzer$ResolveAggregateFunctions      1238968711 / 3369351761                         49 / 2141
ColumnPruning                           660038236 / 2924755292                          338 / 6391
Analyzer$AddMetadataColumns             0 / 2918352992                                  0 / 2151
```

After this PR:

```
Analyzer$AddMetadataColumns             0 / 122885629                                   0 / 2151
```

This rule is now more than 20 times faster, and its cost is negligible relative to the total compilation time.

This PR also adds new tests to verify the bug fix.



SparkQA commented Feb 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39373/


SparkQA commented Feb 2, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39373/

Comment on lines 4040 to 4042
case a: AppendData => a.withNewTable(removeMetaCol(a.table))
case o: OverwriteByExpression => o.withNewTable(removeMetaCol(o.table))
case o: OverwritePartitionsDynamic => o.withNewTable(removeMetaCol(o.table))
Contributor

Can these be replaced with `case v: V2WriteCommand => v.withNewTable(removeMetaCol(v.table))`, or do we need to match these specific types?

Contributor Author

good point.

* This rule removes metadata columns from `DataSourceV2Relation` in two cases:
* - A single v2 scan (which can be produced by `spark.table`): this is similar to star expansion, and
*   metadata columns should only be picked up by explicit references.
* - V2 scans under write commands, as we can't insert into metadata columns.
Member

This is for the table in `InsertIntoStatement`. How about the query? E.g. `spark.table(...).write.insertInto(...)`. Do we need to remove metadata columns for the query here too, if it is also a v2 scan?


SparkQA commented Feb 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39384/

github-actions bot added the SQL label Feb 2, 2021

SparkQA commented Feb 2, 2021

Test build #134786 has finished for PR 31440 at commit 461f42a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class MetadataColumnHelper(attr: Attribute)


SparkQA commented Feb 2, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39384/


SparkQA commented Feb 2, 2021

Test build #134796 has finished for PR 31440 at commit 51086e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private def removeMetaCol(tbl: NamedRelation): NamedRelation = tbl match {
  case r: DataSourceV2Relation =>
    if (r.output.exists(_.isMetadataCol)) {
Contributor

Why guard `r.copy(output = r.output.filterNot(_.isMetadataCol))` behind this check? Why not do it unconditionally?
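
For concreteness, the unconditional version this question suggests might look like the sketch below (the remaining match cases are elided in the diff above; presumably the guard in the patch avoids `copy`-ing the relation when there is nothing to strip):

```scala
private def removeMetaCol(tbl: NamedRelation): NamedRelation = tbl match {
  case r: DataSourceV2Relation =>
    // filterNot is a no-op when no metadata columns are present,
    // but copy would still allocate a new relation node.
    r.copy(output = r.output.filterNot(_.isMetadataCol))
  case _ => tbl
}
```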

@@ -61,7 +61,7 @@ class InMemoryTable(

 private object IndexColumn extends MetadataColumn {
   override def name: String = "index"
-  override def dataType: DataType = StringType
+  override def dataType: DataType = IntegerType
Contributor Author

The actual data is an int.

cloud-fan force-pushed the metadata-col branch 2 times, most recently from 849862b to bd7b479, on February 3, 2021 15:52
cloud-fan (Contributor Author)

My new approach doesn't work for DataFrame queries, so I went back to the original approach, with some improvements to fix the perf regression. The patch is much smaller now; please take another look, thanks!


SparkQA commented Feb 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39428/


SparkQA commented Feb 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39428/


SparkQA commented Feb 3, 2021

Test build #134842 has finished for PR 31440 at commit be4aefe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class MetadataColumnHelper(attr: Attribute)

cloud-fan (Contributor Author)

Tests pass and it's ready for review. @rdblue @brkyvz @viirya

HyukjinKwon (Member) left a comment

This change makes sense to me.

if (metaCols.isEmpty) {
  node
} else {
  val newNode = addMetadataCol(node)
Member

No matter how many metadata columns we actually reference, we always add all of them, right?

Contributor Author

Yea, it's good enough because:

  1. We guarantee that the outer plan will project away the extra columns, so the final output schema won't change.
  2. Column pruning still applies, so eventually the data source doesn't need to produce unreferenced columns (hypothetical illustration below).
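
A hypothetical illustration of point 2 (names made up, except `index`, the test metadata column):

```scala
// Sorting by a metadata column pulls the metadata output into the scan:
spark.table("testcat.ns.t").orderBy("index").select("id")
// Conceptually: Project(id, Sort(index, Relation(id, data, index, ...))).
// The outer Project keeps the user-visible schema to just "id", and
// ColumnPruning later narrows the scan, so metadata columns that are
// never referenced are not produced by the source at all.
```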

viirya (Member) left a comment

The current approach looks reasonable. Just one question.

cloud-fan (Contributor Author)

thanks for the review, merging to master/3.1!

cloud-fan closed this in 989eb68 on Feb 5, 2021
cloud-fan added a commit that referenced this pull request Feb 5, 2021
Closes #31440 from cloud-fan/metadata-col.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 989eb68)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
imback82 (Contributor) left a comment

Late +1

  node
} else {
  val newNode = addMetadataCol(node)
  // We should not change the output schema of the plan. We should project away the extr
Contributor

nit: extr -> extra

@@ -83,7 +85,8 @@ object DataSourceV2Implicits {

 implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
   def asStruct: StructType = {
     val fields = metadata.map { metaCol =>
       val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable)
       val fieldMeta = new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, true).build()
imback82 (Contributor) commented Feb 5, 2021

nit: this can be created outside the loop (or even at object level, as a new metadata constant to go with `METADATA_COL_ATTR_KEY`); see the sketch below.
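
A sketch of the suggested refactor (the constant name `METADATA_COL_FIELD_METADATA` is made up): the metadata value never depends on the loop variable, so it can be built once:

```scala
// Hypothetical object-level constant next to METADATA_COL_ATTR_KEY:
private val METADATA_COL_FIELD_METADATA: Metadata =
  new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, true).build()

implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
  def asStruct: StructType = {
    val fields = metadata.map { metaCol =>
      // StructField accepts the attribute metadata directly, so no
      // per-iteration MetadataBuilder is needed.
      StructField(metaCol.name, metaCol.dataType, metaCol.isNullable, METADATA_COL_FIELD_METADATA)
    }
    StructType(fields)
  }
}
```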
