Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-29947][SQL] Improve ResolveRelations performance #26589

Closed
wants to merge 10 commits into from
Closed

[SPARK-29947][SQL] Improve ResolveRelations performance #26589

wants to merge 10 commits into from

Conversation

wangyum
Copy link
Member

@wangyum wangyum commented Nov 19, 2019

What changes were proposed in this pull request?

It is very common for a SQL query to query a table more than once. For example:

== Physical Plan ==
*(12) HashAggregate(keys=[cmn_mtrc_summ_dt#21, rev_rollup#1279, CASE WHEN (rev_rollup#1319 = rev_rollup#1279) THEN 0 ELSE 1 END#1366, CASE WHEN cast(sap_category_id#24 as decimal(10,0)) IN (5,7,23,41) THEN 0 ELSE 1 END#1367], functions=[sum(coalesce(bid_count#34, 0)), sum(coalesce(ck_trans_count#35, 0)), sum(coalesce(ended_bid_count#36, 0)), sum(coalesce(ended_lstg_count#37, 0)), sum(coalesce(ended_success_lstg_count#38, 0)), sum(coalesce(item_sold_count#39, 0)), sum(coalesce(new_lstg_count#40, 0)), sum(coalesce(gmv_us_amt#41, 0.00)), sum(coalesce(gmv_slr_lc_amt#42, 0.00)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_insrtn_fee_us_amt#46, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_insrtn_crd_us_amt#50, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_fetr_fee_us_amt#54, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_fetr_crd_us_amt#58, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_fv_fee_us_amt#62, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_fv_crd_us_amt#67, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_othr_l_fee_us_amt#72, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_othr_l_crd_us_amt#76, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_othr_nl_fee_us_amt#80, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_othr_nl_crd_us_amt#84, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_slr_tools_fee_us_amt#88, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_slr_tools_crd_us_amt#92, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(coalesce(rvnu_unasgnd_us_amt#96, 0.000000)), sum((coalesce(rvnu_transaction_us_amt#112, 0.0) + coalesce(rvnu_transaction_crd_us_amt#115, 0.0))), sum((coalesce(rvnu_total_us_amt#118, 0.0) + coalesce(rvnu_total_crd_us_amt#121, 0.0)))])
+- Exchange hashpartitioning(cmn_mtrc_summ_dt#21, rev_rollup#1279, CASE WHEN (rev_rollup#1319 = rev_rollup#1279) THEN 0 ELSE 1 END#1366, CASE WHEN cast(sap_category_id#24 as decimal(10,0)) IN (5,7,23,41) THEN 0 ELSE 1 END#1367, 200), true, [id=#403]
   +- *(11) HashAggregate(keys=[cmn_mtrc_summ_dt#21, rev_rollup#1279, CASE WHEN (rev_rollup#1319 = rev_rollup#1279) THEN 0 ELSE 1 END AS CASE WHEN (rev_rollup#1319 = rev_rollup#1279) THEN 0 ELSE 1 END#1366, CASE WHEN cast(sap_category_id#24 as decimal(10,0)) IN (5,7,23,41) THEN 0 ELSE 1 END AS CASE WHEN cast(sap_category_id#24 as decimal(10,0)) IN (5,7,23,41) THEN 0 ELSE 1 END#1367], functions=[partial_sum(coalesce(bid_count#34, 0)), partial_sum(coalesce(ck_trans_count#35, 0)), partial_sum(coalesce(ended_bid_count#36, 0)), partial_sum(coalesce(ended_lstg_count#37, 0)), partial_sum(coalesce(ended_success_lstg_count#38, 0)), partial_sum(coalesce(item_sold_count#39, 0)), partial_sum(coalesce(new_lstg_count#40, 0)), partial_sum(coalesce(gmv_us_amt#41, 0.00)), partial_sum(coalesce(gmv_slr_lc_amt#42, 0.00)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_insrtn_fee_us_amt#46, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_insrtn_crd_us_amt#50, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_fetr_fee_us_amt#54, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_fetr_crd_us_amt#58, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_fv_fee_us_amt#62, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_fv_crd_us_amt#67, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_othr_l_fee_us_amt#72, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_othr_l_crd_us_amt#76, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_othr_nl_fee_us_amt#80, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_othr_nl_crd_us_amt#84, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_slr_tools_fee_us_amt#88, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_slr_tools_crd_us_amt#92, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(coalesce(rvnu_unasgnd_us_amt#96, 0.000000)), partial_sum((coalesce(rvnu_transaction_us_amt#112, 0.0) + coalesce(rvnu_transaction_crd_us_amt#115, 0.0))), partial_sum((coalesce(rvnu_total_us_amt#118, 0.0) + coalesce(rvnu_total_crd_us_amt#121, 0.0)))])
      +- *(11) Project [cmn_mtrc_summ_dt#21, sap_category_id#24, bid_count#34, ck_trans_count#35, ended_bid_count#36, ended_lstg_count#37, ended_success_lstg_count#38, item_sold_count#39, new_lstg_count#40, gmv_us_amt#41, gmv_slr_lc_amt#42, rvnu_insrtn_fee_us_amt#46, rvnu_insrtn_crd_us_amt#50, rvnu_fetr_fee_us_amt#54, rvnu_fetr_crd_us_amt#58, rvnu_fv_fee_us_amt#62, rvnu_fv_crd_us_amt#67, rvnu_othr_l_fee_us_amt#72, rvnu_othr_l_crd_us_amt#76, rvnu_othr_nl_fee_us_amt#80, rvnu_othr_nl_crd_us_amt#84, rvnu_slr_tools_fee_us_amt#88, rvnu_slr_tools_crd_us_amt#92, rvnu_unasgnd_us_amt#96, ... 6 more fields]
         +- *(11) BroadcastHashJoin [byr_cntry_id#23], [cntry_id#1309], LeftOuter, BuildRight
            :- *(11) Project [cmn_mtrc_summ_dt#21, byr_cntry_id#23, sap_category_id#24, bid_count#34, ck_trans_count#35, ended_bid_count#36, ended_lstg_count#37, ended_success_lstg_count#38, item_sold_count#39, new_lstg_count#40, gmv_us_amt#41, gmv_slr_lc_amt#42, rvnu_insrtn_fee_us_amt#46, rvnu_insrtn_crd_us_amt#50, rvnu_fetr_fee_us_amt#54, rvnu_fetr_crd_us_amt#58, rvnu_fv_fee_us_amt#62, rvnu_fv_crd_us_amt#67, rvnu_othr_l_fee_us_amt#72, rvnu_othr_l_crd_us_amt#76, rvnu_othr_nl_fee_us_amt#80, rvnu_othr_nl_crd_us_amt#84, rvnu_slr_tools_fee_us_amt#88, rvnu_slr_tools_crd_us_amt#92, ... 6 more fields]
            :  +- *(11) BroadcastHashJoin [slr_cntry_id#28], [cntry_id#1269], LeftOuter, BuildRight
            :     :- *(11) Project [gen_attr_1#360 AS cmn_mtrc_summ_dt#21, gen_attr_5#267 AS byr_cntry_id#23, gen_attr_7#268 AS sap_category_id#24, gen_attr_15#272 AS slr_cntry_id#28, gen_attr_27#278 AS bid_count#34, gen_attr_29#279 AS ck_trans_count#35, gen_attr_31#280 AS ended_bid_count#36, gen_attr_33#282 AS ended_lstg_count#37, gen_attr_35#283 AS ended_success_lstg_count#38, gen_attr_37#284 AS item_sold_count#39, gen_attr_39#281 AS new_lstg_count#40, gen_attr_41#285 AS gmv_us_amt#41, gen_attr_43#287 AS gmv_slr_lc_amt#42, gen_attr_51#290 AS rvnu_insrtn_fee_us_amt#46, gen_attr_59#294 AS rvnu_insrtn_crd_us_amt#50, gen_attr_67#298 AS rvnu_fetr_fee_us_amt#54, gen_attr_75#302 AS rvnu_fetr_crd_us_amt#58, gen_attr_83#306 AS rvnu_fv_fee_us_amt#62, gen_attr_93#311 AS rvnu_fv_crd_us_amt#67, gen_attr_103#316 AS rvnu_othr_l_fee_us_amt#72, gen_attr_111#320 AS rvnu_othr_l_crd_us_amt#76, gen_attr_119#324 AS rvnu_othr_nl_fee_us_amt#80, gen_attr_127#328 AS rvnu_othr_nl_crd_us_amt#84, gen_attr_135#332 AS rvnu_slr_tools_fee_us_amt#88, ... 6 more fields]
            :     :  +- *(11) BroadcastHashJoin [cast(gen_attr_308#777 as decimal(20,0))], [cast(gen_attr_309#803 as decimal(20,0))], LeftOuter, BuildRight
            :     :     :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 6 more fields]
            :     :     :  +- *(11) BroadcastHashJoin [cast(gen_attr_310#674 as int)], [cast(gen_attr_311#774 as int)], LeftOuter, BuildRight
            :     :     :     :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 6 more fields]
            :     :     :     :  +- *(11) BroadcastHashJoin [cast(gen_attr_5#267 as decimal(20,0))], [cast(gen_attr_312#665 as decimal(20,0))], LeftOuter, BuildRight
            :     :     :     :     :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 5 more fields]
            :     :     :     :     :  +- *(11) BroadcastHashJoin [cast(gen_attr_313#565 as decimal(20,0))], [cast(gen_attr_314#591 as decimal(20,0))], LeftOuter, BuildRight
            :     :     :     :     :     :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 6 more fields]
            :     :     :     :     :     :  +- *(11) BroadcastHashJoin [cast(gen_attr_315#462 as int)], [cast(gen_attr_316#562 as int)], LeftOuter, BuildRight
            :     :     :     :     :     :     :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 6 more fields]
            :     :     :     :     :     :     :  +- *(11) BroadcastHashJoin [cast(gen_attr_15#272 as decimal(20,0))], [cast(gen_attr_317#453 as decimal(20,0))], LeftOuter, BuildRight
            :     :     :     :     :     :     :     :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 5 more fields]
            :     :     :     :     :     :     :     :  +- *(11) BroadcastHashJoin [cast(gen_attr_25#277 as decimal(20,0))], [cast(gen_attr_318#379 as decimal(20,0))], LeftOuter, BuildRight
            :     :     :     :     :     :     :     :     :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_25#277, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, ... 6 more fields]
            :     :     :     :     :     :     :     :     :  +- *(11) BroadcastHashJoin [cast(gen_attr_23#276 as decimal(20,0))], [cast(gen_attr_319#367 as decimal(20,0))], LeftOuter, BuildRight
            :     :     :     :     :     :     :     :     :     :- *(11) Project [byr_cntry_id#1169 AS gen_attr_5#267, sap_category_id#1170 AS gen_attr_7#268, slr_cntry_id#1174 AS gen_attr_15#272, lstg_curncy_id#1178 AS gen_attr_23#276, blng_curncy_id#1179 AS gen_attr_25#277, bid_count#1180 AS gen_attr_27#278, ck_trans_count#1181 AS gen_attr_29#279, ended_bid_count#1182 AS gen_attr_31#280, new_lstg_count#1183 AS gen_attr_39#281, ended_lstg_count#1184 AS gen_attr_33#282, ended_success_lstg_count#1185 AS gen_attr_35#283, item_sold_count#1186 AS gen_attr_37#284, gmv_us_amt#1187 AS gen_attr_41#285, gmv_slr_lc_amt#1189 AS gen_attr_43#287, rvnu_insrtn_fee_us_amt#1192 AS gen_attr_51#290, rvnu_insrtn_crd_us_amt#1196 AS gen_attr_59#294, rvnu_fetr_fee_us_amt#1200 AS gen_attr_67#298, rvnu_fetr_crd_us_amt#1204 AS gen_attr_75#302, rvnu_fv_fee_us_amt#1208 AS gen_attr_83#306, rvnu_fv_crd_us_amt#1213 AS gen_attr_93#311, rvnu_othr_l_fee_us_amt#1218 AS gen_attr_103#316, rvnu_othr_l_crd_us_amt#1222 AS gen_attr_111#320, rvnu_othr_nl_fee_us_amt#1226 AS gen_attr_119#324, rvnu_othr_nl_crd_us_amt#1230 AS gen_attr_127#328, ... 7 more fields]
            :     :     :     :     :     :     :     :     :     :  +- *(11) ColumnarToRow
            :     :     :     :     :     :     :     :     :     :     +- FileScan parquet default.big_table1[byr_cntry_id#1169,sap_category_id#1170,slr_cntry_id#1174,lstg_curncy_id#1178,blng_curncy_id#1179,bid_count#1180,ck_trans_count#1181,ended_bid_count#1182,new_lstg_count#1183,ended_lstg_count#1184,ended_success_lstg_count#1185,item_sold_count#1186,gmv_us_amt#1187,gmv_slr_lc_amt#1189,rvnu_insrtn_fee_us_amt#1192,rvnu_insrtn_crd_us_amt#1196,rvnu_fetr_fee_us_amt#1200,rvnu_fetr_crd_us_amt#1204,rvnu_fv_fee_us_amt#1208,rvnu_fv_crd_us_amt#1213,rvnu_othr_l_fee_us_amt#1218,rvnu_othr_l_crd_us_amt#1222,rvnu_othr_nl_fee_us_amt#1226,rvnu_othr_nl_crd_us_amt#1230,... 7 more fields] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionFilters: [isnotnull(cmn_mtrc_summ_dt#1262), (cmn_mtrc_summ_dt#1262 >= 18078), (cmn_mtrc_summ_dt#1262 <= 18..., PushedFilters: [], ReadSchema: struct<byr_cntry_id:decimal(4,0),sap_category_id:decimal(9,0),slr_cntry_id:decimal(4,0),lstg_curn...
            :     :     :     :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(9,0), true] as decimal(20,0)))), [id=#288]
            :     :     :     :     :     :     :     :     :        +- *(1) Project [CURNCY_ID#1263 AS gen_attr_319#367]
            :     :     :     :     :     :     :     :     :           +- *(1) Filter isnotnull(CURNCY_ID#1263)
            :     :     :     :     :     :     :     :     :              +- *(1) ColumnarToRow
            :     :     :     :     :     :     :     :     :                 +- FileScan parquet default.small_table1[CURNCY_ID#1263] Batched: true, DataFilters: [isnotnull(CURNCY_ID#1263)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table1], PartitionFilters: [], PushedFilters: [IsNotNull(CURNCY_ID)], ReadSchema: struct<CURNCY_ID:decimal(9,0)>, SelectedBucketsCount: 1 out of 1
            :     :     :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(9,0), true] as decimal(20,0)))), [id=#297]
            :     :     :     :     :     :     :     :        +- *(2) Project [CURNCY_ID#1263 AS gen_attr_318#379]
            :     :     :     :     :     :     :     :           +- *(2) Filter isnotnull(CURNCY_ID#1263)
            :     :     :     :     :     :     :     :              +- *(2) ColumnarToRow
            :     :     :     :     :     :     :     :                 +- FileScan parquet default.small_table1[CURNCY_ID#1263] Batched: true, DataFilters: [isnotnull(CURNCY_ID#1263)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table1], PartitionFilters: [], PushedFilters: [IsNotNull(CURNCY_ID)], ReadSchema: struct<CURNCY_ID:decimal(9,0)>, SelectedBucketsCount: 1 out of 1
            :     :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(4,0), true] as decimal(20,0)))), [id=#306]
            :     :     :     :     :     :     :        +- *(3) Project [cntry_id#1269 AS gen_attr_317#453, rev_rollup_id#1278 AS gen_attr_315#462]
            :     :     :     :     :     :     :           +- *(3) Filter isnotnull(cntry_id#1269)
            :     :     :     :     :     :     :              +- *(3) ColumnarToRow
            :     :     :     :     :     :     :                 +- FileScan parquet default.small_table2[cntry_id#1269,rev_rollup_id#1278] Batched: true, DataFilters: [isnotnull(cntry_id#1269)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table2], PartitionFilters: [], PushedFilters: [IsNotNull(cntry_id)], ReadSchema: struct<cntry_id:decimal(4,0),rev_rollup_id:smallint>
            :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, smallint, true] as int) as bigint))), [id=#315]
            :     :     :     :     :     :        +- *(4) Project [rev_rollup_id#1286 AS gen_attr_316#562, curncy_id#1289 AS gen_attr_313#565]
            :     :     :     :     :     :           +- *(4) Filter isnotnull(rev_rollup_id#1286)
            :     :     :     :     :     :              +- *(4) ColumnarToRow
            :     :     :     :     :     :                 +- FileScan parquet default.small_table3[rev_rollup_id#1286,curncy_id#1289] Batched: true, DataFilters: [isnotnull(rev_rollup_id#1286)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table3], PartitionFilters: [], PushedFilters: [IsNotNull(rev_rollup_id)], ReadSchema: struct<rev_rollup_id:smallint,curncy_id:decimal(4,0)>
            :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(9,0), true] as decimal(20,0)))), [id=#324]
            :     :     :     :     :        +- *(5) Project [CURNCY_ID#1263 AS gen_attr_314#591]
            :     :     :     :     :           +- *(5) Filter isnotnull(CURNCY_ID#1263)
            :     :     :     :     :              +- *(5) ColumnarToRow
            :     :     :     :     :                 +- FileScan parquet default.small_table1[CURNCY_ID#1263] Batched: true, DataFilters: [isnotnull(CURNCY_ID#1263)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table1], PartitionFilters: [], PushedFilters: [IsNotNull(CURNCY_ID)], ReadSchema: struct<CURNCY_ID:decimal(9,0)>, SelectedBucketsCount: 1 out of 1
            :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(4,0), true] as decimal(20,0)))), [id=#333]
            :     :     :     :        +- *(6) Project [cntry_id#1269 AS gen_attr_312#665, rev_rollup_id#1278 AS gen_attr_310#674]
            :     :     :     :           +- *(6) Filter isnotnull(cntry_id#1269)
            :     :     :     :              +- *(6) ColumnarToRow
            :     :     :     :                 +- FileScan parquet default.small_table2[cntry_id#1269,rev_rollup_id#1278] Batched: true, DataFilters: [isnotnull(cntry_id#1269)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table2], PartitionFilters: [], PushedFilters: [IsNotNull(cntry_id)], ReadSchema: struct<cntry_id:decimal(4,0),rev_rollup_id:smallint>
            :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, smallint, true] as int) as bigint))), [id=#342]
            :     :     :        +- *(7) Project [rev_rollup_id#1286 AS gen_attr_311#774, curncy_id#1289 AS gen_attr_308#777]
            :     :     :           +- *(7) Filter isnotnull(rev_rollup_id#1286)
            :     :     :              +- *(7) ColumnarToRow
            :     :     :                 +- FileScan parquet default.small_table3[rev_rollup_id#1286,curncy_id#1289] Batched: true, DataFilters: [isnotnull(rev_rollup_id#1286)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table3], PartitionFilters: [], PushedFilters: [IsNotNull(rev_rollup_id)], ReadSchema: struct<rev_rollup_id:smallint,curncy_id:decimal(4,0)>
            :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(9,0), true] as decimal(20,0)))), [id=#351]
            :     :        +- *(8) Project [CURNCY_ID#1263 AS gen_attr_309#803]
            :     :           +- *(8) Filter isnotnull(CURNCY_ID#1263)
            :     :              +- *(8) ColumnarToRow
            :     :                 +- FileScan parquet default.small_table1[CURNCY_ID#1263] Batched: true, DataFilters: [isnotnull(CURNCY_ID#1263)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table1], PartitionFilters: [], PushedFilters: [IsNotNull(CURNCY_ID)], ReadSchema: struct<CURNCY_ID:decimal(9,0)>, SelectedBucketsCount: 1 out of 1
            :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(4,0), true])), [id=#360]
            :        +- *(9) Project [cntry_id#1269, rev_rollup#1279]
            :           +- *(9) Filter isnotnull(cntry_id#1269)
            :              +- *(9) ColumnarToRow
            :                 +- FileScan parquet default.small_table2[cntry_id#1269,rev_rollup#1279] Batched: true, DataFilters: [isnotnull(cntry_id#1269)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table2], PartitionFilters: [], PushedFilters: [IsNotNull(cntry_id)], ReadSchema: struct<cntry_id:decimal(4,0),rev_rollup:string>
            +- ReusedExchange [cntry_id#1309, rev_rollup#1319], BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(4,0), true])), [id=#360]

This PR try to improve ResolveTables and ResolveRelations performance by reducing the connection times to Hive Metastore Server in such case.

Why are the changes needed?

  1. Reduce the connection times to Hive Metastore Server.
  2. Improve ResolveTables and ResolveRelations performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

manual test.
After SPARK-29606 and before this PR:

=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 9323
Total time: 2.687441263 seconds

Rule                                                                                               Effective Time / Total Time                     Effective Runs / Total Runs

org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations                                   929173767 / 930133504                           2 / 18
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables                                      0 / 383363402                                   0 / 18
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin                                         0 / 99433540                                    0 / 4
org.apache.spark.sql.catalyst.analysis.DecimalPrecision                                            41809394 / 83727901                             2 / 18
org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions                               71372977 / 71372977                             1 / 1
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts                              0 / 59071933                                    0 / 18
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences                                  37858325 / 58471776                             5 / 18
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings                                 20889892 / 53229016                             1 / 18
org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion                     23428968 / 50890815                             1 / 18
org.apache.spark.sql.catalyst.analysis.TypeCoercion$InConversion                                   23230666 / 49182607                             1 / 18
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator                                   0 / 43638350                                    0 / 18
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                                              17194844 / 42530885                             1 / 6

After SPARK-29606 and after this PR:

=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 9323
Total time: 2.163765869 seconds

Rule                                                                                               Effective Time / Total Time                     Effective Runs / Total Runs

org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations                                   658905353 / 659829383                           2 / 18
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables                                      0 / 220708715                                   0 / 18
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin                                         0 / 99606816                                    0 / 4
org.apache.spark.sql.catalyst.analysis.DecimalPrecision                                            39616060 / 78215752                             2 / 18
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences                                  36706549 / 54917789                             5 / 18
org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions                               53561921 / 53561921                             1 / 1
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts                              0 / 52329678                                    0 / 18
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings                                 20945755 / 49695998                             1 / 18
org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion                     20872241 / 46740145                             1 / 18
org.apache.spark.sql.catalyst.analysis.TypeCoercion$InConversion                                   19780298 / 44327227                             1 / 18
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator                                   0 / 42312023                                    0 / 18
org.apache.spark.sql.catalyst.optimizer.ColumnPruning                                              17197393 / 39501424                             1 / 6

@SparkQA
Copy link

SparkQA commented Nov 19, 2019

Test build #114049 has finished for PR 26589 at commit 3379cf3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Nov 19, 2019

cc @cloud-fan

@dongjoon-hyun
Copy link
Member

Could you resolve the conflicts, @wangyum ? So, the basic idea is caching the result with Map, isn't it?

@cloud-fan
Copy link
Contributor

should this be a per-query thing? e.g. for each query, we only need to resolve a table once.

@wangyum
Copy link
Member Author

wangyum commented Nov 25, 2019

for each query, we only need to resolve a table once in ResolveTables and resolve a table once in ResolveRelations.

@cloud-fan
Copy link
Contributor

A table may appear several times in the query, e.g. self-join. Shall we handle it as well using the cache?

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@wangyum
Copy link
Member Author

wangyum commented Nov 25, 2019

Yes. We can handle self-join using the cache.

@SparkQA
Copy link

SparkQA commented Nov 25, 2019

Test build #114384 has finished for PR 26589 at commit a7ce2a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@wangyum wangyum changed the title [SPARK-29947][SQL] Improve ResolveTables and ResolveRelations performance [SPARK-29947][SQL] Improve ResolveRelations performance Dec 15, 2019
@SparkQA
Copy link

SparkQA commented Dec 15, 2019

Test build #115362 has finished for PR 26589 at commit 84967f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case other => other
}
def apply(plan: LogicalPlan): LogicalPlan = {
var logicalPlans = Map.empty[UnresolvedRelation, LogicalPlan]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you use a mutable map while we're here? Also, can you add a short comment with the JIRA ID?

Copy link
Member Author

@wangyum wangyum Dec 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment to doc:

* @param relationToLogicalPlanMaps The UnresolvedRelation to LogicalPlan mapping, this can ensure
* that the table is resolved only once if a table is used
* multiple times in a query.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems okay

@cloud-fan
Copy link
Contributor

shall we put the map in AnalysisContext? Then we can make sure we only resolve the table once in a query, including tables referred by views.

@SparkQA
Copy link

SparkQA commented Dec 28, 2019

Test build #115884 has finished for PR 26589 at commit 07e6b7f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Dec 28, 2019

retest this please

@SparkQA
Copy link

SparkQA commented Dec 28, 2019

Test build #115886 has finished for PR 26589 at commit 07e6b7f.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Dec 28, 2019

retest this please

@SparkQA
Copy link

SparkQA commented Dec 28, 2019

Test build #115887 has finished for PR 26589 at commit 07e6b7f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Dec 28, 2019

retest this please

@SparkQA
Copy link

SparkQA commented Dec 28, 2019

Test build #115893 has finished for PR 26589 at commit 07e6b7f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 29, 2019

Test build #115911 has finished for PR 26589 at commit 77b739a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
case class AnalysisContext(
defaultDatabase: Option[String] = None,
nestedViewDepth: Int = 0)
nestedViewDepth: Int = 0,
relationToLogicalPlanMaps: mutable.Map[UnresolvedRelation, LogicalPlan] = mutable.Map.empty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: how about just relationCache?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and it's simpler to use Seq[String] as key.

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@SparkQA
Copy link

SparkQA commented Jan 2, 2020

Test build #116051 has finished for PR 26589 at commit 2e1f87d.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 2, 2020

Test build #116053 has finished for PR 26589 at commit e88351a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -92,10 +92,13 @@ object FakeV2SessionCatalog extends TableCatalog {
* views.
* @param nestedViewDepth The nested depth in the view resolution, this enables us to limit the
* depth of nested views.
* @param relationCache The UnresolvedRelation to LogicalPlan mapping, this can ensure that the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A mapping from qualified table names to resolved relations.

@@ -858,7 +861,12 @@ class Analyzer(
}

case u: UnresolvedRelation =>
lookupRelation(u.multipartIdentifier).map(resolveViews).getOrElse(u)
val relationCache = AnalysisContext.get.relationCache
relationCache.getOrElse(u.tableName, {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we apply the cache in lookupRelation where we expand the name and get the qualified table name?

@@ -870,7 +874,8 @@ class Analyzer(
case SessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table =>
v1SessionCatalog.getRelation(v1Table.v1Table)
AnalysisContext.get.relationCache.getOrElseUpdate(
v1Table.v1Table.qualifiedName, v1SessionCatalog.getRelation(v1Table.v1Table))
Copy link
Contributor

@cloud-fan cloud-fan Jan 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the key should be a fully qualified name, including catalog name. We may have different tables in different catalogs with the same name.

how about

val key = catalog.name +: ident.namespace :+ ident.name
relationCache.getOrElseUpdate(key, ...)

@SparkQA
Copy link

SparkQA commented Jan 3, 2020

Test build #116069 has finished for PR 26589 at commit 99a8557.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 3, 2020

Test build #116073 has finished for PR 26589 at commit 8e7b666.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Jan 3, 2020

Test build #116085 has finished for PR 26589 at commit 8e7b666.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan closed this in 568ad4e Jan 3, 2020
@cloud-fan
Copy link
Contributor

thanks, merging to master!

@@ -870,7 +874,9 @@ class Analyzer(
case SessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table =>
v1SessionCatalog.getRelation(v1Table.v1Table)
val key = catalog.name +: ident.namespace :+ ident.name
AnalysisContext.get.relationCache.getOrElseUpdate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work. The table is already loaded and it's too late to use the cache. I've sent #27341 to fix it.

dongjoon-hyun pushed a commit that referenced this pull request Jan 23, 2020
### What changes were proposed in this pull request?

Fix a bug in #26589 , to make this feature work.

### Why are the changes needed?

This feature doesn't work actually.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

new test

Closes #27341 from cloud-fan/cache.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
*/
case class AnalysisContext(
catalogAndNamespace: Seq[String] = Nil,
nestedViewDepth: Int = 0)
nestedViewDepth: Int = 0,
relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the anti-pattern from the style guide ... https://github.com/databricks/scala-style-guide#case-classes-and-immutability

cloud-fan added a commit that referenced this pull request Jun 3, 2020
… with fresh attribute IDs

### What changes were proposed in this pull request?

This is a followup of #26589, which caches the table relations to speed up the table lookup. However, it brings some side effects: the rule `ResolveRelations` may return exactly the same relations, while before it always returns relations with fresh attribute IDs.

This PR is to eliminate this side effect.

### Why are the changes needed?

There is no bug report yet, but this side effect may impact things like self-join. It's better to restore the 2.4 behavior and always return refresh relations.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #28717 from cloud-fan/fix.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan added a commit that referenced this pull request Jun 3, 2020
… with fresh attribute IDs

### What changes were proposed in this pull request?

This is a followup of #26589, which caches the table relations to speed up the table lookup. However, it brings some side effects: the rule `ResolveRelations` may return exactly the same relations, while before it always returns relations with fresh attribute IDs.

This PR is to eliminate this side effect.

### Why are the changes needed?

There is no bug report yet, but this side effect may impact things like self-join. It's better to restore the 2.4 behavior and always return refresh relations.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #28717 from cloud-fan/fix.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dc0709f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants