Skip to content

Commit

Permalink
[SPARK-34031][SQL] Union operator missing rowCount when CBO enabled
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This pr add row count to `Union` operator when CBO enabled.
```scala
spark.sql("CREATE TABLE t1 USING parquet AS SELECT id FROM RANGE(10)")
spark.sql("CREATE TABLE t2 USING parquet AS SELECT id FROM RANGE(10)")
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR ALL COLUMNS")
spark.sql("ANALYZE TABLE t2 COMPUTE STATISTICS FOR ALL COLUMNS")
spark.sql("set spark.sql.cbo.enabled=true")
spark.sql("SELECT * FROM t1 UNION ALL SELECT * FROM t2").explain("cost")
```

Before this pr:
```
== Optimized Logical Plan ==
Union false, false, Statistics(sizeInBytes=320.0 B)
:- Relation[id#5880L] parquet, Statistics(sizeInBytes=160.0 B, rowCount=10)
+- Relation[id#5881L] parquet, Statistics(sizeInBytes=160.0 B, rowCount=10)
```

After this pr:
```
== Optimized Logical Plan ==
Union false, false, Statistics(sizeInBytes=320.0 B, rowCount=20)
:- Relation[id#2138L] parquet, Statistics(sizeInBytes=160.0 B, rowCount=10)
+- Relation[id#2139L] parquet, Statistics(sizeInBytes=160.0 B, rowCount=10)
```

### Why are the changes needed?

