Skip to content

Commit

Permalink
[SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be…
Browse files Browse the repository at this point in the history
… folded by FoldablePropagation rule

## What changes were proposed in this pull request?
This PR fixes the code in Optimizer phase where the constant alias columns of a `INNER JOIN` query are folded in Rule `FoldablePropagation`.

For the following query():

```
val sqlA =
  """
    |create temporary view ta as
    |select a, 'a' as tag from t1 union all
    |select a, 'b' as tag from t2
  """.stripMargin

val sqlB =
  """
    |create temporary view tb as
    |select a, 'a' as tag from t3 union all
    |select a, 'b' as tag from t4
  """.stripMargin

val sql =
  """
    |select tb.* from ta inner join tb on
    |ta.a = tb.a and
    |ta.tag = tb.tag
  """.stripMargin
```

The tag column is an constant alias column, it's folded by `FoldablePropagation` like this:

```
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
 Project [a#4, tag#14]                              Project [a#4, tag#14]
!+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, ((a#0 = a#4) && (a = a))
    :- Union                                           :- Union
    :  :- Project [a#0, a AS tag#8]                    :  :- Project [a#0, a AS tag#8]
    :  :  +- LocalRelation [a#0]                       :  :  +- LocalRelation [a#0]
    :  +- Project [a#2, b AS tag#9]                    :  +- Project [a#2, b AS tag#9]
    :     +- LocalRelation [a#2]                       :     +- LocalRelation [a#2]
    +- Union                                           +- Union
       :- Project [a#4, a AS tag#14]                      :- Project [a#4, a AS tag#14]
       :  +- LocalRelation [a#4]                          :  +- LocalRelation [a#4]
       +- Project [a#6, b AS tag#15]                      +- Project [a#6, b AS tag#15]
          +- LocalRelation [a#6]                             +- LocalRelation [a#6]
```

Finally the Result of Batch Operator Optimizations is:

```
Project [a#4, tag#14]                              Project [a#4, tag#14]
!+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, (a#0 = a#4)
!   :- SubqueryAlias ta, `ta`                          :- Union
!   :  +- Union                                        :  :- LocalRelation [a#0]
!   :     :- Project [a#0, a AS tag#8]                 :  +- LocalRelation [a#2]
!   :     :  +- SubqueryAlias t1, `t1`                 +- Union
!   :     :     +- Project [a#0]                          :- LocalRelation [a#4, tag#14]
!   :     :        +- SubqueryAlias grouping              +- LocalRelation [a#6, tag#15]
!   :     :           +- LocalRelation [a#0]
!   :     +- Project [a#2, b AS tag#9]
!   :        +- SubqueryAlias t2, `t2`
!   :           +- Project [a#2]
!   :              +- SubqueryAlias grouping
!   :                 +- LocalRelation [a#2]
!   +- SubqueryAlias tb, `tb`
!      +- Union
!         :- Project [a#4, a AS tag#14]
!         :  +- SubqueryAlias t3, `t3`
!         :     +- Project [a#4]
!         :        +- SubqueryAlias grouping
!         :           +- LocalRelation [a#4]
!         +- Project [a#6, b AS tag#15]
!            +- SubqueryAlias t4, `t4`
!               +- Project [a#6]
!                  +- SubqueryAlias grouping
!                     +- LocalRelation [a#6]
```

The condition `tag#8 = tag#14` of INNER JOIN has been removed. This leads to the data of inner join being wrong.

After fix:

```
=== Result of Batch LocalRelation ===
 GlobalLimit 21                                           GlobalLimit 21
 +- LocalLimit 21                                         +- LocalLimit 21
    +- Project [a#4, tag#11]                                 +- Project [a#4, tag#11]
       +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11))         +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11))
!         :- SubqueryAlias ta                                      :- Union
!         :  +- Union                                              :  :- LocalRelation [a#0, tag#8]
!         :     :- Project [a#0, a AS tag#8]                       :  +- LocalRelation [a#2, tag#9]
!         :     :  +- SubqueryAlias t1                             +- Union
!         :     :     +- Project [a#0]                                :- LocalRelation [a#4, tag#11]
!         :     :        +- SubqueryAlias grouping                    +- LocalRelation [a#6, tag#12]
!         :     :           +- LocalRelation [a#0]
!         :     +- Project [a#2, b AS tag#9]
!         :        +- SubqueryAlias t2
!         :           +- Project [a#2]
!         :              +- SubqueryAlias grouping
!         :                 +- LocalRelation [a#2]
!         +- SubqueryAlias tb
!            +- Union
!               :- Project [a#4, a AS tag#11]
!               :  +- SubqueryAlias t3
!               :     +- Project [a#4]
!               :        +- SubqueryAlias grouping
!               :           +- LocalRelation [a#4]
!               +- Project [a#6, b AS tag#12]
!                  +- SubqueryAlias t4
!                     +- Project [a#6]
!                        +- SubqueryAlias grouping
!                           +- LocalRelation [a#6]
```

## How was this patch tested?

add sql-tests/inputs/inner-join.sql
All tests passed.

Author: Stan Zhai <zhaishidan@haizhi.com>

Closes #17099 from stanzhai/fix-inner-join.
  • Loading branch information
stanzhai authored and gatorsmile committed Mar 1, 2017
1 parent 38e7835 commit 5502a9c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
// join is not always picked from its children, but can also be null.
// TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes
// of outer join.
case j @ Join(_, _, Inner, _) =>
case j @ Join(_, _, Inner, _) if !stop =>
j.transformExpressions(replaceFoldable)

// We can fold the projections an expand holds. However expand changes the output columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,20 @@ class FoldablePropagationSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("Propagate in inner join") {
val ta = testRelation.select('a, Literal(1).as('tag))
.union(testRelation.select('a, Literal(2).as('tag)))
.subquery('ta)
val tb = testRelation.select('a, Literal(1).as('tag))
.union(testRelation.select('a, Literal(2).as('tag)))
.subquery('tb)
val query = ta.join(tb, Inner,
Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag".attr))
val optimized = Optimize.execute(query.analyze)
val correctAnswer = query.analyze
comparePlans(optimized, correctAnswer)
}

test("Propagate in expand") {
val c1 = Literal(1).as('a)
val c2 = Literal(2).as('b)
Expand Down
17 changes: 17 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/inner-join.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a);
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a);
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a);
CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a);

CREATE TEMPORARY VIEW ta AS
SELECT a, 'a' AS tag FROM t1
UNION ALL
SELECT a, 'b' AS tag FROM t2;

CREATE TEMPORARY VIEW tb AS
SELECT a, 'a' AS tag FROM t3
UNION ALL
SELECT a, 'b' AS tag FROM t4;

-- SPARK-19766 Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule
SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag;
68 changes: 68 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/inner-join.sql.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 13


-- !query 0
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a)
-- !query 2 schema
struct<>
-- !query 2 output



-- !query 3
CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a)
-- !query 3 schema
struct<>
-- !query 3 output



-- !query 4
CREATE TEMPORARY VIEW ta AS
SELECT a, 'a' AS tag FROM t1
UNION ALL
SELECT a, 'b' AS tag FROM t2
-- !query 4 schema
struct<>
-- !query 4 output



-- !query 5
CREATE TEMPORARY VIEW tb AS
SELECT a, 'a' AS tag FROM t3
UNION ALL
SELECT a, 'b' AS tag FROM t4
-- !query 5 schema
struct<>
-- !query 5 output



-- !query 6
SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag
-- !query 6 schema
struct<a:int,tag:string>
-- !query 6 output
1 a
1 a
1 b
1 b

0 comments on commit 5502a9c

Please sign in to comment.