Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ select * from t1 order by a asc nulls first
- [x] Limit
- DML
- [x] Insert
- [ ] Update
- [x] Update
- [ ] Delete
- DataTypes
- Invalid
Expand Down
18 changes: 8 additions & 10 deletions src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use crate::planner::operator::values::ValuesOperator;
use crate::types::value::ValueRef;

impl Binder {

// TODO: 支持Project
pub(crate) fn bind_insert(
&mut self,
name: ObjectName,
Expand All @@ -24,21 +22,21 @@ impl Binder {

if let Some(table) = self.context.catalog.get_table_by_name(table_name) {
let table_id = table.id;
let mut col_catalogs = Vec::new();
let mut columns = Vec::new();

if idents.is_empty() {
col_catalogs = table.all_columns()
columns = table.all_columns()
.into_iter()
.map(|(_, catalog)| catalog.clone())
.collect_vec();
} else {
let bind_table_name = Some(table.name.to_string());
let bind_table_name = Some(table_name.to_string());
for ident in idents {
match self.bind_column_ref_from_identifiers(
slice::from_ref(ident),
bind_table_name.as_ref()
)? {
ScalarExpression::ColumnRef(catalog) => col_catalogs.push(catalog),
ScalarExpression::ColumnRef(catalog) => columns.push(catalog),
_ => unreachable!()
}
}
Expand All @@ -56,7 +54,7 @@ impl Binder {
})
.try_collect()?;

let values_plan = self.bind_values(rows, col_catalogs.clone());
let values_plan = self.bind_values(rows, columns);

Ok(LogicalPlan {
operator: Operator::Insert(
Expand All @@ -71,15 +69,15 @@ impl Binder {
}
}

fn bind_values(
pub(crate) fn bind_values(
&mut self,
rows: Vec<Vec<ValueRef>>,
col_catalogs: Vec<ColumnRef>
columns: Vec<ColumnRef>
) -> LogicalPlan {
LogicalPlan {
operator: Operator::Values(ValuesOperator {
rows,
columns: col_catalogs,
columns,
}),
childrens: vec![],
}
Expand Down
8 changes: 8 additions & 0 deletions src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod create_table;
pub mod expr;
mod select;
mod insert;
mod update;

use std::collections::BTreeMap;
use sqlparser::ast::{Ident, ObjectName, SetExpr, Statement};
Expand Down Expand Up @@ -71,6 +72,13 @@ impl Binder {
todo!()
}
}
Statement::Update { table, selection, assignments, .. } => {
if !table.joins.is_empty() {
unimplemented!()
} else {
self.bind_update(table, selection, assignments)?
}
}
_ => unimplemented!(),
};
Ok(plan)
Expand Down
4 changes: 2 additions & 2 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Binder {
Ok(plan)
}

fn bind_table_ref(&mut self, from: &[TableWithJoins]) -> Result<LogicalPlan, BindError> {
pub(crate) fn bind_table_ref(&mut self, from: &[TableWithJoins]) -> Result<LogicalPlan, BindError> {
assert!(from.len() < 2, "not support yet.");
if from.is_empty() {
return Ok(LogicalPlan {
Expand Down Expand Up @@ -253,7 +253,7 @@ impl Binder {
Ok(LJoinOperator::new(left, right, on, join_type))
}

fn bind_where(
pub(crate) fn bind_where(
&mut self,
children: LogicalPlan,
predicate: &Expr,
Expand Down
72 changes: 72 additions & 0 deletions src/binder/update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::slice;
use sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins};
use crate::binder::{Binder, BindError, lower_case_name, split_name};
use crate::expression::ScalarExpression;
use crate::planner::LogicalPlan;
use crate::planner::operator::Operator;
use crate::planner::operator::update::UpdateOperator;
use crate::types::value::ValueRef;

impl Binder {
pub(crate) fn bind_update(
&mut self,
to: &TableWithJoins,
selection: &Option<Expr>,
assignments: &[Assignment]
) -> Result<LogicalPlan, BindError> {
if let TableFactor::Table { name, .. } = &to.relation {
let name = lower_case_name(&name);
let (_, table_name) = split_name(&name)?;

let mut plan = self.bind_table_ref(slice::from_ref(to))?;

if let Some(predicate) = selection {
plan = self.bind_where(plan, predicate)?;
}

if let Some(table) = self.context.catalog.get_table_by_name(table_name) {
let table_id = table.id;
let bind_table_name = Some(table_name.to_string());

let mut columns = Vec::with_capacity(assignments.len());
let mut row = Vec::with_capacity(assignments.len());


for assignment in assignments {
let value = match self.bind_expr(&assignment.value)? {
ScalarExpression::Constant(value) => Ok::<ValueRef, BindError>(value),
_ => unreachable!(),
}?;

for ident in &assignment.id {
match self.bind_column_ref_from_identifiers(
slice::from_ref(&ident),
bind_table_name.as_ref()
)? {
ScalarExpression::ColumnRef(catalog) => {
columns.push(catalog);
row.push(value.clone());
},
_ => unreachable!()
}
}
}

let values_plan = self.bind_values(vec![row], columns);

Ok(LogicalPlan {
operator: Operator::Update(
UpdateOperator {
table_id,
}
),
childrens: vec![plan, values_plan],
})
} else {
Err(BindError::InvalidTable(format!("not found table {}", table_name)))
}
} else {
unreachable!("only table")
}
}
}
7 changes: 7 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ mod test {
let tuples_full_join = kipsql.run("select * from t1 full join t2 on a = c").await?;
println!("{}", create_table(&tuples_full_join));

println!("update t1 and filter:");
let _ = kipsql.run("update t1 set a = 0 where b > 1").await?;
println!("after t1:");
let update_after_full_t1 = kipsql.run("select * from t1").await?;
println!("{}", create_table(&update_after_full_t1));


Ok(())
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/execution/executor/dml/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::catalog::CatalogError;
use crate::execution::executor::BoxedExecutor;
use crate::execution::ExecutorError;
use crate::storage::{Storage, Table};
use crate::types::{ColumnId, TableId};
use crate::types::{ColumnId, IdGenerator, TableId};
use crate::types::tuple::Tuple;
use crate::types::value::{DataValue, ValueRef};

Expand All @@ -29,7 +29,7 @@ impl Insert {
let all_columns = table_catalog.all_columns();

let mut tuple = Tuple {
id: None,
id: Some(IdGenerator::build() as usize),
columns: Vec::with_capacity(all_columns.len()),
values: Vec::with_capacity(all_columns.len()),
};
Expand Down
1 change: 1 addition & 0 deletions src/execution/executor/dml/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub(crate) mod insert;
pub(crate) mod update;
45 changes: 45 additions & 0 deletions src/execution/executor/dml/update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::collections::HashMap;
use futures_async_stream::try_stream;
use crate::catalog::CatalogError;
use crate::execution::executor::BoxedExecutor;
use crate::execution::ExecutorError;
use crate::storage::{Storage, Table};
use crate::types::TableId;
use crate::types::tuple::Tuple;

pub struct Update { }

impl Update {
#[try_stream(boxed, ok = Tuple, error = ExecutorError)]
pub async fn execute(table_id: TableId, input: BoxedExecutor, values: BoxedExecutor, storage: impl Storage) {
if let Some(table_catalog) = storage.get_catalog().get_table(&table_id) {
let mut value_map = HashMap::new();

// only once
#[for_await]
for tuple in values {
let Tuple { columns, values, .. } = tuple?;
for i in 0..columns.len() {
value_map.insert(columns[i].id, values[i].clone());
}
}

let table = storage.get_table(&table_catalog.id)?;

#[for_await]
for tuple in input {
let mut tuple = tuple?;

for (i, column) in tuple.columns.iter().enumerate() {
if let Some(value) = value_map.get(&column.id) {
tuple.values[i] = value.clone();
}
}

table.append(tuple)?;
}
} else {
Err(CatalogError::NotFound("root", table_id.to_string()))?;
}
}
}
8 changes: 8 additions & 0 deletions src/execution/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ use crate::execution::physical_plan::PhysicalPlan;
use crate::execution::executor::ddl::create::CreateTable;
use crate::execution::executor::dql::filter::Filter;
use crate::execution::executor::dml::insert::Insert;
use crate::execution::executor::dml::update::Update;
use crate::execution::executor::dql::join::hash_join::HashJoin;
use crate::execution::executor::dql::limit::Limit;
use crate::execution::executor::dql::projection::Projection;
use crate::execution::executor::dql::seq_scan::SeqScan;
use crate::execution::executor::dql::sort::Sort;
use crate::execution::executor::dql::values::Values;
use crate::execution::ExecutorError;
use crate::execution::physical_plan::physical_update::PhysicalUpdate;
use crate::planner::operator::join::JoinOperator;
use crate::storage::memory::MemStorage;
use crate::types::tuple::Tuple;
Expand Down Expand Up @@ -53,6 +55,12 @@ impl Executor {

Insert::execute(table_id, input, self.storage.clone())
}
PhysicalPlan::Update(PhysicalUpdate { table_id, input, values}) => {
let input = self.build(*input);
let values = self.build(*values);

Update::execute(table_id, input, values, self.storage.clone())
}
PhysicalPlan::Values(op) => {
Values::execute(op)
}
Expand Down
3 changes: 3 additions & 0 deletions src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::execution::physical_plan::physical_limit::PhysicalLimit;
use crate::execution::physical_plan::physical_projection::PhysicalProjection;
use crate::execution::physical_plan::physical_sort::PhysicalSort;
use crate::execution::physical_plan::physical_table_scan::PhysicalTableScan;
use crate::execution::physical_plan::physical_update::PhysicalUpdate;
use crate::execution::physical_plan::physical_values::PhysicalValues;

pub(crate) mod physical_create_table;
Expand All @@ -18,10 +19,12 @@ pub(crate) mod physical_filter;
pub(crate) mod physical_sort;
pub(crate) mod physical_limit;
pub(crate) mod physical_hash_join;
pub(crate) mod physical_update;

#[derive(Debug)]
pub enum PhysicalPlan {
Insert(PhysicalInsert),
Update(PhysicalUpdate),
CreateTable(PhysicalCreateTable),
TableScan(PhysicalTableScan),
Projection(PhysicalProjection),
Expand Down
19 changes: 19 additions & 0 deletions src/execution/physical_plan/physical_plan_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::execution::physical_plan::physical_hash_join::PhysicalHashJoin;
use crate::execution::physical_plan::physical_insert::PhysicalInsert;
use crate::execution::physical_plan::physical_limit::PhysicalLimit;
use crate::execution::physical_plan::physical_sort::PhysicalSort;
use crate::execution::physical_plan::physical_update::PhysicalUpdate;
use crate::execution::physical_plan::physical_values::PhysicalValues;
use crate::planner::operator::create_table::CreateTableOperator;
use crate::planner::operator::filter::FilterOperator;
Expand All @@ -18,6 +19,7 @@ use crate::planner::operator::join::{JoinOperator, JoinType};
use crate::planner::operator::limit::LimitOperator;
use crate::planner::operator::project::ProjectOperator;
use crate::planner::operator::sort::SortOperator;
use crate::planner::operator::update::UpdateOperator;
use crate::planner::operator::values::ValuesOperator;

pub struct PhysicalPlanMapping;
Expand Down Expand Up @@ -65,6 +67,12 @@ impl PhysicalPlanMapping {

Self::build_physical_join(left_child, right_child, op)?
}
Operator::Update(op) => {
let input = plan.childrens.remove(0);
let values = plan.childrens.remove(0);

Self::build_physical_update(input, values, op)?
}
_ => return Err(MappingError::Unsupported(format!("{:?}", plan.operator))),
};

Expand Down Expand Up @@ -146,4 +154,15 @@ impl PhysicalPlanMapping {
}))
}
}

fn build_physical_update(input: LogicalPlan, values: LogicalPlan, op: UpdateOperator) -> Result<PhysicalPlan, MappingError> {
let input = Box::new(Self::build_plan(input)?);
let values = Box::new(Self::build_plan(values)?);

Ok(PhysicalPlan::Update(PhysicalUpdate {
table_id: op.table_id,
input,
values,
}))
}
}
9 changes: 9 additions & 0 deletions src/execution/physical_plan/physical_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::execution::physical_plan::PhysicalPlan;
use crate::types::TableId;

#[derive(Debug)]
pub struct PhysicalUpdate {
pub(crate) table_id: TableId,
pub(crate) input: Box<PhysicalPlan>,
pub(crate) values: Box<PhysicalPlan>
}
Loading