Skip to content
174 changes: 85 additions & 89 deletions datafusion/core/tests/dataframe/mod.rs

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,10 @@ impl JoinFuzzTestCase {
if join_tests.contains(&NljHj) {
let err_msg_rowcnt = format!("NestedLoopJoinExec and HashJoinExec produced different row counts, batch_size: {batch_size}");
assert_eq!(nlj_rows, hj_rows, "{}", err_msg_rowcnt.as_str());
if nlj_rows == 0 && hj_rows == 0 {
// both joins returned no rows, skip content comparison
continue;
}

let err_msg_contents = format!("NestedLoopJoinExec and HashJoinExec produced different results, batch_size: {batch_size}");
// row level compare if any of joins returns the result
Expand Down
15 changes: 7 additions & 8 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,14 +812,13 @@ async fn test_physical_plan_display_indent_multi_children() {
assert_snapshot!(
actual,
@r"
CoalesceBatchesExec: target_batch_size=4096
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
"
);
}
Expand Down
21 changes: 10 additions & 11 deletions datafusion/physical-optimizer/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_common::{assert_eq_or_internal_err, DataFusionError};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::{
async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
joins::HashJoinExec, repartition::RepartitionExec, ExecutionPlan,
repartition::RepartitionExec, ExecutionPlan,
};

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
Expand Down Expand Up @@ -57,17 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
let target_batch_size = config.execution.batch_size;
plan.transform_up(|plan| {
let plan_any = plan.as_any();
let wrap_in_coalesce = plan_any.downcast_ref::<HashJoinExec>().is_some()
let wrap_in_coalesce = plan_any
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The list is getting smaller and smaller

// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
|| plan_any
.downcast_ref::<RepartitionExec>()
.map(|repart_exec| {
!matches!(
repart_exec.partitioning().clone(),
Partitioning::RoundRobinBatch(_)
)
})
.unwrap_or(false);
.downcast_ref::<RepartitionExec>()
.map(|repart_exec| {
!matches!(
repart_exec.partitioning().clone(),
Partitioning::RoundRobinBatch(_)
)
})
.unwrap_or(false);

if wrap_in_coalesce {
Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
Expand Down
69 changes: 57 additions & 12 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1936,7 +1936,12 @@ mod tests {
div_ceil(9, batch_size)
};

assert_eq!(batches.len(), expected_batch_count);
// With batch coalescing, we may have fewer batches than expected
assert!(
batches.len() <= expected_batch_count,
"expected at most {expected_batch_count} batches, got {}",
batches.len()
);

// Inner join output is expected to preserve both inputs order
allow_duplicates! {
Expand Down Expand Up @@ -2016,7 +2021,12 @@ mod tests {
div_ceil(9, batch_size)
};

assert_eq!(batches.len(), expected_batch_count);
// With batch coalescing, we may have fewer batches than expected
assert!(
batches.len() <= expected_batch_count,
"expected at most {expected_batch_count} batches, got {}",
batches.len()
);

// Inner join output is expected to preserve both inputs order
allow_duplicates! {
Expand Down Expand Up @@ -2152,7 +2162,12 @@ mod tests {
// and filtered later.
div_ceil(6, batch_size)
};
assert_eq!(batches.len(), expected_batch_count);
// With batch coalescing, we may have fewer batches than expected
assert!(
batches.len() <= expected_batch_count,
"expected at most {expected_batch_count} batches, got {}",
batches.len()
);

// Inner join output is expected to preserve both inputs order
allow_duplicates! {
Expand All @@ -2177,7 +2192,12 @@ mod tests {
// and filtered later.
div_ceil(3, batch_size)
};
assert_eq!(batches.len(), expected_batch_count);
// With batch coalescing, we may have fewer batches than expected
assert!(
batches.len() <= expected_batch_count,
"expected at most {expected_batch_count} batches, got {}",
batches.len()
);

// Inner join output is expected to preserve both inputs order
allow_duplicates! {
Expand Down Expand Up @@ -3467,6 +3487,8 @@ mod tests {
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

let mut probe_indices_buffer = Vec::new();
let mut build_indices_buffer = Vec::new();
let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
&[left_keys_values],
Expand All @@ -3475,6 +3497,8 @@ mod tests {
&hashes_buffer,
8192,
(0, None),
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;

let left_ids: UInt64Array = vec![0, 1].into();
Expand Down Expand Up @@ -3524,6 +3548,8 @@ mod tests {
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?;

let mut probe_indices_buffer = Vec::new();
let mut build_indices_buffer = Vec::new();
let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
&[left_keys_values],
Expand All @@ -3532,6 +3558,8 @@ mod tests {
&hashes_buffer,
8192,
(0, None),
&mut probe_indices_buffer,
&mut build_indices_buffer,
)?;

// We still expect to match rows 0 and 1 on both sides
Expand Down Expand Up @@ -4197,10 +4225,11 @@ mod tests {
}
_ => div_ceil(expected_resultset_records, batch_size) + 1,
};
assert_eq!(
batches.len(),
expected_batch_count,
"expected {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}"
// With batch coalescing, we may have fewer batches than expected
assert!(
batches.len() <= expected_batch_count,
"expected at most {expected_batch_count} output batches for {join_type} join with batch_size = {batch_size}, got {}",
batches.len()
);

let expected = match join_type {
Expand All @@ -4210,7 +4239,17 @@ mod tests {
JoinType::LeftAnti => left_empty.to_vec(),
_ => common_result.to_vec(),
};
assert_batches_eq!(expected, &batches);
// For anti joins with empty results, we may get zero batches
// (with coalescing) instead of one empty batch with schema
if batches.is_empty() {
// Verify this is an expected empty result case
assert!(
matches!(join_type, JoinType::RightAnti | JoinType::LeftAnti),
"Unexpected empty result for {join_type} join"
);
} else {
assert_batches_eq!(expected, &batches);
}
}
}
}
Expand Down Expand Up @@ -4473,9 +4512,15 @@ mod tests {

assert_join_metrics!(metrics, 0);

let expected_null_neq =
["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
assert_batches_eq!(expected_null_neq, &batches_null_neq);
// With batch coalescing, empty results may not emit any batches
// Check that either we have no batches, or an empty batch with proper schema
if batches_null_neq.is_empty() {
// This is fine - no output rows
} else {
let expected_null_neq =
["+----+----+", "| n1 | n2 |", "+----+----+", "+----+----+"];
assert_batches_eq!(expected_null_neq, &batches_null_neq);
}

Ok(())
}
Expand Down
Loading