From d9e1d8581d0435a398bdbd3fa4f41982dcd63c1b Mon Sep 17 00:00:00 2001
From: Xander Bailey
Date: Fri, 14 Nov 2025 12:43:16 +0000
Subject: [PATCH 1/7] add test

---
 .../physical-plan/src/aggregates/row_hash.rs  | 141 ++++++++++++++++++
 1 file changed, 141 insertions(+)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 3c6577af4286..f96f87baf977 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1239,3 +1239,144 @@ impl GroupedHashAggregateStream {
         Ok(states_batch)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test::TestMemoryExec;
+    use arrow::array::{Int32Array, Int64Array};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+    use datafusion_execution::TaskContext;
+    use datafusion_functions_aggregate::count::count_udaf;
+    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+    use datafusion_physical_expr::expressions::col;
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn test_double_emission_race_condition_bug() -> Result<()> {
+        // This test specifically reproduces the double emission race condition
+        // where emit_early_if_necessary() and switch_to_skip_aggregation()
+        // both emit in the same loop iteration, causing data loss
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("group_col", DataType::Int32, false),
+            Field::new("value_col", DataType::Int64, false),
+        ]));
+
+        // Create data that will trigger BOTH conditions in the same iteration:
+        // 1. More groups than batch_size (triggers early emission when memory pressure hits)
+        // 2. High cardinality ratio (triggers skip aggregation)
+        let batch_size = 1024; // We'll set this in session config
+        let num_groups = batch_size + 100; // Slightly more than batch_size (1124 groups)
+
+        // Create exactly 1 row per group = 100% cardinality ratio
+        let group_ids: Vec<i32> = (0..num_groups as i32).collect();
+        let values: Vec<i64> = vec![1; num_groups];
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(group_ids)),
+                Arc::new(Int64Array::from(values)),
+            ],
+        )?;
+
+        let input_partitions = vec![vec![batch]];
+
+        // Create constrained memory to trigger early emission but not completely fail
+        let runtime = RuntimeEnvBuilder::default()
+            .with_memory_limit(1024, 1.0) // 1 KB - enough to start but will trigger pressure
+            .build_arc()?;
+
+        let mut task_ctx = TaskContext::default().with_runtime(runtime);
+
+        // Configure to trigger BOTH conditions:
+        // 1. Low probe threshold (triggers skip probe after few rows)
+        // 2. Low ratio threshold (triggers skip aggregation immediately)
+        // 3. Set batch_size to 1024 so our 1124 groups will trigger early emission
+        // This creates the race condition where both emit paths are triggered
+        let mut session_config = task_ctx.session_config().clone();
+        session_config = session_config.set(
+            "datafusion.execution.batch_size",
+            &datafusion_common::ScalarValue::UInt64(Some(1024)),
+        );
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+            &datafusion_common::ScalarValue::UInt64(Some(50)),
+        );
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
+            &datafusion_common::ScalarValue::Float64(Some(0.8)),
+        );
+        task_ctx = task_ctx.with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
+
+        // Create aggregate: COUNT(*) GROUP BY group_col
+        let group_expr = vec![(col("group_col", &schema)?, "group_col".to_string())];
+        let aggr_expr = vec![Arc::new(
+            AggregateExprBuilder::new(count_udaf(), vec![col("value_col", &schema)?])
+                .schema(Arc::clone(&schema))
+                .alias("count_value")
+                .build()?,
+        )];
+
+        let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?;
+        let exec = Arc::new(TestMemoryExec::update_cache(Arc::new(exec)));
+
+        // Use Partial mode where the race condition occurs
+        let aggregate_exec = AggregateExec::try_new(
+            AggregateMode::Partial,
+            PhysicalGroupBy::new_single(group_expr),
+            aggr_expr,
+            vec![None],
+            exec,
+            Arc::clone(&schema),
+        )?;
+
+        // Execute and collect results
+        let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, Arc::clone(&task_ctx), 0)?;
+        let mut results = Vec::new();
+
+        while let Some(result) = stream.next().await {
+            let batch = result?;
+            results.push(batch);
+        }
+
+        // Count total groups emitted
+        let mut total_output_groups = 0;
+        for batch in &results {
+            total_output_groups += batch.num_rows();
+        }
+
+        // With the race condition bug:
+        // 1. emit_early_if_necessary() emits first batch_size groups (1024)
+        // 2. switch_to_skip_aggregation() immediately overwrites with remaining groups (100)
+        // 3. The 1024 groups from step 1 are LOST!
+        // 4. Only 100 groups are returned instead of 1124
+
+        println!(
+            "Double emission race condition test: Expected {} groups, got {} groups",
+            num_groups, total_output_groups
+        );
+
+        if total_output_groups < num_groups / 2 {
+            println!(
+                "🐛 BUG REPRODUCED! Lost {} groups ({:.1}% loss) - this indicates the double emission race condition",
+                num_groups - total_output_groups,
+                (1.0 - total_output_groups as f64 / num_groups as f64) * 100.0
+            );
+        }
+
+        // This test documents the expected behavior vs actual buggy behavior
+        // TODO: Once fixed, this assertion should pass
+        assert_eq!(
+            total_output_groups, num_groups,
+            "Double emission race condition detected! emit_early_if_necessary() result \
+            was overwritten by switch_to_skip_aggregation(). Expected {} groups, got {} groups",
+            num_groups, total_output_groups
+        );
+
+        Ok(())
+    }
+}

