Skip to content

Conversation

@LiaCastaneda
Copy link
Contributor

@LiaCastaneda LiaCastaneda commented Nov 26, 2025

Which issue does this PR close?

Closes #17527

Rationale for this change

Currently, DataFusion computes bounds for all queries that contain a HashJoinExec node whenever the option enable_dynamic_filter_pushdown is set to true (default). It might make sense to compute these bounds only when we explicitly know there is a consumer that will use them.

What changes are included in this PR?

As suggested in #17527 (comment), this PR adds an is_used() method to DynamicFilterPhysicalExpr that checks if any consumers are holding a reference to the filter using Arc::strong_count().

During filter pushdown, consumers that accept the filter in order to replace the node with the current filter and later use it execution, so a reference to Arc is retained. For example here it will keep a reference to the filters in the node.

Are these changes tested?

I added a unit test in dynamic_filters.rs (test_is_used) that verifies the Arc reference counting behavior.
Existing integration tests in datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs validate the end-to-end behavior. These tests verify that dynamic filters are computed and filled when consumers are present.

Are there any user-facing changes?

new is_used() function

@github-actions github-actions bot added physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate labels Nov 26, 2025
@LiaCastaneda LiaCastaneda force-pushed the lia/compute-dyn-filters-only-when-consumer-asks-for-it branch from ebc401a to 3b31893 Compare November 26, 2025 10:40
Comment on lines +302 to +305
pub fn is_used(self: &Arc<Self>) -> bool {
// Strong count > 1 means at least one consumer is holding a reference beyond the producer.
Arc::strong_count(self) > 1
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure how to test a condition where is_used() returns false without adding too much machinery or making the dynamic_filter attribute from HashJoin public which would make it easy to mess with the Arc reference count.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can’t you just make a new DynamicFilterPhysicalExpr and check that is_used is False?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you have a test already?

Copy link
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is draft but the current code looks good to me, I’ll approve once it’s ready :)

@LiaCastaneda
Copy link
Contributor Author

LiaCastaneda commented Nov 26, 2025

Something funky is going on here with the Arc count, some queries are not pushing down the filter to the probe because Arc count remains at 1 -> Arc count=1 in execution time, will take a look...

It doesn't happen in all the tests though, which is strange

Edit: For the tests that fails seems like partition 0 never gets to see a strong_count >1

[partition 0] Arc count at execution: 1
  [partition 0] Is used: false
  [partition 1] Arc count at execution: 2
  [partition 1] Is used: true
  [partition 2] Arc count at execution: 2
  [partition 2] Is used: true
  [partition 3] Arc count at execution: 2
  [partition 3] Is used: true
  [partition 4] Arc count at execution: 2
  [partition 4] Is used: true
  [partition 5] Arc count at execution: 2
  [partition 5] Is used: true
  [partition 6] Arc count at execution: 2
  [partition 6] Is used: true
  [partition 7] Arc count at execution: 2
  [partition 7] Is used: true
  [partition 8] Arc count at execution: 2
  [partition 8] Is used: true
  [partition 9] Arc count at execution: 2
  [partition 9] Is used: true
  [partition 10] Arc count at execution: 2
  [partition 10] Is used: true
  [partition 11] Arc count at execution: 2
  [partition 11] Is used: true

I think this might be because we clone the DynamicFilterPhysicalExpr directly in the execute phase of DataSourceExec (at least for this kind of node)

In any case, seems like the strong_count approach might hit some edge cases here...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Only compute bounds/ dynamic filters if consumer asks for it

2 participants