Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ use crate::utils::{
find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
};
use crate::{
and, binary_expr, logical_plan::tree_node::unwrap_arc, DmlStatement, Expr,
ExprSchemable, Operator, RecursiveQuery, TableProviderFilterPushDown, TableSource,
WriteOp,
and, binary_expr, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
TableProviderFilterPushDown, TableSource, WriteOp,
};

use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
Expand Down Expand Up @@ -376,7 +375,7 @@ impl LogicalPlanBuilder {
self,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
project(unwrap_arc(self.plan), expr).map(Self::new)
project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
}

/// Select the given column indices
Expand Down Expand Up @@ -429,7 +428,7 @@ impl LogicalPlanBuilder {

/// Apply an alias
pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
subquery_alias(unwrap_arc(self.plan), alias).map(Self::new)
subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
}

/// Add missing sort columns to all downstream projection
Expand Down Expand Up @@ -484,7 +483,7 @@ impl LogicalPlanBuilder {
Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
}
expr.extend(missing_exprs);
project(unwrap_arc(input), expr)
project(Arc::unwrap_or_clone(input), expr)
}
_ => {
let is_distinct =
Expand Down Expand Up @@ -580,8 +579,11 @@ impl LogicalPlanBuilder {
let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

let is_distinct = false;
let plan =
Self::add_missing_columns(unwrap_arc(self.plan), &missing_cols, is_distinct)?;
let plan = Self::add_missing_columns(
Arc::unwrap_or_clone(self.plan),
&missing_cols,
is_distinct,
)?;
let sort_plan = LogicalPlan::Sort(Sort {
expr: normalize_cols(exprs, &plan)?,
input: Arc::new(plan),
Expand All @@ -595,12 +597,12 @@ impl LogicalPlanBuilder {

/// Apply a union, preserving duplicate rows
pub fn union(self, plan: LogicalPlan) -> Result<Self> {
union(unwrap_arc(self.plan), plan).map(Self::new)
union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
}

/// Apply a union, removing duplicate rows
pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
let left_plan: LogicalPlan = unwrap_arc(self.plan);
let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
let right_plan: LogicalPlan = plan;

Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
Expand Down Expand Up @@ -1064,7 +1066,7 @@ impl LogicalPlanBuilder {

/// Build the plan
pub fn build(self) -> Result<LogicalPlan> {
Ok(unwrap_arc(self.plan))
Ok(Arc::unwrap_or_clone(self.plan))
}

/// Apply a join with the expression on constraint.
Expand Down Expand Up @@ -1138,7 +1140,7 @@ impl LogicalPlanBuilder {

/// Unnest the given column.
pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
unnest(unwrap_arc(self.plan), vec![column.into()]).map(Self::new)
unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
}

/// Unnest the given column given [`UnnestOptions`]
Expand All @@ -1147,8 +1149,12 @@ impl LogicalPlanBuilder {
column: impl Into<Column>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(unwrap_arc(self.plan), vec![column.into()], options)
.map(Self::new)
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
vec![column.into()],
options,
)
.map(Self::new)
}

/// Unnest the given columns with the given [`UnnestOptions`]
Expand All @@ -1157,7 +1163,8 @@ impl LogicalPlanBuilder {
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(unwrap_arc(self.plan), columns, options).map(Self::new)
unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
.map(Self::new)
}
}

Expand Down
3 changes: 1 addition & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use datafusion_common::{

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::logical_plan::tree_node::unwrap_arc;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -770,7 +769,7 @@ impl LogicalPlan {
..
}) => {
// Update schema with unnested column type.
unnest_with_options(unwrap_arc(input), exec_columns, options)
unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
}
}
}
Expand Down
11 changes: 1 addition & 10 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,21 +379,12 @@ impl TreeNode for LogicalPlan {
}
}

/// Converts a `Arc<LogicalPlan>` without copying, if possible. Copies the plan
/// if there is a shared reference
pub fn unwrap_arc(plan: Arc<LogicalPlan>) -> LogicalPlan {
Arc::try_unwrap(plan)
// if None is returned, there is another reference to this
// LogicalPlan, so we can not own it, and must clone instead
.unwrap_or_else(|node| node.as_ref().clone())
}

/// Applies `f` to rewrite a `Arc<LogicalPlan>` without copying, if possible
fn rewrite_arc<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
plan: Arc<LogicalPlan>,
mut f: F,
) -> Result<Transformed<Arc<LogicalPlan>>> {
f(unwrap_arc(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan)))
f(Arc::unwrap_or_clone(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan)))
}

