diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap index a49d7c9755f6..62f864b3adb6 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap @@ -16,7 +16,7 @@ exit_code: 1 [CLI_VERSION] Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes caused by -Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: +Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as: Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB. Error: Failed to allocate diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap index 7416f5cf6bc5..9845d095c918 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap @@ -14,7 +14,7 @@ exit_code: 1 [CLI_VERSION] Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes caused by -Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: +Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as: Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB. diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 7ee85ea1be23..89bc48b1e634 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -85,7 +85,8 @@ async fn group_by_none() { TestCase::new() .with_query("select median(request_bytes) from t") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n AggregateStream" + "Resources exhausted: Additional allocation failed", + "with top memory consumers (across reservations) as:\n AggregateStream", ]) .with_memory_limit(2_000) .run() @@ -97,7 +98,7 @@ async fn group_by_row_hash() { TestCase::new() .with_query("select count(*) from t GROUP BY response_bytes") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n GroupedHashAggregateStream" + "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream" ]) .with_memory_limit(2_000) .run() @@ -110,7 +111,7 @@ async fn group_by_hash() { // group by dict column .with_query("select count(*) from t GROUP BY service, host, pod, container") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n GroupedHashAggregateStream" + "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream" ]) .with_memory_limit(1_000) .run() @@ -123,7 +124,8 @@ async fn join_by_key_multiple_partitions() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput", + "Resources exhausted: Additional allocation failed", + "with top memory consumers (across reservations) as:\n HashJoinInput", ]) .with_memory_limit(1_000) .with_config(config) @@ -137,7 +139,8 @@ async fn join_by_key_single_partition() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput", + "Resources exhausted: Additional allocation failed", + "with top memory consumers (across reservations) as:\n HashJoinInput", ]) .with_memory_limit(1_000) .with_config(config) @@ -150,7 +153,7 @@ async fn join_by_expression() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]", + "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]", ]) .with_memory_limit(1_000) .run() @@ -162,7 +165,8 @@ async fn cross_join() { TestCase::new() .with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n CrossJoinExec", + "Resources exhausted: Additional allocation failed", + "with top memory consumers (across reservations) as:\n CrossJoinExec", ]) .with_memory_limit(1_000) .run() @@ -218,7 +222,7 @@ async fn symmetric_hash_join() { "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time", ) .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n SymmetricHashJoinStream", + "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream", ]) .with_memory_limit(1_000) .with_scenario(Scenario::AccessLogStreaming) @@ -236,7 +240,7 @@ async fn sort_preserving_merge() { // so only a merge is needed .with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n SortPreservingMergeExec", + "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec", ]) // provide insufficient memory to merge .with_memory_limit(partition_size / 2) @@ -315,7 +319,8 @@ async fn sort_spill_reservation() { test.clone() .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:", + "Resources exhausted: Additional allocation failed", + "with top memory consumers (across reservations) as:", "B for ExternalSorterMerge", ]) .with_config(config) @@ -345,7 +350,8 @@ async fn oom_recursive_cte() { SELECT * FROM nodes;", ) .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n RecursiveQuery", + "Resources exhausted: Additional allocation failed", + "with top memory consumers (across reservations) as:\n RecursiveQuery", ]) .with_memory_limit(2_000) .run() @@ -397,7 +403,7 @@ async fn oom_with_tracked_consumer_pool() { .with_expected_errors(vec![ "Failed to allocate additional", "for ParquetSink(ArrowColumnWriter)", - "Additional allocation failed with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)" + "Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)" ]) .with_memory_pool(Arc::new( TrackConsumersPool::new( diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index da456b7071f7..306df3defdbb 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -463,6 +463,7 @@ impl MemoryPool for TrackConsumersPool { // wrap OOM message in top consumers DataFusionError::ResourcesExhausted( provide_top_memory_consumers_to_error_msg( + &reservation.consumer().name, e, self.report_top(self.top.into()), ), @@ -490,10 +491,11 @@ impl MemoryPool for TrackConsumersPool { } fn provide_top_memory_consumers_to_error_msg( + consumer_name: &str, error_msg: String, top_consumers: String, ) -> String { - format!("Additional allocation failed with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}") + format!("Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}") } #[cfg(test)] @@ -619,7 +621,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as: r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B, r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B, r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B. @@ -644,7 +646,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as: foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B. Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool "); @@ -661,7 +663,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as: foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B, foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B. Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool @@ -674,7 +676,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as: foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool @@ -689,7 +691,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as: foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B, foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B. @@ -713,7 +715,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); allow_duplicates!(assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool @@ -726,7 +728,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); allow_duplicates!(assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool ")); @@ -737,7 +739,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); allow_duplicates!(assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool ")); @@ -748,7 +750,7 @@ mod tests { assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); allow_duplicates!(assert_snapshot!(error, @r" - Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as: r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool ")); diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 864def5e916f..949c4e784bc3 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -895,7 +895,7 @@ mod tests { assert_contains!( err.to_string(), - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n CrossJoinExec" + "Resources exhausted: Additional allocation failed for CrossJoinExec with top memory consumers (across reservations) as:\n CrossJoinExec" ); Ok(()) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index fd3962c6ae73..4c293b0498e7 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -4300,7 +4300,7 @@ mod tests { // Asserting that operator-level reservation attempting to overallocate assert_contains!( err.to_string(), - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput" + "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput" ); assert_contains!( @@ -4381,7 +4381,7 @@ mod tests { // Asserting that stream-level reservation attempting to overallocate assert_contains!( err.to_string(), - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput[1]" + "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]" ); diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 6cd39e5a40c1..0974b3a9114e 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -2475,7 +2475,7 @@ pub(crate) mod tests { assert_contains!( err.to_string(), - "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]" + "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[0] with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]" ); }