From d53375862646d4e21651ec58edd9f08565bd878c Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sat, 26 Aug 2023 13:36:24 +0800 Subject: [PATCH 1/8] support memory-limit --- datafusion-cli/src/main.rs | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index aea499d60323..53083fbadec9 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -59,6 +59,14 @@ struct Args { )] command: Vec, + #[clap( + short = 'm', + long, + help = "The memory pool limitation, default to zero", + validator(is_valid_memory_pool_size) + )] + memory_limit: Option, + #[clap( short, long, @@ -109,7 +117,16 @@ pub async fn main() -> Result<()> { session_config = session_config.with_batch_size(batch_size); }; - let runtime_env = create_runtime_env()?; + let rn_config = RuntimeConfig::new(); + let rn_config = if let Some(memory_limit) = args.memory_limit { + let memory_limit = memory_limit.parse::().unwrap(); + rn_config.with_memory_limit(memory_limit, 1.0) + } else { + rn_config.with_memory_limit(0, 1.0) + }; + + let runtime_env = create_runtime_env(rn_config.clone())?; + let mut ctx = SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env)); ctx.refresh_catalogs().await?; @@ -162,8 +179,7 @@ pub async fn main() -> Result<()> { Ok(()) } -fn create_runtime_env() -> Result { - let rn_config = RuntimeConfig::new(); +fn create_runtime_env(rn_config: RuntimeConfig) -> Result { RuntimeEnv::new(rn_config) } @@ -189,3 +205,17 @@ fn is_valid_batch_size(size: &str) -> Result<(), String> { _ => Err(format!("Invalid batch size '{}'", size)), } } + +fn is_valid_memory_pool_size(size: &str) -> Result<(), String> { + if let Some(last_char) = size.chars().last() { + if last_char != 'g' && last_char != 'G' { + return Err(format!("Invalid memory pool size format '{}'", size)); + } + } + + let size = &size[..size.len() - 1]; + match size.parse::() { + Ok(size) if size > 0 => Ok(()), + _ => Err(format!("Invalid memory pool size '{}'", size)), + } +} From c06afacd247c11d995497541f85a8462cc52d981 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sat, 26 Aug 2023 13:37:16 +0800 Subject: [PATCH 2/8] update doc --- docs/source/user-guide/cli.md | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index 3d869d5a7e86..229615e2a256 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -118,15 +118,16 @@ USAGE: datafusion-cli [OPTIONS] OPTIONS: - -c, --batch-size The batch size of each query, or use DataFusion default - -f, --file ... Execute commands from file(s), then exit - --format [default: table] [possible values: csv, tsv, table, json, - nd-json] - -h, --help Print help information - -p, --data-path Path to your data, default to current directory - -q, --quiet Reduce printing other than the results and work quietly - -r, --rc ... Run the provided files on startup instead of ~/.datafusionrc - -V, --version Print version information + -c, --batch-size The batch size of each query, or use DataFusion default + -f, --file ... Execute commands from file(s), then exit + --format [default: table] [possible values: csv, tsv, table, json, + nd-json] + -h, --help Print help information + -m, --memory-limit The memory pool limitation, default to zero + -p, --data-path Path to your data, default to current directory + -q, --quiet Reduce printing other than the results and work quietly + -r, --rc ... Run the provided files on startup instead of ~/.datafusionrc + -V, --version Print version information ``` ## Selecting files directly From e6c0feb5c1cf1e303bd376e8ac19565ef9520815 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sat, 26 Aug 2023 13:43:31 +0800 Subject: [PATCH 3/8] avoid unwrap --- datafusion-cli/src/main.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 53083fbadec9..ab59776b157d 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -119,7 +119,9 @@ pub async fn main() -> Result<()> { let rn_config = RuntimeConfig::new(); let rn_config = if let Some(memory_limit) = args.memory_limit { - let memory_limit = memory_limit.parse::().unwrap(); + let memory_limit = memory_limit[..memory_limit.len() - 1] + .parse::() + .unwrap(); rn_config.with_memory_limit(memory_limit, 1.0) } else { rn_config.with_memory_limit(0, 1.0) From ba0f94c407920313981ccc35cbb4e7597b2d529a Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sat, 26 Aug 2023 14:45:15 +0800 Subject: [PATCH 4/8] support memory pool type setting --- datafusion-cli/src/main.rs | 49 +++++++++++++++++++++++++++++------ docs/source/user-guide/cli.md | 1 + 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index ab59776b157d..4ff425450437 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -18,6 +18,7 @@ use clap::Parser; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; +use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicFileCatalog; @@ -95,6 +96,13 @@ struct Args { help = "Reduce printing other than the results and work quietly" )] quiet: bool, + + #[clap( + long, + help = "Specify the memory pool type 'greedy' or 'fair', default to 'greedy'", + validator(is_valid_memory_pool_type) + )] + mem_pool_type: Option, } #[tokio::main] @@ -118,14 +126,29 @@ pub async fn main() -> Result<()> { }; let rn_config = RuntimeConfig::new(); - let rn_config = if let Some(memory_limit) = args.memory_limit { - let memory_limit = memory_limit[..memory_limit.len() - 1] - .parse::() - .unwrap(); - rn_config.with_memory_limit(memory_limit, 1.0) - } else { - rn_config.with_memory_limit(0, 1.0) - }; + let rn_config = + // set memory pool size + if let Some(memory_limit) = args.memory_limit { + let memory_limit = memory_limit[..memory_limit.len() - 1] + .parse::() + .unwrap(); + // set memory pool type + if let Some(mem_pool_type) = args.mem_pool_type { + match mem_pool_type.as_str() { + "greedy" => rn_config + .with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))), + "fair" => rn_config + .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))), + _ => unreachable!(), + } + } else { + rn_config + .with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))) + } + } else { + rn_config + .with_memory_limit(0, 1.0) + }; let runtime_env = create_runtime_env(rn_config.clone())?; @@ -221,3 +244,13 @@ fn is_valid_memory_pool_size(size: &str) -> Result<(), String> { _ => Err(format!("Invalid memory pool size '{}'", size)), } } + +fn is_valid_memory_pool_type(pool_type: &str) -> Result<(), String> { + match pool_type { + "greedy" | "fair" => Ok(()), + _ => Err(format!( + "Invalid memory pool type '{}', it should be 'fair' or 'greedy'", + pool_type + )), + } +} diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index 229615e2a256..aeb648c3d0b9 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -124,6 +124,7 @@ OPTIONS: nd-json] -h, --help Print help information -m, --memory-limit The memory pool limitation, default to zero + --mem-pool-type Specify the memory pool type 'greedy' or 'fair', default to 'greedy' -p, --data-path Path to your data, default to current directory -q, --quiet Reduce printing other than the results and work quietly -r, --rc ... Run the provided files on startup instead of ~/.datafusionrc From 67444709693f0e18881268df752ee4b0db8176f1 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sat, 26 Aug 2023 14:57:01 +0800 Subject: [PATCH 5/8] format doc --- docs/source/user-guide/cli.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index aeb648c3d0b9..1de9e8693919 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -118,17 +118,17 @@ USAGE: datafusion-cli [OPTIONS] OPTIONS: - -c, --batch-size The batch size of each query, or use DataFusion default - -f, --file ... Execute commands from file(s), then exit - --format [default: table] [possible values: csv, tsv, table, json, - nd-json] - -h, --help Print help information - -m, --memory-limit The memory pool limitation, default to zero - --mem-pool-type Specify the memory pool type 'greedy' or 'fair', default to 'greedy' - -p, --data-path Path to your data, default to current directory - -q, --quiet Reduce printing other than the results and work quietly - -r, --rc ... Run the provided files on startup instead of ~/.datafusionrc - -V, --version Print version information + -c, --batch-size The batch size of each query, or use DataFusion default + -f, --file ... Execute commands from file(s), then exit + --format [default: table] [possible values: csv, tsv, table, json, + nd-json] + -h, --help Print help information + -m, --memory-limit The memory pool limitation, default to zero + --mem-pool-type Specify the memory pool type 'greedy' or 'fair', default to 'greedy' + -p, --data-path Path to your data, default to current directory + -q, --quiet Reduce printing other than the results and work quietly + -r, --rc ... Run the provided files on startup instead of ~/.datafusionrc + -V, --version Print version information ``` ## Selecting files directly From bb3f8737cd1e699dae7cedfe6ff1636c4efbddae Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sat, 26 Aug 2023 18:15:06 +0800 Subject: [PATCH 6/8] fix else case --- datafusion-cli/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 4ff425450437..49d28ffa73bc 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -147,7 +147,6 @@ pub async fn main() -> Result<()> { } } else { rn_config - .with_memory_limit(0, 1.0) }; let runtime_env = create_runtime_env(rn_config.clone())?; From 8031651fa9f3a175edac54cb6e7542fddd8412c4 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sat, 26 Aug 2023 18:28:01 +0800 Subject: [PATCH 7/8] update doc --- datafusion-cli/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 49d28ffa73bc..5edfe780f21b 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -63,7 +63,7 @@ struct Args { #[clap( short = 'm', long, - help = "The memory pool limitation, default to zero", + help = "The memory pool limitation (e.g. '10g'), default to 0", validator(is_valid_memory_pool_size) )] memory_limit: Option, From 6efae154f03f75bcf2400fe38264b7a9eb1cecd1 Mon Sep 17 00:00:00 2001 From: Weijun Huang Date: Sat, 26 Aug 2023 23:56:25 +0800 Subject: [PATCH 8/8] refactor --- datafusion-cli/src/main.rs | 76 ++++++++++++++++++++++------------- docs/source/user-guide/cli.md | 2 +- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 5edfe780f21b..8429738a0953 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -28,11 +28,30 @@ use datafusion_cli::{ use mimalloc::MiMalloc; use std::env; use std::path::Path; +use std::str::FromStr; use std::sync::Arc; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; +#[derive(PartialEq, Debug)] +enum PoolType { + Greedy, + Fair, +} + +impl FromStr for PoolType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "Greedy" | "greedy" => Ok(PoolType::Greedy), + "Fair" | "fair" => Ok(PoolType::Fair), + _ => Err(format!("Invalid memory pool type '{}'", s)), + } + } +} + #[derive(Debug, Parser, PartialEq)] #[clap(author, version, about, long_about= None)] struct Args { @@ -63,7 +82,7 @@ struct Args { #[clap( short = 'm', long, - help = "The memory pool limitation (e.g. '10g'), default to 0", + help = "The memory pool limitation (e.g. '10g'), default to None (no limit)", validator(is_valid_memory_pool_size) )] memory_limit: Option, @@ -99,10 +118,9 @@ struct Args { #[clap( long, - help = "Specify the memory pool type 'greedy' or 'fair', default to 'greedy'", - validator(is_valid_memory_pool_type) + help = "Specify the memory pool type 'greedy' or 'fair', default to 'greedy'" )] - mem_pool_type: Option, + mem_pool_type: Option, } #[tokio::main] @@ -129,17 +147,14 @@ pub async fn main() -> Result<()> { let rn_config = // set memory pool size if let Some(memory_limit) = args.memory_limit { - let memory_limit = memory_limit[..memory_limit.len() - 1] - .parse::() - .unwrap(); + let memory_limit = extract_memory_pool_size(&memory_limit).unwrap(); // set memory pool type if let Some(mem_pool_type) = args.mem_pool_type { - match mem_pool_type.as_str() { - "greedy" => rn_config + match mem_pool_type { + PoolType::Greedy => rn_config .with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))), - "fair" => rn_config + PoolType::Fair => rn_config .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))), - _ => unreachable!(), } } else { rn_config @@ -231,25 +246,32 @@ fn is_valid_batch_size(size: &str) -> Result<(), String> { } fn is_valid_memory_pool_size(size: &str) -> Result<(), String> { - if let Some(last_char) = size.chars().last() { - if last_char != 'g' && last_char != 'G' { - return Err(format!("Invalid memory pool size format '{}'", size)); - } + match extract_memory_pool_size(size) { + Ok(_) => Ok(()), + Err(e) => Err(e), } +} + +fn extract_memory_pool_size(size: &str) -> Result { + let mut size = size; + let factor = if let Some(last_char) = size.chars().last() { + match last_char { + 'm' | 'M' => { + size = &size[..size.len() - 1]; + 1024 * 1024 + } + 'g' | 'G' => { + size = &size[..size.len() - 1]; + 1024 * 1024 * 1024 + } + _ => 1, + } + } else { + return Err(format!("Invalid memory pool size '{}'", size)); + }; - let size = &size[..size.len() - 1]; match size.parse::() { - Ok(size) if size > 0 => Ok(()), + Ok(size) if size > 0 => Ok(factor * size), _ => Err(format!("Invalid memory pool size '{}'", size)), } } - -fn is_valid_memory_pool_type(pool_type: &str) -> Result<(), String> { - match pool_type { - "greedy" | "fair" => Ok(()), - _ => Err(format!( - "Invalid memory pool type '{}', it should be 'fair' or 'greedy'", - pool_type - )), - } -} diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index 1de9e8693919..e3a8cd74c33b 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -123,7 +123,7 @@ OPTIONS: --format [default: table] [possible values: csv, tsv, table, json, nd-json] -h, --help Print help information - -m, --memory-limit The memory pool limitation, default to zero + -m, --memory-limit The memory pool limitation (e.g. '10g'), default to None (no limit) --mem-pool-type Specify the memory pool type 'greedy' or 'fair', default to 'greedy' -p, --data-path Path to your data, default to current directory -q, --quiet Reduce printing other than the results and work quietly