
The canonicalization of Filter Expressions (Data and runtime) is broken in SparkRuntimeFilterableScan #16563

@ahshahid

Description


While doing integration testing of Iceberg with Spark, I found an issue with the reuse of exchange operators in Spark.

Because equals() and hashCode() in SparkBatchQueryScan do not impose an ordering when generating the filter expression strings, two plans that are semantically equivalent (the same filters, listed in a different order) do not match.
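To make the ordering problem concrete, here is a minimal Java sketch, assuming a scan's identity is just its table name plus a list of pushed-down filter strings. `ScanDescriptor`, `CanonicalFilterDemo` and the filter strings are hypothetical; they do not reflect Iceberg's actual `SparkBatchQueryScan` code or the attached patch. The sketch only shows how comparing a sorted (canonical) copy of the filter strings in both `equals()` and `hashCode()` makes the comparison order-insensitive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a scan whose identity includes pushed-down filter strings.
// This is NOT Iceberg's SparkBatchQueryScan; it only illustrates the ordering problem.
final class ScanDescriptor {
  private final String table;
  private final List<String> filterStrings;

  ScanDescriptor(String table, List<String> filterStrings) {
    this.table = table;
    this.filterStrings = List.copyOf(filterStrings);
  }

  // Order-sensitive comparison: the same filters in a different order look "different".
  boolean naiveEquals(ScanDescriptor other) {
    return table.equals(other.table) && filterStrings.equals(other.filterStrings);
  }

  // Canonical form: a sorted copy of the filter strings, so their order no longer matters.
  private List<String> canonicalFilters() {
    List<String> sorted = new ArrayList<>(filterStrings);
    Collections.sort(sorted);
    return sorted;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof ScanDescriptor)) return false;
    ScanDescriptor other = (ScanDescriptor) o;
    return table.equals(other.table) && canonicalFilters().equals(other.canonicalFilters());
  }

  @Override
  public int hashCode() {
    // Derived from the same canonical form as equals(), keeping the two consistent.
    return Objects.hash(table, canonicalFilters());
  }
}

class CanonicalFilterDemo {
  public static void main(String[] args) {
    // Illustrative filters in two different orders, mirroring the two item scans.
    ScanDescriptor first = new ScanDescriptor("item",
        List.of("i_item_sk IS NOT NULL", "i_brand IS NOT NULL", "i_category IS NOT NULL"));
    ScanDescriptor second = new ScanDescriptor("item",
        List.of("i_item_sk IS NOT NULL", "i_category IS NOT NULL", "i_brand IS NOT NULL"));

    System.out.println(first.naiveEquals(second));             // false
    System.out.println(first.equals(second));                  // true
    System.out.println(first.hashCode() == second.hashCode()); // true
  }
}
```

Sorting a copy, rather than collecting the strings into a `Set`, keeps duplicate entries significant and yields a deterministic order in both methods.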

The test failure output is pasted below:

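Exchange reuse does not happen because of the equality mismatch in SparkBatchQueryScan. The issue shows up in the scan of the table `spark_catalog.default.item` (`BatchScan spark_catalog.default.item` in the actual plan). In the actual plan, the `Filter` above the first `item` scan lists `[i_item_sk,i_brand,i_category]`, while the one above the second `item` scan lists `[i_item_sk,i_category,i_brand]` (the `store` filters differ in the same way). As a result, the second aggregation subtree (`Exchange ... #9`) is planned in full instead of appearing as `ReusedExchange ... #7`, as it does in the approved plan.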
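Spark's exchange reuse works, roughly, by remembering each exchange under its canonicalized plan and replacing later exchanges whose canonical form is equal, so the scan's equals()/hashCode() decide whether two subtrees collapse into one. The Java sketch below models that lookup with a plain `HashMap` keyed by a scan's filter column list. `ExchangeReuseModel`, `ExchangeReuseDemo` and the exchange ids are hypothetical; the real rule keys on the whole canonicalized subtree, not just the filters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of exchange reuse, NOT Spark's implementation: the first exchange
// registered under a key is kept, and later exchanges with an equal key reuse it.
final class ExchangeReuseModel {
  private final Map<List<String>, String> exchangesByKey = new HashMap<>();

  // Returns the id of the exchange that will actually run for this key:
  // the previously registered one if the key matches, otherwise the new one.
  String reuseOrRegister(List<String> key, String exchangeId) {
    String existing = exchangesByKey.putIfAbsent(key, exchangeId);
    return existing == null ? exchangeId : existing;
  }

  // Canonical key: a sorted copy, so the order of the filters does not affect the lookup.
  static List<String> canonical(List<String> filters) {
    List<String> sorted = new ArrayList<>(filters);
    Collections.sort(sorted);
    return sorted;
  }
}

class ExchangeReuseDemo {
  public static void main(String[] args) {
    // Filter column orders of the two item-scan Filter nodes in the actual plan.
    List<String> first = List.of("i_item_sk", "i_brand", "i_category");
    List<String> second = List.of("i_item_sk", "i_category", "i_brand");

    ExchangeReuseModel raw = new ExchangeReuseModel();
    raw.reuseOrRegister(first, "exchangeA");
    // Order-sensitive keys miss, so the second subtree is planned again.
    System.out.println(raw.reuseOrRegister(second, "exchangeB")); // exchangeB

    ExchangeReuseModel canon = new ExchangeReuseModel();
    canon.reuseOrRegister(ExchangeReuseModel.canonical(first), "exchangeA");
    // Canonical keys are equal, so the second subtree reuses the first exchange.
    System.out.println(canon.reuseOrRegister(ExchangeReuseModel.canonical(second), "exchangeB")); // exchangeA
  }
}
```

Because the lookup goes through a hash map, both methods matter: an order-insensitive equals() paired with an order-sensitive hashCode() would still miss.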

I am attaching a patch for Spark 4.1 that fixes this issue. The same patch needs to be applied to all relevant Spark versions.

patch.txt

```
simplified plans did not match
actual simplified =
TakeOrderedAndProject [sum_sales,avg_monthly_sales,s_store_name,i_category,i_brand,s_company_name,d_year,d_moy,psum,nsum]
WholeStageCodegen (22)
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,avg_monthly_sales,sum_sales,sum_sales,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn]
Filter [avg_monthly_sales,sum_sales]
InputAdapter
Window [_w0,i_category,i_brand,s_store_name,s_company_name,d_year]
WholeStageCodegen (7)
Filter [d_year]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (6)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #1
WholeStageCodegen (5)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,_w0,sum]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #2
WholeStageCodegen (4)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
BroadcastHashJoin [i_item_sk,ss_item_sk]
Project [i_item_sk,i_brand,i_category]
Filter [i_item_sk,i_brand,i_category]
InputAdapter
BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]
InputAdapter
BroadcastExchange #3
WholeStageCodegen (1)
Project [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
Filter [ss_item_sk,ss_store_sk]
InputAdapter
BatchScan spark_catalog.default.store_sales [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
InputAdapter
BroadcastExchange #4
WholeStageCodegen (2)
Project [d_date_sk,d_year,d_moy]
Filter [d_year,d_moy,d_date_sk]
InputAdapter
BatchScan spark_catalog.default.date_dim [d_date_sk,d_year,d_moy]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (3)
Project [s_store_sk,s_store_name,s_company_name]
Filter [s_store_sk,s_company_name,s_store_name]
InputAdapter
BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]
InputAdapter
BroadcastExchange #6
WholeStageCodegen (14)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (13)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #7
WholeStageCodegen (12)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
InputAdapter
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] #2
InputAdapter
BroadcastExchange #8
WholeStageCodegen (21)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (20)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #9
WholeStageCodegen (19)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #10
WholeStageCodegen (18)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
BroadcastHashJoin [i_item_sk,ss_item_sk]
Project [i_item_sk,i_brand,i_category]
Filter [i_item_sk,i_category,i_brand]
InputAdapter
BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]
InputAdapter
ReusedExchange [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price] #3
InputAdapter
ReusedExchange [d_date_sk,d_year,d_moy] #4
InputAdapter
BroadcastExchange #11
WholeStageCodegen (17)
Project [s_store_sk,s_store_name,s_company_name]
Filter [s_store_sk,s_store_name,s_company_name]
InputAdapter
BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]

approved simplified=
TakeOrderedAndProject [sum_sales,avg_monthly_sales,s_store_name,i_category,i_brand,s_company_name,d_year,d_moy,psum,nsum]
WholeStageCodegen (22)
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,avg_monthly_sales,sum_sales,sum_sales,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn,sum_sales]
BroadcastHashJoin [i_category,i_brand,s_store_name,s_company_name,rn,i_category,i_brand,s_store_name,s_company_name,rn]
Project [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales,avg_monthly_sales,rn]
Filter [avg_monthly_sales,sum_sales]
InputAdapter
Window [_w0,i_category,i_brand,s_store_name,s_company_name,d_year]
WholeStageCodegen (7)
Filter [d_year]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (6)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #1
WholeStageCodegen (5)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,_w0,sum]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy] #2
WholeStageCodegen (4)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,ss_sales_price] [sum,sum]
Project [i_brand,i_category,ss_sales_price,d_year,d_moy,s_store_name,s_company_name]
BroadcastHashJoin [ss_store_sk,s_store_sk]
Project [i_brand,i_category,ss_store_sk,ss_sales_price,d_year,d_moy]
BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
Project [i_brand,i_category,ss_sold_date_sk,ss_store_sk,ss_sales_price]
BroadcastHashJoin [i_item_sk,ss_item_sk]
Project [i_item_sk,i_brand,i_category]
Filter [i_item_sk,i_category,i_brand]
InputAdapter
BatchScan spark_catalog.default.item [i_item_sk,i_brand,i_category]
InputAdapter
BroadcastExchange #3
WholeStageCodegen (1)
Project [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
Filter [ss_item_sk,ss_store_sk]
InputAdapter
BatchScan spark_catalog.default.store_sales [ss_sold_date_sk,ss_item_sk,ss_store_sk,ss_sales_price]
InputAdapter
BroadcastExchange #4
WholeStageCodegen (2)
Project [d_date_sk,d_year,d_moy]
Filter [d_year,d_moy,d_date_sk]
InputAdapter
BatchScan spark_catalog.default.date_dim [d_date_sk,d_year,d_moy]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (3)
Project [s_store_sk,s_store_name,s_company_name]
Filter [s_store_sk,s_store_name,s_company_name]
InputAdapter
BatchScan spark_catalog.default.store [s_store_sk,s_store_name,s_company_name]
InputAdapter
BroadcastExchange #6
WholeStageCodegen (14)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (13)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
Exchange [i_category,i_brand,s_store_name,s_company_name] #7
WholeStageCodegen (12)
HashAggregate [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] [sum(UnscaledValue(ss_sales_price)),sum_sales,sum]
InputAdapter
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum] #2
InputAdapter
BroadcastExchange #8
WholeStageCodegen (21)
Project [i_category,i_brand,s_store_name,s_company_name,sum_sales,rn]
InputAdapter
Window [d_year,d_moy,i_category,i_brand,s_store_name,s_company_name]
WholeStageCodegen (20)
Sort [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy]
InputAdapter
ReusedExchange [i_category,i_brand,s_store_name,s_company_name,d_year,d_moy,sum_sales] #7
```

Please note:
There is another issue that also prevents exchange reuse, but it is more involved and shows up only when both Spark's DPP (dynamic partition pruning) and AQE (adaptive query execution) are enabled. The fix for that requires enhancing the SupportsRuntimeFiltering interface, so it spans both Iceberg and Spark (if interested, please take a look at the Spark JIRA SPARK-45866).
That ticket was opened nearly two years ago, but no action has been taken on it yet.
