Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 89 additions & 3 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use clap::Parser;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
Expand All @@ -27,11 +28,30 @@ use datafusion_cli::{
use mimalloc::MiMalloc;
use std::env;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

#[derive(PartialEq, Debug)]
enum PoolType {
Greedy,
Fair,
}

impl FromStr for PoolType {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"Greedy" | "greedy" => Ok(PoolType::Greedy),
"Fair" | "fair" => Ok(PoolType::Fair),
_ => Err(format!("Invalid memory pool type '{}'", s)),
}
}
}

#[derive(Debug, Parser, PartialEq)]
#[clap(author, version, about, long_about= None)]
struct Args {
Expand Down Expand Up @@ -59,6 +79,14 @@ struct Args {
)]
command: Vec<String>,

#[clap(
short = 'm',
long,
help = "The memory pool limitation (e.g. '10g'), default to None (no limit)",
validator(is_valid_memory_pool_size)
)]
memory_limit: Option<String>,

#[clap(
short,
long,
Expand Down Expand Up @@ -87,6 +115,12 @@ struct Args {
help = "Reduce printing other than the results and work quietly"
)]
quiet: bool,

#[clap(
long,
help = "Specify the memory pool type 'greedy' or 'fair', default to 'greedy'"
)]
mem_pool_type: Option<PoolType>,
}

#[tokio::main]
Expand All @@ -109,7 +143,29 @@ pub async fn main() -> Result<()> {
session_config = session_config.with_batch_size(batch_size);
};

let runtime_env = create_runtime_env()?;
let rn_config = RuntimeConfig::new();
let rn_config =
// set memory pool size
if let Some(memory_limit) = args.memory_limit {
let memory_limit = extract_memory_pool_size(&memory_limit).unwrap();
// set memory pool type
if let Some(mem_pool_type) = args.mem_pool_type {
match mem_pool_type {
PoolType::Greedy => rn_config
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))),
PoolType::Fair => rn_config
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
}
} else {
rn_config
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
}
} else {
rn_config
};

let runtime_env = create_runtime_env(rn_config.clone())?;

let mut ctx =
SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env));
ctx.refresh_catalogs().await?;
Expand Down Expand Up @@ -162,8 +218,7 @@ pub async fn main() -> Result<()> {
Ok(())
}

fn create_runtime_env() -> Result<RuntimeEnv> {
let rn_config = RuntimeConfig::new();
fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
RuntimeEnv::new(rn_config)
}

Expand All @@ -189,3 +244,34 @@ fn is_valid_batch_size(size: &str) -> Result<(), String> {
_ => Err(format!("Invalid batch size '{}'", size)),
}
}

fn is_valid_memory_pool_size(size: &str) -> Result<(), String> {
match extract_memory_pool_size(size) {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}

fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
let mut size = size;
let factor = if let Some(last_char) = size.chars().last() {
match last_char {
'm' | 'M' => {
size = &size[..size.len() - 1];
1024 * 1024
}
'g' | 'G' => {
size = &size[..size.len() - 1];
1024 * 1024 * 1024
}
_ => 1,
}
} else {
return Err(format!("Invalid memory pool size '{}'", size));
};

match size.parse::<usize>() {
Ok(size) if size > 0 => Ok(factor * size),
_ => Err(format!("Invalid memory pool size '{}'", size)),
}
}
20 changes: 11 additions & 9 deletions docs/source/user-guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,17 @@ USAGE:
datafusion-cli [OPTIONS]

OPTIONS:
-c, --batch-size <BATCH_SIZE> The batch size of each query, or use DataFusion default
-f, --file <FILE>... Execute commands from file(s), then exit
--format <FORMAT> [default: table] [possible values: csv, tsv, table, json,
nd-json]
-h, --help Print help information
-p, --data-path <DATA_PATH> Path to your data, default to current directory
-q, --quiet Reduce printing other than the results and work quietly
-r, --rc <RC>... Run the provided files on startup instead of ~/.datafusionrc
-V, --version Print version information
-c, --batch-size <BATCH_SIZE> The batch size of each query, or use DataFusion default
-f, --file <FILE>... Execute commands from file(s), then exit
--format <FORMAT> [default: table] [possible values: csv, tsv, table, json,
nd-json]
-h, --help Print help information
-m, --memory-limit <MEMORY_LIMIT> The memory pool limitation (e.g. '10g'), default to None (no limit)
--mem-pool-type <MEM_POOL_TYPE> Specify the memory pool type 'greedy' or 'fair', default to 'greedy'
-p, --data-path <DATA_PATH> Path to your data, default to current directory
-q, --quiet Reduce printing other than the results and work quietly
-r, --rc <RC>... Run the provided files on startup instead of ~/.datafusionrc
-V, --version Print version information
```

## Selecting files directly
Expand Down