Skip to content

Commit

Permalink
Execute LogicalPlans after building for TPCH Benchmarks (#3290)
Browse files Browse the repository at this point in the history
  • Loading branch information
DaltonModlin committed Aug 30, 2022
1 parent 7aed4d6 commit 3effee8
Showing 1 changed file with 16 additions and 20 deletions.
36 changes: 16 additions & 20 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::{

use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
Expand Down Expand Up @@ -196,10 +195,12 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
for i in 0..opt.iterations {
let start = Instant::now();
let plans = create_logical_plans(&ctx, opt.query)?;
for plan in plans {
result = execute_query(&ctx, &plan, opt.debug).await?;

let sql = &get_query_sql(opt.query)?;
for query in sql {
result = execute_query(&ctx, query, opt.debug).await?;
}

let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
let row_count = result.iter().map(|b| b.num_rows()).sum();
Expand Down Expand Up @@ -253,7 +254,7 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
.collect())
.collect());
}
Err(e) => errors.push(format!("{}: {}", filename, e)),
};
Expand All @@ -269,23 +270,18 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
}
}

/// Create a logical plan for each query in the specified query file
fn create_logical_plans(ctx: &SessionContext, query: usize) -> Result<Vec<LogicalPlan>> {
let sql = get_query_sql(query)?;
sql.iter()
.map(|sql| ctx.create_logical_plan(sql.as_str()))
.collect::<Result<Vec<_>>>()
}

async fn execute_query(
ctx: &SessionContext,
plan: &LogicalPlan,
sql: &str,
debug: bool,
) -> Result<Vec<RecordBatch>> {
let plan = ctx.sql(sql).await?;
let plan = plan.to_logical_plan()?;

if debug {
println!("=== Logical plan ===\n{:?}\n", plan);
}
let plan = ctx.optimize(plan)?;
let plan = ctx.optimize(&plan)?;
if debug {
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
Expand Down Expand Up @@ -357,7 +353,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
return Err(DataFusionError::NotImplemented(format!(
"Invalid compression format: {}",
other
)))
)));
}
};
let props = WriterProperties::builder()
Expand All @@ -369,7 +365,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
return Err(DataFusionError::NotImplemented(format!(
"Invalid output format: {}",
other
)))
)));
}
}
println!("Conversion completed in {} ms", start.elapsed().as_millis());
Expand Down Expand Up @@ -1022,9 +1018,9 @@ mod tests {
ctx.register_table(table, Arc::new(provider))?;
}

let plans = create_logical_plans(&ctx, n)?;
for plan in plans {
execute_query(&ctx, &plan, false).await?;
let sql = &get_query_sql(n)?;
for query in sql {
execute_query(&ctx, query, false).await?;
}

Ok(())
Expand Down

0 comments on commit 3effee8

Please sign in to comment.