[SPARK-38977][SQL] Fix schema pruning with correlated subqueries #36303

Closed
wants to merge 2 commits

aokolnychyi
Contributor

What changes were proposed in this pull request?

This PR fixes schema pruning for queries with multiple correlated subqueries. Previously, Spark would throw an exception while determining root fields in SchemaPruning$identifyRootFields. This happened because predicate expressions that referenced attributes inside subqueries were not ignored. As a result, attributes from different subqueries could conflict with each other (e.g. the same name with incompatible types) even though they should have been ignored.

For instance, the following query would throw a runtime exception.

SELECT name FROM contacts c
WHERE
 EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
 AND
 EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)

[info]   org.apache.spark.SparkException: Failed to merge fields 'value' and 'value'. Failed to merge incompatible data types int and string
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.failedMergingFieldsError(QueryExecutionErrors.scala:936)
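
To make the failure mode above concrete, here is a minimal toy model in plain Scala. It is not Spark code: `Attr`, `mergeByName`, and the type names as strings are hypothetical stand-ins chosen for illustration. It only shows why collecting the `value` attributes of both subqueries as root field candidates leads to a merge conflict, and why keeping only the attributes of the scanned relation avoids it.

```scala
import scala.util.Try

object RootFieldSketch {
  // Toy stand-in for an attribute reference (hypothetical, not a Spark class):
  // the relation it belongs to, its name, and its type as a plain string.
  final case class Attr(relation: String, name: String, dataType: String)

  // Toy merge: fields with the same name must agree on their type,
  // loosely mirroring the "Failed to merge fields" error quoted above.
  def mergeByName(attrs: Seq[Attr]): Map[String, String] =
    attrs.foldLeft(Map.empty[String, String]) { (acc, a) =>
      acc.get(a.name) match {
        case Some(t) if t != a.dataType =>
          throw new IllegalStateException(
            s"Failed to merge fields '${a.name}' and '${a.name}': incompatible types $t and ${a.dataType}")
        case _ => acc + (a.name -> a.dataType)
      }
    }

  // Attributes referenced by the two EXISTS predicates of the example query.
  val predicateAttrs: Seq[Attr] = Seq(
    Attr("ids", "value", "int"),
    Attr("contacts", "id", "int"),
    Attr("contacts", "name", "struct"),
    Attr("first_names", "value", "string"))

  // Every referenced attribute is treated as a root field candidate:
  // the two `value` attributes collide.
  def rootFieldsNaive: Try[Map[String, String]] = Try(mergeByName(predicateAttrs))

  // Only attributes of the scanned relation are kept: no collision.
  def rootFieldsFor(relation: String): Map[String, String] =
    mergeByName(predicateAttrs.filter(_.relation == relation))
}
```

In this sketch `RootFieldSketch.rootFieldsNaive` is a `Failure`, while `RootFieldSketch.rootFieldsFor("contacts")` yields only `id` and `name`.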

Why are the changes needed?

These changes are needed to avoid exceptions for some queries with multiple correlated subqueries.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This PR comes with tests.

@@ -152,6 +152,10 @@ object SchemaPruning extends SQLConfHelper {
RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = true))
case s: SubqueryExpression =>
Contributor Author

Initially, I tried another approach. I was passing AttributeSet with table attributes and checking above if an attribute belongs to the table output. However, that required changing many places. This change is much smaller. Let me know if there are cases when this will not work.
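
The two approaches described above can be contrasted on a toy expression tree. This is a sketch under stated assumptions, not the code from the PR: the `Expr` hierarchy, `rootFieldsWithTableAttrs`, and `rootFields` are hypothetical, the body of the real `SubqueryExpression` case is not shown in this excerpt, and `c.name.first` is simplified to a top-level attribute `name`.

```scala
object SubqueryTraversalSketch {
  // Hypothetical miniature expression tree (not Spark's Catalyst classes).
  sealed trait Expr
  final case class Attr(name: String, dataType: String) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr
  // condition: the predicate inside the subquery;
  // outerRefs: attributes of the enclosing query that the subquery references.
  final case class Exists(condition: Expr, outerRefs: Seq[Attr]) extends Expr

  // Approach A: thread the set of table attributes through every call
  // and filter at the leaves. Every case has to pass the extra argument along.
  def rootFieldsWithTableAttrs(e: Expr, tableAttrs: Set[Attr]): Seq[Attr] = e match {
    case a: Attr => if (tableAttrs.contains(a)) Seq(a) else Nil
    case EqualTo(l, r) =>
      rootFieldsWithTableAttrs(l, tableAttrs) ++ rootFieldsWithTableAttrs(r, tableAttrs)
    case And(l, r) =>
      rootFieldsWithTableAttrs(l, tableAttrs) ++ rootFieldsWithTableAttrs(r, tableAttrs)
    case Exists(c, _) => rootFieldsWithTableAttrs(c, tableAttrs)
  }

  // Approach B: one extra case for the subquery node that only looks at
  // its outer references; all other cases stay unchanged.
  def rootFields(e: Expr): Seq[Attr] = e match {
    case a: Attr => Seq(a)
    case EqualTo(l, r) => rootFields(l) ++ rootFields(r)
    case And(l, r) => rootFields(l) ++ rootFields(r)
    case Exists(_, outer) => outer.flatMap(rootFields)
  }

  // The example query from the description, simplified.
  val cId = Attr("id", "int")
  val cName = Attr("name", "struct")
  val idsValue = Attr("value", "int")
  val namesValue = Attr("value", "string")
  val predicate: Expr = And(
    Exists(EqualTo(idsValue, cId), Seq(cId)),
    Exists(EqualTo(cName, namesValue), Seq(cName)))
}
```

Both functions return only `cId` and `cName` for `predicate`; the difference is how many places in the traversal need to change, which is the trade-off the comment above refers to.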

Contributor

This change looks reasonable to me. I am not aware of cases when this will not work. Let's wait for feedback from others.

@@ -935,4 +935,106 @@ abstract class SchemaPruningSuite
.count()
assert(count == 0)
}

testSchemaPruning("SPARK-38977: schema pruning with correlated EXISTS subquery") {
Contributor Author

aokolnychyi commented Apr 21, 2022

All of these queries would previously fail for V2 tables.

Contributor

Does this bug only happen for V2 tables, not for file source tables?

Contributor Author

I guess it would have failed for both, since the same method is used. The tests cover V1 and V2, so it should work for both now.

github-actions bot added the SQL label Apr 21, 2022
@aokolnychyi
Contributor Author

@huaxingao @sunchao @viirya @HyukjinKwon @cloud-fan @dongjoon-hyun @parthchandra, could you take a look whenever you have a minute?

@viirya
Member

viirya commented Apr 21, 2022

This looks similar to #36216?

@aokolnychyi
Contributor Author

@viirya, it looks similar, but I am afraid #36216 does not address the problem that causes the queries in this PR to fail.

As far as I can see, it updates ProjectionOverSchema, which is used after calling SchemaPruning$identifyRootFields. In my case, the failure happened while merging the schema in identifyRootFields. I am not sure whether my fix covers the other case, though.

@aokolnychyi
Contributor Author

We may need both. Let me quickly check.

@aokolnychyi
Contributor Author

aokolnychyi commented Apr 21, 2022

Alright, I think we will need both.

After this PR, the output returned by PushDownUtils$pruneColumns will only include columns from one relation. However, we still apply ProjectionOverSchema on filters with subqueries that may reference other relations too. That's why we need both changes.

@allisonwang-db @viirya, what do you think?

df2.createOrReplaceTempView("first_names")

val query = sql(
s"""SELECT name FROM contacts c
Contributor

nit: remove s?


val query = sql(
s"""SELECT name FROM contacts c
|WHERE
Contributor

nit: This seems to be a 3-space indentation?

val query = sql(
s"""SELECT name FROM contacts c
|WHERE
| EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
Contributor

nit: 2-space after |?
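
One way to read the three nits above together is sketched below. This is an assumption about the intended style, since the final test code is not shown in this excerpt; `QueryFormattingSketch` and `queryText` are hypothetical names. The string has no `s` prefix because nothing is interpolated, and nested lines are indented by two spaces after the `|` margin.

```scala
object QueryFormattingSketch {
  // Plain triple-quoted string: no interpolation is needed, so no `s` prefix.
  // stripMargin removes the leading whitespace and the `|` on each line.
  val queryText: String =
    """SELECT name FROM contacts c
      |WHERE
      |  EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
      |  AND
      |  EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
      |""".stripMargin
}
```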

Member

viirya left a comment

lgtm, I agree that we need both PRs. Thanks @aokolnychyi

@viirya
Member

viirya commented Apr 22, 2022

I'm going to merge this once CI passes.

@aokolnychyi
Contributor Author

I think I addressed the indentation comments in all tests. @huaxingao, could you double check, please?

@aokolnychyi
Contributor Author

Thanks for reviewing, @huaxingao @viirya @cloud-fan!

Contributor

allisonwang-db left a comment

Thanks for the fix!

@viirya
Member

viirya commented Apr 22, 2022

Thanks @aokolnychyi and all. Merging to master/3.3/3.2.

viirya closed this in 0c9947d Apr 22, 2022
viirya pushed a commit that referenced this pull request Apr 22, 2022
### What changes were proposed in this pull request?

This PR fixes schema pruning for queries with multiple correlated subqueries. Previously, Spark would throw an exception trying to determine root fields in `SchemaPruning$identifyRootFields`. That was happening because expressions in predicates that referenced attributes in subqueries were not ignored. That's why attributes from multiple subqueries could conflict with each other (e.g. incompatible types) even though they should be ignored.

For instance, the following query would throw a runtime exception.

```
SELECT name FROM contacts c
WHERE
 EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
 AND
 EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
```
```
[info]   org.apache.spark.SparkException: Failed to merge fields 'value' and 'value'. Failed to merge incompatible data types int and string
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.failedMergingFieldsError(QueryExecutionErrors.scala:936)
```

### Why are the changes needed?

These changes are needed to avoid exceptions for some queries with multiple correlated subqueries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with tests.

Closes #36303 from aokolnychyi/spark-38977.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 0c9947d)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
viirya pushed a commit that referenced this pull request Apr 22, 2022
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022