Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Array specific rewrites to datafusion_functions_array AnalyzerRule #9557

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, Result, ScalarValue};
use datafusion_expr::analyzer::AnalyzerRule;
use datafusion_expr::{
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF,
};
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
Expand Down
32 changes: 12 additions & 20 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::{
DropView, Explain, LogicalPlan, LogicalPlanBuilder, PlanType, SetVariable,
TableSource, TableType, ToStringifiedPlan, UNNAMED_TABLE,
},
optimizer::analyzer::{Analyzer, AnalyzerRule},
optimizer::analyzer::Analyzer,
optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
physical_plan::{udaf::AggregateUDF, udf::ScalarUDF, ExecutionPlan},
Expand All @@ -69,6 +69,7 @@ use datafusion_common::{
OwnedTableReference, SchemaReference,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::analyzer::AnalyzerRuleRef;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
var_provider::is_system_variables,
Expand Down Expand Up @@ -1603,15 +1604,6 @@ impl SessionState {
self
}

/// Replace the entire list of [`AnalyzerRule`]s held by this state's analyzer.
///
/// Builds a fresh [`Analyzer`] from `rules`, overwriting whatever analyzer was
/// previously installed, and returns the updated state.
pub fn with_analyzer_rules(
mut self,
rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
) -> Self {
self.analyzer = Analyzer::with_rules(rules);
self
}

/// Replace the entire list of [`OptimizerRule`]s used to optimize plans
pub fn with_optimizer_rules(
mut self,
Expand All @@ -1630,16 +1622,6 @@ impl SessionState {
self
}

/// Add `analyzer_rule` to the end of the list of
/// [`AnalyzerRule`]s used to rewrite queries.
///
/// Takes `self` by value and returns it, so calls can be chained.
pub fn add_analyzer_rule(
mut self,
analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
) -> Self {
self.analyzer.rules.push(analyzer_rule);
self
}

/// Add `optimizer_rule` to the end of the list of
/// [`OptimizerRule`]s used to rewrite queries.
pub fn add_optimizer_rule(
Expand Down Expand Up @@ -2156,6 +2138,16 @@ impl FunctionRegistry for SessionState {
})
}

/// Replace the entire list of `AnalyzerRule`s held by this state's analyzer.
fn with_analyzer_rules(&mut self, rules: Vec<AnalyzerRuleRef>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

self.analyzer = Analyzer::with_rules(rules);
}

/// Returns a clone of the list of analyzer rules currently held by this
/// state's analyzer.
fn analyzer_rules(&self) -> Vec<AnalyzerRuleRef> {
self.analyzer.rules.clone()
}

fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
udf.aliases().iter().for_each(|alias| {
self.scalar_functions.insert(alias.clone(), udf.clone());
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@
//! [`WindowUDF`]: crate::logical_expr::WindowUDF
//! [`QueryPlanner`]: execution::context::QueryPlanner
//! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule
//! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule
//! [`AnalyzerRule`]: datafusion_expr::analyzer::AnalyzerRule
//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::optimizer::PhysicalOptimizerRule
//!
//! # Architecture
Expand Down
8 changes: 8 additions & 0 deletions datafusion/execution/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! FunctionRegistry trait

use datafusion_common::{not_impl_err, plan_datafusion_err, Result};
use datafusion_expr::analyzer::AnalyzerRuleRef;
use datafusion_expr::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
use std::collections::HashMap;
use std::{collections::HashSet, sync::Arc};
Expand Down Expand Up @@ -67,6 +68,13 @@ pub trait FunctionRegistry {
not_impl_err!("Registering WindowUDF")
}

/// Returns the analyzer rules currently held by this registry.
///
/// The default implementation holds no rules and returns an empty `Vec`.
fn analyzer_rules(&self) -> Vec<AnalyzerRuleRef> {
    Vec::new()
}

/// Replaces the analyzer rules held by this registry with `_rules`.
///
/// The default implementation is a no-op: the supplied rules are dropped.
fn with_analyzer_rules(&mut self, _rules: Vec<AnalyzerRuleRef>) {}

/// Deregisters a [`ScalarUDF`], returning the implementation that was
/// deregistered.
///
Expand Down
48 changes: 48 additions & 0 deletions datafusion/expr/src/analyzer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;

use crate::LogicalPlan;

/// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make
/// the plan valid prior to the rest of the DataFusion optimization process.
///
/// This is different than an `OptimizerRule` (defined in `datafusion_optimizer`)
/// which must preserve the semantics of the `LogicalPlan`, while computing
/// results in a more optimal way.
///
/// For example, an `AnalyzerRule` may resolve [`Expr`](crate::Expr)s into more specific
/// forms such as a subquery reference, or do type coercion to ensure the types
/// of operands are correct.
///
/// Use [`SessionState::register_analyzer_rule`] to register additional
/// `AnalyzerRule`s.
///
/// [`SessionState::register_analyzer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionState.html#method.register_analyzer_rule
pub trait AnalyzerRule {
    /// Rewrite `plan`, returning the rewritten [`LogicalPlan`] or an error.
    ///
    /// `config` is the [`ConfigOptions`] supplied by the caller.
    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;

    /// A human readable name for this analyzer rule
    fn name(&self) -> &str;
}

/// A reference-counted [`AnalyzerRule`] trait object that is `Send + Sync`.
pub type AnalyzerRuleRef = Arc<dyn AnalyzerRule + Send + Sync>;
1 change: 1 addition & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod udf;
mod udwf;

pub mod aggregate_function;
pub mod analyzer;
pub mod conditional_expressions;
pub mod execution_props;
pub mod expr;
Expand Down
18 changes: 18 additions & 0 deletions datafusion/functions-array/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod rewrite;
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,26 @@

//! Analyzer rule to replace operators with function calls (e.g. `||` to `array_concat`)

#[cfg(feature = "array_expressions")]
use std::sync::Arc;

use super::AnalyzerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
#[cfg(feature = "array_expressions")]

use datafusion_common::{utils::list_ndims, DFSchemaRef};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::analyzer::AnalyzerRule;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr_rewriter::rewrite_preserving_name;
use datafusion_expr::utils::merge_schema;
use datafusion_expr::BuiltinScalarFunction;
use datafusion_expr::GetFieldAccess;
use datafusion_expr::GetIndexedField;
#[cfg(feature = "array_expressions")]

use datafusion_expr::{BinaryExpr, Operator, ScalarFunctionDefinition};
use datafusion_expr::{Expr, LogicalPlan};
#[cfg(feature = "array_expressions")]
use datafusion_functions_array::expr_fn::{array_append, array_concat, array_prepend};

use crate::array_has::array_has_all;
use crate::concat::{array_append, array_concat, array_prepend};

/// Analyzer rule that replaces operators with function calls
/// (e.g. `||` with `array_concat`).
#[derive(Default)]
pub struct OperatorToFunction {}
Expand Down Expand Up @@ -77,7 +76,6 @@ fn analyze_internal(plan: &LogicalPlan) -> Result<LogicalPlan> {
}

let mut expr_rewrite = OperatorToFunctionRewriter {
#[cfg(feature = "array_expressions")]
schema: Arc::new(schema),
};

Expand All @@ -95,15 +93,13 @@ fn analyze_internal(plan: &LogicalPlan) -> Result<LogicalPlan> {
}

/// Expression rewriter built by `analyze_internal` to turn operator
/// expressions into function calls.
pub(crate) struct OperatorToFunctionRewriter {
// Schema handed over by `analyze_internal`, wrapped in an `Arc`.
// NOTE(review): presumably used to resolve operand types — confirm against `f_up`.
#[cfg(feature = "array_expressions")]
pub(crate) schema: DFSchemaRef,
}

impl TreeNodeRewriter for OperatorToFunctionRewriter {
type Node = Expr;

fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
#[cfg(feature = "array_expressions")]
if let Expr::BinaryExpr(BinaryExpr {
ref left,
op,
Expand All @@ -125,7 +121,7 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter {

// TODO: change OperatorToFunction to OperatorToArrayFunction and configure it with array_expressions feature
// after other array functions are udf-based
#[cfg(feature = "array_expressions")]

if let Some(expr) = rewrite_array_has_all_operator_to_func(left, op, right) {
return Ok(Transformed::yes(expr));
}
Expand Down Expand Up @@ -170,14 +166,12 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter {
// Note: this rewrite is only done if the built-in DataFusion `array_expressions` feature is enabled.
// Even if users implement their own array functions, those functions are not equal to the DataFusion
// udf based array functions, so this rewrite would not be correct for them.
#[cfg(feature = "array_expressions")]

fn rewrite_array_has_all_operator_to_func(
left: &Expr,
op: Operator,
right: &Expr,
) -> Option<Expr> {
use super::array_has_all;

if op != Operator::AtArrow && op != Operator::ArrowAt {
return None;
}
Expand All @@ -198,6 +192,7 @@ fn rewrite_array_has_all_operator_to_func(
let left = left.clone();
let right = right.clone();

// TODO: run kernel function directly?
let expr = if let Operator::ArrowAt = op {
array_has_all(right, left)
} else {
Expand All @@ -220,7 +215,7 @@ fn rewrite_array_has_all_operator_to_func(
/// 4) (array concat, array append, array prepend) || array -> array concat
///
/// 5) (array concat, array append, array prepend) || scalar -> array append
#[cfg(feature = "array_expressions")]

fn rewrite_array_concat_operator_to_func(
left: &Expr,
op: Operator,
Expand Down Expand Up @@ -306,7 +301,7 @@ fn rewrite_array_concat_operator_to_func(
/// 1) (array concat, array append, array prepend) || column -> (array append, array concat)
///
/// 2) column1 || column2 -> (array prepend, array append, array concat)
#[cfg(feature = "array_expressions")]

fn rewrite_array_concat_operator_to_func_for_column(
left: &Expr,
op: Operator,
Expand Down
10 changes: 10 additions & 0 deletions datafusion/functions-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
#[macro_use]
pub mod macros;

mod analyzer;
mod array_has;
mod concat;
mod kernels;
mod make_array;
mod udf;
mod utils;

use analyzer::rewrite::OperatorToFunction;
use datafusion_common::Result;
use datafusion_execution::FunctionRegistry;
use datafusion_expr::ScalarUDF;
Expand Down Expand Up @@ -94,5 +96,13 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
}
Ok(()) as Result<()>
})?;

let mut rules = registry.analyzer_rules();
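// OperatorToFunction should be run before TypeCoercion, since it rewrites based on the argument types (List or Scalar),
// and TypeCoercion may cast the argument types from Scalar to List.
// NOTE(review): `Vec::insert(1, ..)` panics when `rules` is empty, and the default
// `FunctionRegistry::analyzer_rules` returns an empty Vec — consider clamping the index to `rules.len()`.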
rules.insert(1, Arc::new(OperatorToFunction::new()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't notice that vec has insert...


registry.with_analyzer_rules(rules);

Ok(())
}
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

use std::sync::Arc;

use crate::analyzer::AnalyzerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRewriter,
};
use datafusion_common::Result;
use datafusion_expr::analyzer::AnalyzerRule;
use datafusion_expr::expr::{AggregateFunction, AggregateFunctionDefinition, InSubquery};
use datafusion_expr::expr_rewriter::rewrite_preserving_name;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/analyzer/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
//! such as DataFrames and Views and inlines the LogicalPlan.
use std::sync::Arc;

use crate::analyzer::AnalyzerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_expr::analyzer::AnalyzerRule;
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::{
logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan,
Expand Down
Loading
Loading