
Gene.bordegaray/2026/02/partition index dynamic filters #20331

Open
gene-bordegaray wants to merge 11 commits into apache:main from
gene-bordegaray:gene.bordegaray/2026/02/partition_index_dynamic_filters

Conversation


@gene-bordegaray gene-bordegaray commented Feb 12, 2026

Which issue does this PR close?

Closes #20195

Rationale for this change

Dynamic filter pushdown was completely disabled when preserve_file_partitions was on, due to a correctness bug.

The Problem

When preserve_file_partitions is enabled, DataFusion treats file groups as pre-partitioned data. The existing dynamic filtering used hash-based routing, which is incompatible with the value-based partitioning that file groups preserve:

Example:

Table partitioned by col_a:
- Partition 0: col_a = 'A'
- Partition 1: col_a = 'B'
- Partition 2: col_a = 'C'

Dimension Table: values = ['A', 'B']

SELECT * FROM large_table
JOIN small_table ON large_table.col_a = small_table.col_a

Hash routing doesn't line up with the file layout (see the sketch after this list):
- Hash routing: hash('A') % 3 might map to partition 1 (not partition 0)
- File partitioning: 'A' data is in partition 0 (value-based)
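
As a self-contained illustration (toy hash function and hard-coded partition count, not DataFusion's actual hashing), the partition picked by hashing has no relationship to the value-based file-group layout:

// Illustrative only: any hash % N routing can disagree with a value-based file layout.
fn hash_route(value: &str, num_partitions: u64) -> u64 {
    // Toy hash; DataFusion's real hash differs, but the conclusion is the same.
    value
        .bytes()
        .fold(0u64, |h, b| h.wrapping_mul(31).wrapping_add(b as u64))
        % num_partitions
}

fn main() {
    // File groups are laid out by value: 'A' -> partition 0, 'B' -> 1, 'C' -> 2.
    for (file_partition, v) in ["A", "B", "C"].into_iter().enumerate() {
        println!(
            "value {v}: file group {file_partition}, hash route {}",
            hash_route(v, 3)
        );
    }
}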

For this reason dynamic filter pushdown was disabled; this PR re-enables it via PartitionIndex routing for dynamic filters.

What changes are included in this PR?

Partition-Indexed dynamic filtering

New routing mode that uses direct partition-to-partition mapping:

Build partition 0 → filters Probe partition 0
Build partition 1 → filters Probe partition 1
Build partition 2 → filters Probe partition 2

Example:

HashJoinExec: mode=Partitioned, routing=PartitionIndex, on=[col_a = col_b]
    DataSourceExec: table_large, file_groups={3: [col_a=A], [col_a=B], [col_a=C]} predicate=DynamicFilter[
        {0: col_a IN ['A','B']},  -- Partition 0 filtered
        {1: col_a IN ['A','B']},  -- Partition 1 filtered
        {2: col_a IN ['A','B']}   -- Partition 2 filtered (no matches, pruned)
    ]
    DataSourceExec: table_small, values: [col_b='A', col_b='B']

How it works (a code sketch follows the list):
- Build partition 0 (col_b='A') creates filter for probe partition 0 (col_a='A')
- Build partition 1 (col_b='B') creates filter for probe partition 1 (col_a='B')
- Probe partition 2 (col_a='C') gets pruned (no matching build partition)
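
A minimal sketch of the lookup, using simplified stand-in types (the real code stores Arc<dyn PhysicalExpr> per partition; these names are illustrative):

use std::sync::Arc;

// Simplified stand-ins: one slot per build partition, holding the filter that
// partition produced, with `None` meaning the build partition had no rows.
type Filter = Arc<str>; // placeholder for Arc<dyn PhysicalExpr>
type PartitionedFilters = Vec<Option<Filter>>;

// What the probe side does for its own partition index.
enum ProbeAction {
    Apply(Filter), // use the partition-local filter
    PruneAll,      // build partition was empty: nothing can match
    KeepAll,       // index out of range: fail open rather than prune incorrectly
}

fn filter_for_partition(filters: &PartitionedFilters, partition_index: usize) -> ProbeAction {
    match filters.get(partition_index) {
        Some(Some(expr)) => ProbeAction::Apply(Arc::clone(expr)),
        Some(None) => ProbeAction::PruneAll,
        None => ProbeAction::KeepAll,
    }
}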

Alignment Detection
Detects compatible partitioning to enable safe optimization:

  • Both sides file-grouped (value-based partitioning) -> PartitionIndex
  • Both sides hash-repartitioned (hash-based partitioning) -> CaseHash
  • Sides with different partitioning -> Error; this shouldn't happen and can cause incorrect results (see the sketch below)
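
A rough sketch of that decision (field and variant names here are illustrative and don't necessarily match the PR's actual code):

// Illustrative routing decision based on whether each join input was hash-repartitioned.
enum FilterRouting {
    PartitionIndex, // both sides keep value-based file groups: direct index mapping
    CaseHash,       // both sides hash-repartitioned: CASE hash(value) % N routing
}

struct SideInfo {
    repartitioned: bool, // true if a RepartitionExec hash-distributed this side
}

fn choose_routing(left: &SideInfo, right: &SideInfo) -> Result<FilterRouting, String> {
    match (left.repartitioned, right.repartitioned) {
        (false, false) => Ok(FilterRouting::PartitionIndex),
        (true, true) => Ok(FilterRouting::CaseHash),
        // Mixed partitioning could silently produce wrong results, so surface an error.
        _ => Err("misaligned partitioning between hash join inputs".to_string()),
    }
}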

If there is a RepartitionExec in the path from the DataSourceExec to either the build or probe side of a Partitioned hash join, the join falls back to CaseHash routing.

The reason is that RepartitionExec uses hash(value) % N to distribute rows, breaking the value-based partition alignment. Once a side is hash-partitioned, partition 0 no longer contains 'A' exclusively, which breaks the partition-index assumptions.

With hash partitioning, use:

  CASE hash(col_a) % 4
    WHEN 0 THEN filter_partition_0
    WHEN 1 THEN filter_partition_1
    ...
  END

Are these changes tested?

sqllogictests: test_files/preserve_file_partitioning.slt
Integration tests: datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Unit tests: in affected files

Are there any user-facing changes?

Yes: a new error message can appear if partitioned hash joins are not aligned properly, and the dynamic filter display for PartitionIndex routing is a bit different from CASE routing.

cc: @NGA-TRAN @LiaCastaneda @adriangb @gabotechs

@github-actions github-actions bot added the documentation, physical-expr, optimizer, core, sqllogictest, common, proto, datasource, and physical-plan labels on Feb 12, 2026
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@0, a@0)]
- RepartitionExec: partitioning=Hash([b@0], 1), input_partitions=1
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true
Contributor Author

no dynamic filter because it's the build side of a build side... took me a second 😂

// its own filter.
predicate = predicate
    .map(|p| snapshot_physical_expr_for_partition(p, partition_index))
    .transpose()?;
@gene-bordegaray gene-bordegaray Feb 13, 2026

Decided to only do this in the Parquet opener; if we did it for all file sources it would (by default) just do nothing, since predicates aren't passed to the other openers. This does mean that users will have to implement this for their own data sources.

Given this is a large PR, I didn't want to include fallback logic, and doing nothing seemed out of place; could still reconsider if others have an opinion.

"hash-repartitioned"
} else {
"file-grouped"
}
Contributor Author

I didn't love this; maybe we could have a helper to do the mapping and make it cleaner

Contributor

yeah I think it's a good idea

@LiaCastaneda LiaCastaneda left a comment

This makes sense to me and will be very helpful for use cases where we want to avoid repartitioning data. My only concern is that API users would need to align the probe and build side partitions, but this seems like a reasonable tradeoff. Let’s see what other contributors think. (this is a partial review I will finish later today or early next week) but until now it's looking good to me :)

Comment on lines +864 to +868
// One side starts with multiple partitions while target is 1. EnforceDistribution inserts a
// hash repartition on the left child. The partitioning schemes are now misaligned:
// - Left: hash-repartitioned (repartitioned=true)
// - Right: file-grouped (repartitioned=false)
// This is a correctness bug, so we expect an error.
Contributor

Can we have the other way around as well? Having a join of type Partitioned with the left side preserving file partitioning and the right side having a RepartitionExec.

let optimized = ensure_distribution_helper_transform_up(join, 1)?;
assert_plan!(optimized, @r"
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@1)]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
Contributor

Would it make sense to display whether DataSourceExec is preserving partitioning? Something like preserve_partitioning=[bool]? This may be useful for users to know why there is no RepartitionExec in the plan even if the mode is Partitioned.

- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ]
Contributor

If this was a Partitioned Join why was there no CASE dynamic filter before? 🤔

Contributor Author

Because there is only one partition, it only applied one filter

gene-bordegaray commented Feb 13, 2026

This makes sense to me and will be very helpful for use cases where we want to avoid repartitioning data. My only concern is that API users would need to align the probe and build side partitions, but this seems like a reasonable tradeoff. Let’s see what other contributors think. (this is a partial review I will finish later today or early next week) but until now it's looking good to me :)

💯 thank you for the reviews
I know we have discussed this, but I want to document it here: for the API, the partitioning structure is admittedly a bit vague. I would like to start an effort to make partitioning a trait that will more clearly define how data is partitioned to eliminate the overload on Hash partitioning.
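
Purely as an illustration of that idea (not part of this PR, and none of these names exist in DataFusion today), such a trait might look roughly like:

// Hypothetical sketch only: a source could describe how its data is partitioned
// instead of everything being expressed through Partitioning::Hash.
trait PartitioningScheme {
    /// Number of output partitions.
    fn partition_count(&self) -> usize;

    /// True if two schemes place the same join-key values in the same partition
    /// index, which is what PartitionIndex filter routing relies on.
    fn is_aligned_with(&self, other: &dyn PartitioningScheme) -> bool;
}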

@LiaCastaneda LiaCastaneda left a comment

👍 I think I'm done with my review, overall looks good, just some minor comments.

Comment on lines +1428 to +1430
insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
@r"
Contributor

super nit: would it be enough to assert only on the plan after execution for these kinds of tests? this file is becoming increasingly large

Contributor Author

ya I thought similarly. I wouldn't be opposed. I will let @adriangb or @NGA-TRAN weigh in with their thoughts

Contributor

Yes, I agree we only need to assert on the plan after execution for these kinds of tests

"hash-repartitioned"
} else {
"file-grouped"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think its a good idea

@LiaCastaneda

I would like to start an effort to make partitioning a trait that will more clearly define how data is partitioned to eliminate the overload on Hash partitioning.

We need to make sure that in the future it’s easy to revert or migrate users away from index-based routing to their custom Partitioning implementation. Since this does not introduce a new API, I don’t think it should be a problem. This was previously a bug, and with this PR dynamic filtering works, but it’s something to keep in mind.

@NGA-TRAN NGA-TRAN left a comment

The approach looks great, Gene. Nice work!

I do have some suggestions on comments and test data to make things clearer for reviewers and future maintenance

/// │ mode=Partitioned │
/// │┌───────┐┌───────┐┌───────┐│
/// ││ Hash ││ Hash ││ Hash ││
/// ││Table 1││Table 2││Table 2││
Contributor

Suggested change
/// ││Table 1││Table 2││Table 2││
/// ││Table 1││Table 2││Table 3││

)
}

fn first_hash_join_and_direct_hash_repartition_children(
Contributor

Does this comment sound correct? Fix it as you see fit

Suggested change
fn first_hash_join_and_direct_hash_repartition_children(
// Traversing down the plan and returning the first hash join with direct repartition children
fn first_hash_join_and_direct_hash_repartition_children(

None
}

fn hash_repartition_on_column(
Contributor

The purpose of this function is to add a RepartitionExec on top of the input plan. How about renaming it to:

Suggested change
fn hash_repartition_on_column(
// Add RepartitionExec for the given input
fn add_repartition(

config.optimizer.enable_round_robin_repartition = false;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = false;
Contributor

It will be clearer if you add comments explaining why you need these settings and for which tests.

// Creates a DynamicFilterPhysicalExpr with per-partition bounds:
// - Partition 0: a >= 1 AND a <= 3 (matches all rows)
// - Partition 1: a >= 10 AND a <= 20 (excludes all rows via row group stats)
// - Partition 2: a >= 2 AND a <= 4 (matches some rows)
Contributor

So Partitions 0 and 2 overlap? Why do you need this unit test? Haven't you already thrown an error if this happens in the query plan?

For unit tests, we can make the data whatever we like, but I suggest we have tests that make sense to us. Test non-overlapping partitions

/// - `Some(Some(expr))`: use the partition-local filter.
/// - `Some(None)`: the build partition is known empty, so return `false`.
/// - `None` (out-of-range): return `true` (fail-open) to avoid incorrect pruning if
/// partition alignment/count assumptions are violated by a source.
Contributor

Can you add a comment here describing what the returned value means? Something like this:

  • Ok(Expr) : dynamic filter expression will be used for the given partition
  • Ok(false): will filter everything on the probe side because the build side is empty
  • Ok(true): will not filter anything from the probe side and return as-is

@NGA-TRAN

@LiaCastaneda

My only concern is that API users would need to align the probe and build side partitions, but this seems like a reasonable tradeoff.

Right, usually when users decide to do these custom partitions, they must have a mechanism to enforce them. Thus, I do not think we need to worry about this at the dynamic filtering stage. We only need to provide a way to use dynamic filtering correctly, which is the purpose of this PR.

Comment on lines +50 to +51
/// Per-partition filter expressions indexed by partition number.
type PartitionedFilters = Vec<Option<Arc<dyn PhysicalExpr>>>;
Contributor

Just one more question -- how would evaluation be done for PartitionedFilters?
My understanding is that each partition would need to first access its corresponding PhysicalExpr and then call evaluate(), right? However, the evaluate() method of the PhysicalExpr trait has no partition number in its args, so evaluate() can't directly integrate PartitionedFilters.
The current evaluate() function remains the same and evaluates inner.expr, which, when we preserve file partitioning, holds nothing (just a lit(true) placeholder).

@gene-bordegaray gene-bordegaray Feb 13, 2026

Snapshotting happens before evaluation

The full path in chronological order is:

  1. ParquetOpener::open() is called
  2. snapshot_physical_expr_for_partition(predicate, partition_index) is called -> important to note that we pass the index
  3. snapshot_physical_expr_for_partition replaces the DynamicFilterPhysicalExpr with the filter for the partition on that index (this is a physical expr)
  4. evaluate() is called which uses the snapshot expr (not the DynamicFilterPhysicalExpr) and we don't need to know the partition parameter because it was already dealt with earlier

For the lit(true) concern: if has_partitioned_filters() returns false during snapshotting then we fall back to lit(true), which, yes, means we won't filter anything. But this is acceptable behavior; it's better to do that than to error.

This shouldn't happen, though, because we wait until the build side is complete before we snapshot, so has_partitioned_filters() should always resolve to true.

Maybe to be safe it would be good to add a debug statement if has_partitioned_filters() returns false.

Lmk if this makes sense 🙂
