Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::utils::split_conjunction;
use datafusion_expr::{
Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
WindowFrame, WindowFrameBound, WriteOp,
Analyze, BinaryExpr, DelimGet, DescribeTable, DmlStatement, Explain, ExplainFormat,
Extension, FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::{
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
};
Expand Down Expand Up @@ -1520,6 +1520,63 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: Analyze must be root of the plan"
)
}
LogicalPlan::DependentJoin(_) => {
return internal_err!(
"Optimizors have not completely remove dependent join"
)
}
LogicalPlan::DelimGet(DelimGet {
table_name,
projected_schema,
..
}) => {
let resolved = session_state.resolve_table_ref(table_name.clone());
if let Ok(schema) = session_state.schema_for_ref(resolved.clone()) {
if let Some(table) = schema.table(&resolved.table).await? {
let mut proj = vec![];
for (i, field) in table.schema().fields().iter().enumerate() {
for iter in projected_schema.as_ref().iter() {
if iter.1 == field {
proj.push(i);
}
}
}

// First create the scan execution plan.
let scan_plan =
table.scan(session_state, Some(&proj), &[], None).await?;

// Now add aggregation to eliminate duplicated rows.
// Create a PhysicalGroupBy with empty expressions, which means we're grouping by all columns
let schema = &scan_plan.schema();
let group_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = (0
..schema.fields().len())
.map(|i| {
let name = schema.field(i).name().to_string();
let expr = Arc::new(Column::new(&name, i))
as Arc<dyn PhysicalExpr>;
(expr, name)
})
.collect();

let group_by = PhysicalGroupBy::new_single(group_exprs);

// Create the AggregateExec with no aggregate expressions to deduplicate the rows
Arc::new(AggregateExec::try_new(
AggregateMode::Final,
group_by,
vec![], // No aggregate expressions, just grouping to deduplicate
vec![], // No filters
scan_plan.clone(),
scan_plan.schema(),
)?)
} else {
return internal_err!("no table provider");
}
} else {
return internal_err!("empty schema");
}
}
};
Ok(exec_node)
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,8 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"StructColumn": expr_vec_fmt!(struct_type_columns),
})
}
LogicalPlan::DependentJoin(..) => json!({}),
LogicalPlan::DelimGet(_) => todo!(),
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter,
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
projection_schema, Aggregate, Analyze, ColumnUnnestList, DelimGet, DependentJoin,
DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, ExplainOption,
Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, Projection, RecursiveQuery, Repartition, SkipType, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Unnest, Values, Window,
};
pub use statement::{
Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement,
Expand Down
182 changes: 180 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,158 @@ pub enum LogicalPlan {
Unnest(Unnest),
/// A variadic query (e.g. "Recursive CTEs")
RecursiveQuery(RecursiveQuery),
/// A node type that only exist during subquery decorrelation
/// TODO: maybe we can avoid creating new type of LogicalPlan for this usecase
DependentJoin(DependentJoin),

DelimGet(DelimGet),
}

#[derive(Debug, Clone, Eq)]
pub struct DelimGet {
// TODO: is it necessary to alias?
pub table_name: TableReference,
pub columns: Vec<Column>,
/// The schema description of the output
pub projected_schema: DFSchemaRef,
// TODO: add more variables as needed.
}

impl PartialEq for DelimGet {
fn eq(&self, other: &Self) -> bool {
self.table_name == other.table_name && self.columns == other.columns
}
}

impl Hash for DelimGet {
fn hash<H: Hasher>(&self, state: &mut H) {
self.table_name.hash(state);
self.columns.hash(state);
}
}

impl PartialOrd for DelimGet {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.table_name.partial_cmp(&other.table_name) {
Some(Ordering::Equal) => self.columns.partial_cmp(&other.columns),
cmp => cmp,
}
}
}

#[derive(Clone, Debug, Eq, PartialOrd, Hash)]
pub struct CorrelatedColumnInfo {
pub col: Column,
// TODO: is data_type necessary?
pub data_type: DataType,
pub depth: usize,
}

impl CorrelatedColumnInfo {
pub fn new(col: Column) -> Self {
Self {
col,
data_type: DataType::Null,
depth: 0,
}
}
}

