TableProvider implementation for DataFrame does not support filter pushdown #3681

@Gentoli

Describe the bug
Filters are not pushed down when a DataFrame is registered as a table.

To Reproduce
The problem is not specific to joins; a join is used here only to illustrate it.

Steps to reproduce the behaviour:

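// Written against the DataFusion API at the time of this issue;
// newer releases changed several of these signatures.
use datafusion::prelude::*;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();

    ctx.register_csv("test", "./test.csv", CsvReadOptions::new()).await.unwrap();
    let right = ctx.read_csv("./test2.csv", CsvReadOptions::new()).await.unwrap();

    // Join the registered table with a second CSV-backed DataFrame.
    let df = ctx.table("test").unwrap()
        .join(right, JoinType::Inner, &["name_1"], &["name_2"], None)
        .unwrap();

    // Filtering the DataFrame directly: the filter is pushed below the join.
    println!("works");
    df.filter(col("name_1").eq(lit("andrew"))).unwrap()
        .explain(false, false).unwrap().show().await.unwrap();

    // Registering the same DataFrame as a table and filtering it:
    // the filter stays above the scan of `table_alias`.
    ctx.register_table("table_alias", df.clone()).unwrap();

    println!("don't work");
    ctx.table("table_alias").unwrap()
        .filter(col("name_1").eq(lit("andrew"))).unwrap()
        .explain(false, false).unwrap().show().await.unwrap();
}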
Output + Test files
works
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                 |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Inner Join: #test.name_1 = #?table?.name_2                                                                                                           |
|               |   Filter: #test.name_1 = Utf8("andrew")                                                                                                              |
|               |     TableScan: test projection=[name_1, num], partial_filters=[#test.name_1 = Utf8("andrew")]                                                        |
|               |   Filter: #?table?.name_2 = Utf8("andrew")                                                                                                           |
|               |     TableScan: ?table? projection=[name_2, id], partial_filters=[#?table?.name_2 = Utf8("andrew")]                                                   |
| physical_plan | CoalesceBatchesExec: target_batch_size=4096                                                                                                          |
|               |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "name_1", index: 0 }, Column { name: "name_2", index: 0 })]                   |
|               |     CoalesceBatchesExec: target_batch_size=4096                                                                                                      |
|               |       RepartitionExec: partitioning=Hash([Column { name: "name_1", index: 0 }], 12)                                                                  |
|               |         CoalesceBatchesExec: target_batch_size=4096                                                                                                  |
|               |           FilterExec: name_1@0 = andrew                                                                                                              |
|               |             RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                        |
|               |               CsvExec: files=[<>/test.csv], has_header=true, limit=None, projection=[name_1, num] |
|               |     CoalesceBatchesExec: target_batch_size=4096                                                                                                      |
|               |       RepartitionExec: partitioning=Hash([Column { name: "name_2", index: 0 }], 12)                                                                  |
|               |         CoalesceBatchesExec: target_batch_size=4096                                                                                                  |
|               |           FilterExec: name_2@0 = andrew                                                                                                              |
|               |             RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                        |
|               |               CsvExec: files=[<>/test2.csv], has_header=true, limit=None, projection=[name_2, id] |
|               |                                                                                                                                                      |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
don't work
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                           |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: #table_alias.name_1 = Utf8("andrew")                                                                                                                   |
|               |   TableScan: table_alias projection=[name_1, num, name_2, id]                                                                                                  |
| physical_plan | CoalesceBatchesExec: target_batch_size=4096                                                                                                                    |
|               |   FilterExec: name_1@0 = andrew                                                                                                                                |
|               |     ProjectionExec: expr=[name_1@0 as name_1, num@1 as num, name_2@2 as name_2, id@3 as id]                                                                    |
|               |       CoalesceBatchesExec: target_batch_size=4096                                                                                                              |
|               |         CoalesceBatchesExec: target_batch_size=4096                                                                                                            |
|               |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "name_1", index: 0 }, Column { name: "name_2", index: 0 })]                     |
|               |             CoalesceBatchesExec: target_batch_size=4096                                                                                                        |
|               |               CoalesceBatchesExec: target_batch_size=4096                                                                                                      |
|               |                 RepartitionExec: partitioning=Hash([Column { name: "name_1", index: 0 }], 12)                                                                  |
|               |                   CoalesceBatchesExec: target_batch_size=4096                                                                                                  |
|               |                     RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                          |
|               |                       RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                        |
|               |                         CsvExec: files=[<>/test.csv], has_header=true, limit=None, projection=[name_1, num] |
|               |             CoalesceBatchesExec: target_batch_size=4096                                                                                                        |
|               |               CoalesceBatchesExec: target_batch_size=4096                                                                                                      |
|               |                 RepartitionExec: partitioning=Hash([Column { name: "name_2", index: 0 }], 12)                                                                  |
|               |                   CoalesceBatchesExec: target_batch_size=4096                                                                                                  |
|               |                     RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                          |
|               |                       RepartitionExec: partitioning=RoundRobinBatch(12)                                                                                        |
|               |                         CsvExec: files=[<>/test2.csv], has_header=true, limit=None, projection=[name_2, id] |
|               |                                                                                                                                                                |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

test.csv

name_1,num
andrew,100
jorge,200
andy,150
paul,300

test2.csv

name_2,id
andrew,1
jorge,2
andy,3
paul,4

Expected behavior
When the registered table is backed by a join, the filter should be pushed below the join, as happens when the same DataFrame is filtered directly.
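To make the expected behavior concrete, here is a small standalone Rust model (plain std, no DataFusion; `inner_join` is a hypothetical helper, not a DataFusion function) using the rows from test.csv and test2.csv. It checks that filtering each input before the join returns the same rows as filtering after it, and illustrates why the predicate on name_1 may also be applied to name_2: the inner equijoin only keeps rows where the two columns are equal.

```rust
use std::collections::HashMap;

// Hypothetical in-memory inner equijoin on the first tuple field
// (name_1 on the left, name_2 on the right).
fn inner_join<'a>(
    left: &[(&'a str, i64)],
    right: &[(&'a str, i64)],
) -> Vec<(&'a str, i64, &'a str, i64)> {
    // Build side: index the right input by its join key.
    let mut index: HashMap<&str, Vec<i64>> = HashMap::new();
    for &(k, id) in right {
        index.entry(k).or_default().push(id);
    }
    // Probe side: emit one output row per matching right row.
    let mut out = Vec::new();
    for &(k, num) in left {
        if let Some(ids) = index.get(k) {
            for &id in ids {
                // Keys are equal by construction, so k serves as both name_1 and name_2.
                out.push((k, num, k, id));
            }
        }
    }
    out
}

fn main() {
    let test = [("andrew", 100), ("jorge", 200), ("andy", 150), ("paul", 300)];
    let test2 = [("andrew", 1), ("jorge", 2), ("andy", 3), ("paul", 4)];
    let pred = |name: &str| name == "andrew";

    // Plan A: join first, then filter on name_1 (the shape of the "don't work" plan).
    let late: Vec<_> = inner_join(&test, &test2)
        .into_iter()
        .filter(|r| pred(r.0))
        .collect();

    // Plan B: filter both inputs before the join (the shape of the "works" plan).
    // Since the join requires name_1 = name_2, the predicate also holds for name_2.
    let l: Vec<_> = test.iter().copied().filter(|r| pred(r.0)).collect();
    let r: Vec<_> = test2.iter().copied().filter(|r| pred(r.0)).collect();
    let early = inner_join(&l, &r);

    assert_eq!(late, early);
    println!("{:?}", early); // [("andrew", 100, "andrew", 1)]
}
```

Both plans return the same single row; the difference is that plan B joins one row from each side instead of four.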

Additional context
The TableProvider implementation for DataFrame appears to support filtering: its scan method can apply the filters it receives. However, it does not override supports_filter_pushdown, and the default implementation returns Unsupported, so no filters are ever passed down to scan.
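The mechanism described above can be illustrated with a simplified model. This is a hypothetical sketch, not DataFusion's actual trait: `Provider`, `FilterPushDown`, `plan`, `DataFrameProvider` and `FixedDataFrameProvider` are illustrative names that only mirror the roles of `TableProvider::supports_filter_pushdown` and `TableProviderFilterPushDown`. A provider that keeps the default `Unsupported` answer never sees any filters in `scan`, even though `scan` could apply them; overriding the capability check is enough for the planner to pass them through.

```rust
// Hypothetical, simplified model of filter-pushdown negotiation.
// NOT DataFusion's API; names only mirror its roles.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum FilterPushDown {
    Unsupported, // planner keeps the filter above the scan, passes nothing down
    Inexact,     // planner passes the filter down AND re-checks it above the scan
    Exact,       // planner passes the filter down and drops it from above the scan
}

trait Provider {
    // Default answer: no pushdown.
    fn supports_filter_pushdown(&self, _filter: &str) -> FilterPushDown {
        FilterPushDown::Unsupported
    }
    // `filters` holds only the predicates the planner chose to push down.
    fn scan(&self, filters: &[String]) -> String;
}

// Stand-in for the DataFrame provider: scan() can apply filters, but it
// relies on the default capability check, so it never receives any.
struct DataFrameProvider;
impl Provider for DataFrameProvider {
    fn scan(&self, filters: &[String]) -> String {
        format!("scan(filters={:?})", filters)
    }
}

// Same scan, but the capability check is overridden.
struct FixedDataFrameProvider;
impl Provider for FixedDataFrameProvider {
    fn supports_filter_pushdown(&self, _filter: &str) -> FilterPushDown {
        FilterPushDown::Exact
    }
    fn scan(&self, filters: &[String]) -> String {
        format!("scan(filters={:?})", filters)
    }
}

// Planner side: ask the provider about each filter and split them into
// pushed-down filters and filters that must remain above the scan.
fn plan(provider: &dyn Provider, filters: &[&str]) -> String {
    let mut pushed = Vec::new();
    let mut remaining = Vec::new();
    for &f in filters {
        match provider.supports_filter_pushdown(f) {
            FilterPushDown::Unsupported => remaining.push(f),
            FilterPushDown::Inexact => {
                pushed.push(f.to_string());
                remaining.push(f);
            }
            FilterPushDown::Exact => pushed.push(f.to_string()),
        }
    }
    let scan = provider.scan(&pushed);
    if remaining.is_empty() {
        scan
    } else {
        format!("Filter({}) <- {}", remaining.join(" AND "), scan)
    }
}

fn main() {
    let f = ["name_1 = andrew"];
    println!("{}", plan(&DataFrameProvider, &f)); // Filter(name_1 = andrew) <- scan(filters=[])
    println!("{}", plan(&FixedDataFrameProvider, &f)); // scan(filters=["name_1 = andrew"])
}
```

Reporting `Exact` lets the planner drop the filter above the scan; `Inexact` would push it down while still re-checking it, which is the safer choice for a provider that cannot guarantee it applies every predicate.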

Labels: bug, help wanted
