Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Implement join #559

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ mod plan_expression_validator;
mod plan_expression_visitor;
mod plan_filter;
mod plan_having;
mod plan_join;
mod plan_insert_into;
mod plan_limit;
mod plan_node;
Expand Down
10 changes: 10 additions & 0 deletions common/planners/src/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::Expression;
use crate::ExpressionPlan;
use crate::FilterPlan;
use crate::HavingPlan;
use crate::JoinPlan;
use crate::LimitPlan;
use crate::PlanNode;
use crate::ProjectionPlan;
Expand Down Expand Up @@ -224,6 +225,15 @@ impl PlanBuilder {
})))
}

/// Apply a join
pub fn join(&self, conditions: &[ExpressionAction], right: &PlanNode) -> Result<Self> {
Ok(Self::from(&PlanNode::Join(JoinPlan {
conditions: Vec::from(conditions),
left_input: Arc::new(self.plan.clone()),
right_input: Arc::new(PlanBuilder::from(right).plan.clone())
})))
}

/// Apply a filter
pub fn filter(&self, expr: Expression) -> Result<Self> {
validate_expression(&expr)?;
Expand Down
6 changes: 6 additions & 0 deletions common/planners/src/plan_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ impl PlanNode {
write!(f, " if_exists:{:}", plan.if_exists)?;
Ok(false)
}
PlanNode::Join(plan) => {
write!(f, "Join\n")?;
write!(f, "{:?}\n", &plan.left_input)?;
write!(f, "{:?}\n", &plan.right_input)?;
Ok(false)
}
_ => Ok(false),
}
})
Expand Down
53 changes: 53 additions & 0 deletions common/planners/src/plan_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020-2021 The Datafuse Authors.
//
// SPDX-License-Identifier: Apache-2.0.

use std::sync::Arc;

use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_exception::Result;

use crate::ExpressionAction;
use crate::PlanNode;

#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct JoinPlan {
/// TODO: Support outer join and semi join

/// The conjunctions of join condition
pub conditions: Vec<ExpressionAction>,
pub left_input: Arc<PlanNode>,
pub right_input: Arc<PlanNode>
}

impl JoinPlan {
/// TODO: support duplicated column name
pub fn schema(&self) -> DataSchemaRef {
Arc::new(
DataSchema::try_merge(vec![
(*self.left_input.schema()).clone(),
(*self.right_input.schema()).clone(),
])
.unwrap()
)
}

pub fn get_left_child(&self) -> Arc<PlanNode> {
self.left_input.clone()
}

pub fn get_right_child(&self) -> Arc<PlanNode> {
self.right_input.clone()
}

pub fn set_left_child(&mut self, child: &PlanNode) -> Result<()> {
self.left_input = Arc::new(child.clone());
Ok(())
}

pub fn set_right_child(&mut self, child: &PlanNode) -> Result<()> {
self.right_input = Arc::new(child.clone());
Ok(())
}
}
4 changes: 4 additions & 0 deletions common/planners/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::ExplainPlan;
use crate::ExpressionPlan;
use crate::FilterPlan;
use crate::HavingPlan;
use crate::JoinPlan;
use crate::InsertIntoPlan;
use crate::LimitPlan;
use crate::ProjectionPlan;
Expand All @@ -40,6 +41,7 @@ pub enum PlanNode {
Sort(SortPlan),
Limit(LimitPlan),
Scan(ScanPlan),
Join(JoinPlan),
ReadSource(ReadDataSourcePlan),
Select(SelectPlan),
Explain(ExplainPlan),
Expand All @@ -65,6 +67,7 @@ impl PlanNode {
PlanNode::AggregatorFinal(v) => v.schema(),
PlanNode::Filter(v) => v.schema(),
PlanNode::Having(v) => v.schema(),
PlanNode::Join(v) => v.schema(),
PlanNode::Limit(v) => v.schema(),
PlanNode::ReadSource(v) => v.schema(),
PlanNode::Select(v) => v.schema(),
Expand All @@ -91,6 +94,7 @@ impl PlanNode {
PlanNode::AggregatorFinal(_) => "AggregatorFinalPlan",
PlanNode::Filter(_) => "FilterPlan",
PlanNode::Having(_) => "HavingPlan",
PlanNode::Join(_) => "JoinPlan",
PlanNode::Limit(_) => "LimitPlan",
PlanNode::ReadSource(_) => "ReadSourcePlan",
PlanNode::Select(_) => "SelectPlan",
Expand Down
6 changes: 6 additions & 0 deletions common/planners/src/plan_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::Expression;
use crate::ExpressionPlan;
use crate::FilterPlan;
use crate::HavingPlan;
use crate::JoinPlan;
use crate::InsertIntoPlan;
use crate::LimitPlan;
use crate::PlanNode;
Expand Down Expand Up @@ -77,6 +78,7 @@ pub trait PlanRewriter<'plan> {
PlanNode::Expression(plan) => self.rewrite_expression(plan),
PlanNode::DropTable(plan) => self.rewrite_drop_table(plan),
PlanNode::DropDatabase(plan) => self.rewrite_drop_database(plan),
PlanNode::Join(plan) => self.rewrite_join(plan)
PlanNode::InsertInto(plan) => self.rewrite_insert_into(plan)
}
}
Expand Down Expand Up @@ -205,6 +207,10 @@ pub trait PlanRewriter<'plan> {
Ok(PlanNode::DropDatabase(plan.clone()))
}

