Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,39 @@ config_namespace! {
///
/// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
/// partitions is less than the target_partitions.
///
/// Note for partitioned hash join dynamic filtering:
/// preserving file partitions can allow partition-index routing (`i -> i`) instead of
/// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
/// partition hash join / dynamic filter consumers.
///
/// Misaligned Partitioned Hash Join Example:
/// ```text
/// ┌───────────────────────────┐
/// │ HashJoinExec │
/// │ mode=Partitioned │
/// │┌───────┐┌───────┐┌───────┐│
/// ││ Hash ││ Hash ││ Hash ││
/// ││Table 1││Table 2││Table 3││
/// ││ ││ ││ ││
/// ││ key=A ││ key=B ││ key=C ││
/// │└───▲───┘└───▲───┘└───▲───┘│
/// └────┴────────┼────────┼────┘
/// ... Misaligned! Misaligned!
/// │ │
/// ... ┌───────┼────────┴───────────────┐
/// ┌────────┼───────┴───────────────┐ │
/// │ │ │ │ │ │
///┌────┴────────┴────────┴────┐ ┌───┴─────────┴────────┴────┐
///│ DataSourceExec │ │ DataSourceExec │
///│┌───────┐┌───────┐┌───────┐│ │┌───────┐┌───────┐┌───────┐│
///││ File ││ File ││ File ││ ││ File ││ File ││ File ││
///││Group 1││Group 2││Group 3││ ││Group 1││Group 2││Group 3││
///││ ││ ││ ││ ││ ││ ││ ││
///││ key=A ││ key=B ││ key=C ││ ││ key=A ││ key=C ││ key=B ││
///│└───────┘└───────┘└───────┘│ │└───────┘└───────┘└───────┘│
///└───────────────────────────┘ └───────────────────────────┘
///```
pub preserve_file_partitions: usize, default = 0

/// Should DataFusion repartition data using the partitions keys to execute window
Expand Down
272 changes: 266 additions & 6 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{CsvSource, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::ScalarValue;
use datafusion_common::config::CsvOptions;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{NullEquality, ScalarValue};
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_expr::{JoinType, Operator};
Expand All @@ -61,14 +61,18 @@ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::ExecutionPlan;
use datafusion_physical_plan::expressions::col;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::utils::JoinOn;
use datafusion_physical_plan::joins::{
DynamicFilterRoutingMode, HashJoinExec, HashJoinExecBuilder, PartitionMode,
utils::JoinOn,
};
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, Statistics,
displayable,
DisplayAs, DisplayFormatType, ExecutionPlanProperties, Partitioning, PlanProperties,
Statistics, displayable,
};
use insta::Settings;

Expand Down Expand Up @@ -366,6 +370,62 @@ fn hash_join_exec(
.unwrap()
}

fn partitioned_hash_join_exec(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn partitioned_hash_join_exec(
// Build a partitioned hash join for 2 given inputs
fn build_partitioned_hash_join(

left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
join_on: &JoinOn,
join_type: &JoinType,
) -> Arc<dyn ExecutionPlan> {
Arc::new(
HashJoinExecBuilder::new(left, right, join_on.clone(), *join_type)
.with_partition_mode(PartitionMode::Partitioned)
.with_null_equality(NullEquality::NullEqualsNothing)
.build()
.unwrap(),
)
}

fn first_hash_join_and_direct_hash_repartition_children(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this comment sound correct? Fix it as you see fit

Suggested change
fn first_hash_join_and_direct_hash_repartition_children(
// Traversing down the plan and returning the first hash join with direct repartition children
fn first_hash_join_and_direct_hash_repartition_children(

plan: &Arc<dyn ExecutionPlan>,
) -> Option<(&HashJoinExec, usize)> {
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
let direct_hash_repartition_children = hash_join
.children()
.into_iter()
.filter(|child| {
child
.as_any()
.downcast_ref::<RepartitionExec>()
.is_some_and(|repartition| {
matches!(repartition.partitioning(), Partitioning::Hash(_, _))
})
})
.count();
return Some((hash_join, direct_hash_repartition_children));
}

for child in plan.children() {
if let Some(result) = first_hash_join_and_direct_hash_repartition_children(child)
{
return Some(result);
}
}
None
}

fn hash_repartition_on_column(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this function is to add RepartitionExec on top of the input plan. How about rename it to:

Suggested change
fn hash_repartition_on_column(
// Add RepartitionExec for the given input
fn add_repartition(

input: Arc<dyn ExecutionPlan>,
column_name: &str,
partition_count: usize,
) -> Arc<dyn ExecutionPlan> {
let expr = Arc::new(Column::new_with_schema(column_name, &input.schema()).unwrap())
as Arc<dyn PhysicalExpr>;
Arc::new(
RepartitionExec::try_new(input, Partitioning::Hash(vec![expr], partition_count))
.unwrap(),
)
}

fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let predicate = Arc::new(BinaryExpr::new(
col("c", &schema()).unwrap(),
Expand Down Expand Up @@ -405,6 +465,23 @@ fn ensure_distribution_helper(
ensure_distribution(distribution_context, &config).map(|item| item.data.plan)
}

/// Like [`ensure_distribution_helper`] but uses bottom-up `transform_up`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be useful to also explain what will be transformed. Add example if it is easier to explain

fn ensure_distribution_helper_transform_up(
plan: Arc<dyn ExecutionPlan>,
target_partitions: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
let distribution_context = DistributionContext::new_default(plan);
let mut config = ConfigOptions::new();
config.execution.target_partitions = target_partitions;
config.optimizer.enable_round_robin_repartition = false;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will be clearer if you add comments explaining why you need these settings and for which tests.

distribution_context
.transform_up(|node| ensure_distribution(node, &config))
.map(|item| item.data.plan)
}

fn test_suite_default_config_options() -> ConfigOptions {
let mut config = ConfigOptions::new();

Expand Down Expand Up @@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}

// Verify that when neither join input is hash repartitioned (directly or
// indirectly), EnforceDistribution sets the join's
// `dynamic_filter_routing_mode` to `DynamicFilterRoutingMode::PartitionIndex`
// and inserts no hash `RepartitionExec` directly under the join.
#[test]
fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
-> Result<()> {
    let left = parquet_exec();
    let right = parquet_exec();

    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    let join = partitioned_hash_join_exec(left, right, &join_on, &JoinType::Inner);

    let optimized = ensure_distribution_helper_transform_up(join, 1)?;
    assert_plan!(optimized, @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
    ");
    let (hash_join, direct_hash_repartition_children) =
        first_hash_join_and_direct_hash_repartition_children(&optimized)
            .expect("expected HashJoinExec");

    assert_eq!(
        hash_join.dynamic_filter_routing_mode,
        DynamicFilterRoutingMode::PartitionIndex,
    );
    assert_eq!(direct_hash_repartition_children, 0);

    Ok(())
}

// Verify the safety check for misaligned partitioning. The input plan is:
//
//   HashJoinExec (mode=Partitioned)
//     left:  parquet_exec_multiple()  -- multiple file groups
//     right: parquet_exec()           -- single file group
//
// With target_partitions = 1, EnforceDistribution inserts a hash repartition
// on the left child only, so the two sides end up with incompatible
// partitioning schemes and `ensure_distribution` must return an error rather
// than silently producing a misrouted dynamic filter.
#[test]
fn enforce_distribution_disables_dynamic_filtering_for_misaligned_partitioning()
-> Result<()> {
    let left = parquet_exec_multiple();
    let right = parquet_exec();

    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    // One side starts with multiple partitions while target is 1. EnforceDistribution inserts a
    // hash repartition on the left child. The partitioning schemes are now misaligned:
    // - Left: hash-repartitioned (repartitioned=true)
    // - Right: file-grouped (repartitioned=false)
    // This is a correctness bug, so we expect an error.
    let join = partitioned_hash_join_exec(left, right, &join_on, &JoinType::Inner);

    let result = ensure_distribution_helper_transform_up(join, 1);
    assert!(result.is_err());
    let err = result.unwrap_err();
    assert!(
        err.to_string()
            .contains("incompatible partitioning schemes"),
        "Expected error about incompatible partitioning, got: {err}",
    );

    Ok(())
}

// Verify that when both join inputs are hash repartitioned — even when the
// `RepartitionExec` is hidden below other operators (projection / aggregate)
// rather than being a direct child of the join — EnforceDistribution sets
// `dynamic_filter_routing_mode` to `DynamicFilterRoutingMode::CaseHash`.
#[test]
fn enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate()
-> Result<()> {
    let left = projection_exec_with_alias(
        hash_repartition_on_column(parquet_exec(), "a", 4),
        vec![("a".to_string(), "a".to_string())],
    );

    let right = aggregate_exec_with_alias(
        hash_repartition_on_column(parquet_exec(), "a", 4),
        vec![("a".to_string(), "a".to_string())],
    );

    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    // Both sides are already hash repartitioned, but the hash repartitions are below other
    // operators not directly under the join. EnforceDistribution should detect both sides are
    // repartitioned and set CaseHash routing mode.
    let join = partitioned_hash_join_exec(left, right, &join_on, &JoinType::Inner);

    let optimized = ensure_distribution_helper_transform_up(join, 4)?;
    assert_plan!(optimized, @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
      RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
        ProjectionExec: expr=[a@0 as a]
          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
      AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
        RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
          AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
    ");
    let (hash_join, direct_hash_repartition_children) =
        first_hash_join_and_direct_hash_repartition_children(&optimized)
            .expect("expected HashJoinExec");

    assert_eq!(
        hash_join.dynamic_filter_routing_mode,
        DynamicFilterRoutingMode::CaseHash,
    );
    assert_eq!(direct_hash_repartition_children, 1);

    Ok(())
}

// Verify that a hash repartition sitting in the probe subtree but *off* the
// dynamic filter pushdown path is ignored. The lower projection aliases `a`
// to `a2`, so the top join's filter column `a` cannot be traced to that
// repartition; EnforceDistribution must therefore treat both sides as
// non-repartitioned and choose `DynamicFilterRoutingMode::PartitionIndex`.
#[test]
fn enforce_distribution_ignores_hash_repartition_off_dynamic_filter_path() -> Result<()> {
    // This hash repartition is in the probe subtree but off the dynamic filter pushdown path
    // because the top filter references `a` while this branch only exposes `a2`.
    let lower_left = projection_exec_with_alias(
        hash_repartition_on_column(parquet_exec(), "a", 4),
        vec![("a".to_string(), "a2".to_string())],
    );
    let lower_right: Arc<dyn ExecutionPlan> = parquet_exec();

    let lower_join_on = vec![(
        Arc::new(Column::new_with_schema("a2", &lower_left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &lower_right.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    let lower_join: Arc<dyn ExecutionPlan> = Arc::new(
        HashJoinExecBuilder::new(
            lower_left.clone(),
            lower_right.clone(),
            lower_join_on,
            JoinType::Inner,
        )
        .with_partition_mode(PartitionMode::CollectLeft)
        .with_null_equality(NullEquality::NullEqualsNothing)
        .build()
        .unwrap(),
    );

    let left = parquet_exec();
    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
        Arc::new(Column::new_with_schema("a", &lower_join.schema()).unwrap())
            as Arc<dyn PhysicalExpr>,
    )];

    // EnforceDistribution should detect no repartition on either side and switch to PartitionIndex.
    let join = partitioned_hash_join_exec(left, lower_join, &join_on, &JoinType::Inner);

    let optimized = ensure_distribution_helper_transform_up(join, 1)?;
    assert_plan!(optimized, @r"
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@1)]
      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a2@0, a@0)]
        ProjectionExec: expr=[a@0 as a2]
          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
    ");
    let (hash_join, _) = first_hash_join_and_direct_hash_repartition_children(&optimized)
        .expect("expected HashJoinExec");

    assert_eq!(
        hash_join.dynamic_filter_routing_mode,
        DynamicFilterRoutingMode::PartitionIndex
    );

    Ok(())
}

#[test]
fn multi_joins_after_alias() -> Result<()> {
let left = parquet_exec();
Expand Down Expand Up @@ -3647,8 +3900,15 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
// Create distribution context
let dist_context = DistributionContext::new(
spm_exec,
true,
vec![DistributionContext::new(parquet_exec, false, vec![])],
DistFlags {
dist_changing: true,
..Default::default()
},
vec![DistributionContext::new(
parquet_exec,
DistFlags::default(),
vec![],
)],
);

// Apply the function
Expand Down
Loading