
[flink] Lake filters should be pushed down for non-partitioned scans #3168

@luoyuxia

Description


Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

main (development)

Please describe the bug 🐞

FlinkTableSource.applyFilters(...) only forwards converted predicates to lakeSource.withFilters(...) inside the isPartitioned() branch.

That means a non-partitioned table with data lake enabled never pushes SQL filters down to the lake source, even when the predicates are convertible and the underlying lake format supports filter pushdown. In practice, batch reads in FULL mode fall back to Flink-side filtering for the lake portion of the scan instead of letting the lake source prune files or rows earlier.

A minimal reproduction is:

  1. Create a non-partitioned Fluss table with data lake enabled.
  2. Run a batch query in FULL startup mode with filters such as region = 'HangZhou' or value > 1000.
  3. Observe that FlinkTableSource never calls lakeSource.withFilters(...) for that table, because the pushdown path is guarded by isPartitioned().

Expected behavior:

Convertible filters should be pushed to the lake source whenever lakeSource != null, regardless of whether the table is partitioned.

Actual behavior:

Only partitioned tables attempt lake filter pushdown.

Solution

Move the lake filter pushdown logic out of the partition-only branch so that all lake-backed scans can attempt lakeSource.withFilters(...). Keep the existing fallback behavior when the lake source rejects some predicates.
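The proposed restructuring can be sketched as follows. This is a minimal, self-contained simplification, not the real Fluss code: `Predicate`, the `LakeSource` interface shape, and the `applyFilters` signature below are illustrative stand-ins (only the `lakeSource.withFilters(...)` name comes from the issue itself). The point is that the pushdown attempt is gated only on `lakeSource != null`, with rejected predicates kept for Flink-side evaluation.

```java
import java.util.ArrayList;
import java.util.List;

public class LakeFilterPushdownSketch {

    // Stand-in for a converted lake predicate (the real classes are richer).
    record Predicate(String expr) {}

    // Stand-in for the lake source: returns the subset of predicates it accepts.
    interface LakeSource {
        List<Predicate> withFilters(List<Predicate> filters);
    }

    // Before the fix, the withFilters call sat inside an isPartitioned() branch.
    // After the fix, pushdown is attempted whenever a lake source exists.
    static List<Predicate> applyFilters(
            boolean isPartitioned, LakeSource lakeSource, List<Predicate> converted) {
        List<Predicate> remaining = new ArrayList<>(converted);
        if (lakeSource != null) { // no longer guarded by isPartitioned
            List<Predicate> accepted = lakeSource.withFilters(converted);
            // Keep predicates the lake source rejected for Flink-side filtering.
            remaining.removeAll(accepted);
        }
        return remaining;
    }

    public static void main(String[] args) {
        // A lake source that accepts only the first predicate it is given.
        LakeSource lake = filters -> filters.subList(0, 1);
        List<Predicate> converted =
                List.of(new Predicate("region = 'HangZhou'"), new Predicate("value > 1000"));

        // Non-partitioned table: pushdown is now attempted anyway.
        List<Predicate> remaining = applyFilters(false, lake, converted);
        System.out.println("remaining for Flink-side filtering: " + remaining);
    }
}
```

In the sketch, the partitioned-only branch can still carry its partition-specific logic; only the `withFilters` call moves out of it, which preserves the existing fallback when some predicates are rejected.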

Are you willing to submit a PR?

  • I'm willing to submit a PR!