/// rewrite a `Vec` of `Arc<LogicalPlan>` without copying, if possible
Expand Down
18 changes: 12 additions & 6 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema;
use datafusion_expr::expr_schema::cast_subquery;
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::Subquery;
use datafusion_expr::type_coercion::binary::{
comparison_coercion, get_input_types, like_coercion,
Expand Down Expand Up @@ -241,15 +240,19 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
subquery,
outer_ref_columns,
}) => {
let new_plan = analyze_internal(self.schema, unwrap_arc(subquery))?.data;
let new_plan =
analyze_internal(self.schema, Arc::unwrap_or_clone(subquery))?.data;
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns,
})))
}
Expr::Exists(Exists { subquery, negated }) => {
let new_plan =
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
let new_plan = analyze_internal(
self.schema,
Arc::unwrap_or_clone(subquery.subquery),
)?
.data;
Ok(Transformed::yes(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(new_plan),
Expand All @@ -263,8 +266,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
subquery,
negated,
}) => {
let new_plan =
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
let new_plan = analyze_internal(
self.schema,
Arc::unwrap_or_clone(subquery.subquery),
)?
.data;
let expr_type = expr.get_type(self.schema)?;
let subquery_type = new_plan.schema().field(0).data_type();
let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!(
Expand Down
11 changes: 5 additions & 6 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use datafusion_common::tree_node::{
};
use datafusion_common::{qualified_name, Column, DFSchema, DFSchemaRef, Result};
use datafusion_expr::expr::{Alias, ScalarFunction};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::{
Aggregate, Filter, LogicalPlan, Projection, Sort, Window,
};
Expand Down Expand Up @@ -314,7 +313,7 @@ impl CommonSubexprEliminate {
schema,
..
} = projection;
let input = unwrap_arc(input);
let input = Arc::unwrap_or_clone(input);
self.try_unary_plan(expr, input, config)?
.map_data(|(new_expr, new_input)| {
Projection::try_new_with_schema(new_expr, Arc::new(new_input), schema)
Expand All @@ -327,7 +326,7 @@ impl CommonSubexprEliminate {
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let Sort { expr, input, fetch } = sort;
let input = unwrap_arc(input);
let input = Arc::unwrap_or_clone(input);
let new_sort = self.try_unary_plan(expr, input, config)?.update_data(
|(new_expr, new_input)| {
LogicalPlan::Sort(Sort {
Expand All @@ -348,7 +347,7 @@ impl CommonSubexprEliminate {
let Filter {
predicate, input, ..
} = filter;
let input = unwrap_arc(input);
let input = Arc::unwrap_or_clone(input);
let expr = vec![predicate];
self.try_unary_plan(expr, input, config)?
.map_data(|(mut new_expr, new_input)| {
Expand Down Expand Up @@ -458,7 +457,7 @@ impl CommonSubexprEliminate {
schema,
..
} = aggregate;
let input = unwrap_arc(input);
let input = Arc::unwrap_or_clone(input);
// Extract common sub-expressions from the aggregate and grouping expressions.
self.find_common_exprs(vec![group_expr, aggr_expr], config, ExprMask::Normal)?
.map_data(|common| {
Expand Down Expand Up @@ -729,7 +728,7 @@ fn get_consecutive_window_exprs(
window_expr_list.push(window_expr);
window_schemas.push(schema);

plan = unwrap_arc(input);
plan = Arc::unwrap_or_clone(input);
}
(window_expr_list, window_schemas, plan)
}
Expand Down
9 changes: 5 additions & 4 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use datafusion_expr::{
LogicalPlan, LogicalPlanBuilder, Operator,
};

use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use log::debug;

/// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins
Expand All @@ -55,8 +54,10 @@ impl DecorrelatePredicateSubquery {
mut subquery: Subquery,
config: &dyn OptimizerConfig,
) -> Result<Subquery> {
subquery.subquery =
Arc::new(self.rewrite(unwrap_arc(subquery.subquery), config)?.data);
subquery.subquery = Arc::new(
self.rewrite(Arc::unwrap_or_clone(subquery.subquery), config)?
.data,
);
Ok(subquery)
}

Expand Down Expand Up @@ -164,7 +165,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = unwrap_arc(input);
let mut cur_input = Arc::unwrap_or_clone(input);
for subquery in subqueries {
if let Some(plan) =
build_join(&subquery, &cur_input, config.alias_generator())?
Expand Down
27 changes: 21 additions & 6 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::join_key_set::JoinKeySet;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, Result};
use datafusion_expr::expr::{BinaryExpr, Expr};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::{
CrossJoin, Filter, Join, JoinConstraint, JoinType, LogicalPlan, Projection,
};
Expand Down Expand Up @@ -114,7 +113,7 @@ impl OptimizerRule for EliminateCrossJoin {
input, predicate, ..
} = filter;
flatten_join_inputs(
unwrap_arc(input),
Arc::unwrap_or_clone(input),
&mut possible_join_keys,
&mut all_inputs,
)?;
Expand Down Expand Up @@ -217,12 +216,28 @@ fn flatten_join_inputs(
);
}
possible_join_keys.insert_all_owned(join.on);
flatten_join_inputs(unwrap_arc(join.left), possible_join_keys, all_inputs)?;
flatten_join_inputs(unwrap_arc(join.right), possible_join_keys, all_inputs)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.left),
possible_join_keys,
all_inputs,
)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.right),
possible_join_keys,
all_inputs,
)?;
}
LogicalPlan::CrossJoin(join) => {
flatten_join_inputs(unwrap_arc(join.left), possible_join_keys, all_inputs)?;
flatten_join_inputs(unwrap_arc(join.right), possible_join_keys, all_inputs)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.left),
possible_join_keys,
all_inputs,
)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.right),
possible_join_keys,
all_inputs,
)?;
}
_ => {
all_inputs.push(plan);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use datafusion_common::tree_node::Transformed;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::{EmptyRelation, Expr, Filter, LogicalPlan};
use std::sync::Arc;

Expand Down Expand Up @@ -65,7 +64,7 @@ impl OptimizerRule for EliminateFilter {
input,
..
}) => match v {
Some(true) => Ok(Transformed::yes(unwrap_arc(input))),
Some(true) => Ok(Transformed::yes(Arc::unwrap_or_clone(input))),
Some(false) | None => Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {
produce_one_row: false,
Expand Down
6 changes: 4 additions & 2 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_expr::logical_plan::{tree_node::unwrap_arc, EmptyRelation, LogicalPlan};
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
use std::sync::Arc;

/// Optimizer rule to replace `LIMIT 0` or `LIMIT` whose ancestor LIMIT's skip is
Expand Down Expand Up @@ -74,7 +74,9 @@ impl OptimizerRule for EliminateLimit {
}
} else if limit.skip == 0 {
// input also can be Limit, so we should apply again.
return Ok(self.rewrite(unwrap_arc(limit.input), _config).unwrap());
return Ok(self
.rewrite(Arc::unwrap_or_clone(limit.input), _config)
.unwrap());
}
Ok(Transformed::no(LogicalPlan::Limit(limit)))
}
Expand Down
Loading