diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 418d60de3e7a..15ee50733eca 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -126,9 +126,14 @@ impl TryInto for &protobuf::LogicalPlanNode { projection = Some(column_indices); } - LogicalPlanBuilder::scan_csv(&scan.path, options, projection)? - .build() - .map_err(|e| e.into()) + LogicalPlanBuilder::scan_csv_with_name( + &scan.path, + options, + projection, + &scan.table_name, + )? + .build() + .map_err(|e| e.into()) } LogicalPlanType::ParquetScan(scan) => { let projection = match scan.projection.as_ref() { @@ -151,9 +156,14 @@ impl TryInto for &protobuf::LogicalPlanNode { Some(r?) } }; - LogicalPlanBuilder::scan_parquet(&scan.path, projection, 24)? //TODO concurrency - .build() - .map_err(|e| e.into()) + LogicalPlanBuilder::scan_parquet_with_name( + &scan.path, + projection, + 24, + &scan.table_name, + )? //TODO concurrency + .build() + .map_err(|e| e.into()) } LogicalPlanType::Sort(sort) => { let input: LogicalPlan = convert_box_required!(sort.input)?; diff --git a/benchmarks/run.sh b/benchmarks/run.sh index 21633d39c23a..8e36424da89f 100755 --- a/benchmarks/run.sh +++ b/benchmarks/run.sh @@ -20,7 +20,7 @@ set -e # This bash script is meant to be run inside the docker-compose environment. Check the README for instructions cd / -for query in 1 3 5 6 7 8 9 10 12 +for query in 1 3 5 6 10 12 do /tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug done diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 147f8322df5d..ced77ba6c6f6 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -118,9 +118,19 @@ impl LogicalPlanBuilder { path: &str, options: CsvReadOptions, projection: Option>, + ) -> Result { + Self::scan_csv_with_name(path, options, projection, path) + } + + /// Scan a CSV data source and register it with a given table name + pub fn scan_csv_with_name( + path: &str, + options: CsvReadOptions, + projection: Option>, + table_name: &str, ) -> Result { let provider = Arc::new(CsvFile::try_new(path, options)?); - Self::scan(path, provider, projection) + Self::scan(table_name, provider, projection) } /// Scan a Parquet data source @@ -128,9 +138,19 @@ impl LogicalPlanBuilder { path: &str, projection: Option>, max_concurrency: usize, + ) -> Result { + Self::scan_parquet_with_name(path, projection, max_concurrency, path) + } + + /// Scan a Parquet data source and register it with a given table name + pub fn scan_parquet_with_name( + path: &str, + projection: Option>, + max_concurrency: usize, + table_name: &str, ) -> Result { let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?); - Self::scan(path, provider, projection) + Self::scan(table_name, provider, projection) } /// Scan an empty data source, mainly used in tests