Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ exit_code: 1
[CLI_VERSION]
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
Consumer(can spill: bool) consumed XB, peak XB,
Consumer(can spill: bool) consumed XB, peak XB.
Error: Failed to allocate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ exit_code: 1
[CLI_VERSION]
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
Consumer(can spill: bool) consumed XB, peak XB,
Consumer(can spill: bool) consumed XB, peak XB,
Consumer(can spill: bool) consumed XB, peak XB.
Expand Down
30 changes: 18 additions & 12 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ async fn group_by_none() {
TestCase::new()
.with_query("select median(request_bytes) from t")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n AggregateStream"
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n AggregateStream",
])
.with_memory_limit(2_000)
.run()
Expand All @@ -97,7 +98,7 @@ async fn group_by_row_hash() {
TestCase::new()
.with_query("select count(*) from t GROUP BY response_bytes")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
])
.with_memory_limit(2_000)
.run()
Expand All @@ -110,7 +111,7 @@ async fn group_by_hash() {
// group by dict column
.with_query("select count(*) from t GROUP BY service, host, pod, container")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
])
.with_memory_limit(1_000)
.run()
Expand All @@ -123,7 +124,8 @@ async fn join_by_key_multiple_partitions() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput",
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n HashJoinInput",
])
.with_memory_limit(1_000)
.with_config(config)
Expand All @@ -137,7 +139,8 @@ async fn join_by_key_single_partition() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput",
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n HashJoinInput",
])
.with_memory_limit(1_000)
.with_config(config)
Expand All @@ -150,7 +153,7 @@ async fn join_by_expression() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]",
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]",
])
.with_memory_limit(1_000)
.run()
Expand All @@ -162,7 +165,8 @@ async fn cross_join() {
TestCase::new()
.with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n CrossJoinExec",
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n CrossJoinExec",
])
.with_memory_limit(1_000)
.run()
Expand Down Expand Up @@ -218,7 +222,7 @@ async fn symmetric_hash_join() {
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n SymmetricHashJoinStream",
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream",
])
.with_memory_limit(1_000)
.with_scenario(Scenario::AccessLogStreaming)
Expand All @@ -236,7 +240,7 @@ async fn sort_preserving_merge() {
// so only a merge is needed
.with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n SortPreservingMergeExec",
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec",
])
// provide insufficient memory to merge
.with_memory_limit(partition_size / 2)
Expand Down Expand Up @@ -315,7 +319,8 @@ async fn sort_spill_reservation() {

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:",
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:",
"B for ExternalSorterMerge",
])
.with_config(config)
Expand Down Expand Up @@ -345,7 +350,8 @@ async fn oom_recursive_cte() {
SELECT * FROM nodes;",
)
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n RecursiveQuery",
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n RecursiveQuery",
])
.with_memory_limit(2_000)
.run()
Expand Down Expand Up @@ -397,7 +403,7 @@ async fn oom_with_tracked_consumer_pool() {
.with_expected_errors(vec![
"Failed to allocate additional",
"for ParquetSink(ArrowColumnWriter)",
"Additional allocation failed with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)"
"Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)"
])
.with_memory_pool(Arc::new(
TrackConsumersPool::new(
Expand Down
22 changes: 12 additions & 10 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
// wrap OOM message in top consumers
DataFusionError::ResourcesExhausted(
provide_top_memory_consumers_to_error_msg(
&reservation.consumer().name,
e,
self.report_top(self.top.into()),
),
Expand Down Expand Up @@ -490,10 +491,11 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
}

fn provide_top_memory_consumers_to_error_msg(
consumer_name: &str,
error_msg: String,
top_consumers: String,
) -> String {
format!("Additional allocation failed with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}")
format!("Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}")
}

#[cfg(test)]
Expand Down Expand Up @@ -619,7 +621,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as:
r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
Expand All @@ -644,7 +646,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
");
Expand All @@ -661,7 +663,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
Expand All @@ -674,7 +676,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
Expand All @@ -689,7 +691,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
Expand All @@ -713,7 +715,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
Expand All @@ -726,7 +728,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));
Expand All @@ -737,7 +739,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));
Expand All @@ -748,7 +750,7 @@ mod tests {
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ mod tests {

assert_contains!(
err.to_string(),
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n CrossJoinExec"
"Resources exhausted: Additional allocation failed for CrossJoinExec with top memory consumers (across reservations) as:\n CrossJoinExec"
);

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4300,7 +4300,7 @@ mod tests {
// Asserting that operator-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput"
"Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput"
);

assert_contains!(
Expand Down Expand Up @@ -4381,7 +4381,7 @@ mod tests {
// Asserting that stream-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n HashJoinInput[1]"
"Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]"

);

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2475,7 +2475,7 @@ pub(crate) mod tests {

assert_contains!(
err.to_string(),
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]"
"Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[0] with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]"
);
}

Expand Down
Loading