From 416f19df208ace46ba63161ae9817a804cad7209 Mon Sep 17 00:00:00 2001
From: Xander Bailey
Date: Fri, 14 Nov 2025 15:54:57 +0000
Subject: [PATCH 2/7] The fix

---
 .../physical-plan/src/aggregates/row_hash.rs  | 59 ++++++++-----------
 1 file changed, 26 insertions(+), 33 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index f96f87baf977..17eb2c1b00db 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -709,9 +709,20 @@ impl Stream for GroupedHashAggregateStream {
                                 break 'reading_input;
                             }
 
-                            self.emit_early_if_necessary()?;
-                            self.switch_to_skip_aggregation()?;
+                            // Check if we should switch to skip aggregation mode
+                            if let Some(new_state) = self.switch_to_skip_aggregation()? {
+                                timer.done();
+                                self.exec_state = new_state;
+                                break 'reading_input;
+                            }
+
+                            // Check if we need to emit early due to memory pressure
+                            if let Some(new_state) = self.emit_early_if_necessary()? {
+                                timer.done();
+                                self.exec_state = new_state;
+                                break 'reading_input;
+                            }
 
                             timer.done();
                         }
@@ -1096,7 +1107,9 @@ impl GroupedHashAggregateStream {
     /// Emit if the used memory exceeds the target for partial aggregation.
     /// Currently only [`GroupOrdering::None`] is supported for early emitting.
     /// TODO: support group_ordering for early emitting
-    fn emit_early_if_necessary(&mut self) -> Result<()> {
+    ///
+    /// Returns `Some(ExecutionState)` if the state should be changed, None otherwise.
+    fn emit_early_if_necessary(&mut self) -> Result<Option<ExecutionState>> {
         if self.group_values.len() >= self.batch_size
             && matches!(self.group_ordering, GroupOrdering::None)
             && self.update_memory_reservation().is_err()
         {
             assert_eq!(self.mode, AggregateMode::Partial);
             let n = self.group_values.len() / self.batch_size * self.batch_size;
             if let Some(batch) = self.emit(EmitTo::First(n), false)? {
-                self.exec_state = ExecutionState::ProducingOutput(batch);
+                return Ok(Some(ExecutionState::ProducingOutput(batch)));
             };
         }
-        Ok(())
+        Ok(None)
     }
 
     /// At this point, all the inputs are read and there are some spills.
@@ -1190,16 +1203,18 @@ impl GroupedHashAggregateStream {
     /// skipped, forces stream to produce currently accumulated output.
     ///
     /// Notice: It should only be called in Partial aggregation
-    fn switch_to_skip_aggregation(&mut self) -> Result<()> {
+    ///
+    /// Returns `Some(ExecutionState)` if the state should be changed, None otherwise.
+    fn switch_to_skip_aggregation(&mut self) -> Result<Option<ExecutionState>> {
         if let Some(probe) = self.skip_aggregation_probe.as_mut() {
             if probe.should_skip() {
                 if let Some(batch) = self.emit(EmitTo::All, false)? {
-                    self.exec_state = ExecutionState::ProducingOutput(batch);
+                    return Ok(Some(ExecutionState::ProducingOutput(batch)));
                 };
             }
         }
-        Ok(())
+        Ok(None)
     }
 
     /// Returns true if the aggregation probe indicates that aggregation
@@ -1255,7 +1270,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_double_emission_race_condition_bug() -> Result<()> {
-        // This test specifically reproduces the double emission race condition
+        // Fix for https://github.com/apache/datafusion/issues/18701
+        // This test specifically proves that we have fixed the double emission race condition
         // where emit_early_if_necessary() and switch_to_skip_aggregation()
        // both emit in the same loop iteration, causing data loss
 
@@ -1349,32 +1365,9 @@ mod tests {
             total_output_groups += batch.num_rows();
         }
 
-        // With the race condition bug:
-        // 1. emit_early_if_necessary() emits first batch_size groups (1024)
-        // 2. switch_to_skip_aggregation() immediately overwrites with remaining groups (100)
-        // 3. The 1024 groups from step 1 are LOST!
-        // 4. Only 100 groups are returned instead of 1124
-
-        println!(
-            "Double emission race condition test: Expected {} groups, got {} groups",
-            num_groups, total_output_groups
-        );
-
-        if total_output_groups < num_groups / 2 {
-            println!(
-                "🐛 BUG REPRODUCED! Lost {} groups ({:.1}% loss) - this indicates the double emission race condition",
-                num_groups - total_output_groups,
-                (1.0 - total_output_groups as f64 / num_groups as f64) * 100.0
-            );
-        }
-
-        // This test documents the expected behavior vs actual buggy behavior
-        // TODO: Once fixed, this assertion should pass
         assert_eq!(
             total_output_groups, num_groups,
-            "Double emission race condition detected! emit_early_if_necessary() result \
-            was overwritten by switch_to_skip_aggregation(). Expected {} groups, got {} groups",
-            num_groups, total_output_groups
+            "Unexpected number of groups",
         );
 
         Ok(())

From 7449fa6f1538041a0ecd2882030761250881f7a0 Mon Sep 17 00:00:00 2001
From: Xander Bailey
Date: Fri, 14 Nov 2025 15:59:28 +0000
Subject: [PATCH 3/7] fmt

---
 datafusion/physical-plan/src/aggregates/row_hash.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 17eb2c1b00db..33206f3611a5 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -709,7 +709,6 @@ impl Stream for GroupedHashAggregateStream {
                                 break 'reading_input;
                             }
-
                             // Check if we should switch to skip aggregation mode
                             if let Some(new_state) = self.switch_to_skip_aggregation()? {
                                 timer.done();
                                 self.exec_state = new_state;
                                 break 'reading_input;
@@ -1351,7 +1350,8 @@ mod tests {
         )?;
 
         // Execute and collect results
-        let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, Arc::clone(&task_ctx), 0)?;
+        let mut stream =
+            GroupedHashAggregateStream::new(&aggregate_exec, Arc::clone(&task_ctx), 0)?;
         let mut results = Vec::new();

From 36df60cce6a10b468ec557be5ed80920f1ac8081 Mon Sep 17 00:00:00 2001
From: Xander Bailey
Date: Fri, 14 Nov 2025 16:03:25 +0000
Subject: [PATCH 4/7] add comment

---
 datafusion/physical-plan/src/aggregates/row_hash.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 33206f3611a5..78a5c2d83be3 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -710,6 +710,8 @@ impl Stream for GroupedHashAggregateStream {
                             }
                             // Check if we should switch to skip aggregation mode
+                            // It's important that we do this before we early emit since we've
+                            // already updated the probe.
                             if let Some(new_state) = self.switch_to_skip_aggregation()? {
                                 timer.done();
                                 self.exec_state = new_state;
                                 break 'reading_input;

From febdd78f3b4ccee4879b2914b0da401f1899ad66 Mon Sep 17 00:00:00 2001
From: Xander
Date: Sat, 15 Nov 2025 15:41:56 +0000
Subject: [PATCH 5/7] move probe update

---
 datafusion/physical-plan/src/aggregates/row_hash.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 78a5c2d83be3..c5aa5de0de26 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -684,7 +684,6 @@ impl Stream for GroupedHashAggregateStream {
 
                             // Do the grouping
                             self.group_aggregate_batch(batch)?;
-                            self.update_skip_aggregation_probe(input_rows);
 
                             // If we can begin emitting rows, do so,
                             // otherwise keep consuming input
@@ -712,6 +711,7 @@ impl Stream for GroupedHashAggregateStream {
                             // Check if we should switch to skip aggregation mode
                             // It's important that we do this before we early emit since we've
                             // already updated the probe.
+                            self.update_skip_aggregation_probe(input_rows);
                             if let Some(new_state) = self.switch_to_skip_aggregation()? {
                                 timer.done();
                                 self.exec_state = new_state;
                                 break 'reading_input;

From b1bde9e54265b777318a8d2f884648a6628cd48d Mon Sep 17 00:00:00 2001
From: Xander
Date: Sat, 15 Nov 2025 17:24:42 +0000
Subject: [PATCH 6/7] fmt

---
 datafusion/physical-plan/src/aggregates/row_hash.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index c5aa5de0de26..4f4a6d281f10 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -684,7 +684,6 @@ impl Stream for GroupedHashAggregateStream {
 
                             // Do the grouping
                             self.group_aggregate_batch(batch)?;
-
                             // If we can begin emitting rows, do so,
                             // otherwise keep consuming input
                             assert!(!self.input_done);

From ac72aba32c7952f571175a9cea5e1648eb569da2 Mon Sep 17 00:00:00 2001
From: Xander
Date: Sat, 15 Nov 2025 17:33:19 +0000
Subject: [PATCH 7/7] Add protection

---
 .../physical-plan/src/aggregates/row_hash.rs  | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 4f4a6d281f10..f2b7bc0ebc02 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -796,6 +796,15 @@ impl Stream for GroupedHashAggregateStream {
                         }
                         None => {
                             // inner is done, switching to `Done` state
+                            // Sanity check: when switching from SkippingAggregation to Done,
+                            // all groups should have already been emitted
+                            if !self.group_values.is_empty() {
+                                return Poll::Ready(Some(internal_err!(
+                                    "Switching from SkippingAggregation to Done with {} groups still in hash table. \
+                                    This is a bug - all groups should have been emitted before skip aggregation started.",
+                                    self.group_values.len()
+                                )));
+                            }
                             self.exec_state = ExecutionState::Done;
                         }
                     }
@@ -843,6 +852,13 @@ impl Stream for GroupedHashAggregateStream {
                 }
 
                 ExecutionState::Done => {
+                    // Sanity check: all groups should have been emitted by now
+                    if !self.group_values.is_empty() {
+                        return Poll::Ready(Some(internal_err!(
+                            "AggregateStream was in Done state with {} groups left in hash table. \
+                            This is a bug - all groups should have been emitted before entering Done state.",
+                            self.group_values.len())));
+                    }
                     // release the memory reservation since sending back output batch itself needs
                     // some memory reservation, so make some room for it.
                     self.clear_all();