1 change: 0 additions & 1 deletion src/query/catalog/src/plan/datasource/datasource_plan.rs
@@ -26,7 +26,6 @@ use crate::plan::Partitions;
use crate::plan::PushDownInfo;
use crate::table_args::TableArgs;

// TODO: Delete the scan plan field, but it depends on plan_parser:L394
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct DataSourcePlan {
pub source_info: DataSourceInfo,
@@ -220,12 +220,12 @@ optimized_plan: |
│ ├── order by: []
│ └── limit: NONE
└── EvalScalar
├── scalars: [sr_store_sk (#103) AS (#103), multiply(divide(sum(ctr_total_return) (#145), if(eq(count(ctr_total_return) (#146), 0), 1, count(ctr_total_return) (#146))), 1.2) AS (#147)]
├── scalars: [outer.sr_store_sk (#103) AS (#103), multiply(divide(sum(ctr_total_return) (#145), if(eq(count(ctr_total_return) (#146), 0), 1, count(ctr_total_return) (#146))), 1.2) AS (#147)]
└── Aggregate(Final)
├── group items: [subquery_103 (#103)]
├── group items: [outer.sr_store_sk (#103)]
├── aggregate functions: [sum(ctr_total_return) (#145), count(ctr_total_return) (#146)]
└── Aggregate(Partial)
├── group items: [subquery_103 (#103)]
├── group items: [outer.sr_store_sk (#103)]
├── aggregate functions: [sum(ctr_total_return) (#145), count(ctr_total_return) (#146)]
└── Aggregate(Final)
├── group items: [store_returns.sr_customer_sk (#99), store_returns.sr_store_sk (#103)]
@@ -15,11 +15,11 @@
use databend_common_exception::Result;
use databend_common_expression::Column;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;

use crate::executor::PhysicalPlan;
use crate::executor::PhysicalPlanBuilder;
use crate::ColumnSet;
use crate::IndexType;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ConstantTableScan {
@@ -50,20 +50,35 @@ impl PhysicalPlanBuilder {
scan: &crate::plans::ConstantTableScan,
required: ColumnSet,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
let used: ColumnSet = required.intersection(&scan.columns).cloned().collect();
let (values, fields) = if used == scan.columns {
(scan.values.clone(), scan.schema.fields().clone())
} else {
let new_scan = scan.prune_columns(used);
(new_scan.values.clone(), new_scan.schema.fields().clone())
};
// 2. Build physical plan.
debug_assert!(scan
.schema
.fields
.iter()
.map(|field| field.name().parse::<IndexType>().unwrap())
.collect::<ColumnSet>()
.is_superset(&scan.columns));

let used: ColumnSet = required.intersection(&scan.columns).copied().collect();
if used.len() < scan.columns.len() {
let crate::plans::ConstantTableScan {
values,
num_rows,
schema,
..
} = scan.prune_columns(used);
return Ok(PhysicalPlan::ConstantTableScan(ConstantTableScan {
plan_id: 0,
values,
num_rows,
output_schema: schema,
}));
}

Ok(PhysicalPlan::ConstantTableScan(ConstantTableScan {
plan_id: 0,
values,
values: scan.values.clone(),
num_rows: scan.num_rows,
output_schema: DataSchemaRefExt::create(fields),
output_schema: scan.schema.clone(),
}))
}
}
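As an aside, the new build path above only rebuilds the scan when pruning actually drops something: it intersects the required columns with the columns the scan produces and compares sizes. A minimal, self-contained sketch of that decision, using a plain HashSet in place of Databend's ColumnSet and an illustrative function name:

use std::collections::HashSet;

// Illustrative stand-in for the pruning decision in the builder above:
// `scan_columns` plays the role of `scan.columns`, `required` the role of the
// builder's required ColumnSet (both are just column-index sets here).
fn needs_pruning(scan_columns: &HashSet<usize>, required: &HashSet<usize>) -> bool {
    // Only columns the scan actually produces can be demanded from it.
    let used: HashSet<usize> = required.intersection(scan_columns).copied().collect();
    // Rebuild the scan only when some produced column is not needed downstream;
    // otherwise the original values and schema are reused as-is.
    used.len() < scan_columns.len()
}

fn main() {
    let scan_columns: HashSet<usize> = [0, 1, 2].into_iter().collect();

    // Column 5 is not produced by the scan, so it is ignored; columns 0 and 2
    // are unused, so the scan would be pruned down to column 1.
    let required: HashSet<usize> = [1, 5].into_iter().collect();
    assert!(needs_pruning(&scan_columns, &required));

    // Every produced column is required: keep the scan unchanged.
    let all: HashSet<usize> = [0, 1, 2].into_iter().collect();
    assert!(!needs_pruning(&scan_columns, &all));
}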
@@ -384,6 +384,7 @@ impl Binder {
let mut decorrelator =
SubqueryRewriter::new(self.ctx.clone(), self.metadata.clone(), Some(self.clone()));
right_child = decorrelator.flatten_plan(
&left_child,
&right_child,
&right_prop.outer_columns,
&mut FlattenInfo {
9 changes: 5 additions & 4 deletions src/query/sql/src/planner/format/display.rs
@@ -162,10 +162,6 @@ impl<
let mut tree = self
.operator_humanizer
.humanize_operator(self.id_humanizer, op);
let children = s_expr
.children()
.map(|s_expr| self.humanize_s_expr(s_expr))
.collect::<Result<Vec<_>>>()?;

if self.verbose {
let rel_expr = RelExpr::with_s_expr(s_expr);
@@ -177,6 +173,11 @@
tree.children.extend(stats);
}

let children = s_expr
.children()
.map(|s_expr| self.humanize_s_expr(s_expr))
.collect::<Result<Vec<_>>>()?;

tree.children.extend(children);
Ok(tree)
}
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/metadata.rs
@@ -628,7 +628,7 @@ pub struct DerivedColumn {
pub alias: String,
pub data_type: DataType,
// if the derived column is generated by the scalar expr, save the `scalar_expr`.
// Currently, it's only used by decorrelating subquery.
// Currently, it's only used by WindowRewriter.
pub scalar_expr: Option<ScalarExpr>,
}

59 changes: 31 additions & 28 deletions src/query/sql/src/planner/optimizer/decorrelate/decorrelate.rs
@@ -261,17 +261,18 @@ impl SubqueryRewriter {

pub fn try_decorrelate_subquery(
&mut self,
left: &SExpr,
outer: &SExpr,
subquery: &SubqueryExpr,
flatten_info: &mut FlattenInfo,
is_conjunctive_predicate: bool,
) -> Result<(SExpr, UnnestResult)> {
match subquery.typ {
SubqueryType::Scalar => {
let correlated_columns = subquery.outer_columns.clone();
let correlated_columns = &subquery.outer_columns;
let flatten_plan = self.flatten_plan(
outer,
&subquery.subquery,
&correlated_columns,
correlated_columns,
flatten_info,
false,
)?;
@@ -280,23 +281,23 @@
let mut right_conditions = Vec::with_capacity(correlated_columns.len());
self.add_equi_conditions(
subquery.span,
&correlated_columns,
correlated_columns,
&mut right_conditions,
&mut left_conditions,
)?;

let mut join_type = JoinType::LeftSingle;
if matches!(subquery.contain_agg, Some(true)) {
let join_type = if matches!(subquery.contain_agg, Some(true)) && {
let rel_expr = RelExpr::with_s_expr(&subquery.subquery);
let card = rel_expr
rel_expr
.derive_cardinality()?
.statistics
.precise_cardinality;

if card.is_some() {
join_type = JoinType::Left;
}
}
.precise_cardinality
.is_some()
} {
JoinType::Left
} else {
JoinType::LeftSingle
};

let join_plan = Join {
equi_conditions: JoinEquiCondition::new_conditions(
@@ -315,21 +316,22 @@
};
let s_expr = SExpr::create_binary(
Arc::new(join_plan.into()),
Arc::new(left.clone()),
Arc::new(outer.clone()),
Arc::new(flatten_plan),
);
Ok((s_expr, UnnestResult::SingleJoin))
}
SubqueryType::Exists | SubqueryType::NotExists => {
if is_conjunctive_predicate {
if let Some(result) = self.try_decorrelate_simple_subquery(left, subquery)? {
if let Some(result) = self.try_decorrelate_simple_subquery(outer, subquery)? {
return Ok((result, UnnestResult::SimpleJoin { output_index: None }));
}
}
let correlated_columns = subquery.outer_columns.clone();
let correlated_columns = &subquery.outer_columns;
let flatten_plan = self.flatten_plan(
outer,
&subquery.subquery,
&correlated_columns,
correlated_columns,
flatten_info,
false,
)?;
Expand All @@ -338,7 +340,7 @@ impl SubqueryRewriter {
let mut right_conditions = Vec::with_capacity(correlated_columns.len());
self.add_equi_conditions(
subquery.span,
&correlated_columns,
correlated_columns,
&mut left_conditions,
&mut right_conditions,
)?;
@@ -379,24 +381,25 @@
};
let s_expr = SExpr::create_binary(
Arc::new(join_plan.into()),
Arc::new(left.clone()),
Arc::new(outer.clone()),
Arc::new(flatten_plan),
);
Ok((s_expr, UnnestResult::MarkJoin { marker_index }))
}
SubqueryType::Any => {
let correlated_columns = subquery.outer_columns.clone();
let correlated_columns = &subquery.outer_columns;
let flatten_plan = self.flatten_plan(
outer,
&subquery.subquery,
&correlated_columns,
correlated_columns,
flatten_info,
false,
)?;
let mut left_conditions = Vec::with_capacity(correlated_columns.len());
let mut right_conditions = Vec::with_capacity(correlated_columns.len());
self.add_equi_conditions(
subquery.span,
&correlated_columns,
correlated_columns,
&mut left_conditions,
&mut right_conditions,
)?;
@@ -450,7 +453,7 @@
Ok((
SExpr::create_binary(
Arc::new(mark_join),
Arc::new(left.clone()),
Arc::new(outer.clone()),
Arc::new(flatten_plan),
),
UnnestResult::MarkJoin { marker_index },
@@ -467,23 +470,23 @@
left_conditions: &mut Vec<ScalarExpr>,
right_conditions: &mut Vec<ScalarExpr>,
) -> Result<()> {
let mut correlated_columns = correlated_columns.clone().into_iter().collect::<Vec<_>>();
let mut correlated_columns = correlated_columns.iter().copied().collect::<Vec<_>>();
correlated_columns.sort();
for correlated_column in correlated_columns.iter() {
for correlated_column in correlated_columns {
let metadata = self.metadata.read();
let column_entry = metadata.column(*correlated_column);
let column_entry = metadata.column(correlated_column);
let right_column = ScalarExpr::BoundColumnRef(BoundColumnRef {
span,
column: ColumnBindingBuilder::new(
column_entry.name(),
*correlated_column,
correlated_column,
Box::from(column_entry.data_type()),
Visibility::Visible,
)
.table_index(column_entry.table_index())
.build(),
});
let Some(derive_column) = self.derived_columns.get(correlated_column) else {
let Some(derive_column) = self.derived_columns.get(&correlated_column) else {
continue;
};
let column_entry = metadata.column(*derive_column);
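One detail worth calling out from the decorrelate.rs hunks above: the scalar-subquery arm now picks the join type in a single expression instead of mutating a variable. A simplified, self-contained sketch of that selection (the local enum and function name are illustrative, and the comment on why a plain LEFT join suffices is my reading of the change, not something stated in the diff):

// Mirrors the refactored join-type choice in try_decorrelate_subquery's
// Scalar arm: when the subquery contains an aggregate and the optimizer can
// derive a precise cardinality, a plain LEFT join is used; otherwise the
// rewriter falls back to LEFT SINGLE.
#[derive(Debug, PartialEq)]
enum JoinType {
    Left,
    LeftSingle,
}

fn scalar_subquery_join_type(contains_agg: bool, precise_cardinality: Option<u64>) -> JoinType {
    if contains_agg && precise_cardinality.is_some() {
        JoinType::Left
    } else {
        JoinType::LeftSingle
    }
}

fn main() {
    assert_eq!(scalar_subquery_join_type(true, Some(1)), JoinType::Left);
    assert_eq!(scalar_subquery_join_type(true, None), JoinType::LeftSingle);
    assert_eq!(scalar_subquery_join_type(false, Some(1)), JoinType::LeftSingle);
}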