
[SPARK-29231][SQL] Constraints should be inferred from cast equality constraint #27252


Conversation

wangyum (Member) commented Jan 17, 2020

What changes were proposed in this pull request?

This PR adds support for inferring constraints from cast equality constraints. For example:

scala> spark.sql("create table spark_29231_1(c1 bigint, c2 bigint)")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("create table spark_29231_2(c1 int, c2 bigint)")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1)").explain
== Physical Plan ==
*(2) Project [c1#5L, c2#6L]
+- *(2) BroadcastHashJoin [c1#5L], [cast(c1#7 as bigint)], Inner, BuildRight
   :- *(2) Project [c1#5L, c2#6L]
   :  +- *(2) Filter (isnotnull(c1#5L) AND (c1#5L = 1))
   :     +- *(2) ColumnarToRow
   :        +- FileScan parquet default.spark_29231_1[c1#5L,c2#6L] Batched: true, DataFilters: [isnotnull(c1#5L), (c1#5L = 1)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehouse/spark_29231_1], PartitionFilters: [], PushedFilters: [IsNotNull(c1), EqualTo(c1,1)], ReadSchema: struct<c1:bigint,c2:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#209]
      +- *(1) Project [c1#7]
         +- *(1) Filter isnotnull(c1#7)
            +- *(1) ColumnarToRow
               +- FileScan parquet default.spark_29231_2[c1#7] Batched: true, DataFilters: [isnotnull(c1#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-preview2-bin-hadoop2.7/spark-warehouse/spark_29231_2], PartitionFilters: [], PushedFilters: [IsNotNull(c1)], ReadSchema: struct<c1:int>

After this PR:

scala> spark.sql("select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1)").explain
== Physical Plan ==
*(2) Project [c1#0L, c2#1L]
+- *(2) BroadcastHashJoin [c1#0L], [cast(c1#2 as bigint)], Inner, BuildRight
   :- *(2) Project [c1#0L, c2#1L]
   :  +- *(2) Filter (isnotnull(c1#0L) AND (c1#0L = 1))
   :     +- *(2) ColumnarToRow
   :        +- FileScan parquet default.spark_29231_1[c1#0L,c2#1L] Batched: true, DataFilters: [isnotnull(c1#0L), (c1#0L = 1)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/spark/spark-warehouse/spark_29231_1], PartitionFilters: [], PushedFilters: [IsNotNull(c1), EqualTo(c1,1)], ReadSchema: struct<c1:bigint,c2:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#99]
      +- *(1) Project [c1#2]
         +- *(1) Filter ((cast(c1#2 as bigint) = 1) AND isnotnull(c1#2))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.spark_29231_2[c1#2] Batched: true, DataFilters: [(cast(c1#2 as bigint) = 1), isnotnull(c1#2)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/spark/spark-warehouse/spark_29231_2], PartitionFilters: [], PushedFilters: [IsNotNull(c1)], ReadSchema: struct<c1:int>

Why are the changes needed?

Improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and benchmark test.
Benchmark code and benchmark result:

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.SaveMode.Overwrite

val numRows = 1024 * 1024 * 15
spark.range(numRows).selectExpr("cast(id as bigint) as c1", "cast(id as string) as c2").write.saveAsTable("t1")
spark.range(numRows).selectExpr("cast(id as int) as c1", "cast(id as string) as c2").write.saveAsTable("t2")

val title = "Constraints inferred from cast equality constraint"
val benchmark = new Benchmark(title, numRows, minNumIters = 5)
benchmark.addCase(s"t1.c1 > ${numRows - 10000}") { _ =>
  spark.sql(s"select count(*) from t1 join t2 on (t1.c1 = t2.c1 and t1.c1 > ${numRows - 10000})").write.format("noop").mode(Overwrite).save()
}

benchmark.addCase("t1.c1 = 100000") { _ =>
  spark.sql("select count(*) from t1 join t2 on (t1.c1 = t2.c1 and t1.c1 = 100000)").write.format("noop").mode(Overwrite).save()
}
benchmark.run()

Before this PR:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Constraints inferred from cast equality constraint:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
t1.c1 > 15718640                                   3718           3851         164          4.2         236.4       1.0X
t1.c1 = 100000                                     2133           2185          40          7.4         135.6       1.7X

After this PR:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Constraints inferred from cast equality constraint:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
t1.c1 > 15718640                                    348            435          67         45.2          22.1       1.0X
t1.c1 = 100000                                      264            311          35         59.5          16.8       1.3X

wangyum commented Jan 17, 2020

PostgreSQL and Hive support this feature:

postgres=# EXPLAIN  select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1);
                                  QUERY PLAN
------------------------------------------------------------------------------
 Nested Loop  (cost=0.00..69.77 rows=90 width=16)
   ->  Seq Scan on spark_29231_2 t2  (cost=0.00..35.50 rows=10 width=4)
         Filter: (c1 = 1)
   ->  Materialize  (cost=0.00..33.17 rows=9 width=16)
         ->  Seq Scan on spark_29231_1 t1  (cost=0.00..33.12 rows=9 width=16)
               Filter: (c1 = 1)
(6 rows)
hive> explain select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1);
Warning: Map Join MAPJOIN[11][bigTable=?] in task 'Stage-3:MAPRED' is a cross product
OK
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:t1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:t1
          TableScan
            alias: t1
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: (c1 = 1L) (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                expressions: c2 (type: bigint)
                outputColumnNames: _col1
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0
                    1

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: t2
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: (UDFToLong(c1) = 1) (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0
                    1
                  outputColumnNames: _col1
                  Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                  Select Operator
                    expressions: 1L (type: bigint), _col1 (type: bigint)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 1 Data size: 1 Basic stats: PARTIAL Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Execution mode: vectorized
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.2 seconds, Fetched: 69 row(s)

SparkQA commented Jan 17, 2020

Test build #116909 has finished for PR 27252 at commit 048a0ec.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum commented Jan 17, 2020

retest this please

SparkQA commented Jan 17, 2020

Test build #116923 has finished for PR 27252 at commit 048a0ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 19, 2020

Test build #116993 has finished for PR 27252 at commit 34c7001.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum commented Jan 19, 2020

retest this please

SparkQA commented Jan 19, 2020

Test build #116998 has finished for PR 27252 at commit 34c7001.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu commented Jan 20, 2020

Could you check if this fix can affect optimization time? IIRC we have hit time-consuming issues in constraint propagation before, e.g., e011004
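
For reference, per-rule timing like the tables below can be dumped straight from Catalyst's RuleExecutor; a minimal spark-shell sketch (the query is only an example, and it assumes the spark_29231_* tables from the description exist):

```scala
import org.apache.spark.sql.catalyst.rules.RuleExecutor

RuleExecutor.resetMetrics()  // clear the per-rule counters
spark.sql(
  "select t1.* from spark_29231_1 t1 join spark_29231_2 t2 on (t1.c1 = t2.c1 and t1.c1 = 1)"
).explain()
// Prints the "=== Metrics of Analyzer/Optimizer Rules ===" report shown below.
println(RuleExecutor.dumpTimeSpent())
```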

wangyum commented Jan 20, 2020

@maropu Before this PR:

00:04:20.465 WARN org.apache.spark.sql.SQLQueryTestSuite:
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 5326085
Total time: 144.747179483 seconds

Rule                                                                                               Effective Time / Total Time                     Effective Runs / Total Runs

org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries                               16779452101 / 20173139174                       748 / 26140
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                                              1425337133 / 5635754805                         5138 / 60806
org.apache.spark.sql.execution.datasources.FindDataSourceTable                                     4948196097 / 5101730455                         2384 / 49272
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery                                    3341395204 / 3939149301                         830 / 49313
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences                                  1705032936 / 2959703766                         9241 / 49384
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions                          830752175 / 2570283319                          275 / 49294
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates                                         455566317 / 2312688244                          2302 / 47783
org.apache.spark.sql.catalyst.optimizer.CollapseProject                                            989929053 / 2118855460                          5895 / 47762
org.apache.spark.sql.catalyst.optimizer.PruneFilters                                               14076793 / 1946034212                           168 / 47710
org.apache.spark.sql.catalyst.optimizer.ConstantFolding                                            653505895 / 1904392560                          3565 / 34666
org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators                                        176374899 / 1874860360                          2339 / 60754
org.apache.spark.sql.catalyst.optimizer.RemoveDispensableExpressions                               738001 / 1856087864                             6 / 34666
org.apache.spark.sql.catalyst.optimizer.NullPropagation                                            45193963 / 1759483772                           359 / 34718
org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions                          0 / 1743186421                                  0 / 34666
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification                                      8531965 / 1689250544                            54 / 34666
org.apache.spark.sql.catalyst.optimizer.SimplifyCasts                                              45594645 / 1557656900                           287 / 34666
org.apache.spark.sql.catalyst.optimizer.LikeSimplification                                         0 / 1536745808                                  0 / 34666
org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator                                 0 / 1480669059                                  0 / 34666
org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals                                       9875655 / 1456484466                            178 / 34666
org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison                                   2727915 / 1453064933                            20 / 34666
org.apache.spark.sql.catalyst.optimizer.EliminateMapObjects                                        0 / 1424448232                                  0 / 13044
org.apache.spark.sql.catalyst.optimizer.CombineUnions                                              6802865 / 1423200337                            46 / 47814
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts                              337664736 / 1421112672                          1245 / 49272
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions                                   788751339 / 1396854044                          5789 / 49313
org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps                                    258804 / 1373494442                             3 / 34666
org.apache.spark.sql.catalyst.optimizer.ReplaceNullWithFalseInPredicate                            267041 / 1369285011                             3 / 34666
org.apache.spark.sql.catalyst.optimizer.OptimizeIn                                                 563702 / 1363849256                             4 / 34718
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints                                556287088 / 1322621698                          1349 / 13044

After this PR:

23:23:13.592 WARN org.apache.spark.sql.SQLQueryTestSuite:
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 5323370
Total time: 148.750443534 seconds

Rule                                                                                               Effective Time / Total Time                     Effective Runs / Total Runs

org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries                               18733079713 / 22418423535                       748 / 26140
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                                              1457156155 / 6046856332                         5137 / 60797
org.apache.spark.sql.execution.datasources.FindDataSourceTable                                     4908256520 / 5087039927                         2384 / 49272
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery                                    3410581306 / 4001191597                         830 / 49313
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences                                  1731479493 / 3001691281                         9241 / 49384
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions                          832539494 / 2565833320                          275 / 49294
org.apache.spark.sql.catalyst.optimizer.PruneFilters                                               15101110 / 2212401759                           168 / 47701
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates                                         461342022 / 2065929738                          2284 / 47765
org.apache.spark.sql.catalyst.optimizer.SimplifyCasts                                              44285018 / 1920855654                           287 / 34657
org.apache.spark.sql.catalyst.optimizer.ConstantFolding                                            681578304 / 1905774291                          3565 / 34657
org.apache.spark.sql.catalyst.optimizer.NullPropagation                                            44365153 / 1899594511                           359 / 34709
org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators                                        175903898 / 1869259827                          2337 / 60745
org.apache.spark.sql.catalyst.optimizer.CollapseProject                                            640683938 / 1787207725                          5877 / 47753
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification                                      8551126 / 1690452897                            54 / 34657
org.apache.spark.sql.catalyst.optimizer.ReplaceNullWithFalseInPredicate                            247737 / 1667509303                             3 / 34657
org.apache.spark.sql.catalyst.optimizer.ConstantPropagation                                        1252963 / 1640089155                            6 / 34709
org.apache.spark.sql.catalyst.optimizer.LikeSimplification                                         0 / 1519821245                                  0 / 34657
org.apache.spark.sql.catalyst.optimizer.RemoveDispensableExpressions                               633418 / 1510769956                             6 / 34657
org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals                                       11138539 / 1476803222                           178 / 34657
org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator                                 0 / 1461559025                                  0 / 34657
org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison                                   2479408 / 1439506402                            20 / 34657
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions                                   796497708 / 1425108121                          5789 / 49313
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts                              338657646 / 1416857363                          1245 / 49272
org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions                          0 / 1380618071                                  0 / 34657
org.apache.spark.sql.catalyst.optimizer.OptimizeIn                                                 615040 / 1351329799                             4 / 34709
org.apache.spark.sql.catalyst.optimizer.CombineUnions                                              6596639 / 1351015377                            46 / 47805
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints                                533512099 / 1340374908                          1246 / 13044

maropu commented Jan 20, 2020

Thanks, @wangyum ! cc: @viirya

SparkQA commented Jan 20, 2020

Test build #117104 has finished for PR 27252 at commit 7dcfe91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum commented Jan 21, 2020

cc @cloud-fan

val candidateConstraints = binaryComparisons - eq
val bridge = Cast(r, lc.dataType, tz)
inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
inferredConstraints ++= replaceConstraints(candidateConstraints, lc, bridge)
Contributor commented:
Is this safe? Cast is not reversible: cast(int_col as long) = long_col can produce a different result than int_col = cast(long_col as int).
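
For illustration, a minimal spark-shell sketch of that asymmetry (assuming the default, non-ANSI cast behavior; the values are chosen to sit just outside the int range):

```scala
// Widening the int side keeps the comparison exact, while narrowing the bigint side
// wraps the out-of-range value 2147483648 to -2147483648 and flips the result.
val asymmetry = spark.sql("""
  SELECT CAST(i AS BIGINT) = l AS widen_int_side,   -- false: -2147483648 <> 2147483648
         i = CAST(l AS INT)  AS narrow_long_side    -- true under legacy casts: 2147483648 wraps to -2147483648
  FROM VALUES (CAST(-2147483648 AS INT), CAST(2147483648 AS BIGINT)) AS t(i, l)
""")
asymmetry.show()
```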

wangyum (Member, Author) replied:
2060190: makes it only support filters on the higher (wider) data type.
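
A rough sketch of what that restriction means (illustrative names only, mirroring the pattern quoted from the diff further down; not the actual change):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EqualTo, Expression}

// For cast(a, dt) = b, only predicates on b (already at the wider type dt) may be
// rewritten onto the cast side; nothing is narrowed back down to a's original type.
def widerSideOf(constraint: Expression): Option[(Attribute, Expression)] = constraint match {
  case EqualTo(c @ Cast(_: Attribute, _, _), b: Attribute) => Some((b, c))
  case EqualTo(b: Attribute, c @ Cast(_: Attribute, _, _)) => Some((b, c))
  case _ => None
}
```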

SparkQA commented Jan 31, 2020

Test build #117624 has finished for PR 27252 at commit 2060190.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum commented Jan 31, 2020

retest this please

SparkQA commented Jan 31, 2020

Test build #117644 has finished for PR 27252 at commit 2060190.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -62,11 +62,16 @@ trait ConstraintHelper {
*/
def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
var inferredConstraints = Set.empty[Expression]
val binaryComparisons = constraints.filter(_.isInstanceOf[BinaryComparison])
Contributor commented:
Do you know which constraints are not BinaryComparison? I think it's possible, but I can't find any examples.

wangyum (Member, Author) replied:
Sorry, restricting this to BinaryComparison is incorrect. For example: int_column = long_column where long_column in (1L, 2L).
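
To make that concrete, a hypothetical example (table and column names are illustrative): with only BinaryComparison kept, the IN predicate below would not be propagated across the int_column = long_column equality, even though cast(int_column as bigint) IN (1L, 2L) could be inferred for the other side.

```scala
// Assumes tables t_int(int_column INT) and t_long(long_column BIGINT) exist; after
// analysis the join key becomes cast(int_column as bigint) = long_column.
spark.sql("""
  SELECT *
  FROM t_int JOIN t_long
    ON t_int.int_column = t_long.long_column
  WHERE t_long.long_column IN (1L, 2L)
""").explain()
```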

wangyum (Member, Author) added:
The issue is fixed by 420058b.

SparkQA commented Feb 4, 2020

Test build #117800 has started for PR 27252 at commit 420058b.

wangyum commented Feb 4, 2020

retest this please

2 similar comments
wangyum commented Feb 4, 2020

retest this please

wangyum commented Feb 4, 2020

retest this please

SparkQA commented Feb 4, 2020

Test build #117816 has finished for PR 27252 at commit 420058b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum commented Feb 4, 2020

retest this please

SparkQA commented Feb 4, 2020

Test build #117821 has finished for PR 27252 at commit 420058b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -62,11 +62,17 @@ trait ConstraintHelper {
*/
def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
var inferredConstraints = Set.empty[Expression]
// IsNotNull should be constructed by `constructIsNotNullConstraints`.
val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull])
constraints.foreach {
Member commented:
Can't we do it like predicates.foreach { here?

wangyum (Member, Author) replied:
Yes. We can do it.


Seq(Some("left.a".attr.cast(LongType) === "right.b".attr),
Some("right.b".attr === "left.a".attr.cast(LongType))).foreach { condition =>
testConstraintsAfterJoin(originalLeft, originalRight, left, right, Inner, condition)
maropu (Member) commented on Feb 12, 2020:
In terms of test coverage, isn't it better to test both cases (left/right-side casts)? I have the same comment on the test below.

SparkQA commented Feb 12, 2020

Test build #118270 has finished for PR 27252 at commit 47eadf4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 12, 2020

Test build #118273 has finished for PR 27252 at commit d283028.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 12, 2020

Test build #118276 has finished for PR 27252 at commit d055eba.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum commented Feb 12, 2020

retest this please

1 similar comment
cloud-fan (Contributor) commented:
retest this please

val left = testRelation1.where(IsNotNull('a) && 'a === 1).subquery('left)
val right = testRelation2.where(IsNotNull('b)).subquery('right)

Seq(Some("left.a".attr.cast(LongType) === "right.b".attr),
Contributor commented:
I might be wrong, but I find these test cases a bit confusing because left and right have equality filters. E.g., here 'a === 1 is in left, so it would actually be correct to infer 1.cast(LongType) === 'b for right. This PR obviously doesn't do that (#27518 will address it), but using inequalities ('a < 1) would probably be easier to follow.

inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
case eq @ EqualTo(l @ Cast(_: Attribute, _, _), r: Attribute) =>
inferredConstraints ++= replaceConstraints(predicates - eq, r, l)
cloud-fan (Contributor) commented on Feb 12, 2020:
according to https://github.com/apache/spark/pull/27252/files#r378111623

If we have cast(a, dt) = b and b = 1, we can definitely infer cast(a, dt) = 1.
If we have cast(a, dt) = b and a = 1, it seems we can also infer cast(1, dt) = b.

But I'm a bit unsure how to do it. We may need a variant of replaceConstraints that holds an expression builder "e => cast(e, dt) = b": it looks for attribute a and replaces the whole constraint with cast(1, dt) = b.
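
One possible shape of that variant (a rough sketch with illustrative names, not code from this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, Expression}

// Given constraints such as a = 1 (or 1 = a) on attribute `a`, plug the other side into
// a builder like `e => EqualTo(Cast(e, dt), b)` to produce cast(1, dt) = b.
def inferViaBuilder(
    constraints: Set[Expression],
    a: Attribute,
    buildConstraint: Expression => Expression): Set[Expression] =
  constraints.collect {
    case EqualTo(l, r) if l.semanticEquals(a) => buildConstraint(r)
    case EqualTo(l, r) if r.semanticEquals(a) => buildConstraint(l)
  }
```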

Contributor replied:
I don't think we need to touch replaceConstraints at all. Please check #27518, which will do the trick because a = 1 will be "substituted" into cast(a, dt) = b as a new cast(1, dt) = b constraint.

Contributor replied:
I think both this PR and #27518 are beneficial. But I would use val originalLeft = testRelation1.where('a < 1).subquery('left) in test cases of this one instead of val originalLeft = testRelation1.where('a === 1).subquery('left) to avoid confusion.

wangyum (Member, Author) replied on Feb 12, 2020:
@cloud-fan This PR supported cast(1, dt) = b before:
https://github.com/apache/spark/compare/048a0ecc65763c6feaa939938e2dec6f4040d939..7dcfe915087dbe274b470928600197745a645f5e

I removed it because:

  1. It may break the plan. This is how I handled it before.
  2. For cast(a, dt) = b, we would then support inferring many predicates, for example: a > 1, a < 1, a in (2, 3). I'm not sure that is safe.

How about only supporting cast(a, dt) = 1 for now?

@peter-toth I'd like to support these cases in #27518:

a < b && b < c infer a < c
a < b && b <= c infer a < c
a < b && b = c infer a < c
...

Contributor replied:
@wangyum I see, but I think you are currently doing something very different in #27518; see details here: #27518 (comment)

I would suggest keeping #27518 in its current form (but amending its title) and opening a new one to address inequalities.

Member replied:
How about only supporting cast(a, dt) = 1 now?

+1 for supporting only the limited case in this PR. Since this part of the optimization can affect many queries, I think we need thorough discussion and tests before supporting wider cases.

SparkQA commented Feb 12, 2020

Test build #118283 has finished for PR 27252 at commit d055eba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Seq(Some("left.a".attr.cast(LongType) === "right.b".attr),
Some("right.b".attr === "left.a".attr.cast(LongType))).foreach { condition =>
testConstraintsAfterJoin(originalLeft, originalRight, left, right, Inner, condition)
}
Contributor commented:
Let's also test cast(int) here. The key is that we should test both the left-side cast and the right-side cast, as @maropu said.

"left.a".attr.cast(LongType) === "right.b".attr and "right.b".attr === "left.a".attr.cast(LongType) only test the left-side cast (the join's left side, not EqualTo's left side).

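A sketch of the mirrored coverage being asked for (illustrative only: it reuses the helpers from the snippet above and assumes a variant of the test relations where the right plan holds the narrower int column, so the Cast wraps the right side's attribute):

```scala
Seq(Some("right.b".attr.cast(LongType) === "left.a".attr),
    Some("left.a".attr === "right.b".attr.cast(LongType))).foreach { condition =>
  testConstraintsAfterJoin(originalLeft, originalRight, left, right, Inner, condition)
}
```
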
SparkQA commented Feb 12, 2020

Test build #118311 has finished for PR 27252 at commit 4b14b3f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu commented Feb 12, 2020

retest this please

maropu (Member) left a review comment:
Looks fine to me.

SparkQA commented Feb 13, 2020

Test build #118323 has finished for PR 27252 at commit 4b14b3f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu commented Feb 13, 2020

retest this please

SparkQA commented Feb 13, 2020

Test build #118329 has finished for PR 27252 at commit 4b14b3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan closed this in fb0e07b Feb 13, 2020
cloud-fan (Contributor) commented:
thanks, merging to master!

@wangyum wangyum deleted the SPARK-29231 branch February 13, 2020 14:58
gatorsmile (Member) commented:
@wangyum Could you update the PR description with the perf measurement?

wangyum commented Feb 19, 2020

@gatorsmile Done

sjincho pushed a commit to sjincho/spark that referenced this pull request on Apr 15, 2020. Its commit message repeats the PR title and description above and ends with:
Closes apache#27252 from wangyum/SPARK-29231.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>