From c10668ff4e1e64b47e26de61975e67cb6593872d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Aug 2022 15:23:10 -0600 Subject: [PATCH 1/5] Add assertion for invariant that both schemas provided to create_physical_expression must be the same length --- datafusion/core/src/datasource/view.rs | 18 ++++++++++++++++-- datafusion/optimizer/src/expr_simplifier.rs | 2 +- .../optimizer/src/simplify_expressions.rs | 16 +++++++--------- datafusion/physical-expr/src/planner.rs | 1 + 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index c93d83f2789a..a8f86de7e237 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use datafusion_expr::LogicalPlanBuilder; use crate::{ error::Result, @@ -81,14 +82,27 @@ impl TableProvider for ViewTable { async fn scan( &self, state: &SessionState, - _projection: &Option>, + projection: &Option>, _filters: &[Expr], _limit: Option, ) -> Result> { // clone state and start_execution so that now() works in views let mut state_cloned = state.clone(); state_cloned.execution_props.start_execution(); - state_cloned.create_physical_plan(&self.logical_plan).await + if let Some(projection) = projection { + let fields: Vec = projection + .iter() + .map(|i| { + Expr::Column(self.logical_plan.schema().field(*i).qualified_column()) + }) + .collect(); + let plan = LogicalPlanBuilder::from(self.logical_plan.clone()) + .project(fields)? + .build()?; + state_cloned.create_physical_plan(&plan).await + } else { + state_cloned.create_physical_plan(&self.logical_plan).await + } } } diff --git a/datafusion/optimizer/src/expr_simplifier.rs b/datafusion/optimizer/src/expr_simplifier.rs index d71ecdaa201f..7cf5f02c6966 100644 --- a/datafusion/optimizer/src/expr_simplifier.rs +++ b/datafusion/optimizer/src/expr_simplifier.rs @@ -88,7 +88,7 @@ impl ExprSimplifiable for Expr { /// ``` fn simplify(self, info: &S) -> Result { let mut rewriter = Simplifier::new(info); - let mut const_evaluator = ConstEvaluator::new(info.execution_props()); + let mut const_evaluator = ConstEvaluator::try_new(info.execution_props())?; // TODO iterate until no changes are made during rewrite // (evaluating constants can enable new simplifications and diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index 34ec24fb61a1..384fd09ae13b 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -332,7 +332,7 @@ impl SimplifyExpressions { /// # use datafusion_expr::expr_rewriter::ExprRewritable; /// /// let execution_props = ExecutionProps::new(); -/// let mut const_evaluator = ConstEvaluator::new(&execution_props); +/// let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap(); /// /// // (1 + 2) + a /// let expr = (lit(1) + lit(2)) + col("a"); @@ -403,25 +403,23 @@ impl<'a> ConstEvaluator<'a> { /// Create a new `ConstantEvaluator`. Session constants (such as /// the time for `now()` are taken from the passed /// `execution_props`. - pub fn new(execution_props: &'a ExecutionProps) -> Self { - let input_schema = DFSchema::empty(); - + pub fn try_new(execution_props: &'a ExecutionProps) -> Result { // The dummy column name is unused and doesn't matter as only // expressions without column references can be evaluated static DUMMY_COL_NAME: &str = "."; let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]); + let input_schema = DFSchema::try_from(schema.clone())?; // Need a single "input" row to produce a single output row let col = new_null_array(&DataType::Null, 1); - let input_batch = - RecordBatch::try_new(std::sync::Arc::new(schema), vec![col]).unwrap(); + let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?; - Self { + Ok(Self { can_evaluate: vec![], execution_props, input_schema, input_batch, - } + }) } /// Can a function of the specified volatility be evaluated? @@ -1273,7 +1271,7 @@ mod tests { var_providers: None, }; - let mut const_evaluator = ConstEvaluator::new(&execution_props); + let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap(); let evaluated_expr = input_expr .clone() .rewrite(&mut const_evaluator) diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 44fe0595ac58..780b31b52f4c 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -38,6 +38,7 @@ pub fn create_physical_expr( input_schema: &Schema, execution_props: &ExecutionProps, ) -> Result> { + assert_eq!(input_schema.fields.len(), input_dfschema.fields().len()); match e { Expr::Alias(expr, ..) => Ok(create_physical_expr( expr, From 8848a58953dec9e79ff442502670568b7affa046 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Aug 2022 15:57:59 -0600 Subject: [PATCH 2/5] rustdoc --- datafusion/physical-expr/src/planner.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 780b31b52f4c..7d42c90d445d 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -31,7 +31,15 @@ use datafusion_expr::binary_rule::comparison_coercion; use datafusion_expr::{Expr, Operator}; use std::sync::Arc; -/// Create a physical expression from a logical expression ([Expr]) +/// Create a physical expression from a logical expression ([Expr]). +/// +/// # Arguments +/// +/// * `e` - The logical expression +/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references +/// to qualified or unqualified fields by name. +/// * `input_schema` - The Arrow schema for the input, used for determining expression data types +/// when performing type coercion. pub fn create_physical_expr( e: &Expr, input_dfschema: &DFSchema, From 063c319ece4cc03193b642b6546ba1544887b37a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Aug 2022 16:44:43 -0600 Subject: [PATCH 3/5] add repro test --- datafusion/core/src/datasource/view.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index a8f86de7e237..97763f91fb30 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -113,6 +113,32 @@ mod tests { use super::*; + #[tokio::test] + async fn issue_3242() -> Result<()> { + // regression test for https://github.com/apache/arrow-datafusion/pull/3242 + let session_ctx = SessionContext::with_config( + SessionConfig::new().with_information_schema(true), + ); + + session_ctx + .sql("create view v as select 1 as a, 2 as b, 3 as c") + .await? + .collect() + .await?; + + let results = session_ctx + .sql("select * from (select b from v)") + .await? + .collect() + .await?; + + let expected = vec!["+---+", "| b |", "+---+", "| 2 |", "+---+"]; + + assert_batches_eq!(expected, &results); + + Ok(()) + } + #[tokio::test] async fn query_view() -> Result<()> { let session_ctx = SessionContext::with_config( From 6b56b6d5357dd1c8316d1ee241c855577ad16d60 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 24 Aug 2022 08:37:43 -0600 Subject: [PATCH 4/5] avoid adding a redundant projection --- datafusion/core/src/datasource/view.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index cc9442d4a15b..9ba24b4d05ba 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -90,16 +90,22 @@ impl TableProvider for ViewTable { let mut state_cloned = state.clone(); state_cloned.execution_props.start_execution(); if let Some(projection) = projection { - let fields: Vec = projection - .iter() - .map(|i| { - Expr::Column(self.logical_plan.schema().field(*i).qualified_column()) - }) - .collect(); - let plan = LogicalPlanBuilder::from(self.logical_plan.clone()) - .project(fields)? - .build()?; - state_cloned.create_physical_plan(&plan).await + // avoiding adding a redundant projection (e.g. SELECT * FROM view) + let current_projection = (0..self.logical_plan.schema().fields().len()).collect::>(); + if projection == ¤t_projection { + state_cloned.create_physical_plan(&self.logical_plan).await + } else { + let fields: Vec = projection + .iter() + .map(|i| { + Expr::Column(self.logical_plan.schema().field(*i).qualified_column()) + }) + .collect(); + let plan = LogicalPlanBuilder::from(self.logical_plan.clone()) + .project(fields)? + .build()?; + state_cloned.create_physical_plan(&plan).await + } } else { state_cloned.create_physical_plan(&self.logical_plan).await } From 6719fc10a9d3bf74ca4f3ad5ba6cf41870219836 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 24 Aug 2022 10:17:08 -0600 Subject: [PATCH 5/5] fmt --- datafusion/core/src/datasource/view.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 9ba24b4d05ba..ef7e12aa2b01 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -91,14 +91,17 @@ impl TableProvider for ViewTable { state_cloned.execution_props.start_execution(); if let Some(projection) = projection { // avoiding adding a redundant projection (e.g. SELECT * FROM view) - let current_projection = (0..self.logical_plan.schema().fields().len()).collect::>(); + let current_projection = + (0..self.logical_plan.schema().fields().len()).collect::>(); if projection == ¤t_projection { state_cloned.create_physical_plan(&self.logical_plan).await } else { let fields: Vec = projection .iter() .map(|i| { - Expr::Column(self.logical_plan.schema().field(*i).qualified_column()) + Expr::Column( + self.logical_plan.schema().field(*i).qualified_column(), + ) }) .collect(); let plan = LogicalPlanBuilder::from(self.logical_plan.clone())