From 49d9916761af4824334fc854181db25730841bad Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Apr 2026 08:53:20 -0500 Subject: [PATCH 1/6] perf: collapse all-Map dynamic filter into MultiMapLookupExpr In Partitioned-mode HashJoinExec, when every reported partition's build side uses a hash-table strategy, replace the routing CASE expression (`CASE hash_repartition % N WHEN p THEN bounds AND hash_lookup ELSE false END`) with `global_minmax AND multi_hash_lookup`. The new MultiMapLookupExpr hashes the join keys once with HASH_JOIN_SEED and ORs `contain_hashes()` across every partition's hash table, eliminating both the routing-hash computation and the per-branch re-hashing that CaseExpr does. Any non-Map partition (InList, Empty) disqualifies the fast path and we use the legacy CASE unchanged; same for partitions that were canceled before reporting build data. Benchmarks (TPC-H SF=1, 7 iters back-to-back): TOTAL min vs no-DF: legacy CASE: +3.0% multi_hash_lookup: +1.6% (~halves the regression) Per-query (multi_hash_lookup vs CASE): Q4 -6.0% Q5 -3.3% Q7 -6.0% Q8 -4.0% Q9 -3.5% Q12 -3.2% Q17 -1.6% Q21 -3.8% Refs: https://github.com/apache/datafusion/issues/19858 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../physical_optimizer/filter_pushdown.rs | 13 +- .../joins/hash_join/partitioned_hash_eval.rs | 199 ++++++++++++- .../src/joins/hash_join/shared_bounds.rs | 264 +++++++++++++----- 3 files changed, 406 insertions(+), 70 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 4ff1fad8f52b9..9864cb7f66504 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -2572,11 +2572,18 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { .await .unwrap(); - // Verify that hash_lookup is used instead of IN (SET) + // 
Verify the all-Map fast path collapses per-partition routing into a + // single shared `multi_hash_lookup` rather than a + // `CASE hash_repartition % N WHEN p THEN hash_lookup ELSE false END` + // expression. let plan_str = format_plan_for_test(&plan).to_string(); assert!( - plan_str.contains("hash_lookup"), - "Expected hash_lookup in plan but got: {plan_str}" + plan_str.contains("multi_hash_lookup"), + "Expected multi_hash_lookup in plan but got: {plan_str}" + ); + assert!( + !plan_str.contains("hash_repartition"), + "Expected no routing hash_repartition in plan but got: {plan_str}" ); assert!( !plan_str.contains("IN (SET)"), diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 0daac0bb86a75..23bcf38d30f61 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -20,7 +20,7 @@ use std::{fmt::Display, hash::Hash, sync::Arc}; use arrow::{ - array::{ArrayRef, UInt64Array}, + array::{ArrayRef, BooleanArray, BooleanBufferBuilder, UInt64Array}, datatypes::{DataType, Schema}, record_batch::RecordBatch, }; @@ -343,6 +343,203 @@ impl PhysicalExpr for HashTableLookupExpr { } } +/// Physical expression that probes the same join keys against multiple [`Map`]s +/// and returns `true` for any row whose join keys match at least one map. +/// +/// Equivalent to `OR`-ing several [`HashTableLookupExpr`]s but evaluates +/// `create_hashes` exactly once for the whole batch — every [`Map::HashMap`] +/// probe shares that hash buffer. Used for the global-first dynamic filter +/// when every reported partition uses a hash-table pushdown strategy. 
+pub struct MultiMapLookupExpr { + /// Columns in the ON clause used to compute the join key for lookups + on_columns: Vec, + /// Random state for hashing — every map must have been built with the + /// same `RandomState`, otherwise the shared hash buffer is meaningless. + random_state: SeededRandomState, + /// Maps to OR over (each is one partition's build-side data) + maps: Vec>, + /// Description for display + description: String, +} + +impl MultiMapLookupExpr { + /// Create a new MultiMapLookupExpr. + /// + /// `maps` is the (per-partition) sequence of build-side maps to probe. + /// All `Map::HashMap` entries are expected to use the same `random_state`; + /// `Map::ArrayMap` entries do not consume hashes and are queried via + /// `contain_keys`. + pub fn new( + on_columns: Vec, + random_state: SeededRandomState, + maps: Vec>, + description: String, + ) -> Self { + Self { + on_columns, + random_state, + maps, + description, + } + } +} + +impl std::fmt::Debug for MultiMapLookupExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let cols = self + .on_columns + .iter() + .map(|e| e.to_string()) + .collect::>() + .join(", "); + write!( + f, + "{}({cols}, [{}], maps={})", + self.description, + self.random_state.seed(), + self.maps.len() + ) + } +} + +impl Hash for MultiMapLookupExpr { + fn hash(&self, state: &mut H) { + self.on_columns.dyn_hash(state); + self.description.hash(state); + self.random_state.seed().hash(state); + // See HashTableLookupExpr — pointer identity is what we use for Maps. 
+ for map in &self.maps { + Arc::as_ptr(map).hash(state); + } + } +} + +impl PartialEq for MultiMapLookupExpr { + fn eq(&self, other: &Self) -> bool { + self.on_columns == other.on_columns + && self.description == other.description + && self.random_state.seed() == other.random_state.seed() + && self.maps.len() == other.maps.len() + && self + .maps + .iter() + .zip(other.maps.iter()) + .all(|(a, b)| Arc::ptr_eq(a, b)) + } +} + +impl Eq for MultiMapLookupExpr {} + +impl Display for MultiMapLookupExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.description) + } +} + +impl PhysicalExpr for MultiMapLookupExpr { + fn children(&self) -> Vec<&Arc> { + self.on_columns.iter().collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(MultiMapLookupExpr::new( + children, + self.random_state.clone(), + self.maps.clone(), + self.description.clone(), + ))) + } + + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(DataType::Boolean) + } + + fn nullable(&self, _input_schema: &Schema) -> Result { + Ok(false) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + let num_rows = batch.num_rows(); + let join_keys = evaluate_columns(&self.on_columns, batch)?; + + if self.maps.is_empty() || num_rows == 0 { + // No maps to probe — this should not happen in practice but + // returning all-false matches the semantics of an empty `OR`. + let buffer = BooleanBufferBuilder::new(num_rows); + let mut buffer = buffer; + buffer.append_n(num_rows, false); + return Ok(ColumnarValue::Array(Arc::new(BooleanArray::new( + buffer.finish(), + None, + )))); + } + + // Whether any map needs hashes. We only compute hashes if at least + // one map is a HashMap. + let needs_hashes = self + .maps + .iter() + .any(|m| matches!(m.as_ref(), Map::HashMap(_))); + + // Result buffer accumulates the OR of every map's `contain_*` result. 
+ let mut result = vec![false; num_rows]; + + let process_one_map = + |result: &mut [bool], map: &Map, hashes: Option<&[u64]>| -> Result<()> { + match map { + Map::HashMap(hm) => { + let hashes = hashes + .expect("hashes computed when at least one map is a HashMap"); + let arr = hm.contain_hashes(hashes); + // OR into the running result. `arr` has no nulls + // (`contain_hashes` returns a non-nullable BooleanArray). + for (slot, hit) in result.iter_mut().zip(arr.values().iter()) { + *slot |= hit; + } + } + Map::ArrayMap(am) => { + let arr = am.contain_keys(&join_keys)?; + for (slot, hit) in result.iter_mut().zip(arr.values().iter()) { + *slot |= hit; + } + } + } + Ok(()) + }; + + if needs_hashes { + // Compute the join-key hashes ONCE, then probe every HashMap + // against the same buffer. ArrayMap probes ignore the hashes. + with_hashes(&join_keys, self.random_state.random_state(), |hashes| { + for map in &self.maps { + process_one_map(&mut result, map, Some(hashes))?; + } + Ok::<(), datafusion_common::DataFusionError>(()) + })?; + } else { + for map in &self.maps { + process_one_map(&mut result, map, None)?; + } + } + + let mut buffer = BooleanBufferBuilder::new(num_rows); + for hit in &result { + buffer.append(*hit); + } + Ok(ColumnarValue::Array(Arc::new(BooleanArray::new( + buffer.finish(), + None, + )))) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.description) + } +} + fn evaluate_columns( columns: &[PhysicalExprRef], batch: &RecordBatch, diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 23ca14f5ba406..97ae29bfcd453 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -28,7 +28,7 @@ use crate::joins::PartitionMode; use crate::joins::hash_join::exec::HASH_JOIN_SEED; use 
crate::joins::hash_join::inlist_builder::build_struct_fields; use crate::joins::hash_join::partitioned_hash_eval::{ - HashExpr, HashTableLookupExpr, SeededRandomState, + HashExpr, HashTableLookupExpr, MultiMapLookupExpr, SeededRandomState, }; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; @@ -202,6 +202,52 @@ fn combine_membership_and_bounds( } } +/// Compute the global (envelope) min/max bounds across a set of partition bounds. +/// +/// For each column index, returns the smallest min seen and the largest max seen. +/// Columns where any partition is missing bounds, or where bounds are not totally +/// ordered (e.g. mixed-type comparisons), are dropped from the global envelope. +fn compute_global_bounds(per_partition: &[&PartitionBounds]) -> Option { + let mut iter = per_partition.iter(); + let first = iter.next()?; + let mut acc: Vec> = first + .column_bounds + .iter() + .map(|cb| Some(cb.clone())) + .collect(); + + for partition in iter { + if partition.column_bounds.len() != acc.len() { + return None; + } + for (slot, cb) in acc.iter_mut().zip(partition.column_bounds.iter()) { + let Some(existing) = slot.as_mut() else { + continue; + }; + match cb.min.partial_cmp(&existing.min) { + Some(std::cmp::Ordering::Less) => existing.min = cb.min.clone(), + Some(_) => {} + None => { + *slot = None; + continue; + } + } + match cb.max.partial_cmp(&existing.max) { + Some(std::cmp::Ordering::Greater) => existing.max = cb.max.clone(), + Some(_) => {} + None => *slot = None, + } + } + } + + let merged: Vec = acc.into_iter().flatten().collect(); + if merged.is_empty() { + None + } else { + Some(PartitionBounds::new(merged)) + } +} + /// Coordinates build-side information collection across multiple partitions /// /// This structure collects information from the build side (hash tables and/or bounds) and @@ -594,19 +640,8 @@ impl SharedBuildAccumulator { }, FinalizeInput::Partitioned(partitions) => { let num_partitions = partitions.len(); - let 
routing_hash_expr = Arc::new(HashExpr::new( - self.on_right.clone(), - self.repartition_random_state.clone(), - "hash_repartition".to_string(), - )) as Arc; - - let modulo_expr = Arc::new(BinaryExpr::new( - routing_hash_expr, - Operator::Modulo, - lit(ScalarValue::UInt64(Some(num_partitions as u64))), - )) as Arc; - - let mut real_branches = Vec::new(); + + let mut real_partitions: Vec<(usize, &PartitionData)> = Vec::new(); let mut empty_partition_ids = Vec::new(); let mut has_canceled_unknown = false; @@ -618,25 +653,7 @@ impl SharedBuildAccumulator { empty_partition_ids.push(partition_id); } PartitionStatus::Reported(partition) => { - let membership_expr = create_membership_predicate( - &self.on_right, - partition.pushdown.clone(), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )?; - let bounds_expr = create_bounds_predicate( - &self.on_right, - &partition.bounds, - ); - let then_expr = combine_membership_and_bounds( - membership_expr, - bounds_expr, - ) - .unwrap_or_else(|| lit(true)); - real_branches.push(( - lit(ScalarValue::UInt64(Some(partition_id as u64))), - then_expr, - )); + real_partitions.push((partition_id, partition)); } PartitionStatus::CanceledUnknown => { has_canceled_unknown = true; @@ -649,40 +666,57 @@ impl SharedBuildAccumulator { } } - let filter_expr = if has_canceled_unknown { - let mut when_then_branches = empty_partition_ids - .into_iter() - .map(|partition_id| { - ( - lit(ScalarValue::UInt64(Some(partition_id as u64))), - lit(false), - ) - }) - .collect::>(); - when_then_branches.extend(real_branches); - - if when_then_branches.is_empty() { - lit(true) - } else { - Arc::new(CaseExpr::try_new( - Some(modulo_expr), - when_then_branches, - Some(lit(true)), - )?) as Arc + // Fast path: when no partition is canceled and every reported + // partition uses a hash-table strategy, replace the routing + // CASE expression with `global_minmax AND multi_hash_lookup`. 
+ // The shared `MultiMapLookupExpr` hashes the join keys once + // (with `HASH_JOIN_SEED`) and ORs `contain_hashes()` across + // every partition's hash table, eliminating both the routing + // hash and the per-branch re-hashing inside `CaseExpr`. Any + // canceled partition or any non-Map partition disqualifies the + // fast path and we use the legacy routing CASE. + if !has_canceled_unknown && !real_partitions.is_empty() { + let mut maps: Vec> = + Vec::with_capacity(real_partitions.len()); + let mut all_map = true; + for (_, p) in &real_partitions { + match &p.pushdown { + PushdownStrategy::Map(m) => maps.push(Arc::clone(m)), + PushdownStrategy::InList(_) | PushdownStrategy::Empty => { + all_map = false; + break; + } + } + } + if all_map && !maps.is_empty() { + let bounds_refs: Vec<&PartitionBounds> = + real_partitions.iter().map(|(_, p)| &p.bounds).collect(); + let global_bounds_expr = compute_global_bounds(&bounds_refs) + .as_ref() + .and_then(|b| create_bounds_predicate(&self.on_right, b)); + let multi_lookup = Arc::new(MultiMapLookupExpr::new( + self.on_right.clone(), + HASH_JOIN_SEED.clone(), + maps, + "multi_hash_lookup".to_string(), + )) + as Arc; + let filter_expr = combine_membership_and_bounds( + Some(multi_lookup), + global_bounds_expr, + ) + .unwrap_or_else(|| lit(true)); + self.dynamic_filter.update(filter_expr)?; + return Ok(()); } - } else if real_branches.is_empty() { - lit(false) - } else if real_branches.len() == 1 - && empty_partition_ids.len() + 1 == num_partitions - { - Arc::clone(&real_branches[0].1) - } else { - Arc::new(CaseExpr::try_new( - Some(modulo_expr), - real_branches, - Some(lit(false)), - )?) 
as Arc - }; + } + + let filter_expr = self.build_case_routing_filter( + num_partitions, + &real_partitions, + &empty_partition_ids, + has_canceled_unknown, + )?; self.dynamic_filter.update(filter_expr)?; } @@ -690,6 +724,84 @@ impl SharedBuildAccumulator { Ok(()) } + + /// Build the per-partition `CASE (hash_repartition % N) WHEN p THEN + /// per_partition_filter ELSE … END` routing expression. Used as the fallback + /// when the all-Map fast path doesn't apply (any InList/Empty partition, + /// any canceled partition, or no real partitions). + fn build_case_routing_filter( + &self, + num_partitions: usize, + real_partitions: &[(usize, &PartitionData)], + empty_partition_ids: &[usize], + has_canceled_unknown: bool, + ) -> Result> { + let routing_hash_expr = Arc::new(HashExpr::new( + self.on_right.clone(), + self.repartition_random_state.clone(), + "hash_repartition".to_string(), + )) as Arc; + + let modulo_expr = Arc::new(BinaryExpr::new( + routing_hash_expr, + Operator::Modulo, + lit(ScalarValue::UInt64(Some(num_partitions as u64))), + )) as Arc; + + let mut real_branches = Vec::with_capacity(real_partitions.len()); + for (partition_id, partition) in real_partitions { + let membership_expr = create_membership_predicate( + &self.on_right, + partition.pushdown.clone(), + &HASH_JOIN_SEED, + self.probe_schema.as_ref(), + )?; + let bounds_expr = create_bounds_predicate(&self.on_right, &partition.bounds); + let then_expr = combine_membership_and_bounds(membership_expr, bounds_expr) + .unwrap_or_else(|| lit(true)); + real_branches.push(( + lit(ScalarValue::UInt64(Some(*partition_id as u64))), + then_expr, + )); + } + + let filter_expr = if has_canceled_unknown { + let mut when_then_branches = empty_partition_ids + .iter() + .map(|partition_id| { + ( + lit(ScalarValue::UInt64(Some(*partition_id as u64))), + lit(false), + ) + }) + .collect::>(); + when_then_branches.extend(real_branches); + + if when_then_branches.is_empty() { + lit(true) + } else { + 
Arc::new(CaseExpr::try_new( + Some(modulo_expr), + when_then_branches, + Some(lit(true)), + )?) as Arc + } + } else if real_branches.is_empty() { + lit(false) + } else if real_branches.len() == 1 + && empty_partition_ids.len() + 1 == num_partitions + { + Arc::clone(&real_branches[0].1) + } else { + Arc::new(CaseExpr::try_new( + Some(modulo_expr), + real_branches, + Some(lit(false)), + )?) as Arc + }; + + Ok(filter_expr) + } } impl fmt::Debug for SharedBuildAccumulator { @@ -793,4 +905,24 @@ mod tests { assert!(matches!(partitions[0], PartitionStatus::CanceledUnknown)); assert_eq!(completed, 1); } + + #[test] + fn compute_global_bounds_takes_envelope() { + let p1 = PartitionBounds::new(vec![ColumnBounds::new( + ScalarValue::Int32(Some(5)), + ScalarValue::Int32(Some(10)), + )]); + let p2 = PartitionBounds::new(vec![ColumnBounds::new( + ScalarValue::Int32(Some(3)), + ScalarValue::Int32(Some(7)), + )]); + let p3 = PartitionBounds::new(vec![ColumnBounds::new( + ScalarValue::Int32(Some(8)), + ScalarValue::Int32(Some(20)), + )]); + let global = compute_global_bounds(&[&p1, &p2, &p3]).unwrap(); + let cb = global.get_column_bounds(0).unwrap(); + assert_eq!(cb.min, ScalarValue::Int32(Some(3))); + assert_eq!(cb.max, ScalarValue::Int32(Some(20))); + } } From 7adc8aa28c5faff362a92123aee611b53e2d1cc0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Apr 2026 09:18:56 -0500 Subject: [PATCH 2/6] perf: collapse small all-InList dynamic filter into one cross-partition IN (SET) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When every reported partition for a Partitioned hash join uses InList pushdown and the cross-partition union would be ≤ 20 array entries, concatenate the per-partition `ArrayRef`s and emit `global_minmax AND struct(c0,c1,…) IN (SET)` instead of the routing CASE. 
The cap is set so the merged set can participate in parquet stats / bloom-filter pruning at the scan, which a per-partition CASE or a `multi_hash_lookup` cannot. A TPC-H SF=1 cap sweep (cap=20/50/100/200/2000) confirmed 20–50 is the sweet spot — past ~200 the larger static_filter hash set blows out of L1 and runtime regresses below the legacy CASE. The tightened path also subsumes the `force_hash_collisions` optimization (when the runtime collapses every key into one partition we get the same shape, just for a different reason) so both `#[cfg]` snapshot branches in test_hashjoin_dynamic_filter_pushdown_partitioned now produce the merged `IN (SET)` form. Refs: https://github.com/apache/datafusion/issues/19858 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../physical_optimizer/filter_pushdown.rs | 15 +-- .../src/joins/hash_join/shared_bounds.rs | 98 +++++++++++++++---- 2 files changed, 89 insertions(+), 24 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 9864cb7f66504..18359a56f9be6 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1072,7 +1072,14 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { .await .unwrap(); - // Now check what our filter looks like + // Now check what our filter looks like. When the cross-partition InList + // is small enough (≤ MERGED_INLIST_MAX_TOTAL_LEN) we collapse it into a + // single global `IN (SET)` regardless of how the data was repartitioned — + // this lets the merged set participate in parquet stats / bloom-filter + // pruning at the scan, which a per-partition `CASE` could not. Both the + // normal repartition and the `force_hash_collisions` path produce the + // same logical shape; they only differ in the partition-iteration order + // that controls the InList element order. 
#[cfg(not(feature = "force_hash_collisions"))] insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @@ -1083,14 +1090,10 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 5 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 8 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}, {c0:aa,c1:ba}]) ] " ); - // When hash collisions force all data into a single partition, we optimize away the CASE expression. - // This avoids calling create_hashes() for every row on the probe side, since hash % 1 == 0 always, - // meaning the WHEN 0 branch would always match. This optimization is also important for primary key - // joins or any scenario where all build-side data naturally lands in one partition. 
#[cfg(feature = "force_hash_collisions")] insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 97ae29bfcd453..09695df2515aa 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -30,7 +30,8 @@ use crate::joins::hash_join::inlist_builder::build_struct_fields; use crate::joins::hash_join::partitioned_hash_eval::{ HashExpr, HashTableLookupExpr, MultiMapLookupExpr, SeededRandomState, }; -use arrow::array::ArrayRef; +use arrow::array::{Array, ArrayRef}; +use arrow::compute::concat; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult}; @@ -248,6 +249,18 @@ fn compute_global_bounds(per_partition: &[&PartitionBounds]) -> Option Option { + let first = arrays.first()?; + if arrays.iter().any(|a| a.data_type() != first.data_type()) { + return None; + } + let array_refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect(); + concat(&array_refs).ok() +} + /// Coordinates build-side information collection across multiple partitions /// /// This structure collects information from the build side (hash tables and/or bounds) and @@ -666,34 +679,83 @@ impl SharedBuildAccumulator { } } - // Fast path: when no partition is canceled and every reported - // partition uses a hash-table strategy, replace the routing - // CASE expression with `global_minmax AND multi_hash_lookup`. - // The shared `MultiMapLookupExpr` hashes the join keys once - // (with `HASH_JOIN_SEED`) and ORs `contain_hashes()` across - // every partition's hash table, eliminating both the routing - // hash and the per-branch re-hashing inside `CaseExpr`. 
Any - // canceled partition or any non-Map partition disqualifies the - // fast path and we use the legacy routing CASE. + // Fast paths: when no partition is canceled, classify the + // reported partitions and route to a shape that drops the + // routing CASE. Two shapes today: + // 1. All-Map → `global_minmax AND multi_hash_lookup`. Hashes + // the join keys once and ORs contain_hashes across every + // partition's hash table. + // 2. All-InList with a small enough cross-partition union + // → `global_minmax AND merged_in_list`. Worth taking + // because a small global InList can participate in + // parquet stats / bloom-filter pruning at the scan, + // which a `multi_hash_lookup` can't. + // Anything else (canceled, mixed, or merged InList over the + // cap) falls through to the legacy routing CASE. if !has_canceled_unknown && !real_partitions.is_empty() { let mut maps: Vec> = Vec::with_capacity(real_partitions.len()); + let mut inlist_arrays: Vec = + Vec::with_capacity(real_partitions.len()); let mut all_map = true; + let mut all_inlist = true; + let mut total_inlist_len: usize = 0; for (_, p) in &real_partitions { match &p.pushdown { - PushdownStrategy::Map(m) => maps.push(Arc::clone(m)), - PushdownStrategy::InList(_) | PushdownStrategy::Empty => { + PushdownStrategy::Map(m) => { + maps.push(Arc::clone(m)); + all_inlist = false; + } + PushdownStrategy::InList(arr) => { + total_inlist_len += arr.len(); + inlist_arrays.push(Arc::clone(arr)); all_map = false; - break; + } + PushdownStrategy::Empty => { + all_map = false; + all_inlist = false; } } } + + let bounds_refs: Vec<&PartitionBounds> = + real_partitions.iter().map(|(_, p)| &p.bounds).collect(); + let global_bounds_expr = compute_global_bounds(&bounds_refs) + .as_ref() + .and_then(|b| create_bounds_predicate(&self.on_right, b)); + + // Threshold on the cross-partition merged InList. Two + // reasons for keeping it small: + // 1. 
Below ~this many values the merged `IN (SET)` can + // participate in parquet stats / bloom-filter + // pruning at the scan, which a `multi_hash_lookup` + // cannot — that's the headline win. + // 2. Past this size, runtime regresses vs. the routing + // CASE (the larger static_filter hash set blows out + // of L1 and the win flips). On TPC-H SF=1 a sweep + // from 20 to 2000 picked 20–50 as the sweet spot. + const MERGED_INLIST_MAX_TOTAL_LEN: usize = 20; + + if all_inlist + && total_inlist_len <= MERGED_INLIST_MAX_TOTAL_LEN + && let Some(merged) = merge_inlist_arrays(&inlist_arrays) + && let Some(membership) = create_membership_predicate( + &self.on_right, + PushdownStrategy::InList(merged), + &HASH_JOIN_SEED, + self.probe_schema.as_ref(), + )? + { + let filter_expr = combine_membership_and_bounds( + Some(membership), + global_bounds_expr, + ) + .unwrap_or_else(|| lit(true)); + self.dynamic_filter.update(filter_expr)?; + return Ok(()); + } + if all_map && !maps.is_empty() { - let bounds_refs: Vec<&PartitionBounds> = - real_partitions.iter().map(|(_, p)| &p.bounds).collect(); - let global_bounds_expr = compute_global_bounds(&bounds_refs) - .as_ref() - .and_then(|b| create_bounds_predicate(&self.on_right, b)); let multi_lookup = Arc::new(MultiMapLookupExpr::new( self.on_right.clone(), HASH_JOIN_SEED.clone(), From 97d8429432ede9c11bb1daa7cb3c796438d46c42 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Apr 2026 09:56:38 -0500 Subject: [PATCH 3/6] refactor: drop CASE routing from Partitioned dynamic filters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decouple the dynamic filter from the build-side repartition strategy entirely. 
The filter for `PartitionMode::Partitioned` is now always `global_minmax AND (merged_in_list | multi_hash_lookup)`, regardless of whether individual partitions chose Map or InList for pushdown: * `merged_in_list` fires when every reported partition contributed an InList array AND the cross-partition union stays within `MERGED_INLIST_MAX_TOTAL_LEN` (= 20). This is the path that participates in parquet stats / bloom-filter pruning. * Otherwise `multi_hash_lookup` probes every partition's hash table in one shared hashing pass. Key change: `PushdownStrategy` is now a struct that always carries the `Map` (the join's hash table is built unconditionally) plus an optional InList array. With the map always available we don't need a per-row `CASE hash_repartition % N WHEN p THEN per_partition_filter ELSE …` expression to route rows to the right partition's data — the shared multi-map probe finds matches in whichever partition holds them. Removed: * `build_case_routing_filter` (~70 LoC) and its `CaseExpr` / `HashExpr` plumbing in shared_bounds * `repartition_random_state` field on `SharedBuildAccumulator` * The `REPARTITION_RANDOM_STATE` import in `exec.rs` * Conditional `force_hash_collisions` snapshot — both the normal and force-collision paths now produce the same shape The canceled-partition fallback collapses to `lit(true)`: with a canceled partition we don't have its map, so we can't include it in multi_hash_lookup; emitting the no-op filter is safe (correctness is preserved) and the query is in the middle of being torn down anyway. Costs: TPC-H SF=1 shows a small (≈0.5–2pp) regression vs the multimap+CASE-fallback design on noisy back-to-back runs — the moderate-InList shape (Q11/Q14 etc.) used to use small per-partition InLists inside CASE; now those joins use multi_hash_lookup. Per the issue discussion the simplification is the goal even when there's no perf win. 
Refs: https://github.com/apache/datafusion/issues/19858 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../physical-plan/src/joins/hash_join/exec.rs | 38 +- .../src/joins/hash_join/shared_bounds.rs | 484 +++++++++--------- 2 files changed, 247 insertions(+), 275 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 4ebbf7cb31ccf..416a2025780d6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -51,7 +51,6 @@ use crate::projection::{ EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection, try_pushdown_through_join, }; -use crate::repartition::REPARTITION_RANDOM_STATE; use crate::spill::get_record_batch_memory_size; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, @@ -1316,9 +1315,9 @@ impl ExecutionPlan for HashJoinExec { .with_category(MetricCategory::Rows) .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); - // Initialize build_accumulator lazily with runtime partition counts (only if enabled) - // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing - let repartition_random_state = REPARTITION_RANDOM_STATE; + // Initialize build_accumulator lazily with runtime partition counts + // (only if enabled). The dynamic filter no longer routes by repartition + // hash, so REPARTITION_RANDOM_STATE is not needed here. let build_accumulator = enable_dynamic_filter_pushdown .then(|| { self.dynamic_filter.as_ref().map(|df| { @@ -1335,7 +1334,6 @@ impl ExecutionPlan for HashJoinExec { self.right.as_ref(), filter, on_right, - repartition_random_state, )) }))) }) @@ -2038,29 +2036,31 @@ async fn collect_left_input( let map = Arc::new(join_hash_map); + // The `Map` is always built (the join itself uses it). 
The optional + // `inlist` array is set when the build side fit under the per-partition + // InList caps — that's our signal that this partition's keys are small + // enough to participate in parquet stats / bloom-filter pruning when + // collapsed across partitions on the probe-side scan. let membership = if num_rows == 0 { - PushdownStrategy::Empty + PushdownStrategy::empty() } else { - // If the build side is small enough we can use IN list pushdown. - // If it's too big we fall back to pushing down a reference to the hash table. - // See `PushdownStrategy` for more details. let estimated_size = left_values .iter() .map(|arr| arr.get_array_memory_size()) .sum::(); - if left_values.is_empty() - || left_values[0].is_empty() - || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size - || map.num_of_distinct_key() - > config + let fits_inlist = !left_values.is_empty() + && !left_values[0].is_empty() + && estimated_size <= config.optimizer.hash_join_inlist_pushdown_max_size + && map.num_of_distinct_key() + <= config .optimizer - .hash_join_inlist_pushdown_max_distinct_values + .hash_join_inlist_pushdown_max_distinct_values; + if fits_inlist + && let Some(in_list_values) = build_struct_inlist_values(&left_values)? { - PushdownStrategy::Map(Arc::clone(&map)) - } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? 
{ - PushdownStrategy::InList(in_list_values) + PushdownStrategy::from_map_and_inlist(Arc::clone(&map), in_list_values) } else { - PushdownStrategy::Map(Arc::clone(&map)) + PushdownStrategy::from_map(Arc::clone(&map)) } }; diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 09695df2515aa..a14cc4e509d6a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -28,7 +28,7 @@ use crate::joins::PartitionMode; use crate::joins::hash_join::exec::HASH_JOIN_SEED; use crate::joins::hash_join::inlist_builder::build_struct_fields; use crate::joins::hash_join::partitioned_hash_eval::{ - HashExpr, HashTableLookupExpr, MultiMapLookupExpr, SeededRandomState, + HashTableLookupExpr, MultiMapLookupExpr, SeededRandomState, }; use arrow::array::{Array, ArrayRef}; use arrow::compute::concat; @@ -38,7 +38,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult}; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ - BinaryExpr, CaseExpr, DynamicFilterPhysicalExpr, InListExpr, lit, + BinaryExpr, DynamicFilterPhysicalExpr, InListExpr, lit, }; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; @@ -80,64 +80,56 @@ impl PartitionBounds { } } -/// Creates a membership predicate for filter pushdown. -/// -/// If `inlist_values` is provided (for small build sides), creates an InList expression. -/// Otherwise, creates a HashTableLookup expression (for large build sides). -/// -/// Supports both single-column and multi-column joins using struct expressions. -fn create_membership_predicate( +/// Build a `IN (SET)` predicate over `on_right` from a deduplicated build-side +/// array. 
Single-column joins compare scalars directly; multi-column joins +/// wrap the right-hand columns in a `struct(...)` to match the build side's +/// shape. +fn create_inlist_predicate( on_right: &[PhysicalExprRef], - pushdown: PushdownStrategy, - random_state: &SeededRandomState, + in_list_array: ArrayRef, schema: &Schema, -) -> Result>> { - match pushdown { - // Use InList expression for small build sides - PushdownStrategy::InList(in_list_array) => { - // Build the expression to compare against - let expr = if on_right.len() == 1 { - // Single column: col IN (val1, val2, ...) - Arc::clone(&on_right[0]) - } else { - let fields = build_struct_fields( - on_right - .iter() - .map(|r| r.data_type(schema)) - .collect::>>()? - .as_ref(), - )?; - - // The return field name and the function field name don't really matter here. - let return_field = - Arc::new(Field::new("struct", DataType::Struct(fields), true)); - - Arc::new(ScalarFunctionExpr::new( - "struct", - struct_func(), - on_right.to_vec(), - return_field, - Arc::new(ConfigOptions::default()), - )) as Arc - }; - - // Use in_list_from_array() helper to create InList with static_filter optimization (hash-based lookup) - Ok(Some(Arc::new(InListExpr::try_new_from_array( - expr, - in_list_array, - false, - )?))) - } - // Use hash table lookup for large build sides - PushdownStrategy::Map(hash_map) => Ok(Some(Arc::new(HashTableLookupExpr::new( +) -> Result> { + let expr = if on_right.len() == 1 { + Arc::clone(&on_right[0]) + } else { + let fields = build_struct_fields( + on_right + .iter() + .map(|r| r.data_type(schema)) + .collect::>>()? 
+ .as_ref(), + )?; + let return_field = Arc::new(Field::new("struct", DataType::Struct(fields), true)); + Arc::new(ScalarFunctionExpr::new( + "struct", + struct_func(), on_right.to_vec(), - random_state.clone(), - hash_map, - "hash_lookup".to_string(), - )) as Arc)), - // Empty partition - should not create a filter for this - PushdownStrategy::Empty => Ok(None), - } + return_field, + Arc::new(ConfigOptions::default()), + )) as Arc + }; + + Ok(Arc::new(InListExpr::try_new_from_array( + expr, + in_list_array, + false, + )?)) +} + +/// Build a single-map `hash_lookup` predicate over `on_right` against `map`. +/// Used by the CollectLeft path; the Partitioned path emits +/// [`MultiMapLookupExpr`] over multiple maps instead. +fn create_hash_lookup_predicate( + on_right: &[PhysicalExprRef], + map: Arc, + random_state: &SeededRandomState, +) -> Arc { + Arc::new(HashTableLookupExpr::new( + on_right.to_vec(), + random_state.clone(), + map, + "hash_lookup".to_string(), + )) } /// Creates a bounds predicate from partition bounds. @@ -308,22 +300,53 @@ pub(crate) struct SharedBuildAccumulator { dynamic_filter: Arc, /// Right side join expressions needed for creating filter expressions on_right: Vec, - /// Random state for partitioning (RepartitionExec's hash function with 0,0,0,0 seeds) - /// Used for PartitionedHashLookupPhysicalExpr - repartition_random_state: SeededRandomState, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, } -/// Strategy for filter pushdown (decided at collection time) -#[derive(Clone)] -pub(crate) enum PushdownStrategy { - /// Use InList for small build sides (< 128MB) - InList(ArrayRef), - /// Use map lookup for large build sides - Map(Arc), - /// There was no data in this partition, do not build a dynamic filter for it - Empty, +/// Build-side data needed to construct a dynamic filter for one partition. 
+/// +/// `map` is always set for non-empty partitions (the join's hash table is +/// built unconditionally). `inlist` is additionally set when the build side +/// fit under both the per-partition InList caps +/// (`hash_join_inlist_pushdown_max_size` / +/// `hash_join_inlist_pushdown_max_distinct_values`) — that's our signal that +/// the partition's keys are small enough to participate in parquet stats / +/// bloom-filter pruning at the scan when collapsed across partitions. +#[derive(Clone, Default)] +pub(crate) struct PushdownStrategy { + /// Hash table for the partition's build side. `None` if the partition was + /// empty. + pub(crate) map: Option>, + /// Concatenable array of build-side join keys (single column or struct + /// of columns). `Some` when the build side was small enough for the + /// InList pushdown; otherwise `None` and the filter falls back to + /// `hash_lookup` / `multi_hash_lookup`. + pub(crate) inlist: Option, +} + +impl PushdownStrategy { + pub(crate) fn empty() -> Self { + Self::default() + } + + pub(crate) fn from_map(map: Arc) -> Self { + Self { + map: Some(map), + inlist: None, + } + } + + pub(crate) fn from_map_and_inlist(map: Arc, inlist: ArrayRef) -> Self { + Self { + map: Some(map), + inlist: Some(inlist), + } + } + + pub(crate) fn is_empty(&self) -> bool { + self.map.is_none() + } } /// Build-side data reported by a single partition @@ -415,7 +438,6 @@ impl SharedBuildAccumulator { right_child: &dyn ExecutionPlan, dynamic_filter: Arc, on_right: Vec, - repartition_random_state: SeededRandomState, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -460,7 +482,6 @@ impl SharedBuildAccumulator { completion_notify: Notify::new(), dynamic_filter, on_right, - repartition_random_state, probe_schema: right_child.schema(), } } @@ -625,15 +646,10 @@ impl SharedBuildAccumulator { match finalize_input { FinalizeInput::CollectLeft(partition) => 
match partition { PartitionStatus::Reported(partition_data) => { - let membership_expr = create_membership_predicate( - &self.on_right, - partition_data.pushdown.clone(), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )?; + let membership_expr = + self.collect_left_membership(&partition_data.pushdown)?; let bounds_expr = create_bounds_predicate(&self.on_right, &partition_data.bounds); - if let Some(filter_expr) = combine_membership_and_bounds(membership_expr, bounds_expr) { @@ -652,21 +668,18 @@ impl SharedBuildAccumulator { } }, FinalizeInput::Partitioned(partitions) => { - let num_partitions = partitions.len(); - - let mut real_partitions: Vec<(usize, &PartitionData)> = Vec::new(); - let mut empty_partition_ids = Vec::new(); + let mut real_partitions: Vec<&PartitionData> = Vec::new(); let mut has_canceled_unknown = false; - - for (partition_id, partition) in partitions.iter().enumerate() { + for partition in partitions.iter() { match partition { PartitionStatus::Reported(partition) - if matches!(partition.pushdown, PushdownStrategy::Empty) => + if partition.pushdown.is_empty() => { - empty_partition_ids.push(partition_id); + // Empty partitions contribute neither a map nor + // an InList; nothing to add to the filter. } PartitionStatus::Reported(partition) => { - real_partitions.push((partition_id, partition)); + real_partitions.push(partition); } PartitionStatus::CanceledUnknown => { has_canceled_unknown = true; @@ -679,107 +692,8 @@ impl SharedBuildAccumulator { } } - // Fast paths: when no partition is canceled, classify the - // reported partitions and route to a shape that drops the - // routing CASE. Two shapes today: - // 1. All-Map → `global_minmax AND multi_hash_lookup`. Hashes - // the join keys once and ORs contain_hashes across every - // partition's hash table. - // 2. All-InList with a small enough cross-partition union - // → `global_minmax AND merged_in_list`. 
Worth taking - // because a small global InList can participate in - // parquet stats / bloom-filter pruning at the scan, - // which a `multi_hash_lookup` can't. - // Anything else (canceled, mixed, or merged InList over the - // cap) falls through to the legacy routing CASE. - if !has_canceled_unknown && !real_partitions.is_empty() { - let mut maps: Vec> = - Vec::with_capacity(real_partitions.len()); - let mut inlist_arrays: Vec = - Vec::with_capacity(real_partitions.len()); - let mut all_map = true; - let mut all_inlist = true; - let mut total_inlist_len: usize = 0; - for (_, p) in &real_partitions { - match &p.pushdown { - PushdownStrategy::Map(m) => { - maps.push(Arc::clone(m)); - all_inlist = false; - } - PushdownStrategy::InList(arr) => { - total_inlist_len += arr.len(); - inlist_arrays.push(Arc::clone(arr)); - all_map = false; - } - PushdownStrategy::Empty => { - all_map = false; - all_inlist = false; - } - } - } - - let bounds_refs: Vec<&PartitionBounds> = - real_partitions.iter().map(|(_, p)| &p.bounds).collect(); - let global_bounds_expr = compute_global_bounds(&bounds_refs) - .as_ref() - .and_then(|b| create_bounds_predicate(&self.on_right, b)); - - // Threshold on the cross-partition merged InList. Two - // reasons for keeping it small: - // 1. Below ~this many values the merged `IN (SET)` can - // participate in parquet stats / bloom-filter - // pruning at the scan, which a `multi_hash_lookup` - // cannot — that's the headline win. - // 2. Past this size, runtime regresses vs. the routing - // CASE (the larger static_filter hash set blows out - // of L1 and the win flips). On TPC-H SF=1 a sweep - // from 20 to 2000 picked 20–50 as the sweet spot. 
- const MERGED_INLIST_MAX_TOTAL_LEN: usize = 20; - - if all_inlist - && total_inlist_len <= MERGED_INLIST_MAX_TOTAL_LEN - && let Some(merged) = merge_inlist_arrays(&inlist_arrays) - && let Some(membership) = create_membership_predicate( - &self.on_right, - PushdownStrategy::InList(merged), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )? - { - let filter_expr = combine_membership_and_bounds( - Some(membership), - global_bounds_expr, - ) - .unwrap_or_else(|| lit(true)); - self.dynamic_filter.update(filter_expr)?; - return Ok(()); - } - - if all_map && !maps.is_empty() { - let multi_lookup = Arc::new(MultiMapLookupExpr::new( - self.on_right.clone(), - HASH_JOIN_SEED.clone(), - maps, - "multi_hash_lookup".to_string(), - )) - as Arc; - let filter_expr = combine_membership_and_bounds( - Some(multi_lookup), - global_bounds_expr, - ) - .unwrap_or_else(|| lit(true)); - self.dynamic_filter.update(filter_expr)?; - return Ok(()); - } - } - - let filter_expr = self.build_case_routing_filter( - num_partitions, - &real_partitions, - &empty_partition_ids, - has_canceled_unknown, - )?; - + let filter_expr = self + .build_partitioned_filter(&real_partitions, has_canceled_unknown)?; self.dynamic_filter.update(filter_expr)?; } } @@ -787,82 +701,141 @@ impl SharedBuildAccumulator { Ok(()) } - /// Build the per-partition `CASE (hash_repartition % N) WHEN p THEN - /// per_partition_filter ELSE … END` routing expression. Used as the fallback - /// when the all-Map fast path doesn't apply (any InList/Empty partition, - /// any canceled partition, or no real partitions). - fn build_case_routing_filter( + /// CollectLeft has a single shared build side, so we always have one + /// `Map`. We prefer the InList expression when it's available (the build + /// side fit under the InList caps) because it's directly representable in + /// parquet stats / bloom-filter pruning at the scan; otherwise fall back + /// to a single `hash_lookup` against the map. 
+ fn collect_left_membership( &self, - num_partitions: usize, - real_partitions: &[(usize, &PartitionData)], - empty_partition_ids: &[usize], - has_canceled_unknown: bool, - ) -> Result> { - let routing_hash_expr = Arc::new(HashExpr::new( - self.on_right.clone(), - self.repartition_random_state.clone(), - "hash_repartition".to_string(), - )) as Arc; - - let modulo_expr = Arc::new(BinaryExpr::new( - routing_hash_expr, - Operator::Modulo, - lit(ScalarValue::UInt64(Some(num_partitions as u64))), - )) as Arc; - - let mut real_branches = Vec::with_capacity(real_partitions.len()); - for (partition_id, partition) in real_partitions { - let membership_expr = create_membership_predicate( + pushdown: &PushdownStrategy, + ) -> Result>> { + if let Some(arr) = &pushdown.inlist { + return Ok(Some(create_inlist_predicate( &self.on_right, - partition.pushdown.clone(), - &HASH_JOIN_SEED, + Arc::clone(arr), self.probe_schema.as_ref(), - )?; - let bounds_expr = create_bounds_predicate(&self.on_right, &partition.bounds); - let then_expr = combine_membership_and_bounds(membership_expr, bounds_expr) - .unwrap_or_else(|| lit(true)); - real_branches.push(( - lit(ScalarValue::UInt64(Some(*partition_id as u64))), - then_expr, - )); + )?)); } + Ok(pushdown.map.as_ref().map(|map| { + create_hash_lookup_predicate(&self.on_right, Arc::clone(map), &HASH_JOIN_SEED) + })) + } - let filter_expr = if has_canceled_unknown { - let mut when_then_branches = empty_partition_ids - .iter() - .map(|partition_id| { - ( - lit(ScalarValue::UInt64(Some(*partition_id as u64))), - lit(false), - ) - }) - .collect::>(); - when_then_branches.extend(real_branches); + /// Build the dynamic filter for `PartitionMode::Partitioned`. The filter + /// is decoupled from the repartition strategy: regardless of whether + /// individual partitions chose Map or InList for their pushdown, we + /// always emit `(global_minmax AND ([merged_in_list AND] multi_hash_lookup))`. 
+ /// This drops the legacy `CASE hash_repartition % N WHEN p THEN … END` + /// routing expression entirely. + /// + /// * `global_minmax` — envelope of every partition's per-column min/max. + /// Cheap short-circuit and the only piece visible to scan-level + /// `pruning_predicate` extraction. + /// * `merged_in_list` — concatenated build keys when every reported + /// partition produced an `InList` array and the cross-partition union + /// is small enough (≤ `MERGED_INLIST_MAX_TOTAL_LEN`). Worth carrying + /// because a small `IN (SET)` participates in parquet stats / + /// bloom-filter pruning, which `multi_hash_lookup` cannot. + /// * `multi_hash_lookup` — runtime hash-table probe across every + /// partition's `Map`, hashing the join keys once. + /// + /// The `has_canceled_unknown` case is the only one that can't safely use + /// this shape (we'd be missing maps for the canceled partitions). We + /// could keep a CASE just for that case, but the query is in the middle + /// of being torn down — emit `lit(true)` and let the join do whatever + /// filtering it can on its own. + fn build_partitioned_filter( + &self, + real_partitions: &[&PartitionData], + has_canceled_unknown: bool, + ) -> Result> { + if has_canceled_unknown { + return Ok(lit(true)); + } + if real_partitions.is_empty() { + return Ok(lit(false)); + } - if when_then_branches.is_empty() { - lit(true) - } else { - Arc::new(CaseExpr::try_new( - Some(modulo_expr), - when_then_branches, - Some(lit(true)), - )?) as Arc - } - } else if real_branches.is_empty() { - lit(false) - } else if real_branches.len() == 1 - && empty_partition_ids.len() + 1 == num_partitions + let bounds_refs: Vec<&PartitionBounds> = + real_partitions.iter().map(|p| &p.bounds).collect(); + let global_bounds_expr = compute_global_bounds(&bounds_refs) + .as_ref() + .and_then(|b| create_bounds_predicate(&self.on_right, b)); + + // Threshold on the cross-partition merged InList. Two reasons for + // keeping it small: + // 1. 
Below this many values the merged `IN (SET)` can participate + // in parquet stats / bloom-filter pruning at the scan; a + // `multi_hash_lookup` cannot. + // 2. Past this size the larger static_filter hash set blows out + // of L1 and runtime regresses. A TPC-H SF=1 cap sweep from 20 + // to 2000 picked 20–50 as the sweet spot. + const MERGED_INLIST_MAX_TOTAL_LEN: usize = 20; + + // Try to build a merged InList. Only fires when *every* reported + // partition contributed an InList array AND the cross-partition + // union is small. The merged InList is already the union of every + // partition's build-side keys, so when present it fully replaces + // `multi_hash_lookup` — no need to AND a redundant probe on top. + let membership_expr = if let Some(merged) = + self.try_build_merged_inlist(real_partitions, MERGED_INLIST_MAX_TOTAL_LEN)? { - Arc::clone(&real_branches[0].1) + Some(merged) } else { - Arc::new(CaseExpr::try_new( - Some(modulo_expr), - real_branches, - Some(lit(false)), - )?) as Arc + let maps: Vec> = real_partitions + .iter() + .filter_map(|p| p.pushdown.map.clone()) + .collect(); + if maps.is_empty() { + // Defensive: every reported (non-empty) partition is + // supposed to carry a Map. Falling through to None means + // we degrade to bounds-only filtering. + None + } else { + Some(Arc::new(MultiMapLookupExpr::new( + self.on_right.clone(), + HASH_JOIN_SEED.clone(), + maps, + "multi_hash_lookup".to_string(), + )) as Arc) + } }; - Ok(filter_expr) + Ok( + combine_membership_and_bounds(membership_expr, global_bounds_expr) + .unwrap_or_else(|| lit(true)), + ) + } + + /// If every reported partition contributed an InList array and their + /// concatenated length stays within `cap`, return the merged + /// `(struct(...))? IN (SET) ([…])` expression — otherwise `None`. 
+ fn try_build_merged_inlist( + &self, + real_partitions: &[&PartitionData], + cap: usize, + ) -> Result>> { + let mut total = 0usize; + let mut arrays: Vec = Vec::with_capacity(real_partitions.len()); + for p in real_partitions { + let Some(arr) = &p.pushdown.inlist else { + return Ok(None); + }; + total += arr.len(); + if total > cap { + return Ok(None); + } + arrays.push(Arc::clone(arr)); + } + let Some(merged) = merge_inlist_arrays(&arrays) else { + return Ok(None); + }; + Ok(Some(create_inlist_predicate( + &self.on_right, + merged, + self.probe_schema.as_ref(), + )?)) } } @@ -894,7 +867,6 @@ mod tests { completion_notify: Notify::new(), dynamic_filter, on_right: vec![], - repartition_random_state: SeededRandomState::with_seed(1), probe_schema, } } @@ -929,7 +901,7 @@ mod tests { &mut guard, PartitionBuildData::Partitioned { partition_id: 0, - pushdown: PushdownStrategy::Empty, + pushdown: PushdownStrategy::empty(), bounds: PartitionBounds::new(vec![]), }, ) From f64718f5ecf8847fcab238f94455551e9fb31ea0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Apr 2026 10:49:08 -0500 Subject: [PATCH 4/6] refactor: dedup cross-partition InList, reuse per-partition cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cross-partition merged-InList gate now reuses the existing `optimizer.hash_join_inlist_pushdown_max_distinct_values` knob: one configuration option caps both the per-partition InList pushdown and the cross-partition merged set. The combine path explicitly deduplicates by `ScalarValue` (via a HashSet first-seen walk + a single `arrow::compute::take`) and then re-gates on the distinct count rather than the previous total-array-length heuristic. With the cap defaulting to 20, the worst-case dedup input is N×20 entries, which is microseconds at the partition counts we see in practice. 
The previous hardcoded 20-entry length cap and `MERGED_INLIST_MAX_TOTAL_LEN` constant are gone — the threshold is now configurable. Default lowered from 150 → 20 to align with parquet stats / bloom-filter pruning practicality (a small `IN (SET)` that scans can use to drop row groups is the entire reason for keeping this path). Users that want the wider per-partition InList behavior can raise the value. Within-partition build still ships duplicates: the build code in `exec.rs` doesn't dedupe before populating `PushdownStrategy::inlist`, relying on the join hash map's `num_of_distinct_key()` for the per-partition gate and the static filter inside `InListExpr` to dedupe at filter-evaluation time. Adding explicit per-partition dedup is a follow-up — not required for correctness because the cross-partition dedup catches everything. Refs: https://github.com/apache/datafusion/issues/19858 Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/common/src/config.rs | 12 +- .../physical-plan/src/joins/hash_join/exec.rs | 6 + .../src/joins/hash_join/shared_bounds.rs | 118 +++++++++++------- docs/source/user-guide/configs.md | 2 +- 4 files changed, 87 insertions(+), 51 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index abd58a556e0a1..0d39dc594fb1d 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1309,11 +1309,15 @@ config_namespace! { /// very large IN lists that might not provide much benefit over hash table lookups. /// /// This uses the deduplicated row count once the build side has been evaluated. + /// In `Partitioned` hash-join mode the same threshold also gates the + /// cross-partition merged InList: per-partition InList arrays are + /// concatenated and deduplicated, and the merged `IN (SET)` only fires + /// when the union has at most this many distinct values. /// - /// The default is 150 values per partition. 
- /// This is inspired by Trino's `max-filter-keys-per-column` setting. - /// See: - pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150 + /// The default is 20 distinct values, tuned so the resulting `IN (SET)` + /// stays small enough to participate in parquet stats / bloom-filter + /// pruning at the scan side. + pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 20 /// The default filter selectivity used by Filter Statistics /// when an exact selectivity cannot be determined. Valid values are diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 416a2025780d6..38aaae83b36f2 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1327,6 +1327,11 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|(_, right_expr)| Arc::clone(right_expr)) .collect::>(); + let inlist_max_distinct_values = context + .session_config() + .options() + .optimizer + .hash_join_inlist_pushdown_max_distinct_values; Some(Arc::clone(df.build_accumulator.get_or_init(|| { Arc::new(SharedBuildAccumulator::new_from_partition_mode( self.mode, @@ -1334,6 +1339,7 @@ impl ExecutionPlan for HashJoinExec { self.right.as_ref(), filter, on_right, + inlist_max_distinct_values, )) }))) }) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index a14cc4e509d6a..7018aaa844997 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -302,6 +302,15 @@ pub(crate) struct SharedBuildAccumulator { on_right: Vec, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, + /// Cap on the cross-partition merged-InList distinct count. 
Reuses + /// `optimizer.hash_join_inlist_pushdown_max_distinct_values` (the same + /// option that gates per-partition InList pushdown). When the union of + /// every reported partition's deduplicated InList values stays at or + /// below this many distinct entries we collapse them into a single + /// `IN (SET)` predicate that can participate in parquet stats / + /// bloom-filter pruning at the scan; otherwise we use + /// `multi_hash_lookup` over every partition's hash table instead. + inlist_max_distinct_values: usize, } /// Build-side data needed to construct a dynamic filter for one partition. @@ -438,6 +447,7 @@ impl SharedBuildAccumulator { right_child: &dyn ExecutionPlan, dynamic_filter: Arc, on_right: Vec, + inlist_max_distinct_values: usize, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -483,6 +493,7 @@ impl SharedBuildAccumulator { dynamic_filter, on_right, probe_schema: right_child.schema(), + inlist_max_distinct_values, } } @@ -733,9 +744,11 @@ impl SharedBuildAccumulator { /// Cheap short-circuit and the only piece visible to scan-level /// `pruning_predicate` extraction. /// * `merged_in_list` — concatenated build keys when every reported - /// partition produced an `InList` array and the cross-partition union - /// is small enough (≤ `MERGED_INLIST_MAX_TOTAL_LEN`). Worth carrying - /// because a small `IN (SET)` participates in parquet stats / + /// partition produced an `InList` array and the cross-partition + /// *deduplicated* set has at most `inlist_max_distinct_values` distinct + /// entries (the same option that gates the per-partition InList path, + /// `optimizer.hash_join_inlist_pushdown_max_distinct_values`). Worth + /// carrying because a small `IN (SET)` participates in parquet stats / /// bloom-filter pruning, which `multi_hash_lookup` cannot. 
/// * `multi_hash_lookup` — runtime hash-table probe across every /// partition's `Map`, hashing the join keys once. @@ -763,44 +776,34 @@ impl SharedBuildAccumulator { .as_ref() .and_then(|b| create_bounds_predicate(&self.on_right, b)); - // Threshold on the cross-partition merged InList. Two reasons for - // keeping it small: - // 1. Below this many values the merged `IN (SET)` can participate - // in parquet stats / bloom-filter pruning at the scan; a - // `multi_hash_lookup` cannot. - // 2. Past this size the larger static_filter hash set blows out - // of L1 and runtime regresses. A TPC-H SF=1 cap sweep from 20 - // to 2000 picked 20–50 as the sweet spot. - const MERGED_INLIST_MAX_TOTAL_LEN: usize = 20; - // Try to build a merged InList. Only fires when *every* reported // partition contributed an InList array AND the cross-partition - // union is small. The merged InList is already the union of every + // deduplicated union has at most `inlist_max_distinct_values` + // entries. The merged InList already covers the union of every // partition's build-side keys, so when present it fully replaces // `multi_hash_lookup` — no need to AND a redundant probe on top. - let membership_expr = if let Some(merged) = - self.try_build_merged_inlist(real_partitions, MERGED_INLIST_MAX_TOTAL_LEN)? - { - Some(merged) - } else { - let maps: Vec> = real_partitions - .iter() - .filter_map(|p| p.pushdown.map.clone()) - .collect(); - if maps.is_empty() { - // Defensive: every reported (non-empty) partition is - // supposed to carry a Map. Falling through to None means - // we degrade to bounds-only filtering. - None + let membership_expr = + if let Some(merged) = self.try_build_merged_inlist(real_partitions)? 
{ + Some(merged) } else { - Some(Arc::new(MultiMapLookupExpr::new( - self.on_right.clone(), - HASH_JOIN_SEED.clone(), - maps, - "multi_hash_lookup".to_string(), - )) as Arc) - } - }; + let maps: Vec> = real_partitions + .iter() + .filter_map(|p| p.pushdown.map.clone()) + .collect(); + if maps.is_empty() { + // Defensive: every reported (non-empty) partition is + // supposed to carry a Map. Falling through to None means + // we degrade to bounds-only filtering. + None + } else { + Some(Arc::new(MultiMapLookupExpr::new( + self.on_right.clone(), + HASH_JOIN_SEED.clone(), + maps, + "multi_hash_lookup".to_string(), + )) as Arc) + } + }; Ok( combine_membership_and_bounds(membership_expr, global_bounds_expr) @@ -808,32 +811,54 @@ impl SharedBuildAccumulator { ) } - /// If every reported partition contributed an InList array and their - /// concatenated length stays within `cap`, return the merged - /// `(struct(...))? IN (SET) ([…])` expression — otherwise `None`. + /// If every reported partition contributed an InList array, concatenate + /// them, deduplicate by scalar value, and gate on the + /// `inlist_max_distinct_values` cap. Returns the merged + /// `(struct(...))? IN (SET) ([…])` predicate built over the *deduplicated* + /// keys when the cap is satisfied; `None` otherwise. + /// + /// Per-partition arrays carry duplicates (the build side never dedups + /// before shipping), so each partition's `arr.len()` is an upper bound on + /// its distinct count. We start with a cheap pre-check (sum of lengths ≤ + /// some fast-reject limit) before the dedup walk to keep the cost + /// proportional to actual partition sizes. 
fn try_build_merged_inlist( &self, real_partitions: &[&PartitionData], - cap: usize, ) -> Result>> { - let mut total = 0usize; + let cap = self.inlist_max_distinct_values; let mut arrays: Vec = Vec::with_capacity(real_partitions.len()); for p in real_partitions { let Some(arr) = &p.pushdown.inlist else { return Ok(None); }; - total += arr.len(); - if total > cap { - return Ok(None); - } arrays.push(Arc::clone(arr)); } let Some(merged) = merge_inlist_arrays(&arrays) else { return Ok(None); }; + // Walk the merged array once, recording the first index of each + // distinct ScalarValue. If we cross the cap we abort early without + // materialising a longer index list. `arrow::compute::take` then + // produces the deduplicated array (in first-seen order, matching the + // shape `InListExpr::try_new_from_array` expects). + let mut seen = std::collections::HashSet::with_capacity(cap.saturating_add(1)); + let mut indices: Vec = Vec::with_capacity(cap.min(merged.len())); + for i in 0..merged.len() { + let scalar = ScalarValue::try_from_array(merged.as_ref(), i)?; + if seen.insert(scalar) { + if indices.len() >= cap { + // One more distinct value would exceed the cap. 
+ return Ok(None); + } + indices.push(i as u32); + } + } + let idx_array = arrow::array::UInt32Array::from(indices); + let deduped = arrow::compute::take(merged.as_ref(), &idx_array, None)?; Ok(Some(create_inlist_predicate( &self.on_right, - merged, + deduped, self.probe_schema.as_ref(), )?)) } @@ -868,6 +893,7 @@ mod tests { dynamic_filter, on_right: vec![], probe_schema, + inlist_max_distinct_values: 20, } } diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 46039f3c99c27..d07fb77eeee1f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -167,7 +167,7 @@ The following configuration settings are available: | datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.hash_join_inlist_pushdown_max_size | 131072 | Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` \* `target_partitions` memory. The default is 128kB per partition. This should allow point lookup joins (e.g. 
joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins. | -| datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values | 150 | Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. The default is 150 values per partition. This is inspired by Trino's `max-filter-keys-per-column` setting. See: | +| datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values | 20 | Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. In `Partitioned` hash-join mode the same threshold also gates the cross-partition merged InList: per-partition InList arrays are concatenated and deduplicated, and the merged `IN (SET)` only fires when the union has at most this many distinct values. The default is 20 distinct values, tuned so the resulting `IN (SET)` stays small enough to participate in parquet stats / bloom-filter pruning at the scan side. | | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. 
Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | From ba752023a08fc0df28dea1e81cf55bfc52ea6a45 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Apr 2026 11:29:10 -0500 Subject: [PATCH 5/6] fix(ci): sync information_schema.slt + drop redundant rustdoc link MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `information_schema.slt`: bumps the baked-in default and doc string for `optimizer.hash_join_inlist_pushdown_max_distinct_values` to match the 150 → 20 default change (sqllogictest, extended_tests, sqlite suite, verify-benchmark-results all hit this slt). - `partitioned_hash_eval.rs`: drop the redundant explicit-target on a `[BooleanArray]` doc link. Adding `BooleanArray` to imports for `MultiMapLookupExpr` made the existing `[`BooleanArray`](arrow::array::BooleanArray)` link redundant under `-D rustdoc::redundant_explicit_links`. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/joins/hash_join/partitioned_hash_eval.rs | 2 +- datafusion/sqllogictest/test_files/information_schema.slt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 23bcf38d30f61..de41641c931ee 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -203,7 +203,7 @@ impl PhysicalExpr for HashExpr { /// Physical expression that checks join keys in a [`Map`] (hash table or array map). /// -/// Returns a [`BooleanArray`](arrow::array::BooleanArray) indicating if join keys (from `on_columns`) exist in the map. +/// Returns a [`BooleanArray`] indicating if join keys (from `on_columns`) exist in the map. // TODO: rename to MapLookupExpr pub struct HashTableLookupExpr { /// Columns in the ON clause used to compute the join key for lookups diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b04c78bd2774c..0b666fa6d8fb7 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -312,7 +312,7 @@ datafusion.optimizer.enable_window_limits true datafusion.optimizer.enable_window_topn false datafusion.optimizer.expand_views_at_output false datafusion.optimizer.filter_null_join_keys false -datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 +datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 20 datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072 datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 @@ -459,7 +459,7 @@ datafusion.optimizer.enable_window_limits true When set to 
true, the optimizer w datafusion.optimizer.enable_window_topn false When set to true, the optimizer will replace Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a PartitionedTopKExec that maintains per-partition heaps, avoiding a full sort of the input. When the window partition key has low cardinality, enabling this optimization can improve performance. However, for high cardinality keys, it may cause regressions in both memory usage and runtime. datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. -datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. The default is 150 values per partition. This is inspired by Trino's `max-filter-keys-per-column` setting. See: +datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 20 Maximum number of distinct values (rows) in the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides with more rows than this will use hash table lookups instead. Set to 0 to always use hash table lookups. 
This provides an additional limit beyond `hash_join_inlist_pushdown_max_size` to prevent very large IN lists that might not provide much benefit over hash table lookups. This uses the deduplicated row count once the build side has been evaluated. In `Partitioned` hash-join mode the same threshold also gates the cross-partition merged InList: per-partition InList arrays are concatenated and deduplicated, and the merged `IN (SET)` only fires when the union has at most this many distinct values. The default is 20 distinct values, tuned so the resulting `IN (SET)` stays small enough to participate in parquet stats / bloom-filter pruning at the scan side. datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072 Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory. The default is 128kB per partition. This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins. 
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition From f717a99adc3eee55d2b07676bba57e343b3baf4d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 29 Apr 2026 14:03:46 -0500 Subject: [PATCH 6/6] docs: rewrite comments to describe the code, not the change Drop comments that read like PR-review notes ("X no longer Y", "the legacy CASE", "this drops the routing") in favour of comments that describe the current behaviour for someone reading the file cold. Trim some now-redundant field-level docs and tighten doc strings on `MultiMapLookupExpr`, `PushdownStrategy`, `build_partitioned_filter`, and `try_build_merged_inlist`. No functional change. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../physical_optimizer/filter_pushdown.rs | 20 +++--- .../physical-plan/src/joins/hash_join/exec.rs | 13 ++-- .../joins/hash_join/partitioned_hash_eval.rs | 42 +++++------ .../src/joins/hash_join/shared_bounds.rs | 71 ++++++++----------- 4 files changed, 62 insertions(+), 84 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 18359a56f9be6..2734adcbf2f6c 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1072,14 +1072,11 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { .await .unwrap(); - // Now check what our filter looks like. 
When the cross-partition InList - // is small enough (≤ MERGED_INLIST_MAX_TOTAL_LEN) we collapse it into a - // single global `IN (SET)` regardless of how the data was repartitioned — - // this lets the merged set participate in parquet stats / bloom-filter - // pruning at the scan, which a per-partition `CASE` could not. Both the - // normal repartition and the `force_hash_collisions` path produce the - // same logical shape; they only differ in the partition-iteration order - // that controls the InList element order. + // The dynamic filter for a `Partitioned` hash join with a small enough + // cross-partition InList collapses to a single global `struct(...) IN + // (SET) ([...])`. The two `#[cfg]` arms differ only in the order of the + // InList elements, which is determined by partition-iteration order + // (normal repartition vs. the `force_hash_collisions` collapse). #[cfg(not(feature = "force_hash_collisions"))] insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @@ -2575,10 +2572,9 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { .await .unwrap(); - // Verify the all-Map fast path collapses per-partition routing into a - // single shared `multi_hash_lookup` rather than a - // `CASE hash_repartition % N WHEN p THEN hash_lookup ELSE false END` - // expression. + // The dynamic filter for an all-Map Partitioned hash join uses a single + // shared `multi_hash_lookup` over every partition's hash table. There is + // no per-row `hash_repartition` routing. 
let plan_str = format_plan_for_test(&plan).to_string(); assert!( plan_str.contains("multi_hash_lookup"), diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 38aaae83b36f2..1cdb7718b5fcb 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1316,8 +1316,7 @@ impl ExecutionPlan for HashJoinExec { .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); // Initialize build_accumulator lazily with runtime partition counts - // (only if enabled). The dynamic filter no longer routes by repartition - // hash, so REPARTITION_RANDOM_STATE is not needed here. + // (only when dynamic filter pushdown is enabled). let build_accumulator = enable_dynamic_filter_pushdown .then(|| { self.dynamic_filter.as_ref().map(|df| { @@ -2042,11 +2041,11 @@ async fn collect_left_input( let map = Arc::new(join_hash_map); - // The `Map` is always built (the join itself uses it). The optional - // `inlist` array is set when the build side fit under the per-partition - // InList caps — that's our signal that this partition's keys are small - // enough to participate in parquet stats / bloom-filter pruning when - // collapsed across partitions on the probe-side scan. + // The hash map is needed by the join itself, so it always travels in + // `PushdownStrategy::map`. The optional `inlist` array is attached when + // the build side fits under the per-partition InList caps; the + // `SharedBuildAccumulator` may then merge those arrays across partitions + // into a single `IN (SET)` for scan-side pruning. 
let membership = if num_rows == 0 { PushdownStrategy::empty() } else { diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index de41641c931ee..3b0e1092ac2bc 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -346,29 +346,24 @@ impl PhysicalExpr for HashTableLookupExpr { /// Physical expression that probes the same join keys against multiple [`Map`]s /// and returns `true` for any row whose join keys match at least one map. /// -/// Equivalent to `OR`-ing several [`HashTableLookupExpr`]s but evaluates -/// `create_hashes` exactly once for the whole batch — every [`Map::HashMap`] -/// probe shares that hash buffer. Used for the global-first dynamic filter -/// when every reported partition uses a hash-table pushdown strategy. +/// Equivalent to `OR`-ing several [`HashTableLookupExpr`]s, but +/// `create_hashes` runs exactly once for the whole batch and every +/// [`Map::HashMap`] probe shares the same hash buffer. All `HashMap` +/// entries must therefore have been built with the same `RandomState`; +/// [`Map::ArrayMap`] entries are queried via `contain_keys` and do not +/// consume hashes. pub struct MultiMapLookupExpr { - /// Columns in the ON clause used to compute the join key for lookups + /// Join-key expressions evaluated against each input batch. on_columns: Vec, - /// Random state for hashing — every map must have been built with the - /// same `RandomState`, otherwise the shared hash buffer is meaningless. + /// Hashing seed shared by every entry in `maps`. random_state: SeededRandomState, - /// Maps to OR over (each is one partition's build-side data) + /// Build-side maps to OR over, one per partition. maps: Vec>, - /// Description for display + /// Display name used in `EXPLAIN` output (e.g. `"multi_hash_lookup"`). 
description: String, } impl MultiMapLookupExpr { - /// Create a new MultiMapLookupExpr. - /// - /// `maps` is the (per-partition) sequence of build-side maps to probe. - /// All `Map::HashMap` entries are expected to use the same `random_state`; - /// `Map::ArrayMap` entries do not consume hashes and are queried via - /// `contain_keys`. pub fn new( on_columns: Vec, random_state: SeededRandomState, @@ -466,8 +461,8 @@ impl PhysicalExpr for MultiMapLookupExpr { let join_keys = evaluate_columns(&self.on_columns, batch)?; if self.maps.is_empty() || num_rows == 0 { - // No maps to probe — this should not happen in practice but - // returning all-false matches the semantics of an empty `OR`. + // Empty `maps` would not be constructed by the dynamic-filter + // builder — guard anyway: an empty OR is `false` for every row. let buffer = BooleanBufferBuilder::new(num_rows); let mut buffer = buffer; buffer.append_n(num_rows, false); @@ -477,14 +472,13 @@ impl PhysicalExpr for MultiMapLookupExpr { )))); } - // Whether any map needs hashes. We only compute hashes if at least - // one map is a HashMap. + // Hashes are only needed for `HashMap` probes; `ArrayMap` queries + // its keys directly via `contain_keys`. let needs_hashes = self .maps .iter() .any(|m| matches!(m.as_ref(), Map::HashMap(_))); - // Result buffer accumulates the OR of every map's `contain_*` result. let mut result = vec![false; num_rows]; let process_one_map = @@ -494,8 +488,8 @@ impl PhysicalExpr for MultiMapLookupExpr { let hashes = hashes .expect("hashes computed when at least one map is a HashMap"); let arr = hm.contain_hashes(hashes); - // OR into the running result. `arr` has no nulls - // (`contain_hashes` returns a non-nullable BooleanArray). + // `contain_hashes` always returns a non-null + // `BooleanArray`; OR its bits into `result`. 
for (slot, hit) in result.iter_mut().zip(arr.values().iter()) { *slot |= hit; } @@ -511,8 +505,8 @@ impl PhysicalExpr for MultiMapLookupExpr { }; if needs_hashes { - // Compute the join-key hashes ONCE, then probe every HashMap - // against the same buffer. ArrayMap probes ignore the hashes. + // Hash the join keys once and reuse the buffer for every + // `HashMap` probe; `ArrayMap` probes pass `None` for hashes. with_hashes(&join_keys, self.random_state.random_state(), |hashes| { for map in &self.maps { process_one_map(&mut result, map, Some(hashes))?; diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 7018aaa844997..3e37d3e44374b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -302,14 +302,11 @@ pub(crate) struct SharedBuildAccumulator { on_right: Vec, /// Schema of the probe (right) side for evaluating filter expressions probe_schema: Arc, - /// Cap on the cross-partition merged-InList distinct count. Reuses - /// `optimizer.hash_join_inlist_pushdown_max_distinct_values` (the same - /// option that gates per-partition InList pushdown). When the union of - /// every reported partition's deduplicated InList values stays at or - /// below this many distinct entries we collapse them into a single - /// `IN (SET)` predicate that can participate in parquet stats / - /// bloom-filter pruning at the scan; otherwise we use - /// `multi_hash_lookup` over every partition's hash table instead. + /// Maximum distinct entries the cross-partition merged InList may + /// contain before we fall back to `multi_hash_lookup`. Sourced from + /// `optimizer.hash_join_inlist_pushdown_max_distinct_values` so the + /// same threshold caps both per-partition and cross-partition InList + /// pushdown. 
inlist_max_distinct_values: usize, } @@ -733,31 +730,26 @@ impl SharedBuildAccumulator { })) } - /// Build the dynamic filter for `PartitionMode::Partitioned`. The filter - /// is decoupled from the repartition strategy: regardless of whether - /// individual partitions chose Map or InList for their pushdown, we - /// always emit `(global_minmax AND ([merged_in_list AND] multi_hash_lookup))`. - /// This drops the legacy `CASE hash_repartition % N WHEN p THEN … END` - /// routing expression entirely. + /// Build the dynamic filter for `PartitionMode::Partitioned`. Emits + /// `global_minmax AND ([merged_in_list AND] multi_hash_lookup)` — + /// independent of how the build side was repartitioned. /// /// * `global_minmax` — envelope of every partition's per-column min/max. /// Cheap short-circuit and the only piece visible to scan-level /// `pruning_predicate` extraction. - /// * `merged_in_list` — concatenated build keys when every reported - /// partition produced an `InList` array and the cross-partition - /// *deduplicated* set has at most `inlist_max_distinct_values` distinct - /// entries (the same option that gates the per-partition InList path, - /// `optimizer.hash_join_inlist_pushdown_max_distinct_values`). Worth - /// carrying because a small `IN (SET)` participates in parquet stats / - /// bloom-filter pruning, which `multi_hash_lookup` cannot. - /// * `multi_hash_lookup` — runtime hash-table probe across every - /// partition's `Map`, hashing the join keys once. + /// * `merged_in_list` — concatenated, deduplicated build keys when every + /// reported partition contributed an `InList` array and the + /// cross-partition union fits under + /// `optimizer.hash_join_inlist_pushdown_max_distinct_values`. A small + /// `IN (SET)` participates in parquet stats / bloom-filter pruning, + /// which `multi_hash_lookup` does not. When present it fully replaces + /// the lookup. 
+ /// * `multi_hash_lookup` — hashes the join keys once and ORs + /// `contain_hashes()` across every partition's hash table. /// - /// The `has_canceled_unknown` case is the only one that can't safely use - /// this shape (we'd be missing maps for the canceled partitions). We - /// could keep a CASE just for that case, but the query is in the middle - /// of being torn down — emit `lit(true)` and let the join do whatever - /// filtering it can on its own. + /// `has_canceled_unknown` partitions short-circuit to `lit(true)`: we + /// don't have their maps, so we cannot include them in the lookup, and + /// the query is being torn down anyway. fn build_partitioned_filter( &self, real_partitions: &[&PartitionData], @@ -776,12 +768,10 @@ impl SharedBuildAccumulator { .as_ref() .and_then(|b| create_bounds_predicate(&self.on_right, b)); - // Try to build a merged InList. Only fires when *every* reported - // partition contributed an InList array AND the cross-partition - // deduplicated union has at most `inlist_max_distinct_values` - // entries. The merged InList already covers the union of every - // partition's build-side keys, so when present it fully replaces - // `multi_hash_lookup` — no need to AND a redundant probe on top. + // The merged InList covers the union of every partition's + // build-side keys, so when it fires it stands alone — there is no + // need to also AND a `multi_hash_lookup` (which would just probe + // the same data via a different structure). let membership_expr = if let Some(merged) = self.try_build_merged_inlist(real_partitions)? { Some(merged) @@ -814,14 +804,13 @@ impl SharedBuildAccumulator { /// If every reported partition contributed an InList array, concatenate /// them, deduplicate by scalar value, and gate on the /// `inlist_max_distinct_values` cap. Returns the merged - /// `(struct(...))? IN (SET) ([…])` predicate built over the *deduplicated* - /// keys when the cap is satisfied; `None` otherwise. + /// `(struct(...))? 
IN (SET) ([…])` predicate built over the
+    /// deduplicated keys when the cap is satisfied; `None` otherwise.
     /// 
-    /// Per-partition arrays carry duplicates (the build side never dedups
-    /// before shipping), so each partition's `arr.len()` is an upper bound on
-    /// its distinct count. We start with a cheap pre-check (sum of lengths ≤
-    /// some fast-reject limit) before the dedup walk to keep the cost
-    /// proportional to actual partition sizes.
+    /// Per-partition arrays carry duplicates — each partition ships its raw
+    /// build-side join keys, dedup happens here. The dedup walk early-aborts
+    /// the moment the (cap + 1)-th distinct value is seen, so the cost is
+    /// bounded by the rows scanned up to that point, not the total input size.
     fn try_build_merged_inlist(
         &self,
         real_partitions: &[&PartitionData],