[SPARK-29277][SQL] Add early DSv2 filter and projection pushdown #25955
Conversation
FYI, @cloud-fan & @brkyvz.
Force-pushed from e87fbc4 to d0890ac.
Force-pushed from f01bd12 to 38b8655.
Review thread on sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (resolved).
Review thread on ...re/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala (resolved).
The test failures are expected and are caused by removing the […]
Force-pushed from 96be597 to a43f1b7.
Test build #111757 has finished for PR 25955 at commit […]
All tests are passing without calling the `computeStats` method before pushdown, so I'm confident that the early pushdown rule is in the right place. I'm adding back the `computeStats` method that throws `UnsupportedOperationException` while testing, to ensure that it is not incorrectly called in the future. It does not throw an exception when not testing, so that user queries don't fail.
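The pattern described here, fail fast in tests but degrade gracefully in production, can be sketched roughly as follows. This is a minimal illustration, not the actual Spark source; `StatsGuard` and `estimate` are hypothetical names, and the test check stands in for Spark's `Utils.isTesting` (which looks at the `spark.testing` system property):

```scala
// Sketch: a stats method that throws during testing to catch rule-ordering
// bugs, but returns an estimate in production so user queries never fail.
object StatsGuard {
  // Stand-in for Spark's Utils.isTesting (checks the spark.testing property).
  def isTesting: Boolean = sys.props.contains("spark.testing")

  final case class Statistics(sizeInBytes: BigInt)

  def computeStats(name: String, estimate: () => BigInt): Statistics =
    if (isTesting) {
      // A call before pushdown indicates a rule-ordering bug; surface it loudly.
      throw new UnsupportedOperationException(
        s"BUG: computeStats called before pushdown on DSv2 relation: $name")
    } else {
      // Bad stats are better than failing a user query.
      Statistics(sizeInBytes = estimate())
    }
}
```

The key design point is that the strict check is compiled in unconditionally but only armed in test JVMs, so the test suite guards the rule ordering without any production risk.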
@cloud-fan, this is ready for another review, assuming tests pass. Thanks!
Test build #111793 has finished for PR 25955 at commit […]
Review thread on sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (resolved).
Review thread on ...lyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala (outdated, resolved).
Review thread on ...lyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala (outdated, resolved).
Review thread on ...lyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala (resolved).
Force-pushed from 598a6fd to 9503757.
@cloud-fan, I rebased and updated this if you want to have another look. I updated this as you suggested so that […]. I also updated this to solve the problem where DDL commands would have other rules run on the relation, including early push-down. As we discussed, I removed the relation from […]. I should point out that I didn't change […]. If we want to avoid the relation underneath […]. Last, I made a small change to […].
```scala
  .map(rel => desc.copy(table = rel))
  .getOrElse(desc)
```

```scala
case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
```
I tested out a trait that worked for all of the plans that need to be resolved here, but the code was longer with the trait and implementations. If we need it later because we have more cases in this rule, it should be easy to add. I don't think we need it right now.
```diff
@@ -32,6 +32,16 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 import org.apache.spark.sql.connector.catalog.CatalogV2Util._

 override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
   case unresolved @ UnresolvedRelation(nameParts) =>
     nameParts match {
       case AsTemporaryViewIdentifier(i) if catalogManager.v1SessionCatalog.isTemporaryTable(i) =>
```
This was added to convert to a v2 unresolved relation. Matching the view identifier ensures the same behavior as `lookupV2RelationAndCatalog`.
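The precedence being preserved here, temporary views shadowing catalog tables, can be illustrated with a small sketch. All names below are hypothetical stand-ins, not Spark's classes; the real logic lives in `ResolveCatalogs` and the session catalog:

```scala
// Sketch: resolve a multi-part name by checking temporary views first, so a
// temp view always shadows a catalog table; this mirrors the behavior of
// lookupV2RelationAndCatalog described in the comment above.
class Resolver(tempViews: Set[String], tables: Set[String]) {
  sealed trait Resolved
  final case class TempView(name: String) extends Resolved
  final case class Table(name: String) extends Resolved

  def resolve(nameParts: Seq[String]): Option[Resolved] = nameParts match {
    // A single-part name may be a temp view; check that before the catalog.
    case Seq(name) if tempViews.contains(name) => Some(TempView(name))
    case parts if tables.contains(parts.mkString(".")) =>
      Some(Table(parts.mkString(".")))
    case _ => None
  }
}
```

With this ordering, a name that exists both as a temp view and as a table resolves to the temp view, which is exactly the shadowing behavior the identifier match is meant to keep.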
```diff
@@ -64,7 +69,8 @@ class SparkOptimizer(
 override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+
   ExtractPythonUDFFromJoinCondition.ruleName :+
   ExtractPythonUDFFromAggregate.ruleName :+ ExtractGroupingPythonUDFFromAggregate.ruleName :+
-  ExtractPythonUDFs.ruleName
+  ExtractPythonUDFs.ruleName :+
+  V2ScanRelationPushDown.ruleName
```
Only `DataSourceV2ScanRelation` will be converted to a physical scan node, so the early push-down rule is now required.
@cloud-fan, I've removed the refactor and tests are passing. Can you take another look at this?
```scala
        s"BUG: computeStats called before pushdown on DSv2 relation: $name")
    } else {
      // when not testing, return stats because bad stats are better than failing a query
      newScanBuilder() match {
```
This was inlined in a previous commit; why is it reverted?
```diff
@@ -3218,6 +3218,8 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
+      case DataSourceV2ScanRelation(table: FileTable, _, _) =>
+        table.fileIndex.inputFiles
       case DataSourceV2Relation(table: FileTable, _, _) =>
```
In a previous discussion, we decided to make `V2ScanRelationPushDown` mandatory so that `DataSourceV2Relation` won't appear in the optimized plan. Why did we change our minds?
I noticed that, besides the table resolution refactor, we also reverted the changes that make […]
@cloud-fan, those changes were a mistake from rolling back. I've updated this with the changes.
Force-pushed from 1886948 to 4220723.
Test build #112935 has finished for PR 25955 at commit […]
```diff
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution.datasources.orc

 import org.apache.spark.SparkConf
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
```
unnecessary change
LGTM if tests pass.
We need to bring back the changes in […]
I added the cases back to `CheckAnalysis`.
Test build #112955 has finished for PR 25955 at commit […]
@cloud-fan, tests are passing with the additional checks.
Merging to master. Thanks for reviewing, @cloud-fan and @brkyvz!
Hi, guys. Sorry, but this breaks our […]
I checked that the failure happens consistently on […]
Without this patch, it recovers. Although this is not a part of […], cc @jiangxb1987 since he is the release manager for 3.0.0-preview.
@rdblue, could you make this PR once more with […]?
All Jenkins jobs using Maven look okay (including JDK11 tests).
### What changes were proposed in this pull request?

This adds a new rule, `V2ScanRelationPushDown`, to push filters and projections into a new `DataSourceV2ScanRelation` in the optimizer. That scan is then used when converting to a physical scan node. The new relation correctly reports stats based on the scan.

To run scan pushdown before rules where stats are used, this adds a new optimizer override, `earlyScanPushDownRules`, and a batch for early pushdown in the optimizer, before cost-based join reordering. The other early pushdown rule, `PruneFileSourcePartitions`, is moved into the early pushdown rule set.

This also moves pushdown helper methods from `DataSourceV2Strategy` into a util class.

### Why are the changes needed?

This is needed for DSv2 sources to supply stats for cost-based rules in the optimizer.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This updates the implementation of stats from `DataSourceV2Relation` so tests will fail if stats are accessed before early pushdown for v2 relations.

Closes apache#25955 from rdblue/move-v2-pushdown.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Ryan Blue <blue@apache.org>
Bring back #25955.

### What changes were proposed in this pull request?

This adds a new rule, `V2ScanRelationPushDown`, to push filters and projections into a new `DataSourceV2ScanRelation` in the optimizer. That scan is then used when converting to a physical scan node. The new relation correctly reports stats based on the scan.

To run scan pushdown before rules where stats are used, this adds a new optimizer override, `earlyScanPushDownRules`, and a batch for early pushdown in the optimizer, before cost-based join reordering. The other early pushdown rule, `PruneFileSourcePartitions`, is moved into the early pushdown rule set.

This also moves pushdown helper methods from `DataSourceV2Strategy` into a util class.

### Why are the changes needed?

This is needed for DSv2 sources to supply stats for cost-based rules in the optimizer.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This updates the implementation of stats from `DataSourceV2Relation` so tests will fail if stats are accessed before early pushdown for v2 relations.

Closes #26341 from cloud-fan/back.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?

This adds a new rule, `V2ScanRelationPushDown`, to push filters and projections into a new `DataSourceV2ScanRelation` in the optimizer. That scan is then used when converting to a physical scan node. The new relation correctly reports stats based on the scan.

To run scan pushdown before rules where stats are used, this adds a new optimizer override, `earlyScanPushDownRules`, and a batch for early pushdown in the optimizer, before cost-based join reordering. The other early pushdown rule, `PruneFileSourcePartitions`, is moved into the early pushdown rule set.

This also moves pushdown helper methods from `DataSourceV2Strategy` into a util class.

Why are the changes needed?

This is needed for DSv2 sources to supply stats for cost-based rules in the optimizer.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This updates the implementation of stats from `DataSourceV2Relation` so tests will fail if stats are accessed before early pushdown for v2 relations.
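The batch ordering the description relies on can be sketched with a toy model. Everything below is illustrative only, not Spark's actual classes: `Relation`, `ScanRelation`, `pushDown`, and `joinReorder` are stand-ins for `DataSourceV2Relation`, `DataSourceV2ScanRelation`, `V2ScanRelationPushDown`, and a cost-based rule, and the assumed row count is arbitrary:

```scala
// Toy model of why the early pushdown batch must run before cost-based
// rules: only the post-pushdown scan relation carries usable stats.
sealed trait Plan
final case class Relation(name: String) extends Plan                 // pre-pushdown: no stats
final case class ScanRelation(name: String, rows: Long) extends Plan // post-pushdown: has stats

// Stand-in for V2ScanRelationPushDown: turn a relation into a scan with stats.
val pushDown: Plan => Plan = {
  case Relation(n) => ScanRelation(n, rows = 100L) // assumed row count for the sketch
  case p           => p
}

// Stand-in for a cost-based rule: it can only act once stats exist.
var joinReorderSawStats = false
val joinReorder: Plan => Plan = {
  case s: ScanRelation => joinReorderSawStats = true; s
  case p               => p
}

// Batches run in sequence, like Catalyst's optimizer batches.
def optimize(plan: Plan, batches: Seq[Seq[Plan => Plan]]): Plan =
  batches.foldLeft(plan)((p, batch) => batch.foldLeft(p)((q, rule) => rule(q)))

// Early pushdown batch first, then the stats-dependent batch.
val optimized = optimize(Relation("t"), Seq(Seq(pushDown), Seq(joinReorder)))
```

If the batch order were reversed, `joinReorder` would see only the bare `Relation` and could not use stats, which is the situation the `earlyScanPushDownRules` override is designed to prevent.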