diff --git a/benchmarks/README.md b/benchmarks/README.md index e003d9687c9c..e347130689b3 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -44,13 +44,13 @@ to the `.gitignore` file. The benchmark can then be run (assuming the data created from `dbgen` is in `./data`) with a command such as: ```bash -cargo run --release --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 +cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 ``` You can enable the features `simd` (to use SIMD instructions) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`: ``` -cargo run --release --features "simd mimalloc" --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 +cargo run --release --features "simd mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 ``` The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl` @@ -123,7 +123,7 @@ To run the benchmarks: ```bash cd $ARROW_HOME/ballista/rust/benchmarks/tpch -cargo run --release benchmark --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl +cargo run --release benchmark ballista --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl ``` ## Running the Ballista Benchmarks on docker-compose @@ -140,7 +140,7 @@ docker-compose up Then you can run the benchmark with: ```bash -docker-compose run ballista-client cargo run benchmark --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl +docker-compose run ballista-client cargo run benchmark ballista --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl ``` ## Expected output diff --git a/benchmarks/run.sh b/benchmarks/run.sh index fd97ff9a9a6a..8e36424da89f 100755 --- a/benchmarks/run.sh +++ b/benchmarks/run.sh @@ -22,5 +22,5 @@ set -e cd / for query in 1 3 5 6 10 12 do - /tpch benchmark --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug + /tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug done diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index cee555fe675e..f25ab987ed6e 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -54,7 +54,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; #[derive(Debug, StructOpt, Clone)] -struct BenchmarkOpt { +struct BallistaBenchmarkOpt { /// Query number #[structopt(short, long)] query: usize, @@ -67,10 +67,6 @@ struct BenchmarkOpt { #[structopt(short = "i", long = "iterations", default_value = "3")] iterations: usize, - /// Number of threads to use for parallel execution - #[structopt(short = "c", long = "concurrency", default_value = "2")] - concurrency: usize, - /// Batch size when reading CSV or Parquet files #[structopt(short = "s", long = "batch-size", default_value = "8192")] batch_size: usize, @@ -100,6 +96,45 @@ struct BenchmarkOpt { port: Option, } +#[derive(Debug, StructOpt, Clone)] +struct DataFusionBenchmarkOpt { + /// Query number + #[structopt(short, long)] + query: usize, + + /// Activate debug mode to see query results + #[structopt(short, long)] + debug: bool, + + /// Number of iterations of each test run + #[structopt(short = "i", long = "iterations", default_value = "3")] + iterations: usize, + + /// Number of threads to use for parallel execution + #[structopt(short = "c", long = "concurrency", default_value = "2")] + concurrency: usize, + + /// Batch size when reading CSV or Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "8192")] + batch_size: usize, + + /// Path to data files + #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] + path: PathBuf, + + /// File format: `csv` or `parquet` + #[structopt(short = "f", long = "format", default_value = "csv")] + file_format: String, + + /// Load the data into a MemTable before executing the query + #[structopt(short = "m", long = "mem-table")] + mem_table: bool, + + /// Number of partitions to create when using MemTable as input + #[structopt(short = "n", long = "partitions", default_value = "8")] + partitions: usize, +} + #[derive(Debug, StructOpt)] struct ConvertOpt { /// Path to csv files @@ -127,10 +162,19 @@ struct ConvertOpt { batch_size: usize, } +#[derive(Debug, StructOpt)] +#[structopt(about = "benchmark command")] +enum BenchmarkSubCommandOpt { + #[structopt(name = "ballista")] + BallistaBenchmark(BallistaBenchmarkOpt), + #[structopt(name = "datafusion")] + DataFusionBenchmark(DataFusionBenchmarkOpt), +} + #[derive(Debug, StructOpt)] #[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")] enum TpchOpt { - Benchmark(BenchmarkOpt), + Benchmark(BenchmarkSubCommandOpt), Convert(ConvertOpt), } @@ -140,20 +184,21 @@ const TABLES: &[&str] = &[ #[tokio::main] async fn main() -> Result<()> { + use BenchmarkSubCommandOpt::*; + env_logger::init(); match TpchOpt::from_args() { - TpchOpt::Benchmark(opt) => { - if opt.host.is_some() && opt.port.is_some() { - benchmark_ballista(opt).await.map(|_| ()) - } else { - benchmark_datafusion(opt).await.map(|_| ()) - } + TpchOpt::Benchmark(BallistaBenchmark(opt)) => { + benchmark_ballista(opt).await.map(|_| ()) + } + TpchOpt::Benchmark(DataFusionBenchmark(opt)) => { + benchmark_datafusion(opt).await.map(|_| ()) } TpchOpt::Convert(opt) => convert_tbl(opt).await, } } -async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result> { +async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result> { println!("Running benchmarks with the following options: {:?}", opt); let config = ExecutionConfig::new() .with_concurrency(opt.concurrency) @@ -204,7 +249,7 @@ async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result> { Ok(result) } -async fn benchmark_ballista(opt: BenchmarkOpt) -> Result<()> { +async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { println!("Running benchmarks with the following options: {:?}", opt); let mut settings = HashMap::new(); @@ -950,7 +995,7 @@ mod tests { let expected = df.collect().await?; // run the query to compute actual results of the query - let opt = BenchmarkOpt { + let opt = DataFusionBenchmarkOpt { query: n, debug: false, iterations: 1, @@ -960,8 +1005,6 @@ mod tests { file_format: "tbl".to_string(), mem_table: false, partitions: 16, - host: None, - port: None, }; let actual = benchmark_datafusion(opt).await?; diff --git a/docs/user-guide/src/distributed/raspberrypi.md b/docs/user-guide/src/distributed/raspberrypi.md index d4d2079bb5cc..a1ce1b04a6a7 100644 --- a/docs/user-guide/src/distributed/raspberrypi.md +++ b/docs/user-guide/src/distributed/raspberrypi.md @@ -114,7 +114,7 @@ Run the benchmarks: ```bash docker run -it myrepo/ballista-arm64 \ - /tpch benchmark --query=1 --path=/path/to/data --format=parquet \ + /tpch benchmark datafusion --query=1 --path=/path/to/data --format=parquet \ --concurrency=24 --iterations=1 --debug --host=ballista-scheduler --port=50050 ```