
[SPARK-43095][SQL] Avoid Once strategy's idempotence is broken for batch: Infer Filters #40742

Closed
wangyum wants to merge 5 commits into master from the SPARK-43095 branch

Conversation

@wangyum (Member) commented Apr 11, 2023

What changes were proposed in this pull request?

This PR also removes the matching EqualNullSafe when removing an EqualTo whose children are the same, while constructing candidate constraints in ConstraintHelper.inferAdditionalConstraints.
For example, given l = r and l <=> r: before this PR, these constraints could infer l <=> l and r <=> r, which are useless. After this PR, nothing is inferred from them.
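
Below is a minimal, self-contained Scala sketch of this inference problem. It uses plain stand-in case classes (Attr, EqualTo, EqualNullSafe) rather than Spark's Catalyst expressions, so it only illustrates the substitution idea, not the actual ConstraintHelper code:

sealed trait Expr
case class Attr(name: String) extends Expr
case class EqualTo(l: Expr, r: Expr) extends Expr
case class EqualNullSafe(l: Expr, r: Expr) extends Expr

// Replace every occurrence of `from` with `to`, mimicking constraint substitution.
def substitute(e: Expr, from: Attr, to: Attr): Expr = e match {
  case `from`              => to
  case EqualTo(a, b)       => EqualTo(substitute(a, from, to), substitute(b, from, to))
  case EqualNullSafe(a, b) => EqualNullSafe(substitute(a, from, to), substitute(b, from, to))
  case other               => other
}

val l = Attr("l")
val r = Attr("r")
val predicates: Set[Expr] = Set(EqualTo(l, r), EqualNullSafe(l, r))

// Before this PR: the candidate set still contains l <=> r, so substituting
// l -> r and r -> l infers the useless r <=> r and l <=> l.
val before = (predicates - EqualTo(l, r))
  .flatMap(c => Set(substitute(c, l, r), substitute(c, r, l)))
// before == Set(EqualNullSafe(Attr("r"), Attr("r")), EqualNullSafe(Attr("l"), Attr("l")))

// After this PR: the matching EqualNullSafe is also removed from the candidates,
// so nothing is inferred from this pair of constraints.
val after = (predicates - EqualTo(l, r) - EqualNullSafe(l, r))
  .flatMap(c => Set(substitute(c, l, r), substitute(c, r, l)))
// after == Set()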

Why are the changes needed?

It avoids breaking the Once strategy's idempotence for the Infer Filters batch. Repro:

export SPARK_TESTING=1

CREATE TABLE t1 (i INT, j INT, k STRING) USING parquet;
CREATE TABLE t2 (i INT, j INT, k STRING) USING parquet;
CREATE TABLE t3 (i INT, j INT, k STRING) USING parquet;

SELECT *
FROM   (SELECT t1.i, t1.i as t1i
        FROM t1 JOIN t3 ON t1.i = t3.i) t
       JOIN t2 ON t.i = t2.i;
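
The same repro as a hedged spark-shell sketch. It assumes SPARK_TESTING=1 was exported before launching the shell, so the Once-strategy idempotence check throws instead of being skipped; collect() is only there to force planning:

Seq(
  "CREATE TABLE t1 (i INT, j INT, k STRING) USING parquet",
  "CREATE TABLE t2 (i INT, j INT, k STRING) USING parquet",
  "CREATE TABLE t3 (i INT, j INT, k STRING) USING parquet"
).foreach(spark.sql)

// On a build without this PR, this throws org.apache.spark.SparkRuntimeException:
// "Once strategy's idempotence is broken for batch Infer Filters"
spark.sql(
  """SELECT *
    |FROM   (SELECT t1.i, t1.i as t1i
    |        FROM t1 JOIN t3 ON t1.i = t3.i) t
    |       JOIN t2 ON t.i = t2.i""".stripMargin).collect()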

Before this PR:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join Inner, (i#72 = i#78)                                               Join Inner, (i#72 = i#78)
!:- Project [i#72, i#72 AS t1i#71]                                       :- Filter ((i#72 <=> i#72) AND (t1i#71 <=> t1i#71))
!:  +- Join Inner, (i#72 = i#75)                                         :  +- Project [i#72, i#72 AS t1i#71]
!:     :- Project [i#72]                                                 :     +- Join Inner, (i#72 = i#75)
!:     :  +- Relation spark_catalog.default.t1[i#72,j#73,k#74] parquet   :        :- Filter isnotnull(i#72)
!:     +- Project [i#75]                                                 :        :  +- Project [i#72]
!:        +- Relation spark_catalog.default.t3[i#75,j#76,k#77] parquet   :        :     +- Relation spark_catalog.default.t1[i#72,j#73,k#74] parquet
!+- Relation spark_catalog.default.t2[i#78,j#79,k#80] parquet            :        +- Filter isnotnull(i#75)
!                                                                        :           +- Project [i#75]
!                                                                        :              +- Relation spark_catalog.default.t3[i#75,j#76,k#77] parquet
!                                                                        +- Filter isnotnull(i#78)
!                                                                           +- Relation spark_catalog.default.t2[i#78,j#79,k#80] parquet


org.apache.spark.SparkRuntimeException: Once strategy's idempotence is broken for batch Infer Filters
 Join Inner, (i#72 = i#78)                                                     Join Inner, (i#72 = i#78)
 :- Filter ((i#72 <=> i#72) AND (t1i#71 <=> t1i#71))                           :- Filter ((i#72 <=> i#72) AND (t1i#71 <=> t1i#71))
 :  +- Project [i#72, i#72 AS t1i#71]                                          :  +- Project [i#72, i#72 AS t1i#71]
 :     +- Join Inner, (i#72 = i#75)                                            :     +- Join Inner, (i#72 = i#75)
 :        :- Filter isnotnull(i#72)                                            :        :- Filter isnotnull(i#72)
 :        :  +- Project [i#72]                                                 :        :  +- Project [i#72]
 :        :     +- Relation spark_catalog.default.t1[i#72,j#73,k#74] parquet   :        :     +- Relation spark_catalog.default.t1[i#72,j#73,k#74] parquet
 :        +- Filter isnotnull(i#75)                                            :        +- Filter isnotnull(i#75)
 :           +- Project [i#75]                                                 :           +- Project [i#75]
 :              +- Relation spark_catalog.default.t3[i#75,j#76,k#77] parquet   :              +- Relation spark_catalog.default.t3[i#75,j#76,k#77] parquet
!+- Filter isnotnull(i#78)                                                     +- Filter (i#78 <=> i#78)
!   +- Relation spark_catalog.default.t2[i#78,j#79,k#80] parquet                  +- Filter isnotnull(i#78)
!                                                                                    +- Relation spark_catalog.default.t2[i#78,j#79,k#80] parquet.

After this PR:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
 Join Inner, (i#72 = i#78)                                               Join Inner, (i#72 = i#78)
 :- Project [i#72, i#72 AS t1i#71]                                       :- Project [i#72, i#72 AS t1i#71]
 :  +- Join Inner, (i#72 = i#75)                                         :  +- Join Inner, (i#72 = i#75)
!:     :- Project [i#72]                                                 :     :- Filter isnotnull(i#72)
!:     :  +- Relation spark_catalog.default.t1[i#72,j#73,k#74] parquet   :     :  +- Project [i#72]
!:     +- Project [i#75]                                                 :     :     +- Relation spark_catalog.default.t1[i#72,j#73,k#74] parquet
!:        +- Relation spark_catalog.default.t3[i#75,j#76,k#77] parquet   :     +- Filter isnotnull(i#75)
!+- Relation spark_catalog.default.t2[i#78,j#79,k#80] parquet            :        +- Project [i#75]
!                                                                        :           +- Relation spark_catalog.default.t3[i#75,j#76,k#77] parquet
!                                                                        +- Filter isnotnull(i#78)
!                                                                           +- Relation spark_catalog.default.t2[i#78,j#79,k#80] parquet
 

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

github-actions bot added the SQL label Apr 11, 2023
@wangyum (Member, Author) commented Apr 11, 2023

cc @cloud-fan

@@ -1430,6 +1430,10 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan]
      .union(constructIsNotNullConstraints(constraints, plan.output))
      .filter { c =>
        c.references.nonEmpty && c.references.subsetOf(plan.outputSet) && c.deterministic
      }.filterNot {
        // Avoid once strategy idempotence is broken.
        case a EqualNullSafe b => a.semanticEquals(b)
Contributor:

can we fix the place that generates this useless predicate?

wangyum (Member, Author):

Yes. Moved it to ConstraintHelper.inferAdditionalConstraints.

@@ -75,7 +75,10 @@ trait ConstraintHelper {
        inferredConstraints ++= replaceConstraints(predicates - eq, l, r)
      case _ => // No inference
    }
    inferredConstraints -- constraints
    (inferredConstraints -- constraints).filterNot {
      case a EqualNullSafe b => a.semanticEquals(b)
wangyum (Member, Author):

Before this PR, the generated EqualNullSafe would be optimized by SimplifyBinaryComparison.

@@ -75,7 +75,10 @@ trait ConstraintHelper {
        inferredConstraints ++= replaceConstraints(predicates - eq, l, r)
      case _ => // No inference
    }
    inferredConstraints -- constraints
    (inferredConstraints -- constraints).filterNot {
@cloud-fan (Contributor) commented Apr 13, 2023:

shall we update the pattern match above to add if !l.semanticEquals(r)?
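
A hypothetical sketch of that suggested guard, applied to the pattern match quoted below (this is not what was ultimately merged):

      case eq @ EqualTo(l: Attribute, r: Attribute) if !l.semanticEquals(r) =>
        val candidateConstraints = predicates - eq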

@@ -66,13 +66,13 @@ trait ConstraintHelper {
    val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull])
    predicates.foreach {
      case eq @ EqualTo(l: Attribute, r: Attribute) =>
        val candidateConstraints = predicates - eq
        val candidateConstraints = predicates - eq - EqualNullSafe(l, r)
Contributor:

if l.semanticEquals(r), we don't need to infer predicates at all, right?

@wangyum (Member, Author) commented Apr 13, 2023:

Yes, but the current situation is this: l <=> l and r <=> r are inferred from l === r and l <=> r, and at that point l.semanticEquals(r) is false.

Contributor:

Ah, got it. Let's add a code comment to explain it with this example.

@cloud-fan (Contributor) commented Apr 13, 2023:

Can you explain a bit more why this makes the optimizer non-idempotent?

wangyum (Member, Author):

For example:

Join Inner, (a#0 = a#7)
:- Project [a#0, a#0 AS xa#3]
:  +- Join Inner, (a#0 = a#4)
:     :- LocalRelation <empty>, [a#0, b#1, c#2]
:     +- LocalRelation <empty>, [a#4, b#5, c#6]
+- LocalRelation <empty>, [a#7, b#8, c#9]

After InferFiltersFromConstraints:

Join Inner, (a#0 = a#7)
:- Filter ((a#0 <=> a#0) AND (xa#3 <=> xa#3))
:  +- Project [a#0, a#0 AS xa#3]
:     +- Join Inner, (a#0 = a#4)
:        :- Filter isnotnull(a#0)
:        :  +- LocalRelation <empty>, [a#0, b#1, c#2]
:        +- Filter isnotnull(a#4)
:           +- LocalRelation <empty>, [a#4, b#5, c#6]
+- Filter isnotnull(a#7)
   +- LocalRelation <empty>, [a#7, b#8, c#9]

Then run InferFiltersFromConstraints again. The left side's constraints are:

(a#0 <=> xa#3)
(xa#3 = a#0)
(a#0 <=> a#0)
(xa#3 <=> xa#3)

The right side constraints:

(isnotnull(a#7))

Join condition is:

(a#0 = a#7)

Based on these constraints, the inferred result is:

(a#7 <=> xa#3)
(xa#3 = a#7)
(a#7 <=> a#7)

(a#7 <=> a#7) is a valid constraint for the right side. The result:

Join Inner, (a#0 = a#7)
:- Filter ((a#0 <=> a#0) AND (xa#3 <=> xa#3))
:  +- Project [a#0, a#0 AS xa#3]
:     +- Join Inner, (a#0 = a#4)
:        :- Filter isnotnull(a#0)
:        :  +- LocalRelation <empty>, [a#0, b#1, c#2]
:        +- Filter isnotnull(a#4)
:           +- LocalRelation <empty>, [a#4, b#5, c#6]
+- Filter (a#7 <=> a#7)
   +- Filter isnotnull(a#7)
      +- LocalRelation <empty>, [a#7, b#8, c#9]

@@ -66,13 +66,13 @@ trait ConstraintHelper {
    val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull])
    predicates.foreach {
      case eq @ EqualTo(l: Attribute, r: Attribute) =>
Contributor:

not related to this PR, but shall we match Equality here?

wangyum (Member, Author):

Yes. It seems ok.

@@ -66,13 +66,15 @@ trait ConstraintHelper {
    val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull])
    predicates.foreach {
      case eq @ EqualTo(l: Attribute, r: Attribute) =>
        val candidateConstraints = predicates - eq
        // Also remove EqualNullSafe with the same l and r to avoid Once strategy's idempotence
        // is broken.
Contributor:

Let's mention that l === r and l <=> r can infer l <=> l and r <=> r, which is useless.

@wangyum wangyum closed this in 7796028 Apr 15, 2023
@wangyum (Member, Author) commented Apr 15, 2023

Merged to master.

@wangyum wangyum deleted the SPARK-43095 branch April 15, 2023 01:25