Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-36114][SQL] Support subqueries with correlated non-equality predicates #38135

Conversation

allisonwang-db
Copy link
Contributor

What changes were proposed in this pull request?

This PR supports correlated non-equality predicates in subqueries. It leverages the DecorrelateInnerQuery framework to decorrelate subqueries with non-equality predicates. DecorrelateInnerQuery inserts domain joins in the query plan and the rule RewriteCorrelatedScalarSubquery rewrites the domain joins into actual joins with the outer query.

Note, correlated non-equality predicates can lead to query plans with non-equality join conditions, which may be planned as a broadcast NL join or cartesian product.

Why are the changes needed?

To improve subquery support in Spark.

Does this PR introduce any user-facing change?

Yes. Before this PR, Spark does not allow correlated non-equality predicates in subqueries.
For example:

SELECT (SELECT min(c2) FROM t2 WHERE t1.c1 > t2.c1) FROM t1

This will throw an exception: Correlated column is not allowed in a non-equality predicate

After this PR, this query can run successfully.

How was this patch tested?

Unit tests and SQL query tests.

@github-actions github-actions bot added the SQL label Oct 7, 2022
@allisonwang-db
Copy link
Contributor Author

cc @cloud-fan @dtenedor

// and lateral subqueries.
val allowNonEqualityPredicates =
SQLConf.get.decorrelateInnerQueryEnabled && (isScalar || isLateral)
if (!allowNonEqualityPredicates && predicates.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I have been missing context:

After the non-equality predicates are supported, what are the left gap? I assuming all the predicates are supported now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh you have an example below which makes sense:

-- Correlated equality predicates that are not supported after SPARK-35080
SELECT c, (
    SELECT count(*)
    FROM (VALUES ('ab'), ('abc'), ('bc')) t2(c)
    WHERE t1.c = substring(t2.c, 1, 1)
) FROM (VALUES ('a'), ('b')) t1(c);

@@ -881,11 +881,10 @@ class AnalysisErrorSuite extends AnalysisTest {
($"a" + $"c" === $"b", "(a#x + outer(c#x)) = b#x"),
(And($"a" === $"c", Cast($"d", IntegerType) === $"c"), "CAST(d#x AS INT) = outer(c#x)"))
conditions.foreach { case (cond, msg) =>
val plan = Project(
ScalarSubquery(
val plan = Filter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: looks like this line of Project -> Filter change is not necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Project can host IN/EXISTS now.

|FROM (SELECT CAST(c1 AS SHORT) a FROM t1)
|""".stripMargin)
checkAnswer(df, Row(5) :: Row(null) :: Nil)
checkNumJoins(df.queryExecution.optimizedPlan, 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am missing context here: why need to check NumJoins only for this case? I did a code search and seems like other test cases in this suite do not care NumJoins.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am verifying the optimized plan should have 1 left outer join and 1 domain (inner) join.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check the number of joins for the safe cast case as well?

@cloud-fan
Copy link
Contributor

is this only for scalar subquery?

@allisonwang-db
Copy link
Contributor Author

is this only for scalar subquery?

Scalar and lateral subqueries. IN and EXISTS subqueries are not supported because they are not using the new decorrelation framework.

Copy link
Contributor

@amaliujia amaliujia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 4d33ee0 Oct 24, 2022
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…edicates

<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
  2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
  4. Be sure to keep the PR description updated to reflect all changes.
  5. Please write your PR title to summarize what this PR proposes.
  6. If possible, provide a concise example to reproduce the issue for a faster review.
  7. If you want to add a new configuration, please read the guideline first for naming configurations in
     'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
  8. If you want to add or modify an error type or message, please read the guideline first in
     'core/src/main/resources/error/README.md'.
-->

### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
  1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
  2. If you fix some SQL features, you can provide some references of other DBMSes.
  3. If there is design documentation, please add the link.
  4. If there is a discussion in the mailing list, please add the link.
-->
This PR supports correlated non-equality predicates in subqueries. It leverages the DecorrelateInnerQuery framework to decorrelate subqueries with non-equality predicates. DecorrelateInnerQuery inserts domain joins in the query plan and the rule RewriteCorrelatedScalarSubquery rewrites the domain joins into actual joins with the outer query.

Note, correlated non-equality predicates can lead to query plans with non-equality join conditions, which may be planned as a broadcast NL join or cartesian product.

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, you can clarify why it is a bug.
-->
To improve subquery support in Spark.

### Does this PR introduce _any_ user-facing change?
<!--
Note that it means *any* user-facing change including all aspects such as the documentation fix.
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
If no, write 'No'.
-->
Yes. Before this PR, Spark does not allow correlated non-equality predicates in subqueries.
For example:
```sql
SELECT (SELECT min(c2) FROM t2 WHERE t1.c1 > t2.c1) FROM t1
```
This will throw an exception: `Correlated column is not allowed in a non-equality predicate`

After this PR, this query can run successfully.

### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
-->
Unit tests and SQL query tests.

Closes apache#38135 from allisonwang-db/spark-36114-non-equality-pred.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants