Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::expr::{Between, BinaryExpr, GroupingSet, Like};
use datafusion_expr::expr::{Between, BinaryExpr, GetIndexedField, GroupingSet, Like};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::utils::{expand_wildcard, expr_to_columns};
use datafusion_expr::WindowFrameUnits;
Expand Down Expand Up @@ -174,7 +174,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
let expr = create_physical_name(expr, false)?;
Ok(format!("{} IS NOT UNKNOWN", expr))
}
Expr::GetIndexedField { expr, key } => {
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{}[{}]", expr, key))
}
Expand Down
27 changes: 19 additions & 8 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,7 @@ pub enum Expr {
/// arithmetic negation of an expression, the operand must be of a signed numeric data type
Negative(Box<Expr>),
/// Returns the field of a [`arrow::array::ListArray`] or [`arrow::array::StructArray`] by key
GetIndexedField {
/// the expression to take the field from
expr: Box<Expr>,
/// The name of the field to take
key: ScalarValue,
},
GetIndexedField(GetIndexedField),
/// Whether an expression is between a given range.
Between(Between),
/// The CASE expression is similar to a series of nested if/else and there are two forms that
Expand Down Expand Up @@ -349,6 +344,22 @@ impl Between {
}
}

/// Returns the field of a [`arrow::array::ListArray`] or [`arrow::array::StructArray`] by key
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct GetIndexedField {
/// the expression to take the field from
pub expr: Box<Expr>,
/// The name of the field to take
pub key: ScalarValue,
}

impl GetIndexedField {
/// Create a new GetIndexedField expression
pub fn new(expr: Box<Expr>, key: ScalarValue) -> Self {
Self { expr, key }
}
}

/// Grouping sets
/// See https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS
/// for Postgres definition.
Expand Down Expand Up @@ -854,7 +865,7 @@ impl fmt::Debug for Expr {
}
Expr::Wildcard => write!(f, "*"),
Expr::QualifiedWildcard { qualifier } => write!(f, "{}.*", qualifier),
Expr::GetIndexedField { ref expr, key } => {
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
write!(f, "({:?})[{}]", expr, key)
}
Expr::GroupingSet(grouping_sets) => match grouping_sets {
Expand Down Expand Up @@ -1082,7 +1093,7 @@ fn create_name(e: &Expr) -> Result<String> {
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).name().clone())
}
Expr::GetIndexedField { expr, key } => {
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
let expr = create_name(expr)?;
Ok(format!("{}[{}]", expr, key))
}
Expand Down
12 changes: 7 additions & 5 deletions datafusion/expr/src/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Expression rewriter

use crate::expr::{Between, BinaryExpr, Case, GroupingSet, Like};
use crate::expr::{Between, BinaryExpr, Case, GetIndexedField, GroupingSet, Like};
use crate::logical_plan::{Aggregate, Projection};
use crate::utils::{from_plan, grouping_set_to_exprlist};
use crate::{Expr, ExprSchemable, LogicalPlan};
Expand Down Expand Up @@ -286,10 +286,12 @@ impl ExprRewritable for Expr {
Expr::QualifiedWildcard { qualifier } => {
Expr::QualifiedWildcard { qualifier }
}
Expr::GetIndexedField { expr, key } => Expr::GetIndexedField {
expr: rewrite_boxed(expr, rewriter)?,
key,
},
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
Expr::GetIndexedField(GetIndexedField::new(
rewrite_boxed(expr, rewriter)?,
key,
))
}
};

// now rewrite this expression itself
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use super::{Between, Expr, Like};
use crate::expr::BinaryExpr;
use crate::expr::{BinaryExpr, GetIndexedField};
use crate::field_util::get_indexed_field;
use crate::type_coercion::binary::binary_operator_data_type;
use crate::{aggregate_function, function, window_function};
Expand Down Expand Up @@ -138,7 +138,7 @@ impl ExprSchemable for Expr {
// grouping sets do not really have a type and do not appear in projections
Ok(DataType::Null)
}
Expr::GetIndexedField { ref expr, key } => {
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
let data_type = expr.get_type(schema)?;

get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
Expand Down Expand Up @@ -218,7 +218,7 @@ impl ExprSchemable for Expr {
"QualifiedWildcard expressions are not valid in a logical query plan"
.to_owned(),
)),
Expr::GetIndexedField { ref expr, key } => {
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
let data_type = expr.get_type(input_schema)?;
get_indexed_field(&data_type, key).map(|x| x.is_nullable())
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/expr_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use crate::{
expr::{BinaryExpr, GroupingSet},
Between, Expr, Like,
Between, Expr, GetIndexedField, Like,
};
use datafusion_common::Result;

Expand Down Expand Up @@ -111,8 +111,8 @@ impl ExprVisitable for Expr {
| Expr::Cast { expr, .. }
| Expr::TryCast { expr, .. }
| Expr::Sort { expr, .. }
| Expr::InSubquery { expr, .. }
| Expr::GetIndexedField { expr, .. } => expr.accept(visitor),
| Expr::InSubquery { expr, .. } => expr.accept(visitor),
Expr::GetIndexedField(GetIndexedField { expr, .. }) => expr.accept(visitor),
Expr::GroupingSet(GroupingSet::Rollup(exprs)) => exprs
.iter()
.fold(Ok(visitor), |v, e| v.and_then(|v| e.accept(v))),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub use accumulator::{Accumulator, AggregateState};
pub use aggregate_function::AggregateFunction;
pub use built_in_function::BuiltinScalarFunction;
pub use columnar_value::{ColumnarValue, NullColumnarValue};
pub use expr::{Between, Case, Expr, GroupingSet, Like};
pub use expr::{Between, BinaryExpr, Case, Expr, GetIndexedField, GroupingSet, Like};
pub use expr_fn::*;
pub use expr_schema::ExprSchemable;
pub use function::{
Expand Down
20 changes: 14 additions & 6 deletions datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use crate::{
};
use arrow::datatypes::{DataType, Schema};
use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::{binary_expr, Between, Expr, Like, Operator};
use datafusion_expr::{
binary_expr, Between, BinaryExpr, Expr, GetIndexedField, Like, Operator,
};
use std::sync::Arc;

/// Create a physical expression from a logical expression ([Expr]).
Expand Down Expand Up @@ -308,10 +309,17 @@ pub fn create_physical_expr(
input_schema,
execution_props,
)?),
Expr::GetIndexedField { expr, key } => Ok(Arc::new(GetIndexedFieldExpr::new(
create_physical_expr(expr, input_dfschema, input_schema, execution_props)?,
key.clone(),
))),
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
Ok(Arc::new(GetIndexedFieldExpr::new(
create_physical_expr(
expr,
input_dfschema,
input_schema,
execution_props,
)?,
key.clone(),
)))
}

Expr::ScalarFunction { fun, args } => {
let physical_args = args
Expand Down
8 changes: 4 additions & 4 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use datafusion_expr::{
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
trim, trunc, upper, AggregateFunction, Between, BuiltInWindowFunction,
BuiltinScalarFunction, Case, Expr, GroupingSet,
BuiltinScalarFunction, Case, Expr, GetIndexedField, GroupingSet,
GroupingSet::GroupingSets,
Like, Operator, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
Expand Down Expand Up @@ -801,10 +801,10 @@ pub fn parse_expr(

let expr = parse_required_expr(&field.expr, registry, "expr")?;

Ok(Expr::GetIndexedField {
expr: Box::new(expr),
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(expr),
key,
})
)))
}
ExprType::Column(column) => Ok(Expr::Column(column.into())),
ExprType::Literal(literal) => {
Expand Down
5 changes: 3 additions & 2 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use arrow::datatypes::{
UnionMode,
};
use datafusion_common::{Column, DFField, DFSchemaRef, ScalarValue};
use datafusion_expr::expr::{Between, BinaryExpr, GroupingSet, Like};
use datafusion_expr::expr::{Between, BinaryExpr, GetIndexedField, GroupingSet, Like};
use datafusion_expr::{
logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction,
BuiltInWindowFunction, BuiltinScalarFunction, Expr, WindowFrame, WindowFrameBound,
Expand Down Expand Up @@ -816,7 +816,8 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
// see discussion in https://github.com/apache/arrow-datafusion/issues/2565
return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery { .. } | Expr::Exists { .. } not supported".to_string()))
}
Expr::GetIndexedField { key, expr } => Self {
Expr::GetIndexedField(GetIndexedField{key, expr}) =>
Self {
expr_type: Some(ExprType::GetIndexedField(Box::new(
protobuf::GetIndexedField {
key: Some(key.try_into()?),
Expand Down
20 changes: 10 additions & 10 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use datafusion_expr::utils::{
COUNT_STAR_EXPANSION,
};
use datafusion_expr::{
and, col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable, Operator,
ScalarUDF, WindowFrame, WindowFrameUnits,
and, col, lit, AggregateFunction, AggregateUDF, Expr, ExprSchemable, GetIndexedField,
Operator, ScalarUDF, WindowFrame, WindowFrameUnits,
};
use datafusion_expr::{
window_function::WindowFunction, BuiltinScalarFunction, TableSource,
Expand Down Expand Up @@ -123,10 +123,10 @@ fn plan_indexed(expr: Expr, mut keys: Vec<SQLExpr>) -> Result<Expr> {
expr
};

Ok(Expr::GetIndexedField {
expr: Box::new(expr),
key: plan_key(key)?,
})
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(expr),
plan_key(key)?,
)))
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand Down Expand Up @@ -1834,10 +1834,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Err(_) => {
if let Some(field) = schema.fields().iter().find(|f| f.name().eq(&relation)) {
// Access to a field of a column which is a structure, example: SELECT my_struct.key
Ok(Expr::GetIndexedField {
expr: Box::new(Expr::Column(field.qualified_column())),
key: ScalarValue::Utf8(Some(name)),
})
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(Expr::Column(field.qualified_column())),
ScalarValue::Utf8(Some(name)),
)))
} else {
// table.column identifier
Ok(Expr::Column(Column {
Expand Down
14 changes: 9 additions & 5 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE
use sqlparser::ast::Ident;

use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::expr::{Between, BinaryExpr, Case, GroupingSet, Like};
use datafusion_expr::expr::{
Between, BinaryExpr, Case, GetIndexedField, GroupingSet, Like,
};
use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
use datafusion_expr::{Expr, LogicalPlan};
use std::collections::HashMap;
Expand Down Expand Up @@ -377,10 +379,12 @@ where
}),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField {
expr: Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?),
key: key.clone(),
}),
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?),
key.clone(),
)))
}
Expr::GroupingSet(set) => match set {
GroupingSet::Rollup(exprs) => Ok(Expr::GroupingSet(GroupingSet::Rollup(
exprs
Expand Down