New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32708] Query optimization fails to reuse exchange with DataSourceV2 #29564
Conversation
ok to test |
2.4? |
@@ -54,7 +55,7 @@ case class DataSourceV2Relation( | |||
tableIdent.map(_.unquotedString).getOrElse(s"${source.name}:unknown") | |||
} | |||
|
|||
override def pushedFilters: Seq[Expression] = Seq.empty | |||
override def pushedFilters: Seq[Filter] = Seq.empty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need to change Expression to Filter, which is a public interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More explanation. Why changing from Expression to org.apache.spark.sql.sources.Filter
DataSourceV2ScanExec.pushedFilters are defined as array of Expressions whose equal function has expression_id in scope. So for example, Expression isnotnull(d_day_name#22364) is not considered equal to isnotnull(d_day_name#22420). Therefore, the right thing is to define and compare pushedFilter as Filter class.
At both Spark 3.0 and affected Spark 2.4's tests suite, Filter is the class being used. And the above 4 places seem to be the only places that miss to have pushedFilter as class Filter.
(Because pushedFilters are defined the right way in the above test suite, Spark 32708 was not caught by my tests previously added for SPARK-32609, another exchange reuse bug.
Usage of Expression was introduced by PR [SPARK-23203][SQL] DataSourceV2: Use immutable logical plan. From the PR's description and original intention, I don't see a necessary reason to maintain Expression.
If the issue does not happen in branch-3.0+, I think we might need to check which commit's resolved it from the commit history first. If it found, we might be able to just cherry-pick it. |
Test build #127968 has finished for PR 29564 at commit
|
In Branch 3.0, there is a mixed-in trait However, if we are going to cherry-pick the PRs mentioned above, then there will be reasons to cherry-pick the other data source V2 related PRs into 2.4 as well. |
If exchange reuse is broken, it means plan equality is broken somewhere. I think Can you look into it more and have a more surgical fix? |
… DataSourceV2" This reverts commit dd0fb24.
Updated with a more surgical fix. Please review. |
My org still relies heavily on 2.4 |
@mingjialiu the new fix looks more reasonable. Could you add test case for the changes? |
options, | ||
QueryPlan.normalizePredicates( | ||
pushedFilters, | ||
AttributeSeq(pushedFilters.flatMap(_.references).distinct)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we use output
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Output here doesn't contain all predicate columns due to implementation at : https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala#L108
The fix LGTM, can you add a test? |
Test build #128481 has finished for PR 29564 at commit
|
Regarding test coverage, it's a bit tricky to repro in a unit test. Can I get some pointers on populating different expression ids for the same column? Or test suggestions? The key to repro is to have the same column assigned different expression Ids. |
Test added. Please review. |
Please ignore this message. I figured out that column's expression id is consistent within the same df. |
Test build #128541 has finished for PR 29564 at commit
|
Test build #128543 has finished for PR 29564 at commit
|
@@ -393,6 +393,29 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { | |||
} | |||
} | |||
} | |||
|
|||
test("SPARK-32708: same columns with different ExprIds should be equal after canonicalization ") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't have an end-to-end test, how about a low-level UT? Create two DataSourceV2ScanExec
instances and check scan1.sameResult(scan2)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan I think this test case creates two DataSourceV2ScanExec
and do the check. It looks ok to me.
test pyspark.mllib.tests.StreamingLogisticRegressionWithSGDTests.test_training_and_prediction is failing. @gatorsmile @cloud-fan @gengliangwang do you think the failure is related to this change? If yes, any suggestions on how to fix it? |
I think we can rely on the jenkins test result as well |
Test build #128574 has finished for PR 29564 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@cloud-fan @gatorsmile Hi, it looks to me that 1 approval is not enough for merging. Can you please approve this PR if everything looks good to you? |
Thanks, merging to branch 2.4 |
…rceV2 ### What changes were proposed in this pull request? Override doCanonicalize function of class DataSourceV2ScanExec ### Why are the changes needed? Query plan of DataSourceV2 fails to reuse any exchange. This change can make DataSourceV2's plan more optimized and reuse exchange as DataSourceV1 and parquet do. Direct reason: equals function of DataSourceV2ScanExec returns 'false' as comparing the same V2 scans(same table, outputs and pushedfilters) Actual cause : With query plan's default doCanonicalize function, pushedfilters of DataSourceV2ScanExec are not canonicalized correctly. Essentially expressionId of predicate columns are not normalized. [Spark 32708](https://issues.apache.org/jira/browse/SPARK-32708#) was not caught by my [tests](https://github.com/apache/spark/blob/5b1b9b39eb612cbf9ec67efd4e364adafcff66c4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala#L392) previously added for [SPARK-32609] because the above issue happens only when the same filtered column are of different expression id (eg : join table t1 with t1) ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? unit test added Closes #29564 from mingjialiu/branch-2.4. Authored-by: mingjial <mingjial@google.com> Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
Thank you, @mingjialiu and all! |
@gengliangwang . Could you update |
@dongjoon-hyun sure. |
@gengliangwang . The |
@dongjoon-hyun Sorry, it's updated now. |
Thank you, @gengliangwang ! |
…rceV2 ### What changes were proposed in this pull request? Override doCanonicalize function of class DataSourceV2ScanExec ### Why are the changes needed? Query plan of DataSourceV2 fails to reuse any exchange. This change can make DataSourceV2's plan more optimized and reuse exchange as DataSourceV1 and parquet do. Direct reason: equals function of DataSourceV2ScanExec returns 'false' as comparing the same V2 scans(same table, outputs and pushedfilters) Actual cause : With query plan's default doCanonicalize function, pushedfilters of DataSourceV2ScanExec are not canonicalized correctly. Essentially expressionId of predicate columns are not normalized. [Spark 32708](https://issues.apache.org/jira/browse/SPARK-32708#) was not caught by my [tests](https://github.com/apache/spark/blob/5b1b9b39eb612cbf9ec67efd4e364adafcff66c4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala#L392) previously added for [SPARK-32609] because the above issue happens only when the same filtered column are of different expression id (eg : join table t1 with t1) ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? unit test added Closes apache#29564 from mingjialiu/branch-2.4. Authored-by: mingjial <mingjial@google.com> Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com> RB=2951163 BUG=APA-51006,LIHADOOP-62503 G=spark-reviewers R=yezhou,ekrogen,smahadik A=smahadik
What changes were proposed in this pull request?
Override doCanonicalize function of class DataSourceV2ScanExec
Why are the changes needed?
Query plan of DataSourceV2 fails to reuse any exchange. This change can make DataSourceV2's plan more optimized and reuse exchange as DataSourceV1 and parquet do.
Direct reason: equals function of DataSourceV2ScanExec returns 'false' as comparing the same V2 scans(same table, outputs and pushedfilters)
Actual cause : With query plan's default doCanonicalize function, pushedfilters of DataSourceV2ScanExec are not canonicalized correctly. Essentially expressionId of predicate columns are not normalized.
Spark 32708 was not caught by my tests previously added for [SPARK-32609] because the above issue happens only when the same filtered column are of different expression id (eg : join table t1 with t1)
Does this PR introduce any user-facing change?
no
How was this patch tested?
unit test added