Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ to the `.gitignore` file.
The benchmark can then be run (assuming the data created from `dbgen` is in `./data`) with a command such as:

```bash
cargo run --release --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
```

You can enable the `simd` feature (to use SIMD instructions) and/or `mimalloc` or `snmalloc` (to use the mimalloc or snmalloc allocator) by passing them to `--features`:

```
cargo run --release --features "simd mimalloc" --bin tpch -- benchmark --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
cargo run --release --features "simd mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
```

The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl`
Expand Down Expand Up @@ -123,7 +123,7 @@ To run the benchmarks:

```bash
cd $ARROW_HOME/ballista/rust/benchmarks/tpch
cargo run --release benchmark --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl
cargo run --release benchmark ballista --host localhost --port 50050 --query 1 --path $(pwd)/data --format tbl
```

## Running the Ballista Benchmarks on docker-compose
Expand All @@ -140,7 +140,7 @@ docker-compose up
Then you can run the benchmark with:

```bash
docker-compose run ballista-client cargo run benchmark --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl
docker-compose run ballista-client cargo run benchmark ballista --host ballista-scheduler --port 50050 --query 1 --path /data --format tbl
```

## Expected output
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ set -e
cd /
for query in 1 3 5 6 10 12
do
/tpch benchmark --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug
/tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug
done
77 changes: 60 additions & 17 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

#[derive(Debug, StructOpt, Clone)]
struct BenchmarkOpt {
struct BallistaBenchmarkOpt {
/// Query number
#[structopt(short, long)]
query: usize,
Expand All @@ -67,10 +67,6 @@ struct BenchmarkOpt {
#[structopt(short = "i", long = "iterations", default_value = "3")]
iterations: usize,

/// Number of threads to use for parallel execution
#[structopt(short = "c", long = "concurrency", default_value = "2")]
concurrency: usize,

/// Batch size when reading CSV or Parquet files
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
batch_size: usize,
Expand Down Expand Up @@ -100,6 +96,45 @@ struct BenchmarkOpt {
port: Option<u16>,
}

// Command-line options for the `benchmark datafusion` subcommand.
// Parsed by structopt; the `///` doc comments below double as the CLI
// help text, so they are part of the user-facing interface.
#[derive(Debug, StructOpt, Clone)]
struct DataFusionBenchmarkOpt {
    /// Query number
    #[structopt(short, long)]
    query: usize,

    /// Activate debug mode to see query results
    #[structopt(short, long)]
    debug: bool,

    /// Number of iterations of each test run
    #[structopt(short = "i", long = "iterations", default_value = "3")]
    iterations: usize,

    /// Number of threads to use for parallel execution
    #[structopt(short = "c", long = "concurrency", default_value = "2")]
    concurrency: usize,

    /// Batch size when reading CSV or Parquet files
    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
    batch_size: usize,

    /// Path to data files
    #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
    path: PathBuf,

    /// File format: `csv` or `parquet`
    #[structopt(short = "f", long = "format", default_value = "csv")]
    file_format: String,

    /// Load the data into a MemTable before executing the query
    #[structopt(short = "m", long = "mem-table")]
    mem_table: bool,

    /// Number of partitions to create when using MemTable as input
    #[structopt(short = "n", long = "partitions", default_value = "8")]
    partitions: usize,
}

#[derive(Debug, StructOpt)]
struct ConvertOpt {
/// Path to csv files
Expand Down Expand Up @@ -127,10 +162,19 @@ struct ConvertOpt {
batch_size: usize,
}

// Engine-selecting subcommand for `tpch benchmark`: `ballista` or
// `datafusion`, each carrying its own option struct. The explicit
// `name = "..."` attributes fix the CLI subcommand spelling regardless
// of the variant identifier.
#[derive(Debug, StructOpt)]
#[structopt(about = "benchmark command")]
enum BenchmarkSubCommandOpt {
    #[structopt(name = "ballista")]
    BallistaBenchmark(BallistaBenchmarkOpt),
    #[structopt(name = "datafusion")]
    DataFusionBenchmark(DataFusionBenchmarkOpt),
}

// Top-level CLI for the TPC-H binary: `benchmark` runs queries (further
// split by engine via BenchmarkSubCommandOpt), `convert` converts input
// data files (see ConvertOpt).
#[derive(Debug, StructOpt)]
#[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
enum TpchOpt {
    Benchmark(BenchmarkSubCommandOpt),
    Convert(ConvertOpt),
}

Expand All @@ -140,20 +184,21 @@ const TABLES: &[&str] = &[

#[tokio::main]
async fn main() -> Result<()> {
use BenchmarkSubCommandOpt::*;

env_logger::init();
match TpchOpt::from_args() {
TpchOpt::Benchmark(opt) => {
if opt.host.is_some() && opt.port.is_some() {
benchmark_ballista(opt).await.map(|_| ())
} else {
benchmark_datafusion(opt).await.map(|_| ())
}
TpchOpt::Benchmark(BallistaBenchmark(opt)) => {
benchmark_ballista(opt).await.map(|_| ())
}
TpchOpt::Benchmark(DataFusionBenchmark(opt)) => {
benchmark_datafusion(opt).await.map(|_| ())
}
TpchOpt::Convert(opt) => convert_tbl(opt).await,
}
}

async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result<Vec<RecordBatch>> {
async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
println!("Running benchmarks with the following options: {:?}", opt);
let config = ExecutionConfig::new()
.with_concurrency(opt.concurrency)
Expand Down Expand Up @@ -204,7 +249,7 @@ async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result<Vec<RecordBatch>> {
Ok(result)
}

async fn benchmark_ballista(opt: BenchmarkOpt) -> Result<()> {
async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
println!("Running benchmarks with the following options: {:?}", opt);

let mut settings = HashMap::new();
Expand Down Expand Up @@ -950,7 +995,7 @@ mod tests {
let expected = df.collect().await?;

// run the query to compute actual results of the query
let opt = BenchmarkOpt {
let opt = DataFusionBenchmarkOpt {
query: n,
debug: false,
iterations: 1,
Expand All @@ -960,8 +1005,6 @@ mod tests {
file_format: "tbl".to_string(),
mem_table: false,
partitions: 16,
host: None,
port: None,
};
let actual = benchmark_datafusion(opt).await?;

Expand Down
2 changes: 1 addition & 1 deletion docs/user-guide/src/distributed/raspberrypi.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ Run the benchmarks:

```bash
docker run -it myrepo/ballista-arm64 \
/tpch benchmark --query=1 --path=/path/to/data --format=parquet \
/tpch benchmark datafusion --query=1 --path=/path/to/data --format=parquet \
--concurrency=24 --iterations=1 --debug --host=ballista-scheduler --port=50050
```

Expand Down