Skip to content

Commit

Permalink
Add assertion for invariant in create_physical_expression and fix V…
Browse files Browse the repository at this point in the history
…iewTable projection (#3242)
  • Loading branch information
andygrove committed Aug 25, 2022
1 parent 82da46d commit 92110dd
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 13 deletions.
53 changes: 51 additions & 2 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_expr::LogicalPlanBuilder;

use crate::{
error::Result,
Expand Down Expand Up @@ -81,14 +82,36 @@ impl TableProvider for ViewTable {
async fn scan(
&self,
state: &SessionState,
_projection: &Option<Vec<usize>>,
projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// clone state and start_execution so that now() works in views
let mut state_cloned = state.clone();
state_cloned.execution_props.start_execution();
state_cloned.create_physical_plan(&self.logical_plan).await
if let Some(projection) = projection {
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
let current_projection =
(0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
if projection == &current_projection {
state_cloned.create_physical_plan(&self.logical_plan).await
} else {
let fields: Vec<Expr> = projection
.iter()
.map(|i| {
Expr::Column(
self.logical_plan.schema().field(*i).qualified_column(),
)
})
.collect();
let plan = LogicalPlanBuilder::from(self.logical_plan.clone())
.project(fields)?
.build()?;
state_cloned.create_physical_plan(&plan).await
}
} else {
state_cloned.create_physical_plan(&self.logical_plan).await
}
}
}

Expand All @@ -99,6 +122,32 @@ mod tests {

use super::*;

#[tokio::test]
async fn issue_3242() -> Result<()> {
// regression test for https://github.com/apache/arrow-datafusion/pull/3242
let session_ctx = SessionContext::with_config(
SessionConfig::new().with_information_schema(true),
);

session_ctx
.sql("create view v as select 1 as a, 2 as b, 3 as c")
.await?
.collect()
.await?;

let results = session_ctx
.sql("select * from (select b from v)")
.await?
.collect()
.await?;

let expected = vec!["+---+", "| b |", "+---+", "| 2 |", "+---+"];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn create_view_return_empty_dataframe() -> Result<()> {
let session_ctx = SessionContext::new();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/expr_simplifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl ExprSimplifiable for Expr {
/// ```
fn simplify<S: SimplifyInfo>(self, info: &S) -> Result<Self> {
let mut rewriter = Simplifier::new(info);
let mut const_evaluator = ConstEvaluator::new(info.execution_props());
let mut const_evaluator = ConstEvaluator::try_new(info.execution_props())?;

// TODO iterate until no changes are made during rewrite
// (evaluating constants can enable new simplifications and
Expand Down
16 changes: 7 additions & 9 deletions datafusion/optimizer/src/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ impl SimplifyExpressions {
/// # use datafusion_expr::expr_rewriter::ExprRewritable;
///
/// let execution_props = ExecutionProps::new();
/// let mut const_evaluator = ConstEvaluator::new(&execution_props);
/// let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap();
///
/// // (1 + 2) + a
/// let expr = (lit(1) + lit(2)) + col("a");
Expand Down Expand Up @@ -403,25 +403,23 @@ impl<'a> ConstEvaluator<'a> {
/// Create a new `ConstantEvaluator`. Session constants (such as
/// the time for `now()` are taken from the passed
/// `execution_props`.
pub fn new(execution_props: &'a ExecutionProps) -> Self {
let input_schema = DFSchema::empty();

pub fn try_new(execution_props: &'a ExecutionProps) -> Result<Self> {
// The dummy column name is unused and doesn't matter as only
// expressions without column references can be evaluated
static DUMMY_COL_NAME: &str = ".";
let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]);
let input_schema = DFSchema::try_from(schema.clone())?;

// Need a single "input" row to produce a single output row
let col = new_null_array(&DataType::Null, 1);
let input_batch =
RecordBatch::try_new(std::sync::Arc::new(schema), vec![col]).unwrap();
let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?;

Self {
Ok(Self {
can_evaluate: vec![],
execution_props,
input_schema,
input_batch,
}
})
}

/// Can a function of the specified volatility be evaluated?
Expand Down Expand Up @@ -1273,7 +1271,7 @@ mod tests {
var_providers: None,
};

let mut const_evaluator = ConstEvaluator::new(&execution_props);
let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap();
let evaluated_expr = input_expr
.clone()
.rewrite(&mut const_evaluator)
Expand Down
11 changes: 10 additions & 1 deletion datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,22 @@ use datafusion_expr::binary_rule::comparison_coercion;
use datafusion_expr::{Expr, Operator};
use std::sync::Arc;

/// Create a physical expression from a logical expression ([Expr])
/// Create a physical expression from a logical expression ([Expr]).
///
/// # Arguments
///
/// * `e` - The logical expression
/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
/// to qualified or unqualified fields by name.
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
pub fn create_physical_expr(
e: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>> {
assert_eq!(input_schema.fields.len(), input_dfschema.fields().len());
match e {
Expr::Alias(expr, ..) => Ok(create_physical_expr(
expr,
Expand Down

0 comments on commit 92110dd

Please sign in to comment.