fn rewrite_join(&mut self, plan: &'plan JoinPlan) -> Result<PlanNode> {
Ok(PlanNode::Join(plan.clone()))
}

fn rewrite_insert_into(&mut self, plan: &'plan InsertIntoPlan) -> Result<PlanNode> {
Ok(PlanNode::InsertInto(plan.clone()))
}
Expand Down
8 changes: 8 additions & 0 deletions common/planners/src/plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::ExplainPlan;
use crate::ExpressionPlan;
use crate::FilterPlan;
use crate::HavingPlan;
use crate::JoinPlan;
use crate::InsertIntoPlan;
use crate::LimitPlan;
use crate::PlanNode;
Expand Down Expand Up @@ -89,6 +90,7 @@ pub trait PlanVisitor<'plan> {
PlanNode::Stage(plan) => self.visit_stage(plan),
PlanNode::Having(plan) => self.visit_having(plan),
PlanNode::Expression(plan) => self.visit_expression(plan),
PlanNode::Join(plan) => self.visit_join(plan),
PlanNode::InsertInto(plan) => self.visit_insert_into(plan)
}
}
Expand Down Expand Up @@ -154,5 +156,11 @@ pub trait PlanVisitor<'plan> {
fn visit_use_database(&mut self, _: &'plan UseDatabasePlan) {}

fn visit_set_variable(&mut self, _: &'plan SettingPlan) {}

fn visit_join(&mut self, join: &'plan JoinPlan) {
self.visit_plan_node(join.left_input.as_ref());
self.visit_plan_node(join.right_input.as_ref());
}

fn visit_insert_into(&mut self, _: &'plan InsertIntoPlan) {}
}
11 changes: 4 additions & 7 deletions common/streams/src/stream_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@ use crate::SendableDataBlockStream;

pub struct SortStream {
input: SendableDataBlockStream,
sort_columns_descriptions: Vec<SortColumnDescription>,
limit: Option<usize>
sort_columns_descriptions: Vec<SortColumnDescription>
}

impl SortStream {
pub fn try_create(
input: SendableDataBlockStream,
sort_columns_descriptions: Vec<SortColumnDescription>,
limit: Option<usize>
sort_columns_descriptions: Vec<SortColumnDescription>
) -> Result<Self> {
Ok(SortStream {
input,
sort_columns_descriptions,
limit
sort_columns_descriptions
})
}
}
Expand All @@ -44,7 +41,7 @@ impl Stream for SortStream {
Some(Ok(v)) => Some(DataBlock::sort_block(
&v,
&self.sort_columns_descriptions,
self.limit
None
)),
other => other
})
Expand Down