Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 43 additions & 42 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use crate::{
},
Expr, ExprSchemable, TableSource,
};
use arrow::datatypes::{DataType, Schema};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
ToDFSchema,
};
use std::any::Any;
use std::convert::TryFrom;
use std::iter;
use std::{
Expand All @@ -49,10 +50,9 @@ pub const UNNAMED_TABLE: &str = "?table?";

/// Builder for logical plans
///
/// ``` ignore
/// # use datafusion::prelude::*;
/// # use datafusion::logical_plan::LogicalPlanBuilder;
/// # use datafusion::error::Result;
/// ```
/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
/// # use datafusion_common::Result;
/// # use arrow::datatypes::{Schema, DataType, Field};
/// #
/// # fn main() -> Result<()> {
Expand All @@ -71,7 +71,7 @@ pub const UNNAMED_TABLE: &str = "?table?";
/// // SELECT last_name
/// // FROM employees
/// // WHERE salary < 1000
/// let plan = LogicalPlanBuilder::scan_empty(
/// let plan = table_scan(
/// Some("employee"),
/// &employee_schema(),
/// None,
Expand Down Expand Up @@ -934,12 +934,37 @@ pub fn project_with_alias(
}))
}

/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
/// This is mostly used for testing and documentation.
pub fn table_scan(
name: Option<&str>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
let table_schema = Arc::new(table_schema.clone());
let table_source = Arc::new(LogicalTableSource { table_schema });
LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection)
}

struct LogicalTableSource {
table_schema: SchemaRef,
}

impl TableSource for LogicalTableSource {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.table_schema.clone()
}
}

