Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
| Expr::SimilarTo { .. }
| Expr::InList { .. }
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::GetIndexedField { .. }
| Expr::GroupingSet(_)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::Exists { .. } => Err(DataFusionError::NotImplemented(
"EXISTS is not yet supported in the physical plan".to_string(),
)),
Expr::InSubquery { .. } => Err(DataFusionError::NotImplemented(
Expr::InSubquery(_) => Err(DataFusionError::NotImplemented(
"IN subquery is not yet supported in the physical plan".to_string(),
)),
Expr::ScalarSubquery(_) => Err(DataFusionError::NotImplemented(
Expand Down
45 changes: 30 additions & 15 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,7 @@ pub enum Expr {
/// EXISTS subquery
Exists(Exists),
/// IN subquery
InSubquery {
/// The expression to compare
expr: Box<Expr>,
/// subquery that will produce a single column of data to compare against
subquery: Subquery,
/// Whether the expression is negated
negated: bool,
},
InSubquery(InSubquery),
/// Scalar subquery
ScalarSubquery(Subquery),
/// Represents a reference to all fields in a schema.
Expand Down Expand Up @@ -547,6 +540,28 @@ impl InList {
}
}

/// IN subquery
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct InSubquery {
/// The expression to compare
pub expr: Box<Expr>,
/// Subquery that will produce a single column of data to compare against
pub subquery: Subquery,
/// Whether the expression is negated
pub negated: bool,
}

impl InSubquery {
/// Create a new InSubquery expression
pub fn new(expr: Box<Expr>, subquery: Subquery, negated: bool) -> Self {
Self {
expr,
subquery,
negated,
}
}
}

/// Grouping sets
/// See <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>
/// for Postgres definition.
Expand Down Expand Up @@ -636,7 +651,7 @@ impl Expr {
Expr::GetIndexedField { .. } => "GetIndexedField",
Expr::GroupingSet(..) => "GroupingSet",
Expr::InList { .. } => "InList",
Expr::InSubquery { .. } => "InSubquery",
Expr::InSubquery(..) => "InSubquery",
Expr::IsNotNull(..) => "IsNotNull",
Expr::IsNull(..) => "IsNull",
Expr::Like { .. } => "Like",
Expand Down Expand Up @@ -956,16 +971,16 @@ impl fmt::Debug for Expr {
subquery,
negated: false,
}) => write!(f, "EXISTS ({subquery:?})"),
Expr::InSubquery {
Expr::InSubquery(InSubquery {
expr,
subquery,
negated: true,
} => write!(f, "{expr:?} NOT IN ({subquery:?})"),
Expr::InSubquery {
}) => write!(f, "{expr:?} NOT IN ({subquery:?})"),
Expr::InSubquery(InSubquery {
expr,
subquery,
negated: false,
} => write!(f, "{expr:?} IN ({subquery:?})"),
}) => write!(f, "{expr:?} IN ({subquery:?})"),
Expr::ScalarSubquery(subquery) => write!(f, "({subquery:?})"),
Expr::BinaryExpr(expr) => write!(f, "{expr}"),
Expr::Sort(Sort {
Expand Down Expand Up @@ -1334,8 +1349,8 @@ fn create_name(e: &Expr) -> Result<String> {
}
Expr::Exists(Exists { negated: true, .. }) => Ok("NOT EXISTS".to_string()),
Expr::Exists(Exists { negated: false, .. }) => Ok("EXISTS".to_string()),
Expr::InSubquery { negated: true, .. } => Ok("NOT IN".to_string()),
Expr::InSubquery { negated: false, .. } => Ok("IN".to_string()),
Expr::InSubquery(InSubquery { negated: true, .. }) => Ok("NOT IN".to_string()),
Expr::InSubquery(InSubquery { negated: false, .. }) => Ok("IN".to_string()),
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).name().clone())
}
Expand Down
24 changes: 12 additions & 12 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
//! Functions for creating logical expressions

use crate::expr::{
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, ScalarFunction,
TryCast,
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
ScalarFunction, TryCast,
};
use crate::{
aggregate_function, built_in_function, conditional_expressions::CaseBuilder,
Expand Down Expand Up @@ -328,27 +328,27 @@ pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
/// Create an IN subquery expression
pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
let outer_ref_columns = subquery.all_out_ref_exprs();
Expr::InSubquery {
expr: Box::new(expr),
subquery: Subquery {
Expr::InSubquery(InSubquery::new(
Box::new(expr),
Subquery {
subquery,
outer_ref_columns,
},
negated: false,
}
false,
))
}

/// Create a NOT IN subquery expression
pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
let outer_ref_columns = subquery.all_out_ref_exprs();
Expr::InSubquery {
expr: Box::new(expr),
subquery: Subquery {
Expr::InSubquery(InSubquery::new(
Box::new(expr),
Subquery {
subquery,
outer_ref_columns,
},
negated: true,
}
true,
))
}

/// Create a scalar subquery expression
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateUDF, BinaryExpr, Cast, GetIndexedField, InList,
ScalarFunction, ScalarUDF, Sort, TryCast, WindowFunction,
InSubquery, ScalarFunction, ScalarUDF, Sort, TryCast, WindowFunction,
};
use crate::field_util::get_indexed_field;
use crate::type_coercion::binary::get_result_type;
Expand Down Expand Up @@ -133,7 +133,7 @@ impl ExprSchemable for Expr {
Expr::Not(_)
| Expr::IsNull(_)
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::InSubquery(_)
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::IsNotNull(_)
Expand Down Expand Up @@ -232,7 +232,7 @@ impl ExprSchemable for Expr {
| Expr::IsNotUnknown(_)
| Expr::Exists { .. }
| Expr::Placeholder { .. } => Ok(true),
Expr::InSubquery { expr, .. } => expr.nullable(input_schema),
Expr::InSubquery(InSubquery { expr, .. }) => expr.nullable(input_schema),
Expr::ScalarSubquery(subquery) => {
Ok(subquery.subquery.schema().field(0).is_nullable())
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::expr::Exists;
use crate::expr::InSubquery;
///! Logical plan types
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
Expand Down Expand Up @@ -546,7 +547,7 @@ impl LogicalPlan {
inspect_expr_pre(expr, |expr| {
match expr {
Expr::Exists(Exists { subquery, .. })
| Expr::InSubquery { subquery, .. }
| Expr::InSubquery(InSubquery { subquery, .. })
| Expr::ScalarSubquery(subquery) => {
// use a synthetic plan so the collector sees a
// LogicalPlan::Subquery (even though it is
Expand All @@ -572,7 +573,7 @@ impl LogicalPlan {
inspect_expr_pre(expr, |expr| {
match expr {
Expr::Exists(Exists { subquery, .. })
| Expr::InSubquery { subquery, .. }
| Expr::InSubquery(InSubquery { subquery, .. })
| Expr::ScalarSubquery(subquery) => {
// use a synthetic plan so the visitor sees a
// LogicalPlan::Subquery (even though it is
Expand Down
13 changes: 7 additions & 6 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

use crate::expr::{
AggregateFunction, AggregateUDF, Between, BinaryExpr, Case, Cast, GetIndexedField,
GroupingSet, InList, Like, ScalarFunction, ScalarUDF, Sort, TryCast, WindowFunction,
GroupingSet, InList, InSubquery, Like, ScalarFunction, ScalarUDF, Sort, TryCast,
WindowFunction,
};
use crate::Expr;
use datafusion_common::tree_node::VisitRecursion;
Expand All @@ -45,7 +46,7 @@ impl TreeNode for Expr {
| Expr::Cast(Cast { expr, .. })
| Expr::TryCast(TryCast { expr, .. })
| Expr::Sort(Sort { expr, .. })
| Expr::InSubquery { expr, .. } => vec![expr.as_ref().clone()],
| Expr::InSubquery(InSubquery{ expr, .. }) => vec![expr.as_ref().clone()],
Expr::GetIndexedField(GetIndexedField { expr, .. }) => {
vec![expr.as_ref().clone()]
}
Expand Down Expand Up @@ -149,15 +150,15 @@ impl TreeNode for Expr {
Expr::Column(_) => self,
Expr::OuterReferenceColumn(_, _) => self,
Expr::Exists { .. } => self,
Expr::InSubquery {
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
} => Expr::InSubquery {
expr: transform_boxed(expr, &mut transform)?,
}) => Expr::InSubquery(InSubquery::new(
transform_boxed(expr, &mut transform)?,
subquery,
negated,
},
)),
Expr::ScalarSubquery(_) => self,
Expr::ScalarVariable(ty, names) => Expr::ScalarVariable(ty, names),
Expr::Literal(value) => Expr::Literal(value),
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
| Expr::AggregateUDF { .. }
| Expr::InList { .. }
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::InSubquery(_)
| Expr::ScalarSubquery(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
Expand Down Expand Up @@ -704,7 +704,7 @@ pub fn from_plan(
match expr {
Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::InSubquery { .. } => {
| Expr::InSubquery(_) => {
// subqueries could contain aliases so we don't recurse into those
Ok(RewriteRecursion::Stop)
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec<Expr>) -> Res

impl ExpressionVisitor for InSubqueryVisitor<'_> {
fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
if let Expr::InSubquery { .. } = expr {
if let Expr::InSubquery(_) = expr {
self.accum.push(expr.to_owned());
}
Ok(Recursion::Continue(self))
Expand Down
14 changes: 7 additions & 7 deletions datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{Column, DFField, DFSchema, DFSchemaRef, Result};
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::expr::{AggregateFunction, InSubquery};
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::Expr::{InSubquery, ScalarSubquery};
use datafusion_expr::Expr::ScalarSubquery;
use datafusion_expr::{
aggregate_function, count, expr, lit, window_function, Aggregate, Expr, Filter,
LogicalPlan, Projection, Sort, Subquery, Window,
Expand Down Expand Up @@ -191,25 +191,25 @@ impl TreeNodeRewriter for CountWildcardRewriter {
outer_ref_columns,
})
}
InSubquery {
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
} => {
}) => {
let new_plan = subquery
.subquery
.as_ref()
.clone()
.transform_down(&analyze_internal)?;

InSubquery {
Expr::InSubquery(InSubquery::new(
expr,
subquery: Subquery {
Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
},
negated,
}
))
}
Expr::Exists(expr::Exists { subquery, negated }) => {
let new_plan = subquery
Expand Down
13 changes: 6 additions & 7 deletions datafusion/optimizer/src/analyzer/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_expr::{
logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan,
};
Expand Down Expand Up @@ -91,19 +92,17 @@ fn rewrite_subquery(expr: Expr) -> Result<Transformed<Expr>> {
let subquery = subquery.with_plan(Arc::new(new_plan));
Ok(Transformed::Yes(Expr::Exists(Exists { subquery, negated })))
}
Expr::InSubquery {
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
} => {
}) => {
let plan = subquery.subquery.as_ref().clone();
let new_plan = plan.transform_up(&analyze_internal)?;
let subquery = subquery.with_plan(Arc::new(new_plan));
Ok(Transformed::Yes(Expr::InSubquery {
expr,
subquery,
negated,
}))
Ok(Transformed::Yes(Expr::InSubquery(InSubquery::new(
expr, subquery, negated,
))))
}
Expr::ScalarSubquery(subquery) => {
let plan = subquery.subquery.as_ref().clone();
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_expr::utils::inspect_expr_pre;
use datafusion_expr::{Expr, LogicalPlan};
use log::debug;
Expand Down Expand Up @@ -121,7 +122,7 @@ fn check_plan(plan: &LogicalPlan) -> Result<()> {
// recursively look for subqueries
inspect_expr_pre(expr, |expr| match expr {
Expr::Exists(Exists { subquery, .. })
| Expr::InSubquery { subquery, .. }
| Expr::InSubquery(InSubquery { subquery, .. })
| Expr::ScalarSubquery(subquery) => {
check_subquery_expr(plan, &subquery.subquery, expr)
}
Expand Down
Loading