Optimize benchmark q2 subquery filter #3789

@andygrove

Description

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

@Dandandan noticed a difference between the Spark and DataFusion logical plans for TPC-H q2:

DataFusion applies partsupp.ps_supplycost = __sq_1.__value in a separate Filter above the join. Spark instead makes it part of the inner join condition:

((knownfloatingpointnormalized(normalizenanandzero(ps_supplycost#35)) = knownfloatingpointnormalized(normalizenanandzero(min(ps_supplycost)#124)))

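Evaluating this predicate inside the join will likely reduce the join's output size, because rows that fail the comparison are never emitted by the join.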
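A minimal Rust sketch of the effect, using hypothetical tuple types and toy data rather than DataFusion's operators. Both functions hash-join toy partsupp rows with the subquery's per-part minimum cost on partkey. The first emits every partkey match and filters afterwards (the shape of the DataFusion plan); the second checks ps_supplycost = __value while probing (the shape of the Spark plan). The final rows are identical; only the join's intermediate output differs.

```rust
use std::collections::HashMap;

// Hypothetical toy row types, not DataFusion types.
// partsupp row: (ps_partkey, ps_suppkey, ps_supplycost)
type PartSupp = (u64, u64, f64);
// subquery row: (ps_partkey, __value = MIN(ps_supplycost))
type MinCost = (u64, f64);

// Join on partkey only, then filter on cost (the DataFusion plan's shape).
// Returns (rows emitted by the join, final rows).
fn join_then_filter(build: &[MinCost], probe: &[PartSupp]) -> (usize, Vec<PartSupp>) {
    let table: HashMap<u64, f64> = build.iter().copied().collect();
    let joined: Vec<(PartSupp, f64)> = probe
        .iter()
        .filter_map(|&row| table.get(&row.0).map(|&min| (row, min)))
        .collect();
    let emitted = joined.len();
    let rows: Vec<PartSupp> = joined
        .into_iter()
        .filter(|&(row, min)| row.2 == min)
        .map(|(row, _)| row)
        .collect();
    (emitted, rows)
}

// Check the cost equality while probing (the Spark plan's shape), so rows
// that fail it are never emitted by the join.
fn filter_in_join(build: &[MinCost], probe: &[PartSupp]) -> (usize, Vec<PartSupp>) {
    let table: HashMap<u64, f64> = build.iter().copied().collect();
    let rows: Vec<PartSupp> = probe
        .iter()
        .copied()
        .filter(|row| table.get(&row.0).map_or(false, |&min| row.2 == min))
        .collect();
    (rows.len(), rows)
}

fn main() {
    // Toy data: part 1 has three suppliers, part 2 has two.
    let partsupp: Vec<PartSupp> = vec![
        (1, 10, 5.0), (1, 11, 7.0), (1, 12, 5.0),
        (2, 20, 3.0), (2, 21, 9.0),
    ];
    let min_cost: Vec<MinCost> = vec![(1, 5.0), (2, 3.0)];

    let (a_emitted, a_rows) = join_then_filter(&min_cost, &partsupp);
    let (b_emitted, b_rows) = filter_in_join(&min_cost, &partsupp);
    assert_eq!(a_rows, b_rows); // same query result either way
    println!("join emitted {} rows vs {} rows", a_emitted, b_emitted); // 5 vs 3
}
```

With this toy data the first join emits 5 rows and the second emits 3, while both return the same 3 rows.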

DataFusion

Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
  Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
    Filter: partsupp.ps_supplycost = __sq_1.__value
      Inner Join: part.p_partkey = __sq_1.ps_partkey
        Inner Join: nation.n_regionkey = region.r_regionkey
          Inner Join: supplier.s_nationkey = nation.n_nationkey
            Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
              Inner Join: part.p_partkey = partsupp.ps_partkey
                Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS")
                  TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")]
                TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
              TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
            TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
          Filter: region.r_name = Utf8("EUROPE")
            TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]
        Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value, alias=__sq_1
          Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
            Inner Join: nation.n_regionkey = region.r_regionkey
              Inner Join: supplier.s_nationkey = nation.n_nationkey
                Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
                  TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
                  TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
                TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
              Filter: region.r_name = Utf8("EUROPE")
                TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]

Spark

Sort [s_acctbal#23 DESC NULLS LAST, n_name#109 ASC NULLS FIRST, s_name#19 ASC NULLS FIRST, p_partkey#0L ASC NULLS FIRST], true
+- Project [s_acctbal#23, s_name#19, n_name#109, p_partkey#0L, p_mfgr#2, s_address#20, s_phone#22, s_comment#24]
   +- Join Inner, (n_regionkey#110L = r_regionkey#116L)
      :- Project [p_partkey#0L, p_mfgr#2, s_name#19, s_address#20, s_phone#22, s_acctbal#23, s_comment#24, n_name#109, n_regionkey#110L]
      :  +- Join Inner, (s_nationkey#21L = n_nationkey#108L)
      :     :- Project [p_partkey#0L, p_mfgr#2, s_name#19, s_address#20, s_nationkey#21L, s_phone#22, s_acctbal#23, s_comment#24]
      :     :  +- Join Inner, (s_suppkey#18L = ps_suppkey#33L)
      :     :     :- Project [p_partkey#0L, p_mfgr#2, ps_suppkey#33L]
      :     :     :  +- Join Inner, ((ps_supplycost#35 = min(ps_supplycost)#124) AND (p_partkey#0L = ps_partkey#125L))
      :     :     :     :- Project [p_partkey#0L, p_mfgr#2, ps_suppkey#33L, ps_supplycost#35]
      :     :     :     :  +- Join Inner, (p_partkey#0L = ps_partkey#32L)
      :     :     :     :     :- Project [p_partkey#0L, p_mfgr#2]
      :     :     :     :     :  +- Filter (((isnotnull(p_size#5) AND isnotnull(p_type#4)) AND ((p_size#5 = 15) AND EndsWith(p_type#4, BRASS))) AND isnotnull(p_partkey#0L))
      :     :     :     :     :     +- Relation [p_partkey#0L,p_name#1,p_mfgr#2,p_brand#3,p_type#4,p_size#5,p_container#6,p_retailprice#7,p_comment#8] parquet
      :     :     :     :     +- Project [ps_partkey#32L, ps_suppkey#33L, ps_supplycost#35]
      :     :     :     :        +- Filter (isnotnull(ps_partkey#32L) AND (isnotnull(ps_supplycost#35) AND isnotnull(ps_suppkey#33L)))
      :     :     :     :           +- Relation [ps_partkey#32L,ps_suppkey#33L,ps_availqty#34,ps_supplycost#35,ps_comment#36] parquet
      :     :     :     +- Filter isnotnull(min(ps_supplycost)#124)
      :     :     :        +- Aggregate [ps_partkey#125L], [min(ps_supplycost#128) AS min(ps_supplycost)#124, ps_partkey#125L]
      :     :     :           +- Project [ps_partkey#125L, ps_supplycost#128]
      :     :     :              +- Join Inner, (n_regionkey#139L = r_regionkey#141L)
      :     :     :                 :- Project [ps_partkey#125L, ps_supplycost#128, n_regionkey#139L]
      :     :     :                 :  +- Join Inner, (s_nationkey#133L = n_nationkey#137L)
      :     :     :                 :     :- Project [ps_partkey#125L, ps_supplycost#128, s_nationkey#133L]
      :     :     :                 :     :  +- Join Inner, (s_suppkey#130L = ps_suppkey#126L)
      :     :     :                 :     :     :- Project [ps_partkey#125L, ps_suppkey#126L, ps_supplycost#128]
      :     :     :                 :     :     :  +- Filter (isnotnull(ps_suppkey#126L) AND isnotnull(ps_partkey#125L))
      :     :     :                 :     :     :     +- Relation [ps_partkey#125L,ps_suppkey#126L,ps_availqty#127,ps_supplycost#128,ps_comment#129] parquet
      :     :     :                 :     :     +- Project [s_suppkey#130L, s_nationkey#133L]
      :     :     :                 :     :        +- Filter (isnotnull(s_suppkey#130L) AND isnotnull(s_nationkey#133L))
      :     :     :                 :     :           +- Relation [s_suppkey#130L,s_name#131,s_address#132,s_nationkey#133L,s_phone#134,s_acctbal#135,s_comment#136] parquet
      :     :     :                 :     +- Project [n_nationkey#137L, n_regionkey#139L]
      :     :     :                 :        +- Filter (isnotnull(n_nationkey#137L) AND isnotnull(n_regionkey#139L))
      :     :     :                 :           +- Relation [n_nationkey#137L,n_name#138,n_regionkey#139L,n_comment#140] parquet
      :     :     :                 +- Project [r_regionkey#141L]
      :     :     :                    +- Filter ((isnotnull(r_name#142) AND (r_name#142 = EUROPE)) AND isnotnull(r_regionkey#141L))
      :     :     :                       +- Relation [r_regionkey#141L,r_name#142,r_comment#143] parquet
      :     :     +- Filter (isnotnull(s_suppkey#18L) AND isnotnull(s_nationkey#21L))
      :     :        +- Relation [s_suppkey#18L,s_name#19,s_address#20,s_nationkey#21L,s_phone#22,s_acctbal#23,s_comment#24] parquet
      :     +- Project [n_nationkey#108L, n_name#109, n_regionkey#110L]
      :        +- Filter (isnotnull(n_nationkey#108L) AND isnotnull(n_regionkey#110L))
      :           +- Relation [n_nationkey#108L,n_name#109,n_regionkey#110L,n_comment#111] parquet
      +- Project [r_regionkey#116L]
         +- Filter ((isnotnull(r_name#117) AND (r_name#117 = EUROPE)) AND isnotnull(r_regionkey#116L))
            +- Relation [r_regionkey#116L,r_name#117,r_comment#118] parquet

Describe the solution you'd like
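Hopefully we can refine one of our existing optimizer rules, or add a new one, so that this equality predicate becomes part of the inner join condition, as it does in the Spark plan.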
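One possible shape for such a rule, sketched in Rust over a hypothetical, heavily simplified plan type. Plan, Expr, push_filter_into_join and the helper functions are illustrative names, not DataFusion's API, and the Filter here holds its predicate already split into AND-ed conjuncts. The rule moves every column equality whose two sides come from different join inputs into the Inner Join's keys; any other conjuncts stay in a Filter above the join.

```rust
use std::collections::HashSet;

// Hypothetical, simplified expression and plan types (not DataFusion's).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Eq(Box<Expr>, Box<Expr>),
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { name: String, columns: Vec<String> },
    // Predicate stored as a list of AND-ed conjuncts.
    Filter { predicates: Vec<Expr>, input: Box<Plan> },
    // Equi-join keys as (left column, right column) pairs.
    InnerJoin { on: Vec<(String, String)>, left: Box<Plan>, right: Box<Plan> },
}

// Columns produced by a plan node.
fn schema(plan: &Plan) -> HashSet<String> {
    match plan {
        Plan::Scan { columns, .. } => columns.iter().cloned().collect(),
        Plan::Filter { input, .. } => schema(input),
        Plan::InnerJoin { left, right, .. } => {
            let mut cols = schema(left);
            cols.extend(schema(right));
            cols
        }
    }
}

// If `expr` is `x = y` with one column from each join input, return it as a (left, right) key.
fn as_join_key(expr: &Expr, left: &HashSet<String>, right: &HashSet<String>) -> Option<(String, String)> {
    if let Expr::Eq(a, b) = expr {
        if let (Expr::Column(x), Expr::Column(y)) = (&**a, &**b) {
            if left.contains(x) && right.contains(y) {
                return Some((x.clone(), y.clone()));
            }
            if right.contains(x) && left.contains(y) {
                return Some((y.clone(), x.clone()));
            }
        }
    }
    None
}

// Hypothetical rule: fold cross-input equalities from a Filter into the Inner Join below it.
fn push_filter_into_join(plan: Plan) -> Plan {
    let (predicates, input) = match plan {
        Plan::Filter { predicates, input } => (predicates, input),
        other => return other,
    };
    match *input {
        Plan::InnerJoin { mut on, left, right } => {
            let (ls, rs) = (schema(&left), schema(&right));
            let mut remaining = Vec::new();
            for p in predicates {
                match as_join_key(&p, &ls, &rs) {
                    Some(key) => on.push(key),
                    None => remaining.push(p),
                }
            }
            let join = Plan::InnerJoin { on, left, right };
            if remaining.is_empty() {
                join
            } else {
                Plan::Filter { predicates: remaining, input: Box::new(join) }
            }
        }
        other => Plan::Filter { predicates, input: Box::new(other) },
    }
}

fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

fn scan(name: &str, columns: &[&str]) -> Plan {
    let columns = columns.iter().map(|c| c.to_string()).collect();
    Plan::Scan { name: name.to_string(), columns }
}

fn main() {
    // Simplified q2 shape: Filter(ps_supplycost = __value) over Inner Join on partkey.
    // "left" stands in for the part/partsupp/supplier/nation/region join tree.
    let plan = Plan::Filter {
        predicates: vec![Expr::Eq(Box::new(col("partsupp.ps_supplycost")), Box::new(col("__sq_1.__value")))],
        input: Box::new(Plan::InnerJoin {
            on: vec![("part.p_partkey".to_string(), "__sq_1.ps_partkey".to_string())],
            left: Box::new(scan("left", &["part.p_partkey", "partsupp.ps_supplycost"])),
            right: Box::new(scan("__sq_1", &["__sq_1.ps_partkey", "__sq_1.__value"])),
        }),
    };
    // Prints an InnerJoin with both keys and no Filter above it.
    println!("{:?}", push_filter_into_join(plan));
}
```

This rewrite is only valid for inner joins: moving such a predicate into an outer join's condition would change which rows are null-padded, so a real rule has to check the join type. The sketch also only rewrites the plan root and treats column names as plain strings.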

Describe alternatives you've considered
None

Additional context
None

Metadata

Assignees

No one assigned

    Labels

    enhancement (New feature or request), performance (Make DataFusion faster)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