#[cfg(test)]
mod tests {
use crate::expr_fn::exists;
use arrow::datatypes::{DataType, Field, SchemaRef};
use arrow::datatypes::{DataType, Field};
use datafusion_common::SchemaError;
use std::any::Any;

use crate::logical_plan::StringifiedPlan;

Expand All @@ -949,7 +974,7 @@ mod tests {
#[test]
fn plan_builder_simple() -> Result<()> {
let plan =
scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
.filter(col("state").eq(lit("CO")))?
.project(vec![col("id")])?
.build()?;
Expand All @@ -966,7 +991,7 @@ mod tests {
#[test]
fn plan_builder_schema() {
let schema = employee_schema();
let plan = scan_empty(Some("employee_csv"), &schema, None).unwrap();
let plan = table_scan(Some("employee_csv"), &schema, None).unwrap();

let expected =
DFSchema::try_from_qualified_schema("employee_csv", &schema).unwrap();
Expand All @@ -977,7 +1002,7 @@ mod tests {
#[test]
fn plan_builder_aggregate() -> Result<()> {
let plan =
scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
.aggregate(
vec![col("state")],
vec![sum(col("salary")).alias("total_salary")],
Expand All @@ -1001,7 +1026,7 @@ mod tests {
#[test]
fn plan_builder_sort() -> Result<()> {
let plan =
scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
.sort(vec![
Expr::Sort {
expr: Box::new(col("state")),
Expand All @@ -1026,9 +1051,9 @@ mod tests {

#[test]
fn plan_using_join_wildcard_projection() -> Result<()> {
let t2 = scan_empty(Some("t2"), &employee_schema(), None)?.build()?;
let t2 = table_scan(Some("t2"), &employee_schema(), None)?.build()?;

let plan = scan_empty(Some("t1"), &employee_schema(), None)?
let plan = table_scan(Some("t1"), &employee_schema(), None)?
.join_using(&t2, JoinType::Inner, vec!["id"])?
.project(vec![Expr::Wildcard])?
.build()?;
Expand All @@ -1047,7 +1072,7 @@ mod tests {
#[test]
fn plan_builder_union_combined_single_union() -> Result<()> {
let plan =
scan_empty(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

let plan = plan
.union(plan.build()?)?
Expand Down Expand Up @@ -1144,7 +1169,7 @@ mod tests {

#[test]
fn projection_non_unique_names() -> Result<()> {
let plan = scan_empty(
let plan = table_scan(
Some("employee_csv"),
&employee_schema(),
// project id and first_name by column index
Expand All @@ -1170,7 +1195,7 @@ mod tests {

#[test]
fn aggregate_non_unique_names() -> Result<()> {
let plan = scan_empty(
let plan = table_scan(
Some("employee_csv"),
&employee_schema(),
// project state and salary by column index
Expand Down Expand Up @@ -1242,30 +1267,6 @@ mod tests {
Field::new("b", DataType::UInt32, false),
Field::new("c", DataType::UInt32, false),
]);
scan_empty(Some(name), &schema, None)?.build()
}

fn scan_empty(
name: Option<&str>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
let table_schema = Arc::new(table_schema.clone());
let table_source = Arc::new(EmptyTable { table_schema });
LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection)
}

struct EmptyTable {
table_schema: SchemaRef,
}

impl TableSource for EmptyTable {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.table_schema.clone()
}
table_scan(Some(name), &schema, None)?.build()
}
}
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod display;
mod extension;
mod plan;

pub use builder::LogicalPlanBuilder;
pub use builder::{table_scan, LogicalPlanBuilder};
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, Explain,
Expand Down
33 changes: 16 additions & 17 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,21 +462,20 @@ impl LogicalPlan {
/// CsvScan: employee projection=Some([0, 3])
/// ```
///
/// ```ignore
/// ```
/// use arrow::datatypes::{Field, Schema, DataType};
/// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
/// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int32, false),
/// ]);
/// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap()
/// let plan = table_scan(Some("t1"), &schema, None).unwrap()
/// .filter(col("id").eq(lit(5))).unwrap()
/// .build().unwrap();
///
/// // Format using display_indent
/// let display_string = format!("{}", plan.display_indent());
///
/// assert_eq!("Filter: #foo_csv.id = Int32(5)\
/// \n TableScan: foo_csv projection=None",
/// assert_eq!("Filter: #t1.id = Int32(5)\n TableScan: t1 projection=None",
/// display_string);
/// ```
pub fn display_indent(&self) -> impl fmt::Display + '_ {
Expand All @@ -503,21 +502,21 @@ impl LogicalPlan {
/// TableScan: employee projection=Some([0, 3]) [id:Int32, state:Utf8]";
/// ```
///
/// ```ignore
/// ```
/// use arrow::datatypes::{Field, Schema, DataType};
/// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
/// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int32, false),
/// ]);
/// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap()
/// let plan = table_scan(Some("t1"), &schema, None).unwrap()
/// .filter(col("id").eq(lit(5))).unwrap()
/// .build().unwrap();
///
/// // Format using display_indent_schema
/// let display_string = format!("{}", plan.display_indent_schema());
///
/// assert_eq!("Filter: #foo_csv.id = Int32(5) [id:Int32]\
/// \n TableScan: foo_csv projection=None [id:Int32]",
/// assert_eq!("Filter: #t1.id = Int32(5) [id:Int32]\
/// \n TableScan: t1 projection=None [id:Int32]",
/// display_string);
/// ```
pub fn display_indent_schema(&self) -> impl fmt::Display + '_ {
Expand All @@ -543,13 +542,13 @@ impl LogicalPlan {
/// This currently produces two graphs -- one with the basic
/// structure, and one with additional details such as schema.
///
/// ```ignore
/// ```
/// use arrow::datatypes::{Field, Schema, DataType};
/// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
/// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int32, false),
/// ]);
/// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap()
/// let plan = table_scan(Some("t1"), &schema, None).unwrap()
/// .filter(col("id").eq(lit(5))).unwrap()
/// .build().unwrap();
///
Expand Down Expand Up @@ -602,19 +601,19 @@ impl LogicalPlan {
/// ```text
/// Projection: #id
/// ```
/// ```ignore
/// ```
/// use arrow::datatypes::{Field, Schema, DataType};
/// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
/// use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int32, false),
/// ]);
/// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap()
/// let plan = table_scan(Some("t1"), &schema, None).unwrap()
/// .build().unwrap();
///
/// // Format using display
/// let display_string = format!("{}", plan.display());
///
/// assert_eq!("TableScan: foo.csv projection=None", display_string);
/// assert_eq!("TableScan: t1 projection=None", display_string);
/// ```
pub fn display(&self) -> impl fmt::Display + '_ {
// Boilerplate structure to wrap LogicalPlan with something
Expand Down