Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ impl LogicalPlanBuilder {
Ok(Self::from(LogicalPlan::Projection(Projection::try_new(
new_expr,
Arc::new(sort_plan),
None,
)?)))
}

Expand Down Expand Up @@ -874,12 +873,12 @@ pub fn project_with_column_index_alias(
x => x.alias(schema.field(i).name()),
})
.collect::<Vec<_>>();
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
alias_expr, input, schema, alias,
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(alias_expr, input, schema, alias)?,
))
}

/// Union two logical plans with an optional alias.
/// Union two logical plans.
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
let left_col_num = left_plan.schema().fields().len();

Expand Down Expand Up @@ -986,12 +985,14 @@ pub fn project_with_alias(
None => input_schema,
};

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
projected_expr,
Arc::new(plan.clone()),
DFSchemaRef::new(schema),
alias,
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(
projected_expr,
Arc::new(plan.clone()),
DFSchemaRef::new(schema),
alias,
)?,
))
}

/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
Expand Down
21 changes: 12 additions & 9 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,20 +1118,28 @@ impl Projection {
pub fn try_new(
expr: Vec<Expr>,
input: Arc<LogicalPlan>,
alias: Option<String>,
) -> Result<Self, DataFusionError> {
let schema = Arc::new(DFSchema::new_with_metadata(
exprlist_to_fields(&expr, &input)?,
input.schema().metadata().clone(),
)?);
Self::try_new_with_schema(expr, input, schema, alias)
Self::try_new_with_schema(expr, input, schema)
}

/// Create a new Projection using the specified output schema
pub fn try_new_with_schema(
expr: Vec<Expr>,
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
) -> Result<Self, DataFusionError> {
Self::try_new_with_schema_alias(expr, input, schema, None)
}

/// Create a new Projection using the specified output schema
pub fn try_new_with_schema_alias(
expr: Vec<Expr>,
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
alias: Option<String>,
) -> Result<Self, DataFusionError> {
if expr.len() != schema.fields().len() {
Expand All @@ -1146,11 +1154,7 @@ impl Projection {
}

/// Create a new Projection using the specified output schema
pub fn new_from_schema(
input: Arc<LogicalPlan>,
schema: DFSchemaRef,
alias: Option<String>,
) -> Self {
pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
let expr: Vec<Expr> = schema
.fields()
.iter()
Expand All @@ -1161,7 +1165,7 @@ impl Projection {
expr,
input,
schema,
alias,
alias: None,
}
}

Expand Down Expand Up @@ -1990,7 +1994,6 @@ mod tests {
schema: empty_schema.clone(),
})),
empty_schema,
None,
);
assert_eq!("Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)", format!("{}", p.err().unwrap()));
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,14 @@ pub fn from_plan(
inputs: &[LogicalPlan],
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Projection(Projection { schema, alias, .. }) => {
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
LogicalPlan::Projection(Projection { schema, alias, .. }) => Ok(
LogicalPlan::Projection(Projection::try_new_with_schema_alias(
expr.to_vec(),
Arc::new(inputs[0].clone()),
schema.clone(),
alias.clone(),
)?))
}
)?),
),
LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values {
schema: schema.clone(),
values: expr
Expand Down
15 changes: 8 additions & 7 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,14 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
alias.clone(),
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
alias.clone(),
)?,
))
}
LogicalPlan::Filter(filter) => {
let input = filter.input();
Expand Down Expand Up @@ -328,7 +330,6 @@ fn build_project_plan(
project_exprs,
Arc::new(input),
Arc::new(schema),
None,
)?))
}

Expand Down
24 changes: 13 additions & 11 deletions datafusion/optimizer/src/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,19 @@ fn limit_push_down(
ancestor,
) => {
// Push down limit directly (projection doesn't change number of rows)
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
expr.clone(),
Arc::new(limit_push_down(
_optimizer,
ancestor,
input.as_ref(),
_optimizer_config,
)?),
schema.clone(),
alias.clone(),
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(
expr.clone(),
Arc::new(limit_push_down(
_optimizer,
ancestor,
input.as_ref(),
_optimizer_config,
)?),
schema.clone(),
alias.clone(),
)?,
))
}
(
LogicalPlan::Union(Union { inputs, schema }),
Expand Down
23 changes: 12 additions & 11 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,16 @@ fn optimize_plan(
Ok(new_input)
} else {
let metadata = new_input.schema().metadata().clone();
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
new_expr,
Arc::new(new_input),
DFSchemaRef::new(DFSchema::new_with_metadata(new_fields, metadata)?),
alias.clone(),
)?))
Ok(LogicalPlan::Projection(
Projection::try_new_with_schema_alias(
new_expr,
Arc::new(new_input),
DFSchemaRef::new(DFSchema::new_with_metadata(
new_fields, metadata,
)?),
alias.clone(),
)?,
))
}
}
LogicalPlan::Join(Join {
Expand Down Expand Up @@ -836,11 +840,8 @@ mod tests {
// that the Column references are unqualified (e.g. their
// relation is `None`). PlanBuilder resolves the expressions
let expr = vec![col("a"), col("b")];
let plan = LogicalPlan::Projection(Projection::try_new(
expr,
Arc::new(table_scan),
None,
)?);
let plan =
LogicalPlan::Projection(Projection::try_new(expr, Arc::new(table_scan))?);

assert_fields_eq(&plan, vec!["a", "b"]);

Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ impl OptimizerRule for PropagateEmptyRelation {
Ok(LogicalPlan::Projection(Projection::new_from_schema(
Arc::new(child),
optimized_children_plan.schema().clone(),
None,
)))
}
} else {
Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ impl OptimizerRule for SingleDistinctToGroupBy {
alias_expr,
Arc::new(outer_aggr),
schema.clone(),
None,
)?))
} else {
utils::optimize_children(self, plan, _optimizer_config)
Expand Down
Loading