Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-41162][SQL][3.3] Fix anti- and semi-join for self-join with aggregations #39409

Conversation

EnricoMi
Copy link
Contributor

@EnricoMi EnricoMi commented Jan 5, 2023

What changes were proposed in this pull request?

Backport #39131 to branch-3.3.

Rule PushDownLeftSemiAntiJoin should not push an anti-join below an Aggregate when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while DeduplicateRelations cannot deduplicate those attributes (in this example due to the projection of value to id).

This behaviour already exists for Project and Union, but Aggregate lacks this safety guard.

Why are the changes needed?

Without this change, the optimizer creates an incorrect plan.

This example fails with distinct() (an aggregation), and succeeds without distinct(), but both queries are identical:

val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)

With distinct(), rule PushDownLeftSemiAntiJoin creates a join condition (value#907 + 1) = value#907, which can never be true. This effectively removes the anti-join.

Before this PR:
The anti-join is fully removed from the plan.

== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true

This is caused by PushDownLeftSemiAntiJoin adding join condition (value#907 + 1) = value#907, which is wrong as because id#910 in (id#910 + 1) AS id#912 exists in the right child of the join as well as in the left grandchild:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

After this PR:
Join condition (id#910 + 1) AS id#912 is understood to become ambiguous as both sides of the prospect join contain id#910. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:

== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true

Does this PR introduce any user-facing change?

It fixes correctness.

How was this patch tested?

Unit tests in DataFrameJoinSuite and LeftSemiAntiJoinPushDownSuite.

@github-actions github-actions bot added the SQL label Jan 5, 2023
…tions

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

It fixes correctness.

Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes apache#39131 from EnricoMi/branch-antijoin-selfjoin-fix.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@EnricoMi EnricoMi force-pushed the branch-antijoin-selfjoin-fix-3.3 branch from c531325 to b8b22a7 Compare January 5, 2023 13:03
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@EnricoMi
Copy link
Contributor Author

EnricoMi commented Jan 5, 2023

@cloud-fan this backports #39131 to branch-3.3 (and branch-3.2).

@cloud-fan
Copy link
Contributor

all tests pass: https://github.com/G-Research/spark/runs/10459421774

merging to 3.3/3.2, thanks!

cloud-fan pushed a commit that referenced this pull request Jan 6, 2023
…gregations

### What changes were proposed in this pull request?
Backport #39131 to branch-3.3.

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

### Why are the changes needed?
Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

### Does this PR introduce _any_ user-facing change?
It fixes correctness.

### How was this patch tested?
Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes #39409 from EnricoMi/branch-antijoin-selfjoin-fix-3.3.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Jan 6, 2023
…gregations

### What changes were proposed in this pull request?
Backport #39131 to branch-3.3.

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

### Why are the changes needed?
Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

### Does this PR introduce _any_ user-facing change?
It fixes correctness.

### How was this patch tested?
Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes #39409 from EnricoMi/branch-antijoin-selfjoin-fix-3.3.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b97f79d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan cloud-fan closed this Jan 6, 2023
@EnricoMi EnricoMi deleted the branch-antijoin-selfjoin-fix-3.3 branch January 6, 2023 07:49
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…gregations

### What changes were proposed in this pull request?
Backport apache#39131 to branch-3.3.

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

### Why are the changes needed?
Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

### Does this PR introduce _any_ user-facing change?
It fixes correctness.

### How was this patch tested?
Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes apache#39409 from EnricoMi/branch-antijoin-selfjoin-fix-3.3.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b97f79d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants