Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-41162][SQL] Fix anti- and semi-join for self-join with aggregations #39131

Closed

Conversation

EnricoMi
Copy link
Contributor

@EnricoMi EnricoMi commented Dec 19, 2022

What changes were proposed in this pull request?

Rule PushDownLeftSemiAntiJoin should not push an anti-join below an Aggregate when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while DeduplicateRelations cannot deduplicate those attributes (in this example due to the projection of value to id).

This behaviour already exists for Project and Union, but Aggregate lacks this safety guard.

Why are the changes needed?

Without this change, the optimizer creates an incorrect plan.

This example fails with distinct() (an aggregation), and succeeds without distinct(), but both queries are identical:

val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)

With distinct(), rule PushDownLeftSemiAntiJoin creates a join condition (value#907 + 1) = value#907, which can never be true. This effectively removes the anti-join.

Before this PR:
The anti-join is fully removed from the plan.

== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true

This is caused by PushDownLeftSemiAntiJoin adding join condition (value#907 + 1) = value#907, which is wrong as because id#910 in (id#910 + 1) AS id#912 exists in the right child of the join as well as in the left grandchild:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

After this PR:
Join condition (id#910 + 1) AS id#912 is understood to become ambiguous as both sides of the prospect join contain id#910. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:

== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true

Does this PR introduce any user-facing change?

It fixes correctness.

How was this patch tested?

Unit tests in DataFrameJoinSuite and LeftSemiAntiJoinPushDownSuite.

@github-actions github-actions bot added the SQL label Dec 19, 2022
@EnricoMi EnricoMi changed the title [SPARK-41162][SQL] Fix anti-join and semi-join for aggregations with self-join [SPARK-41162][SQL] Fix anti- and semi-join for self-join with aggregations Dec 19, 2022
@mridulm
Copy link
Contributor

mridulm commented Dec 20, 2022

+CC @shardulm94

@EnricoMi
Copy link
Contributor Author

@cloud-fan I think this is a better approach to fix correctness bug SPARK-41162 than #38676.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan
Copy link
Contributor

@EnricoMi thanks for the fix! which spark version starts to have this bug?

@@ -46,7 +46,7 @@ class LeftSemiPushdownSuite extends PlanTest {
val testRelation1 = LocalRelation($"d".int)
val testRelation2 = LocalRelation($"e".int)

test("Project: LeftSemiAnti join pushdown") {
test("Project: LeftSemi join pushdown") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These change to test names are necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The term LeftSemiAnti is wrong and misleading for individual tests, correcting this while I am touching the file.

@EnricoMi
Copy link
Contributor Author

EnricoMi commented Jan 5, 2023

@EnricoMi thanks for the fix! which spark version starts to have this bug?

This was introduced in Spark 3.0.0.

@cloud-fan cloud-fan closed this in 737eecd Jan 5, 2023
@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan
Copy link
Contributor

@EnricoMi can you help to create backport PRs for 3.3/3.2/3.1? It has conflicts. Thanks!

EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 5, 2023
…tions

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

It fixes correctness.

Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes apache#39131 from EnricoMi/branch-antijoin-selfjoin-fix.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 5, 2023
…tions

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

It fixes correctness.

Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes apache#39131 from EnricoMi/branch-antijoin-selfjoin-fix.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
EnricoMi added a commit to G-Research/spark that referenced this pull request Jan 5, 2023
…tions

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

It fixes correctness.

Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes apache#39131 from EnricoMi/branch-antijoin-selfjoin-fix.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@EnricoMi
Copy link
Contributor Author

EnricoMi commented Jan 5, 2023

@EnricoMi can you help to create backport PRs for 3.3/3.2/3.1? It has conflicts. Thanks!

@cloud-fan Backport for 3.3 (and 3.2) in #39409.
Looks like 3.1 is not needed: #39411 (review).

@dongjoon-hyun
Copy link
Member

Yep. This is applicable for 3.3 and 3.2 in the community.

cloud-fan pushed a commit that referenced this pull request Jan 6, 2023
…gregations

### What changes were proposed in this pull request?
Backport #39131 to branch-3.3.

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

### Why are the changes needed?
Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

### Does this PR introduce _any_ user-facing change?
It fixes correctness.

### How was this patch tested?
Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes #39409 from EnricoMi/branch-antijoin-selfjoin-fix-3.3.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Jan 6, 2023
…gregations

### What changes were proposed in this pull request?
Backport #39131 to branch-3.3.

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

### Why are the changes needed?
Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

### Does this PR introduce _any_ user-facing change?
It fixes correctness.

### How was this patch tested?
Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes #39409 from EnricoMi/branch-antijoin-selfjoin-fix-3.3.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b97f79d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
if agg.aggregateExpressions.forall(_.deterministic) && agg.groupingExpressions.nonEmpty &&
!agg.aggregateExpressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) &&
canPushThroughCondition(agg.children, joinCond, rightOp) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rewrite the joinCondition assuming the join has already been pushed through Aggregate. That said, we need to do alias replacement for joinCondition first. cc @EnricoMi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. The canPushThroughCondition is called before the Join is being pushed through the Aggregate, it has been added to prevent this from happening in this situation. The other cases (e.g. Union) are calling into canPushThroughCondition equivalently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, canPushThroughCondition checks the right side references of the join condition, and check if the right side references have conflict expr ID with left side plan (below Project) output. It doesn't care about the left side references of the join condition.

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…gregations

### What changes were proposed in this pull request?
Backport apache#39131 to branch-3.3.

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

### Why are the changes needed?
Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

### Does this PR introduce _any_ user-facing change?
It fixes correctness.

### How was this patch tested?
Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes apache#39409 from EnricoMi/branch-antijoin-selfjoin-fix-3.3.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b97f79d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
7 participants