diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index aea499d60323..8429738a0953 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -18,6 +18,7 @@ use clap::Parser; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; +use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicFileCatalog; @@ -27,11 +28,30 @@ use datafusion_cli::{ use mimalloc::MiMalloc; use std::env; use std::path::Path; +use std::str::FromStr; use std::sync::Arc; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; +#[derive(PartialEq, Debug)] +enum PoolType { + Greedy, + Fair, +} + +impl FromStr for PoolType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "Greedy" | "greedy" => Ok(PoolType::Greedy), + "Fair" | "fair" => Ok(PoolType::Fair), + _ => Err(format!("Invalid memory pool type '{}'", s)), + } + } +} + #[derive(Debug, Parser, PartialEq)] #[clap(author, version, about, long_about= None)] struct Args { @@ -59,6 +79,14 @@ struct Args { )] command: Vec, + #[clap( + short = 'm', + long, + help = "The memory pool limitation (e.g. '10g'), default to None (no limit)", + validator(is_valid_memory_pool_size) + )] + memory_limit: Option, + #[clap( short, long, @@ -87,6 +115,12 @@ struct Args { help = "Reduce printing other than the results and work quietly" )] quiet: bool, + + #[clap( + long, + help = "Specify the memory pool type 'greedy' or 'fair', default to 'greedy'" + )] + mem_pool_type: Option, } #[tokio::main] @@ -109,7 +143,29 @@ pub async fn main() -> Result<()> { session_config = session_config.with_batch_size(batch_size); }; - let runtime_env = create_runtime_env()?; + let rn_config = RuntimeConfig::new(); + let rn_config = + // set memory pool size + if let Some(memory_limit) = args.memory_limit { + let memory_limit = extract_memory_pool_size(&memory_limit).unwrap(); + // set memory pool type + if let Some(mem_pool_type) = args.mem_pool_type { + match mem_pool_type { + PoolType::Greedy => rn_config + .with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))), + PoolType::Fair => rn_config + .with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))), + } + } else { + rn_config + .with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))) + } + } else { + rn_config + }; + + let runtime_env = create_runtime_env(rn_config.clone())?; + let mut ctx = SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env)); ctx.refresh_catalogs().await?; @@ -162,8 +218,7 @@ pub async fn main() -> Result<()> { Ok(()) } -fn create_runtime_env() -> Result { - let rn_config = RuntimeConfig::new(); +fn create_runtime_env(rn_config: RuntimeConfig) -> Result { RuntimeEnv::new(rn_config) } @@ -189,3 +244,34 @@ fn is_valid_batch_size(size: &str) -> Result<(), String> { _ => Err(format!("Invalid batch size '{}'", size)), } } + +fn is_valid_memory_pool_size(size: &str) -> Result<(), String> { + match extract_memory_pool_size(size) { + Ok(_) => Ok(()), + Err(e) => Err(e), + } +} + +fn extract_memory_pool_size(size: &str) -> Result { + let mut size = size; + let factor = if let Some(last_char) = size.chars().last() { + match last_char { + 'm' | 'M' => { + size = &size[..size.len() - 1]; + 1024 * 1024 + } + 'g' | 'G' => { + size = &size[..size.len() - 1]; + 1024 * 1024 * 1024 + } + _ => 1, + } + } else { + return Err(format!("Invalid memory pool size '{}'", size)); + }; + + match size.parse::() { + Ok(size) if size > 0 => Ok(factor * size), + _ => Err(format!("Invalid memory pool size '{}'", size)), + } +} diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md index 3d869d5a7e86..e3a8cd74c33b 100644 --- a/docs/source/user-guide/cli.md +++ b/docs/source/user-guide/cli.md @@ -118,15 +118,17 @@ USAGE: datafusion-cli [OPTIONS] OPTIONS: - -c, --batch-size The batch size of each query, or use DataFusion default - -f, --file ... Execute commands from file(s), then exit - --format [default: table] [possible values: csv, tsv, table, json, - nd-json] - -h, --help Print help information - -p, --data-path Path to your data, default to current directory - -q, --quiet Reduce printing other than the results and work quietly - -r, --rc ... Run the provided files on startup instead of ~/.datafusionrc - -V, --version Print version information + -c, --batch-size The batch size of each query, or use DataFusion default + -f, --file ... Execute commands from file(s), then exit + --format [default: table] [possible values: csv, tsv, table, json, + nd-json] + -h, --help Print help information + -m, --memory-limit The memory pool limitation (e.g. '10g'), default to None (no limit) + --mem-pool-type Specify the memory pool type 'greedy' or 'fair', default to 'greedy' + -p, --data-path Path to your data, default to current directory + -q, --quiet Reduce printing other than the results and work quietly + -r, --rc ... Run the provided files on startup instead of ~/.datafusionrc + -V, --version Print version information ``` ## Selecting files directly