From 16d4ac106d0031cd8c8b420c081c53dfc0d492ce Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 26 Jun 2021 13:21:00 -0700 Subject: [PATCH 1/6] honor table name for csv/parquet scan in ballista plan serde --- .../core/src/serde/logical_plan/from_proto.rs | 22 ++++++++++++----- datafusion/src/logical_plan/builder.rs | 24 +++++++++++++++++-- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 418d60de3e7a..15ee50733eca 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -126,9 +126,14 @@ impl TryInto for &protobuf::LogicalPlanNode { projection = Some(column_indices); } - LogicalPlanBuilder::scan_csv(&scan.path, options, projection)? - .build() - .map_err(|e| e.into()) + LogicalPlanBuilder::scan_csv_with_name( + &scan.path, + options, + projection, + &scan.table_name, + )? + .build() + .map_err(|e| e.into()) } LogicalPlanType::ParquetScan(scan) => { let projection = match scan.projection.as_ref() { @@ -151,9 +156,14 @@ impl TryInto for &protobuf::LogicalPlanNode { Some(r?) } }; - LogicalPlanBuilder::scan_parquet(&scan.path, projection, 24)? //TODO concurrency - .build() - .map_err(|e| e.into()) + LogicalPlanBuilder::scan_parquet_with_name( + &scan.path, + projection, + 24, + &scan.table_name, + )? //TODO concurrency + .build() + .map_err(|e| e.into()) } LogicalPlanType::Sort(sort) => { let input: LogicalPlan = convert_box_required!(sort.input)?; diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 147f8322df5d..ced77ba6c6f6 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -118,9 +118,19 @@ impl LogicalPlanBuilder { path: &str, options: CsvReadOptions, projection: Option>, + ) -> Result { + Self::scan_csv_with_name(path, options, projection, path) + } + + /// Scan a CSV data source and register it with a given table name + pub fn scan_csv_with_name( + path: &str, + options: CsvReadOptions, + projection: Option>, + table_name: &str, ) -> Result { let provider = Arc::new(CsvFile::try_new(path, options)?); - Self::scan(path, provider, projection) + Self::scan(table_name, provider, projection) } /// Scan a Parquet data source @@ -128,9 +138,19 @@ impl LogicalPlanBuilder { path: &str, projection: Option>, max_concurrency: usize, + ) -> Result { + Self::scan_parquet_with_name(path, projection, max_concurrency, path) + } + + /// Scan a Parquet data source and register it with a given table name + pub fn scan_parquet_with_name( + path: &str, + projection: Option>, + max_concurrency: usize, + table_name: &str, ) -> Result { let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?); - Self::scan(path, provider, projection) + Self::scan(table_name, provider, projection) } /// Scan an empty data source, mainly used in tests From 0830cba904f314dab97290dd4f3f83a00c603770 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 26 Jun 2021 16:08:10 -0700 Subject: [PATCH 2/6] disable query 7,8,9 in ballista integration test --- benchmarks/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/run.sh b/benchmarks/run.sh index 21633d39c23a..8e36424da89f 100755 --- a/benchmarks/run.sh +++ b/benchmarks/run.sh @@ -20,7 +20,7 @@ set -e # This bash script is meant to be run inside the docker-compose environment. Check the README for instructions cd / -for query in 1 3 5 6 7 8 9 10 12 +for query in 1 3 5 6 10 12 do /tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug done From 71efc005c8c6a4fe76ad38476aa5fc604052a0a2 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 26 Jun 2021 18:00:26 -0700 Subject: [PATCH 3/6] add tpch query ballista roundtrip test --- benchmarks/Cargo.toml | 3 ++ benchmarks/src/bin/tpch.rs | 48 +++++++++++++++++++++++++++++++- datafusion/src/datasource/mod.rs | 1 + 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 6a763420c782..19a67a504e77 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -39,3 +39,6 @@ futures = "0.3" env_logger = "^0.8" mimalloc = { version = "0.1", optional = true, default-features = false } snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] } + +[dev-dependencies] +ballista-core = { path = "../ballista/rust/core" } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 286fe4594510..3c03852edb01 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -573,7 +573,6 @@ mod tests { use datafusion::arrow::array::*; use datafusion::arrow::util::display::array_value_to_string; - use datafusion::logical_plan::Expr; use datafusion::logical_plan::Expr::Cast; @@ -1042,4 +1041,51 @@ mod tests { Ok(()) } + + mod ballista_round_trip { + use super::*; + use ballista_core::serde::protobuf; + use std::convert::TryInto; + + fn round_trip_query(n: usize) -> Result<()> { + let config = ExecutionConfig::new() + .with_concurrency(1) + .with_batch_size(10); + let mut ctx = ExecutionContext::with_config(config); + + for &table in TABLES { + let schema = get_schema(table); + let options = CsvReadOptions::new() + .schema(&schema) + .delimiter(b'|') + .has_header(false) + .file_extension(".tbl"); + let provider = CsvFile::try_new("./foo.csv", options)?; + ctx.register_table(table, Arc::new(provider))?; + } + + let plan = create_logical_plan(&mut ctx, n)?; + let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap(); + let round_trip: LogicalPlan = (&proto).try_into().unwrap(); + assert_eq!(format!("{:?}", plan), format!("{:?}", round_trip)); + + Ok(()) + } + + macro_rules! test_round_trip { + ($tn:ident, $query:expr) => { + #[test] + fn $tn() -> Result<()> { + round_trip_query($query) + } + }; + } + + test_round_trip!(q1, 1); + test_round_trip!(q3, 3); + test_round_trip!(q5, 5); + test_round_trip!(q6, 6); + test_round_trip!(q10, 10); + test_round_trip!(q12, 12); + } } diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index b46b9cc4e899..9699a997caa1 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -28,6 +28,7 @@ pub use self::csv::{CsvFile, CsvReadOptions}; pub use self::datasource::{TableProvider, TableType}; pub use self::memory::MemTable; +/// Source for table input data pub(crate) enum Source> { /// Path to a single file or a directory containing one of more files Path(String), From afde3b3b6f35051b9e2ee6ccbe2d4fcc92b0fc65 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 26 Jun 2021 18:37:44 -0700 Subject: [PATCH 4/6] also roud trip physical plan --- benchmarks/src/bin/tpch.rs | 42 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 3c03852edb01..bada0c24165c 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1045,6 +1045,7 @@ mod tests { mod ballista_round_trip { use super::*; use ballista_core::serde::protobuf; + use datafusion::physical_plan::ExecutionPlan; use std::convert::TryInto; fn round_trip_query(n: usize) -> Result<()> { @@ -1053,6 +1054,12 @@ mod tests { .with_batch_size(10); let mut ctx = ExecutionContext::with_config(config); + let tpch_data_path = if let Ok(path) = env::var("TPCH_DATA") { + path + } else { + "./".to_string() + }; + for &table in TABLES { let schema = get_schema(table); let options = CsvReadOptions::new() @@ -1060,14 +1067,45 @@ mod tests { .delimiter(b'|') .has_header(false) .file_extension(".tbl"); - let provider = CsvFile::try_new("./foo.csv", options)?; + let provider = CsvFile::try_new( + &format!("{}/{}.tbl", tpch_data_path, table), + options, + )?; ctx.register_table(table, Arc::new(provider))?; } + // test logical plan round trip let plan = create_logical_plan(&mut ctx, n)?; let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap(); let round_trip: LogicalPlan = (&proto).try_into().unwrap(); - assert_eq!(format!("{:?}", plan), format!("{:?}", round_trip)); + assert_eq!( + format!("{:?}", plan), + format!("{:?}", round_trip), + "logical plan round trip failed" + ); + + // test optimized logical plan round trip + let plan = ctx.optimize(&plan)?; + let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap(); + let round_trip: LogicalPlan = (&proto).try_into().unwrap(); + assert_eq!( + format!("{:?}", plan), + format!("{:?}", round_trip), + "opitmized logical plan round trip failed" + ); + + // test physical plan roundtrip + if let Ok(_) = env::var("TPCH_DATA") { + let physical_plan = ctx.create_physical_plan(&plan)?; + let proto: protobuf::PhysicalPlanNode = + (physical_plan.clone()).try_into().unwrap(); + let round_trip: Arc = (&proto).try_into().unwrap(); + assert_eq!( + format!("{:?}", physical_plan), + format!("{:?}", round_trip), + "physical plan round trip failed" + ); + } Ok(()) } From 7c719c9e90ced5d3415572c1d475d5a1247a6227 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 26 Jun 2021 20:27:15 -0700 Subject: [PATCH 5/6] fix clippy --- benchmarks/src/bin/tpch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index bada0c24165c..ea88cbdbaae7 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1095,7 +1095,7 @@ mod tests { ); // test physical plan roundtrip - if let Ok(_) = env::var("TPCH_DATA") { + if env::var("TPCH_DATA").is_ok() { let physical_plan = ctx.create_physical_plan(&plan)?; let proto: protobuf::PhysicalPlanNode = (physical_plan.clone()).try_into().unwrap(); From 59ed6b3096693d03c1c1a7bc885056fb30b9254a Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 26 Jun 2021 20:58:42 -0700 Subject: [PATCH 6/6] simplify test code --- benchmarks/src/bin/tpch.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index ea88cbdbaae7..77c69f0ce524 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -1054,11 +1054,10 @@ mod tests { .with_batch_size(10); let mut ctx = ExecutionContext::with_config(config); - let tpch_data_path = if let Ok(path) = env::var("TPCH_DATA") { - path - } else { - "./".to_string() - }; + // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA + // is not set. + let tpch_data_path = + env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string()); for &table in TABLES { let schema = get_schema(table);