Improve query performance,  [`JoinEstimation.estimateInnerOuterJoin`](https://github.com/apache/spark/blob/d6a68e0b67ff7de58073c176dd097070e88ac831/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala#L55-L156) need the row count.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #31068 from wangyum/SPARK-34031.

Lead-authored-by: Yuming Wang <yumwang@ebay.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
wangyum and HyukjinKwon committed Jan 7, 2021
1 parent 3aa4e11 commit aa509c1
Show file tree
Hide file tree
Showing 11 changed files with 874 additions and 903 deletions.
Expand Up @@ -217,7 +217,7 @@ object Union {
}

/**
* Logical plan for unioning two plans, without a distinct. This is UNION ALL in SQL.
* Logical plan for unioning multiple plans, without a distinct. This is UNION ALL in SQL.
*
* @param byName Whether resolves columns in the children by column names.
* @param allowMissingCol Allows missing columns in children query plans. If it is true,
Expand Down
Expand Up @@ -79,7 +79,15 @@ object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {

override def visitScriptTransform(p: ScriptTransformation): Statistics = default(p)

override def visitUnion(p: Union): Statistics = fallback(p)
override def visitUnion(p: Union): Statistics = {
val stats = p.children.map(_.stats)
val rowCount = if (stats.exists(_.rowCount.isEmpty)) {
None
} else {
Some(stats.map(_.rowCount.get).sum)
}
Statistics(sizeInBytes = stats.map(_.sizeInBytes).sum, rowCount = rowCount)
}

override def visitWindow(p: Window): Statistics = fallback(p)

Expand Down
Expand Up @@ -141,6 +141,17 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
expectedStatsCboOff = Statistics(sizeInBytes = 120))
}

test("SPARK-34031: Union operator missing rowCount when enable CBO") {
val union = Union(plan :: plan :: plan :: Nil)
val childrenSize = union.children.size
val sizeInBytes = plan.size.get * childrenSize
val rowCount = Some(plan.rowCount * childrenSize)
checkStats(
union,
expectedStatsCboOn = Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount),
expectedStatsCboOff = Statistics(sizeInBytes = sizeInBytes))
}

/** Check estimated stats when cbo is turned on/off. */
private def checkStats(
plan: LogicalPlan,
Expand Down
@@ -1,45 +1,43 @@
== Physical Plan ==
* Sort (41)
+- Exchange (40)
+- * Project (39)
+- * SortMergeJoin Inner (38)
:- * Sort (26)
: +- * Project (25)
: +- * BroadcastHashJoin Inner BuildRight (24)
: :- * HashAggregate (18)
: : +- Exchange (17)
: : +- * HashAggregate (16)
: : +- * Project (15)
: : +- * BroadcastHashJoin Inner BuildRight (14)
: : :- Union (9)
: : : :- * Project (4)
: : : : +- * Filter (3)
: : : : +- * ColumnarToRow (2)
: : : : +- Scan parquet default.web_sales (1)
: : : +- * Project (8)
: : : +- * Filter (7)
: : : +- * ColumnarToRow (6)
: : : +- Scan parquet default.catalog_sales (5)
: : +- BroadcastExchange (13)
: : +- * Filter (12)
: : +- * ColumnarToRow (11)
: : +- Scan parquet default.date_dim (10)
: +- BroadcastExchange (23)
: +- * Project (22)
: +- * Filter (21)
: +- * ColumnarToRow (20)
: +- Scan parquet default.date_dim (19)
+- * Sort (37)
+- Exchange (36)
+- * Project (35)
+- * BroadcastHashJoin Inner BuildRight (34)
:- * HashAggregate (28)
: +- ReusedExchange (27)
+- BroadcastExchange (33)
+- * Project (32)
+- * Filter (31)
+- * ColumnarToRow (30)
+- Scan parquet default.date_dim (29)
* Sort (39)
+- Exchange (38)
+- * Project (37)
+- * BroadcastHashJoin Inner BuildRight (36)
:- * Project (25)
: +- * BroadcastHashJoin Inner BuildRight (24)
: :- * HashAggregate (18)
: : +- Exchange (17)
: : +- * HashAggregate (16)
: : +- * Project (15)
: : +- * BroadcastHashJoin Inner BuildRight (14)
: : :- Union (9)
: : : :- * Project (4)
: : : : +- * Filter (3)
: : : : +- * ColumnarToRow (2)
: : : : +- Scan parquet default.web_sales (1)
: : : +- * Project (8)
: : : +- * Filter (7)
: : : +- * ColumnarToRow (6)
: : : +- Scan parquet default.catalog_sales (5)
: : +- BroadcastExchange (13)
: : +- * Filter (12)
: : +- * ColumnarToRow (11)
: : +- Scan parquet default.date_dim (10)
: +- BroadcastExchange (23)
: +- * Project (22)
: +- * Filter (21)
: +- * ColumnarToRow (20)
: +- Scan parquet default.date_dim (19)
+- BroadcastExchange (35)
+- * Project (34)
+- * BroadcastHashJoin Inner BuildRight (33)
:- * HashAggregate (27)
: +- ReusedExchange (26)
+- BroadcastExchange (32)
+- * Project (31)
+- * Filter (30)
+- * ColumnarToRow (29)
+- Scan parquet default.date_dim (28)


(1) Scan parquet default.web_sales
Expand Down Expand Up @@ -116,9 +114,9 @@ Results [8]: [d_week_seq#10, sum#20, sum#21, sum#22, sum#23, sum#24, sum#25, sum

(17) Exchange
Input [8]: [d_week_seq#10, sum#20, sum#21, sum#22, sum#23, sum#24, sum#25, sum#26]
Arguments: hashpartitioning(d_week_seq#10, 5), true, [id=#27]
Arguments: hashpartitioning(d_week_seq#10, 5), ENSURE_REQUIREMENTS, [id=#27]

(18) HashAggregate [codegen id : 6]
(18) HashAggregate [codegen id : 12]
Input [8]: [d_week_seq#10, sum#20, sum#21, sum#22, sum#23, sum#24, sum#25, sum#26]
Keys [1]: [d_week_seq#10]
Functions [7]: [sum(UnscaledValue(CASE WHEN (d_day_name#11 = Sunday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Monday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Tuesday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Wednesday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Thursday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Friday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Saturday) THEN sales_price#4 ELSE null END))]
Expand Down Expand Up @@ -147,82 +145,74 @@ Input [2]: [d_week_seq#42, d_year#43]
Input [1]: [d_week_seq#42]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#44]

(24) BroadcastHashJoin [codegen id : 6]
(24) BroadcastHashJoin [codegen id : 12]
Left keys [1]: [d_week_seq#10]
Right keys [1]: [d_week_seq#42]
Join condition: None

(25) Project [codegen id : 6]
(25) Project [codegen id : 12]
Output [8]: [d_week_seq#10 AS d_week_seq1#45, sun_sales#35 AS sun_sales1#46, mon_sales#36 AS mon_sales1#47, tue_sales#37 AS tue_sales1#48, wed_sales#38 AS wed_sales1#49, thu_sales#39 AS thu_sales1#50, fri_sales#40 AS fri_sales1#51, sat_sales#41 AS sat_sales1#52]
Input [9]: [d_week_seq#10, sun_sales#35, mon_sales#36, tue_sales#37, wed_sales#38, thu_sales#39, fri_sales#40, sat_sales#41, d_week_seq#42]

(26) Sort [codegen id : 6]
Input [8]: [d_week_seq1#45, sun_sales1#46, mon_sales1#47, tue_sales1#48, wed_sales1#49, thu_sales1#50, fri_sales1#51, sat_sales1#52]
Arguments: [d_week_seq1#45 ASC NULLS FIRST], false, 0

(27) ReusedExchange [Reuses operator id: 17]
(26) ReusedExchange [Reuses operator id: 17]
Output [8]: [d_week_seq#10, sum#53, sum#54, sum#55, sum#56, sum#57, sum#58, sum#59]

(28) HashAggregate [codegen id : 12]
(27) HashAggregate [codegen id : 11]
Input [8]: [d_week_seq#10, sum#53, sum#54, sum#55, sum#56, sum#57, sum#58, sum#59]
Keys [1]: [d_week_seq#10]
Functions [7]: [sum(UnscaledValue(CASE WHEN (d_day_name#11 = Sunday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Monday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Tuesday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Wednesday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Thursday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Friday) THEN sales_price#4 ELSE null END)), sum(UnscaledValue(CASE WHEN (d_day_name#11 = Saturday) THEN sales_price#4 ELSE null END))]
Aggregate Attributes [7]: [sum(UnscaledValue(CASE WHEN (d_day_name#11 = Sunday) THEN sales_price#4 ELSE null END))#60, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Monday) THEN sales_price#4 ELSE null END))#61, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Tuesday) THEN sales_price#4 ELSE null END))#62, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Wednesday) THEN sales_price#4 ELSE null END))#63, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Thursday) THEN sales_price#4 ELSE null END))#64, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Friday) THEN sales_price#4 ELSE null END))#65, sum(UnscaledValue(CASE WHEN (d_day_name#11 = Saturday) THEN sales_price#4 ELSE null END))#66]
Results [8]: [d_week_seq#10, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Sunday) THEN sales_price#4 ELSE null END))#60,17,2) AS sun_sales#35, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Monday) THEN sales_price#4 ELSE null END))#61,17,2) AS mon_sales#36, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Tuesday) THEN sales_price#4 ELSE null END))#62,17,2) AS tue_sales#37, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Wednesday) THEN sales_price#4 ELSE null END))#63,17,2) AS wed_sales#38, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Thursday) THEN sales_price#4 ELSE null END))#64,17,2) AS thu_sales#39, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Friday) THEN sales_price#4 ELSE null END))#65,17,2) AS fri_sales#40, MakeDecimal(sum(UnscaledValue(CASE WHEN (d_day_name#11 = Saturday) THEN sales_price#4 ELSE null END))#66,17,2) AS sat_sales#41]

(29) Scan parquet default.date_dim
(28) Scan parquet default.date_dim
Output [2]: [d_week_seq#67, d_year#68]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2002), IsNotNull(d_week_seq)]
ReadSchema: struct<d_week_seq:int,d_year:int>

(30) ColumnarToRow [codegen id : 11]
(29) ColumnarToRow [codegen id : 10]
Input [2]: [d_week_seq#67, d_year#68]

(31) Filter [codegen id : 11]
(30) Filter [codegen id : 10]
Input [2]: [d_week_seq#67, d_year#68]
Condition : ((isnotnull(d_year#68) AND (d_year#68 = 2002)) AND isnotnull(d_week_seq#67))

(32) Project [codegen id : 11]
(31) Project [codegen id : 10]
Output [1]: [d_week_seq#67]
Input [2]: [d_week_seq#67, d_year#68]

(33) BroadcastExchange
(32) BroadcastExchange
Input [1]: [d_week_seq#67]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#69]

(34) BroadcastHashJoin [codegen id : 12]
(33) BroadcastHashJoin [codegen id : 11]
Left keys [1]: [d_week_seq#10]
Right keys [1]: [d_week_seq#67]
Join condition: None

(35) Project [codegen id : 12]
(34) Project [codegen id : 11]
Output [8]: [d_week_seq#10 AS d_week_seq2#70, sun_sales#35 AS sun_sales2#71, mon_sales#36 AS mon_sales2#72, tue_sales#37 AS tue_sales2#73, wed_sales#38 AS wed_sales2#74, thu_sales#39 AS thu_sales2#75, fri_sales#40 AS fri_sales2#76, sat_sales#41 AS sat_sales2#77]
Input [9]: [d_week_seq#10, sun_sales#35, mon_sales#36, tue_sales#37, wed_sales#38, thu_sales#39, fri_sales#40, sat_sales#41, d_week_seq#67]

(36) Exchange
Input [8]: [d_week_seq2#70, sun_sales2#71, mon_sales2#72, tue_sales2#73, wed_sales2#74, thu_sales2#75, fri_sales2#76, sat_sales2#77]
Arguments: hashpartitioning((d_week_seq2#70 - 53), 5), true, [id=#78]

(37) Sort [codegen id : 13]
(35) BroadcastExchange
Input [8]: [d_week_seq2#70, sun_sales2#71, mon_sales2#72, tue_sales2#73, wed_sales2#74, thu_sales2#75, fri_sales2#76, sat_sales2#77]
Arguments: [(d_week_seq2#70 - 53) ASC NULLS FIRST], false, 0
Arguments: HashedRelationBroadcastMode(List(cast((input[0, int, true] - 53) as bigint)),false), [id=#78]

(38) SortMergeJoin [codegen id : 14]
(36) BroadcastHashJoin [codegen id : 12]
Left keys [1]: [d_week_seq1#45]
Right keys [1]: [(d_week_seq2#70 - 53)]
Join condition: None

(39) Project [codegen id : 14]
(37) Project [codegen id : 12]
Output [8]: [d_week_seq1#45, round(CheckOverflow((promote_precision(sun_sales1#46) / promote_precision(sun_sales2#71)), DecimalType(37,20), true), 2) AS round((sun_sales1 / sun_sales2), 2)#79, round(CheckOverflow((promote_precision(mon_sales1#47) / promote_precision(mon_sales2#72)), DecimalType(37,20), true), 2) AS round((mon_sales1 / mon_sales2), 2)#80, round(CheckOverflow((promote_precision(tue_sales1#48) / promote_precision(tue_sales2#73)), DecimalType(37,20), true), 2) AS round((tue_sales1 / tue_sales2), 2)#81, round(CheckOverflow((promote_precision(wed_sales1#49) / promote_precision(wed_sales2#74)), DecimalType(37,20), true), 2) AS round((wed_sales1 / wed_sales2), 2)#82, round(CheckOverflow((promote_precision(thu_sales1#50) / promote_precision(thu_sales2#75)), DecimalType(37,20), true), 2) AS round((thu_sales1 / thu_sales2), 2)#83, round(CheckOverflow((promote_precision(fri_sales1#51) / promote_precision(fri_sales2#76)), DecimalType(37,20), true), 2) AS round((fri_sales1 / fri_sales2), 2)#84, round(CheckOverflow((promote_precision(sat_sales1#52) / promote_precision(sat_sales2#77)), DecimalType(37,20), true), 2) AS round((sat_sales1 / sat_sales2), 2)#85]
Input [16]: [d_week_seq1#45, sun_sales1#46, mon_sales1#47, tue_sales1#48, wed_sales1#49, thu_sales1#50, fri_sales1#51, sat_sales1#52, d_week_seq2#70, sun_sales2#71, mon_sales2#72, tue_sales2#73, wed_sales2#74, thu_sales2#75, fri_sales2#76, sat_sales2#77]

(40) Exchange
(38) Exchange
Input [8]: [d_week_seq1#45, round((sun_sales1 / sun_sales2), 2)#79, round((mon_sales1 / mon_sales2), 2)#80, round((tue_sales1 / tue_sales2), 2)#81, round((wed_sales1 / wed_sales2), 2)#82, round((thu_sales1 / thu_sales2), 2)#83, round((fri_sales1 / fri_sales2), 2)#84, round((sat_sales1 / sat_sales2), 2)#85]
Arguments: rangepartitioning(d_week_seq1#45 ASC NULLS FIRST, 5), true, [id=#86]
Arguments: rangepartitioning(d_week_seq1#45 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [id=#86]

(41) Sort [codegen id : 15]
(39) Sort [codegen id : 13]
Input [8]: [d_week_seq1#45, round((sun_sales1 / sun_sales2), 2)#79, round((mon_sales1 / mon_sales2), 2)#80, round((tue_sales1 / tue_sales2), 2)#81, round((wed_sales1 / wed_sales2), 2)#82, round((thu_sales1 / thu_sales2), 2)#83, round((fri_sales1 / fri_sales2), 2)#84, round((sat_sales1 / sat_sales2), 2)#85]
Arguments: [d_week_seq1#45 ASC NULLS FIRST], true, 0

0 comments on commit aa509c1

Please sign in to comment.