-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Add integration test for erroring when memory limits are hit #4406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -118,6 +118,7 @@ impl ExternalSorter { | |
| ) -> Result<()> { | ||
| if input.num_rows() > 0 { | ||
| let size = batch_byte_size(&input); | ||
| debug!("Inserting {} rows of {} bytes", input.num_rows(), size); | ||
| self.try_grow(size).await?; | ||
| self.metrics.mem_used().add(size); | ||
| let mut in_mem_batches = self.in_mem_batches.lock().await; | ||
|
|
@@ -272,6 +273,13 @@ impl MemoryConsumer for ExternalSorter { | |
| } | ||
|
|
||
| async fn spill(&self) -> Result<usize> { | ||
| let partition = self.partition_id(); | ||
| let mut in_mem_batches = self.in_mem_batches.lock().await; | ||
| // we could always get a chance to free some memory as long as we are holding some | ||
| if in_mem_batches.len() == 0 { | ||
| return Ok(0); | ||
| } | ||
|
|
||
| debug!( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I just moved the debug message down, as it was confusing that the spill didn't actually happen if `in_mem_batches` was empty (the function returns early with `Ok(0)` in that case). |
||
| "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)", | ||
| self.name(), | ||
|
|
@@ -280,13 +288,6 @@ impl MemoryConsumer for ExternalSorter { | |
| self.spill_count() | ||
| ); | ||
|
|
||
| let partition = self.partition_id(); | ||
| let mut in_mem_batches = self.in_mem_batches.lock().await; | ||
| // we could always get a chance to free some memory as long as we are holding some | ||
| if in_mem_batches.len() == 0 { | ||
| return Ok(0); | ||
| } | ||
|
|
||
| let tracking_metrics = self | ||
| .metrics_set | ||
| .new_intermediate_tracking(partition, self.runtime.clone()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,8 +21,7 @@ use arrow::array::{ArrayRef, Int32Array}; | |
| use arrow::compute::SortOptions; | ||
| use arrow::record_batch::RecordBatch; | ||
| use arrow::util::pretty::pretty_format_batches; | ||
| use rand::rngs::StdRng; | ||
| use rand::{Rng, SeedableRng}; | ||
| use rand::Rng; | ||
|
|
||
| use datafusion::physical_plan::collect; | ||
| use datafusion::physical_plan::expressions::Column; | ||
|
|
@@ -31,7 +30,7 @@ use datafusion::physical_plan::memory::MemoryExec; | |
| use datafusion_expr::JoinType; | ||
|
|
||
| use datafusion::prelude::{SessionConfig, SessionContext}; | ||
| use test_utils::add_empty_batches; | ||
| use test_utils::stagger_batch_with_seed; | ||
|
|
||
| #[tokio::test] | ||
| async fn test_inner_join_1k() { | ||
|
|
@@ -200,24 +199,14 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> { | |
| let input4 = Int32Array::from_iter_values(input4.into_iter()); | ||
|
|
||
| // split into several record batches | ||
| let mut remainder = RecordBatch::try_from_iter(vec![ | ||
| let batch = RecordBatch::try_from_iter(vec![ | ||
| ("a", Arc::new(input1) as ArrayRef), | ||
| ("b", Arc::new(input2) as ArrayRef), | ||
| ("x", Arc::new(input3) as ArrayRef), | ||
| ("y", Arc::new(input4) as ArrayRef), | ||
| ]) | ||
| .unwrap(); | ||
|
|
||
| let mut batches = vec![]; | ||
|
|
||
| // use a random number generator to pick a random sized output | ||
| let mut rng = StdRng::seed_from_u64(42); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This piece of code was copied in several places, so I refactored it into a shared helper in `test_utils` (`stagger_batch_with_seed`). |
||
| while remainder.num_rows() > 0 { | ||
| let batch_size = rng.gen_range(0..remainder.num_rows() + 1); | ||
|
|
||
| batches.push(remainder.slice(0, batch_size)); | ||
| remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size); | ||
| } | ||
|
|
||
| add_empty_batches(batches, &mut rng) | ||
| stagger_batch_with_seed(batch, 42) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! This module contains tests for limiting memory at runtime in DataFusion | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use arrow::record_batch::RecordBatch; | ||
| use datafusion::datasource::MemTable; | ||
| use datafusion::execution::disk_manager::DiskManagerConfig; | ||
| use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; | ||
| use datafusion_common::assert_contains; | ||
|
|
||
| use datafusion::prelude::{SessionConfig, SessionContext}; | ||
| use test_utils::{stagger_batch, AccessLogGenerator}; | ||
|
|
||
| #[cfg(test)] | ||
| #[ctor::ctor] | ||
| fn init() { | ||
| let _ = env_logger::try_init(); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn oom_sort() { | ||
| run_limit_test( | ||
| "select * from t order by host DESC", | ||
| "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)", | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn group_by_none() { | ||
| run_limit_test( | ||
| "select median(image) from t", | ||
| "Resources exhausted: Cannot spill GroupBy None Accumulators", | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn group_by_row_hash() { | ||
| run_limit_test( | ||
| "select count(*) from t GROUP BY response_bytes", | ||
| "Resources exhausted: Cannot spill GroupBy Hash (Row) AggregationState", | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn group_by_hash() { | ||
| run_limit_test( | ||
| // group by dict column | ||
| "select count(*) from t GROUP BY service, host, pod, container", | ||
| "Resources exhausted: Cannot spill GroupBy Hash Accumulators", | ||
| ) | ||
| .await | ||
| } | ||
|
|
||
| /// 50 byte memory limit | ||
| const MEMORY_LIMIT_BYTES: usize = 50; | ||
| const MEMORY_FRACTION: f64 = 0.95; | ||
|
|
||
| /// runs the specified query against 1000 rows with a 50 | ||
| /// byte memory limit and no disk manager enabled. | ||
| async fn run_limit_test(query: &str, expected_error: &str) { | ||
| let generator = AccessLogGenerator::new().with_row_limit(Some(1000)); | ||
|
|
||
| let batches: Vec<RecordBatch> = generator | ||
| // split up into more than one batch, as the size limit in sort is not enforced until the second batch | ||
| .flat_map(stagger_batch) | ||
| .collect(); | ||
|
|
||
| let table = MemTable::try_new(batches[0].schema(), vec![batches]).unwrap(); | ||
|
|
||
| let rt_config = RuntimeConfig::new() | ||
| // do not allow spilling | ||
| .with_disk_manager(DiskManagerConfig::Disabled) | ||
| // Only allow 50 bytes | ||
| .with_memory_limit(MEMORY_LIMIT_BYTES, MEMORY_FRACTION); | ||
|
|
||
| let runtime = RuntimeEnv::new(rt_config).unwrap(); | ||
|
|
||
| let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime)); | ||
| ctx.register_table("t", Arc::new(table)) | ||
| .expect("registering table"); | ||
|
|
||
| let df = ctx.sql(query).await.expect("Planning query"); | ||
|
|
||
| match df.collect().await { | ||
| Ok(_batches) => { | ||
| panic!("Unexpected success when running, expected memory limit failure") | ||
| } | ||
| Err(e) => { | ||
| assert_contains!(e.to_string(), expected_error); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improved error message here