Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ exit_code: 1
[CLI_VERSION]
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for ExternalSorter[partition=0] with top memory consumers (across reservations) as:
Consumer(can spill: bool) consumed XB, peak XB,
Consumer(can spill: bool) consumed XB, peak XB.
Error: Failed to allocate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ exit_code: 1
[CLI_VERSION]
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
Resources exhausted: Additional allocation failed for ExternalSorter[partition=0] with top memory consumers (across reservations) as:
Consumer(can spill: bool) consumed XB, peak XB,
Consumer(can spill: bool) consumed XB, peak XB,
Consumer(can spill: bool) consumed XB, peak XB.
Expand Down
129 changes: 100 additions & 29 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ async fn group_by_none() {
TestCase::new()
.with_query("select median(request_bytes) from t")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n AggregateStream",
"Resources exhausted: Additional allocation failed for AggregateStream[partition=0] with top memory consumers (across reservations) as:
AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B,
AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
Error: Failed to allocate additional x B for AggregateStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(2_000)
.run()
Expand All @@ -98,7 +100,10 @@ async fn group_by_row_hash() {
TestCase::new()
.with_query("select count(*) from t GROUP BY response_bytes")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
"Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as:
GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B,
GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B.
Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(2_000)
.run()
Expand All @@ -111,7 +116,10 @@ async fn group_by_hash() {
// group by dict column
.with_query("select count(*) from t GROUP BY service, host, pod, container")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
"Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as:
GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B,
GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B.
Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool"
])
.with_memory_limit(1_000)
.run()
Expand All @@ -124,8 +132,9 @@ async fn join_by_key_multiple_partitions() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n HashJoinInput",
"Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:
HashJoinInput#ID(can spill: false) consumed x B, peak x B.
Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(1_000)
.with_config(config)
Expand All @@ -139,8 +148,9 @@ async fn join_by_key_single_partition() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n HashJoinInput",
"Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:
HashJoinInput#ID(can spill: false) consumed x B, peak x B.
Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(1_000)
.with_config(config)
Expand All @@ -153,7 +163,9 @@ async fn join_by_expression() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]",
"Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[partition=0] with top memory consumers (across reservations) as:
NestedLoopJoinLoad[partition=0]#ID(can spill: false) consumed x B, peak x B.
Error: Failed to allocate additional x KB for NestedLoopJoinLoad[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(1_000)
.run()
Expand All @@ -165,8 +177,9 @@ async fn cross_join() {
TestCase::new()
.with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n CrossJoinExec",
"Resources exhausted: Additional allocation failed for CrossJoinExec with top memory consumers (across reservations) as:
CrossJoinExec#ID(can spill: false) consumed x B, peak x B.
Error: Failed to allocate additional x KB for CrossJoinExec with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(1_000)
.run()
Expand All @@ -185,9 +198,9 @@ async fn sort_merge_join_no_spill() {
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_expected_errors(vec![
"Failed to allocate additional",
"SMJStream",
"Disk spilling disabled",
"Execution error: Additional allocation failed for SMJStream[partition=0] with top memory consumers (across reservations) as:
SMJStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
Error: Failed to allocate additional x KB for SMJStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(1_000)
.with_config(config)
Expand Down Expand Up @@ -222,7 +235,9 @@ async fn symmetric_hash_join() {
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
)
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream",
"Resources exhausted: Additional allocation failed for SymmetricHashJoinStream[partition=0] with top memory consumers (across reservations) as:
SymmetricHashJoinStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
Error: Failed to allocate additional x KB for SymmetricHashJoinStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(1_000)
.with_scenario(Scenario::AccessLogStreaming)
Expand All @@ -240,7 +255,9 @@ async fn sort_preserving_merge() {
// so only a merge is needed
.with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec",
"Resources exhausted: Additional allocation failed for SortPreservingMergeExec[partition=0] with top memory consumers (across reservations) as:
SortPreservingMergeExec[partition=0]#ID(can spill: false) consumed x KB, peak x KB.
Error: Failed to allocate additional x KB for SortPreservingMergeExec[partition=0] with x KB already allocated for this reservation - x B remain available for the total pool",
])
// provide insufficient memory to merge
.with_memory_limit(partition_size / 2)
Expand Down Expand Up @@ -319,9 +336,10 @@ async fn sort_spill_reservation() {

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:",
"B for ExternalSorterMerge",
"Resources exhausted: Additional allocation failed for ExternalSorterMerge[partition=0] with top memory consumers (across reservations) as:
ExternalSorter[partition=0]#ID(can spill: true) consumed x KB, peak x KB,
ExternalSorterMerge[partition=0]#ID(can spill: false) consumed x KB, peak x KB.
Error: Failed to allocate additional x KB for ExternalSorterMerge[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_config(config)
.run()
Expand Down Expand Up @@ -350,8 +368,9 @@ async fn oom_recursive_cte() {
SELECT * FROM nodes;",
)
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:\n RecursiveQuery",
"Resources exhausted: Additional allocation failed for RecursiveQuery with top memory consumers (across reservations) as:
RecursiveQuery#ID(can spill: false) consumed x B, peak x B.
Error: Failed to allocate additional x KB for RecursiveQuery with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(2_000)
.run()
Expand All @@ -374,8 +393,38 @@ async fn oom_parquet_sink() {
path.to_string_lossy()
))
.with_expected_errors(vec![
"Failed to allocate additional",
"for ParquetSink(ArrowColumnWriter)",
"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=1)) with top memory consumers (across reservations) as:
ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
,
ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
,
ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
,
ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
,
ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
.
Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=1)) with x KB already allocated for this reservation - x KB remain available for the total pool",
Comment on lines +396 to +427
Copy link
Contributor Author

@wiedld wiedld Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of using the parent/child relationship to build a trace of consumers.

Currently, this approach is limited to the current way that memory reservations work. Meaning, the parent's bytes (consumed & peak) do NOT include cummulative from all the children. If this is desired, we can make this change at report generation time using the snapshot ReportedConsumer (to not hold the lock).

])
.with_memory_limit(200_000)
.run()
Expand All @@ -401,9 +450,14 @@ async fn oom_with_tracked_consumer_pool() {
path.to_string_lossy()
))
.with_expected_errors(vec![
"Failed to allocate additional",
"for ParquetSink(ArrowColumnWriter)",
"Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)"
"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=2)) with top memory consumers (across reservations) as:
ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB:
stack backtrace:
0: ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
.
Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=2)) with x KB already allocated for this reservation - x KB remain available for the total pool"
])
.with_memory_pool(Arc::new(
TrackConsumersPool::new(
Expand All @@ -420,8 +474,10 @@ async fn oom_grouped_hash_aggregate() {
TestCase::new()
.with_query("SELECT COUNT(*), SUM(request_bytes) FROM t GROUP BY host")
.with_expected_errors(vec![
"Failed to allocate additional",
"GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))",
"Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with top memory consumers (across reservations) as:
GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B,
GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B.
Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with x B already allocated for this reservation - x B remain available for the total pool",
])
.with_memory_limit(1_000)
.run()
Expand Down Expand Up @@ -885,15 +941,30 @@ impl TestCase {
"Unexpected failure when running, expected success but got: {e}"
)
} else {
let err_msg = normalize_oom_errors(&e.to_string());
for error_substring in expected_errors {
assert_contains!(e.to_string(), error_substring);
assert_contains!(&err_msg, error_substring);
}
}
}
}
}
}

fn normalize_oom_errors(err: &str) -> String {
let re = regex::Regex::new(r"\#\d+\(can spill:").unwrap();
let mut err = re.replace_all(err, "#ID(can spill:").to_string();

let re = regex::Regex::new(r"\d+\.\d+ KB").unwrap();
err = re.replace_all(&err, "x KB").to_string();

let re = regex::Regex::new(r"\d+\.\d+ MB").unwrap();
err = re.replace_all(&err, "x MB").to_string();

let re = regex::Regex::new(r"\d+\.\d+ B").unwrap();
re.replace_all(&err, "x B").to_string()
}

/// 50 byte memory limit
const MEMORY_FRACTION: f64 = 0.95;

Expand Down
Loading