Filters aren't passed down to table scans in a union #557

@nevi-me

Describe the bug

If I run a query that includes a union all between two or more different sources, the filters don't appear to be pushed down to the table scans that support filter pushdown.

I noticed this while running a query that pulls data from CSV files and a MongoDB database.

To Reproduce

I haven't tried this with Parquet + CSV, but I presume it's reproducible. To reproduce:

  • Create two datasets with the same schema, but in different DataFusion-supported formats. At least one of the formats should support filter pushdown.
  • Run a query that has a union all and a filter outside of the union.

An example query with the New York City Yellow Cabs dataset:

select 
    count(*) as total_records,
    count(distinct payment_type) as total_payment_types,
    sum(cast(trip_distance as float)) as total_distance
from (select * from mongo_nyc union all select * from csv_nyc)
where 
    passenger_count > 3 and 
    total_amount < 20.0

This produces the following logical plan:

Projection: #COUNT(UInt8(1)) AS total_records, #COUNT(DISTINCT payment_type) AS total_payment_types, #SUM(CAST(trip_distance AS Float64)) AS total_distance
  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), COUNT(DISTINCT #payment_type), SUM(CAST(#trip_distance AS Float64))]]
    Filter: #passenger_count Gt Int64(3) And #total_amount Lt Float64(20)
      Union
        Projection: #passenger_count, #trip_distance, #payment_type, #total_amount
          TableScan: mongo_nyc projection=Some([3, 4, 9, 16])
        Projection: #passenger_count, #trip_distance, #payment_type, #total_amount
          TableScan: csv_nyc projection=Some([3, 4, 9, 16])

Expected behavior

I expected the filters on passenger_count and total_amount to be passed to the mongo_nyc table scan. Because mongo_nyc supports exact filter pushdown, its branch would no longer need a Filter node, while the csv_nyc branch would keep one. Perhaps the plan should get rewritten to:

  Projection: #COUNT(UInt8(1)) AS total_records, #COUNT(DISTINCT payment_type) AS total_payment_types, #SUM(CAST(trip_distance AS Float64)) AS total_distance
    Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), COUNT(DISTINCT #payment_type), SUM(CAST(#trip_distance AS Float64))]]
-     Filter: #passenger_count Gt Int64(3) And #total_amount Lt Float64(20)
        Union
          Projection: #passenger_count, #trip_distance, #payment_type, #total_amount
-           TableScan: mongo_nyc projection=Some([3, 4, 9, 16])
+          TableScan: mongo_nyc projection=Some([3, 4, 9, 16]), filters=[#passenger_count Gt Int64(3), #total_amount Lt Float64(20)]
          Projection: #passenger_count, #trip_distance, #payment_type, #total_amount
+           Filter: #passenger_count Gt Int64(3) And #total_amount Lt Float64(20)
            TableScan: csv_nyc projection=Some([3, 4, 9, 16])
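
To make the expected rewrite concrete, here is a minimal sketch of the rule on a toy plan type. Everything in it (`Plan`, `Pushdown`, `push_filter`, `optimize`) is hypothetical and is not DataFusion's actual API or optimizer code. It also assumes projections pass columns through unchanged, as they do in the plan above, and predicates are kept as plain strings.

```rust
// Toy model only: these types are hypothetical and are NOT DataFusion's.
#[derive(Debug, Clone, PartialEq)]
enum Pushdown {
    Unsupported, // the scan cannot evaluate filters (csv_nyc in the issue)
    Exact,       // the scan applies filters exactly (mongo_nyc in the issue)
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String, pushdown: Pushdown, filters: Vec<String> },
    Projection { input: Box<Plan> },
    Filter { predicate: String, input: Box<Plan> },
    Union { inputs: Vec<Plan> },
}

/// Push `predicate` as far down `plan` as possible.
/// Assumes projections only pass columns through, so the predicate stays valid below them.
fn push_filter(predicate: &str, plan: Plan) -> Plan {
    match plan {
        // A union duplicates the filter into every branch.
        Plan::Union { inputs } => Plan::Union {
            inputs: inputs.into_iter().map(|p| push_filter(predicate, p)).collect(),
        },
        Plan::Projection { input } => Plan::Projection {
            input: Box::new(push_filter(predicate, *input)),
        },
        // Exact pushdown: the scan takes the filter and no Filter node remains.
        Plan::Scan { table, pushdown: Pushdown::Exact, mut filters } => {
            filters.push(predicate.to_string());
            Plan::Scan { table, pushdown: Pushdown::Exact, filters }
        }
        // Anything else keeps an explicit Filter node directly above it.
        other => Plan::Filter { predicate: predicate.to_string(), input: Box::new(other) },
    }
}

/// Walk the plan and push every Filter node into its input.
fn optimize(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => push_filter(&predicate, optimize(*input)),
        Plan::Projection { input } => Plan::Projection { input: Box::new(optimize(*input)) },
        Plan::Union { inputs } => Plan::Union {
            inputs: inputs.into_iter().map(optimize).collect(),
        },
        scan => scan,
    }
}
```

Running `optimize` on a `Filter` over a `Union` of an exact-pushdown scan and an unsupported scan yields the shape in the diff above: the filter ends up in the first scan's `filters` list and stays as a `Filter` node above the second scan.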

Additional context

I tested this on master as at 13 June 2021

Labels: bug, enhancement
