This directory contains examples for executing distributed queries with Ballista.
The standalone example is the easiest to get started with. Ballista supports a standalone mode where a scheduler and executor are started in-process.
cargo run --example standalone_sql --features="ballista/standalone"
/// Runs a row-count SQL query against a CSV file using an in-process
/// (standalone) Ballista context.
#[tokio::main]
async fn main() -> Result<()> {
    // A single shuffle partition is configured for this small example.
    let ballista_config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "1")
        .build()?;

    // NOTE(review): the second argument (2) is presumably the number of
    // concurrent tasks for the in-process executor — confirm against the
    // `BallistaContext::standalone` documentation.
    let context = BallistaContext::standalone(&ballista_config, 2).await?;

    // Expose the CSV file to SQL under the table name `test`.
    let csv_options = CsvReadOptions::new();
    context
        .register_csv("test", "testdata/aggregate_test_100.csv", csv_options)
        .await?;

    // Plan the query, then execute it and print the result.
    context
        .sql("select count(1) from test")
        .await?
        .show()
        .await?;

    Ok(())
}
For background information on the Ballista architecture, refer to the Ballista README.
From the root of the project, build release binaries.
cargo build --release
Start a Ballista scheduler process in a new terminal session.
RUST_LOG=info ./target/release/ballista-scheduler
Start one or more Ballista executor processes in new terminal sessions. When starting more than one executor, a unique port number must be specified for each executor.
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051
RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052
The examples can be run using the `cargo run --bin` syntax.
cargo run --release --bin sql
use ballista::prelude::*;
use datafusion::prelude::CsvReadOptions;
/// Registers a CSV file as a table with a remote Ballista context, runs a
/// grouped MIN/MAX aggregation over it using SQL, and prints the results.
#[tokio::main]
async fn main() -> Result<()> {
    // Four shuffle partitions are configured for this query.
    let ballista_config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;

    // Connect to the Ballista endpoint at localhost:50050.
    let context = BallistaContext::remote("localhost", 50050, &ballista_config).await?;

    // Make the CSV file queryable under the table name `test`.
    let csv_options = CsvReadOptions::new();
    context
        .register_csv("test", "testdata/aggregate_test_100.csv", csv_options)
        .await?;

    // The trailing backslashes join the lines into one single-line statement.
    let query = "SELECT c1, MIN(c12), MAX(c12) \
                 FROM test \
                 WHERE c11 > 0.1 AND c11 < 0.9 \
                 GROUP BY c1";

    // Plan the query.
    let results = context.sql(query).await?;

    // Execute it and print the results.
    results.show().await?;

    Ok(())
}
cargo run --release --bin dataframe
/// Reads a Parquet file through a remote Ballista context, projects three
/// columns, keeps the rows whose `id` is greater than 1, and prints them.
#[tokio::main]
async fn main() -> Result<()> {
    // Four shuffle partitions are configured for this query.
    let ballista_config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;

    // Connect to the Ballista endpoint at localhost:50050.
    let context = BallistaContext::remote("localhost", 50050, &ballista_config).await?;

    let parquet_path = "testdata/alltypes_plain.parquet";

    // Build the query step by step with the DataFrame API.
    let table = context
        .read_parquet(parquet_path, ParquetReadOptions::default())
        .await?;
    let projected = table.select_columns(&["id", "bool_col", "timestamp_col"])?;
    let filtered = projected.filter(col("id").gt(lit(1)))?;

    // Execute the query and print the results.
    filtered.show().await?;

    Ok(())
}