diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 9778e2fe99e1..a20208c7de8a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -32,11 +32,12 @@ use crate::{ }, Expr, ExprSchemable, TableSource, }; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::{ Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, ToDFSchema, }; +use std::any::Any; use std::convert::TryFrom; use std::iter; use std::{ @@ -49,10 +50,9 @@ pub const UNNAMED_TABLE: &str = "?table?"; /// Builder for logical plans /// -/// ``` ignore -/// # use datafusion::prelude::*; -/// # use datafusion::logical_plan::LogicalPlanBuilder; -/// # use datafusion::error::Result; +/// ``` +/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan}; +/// # use datafusion_common::Result; /// # use arrow::datatypes::{Schema, DataType, Field}; /// # /// # fn main() -> Result<()> { @@ -71,7 +71,7 @@ pub const UNNAMED_TABLE: &str = "?table?"; /// // SELECT last_name /// // FROM employees /// // WHERE salary < 1000 -/// let plan = LogicalPlanBuilder::scan_empty( +/// let plan = table_scan( /// Some("employee"), /// &employee_schema(), /// None, @@ -934,12 +934,37 @@ pub fn project_with_alias( })) } +/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. +/// This is mostly used for testing and documentation. +pub fn table_scan( + name: Option<&str>, + table_schema: &Schema, + projection: Option>, +) -> Result { + let table_schema = Arc::new(table_schema.clone()); + let table_source = Arc::new(LogicalTableSource { table_schema }); + LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection) +} + +struct LogicalTableSource { + table_schema: SchemaRef, +} + +impl TableSource for LogicalTableSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.table_schema.clone() + } +} + #[cfg(test)] mod tests { use crate::expr_fn::exists; - use arrow::datatypes::{DataType, Field, SchemaRef}; + use arrow::datatypes::{DataType, Field}; use datafusion_common::SchemaError; - use std::any::Any; use crate::logical_plan::StringifiedPlan; @@ -949,7 +974,7 @@ mod tests { #[test] fn plan_builder_simple() -> Result<()> { let plan = - scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))? + table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))? .filter(col("state").eq(lit("CO")))? .project(vec![col("id")])? .build()?; @@ -966,7 +991,7 @@ mod tests { #[test] fn plan_builder_schema() { let schema = employee_schema(); - let plan = scan_empty(Some("employee_csv"), &schema, None).unwrap(); + let plan = table_scan(Some("employee_csv"), &schema, None).unwrap(); let expected = DFSchema::try_from_qualified_schema("employee_csv", &schema).unwrap(); @@ -977,7 +1002,7 @@ mod tests { #[test] fn plan_builder_aggregate() -> Result<()> { let plan = - scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? + table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? .aggregate( vec![col("state")], vec![sum(col("salary")).alias("total_salary")], @@ -1001,7 +1026,7 @@ mod tests { #[test] fn plan_builder_sort() -> Result<()> { let plan = - scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? + table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))? .sort(vec![ Expr::Sort { expr: Box::new(col("state")), @@ -1026,9 +1051,9 @@ mod tests { #[test] fn plan_using_join_wildcard_projection() -> Result<()> { - let t2 = scan_empty(Some("t2"), &employee_schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &employee_schema(), None)?.build()?; - let plan = scan_empty(Some("t1"), &employee_schema(), None)? + let plan = table_scan(Some("t1"), &employee_schema(), None)? .join_using(&t2, JoinType::Inner, vec!["id"])? .project(vec![Expr::Wildcard])? .build()?; @@ -1047,7 +1072,7 @@ mod tests { #[test] fn plan_builder_union_combined_single_union() -> Result<()> { let plan = - scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?; + table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?; let plan = plan .union(plan.build()?)? @@ -1144,7 +1169,7 @@ mod tests { #[test] fn projection_non_unique_names() -> Result<()> { - let plan = scan_empty( + let plan = table_scan( Some("employee_csv"), &employee_schema(), // project id and first_name by column index @@ -1170,7 +1195,7 @@ mod tests { #[test] fn aggregate_non_unique_names() -> Result<()> { - let plan = scan_empty( + let plan = table_scan( Some("employee_csv"), &employee_schema(), // project state and salary by column index @@ -1242,30 +1267,6 @@ mod tests { Field::new("b", DataType::UInt32, false), Field::new("c", DataType::UInt32, false), ]); - scan_empty(Some(name), &schema, None)?.build() - } - - fn scan_empty( - name: Option<&str>, - table_schema: &Schema, - projection: Option>, - ) -> Result { - let table_schema = Arc::new(table_schema.clone()); - let table_source = Arc::new(EmptyTable { table_schema }); - LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection) - } - - struct EmptyTable { - table_schema: SchemaRef, - } - - impl TableSource for EmptyTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.table_schema.clone() - } + table_scan(Some(name), &schema, None)?.build() } } diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index f96e4320fe2a..f3bd051e57ec 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -20,7 +20,7 @@ pub mod display; mod extension; mod plan; -pub use builder::LogicalPlanBuilder; +pub use builder::{table_scan, LogicalPlanBuilder}; pub use plan::{ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 50ca75fdf76f..05c8a22e7d26 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -462,21 +462,20 @@ impl LogicalPlan { /// CsvScan: employee projection=Some([0, 3]) /// ``` /// - /// ```ignore + /// ``` /// use arrow::datatypes::{Field, Schema, DataType}; - /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder}; + /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan}; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), /// ]); - /// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap() + /// let plan = table_scan(Some("t1"), &schema, None).unwrap() /// .filter(col("id").eq(lit(5))).unwrap() /// .build().unwrap(); /// /// // Format using display_indent /// let display_string = format!("{}", plan.display_indent()); /// - /// assert_eq!("Filter: #foo_csv.id = Int32(5)\ - /// \n TableScan: foo_csv projection=None", + /// assert_eq!("Filter: #t1.id = Int32(5)\n TableScan: t1 projection=None", /// display_string); /// ``` pub fn display_indent(&self) -> impl fmt::Display + '_ { @@ -503,21 +502,21 @@ impl LogicalPlan { /// TableScan: employee projection=Some([0, 3]) [id:Int32, state:Utf8]"; /// ``` /// - /// ```ignore + /// ``` /// use arrow::datatypes::{Field, Schema, DataType}; - /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder}; + /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan}; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), /// ]); - /// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap() + /// let plan = table_scan(Some("t1"), &schema, None).unwrap() /// .filter(col("id").eq(lit(5))).unwrap() /// .build().unwrap(); /// /// // Format using display_indent_schema /// let display_string = format!("{}", plan.display_indent_schema()); /// - /// assert_eq!("Filter: #foo_csv.id = Int32(5) [id:Int32]\ - /// \n TableScan: foo_csv projection=None [id:Int32]", + /// assert_eq!("Filter: #t1.id = Int32(5) [id:Int32]\ + /// \n TableScan: t1 projection=None [id:Int32]", /// display_string); /// ``` pub fn display_indent_schema(&self) -> impl fmt::Display + '_ { @@ -543,13 +542,13 @@ impl LogicalPlan { /// This currently produces two graphs -- one with the basic /// structure, and one with additional details such as schema. /// - /// ```ignore + /// ``` /// use arrow::datatypes::{Field, Schema, DataType}; - /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder}; + /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan}; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), /// ]); - /// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap() + /// let plan = table_scan(Some("t1"), &schema, None).unwrap() /// .filter(col("id").eq(lit(5))).unwrap() /// .build().unwrap(); /// @@ -602,19 +601,19 @@ impl LogicalPlan { /// ```text /// Projection: #id /// ``` - /// ```ignore + /// ``` /// use arrow::datatypes::{Field, Schema, DataType}; - /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder}; + /// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan}; /// let schema = Schema::new(vec![ /// Field::new("id", DataType::Int32, false), /// ]); - /// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap() + /// let plan = table_scan(Some("t1"), &schema, None).unwrap() /// .build().unwrap(); /// /// // Format using display /// let display_string = format!("{}", plan.display()); /// - /// assert_eq!("TableScan: foo.csv projection=None", display_string); + /// assert_eq!("TableScan: t1 projection=None", display_string); /// ``` pub fn display(&self) -> impl fmt::Display + '_ { // Boilerplate structure to wrap LogicalPlan with something