diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 5ff51e07514e..d61f570cb4c2 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -295,8 +295,9 @@ mod test { use ballista_core::serde::BallistaCodec; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::execution::context::default_session_builder; - use datafusion::logical_plan::{col, sum, LogicalPlan, LogicalPlanBuilder}; + use datafusion::logical_plan::{col, sum, LogicalPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; + use datafusion::test_util::scan_empty; use crate::scheduler_server::event::QueryStageSchedulerEvent; use crate::scheduler_server::SchedulerServer; @@ -606,7 +607,7 @@ mod test { Field::new("gmv", DataType::UInt64, false), ]); - LogicalPlanBuilder::scan_empty(None, &schema, Some(vec![0, 1])) + scan_empty(None, &schema, Some(vec![0, 1])) .unwrap() .aggregate(vec![col("id")], vec![sum(col("gmv"))]) .unwrap() diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 739d194949f3..e84313f82f66 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -17,7 +17,7 @@ //! This module provides a builder for creating LogicalPlans -use crate::datasource::{empty::EmptyTable, TableProvider}; +use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_expr::ExprSchemable; use crate::logical_plan::plan::{ @@ -52,7 +52,7 @@ pub const UNNAMED_TABLE: &str = "?table?"; /// Builder for logical plans /// -/// ``` +/// ``` ignore /// # use datafusion::prelude::*; /// # use datafusion::logical_plan::LogicalPlanBuilder; /// # use datafusion::error::Result; @@ -188,17 +188,6 @@ impl LogicalPlanBuilder { Ok(Self::from(LogicalPlan::Values(Values { schema, values }))) } - /// Scan an empty data source, mainly used in tests - pub fn scan_empty( - name: Option<&str>, - table_schema: &Schema, - projection: Option>, - ) -> Result { - let table_schema = Arc::new(table_schema.clone()); - let provider = Arc::new(EmptyTable::new(table_schema)); - Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection) - } - /// Convert a table provider into a builder with a TableScan pub fn scan( table_name: impl Into, @@ -951,20 +940,18 @@ mod tests { use crate::logical_plan::StringifiedPlan; use crate::prelude::*; use crate::test::test_table_scan_with_name; + use crate::test_util::scan_empty; use super::super::{col, lit, sum}; use super::*; #[test] fn plan_builder_simple() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![0, 3]), - )? - .filter(col("state").eq(lit("CO")))? - .project(vec![col("id")])? - .build()?; + let plan = + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))? + .filter(col("state").eq(lit("CO")))? + .project(vec![col("id")])? + .build()?; let expected = "Projection: #employee_csv.id\ \n Filter: #employee_csv.state = Utf8(\"CO\")\ @@ -978,8 +965,7 @@ mod tests { #[test] fn plan_builder_schema() { let schema = employee_schema(); - let plan = - LogicalPlanBuilder::scan_empty(Some("employee_csv"), &schema, None).unwrap(); + let plan = scan_empty(Some("employee_csv"), &schema, None).unwrap(); let expected = DFSchema::try_from_qualified_schema("employee_csv", &schema).unwrap(); @@ -989,19 +975,16 @@ mod tests { #[test] fn plan_builder_aggregate() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![3, 4]), - )? - .aggregate( - vec![col("state")], - vec![sum(col("salary")).alias("total_salary")], - )? - .project(vec![col("state"), col("total_salary")])? - .limit(10)? - .offset(2)? - .build()?; + let plan = + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? + .aggregate( + vec![col("state")], + vec![sum(col("salary")).alias("total_salary")], + )? + .project(vec![col("state"), col("total_salary")])? + .limit(10)? + .offset(2)? + .build()?; let expected = "Offset: 2\ \n Limit: 10\ @@ -1016,24 +999,21 @@ mod tests { #[test] fn plan_builder_sort() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![3, 4]), - )? - .sort(vec![ - Expr::Sort { - expr: Box::new(col("state")), - asc: true, - nulls_first: true, - }, - Expr::Sort { - expr: Box::new(col("salary")), - asc: false, - nulls_first: false, - }, - ])? - .build()?; + let plan = + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? + .sort(vec![ + Expr::Sort { + expr: Box::new(col("state")), + asc: true, + nulls_first: true, + }, + Expr::Sort { + expr: Box::new(col("salary")), + asc: false, + nulls_first: false, + }, + ])? + .build()?; let expected = "Sort: #employee_csv.state ASC NULLS FIRST, #employee_csv.salary DESC NULLS LAST\ \n TableScan: employee_csv projection=Some([3, 4])"; @@ -1045,10 +1025,9 @@ mod tests { #[test] fn plan_using_join_wildcard_projection() -> Result<()> { - let t2 = LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)? - .build()?; + let t2 = scan_empty(Some("t2"), &employee_schema(), None)?.build()?; - let plan = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)? + let plan = scan_empty(Some("t1"), &employee_schema(), None)? .join_using(&t2, JoinType::Inner, vec!["id"])? .project(vec![Expr::Wildcard])? .build()?; @@ -1066,11 +1045,8 @@ mod tests { #[test] fn plan_builder_union_combined_single_union() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![3, 4]), - )?; + let plan = + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?; let plan = plan .union(plan.build()?)? @@ -1167,7 +1143,7 @@ mod tests { #[test] fn projection_non_unique_names() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( + let plan = scan_empty( Some("employee_csv"), &employee_schema(), // project id and first_name by column index @@ -1193,7 +1169,7 @@ mod tests { #[test] fn aggregate_non_unique_names() -> Result<()> { - let plan = LogicalPlanBuilder::scan_empty( + let plan = scan_empty( Some("employee_csv"), &employee_schema(), // project state and salary by column index diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index f12981707f86..1dfda1a8665e 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -100,8 +100,9 @@ pub fn source_as_provider( #[cfg(test)] mod tests { - use super::super::{col, lit, LogicalPlanBuilder}; + use super::super::{col, lit}; use super::*; + use crate::test_util::scan_empty; use arrow::datatypes::{DataType, Field, Schema}; fn employee_schema() -> Schema { @@ -115,18 +116,14 @@ mod tests { } fn display_plan() -> LogicalPlan { - LogicalPlanBuilder::scan_empty( - Some("employee_csv"), - &employee_schema(), - Some(vec![0, 3]), - ) - .unwrap() - .filter(col("state").eq(lit("CO"))) - .unwrap() - .project(vec![col("id")]) - .unwrap() - .build() - .unwrap() + scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![0, 3])) + .unwrap() + .filter(col("state").eq(lit("CO"))) + .unwrap() + .project(vec![col("id")]) + .unwrap() + .build() + .unwrap() } #[test] @@ -424,7 +421,7 @@ mod tests { Field::new("state", DataType::Utf8, false), ]); - LogicalPlanBuilder::scan_empty(None, &schema, Some(vec![0, 1])) + scan_empty(None, &schema, Some(vec![0, 1])) .unwrap() .filter(col("state").eq(lit("CO"))) .unwrap() diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 20b8f683dfb0..cf14adcd1803 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -517,6 +517,7 @@ mod tests { col, exprlist_to_fields, lit, max, min, Expr, JoinType, LogicalPlanBuilder, }; use crate::test::*; + use crate::test_util::scan_empty; use arrow::datatypes::DataType; #[test] @@ -646,8 +647,7 @@ mod tests { let table_scan = test_table_scan()?; let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]); - let table2_scan = - LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?; + let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?; let plan = LogicalPlanBuilder::from(table_scan) .join(&table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]))? @@ -688,8 +688,7 @@ mod tests { let table_scan = test_table_scan()?; let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]); - let table2_scan = - LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?; + let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?; let plan = LogicalPlanBuilder::from(table_scan) .join(&table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]))? @@ -732,8 +731,7 @@ mod tests { let table_scan = test_table_scan()?; let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]); - let table2_scan = - LogicalPlanBuilder::scan_empty(Some("test2"), &schema, None)?.build()?; + let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?; let plan = LogicalPlanBuilder::from(table_scan) .join_using(&table2_scan, JoinType::Left, vec!["a"])? diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs index e9694ebc528c..216af223a07c 100644 --- a/datafusion/core/src/optimizer/simplify_expressions.rs +++ b/datafusion/core/src/optimizer/simplify_expressions.rs @@ -749,6 +749,7 @@ mod tests { }; use crate::physical_plan::functions::make_scalar_function; use crate::physical_plan::udf::ScalarUDF; + use crate::test_util::scan_empty; #[test] fn test_simplify_or_true() { @@ -1508,7 +1509,7 @@ mod tests { Field::new("c", DataType::Boolean, false), Field::new("d", DataType::UInt32, false), ]); - LogicalPlanBuilder::scan_empty(Some("test"), &schema, None) + scan_empty(Some("test"), &schema, None) .expect("creating scan") .build() .expect("building plan") diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 911f9f67f573..27dd46b072d4 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1543,6 +1543,7 @@ mod tests { }; use crate::prelude::{SessionConfig, SessionContext}; use crate::scalar::ScalarValue; + use crate::test_util::scan_empty; use crate::{ logical_plan::LogicalPlanBuilder, physical_plan::SendableRecordBatchStream, }; @@ -1856,13 +1857,12 @@ mod tests { async fn test_explain() { let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); - let logical_plan = - LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None) - .unwrap() - .explain(true, false) - .unwrap() - .build() - .unwrap(); + let logical_plan = scan_empty(Some("employee"), &schema, None) + .unwrap() + .explain(true, false) + .unwrap() + .build() + .unwrap(); let plan = plan(&logical_plan).await.unwrap(); if let Some(plan) = plan.as_any().downcast_ref::() { diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index c8523a52e44d..1798232b3fdf 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -21,9 +21,9 @@ use crate::arrow::array::UInt32Array; use crate::datasource::{listing::local_unpartitioned_file, MemTable, TableProvider}; use crate::error::Result; use crate::from_slice::FromSlice; -use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder}; +use crate::logical_plan::LogicalPlan; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; -use crate::test_util::aggr_test_schema; +use crate::test_util::{aggr_test_schema, scan_empty}; use array::{Array, ArrayRef}; use arrow::array::{self, DecimalBuilder, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -132,7 +132,7 @@ pub fn test_table_scan_with_name(name: &str) -> Result { Field::new("b", DataType::UInt32, false), Field::new("c", DataType::UInt32, false), ]); - LogicalPlanBuilder::scan_empty(Some(name), &schema, None)?.build() + scan_empty(Some(name), &schema, None)?.build() } /// some tests share a common table diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 1dd36316b797..3e174e9dc67b 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -20,7 +20,10 @@ use std::collections::BTreeMap; use std::{env, error::Error, path::PathBuf, sync::Arc}; +use crate::datasource::empty::EmptyTable; +use crate::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::DataFusionError; /// Compares formatted output of a record batch with an expected /// vector of strings, with the result of pretty formatting record @@ -232,6 +235,17 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result, + table_schema: &Schema, + projection: Option>, +) -> Result { + let table_schema = Arc::new(table_schema.clone()); + let provider = Arc::new(EmptyTable::new(table_schema)); + LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection) +} + /// Get the schema for the aggregate_test_* csv files pub fn aggr_test_schema() -> SchemaRef { let mut f1 = Field::new("c1", DataType::Utf8, false); diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 42999a743bbd..08ccbe453042 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -16,7 +16,8 @@ // under the License. use super::*; -use datafusion::{logical_plan::LogicalPlanBuilder, scalar::ScalarValue}; +use datafusion::scalar::ScalarValue; +use datafusion::test_util::scan_empty; #[tokio::test] async fn csv_query_avg_multi_batch() -> Result<()> { @@ -1479,7 +1480,7 @@ async fn aggregate_with_alias() -> Result<()> { Field::new("c2", DataType::UInt32, false), ])); - let plan = LogicalPlanBuilder::scan_empty(None, schema.as_ref(), None)? + let plan = scan_empty(None, schema.as_ref(), None)? .aggregate(vec![col("c1")], vec![sum(col("c2"))])? .project(vec![col("c1"), sum(col("c2")).alias("total_salary")])? .build()?; diff --git a/datafusion/core/tests/sql/explain.rs b/datafusion/core/tests/sql/explain.rs index b85228016e50..97e7e6e76123 100644 --- a/datafusion/core/tests/sql/explain.rs +++ b/datafusion/core/tests/sql/explain.rs @@ -16,8 +16,9 @@ // under the License. use arrow::datatypes::{DataType, Field, Schema}; +use datafusion::test_util::scan_empty; use datafusion::{ - logical_plan::{LogicalPlan, LogicalPlanBuilder, PlanType}, + logical_plan::{LogicalPlan, PlanType}, prelude::SessionContext, }; @@ -25,7 +26,7 @@ use datafusion::{ fn optimize_explain() { let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); - let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None) + let plan = scan_empty(Some("employee"), &schema, None) .unwrap() .explain(true, false) .unwrap() diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index 0b7ce860eccc..e1b1742bfa2d 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -16,6 +16,7 @@ // under the License. use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; +use datafusion::test_util::scan_empty; use tempfile::TempDir; use super::*; @@ -209,7 +210,7 @@ async fn preserve_nullability_on_projection() -> Result<()> { let schema: Schema = ctx.table("test").unwrap().schema().clone().into(); assert!(!schema.field_with_name("c1")?.is_nullable()); - let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)? + let plan = scan_empty(None, &schema, None)? .project(vec![col("c1")])? .build()?;