Skip to content

filter not pushdown through sort in FilterPushdown #21526

@haohuaijin

Description

@haohuaijin

Describe the bug

for the physical plan like below

FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0]
  SortExec: expr=[_timestamp@0 DESC NULLS LAST], preserve_partitioning=[true]
    DataSourceExec: partitions=1, partition_sizes=[1]

then i try use FilterPushdown to pushdown the filter under the sort, but current not work
After FilterPushdown:

FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0]
  SortExec: expr=[_timestamp@0 DESC NULLS LAST], preserve_partitioning=[true]
    DataSourceExec: partitions=1, partition_sizes=[1]

To Reproduce

use std::sync::Arc;

use arrow::array::{RecordBatch, StringArray};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::memory::MemorySourceConfig;
use datafusion::common::config::ConfigOptions;
use datafusion::physical_expr::expressions::{col, like, lit};
use datafusion::physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::filter::FilterExecBuilder;
use datafusion::physical_plan::sorts::sort::SortExec;

/// Constructs and executes a physical plan equivalent to:
///
/// ```text
///   FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0]
///     SortExec: expr=[_timestamp@0 DESC NULLS LAST], preserve_partitioning=[true]
///       DataSourceExec (MemoryExec): projection=[_timestamp, log]
/// ```
#[tokio::main]
async fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("_timestamp", DataType::Utf8, false),
        Field::new("log", DataType::Utf8, false),
    ]));

    let batch1 = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["2026-04-09T02:00:00Z"])),
            Arc::new(StringArray::from(vec!["starting datafusion query engine"])),
        ],
    )
    .unwrap();

    // DataSourceExec (MemoryExec): 1 partitions, projection=[_timestamp, log]
    let memory_exec =
        MemorySourceConfig::try_new_exec(&[vec![batch1]], schema.clone(), None).unwrap();

    // SortExec: expr=[_timestamp@0 DESC NULLS LAST], preserve_partitioning=[true]
    let sort_expr = PhysicalSortExpr::new(
        col("_timestamp", &schema).unwrap(),
        SortOptions {
            descending: true,
            nulls_first: false,
        },
    );
    let lex_ordering = LexOrdering::new(vec![sort_expr]).unwrap();
    let sort_exec = Arc::new(
        SortExec::new(lex_ordering, memory_exec).with_preserve_partitioning(true),
    );

    // FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0], fetch=10
    let like_predicate = like(
        false, // not negated
        false, // case sensitive
        col("log", &schema).unwrap(),
        lit("%datafusion%"),
        &schema,
    )
    .unwrap();

    let filter_exec = Arc::new(
        FilterExecBuilder::new(like_predicate, sort_exec)
            .apply_projection(Some(vec![0])) // projection=[_timestamp@0]
            .unwrap()
            .build()
            .unwrap(),
    );

    // Print the physical plan
    let display = displayable(filter_exec.as_ref());
    println!("Physical plan:\n{}", display.indent(true));

    // Run filter pushdown optimization and print the optimized plan
    let config = ConfigOptions::default();
    let optimized_plan = FilterPushdown::new()
        .optimize(filter_exec.clone(), &config)
        .unwrap();
    let optimized_display = displayable(optimized_plan.as_ref());
    println!("After FilterPushdown:\n{}", optimized_display.indent(true));
}

Expected behavior

No response

Additional context

No response

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No fields configured for Bug.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions