@LiaCastaneda
Contributor

@LiaCastaneda LiaCastaneda commented Nov 18, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

Dynamic filter pushdown in DataFusion currently lacks an API to determine when a filter is "complete" (all contributing partitions have reported). This creates an ambiguity: it is impossible to differentiate between:

  1. Complete filter with no data: Build side produced 0 rows, filter remains as placeholder lit(true), no more updates coming
  2. Incomplete filter: Filter is still being computed, updates are pending

I think this could be especially useful if we want to update filters progressively in the future.
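The two cases above can be told apart once completion is tracked separately from the filter expression. The following is a minimal, standard-library-only sketch of that idea. `ToyDynamicFilter`, `FilterState`, and the string-based predicate are hypothetical stand-ins for illustration and are not DataFusion's actual types or signatures.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;

/// Hypothetical stand-in for a dynamic filter (NOT DataFusion's type).
pub struct ToyDynamicFilter {
    /// Current predicate rendered as text; starts as the `true` placeholder.
    expr: RwLock<String>,
    /// Set once no further updates will arrive.
    complete: AtomicBool,
}

/// The states a consumer may want to distinguish (illustrative only).
#[derive(Debug, PartialEq, Eq)]
pub enum FilterState {
    /// Updates may still arrive; the current expression is provisional.
    InProgress,
    /// Final, but the build side produced no rows: still the placeholder.
    CompleteNoData,
    /// Final, with a real predicate.
    CompleteWithBounds,
}

impl ToyDynamicFilter {
    pub const PLACEHOLDER: &'static str = "true";

    pub fn new() -> Self {
        Self {
            expr: RwLock::new(Self::PLACEHOLDER.to_string()),
            complete: AtomicBool::new(false),
        }
    }

    /// Replace the current predicate.
    pub fn update(&self, expr: &str) {
        *self.expr.write().unwrap() = expr.to_string();
    }

    /// Signal that no more updates are coming.
    pub fn mark_complete(&self) {
        self.complete.store(true, Ordering::Release);
    }

    pub fn is_complete(&self) -> bool {
        self.complete.load(Ordering::Acquire)
    }

    pub fn current(&self) -> String {
        self.expr.read().unwrap().clone()
    }

    /// Without the `complete` flag, the first two arms would be indistinguishable
    /// whenever the expression is still the placeholder.
    pub fn state(&self) -> FilterState {
        let is_placeholder = self.current() == Self::PLACEHOLDER;
        match (self.is_complete(), is_placeholder) {
            (false, _) => FilterState::InProgress,
            (true, true) => FilterState::CompleteNoData,
            (true, false) => FilterState::CompleteWithBounds,
        }
    }
}
```

In this toy model, a freshly created filter and a filter whose build side produced 0 rows both report the `true` placeholder from `current()`; only `is_complete()` separates them.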

What changes are included in this PR?

  • Calls mark_complete() after barrier completes, regardless of whether bounds exist.
  • Exposes is_complete() function on the DynamicFilterPhysicalExpr.
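As a rough illustration of "mark complete after the barrier, regardless of whether bounds exist", here is a sketch using only `std::sync::Barrier` and threads. `SharedBounds` and `run_build_side` are hypothetical names, the integer min/max bounds are a simplification, and the real DataFusion code path is not reproduced here. The sketch assumes at least one partition.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Hypothetical shared state; names are illustrative only.
struct SharedBounds {
    /// (min, max) merged across all partitions that saw rows.
    merged: Mutex<Option<(i32, i32)>>,
    /// Set once every partition has reported.
    complete: AtomicBool,
}

/// Each inner `Vec` holds one build-side partition's key values.
/// Returns the merged bounds (if any) and whether completion was signalled.
/// Assumes `partitions` is non-empty.
pub fn run_build_side(partitions: Vec<Vec<i32>>) -> (Option<(i32, i32)>, bool) {
    let shared = Arc::new(SharedBounds {
        merged: Mutex::new(None),
        complete: AtomicBool::new(false),
    });
    let barrier = Arc::new(Barrier::new(partitions.len()));

    let handles: Vec<_> = partitions
        .into_iter()
        .map(|rows| {
            let shared = Arc::clone(&shared);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                // Report local bounds only if this partition had rows.
                if let (Some(&lo), Some(&hi)) = (rows.iter().min(), rows.iter().max()) {
                    let mut merged = shared.merged.lock().unwrap();
                    *merged = Some(match *merged {
                        Some((m_lo, m_hi)) => (m_lo.min(lo), m_hi.max(hi)),
                        None => (lo, hi),
                    });
                }
                // Exactly one thread is elected leader once all have arrived.
                if barrier.wait().is_leader() {
                    // Mark complete unconditionally, even if `merged` is still None.
                    shared.complete.store(true, Ordering::Release);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let merged = *shared.merged.lock().unwrap();
    (merged, shared.complete.load(Ordering::Acquire))
}
```

With all partitions empty, this returns `(None, true)`: no bounds exist, yet completion is still signalled, which is the case the PR description calls out.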

Are these changes tested?

I didn't add any tests because the change is minimal, and comprehensive testing would require making the DynamicFilterPhysicalExpr public or running through the full optimizer pipeline.

Are there any user-facing changes?

Exposes the is_complete() function.

@github-actions github-actions bot added physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate labels Nov 18, 2025
@LiaCastaneda LiaCastaneda force-pushed the lia/add-function-is_complete branch from 5528b6e to dffd77b Compare November 18, 2025 11:10
@LiaCastaneda LiaCastaneda changed the title from "Add field to DynamicPhysicalExpr to indicate the filter is complete" to "Add field to DynamicPhysicalExpr to indicate when the filter is complete" Nov 18, 2025
@LiaCastaneda LiaCastaneda marked this pull request as ready for review November 18, 2025 11:23
@gabotechs
Contributor

Will take a look at this one soon

@adriangb
Contributor

This makes sense and is a relatively simple change, but could you share an example use case? Would the scan node care if the filter is complete/in progress?

@LiaCastaneda LiaCastaneda force-pushed the lia/add-function-is_complete branch from 28c5406 to 278c3bb Compare November 19, 2025 09:17
Contributor

@adriangb adriangb left a comment

This looks great! I do think it would be good to have some tests:

  1. Unit tests for DynamicFilterPhysicalExpr
  2. Unit tests for TopK/SortExec and HashJoinExec to ensure they mark the filter as completed.

@LiaCastaneda LiaCastaneda force-pushed the lia/add-function-is_complete branch 2 times, most recently from 3c40331 to c6a91b6 Compare November 19, 2025 11:11
@LiaCastaneda LiaCastaneda force-pushed the lia/add-function-is_complete branch from c10ef72 to 3115cfb Compare November 19, 2025 11:21
@LiaCastaneda LiaCastaneda changed the title from "Add field to DynamicPhysicalExpr to indicate when the filter is complete" to "Add field to DynamicPhysicalExpr to indicate when the filter is complete or updated" Nov 19, 2025
Contributor

@gabotechs gabotechs left a comment

💯

@gabotechs
Contributor

Will give it until tomorrow in case anyone else wants to chime in, and otherwise merge it then.

@LiaCastaneda
Contributor Author

Thanks for the reviews @adriangb and @gabotechs 🙇

Contributor

@2010YOUY01 2010YOUY01 left a comment

LGTM, thank you!

let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1), Some(2)]));
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;
topk.insert_batch(batch)?;

Contributor

Should we also add an assertion for the 'in progress' state here?

Contributor Author

Yeah, the issue is that it's hard to set up a wait_update in the tests before the update happens without doing something like tokio::spawn (which I think we are not allowed to use in tests) or setting a timeout, which would introduce some nondeterminism into the test.
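For illustration only, the ordering problem described above can be shown in a toy, synchronous model: if the waiter records a generation number before the update, a later wait returns immediately and no concurrent task or timeout is needed. `UpdateNotifier` and its methods are hypothetical and built on `std::sync::Condvar`; whether the same approach fits the async `wait_update` discussed in this thread is an assumption, not something established here.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical update notifier (NOT DataFusion's implementation).
pub struct UpdateNotifier {
    /// Incremented on every update.
    generation: Mutex<u64>,
    changed: Condvar,
}

impl UpdateNotifier {
    pub fn new() -> Self {
        Self {
            generation: Mutex::new(0),
            changed: Condvar::new(),
        }
    }

    /// Snapshot of the current generation; callers record it before waiting.
    pub fn generation(&self) -> u64 {
        *self.generation.lock().unwrap()
    }

    /// Record an update and wake any blocked waiters.
    pub fn notify_update(&self) {
        let mut generation = self.generation.lock().unwrap();
        *generation += 1;
        self.changed.notify_all();
    }

    /// Block until the generation exceeds `seen`, then return the new value.
    /// Returns immediately if an update already happened after `seen` was taken.
    pub fn wait_update(&self, seen: u64) -> u64 {
        let guard = self.generation.lock().unwrap();
        let guard = self
            .changed
            .wait_while(guard, |generation| *generation <= seen)
            .unwrap();
        *guard
    }
}
```

A single-threaded test in this model could record `generation()`, assert that it has not advanced (the "in progress" check), trigger the update, and then call `wait_update` with the recorded value.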

@LiaCastaneda
Contributor Author

I think this PR might slightly conflict logically with #18451 -- I have to update the branch before merging

@gabotechs gabotechs added this pull request to the merge queue Nov 20, 2025
Merged via the queue into apache:main with commit 7fa2a69 Nov 20, 2025
35 checks passed
@gabotechs
Contributor

Thanks @LiaCastaneda for the PR and @adriangb and @2010YOUY01 for the reviews!

LiaCastaneda added a commit to DataDog/datafusion that referenced this pull request Nov 20, 2025
…ete or updated (apache#18799)


- Closes #.

Dynamic filter pushdown in DataFusion currently lacks an API to
determine when a filter is "complete" (all contributing partitions have
reported). This creates an ambiguity: it is impossible to
differentiate between:

1. **Complete filter with no data**: Build side produced 0 rows, filter
remains as placeholder `lit(true)`, no more updates coming
2. **Incomplete filter**: Filter is still being computed, updates are
pending

I think this could be especially useful when we want to make the filter
updates progressively in the future.

- Calls `mark_complete()` after barrier completes, regardless of whether
bounds exist.
- Exposes `is_complete()` function on the `DynamicFilterPhysicalExpr`.

I didn't add any tests because the change is minimal, and comprehensive
testing would require making the `DynamicFilterPhysicalExpr` public or
running through the full optimizer pipeline.

Exposes the `is_complete()` function.

(cherry picked from commit 7fa2a69)
LiaCastaneda added a commit to DataDog/datafusion that referenced this pull request Nov 20, 2025
…ete or updated (apache#18799) (#60)
(cherry picked from commit 7fa2a69)