
[SPARK-30872][SQL] Constraints inferred from inferred attributes #27632

Closed
wants to merge 4 commits into from

Conversation

wangyum
Member

@wangyum wangyum commented Feb 19, 2020

What changes were proposed in this pull request?

This PR fixes a special case when inferring additional constraints. To reproduce the issue:

scala> spark.range(20).selectExpr("id as a", "id as b", "id as c").write.saveAsTable("t1")

scala> spark.sql("select count(*) from t1 where a = b and b = c and (c = 3 or c = 13)").explain(false)
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#76]
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(1) Project
         +- *(1) Filter (((((((isnotnull(c#36L) AND ((b#35L = 3) OR (b#35L = 13))) AND isnotnull(b#35L)) AND (a#34L = c#36L)) AND isnotnull(a#34L)) AND (a#34L = b#35L)) AND (b#35L = c#36L)) AND ((c#36L = 3) OR (c#36L = 13)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.t1[a#34L,b#35L,c#36L] Batched: true, DataFilters: [isnotnull(c#36L), ((b#35L = 3) OR (b#35L = 13)), isnotnull(b#35L), (a#34L = c#36L), isnotnull(a#..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/Downloads/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(c), Or(EqualTo(b,3),EqualTo(b,13)), IsNotNull(b), IsNotNull(a), Or(EqualTo(c,3),EqualT..., ReadSchema: struct<a:bigint,b:bigint,c:bigint>

We can infer an additional constraint here: (a#34L = 3) OR (a#34L = 13).
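The inference above (a = b and b = c together with a disjunction on c eventually yielding the same disjunction on a) can be sketched with a minimal, self-contained expression model. The classes and helpers below are hypothetical stand-ins, not Catalyst's actual `Expression`/`ConstraintHelper` API:

```scala
// Hypothetical minimal expression tree (not Catalyst's classes).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Literal(value: Long) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

// Replace every occurrence of `from` with `to` in an expression.
def substitute(e: Expr, from: Attr, to: Attr): Expr = e match {
  case a: Attr if a == from => to
  case EqualTo(l, r)        => EqualTo(substitute(l, from, to), substitute(r, from, to))
  case Or(l, r)             => Or(substitute(l, from, to), substitute(r, from, to))
  case other                => other
}

// One inference round: for each attribute equality, rewrite the other
// predicates in both substitution directions.
def inferOnce(constraints: Set[Expr]): Set[Expr] = {
  val inferred = constraints.flatMap {
    case eq @ EqualTo(l: Attr, r: Attr) =>
      (constraints - eq).flatMap(p => Set(substitute(p, l, r), substitute(p, r, l)))
    case _ => Set.empty[Expr]
  }
  inferred -- constraints
}

// Iterate to a fixed point so that constraints inferred from inferred
// constraints (c -> b -> a) are also produced.
def inferFixpoint(constraints: Set[Expr]): Set[Expr] = {
  var all   = constraints
  var fresh = inferOnce(all)
  while (fresh.nonEmpty) { all ++= fresh; fresh = inferOnce(all) }
  all -- constraints
}

val a = Attr("a"); val b = Attr("b"); val c = Attr("c")
val base: Set[Expr] =
  Set(EqualTo(a, b), EqualTo(b, c), Or(EqualTo(c, Literal(3)), EqualTo(c, Literal(13))))

// Round 1 infers (b = 3 OR b = 13); round 2 infers (a = 3 OR a = 13).
println(inferFixpoint(base).contains(Or(EqualTo(a, Literal(3)), EqualTo(a, Literal(13)))))
```

Catalyst's real implementation works on `Expression` trees with `exprId`-based attribute identity; the fixed-point loop here only stands in for the "constraints inferred from inferred constraints" behavior this PR adds.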

Why are the changes needed?

Improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and benchmark test.
Benchmark code and benchmark result:

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.SaveMode.Overwrite

spark.conf.set("spark.sql.constraintPropagation.enabled", false)

val numRows = 1024 * 1024 * 15
val first = 10000
val mid = numRows / 2
val end = numRows - 10000
spark.range(numRows).selectExpr("id as a", "id as b", "id as c").write.saveAsTable("t1")

val title = "Constraints inferred from inferred constraints"
val benchmark = new Benchmark(title, numRows, minNumIters = 5)
benchmark.addCase("Infer to b") { _ =>
  spark.sql(s"select count(*) from t1 where a = b and b = c and a = c and (c = $first or c = $mid or c = $end) and (b = $first or b = $mid or b = $end)").write.format("noop").mode(Overwrite).save()
}

benchmark.addCase("Infer to b to a") { _ =>
  spark.sql(s"select count(*) from t1 where a = b and b = c and a = c and (c = $first or c = $mid or c = $end) and (b = $first or b = $mid or b = $end) and (a = $first or a = $mid or a = $end)").write.format("noop").mode(Overwrite).save()
}
benchmark.run()
Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Constraints inferred from inferred constraints:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Infer to b                                          463            547          65         34.0          29.4       1.0X
Infer to b to a                                     428            452          14         36.8          27.2       1.1X

@SparkQA

SparkQA commented Feb 19, 2020

Test build #118663 has finished for PR 27632 at commit 2fe8253.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Feb 19, 2020

retest this please

@SparkQA

SparkQA commented Feb 19, 2020

Test build #118668 has started for PR 27632 at commit 2fe8253.

@SparkQA

SparkQA commented Feb 20, 2020

Test build #118693 has finished for PR 27632 at commit 780fc35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -75,7 +85,7 @@ trait ConstraintHelper {
inferredConstraints ++= replaceConstraints(predicates - eq, l, r)
case _ => // No inference
}
-    inferredConstraints -- constraints
+    (inferredConstraints -- constraints).filterNot(i => constraints.exists(_.semanticEquals(i)))
Member Author

If the constraints already contain a = b, this change filters out the inferred constraint b = a, since the two are semantically equal.
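The dedup in the patched line can be illustrated with a toy model. The `Attr`/`EqualTo` classes and this `semanticEquals` are a hypothetical simplification of Catalyst's commutative-equality handling, not its actual implementation:

```scala
// Toy model: a = b and b = a are the same constraint.
case class Attr(name: String)
case class EqualTo(left: Attr, right: Attr) {
  def semanticEquals(other: EqualTo): Boolean =
    (left == other.left && right == other.right) ||
    (left == other.right && right == other.left)
}

val constraints = Set(EqualTo(Attr("a"), Attr("b")))
val inferred    = Set(EqualTo(Attr("b"), Attr("a")), EqualTo(Attr("a"), Attr("c")))

// Mirrors the patched line: `--` removes exact duplicates, and filterNot
// then drops inferred constraints semantically equal to an existing one.
val kept = (inferred -- constraints)
  .filterNot(i => constraints.exists(_.semanticEquals(i)))
println(kept)  // b = a is dropped; only a = c survives
```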

@wangyum wangyum changed the title [WIP][SPARK-30872][SQL] Constraints inferred from inferred attributes [SPARK-30872][SQL] Constraints inferred from inferred attributes Feb 20, 2020
@wangyum
Member Author

wangyum commented Feb 20, 2020

cc @cloud-fan

@SparkQA

SparkQA commented Feb 24, 2020

Test build #118842 has finished for PR 27632 at commit 884ba16.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Feb 24, 2020

retest this please

@SparkQA

SparkQA commented Feb 24, 2020

Test build #118855 has finished for PR 27632 at commit 884ba16.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3404,6 +3404,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
""".stripMargin)
checkAnswer(df, Row(Row(1, 2)) :: Nil)
}

test("SPARK-30872: Constraints inferred from inferred attributes") {
Member Author

@wangyum wangyum Feb 26, 2020

Before this PR, this test throws TreeNodeException: Once strategy's idempotence is broken for batch Infer Filters:

[info] - SPARK-30872: Constraints inferred from inferred attributes *** FAILED *** (146 milliseconds)
[info]   org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Once strategy's idempotence is broken for batch Infer Filters
[info]  Aggregate [count(1) AS count(1)#19182L]                                                                                                                                                                                                                        Aggregate [count(1) AS count(1)#19182L]
[info]  +- Project                                                                                                                                                                                                                                                     +- Project
[info] !   +- Filter ((((((a#19179L = c#19181L) AND isnotnull(b#19180L)) AND isnotnull(c#19181L)) AND ((b#19180L = 3) OR (b#19180L = 13))) AND isnotnull(a#19179L)) AND (((a#19179L = b#19180L) AND (b#19180L = c#19181L)) AND ((c#19181L = 3) OR (c#19181L = 13))))      +- Filter (((a#19179L = 3) OR (a#19179L = 13)) AND ((((((a#19179L = c#19181L) AND isnotnull(b#19180L)) AND isnotnull(c#19181L)) AND ((b#19180L = 3) OR (b#19180L = 13))) AND isnotnull(a#19179L)) AND (((a#19179L = b#19180L) AND (b#19180L = c#19181L)) AND ((c#19181L = 3) OR (c#19181L = 13)))))
[info]        +- Relation[a#19179L,b#19180L,c#19181L] parquet                                                                                                                                                                                                                +- Relation[a#19179L,b#19180L,c#19181L] parquet
[info]           , tree:
[info] Aggregate [count(1) AS count(1)#19182L]
[info] +- Project
[info]    +- Filter (((a#19179L = 3) OR (a#19179L = 13)) AND ((((((a#19179L = c#19181L) AND isnotnull(b#19180L)) AND isnotnull(c#19181L)) AND ((b#19180L = 3) OR (b#19180L = 13))) AND isnotnull(a#19179L)) AND (((a#19179L = b#19180L) AND (b#19180L = c#19181L)) AND ((c#19181L = 3) OR (c#19181L = 13)))))
[info]       +- Relation[a#19179L,b#19180L,c#19181L] parquet
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.checkBatchIdempotence(RuleExecutor.scala:100)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:187)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:132)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:132)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:111)
[info]   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:111)
[info]   at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
[info]   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:119)
[info]   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:762)
[info]   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:119)
[info]   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
[info]   at org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:231)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:154)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$746(SQLQuerySuite.scala:3413)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

@SparkQA

SparkQA commented Feb 26, 2020

Test build #118963 has finished for PR 27632 at commit bf7b8e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions

github-actions bot commented Jun 8, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jun 8, 2020
@wangyum wangyum closed this Jun 8, 2020