impl PartialEq for CorrelatedColumnInfo {
fn eq(&self, other: &Self) -> bool {
self.col == other.col
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DependentJoin {
pub schema: DFSchemaRef,
// All combinations of (subquery depth,Column and its DataType) on the RHS (and its descendant)
// which points to a column on the LHS of this dependent join
// Note that not all outer_refs from the RHS are mentioned in this vectors
// because RHS may reference columns provided somewhere from the above parent dependent join.
// Depths of each correlated_columns should always be gte current dependent join
// subquery_depth
pub correlated_columns: Vec<CorrelatedColumnInfo>,
// the upper expr that containing the subquery expr
// i.e for predicates: where outer = scalar_sq + 1
// correlated exprs are `scalar_sq + 1`
pub subquery_expr: Option<Expr>,
// begins with depth = 1
pub subquery_depth: usize,
pub left: Arc<LogicalPlan>,
// dependent side accessing columns from left hand side (and maybe columns)
// belong to the parent dependent join node in case of recursion)
pub right: Arc<LogicalPlan>,
pub subquery_name: String,

pub lateral_join_condition: Option<(JoinType, Expr)>,
pub any_join: bool,
}

impl Display for DependentJoin {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let correlated_str = self
.correlated_columns
.iter()
.map(|info| format!("{0} lvl {1}", info.col, info.depth))
.collect::<Vec<String>>()
.join(", ");
let lateral_join_info =
if let Some((join_type, join_expr)) = &self.lateral_join_condition {
format!(" lateral {join_type} join with {join_expr}")
} else {
"".to_string()
};
let subquery_expr_str = if let Some(expr) = &self.subquery_expr {
format!(" with expr {expr}")
} else {
"".to_string()
};
write!(
f,
"DependentJoin on [{correlated_str}]{subquery_expr_str}\
{lateral_join_info} depth {0}",
self.subquery_depth,
)
}
}

impl PartialOrd for DependentJoin {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
#[derive(PartialEq, PartialOrd)]
struct ComparableJoin<'a> {
correlated_columns: &'a Vec<CorrelatedColumnInfo>,
// the upper expr that containing the subquery expr
// i.e for predicates: where outer = scalar_sq + 1
// correlated exprs are `scalar_sq + 1`
subquery_expr: &'a Option<Expr>,

depth: &'a usize,
left: &'a Arc<LogicalPlan>,
// dependent side accessing columns from left hand side (and maybe columns)
// belong to the parent dependent join node in case of recursion)
right: &'a Arc<LogicalPlan>,
lateral_join_condition: &'a Option<(JoinType, Expr)>,
}
let comparable_self = ComparableJoin {
left: &self.left,
right: &self.right,
correlated_columns: &self.correlated_columns,
subquery_expr: &self.subquery_expr,
depth: &self.subquery_depth,
lateral_join_condition: &self.lateral_join_condition,
};
let comparable_other = ComparableJoin {
left: &other.left,
right: &other.right,
correlated_columns: &other.correlated_columns,
subquery_expr: &other.subquery_expr,
depth: &other.subquery_depth,
lateral_join_condition: &other.lateral_join_condition,
};
comparable_self.partial_cmp(&comparable_other)
}
}

impl Default for LogicalPlan {
Expand Down Expand Up @@ -355,6 +507,10 @@ impl LogicalPlan {
// we take the schema of the static term as the schema of the entire recursive query
static_term.schema()
}
LogicalPlan::DependentJoin(DependentJoin { schema, .. }) => schema,
LogicalPlan::DelimGet(DelimGet {
projected_schema, ..
}) => projected_schema,
}
}

Expand Down Expand Up @@ -478,11 +634,12 @@ impl LogicalPlan {
..
}) => vec![static_term, recursive_term],
LogicalPlan::Statement(stmt) => stmt.inputs(),
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::DescribeTable(_) => vec![],
| LogicalPlan::DescribeTable(_)
| LogicalPlan::DependentJoin(_)
| LogicalPlan::DelimGet(_) => vec![],
}
}

Expand Down Expand Up @@ -544,6 +701,8 @@ impl LogicalPlan {
| LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
LogicalPlan::DependentJoin(_) => todo!(),
LogicalPlan::DelimGet(_) => todo!(),
LogicalPlan::Join(Join {
left,
right,
Expand Down Expand Up @@ -653,6 +812,8 @@ impl LogicalPlan {
}) => Aggregate::try_new(input, group_expr, aggr_expr)
.map(LogicalPlan::Aggregate),
LogicalPlan::Sort(_) => Ok(self),
LogicalPlan::DependentJoin(_) => todo!(),
LogicalPlan::DelimGet(_) => Ok(self),
LogicalPlan::Join(Join {
left,
right,
Expand Down Expand Up @@ -1143,6 +1304,8 @@ impl LogicalPlan {
unnest_with_options(input, columns.clone(), options.clone())?;
Ok(new_plan)
}
LogicalPlan::DependentJoin(_) => todo!(),
LogicalPlan::DelimGet(_) => todo!(),
}
}

Expand Down Expand Up @@ -1381,6 +1544,8 @@ impl LogicalPlan {
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Extension(_) => None,
LogicalPlan::DependentJoin(DependentJoin { left, .. }) => left.max_rows(),
LogicalPlan::DelimGet(_) => todo!(),
}
}

Expand Down Expand Up @@ -2071,6 +2236,19 @@ impl LogicalPlan {
expr_vec_fmt!(list_type_columns),
expr_vec_fmt!(struct_type_columns))
}
LogicalPlan::DependentJoin(dependent_join) => {
Display::fmt(dependent_join, f)
}
LogicalPlan::DelimGet(DelimGet{columns,..}) => {
write!(f, "DelimGet:")?; // TODO
for (i, expr_item) in columns.iter().enumerate() {
if i > 0 {
write!(f, ",")?;
}
write!(f, " {expr_item}")?;
}
Ok(())
}
}
}
}
Expand Down
Loading
Loading