From 9ded2f6bc0c29ea480217a655682fb7f4ad39b96 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Sun, 23 May 2021 21:11:33 +0200 Subject: [PATCH 1/5] #401: Add subcommand to TPC-H benchmark args to distinguish between DataFusion and Ballista --- benchmarks/README.md | 8 ++-- benchmarks/run.sh | 2 +- benchmarks/src/bin/tpch.rs | 76 ++++++++++++++++++++++++++++++-------- 3 files changed, 66 insertions(+), 20 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index e003d9687c9c..e347130689b3 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -44,13 +44,13 @@ to the `.gitignore` file. The benchmark can then be run (assuming the data created from `dbgen` is in `./data`) with a command such as: ```bash -cargo run --release --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 +cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 ``` You can enable the features `simd` (to use SIMD instructions) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`: ``` -cargo run --release --features "simd mimalloc" --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 +cargo run --release --features "simd mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 ``` The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl` @@ -123,7 +123,7 @@ To run the benchmarks: ```bash cd $ARROW_HOME/ballista/rust/benchmarks/tpch -cargo run --release benchmark --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl +cargo run --release benchmark ballista --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl ``` ## Running the Ballista Benchmarks on docker-compose @@ -140,7 +140,7 @@ docker-compose up Then you can run the benchmark with: ```bash -docker-compose run ballista-client cargo run benchmark --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl +docker-compose run ballista-client cargo run benchmark ballista --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl ``` ## Expected output diff --git a/benchmarks/run.sh b/benchmarks/run.sh index fd97ff9a9a6a..8e36424da89f 100755 --- a/benchmarks/run.sh +++ b/benchmarks/run.sh @@ -22,5 +22,5 @@ set -e cd / for query in 1 3 5 6 10 12 do - /tpch benchmark --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug + /tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug done diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index cee555fe675e..022940910f81 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -53,8 +53,9 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; + #[derive(Debug, StructOpt, Clone)] -struct BenchmarkOpt { +struct BallistaBenchmarkOpt { /// Query number #[structopt(short, long)] query: usize, @@ -67,10 +68,6 @@ struct BenchmarkOpt { #[structopt(short = "i", long = "iterations", default_value = "3")] iterations: usize, - /// Number of threads to use for parallel execution - #[structopt(short = "c", long = "concurrency", default_value = "2")] - concurrency: usize, - /// Batch size when reading CSV or Parquet files #[structopt(short = "s", long = "batch-size", default_value = "8192")] batch_size: usize, @@ -100,6 +97,45 @@ struct BenchmarkOpt { port: Option, } +#[derive(Debug, StructOpt, Clone)] +struct DatafusionBenchmarkOpt { + /// Query number + #[structopt(short, long)] + query: usize, + + /// Activate debug mode to see query results + #[structopt(short, long)] + debug: bool, + + /// Number of iterations of each test run + #[structopt(short = "i", long = "iterations", default_value = "3")] + iterations: usize, + + /// Number of threads to use for parallel execution + #[structopt(short = "c", long = "concurrency", default_value = "2")] + concurrency: usize, + + /// Batch size when reading CSV or Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "8192")] + batch_size: usize, + + /// Path to data files + #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] + path: PathBuf, + + /// File format: `csv` or `parquet` + #[structopt(short = "f", long = "format", default_value = "csv")] + file_format: String, + + /// Load the data into a MemTable before executing the query + #[structopt(short = "m", long = "mem-table")] + mem_table: bool, + + /// Number of partitions to create when using MemTable as input + #[structopt(short = "n", long = "partitions", default_value = "8")] + partitions: usize, +} + #[derive(Debug, StructOpt)] struct ConvertOpt { /// Path to csv files @@ -127,10 +163,19 @@ struct ConvertOpt { batch_size: usize, } +#[derive(Debug, StructOpt)] +#[structopt(about = "benchmark command")] +enum SubCommandOpt { + #[structopt(name = "ballista")] + BallistaBenchmark(BallistaBenchmarkOpt), + #[structopt(name = "datafusion")] + DatafusionBenchmark(DatafusionBenchmarkOpt), +} + #[derive(Debug, StructOpt)] #[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")] enum TpchOpt { - Benchmark(BenchmarkOpt), + Benchmark(SubCommandOpt), Convert(ConvertOpt), } @@ -140,20 +185,21 @@ const TABLES: &[&str] = &[ #[tokio::main] async fn main() -> Result<()> { + use SubCommandOpt::*; + env_logger::init(); match TpchOpt::from_args() { - TpchOpt::Benchmark(opt) => { - if opt.host.is_some() && opt.port.is_some() { - benchmark_ballista(opt).await.map(|_| ()) - } else { - benchmark_datafusion(opt).await.map(|_| ()) - } - } + TpchOpt::Benchmark(BallistaBenchmark(opt)) => { + benchmark_ballista(opt).await.map(|_| ()) + }, + TpchOpt::Benchmark(DatafusionBenchmark(opt)) => { + benchmark_datafusion(opt).await.map(|_| ()) + }, TpchOpt::Convert(opt) => convert_tbl(opt).await, } } -async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result> { +async fn benchmark_datafusion(opt: DatafusionBenchmarkOpt) -> Result> { println!("Running benchmarks with the following options: {:?}", opt); let config = ExecutionConfig::new() .with_concurrency(opt.concurrency) @@ -204,7 +250,7 @@ async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result> { Ok(result) } -async fn benchmark_ballista(opt: BenchmarkOpt) -> Result<()> { +async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { println!("Running benchmarks with the following options: {:?}", opt); let mut settings = HashMap::new(); From ebc3a3d083160411a3c3fa534286cd679d3e2119 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Sun, 23 May 2021 21:18:29 +0200 Subject: [PATCH 2/5] fix benchmark subcommand name --- benchmarks/src/bin/tpch.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 022940910f81..671307a117a7 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -165,7 +165,7 @@ struct ConvertOpt { #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] -enum SubCommandOpt { +enum BenchmarkSubCommandOpt { #[structopt(name = "ballista")] BallistaBenchmark(BallistaBenchmarkOpt), #[structopt(name = "datafusion")] @@ -175,7 +175,7 @@ enum SubCommandOpt { #[derive(Debug, StructOpt)] #[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")] enum TpchOpt { - Benchmark(SubCommandOpt), + Benchmark(BenchmarkSubCommandOpt), Convert(ConvertOpt), } @@ -185,7 +185,7 @@ const TABLES: &[&str] = &[ #[tokio::main] async fn main() -> Result<()> { - use SubCommandOpt::*; + use BenchmarkSubCommandOpt::*; env_logger::init(); match TpchOpt::from_args() { From cd920070a0af8bfdae12424c2d4388ff0e77d28e Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Sun, 23 May 2021 21:47:34 +0200 Subject: [PATCH 3/5] Fix lint --- benchmarks/src/bin/tpch.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 671307a117a7..2265b9224d97 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -53,7 +53,6 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; - #[derive(Debug, StructOpt, Clone)] struct BallistaBenchmarkOpt { /// Query number @@ -191,10 +190,10 @@ async fn main() -> Result<()> { match TpchOpt::from_args() { TpchOpt::Benchmark(BallistaBenchmark(opt)) => { benchmark_ballista(opt).await.map(|_| ()) - }, + } TpchOpt::Benchmark(DatafusionBenchmark(opt)) => { benchmark_datafusion(opt).await.map(|_| ()) - }, + } TpchOpt::Convert(opt) => convert_tbl(opt).await, } } From c939c19265d1b07ccf7bd587ff88efad2312efb2 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Sun, 23 May 2021 22:10:57 +0200 Subject: [PATCH 4/5] fix benchmark tests using DatafusionBenchmarkOpts --- benchmarks/src/bin/tpch.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 2265b9224d97..8bb7189359f1 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -995,7 +995,7 @@ mod tests { let expected = df.collect().await?; // run the query to compute actual results of the query - let opt = BenchmarkOpt { + let opt = DatafusionBenchmarkOpt { query: n, debug: false, iterations: 1, @@ -1005,8 +1005,6 @@ mod tests { file_format: "tbl".to_string(), mem_table: false, partitions: 16, - host: None, - port: None, }; let actual = benchmark_datafusion(opt).await?; From 278812c80c6f944de4a8e8eeb00f4f08c349de25 Mon Sep 17 00:00:00 2001 From: Javier Goday Date: Mon, 24 May 2021 18:28:27 +0200 Subject: [PATCH 5/5] Fix DataFusionBenchmarkOpts name and update doc --- benchmarks/src/bin/tpch.rs | 10 +++++----- docs/user-guide/src/distributed/raspberrypi.md | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 8bb7189359f1..f25ab987ed6e 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -97,7 +97,7 @@ struct BallistaBenchmarkOpt { } #[derive(Debug, StructOpt, Clone)] -struct DatafusionBenchmarkOpt { +struct DataFusionBenchmarkOpt { /// Query number #[structopt(short, long)] query: usize, @@ -168,7 +168,7 @@ enum BenchmarkSubCommandOpt { #[structopt(name = "ballista")] BallistaBenchmark(BallistaBenchmarkOpt), #[structopt(name = "datafusion")] - DatafusionBenchmark(DatafusionBenchmarkOpt), + DataFusionBenchmark(DataFusionBenchmarkOpt), } #[derive(Debug, StructOpt)] @@ -191,14 +191,14 @@ async fn main() -> Result<()> { TpchOpt::Benchmark(BallistaBenchmark(opt)) => { benchmark_ballista(opt).await.map(|_| ()) } - TpchOpt::Benchmark(DatafusionBenchmark(opt)) => { + TpchOpt::Benchmark(DataFusionBenchmark(opt)) => { benchmark_datafusion(opt).await.map(|_| ()) } TpchOpt::Convert(opt) => convert_tbl(opt).await, } } -async fn benchmark_datafusion(opt: DatafusionBenchmarkOpt) -> Result> { +async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result> { println!("Running benchmarks with the following options: {:?}", opt); let config = ExecutionConfig::new() .with_concurrency(opt.concurrency) @@ -995,7 +995,7 @@ mod tests { let expected = df.collect().await?; // run the query to compute actual results of the query - let opt = DatafusionBenchmarkOpt { + let opt = DataFusionBenchmarkOpt { query: n, debug: false, iterations: 1, diff --git a/docs/user-guide/src/distributed/raspberrypi.md b/docs/user-guide/src/distributed/raspberrypi.md index d4d2079bb5cc..a1ce1b04a6a7 100644 --- a/docs/user-guide/src/distributed/raspberrypi.md +++ b/docs/user-guide/src/distributed/raspberrypi.md @@ -114,7 +114,7 @@ Run the benchmarks: ```bash docker run -it myrepo/ballista-arm64 \ - /tpch benchmark --query=1 --path=/path/to/data --format=parquet \ + /tpch benchmark datafusion --query=1 --path=/path/to/data --format=parquet \ --concurrency=24 --iterations=1 --debug --host=ballista-scheduler --port=50050 ```