diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 1809f2a08857..ef7e12aa2b01 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::SchemaRef; use async_trait::async_trait; +use datafusion_expr::LogicalPlanBuilder; use crate::{ error::Result, @@ -81,14 +82,36 @@ impl TableProvider for ViewTable { async fn scan( &self, state: &SessionState, - _projection: &Option>, + projection: &Option>, _filters: &[Expr], _limit: Option, ) -> Result> { // clone state and start_execution so that now() works in views let mut state_cloned = state.clone(); state_cloned.execution_props.start_execution(); - state_cloned.create_physical_plan(&self.logical_plan).await + if let Some(projection) = projection { + // avoiding adding a redundant projection (e.g. SELECT * FROM view) + let current_projection = + (0..self.logical_plan.schema().fields().len()).collect::>(); + if projection == ¤t_projection { + state_cloned.create_physical_plan(&self.logical_plan).await + } else { + let fields: Vec = projection + .iter() + .map(|i| { + Expr::Column( + self.logical_plan.schema().field(*i).qualified_column(), + ) + }) + .collect(); + let plan = LogicalPlanBuilder::from(self.logical_plan.clone()) + .project(fields)? + .build()?; + state_cloned.create_physical_plan(&plan).await + } + } else { + state_cloned.create_physical_plan(&self.logical_plan).await + } } } @@ -99,6 +122,32 @@ mod tests { use super::*; + #[tokio::test] + async fn issue_3242() -> Result<()> { + // regression test for https://github.com/apache/arrow-datafusion/pull/3242 + let session_ctx = SessionContext::with_config( + SessionConfig::new().with_information_schema(true), + ); + + session_ctx + .sql("create view v as select 1 as a, 2 as b, 3 as c") + .await? + .collect() + .await?; + + let results = session_ctx + .sql("select * from (select b from v)") + .await? + .collect() + .await?; + + let expected = vec!["+---+", "| b |", "+---+", "| 2 |", "+---+"]; + + assert_batches_eq!(expected, &results); + + Ok(()) + } + #[tokio::test] async fn create_view_return_empty_dataframe() -> Result<()> { let session_ctx = SessionContext::new(); diff --git a/datafusion/optimizer/src/expr_simplifier.rs b/datafusion/optimizer/src/expr_simplifier.rs index d71ecdaa201f..7cf5f02c6966 100644 --- a/datafusion/optimizer/src/expr_simplifier.rs +++ b/datafusion/optimizer/src/expr_simplifier.rs @@ -88,7 +88,7 @@ impl ExprSimplifiable for Expr { /// ``` fn simplify(self, info: &S) -> Result { let mut rewriter = Simplifier::new(info); - let mut const_evaluator = ConstEvaluator::new(info.execution_props()); + let mut const_evaluator = ConstEvaluator::try_new(info.execution_props())?; // TODO iterate until no changes are made during rewrite // (evaluating constants can enable new simplifications and diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index 34ec24fb61a1..384fd09ae13b 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -332,7 +332,7 @@ impl SimplifyExpressions { /// # use datafusion_expr::expr_rewriter::ExprRewritable; /// /// let execution_props = ExecutionProps::new(); -/// let mut const_evaluator = ConstEvaluator::new(&execution_props); +/// let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap(); /// /// // (1 + 2) + a /// let expr = (lit(1) + lit(2)) + col("a"); @@ -403,25 +403,23 @@ impl<'a> ConstEvaluator<'a> { /// Create a new `ConstantEvaluator`. Session constants (such as /// the time for `now()` are taken from the passed /// `execution_props`. - pub fn new(execution_props: &'a ExecutionProps) -> Self { - let input_schema = DFSchema::empty(); - + pub fn try_new(execution_props: &'a ExecutionProps) -> Result { // The dummy column name is unused and doesn't matter as only // expressions without column references can be evaluated static DUMMY_COL_NAME: &str = "."; let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]); + let input_schema = DFSchema::try_from(schema.clone())?; // Need a single "input" row to produce a single output row let col = new_null_array(&DataType::Null, 1); - let input_batch = - RecordBatch::try_new(std::sync::Arc::new(schema), vec![col]).unwrap(); + let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?; - Self { + Ok(Self { can_evaluate: vec![], execution_props, input_schema, input_batch, - } + }) } /// Can a function of the specified volatility be evaluated? @@ -1273,7 +1271,7 @@ mod tests { var_providers: None, }; - let mut const_evaluator = ConstEvaluator::new(&execution_props); + let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap(); let evaluated_expr = input_expr .clone() .rewrite(&mut const_evaluator) diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index f84a5fbb9b10..010b61a9b312 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -32,13 +32,22 @@ use datafusion_expr::binary_rule::comparison_coercion; use datafusion_expr::{Expr, Operator}; use std::sync::Arc; -/// Create a physical expression from a logical expression ([Expr]) +/// Create a physical expression from a logical expression ([Expr]). +/// +/// # Arguments +/// +/// * `e` - The logical expression +/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references +/// to qualified or unqualified fields by name. +/// * `input_schema` - The Arrow schema for the input, used for determining expression data types +/// when performing type coercion. pub fn create_physical_expr( e: &Expr, input_dfschema: &DFSchema, input_schema: &Schema, execution_props: &ExecutionProps, ) -> Result> { + assert_eq!(input_schema.fields.len(), input_dfschema.fields().len()); match e { Expr::Alias(expr, ..) => Ok(create_physical_expr( expr,