From 929dedabb7cc5be9121933a5171f22fda1e14bc8 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Wed, 26 Jul 2023 22:54:09 +0800 Subject: [PATCH 1/3] feat(binder): implement `Binder::bind_expr` --- src/binder/aggregate.rs | 4 +- src/binder/expr.rs | 80 ++++++- src/binder/select.rs | 416 ++++++++++++++++++++++++++++++++++-- src/expression/evaluator.rs | 4 +- src/expression/mod.rs | 41 +++- src/storage/memory.rs | 1 - 6 files changed, 518 insertions(+), 28 deletions(-) diff --git a/src/binder/aggregate.rs b/src/binder/aggregate.rs index 0540e04e..6934ebfd 100644 --- a/src/binder/aggregate.rs +++ b/src/binder/aggregate.rs @@ -206,7 +206,7 @@ impl Binder { }; let mut select_item = &mut select_list[i]; - let return_type = select_item.return_type().unwrap(); + let return_type = select_item.return_type(); self.context.group_by_exprs.push(std::mem::replace( &mut select_item, ScalarExpression::InputRef { @@ -234,7 +234,7 @@ impl Binder { expr, ScalarExpression::InputRef { index, - ty: expr.return_type().unwrap(), + ty: expr.return_type(), }, )) } diff --git a/src/binder/expr.rs b/src/binder/expr.rs index 0c57cbed..59313ecb 100644 --- a/src/binder/expr.rs +++ b/src/binder/expr.rs @@ -1,11 +1,13 @@ use crate::binder::BindError; use anyhow::Result; use itertools::Itertools; -use sqlparser::ast::{Expr, Ident}; +use sqlparser::ast::{BinaryOperator, Expr, Function, FunctionArg, FunctionArgExpr, Ident}; use std::slice; +use crate::expression::agg::AggKind; use super::Binder; use crate::expression::ScalarExpression; +use crate::types::LogicalType; impl Binder { pub(crate) fn bind_expr(&mut self, expr: &Expr) -> Result { @@ -13,6 +15,15 @@ impl Binder { Expr::Identifier(ident) => { self.bind_column_ref_from_identifiers(slice::from_ref(ident)) } + Expr::CompoundIdentifier(idents) => { + self.bind_column_ref_from_identifiers(idents) + } + Expr::BinaryOp { left, right, op} => { + self.bind_binary_op_internal(left, right, op) + } + Expr::Value(v) => Ok(ScalarExpression::Constant(v.into())), + Expr::Function(func) => self.bind_agg_call(func), + Expr::Nested(expr) => self.bind_expr(expr), _ => { todo!() } @@ -75,4 +86,71 @@ impl Binder { Ok(ScalarExpression::ColumnRef(column_catalog.clone())) } } + + fn bind_binary_op_internal( + &mut self, + left: &Expr, + right: &Expr, + op: &BinaryOperator, + ) -> Result { + let left_expr = Box::new(self.bind_expr(left)?); + let right_expr = Box::new(self.bind_expr(right)?); + let ty = LogicalType::max_logical_type( + &left_expr.return_type(), + &right_expr.return_type() + )?; + + Ok(ScalarExpression::Binary { + op: (op.clone()).into(), + left_expr, + right_expr, + ty, + }) + } + + fn bind_agg_call(&mut self, func: &Function) -> Result { + let args: Vec = func.args + .iter() + .map(|arg| { + let arg_expr = match arg { + FunctionArg::Named { arg, .. } => arg, + FunctionArg::Unnamed(arg) => arg, + }; + match arg_expr { + FunctionArgExpr::Expr(expr) => self.bind_expr(expr), + _ => todo!() + } + }) + .try_collect()?; + let ty = args[0].return_type(); + + Ok(match func.name.to_string().to_lowercase().as_str() { + "count" => ScalarExpression::AggCall{ + kind: AggKind::Count, + args, + ty: LogicalType::UInteger, + }, + "sum" => ScalarExpression::AggCall{ + kind: AggKind::Sum, + args, + ty, + }, + "min" => ScalarExpression::AggCall{ + kind: AggKind::Min, + args, + ty, + }, + "max" => ScalarExpression::AggCall{ + kind: AggKind::Max, + args, + ty, + }, + "avg" => ScalarExpression::AggCall{ + kind: AggKind::Avg, + args, + ty, + }, + _ => todo!(), + }) + } } diff --git a/src/binder/select.rs b/src/binder/select.rs index 417f4251..05130e28 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -316,36 +316,428 @@ impl Binder { #[cfg(test)] mod tests { - use sqlparser::ast::CharacterLength; - use super::*; - use crate::binder::{BinderContext, BindError}; + use crate::binder::BinderContext; use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog}; + use crate::expression::agg::AggKind; + use crate::expression::BinaryOperator::{Gt, Minus}; + use crate::expression::ScalarExpression::{AggCall, Binary, ColumnRef, Constant, InputRef}; use crate::planner::LogicalPlan; - use crate::types::LogicalType; - use crate::types::LogicalType::{Boolean, Integer}; + use crate::planner::operator::aggregate::AggregateOperator; + use crate::types::LogicalType::Integer; + use crate::types::value::DataValue::Int32; - fn test_root_catalog() -> Result { + fn test_root_catalog() -> Result { let mut root = RootCatalog::new(); let cols = vec![ ColumnCatalog::new("c1".to_string(), false, ColumnDesc::new(Integer, true)), - ColumnCatalog::new("c2".to_string(), false, ColumnDesc::new(Boolean, false)), + ColumnCatalog::new("c2".to_string(), false, ColumnDesc::new(Integer, false)), ]; let _ = root.add_table("t1".to_string(), cols)?; Ok(root) } - #[test] - fn test_select_bind() -> Result<(), BindError> { - let sql = "select * from t1"; + fn select_sql_run(sql: &str) -> Result { let root = test_root_catalog()?; let binder = Binder::new(BinderContext::new(root)); let stmt = crate::parser::parse_sql(sql).unwrap(); - let plan = binder.bind(&stmt[0]).unwrap(); - println!("{:#?}", plan); + binder.bind(&stmt[0]) + } + + #[test] + fn test_select_bind() -> Result<()> { + let plan_1 = select_sql_run("select * from t1")?; + assert_eq!(plan_1, just_col_mock()); + println!( + "just_col:\n {:#?}", + plan_1 + ); + let plan_2 = select_sql_run("select t1.c1, t1.c2 from t1")?; + assert_eq!(plan_2, table_with_col_mock()); + println!( + "table_with_col:\n {:#?}", + plan_2 + ); + let plan_3 = select_sql_run("select t1.c1, t1.c2 from t1 where c1 > 2")?; + assert_eq!(plan_3, table_with_col_and_c1_compare_constant_mock()); + println!( + "table_with_col_and_c1_compare_constant:\n {:#?}", + plan_3 + ); + let plan_4 = select_sql_run("select t1.c1, t1.c2 from t1 where c1 > c2")?; + assert_eq!(plan_4, table_with_col_and_c1_compare_c2_mock()); + println!( + "table_with_col_and_c1_compare_c2:\n {:#?}", + plan_4 + ); + let plan_5 = select_sql_run("select avg(t1.c1) from t1")?; + assert_eq!(plan_5, table_with_col_and_c1_avg_mock()); + println!( + "table_with_col_and_c1_avg:\n {:#?}", + plan_5 + ); + let plan_6 = select_sql_run("select t1.c1, t1.c2 from t1 where (t1.c1 - t1.c2) > 1")?; + assert_eq!(plan_6, table_with_col_nested_mock()); + println!( + "table_with_col_nested:\n {:#?}", + plan_6 + ); Ok(()) } + + fn just_col_mock() -> LogicalPlan { + LogicalPlan::Select( + LogicalSelectPlan { + operator: Arc::new(Operator::Project( + ProjectOperator { + columns: vec![ + ColumnRef( + ColumnCatalog { + id: Some(0), + name: "c1".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: true + }, + } + ), + ColumnRef( + ColumnCatalog { + id: Some(1), + name: "c2".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: false + }, + } + ) + ], + } + )), + children: vec![ + Arc::new( + LogicalSelectPlan { + operator: Arc::new(Operator::Scan( + ScanOperator { + table_ref_id: 0, + columns: vec![], + sort_fields: vec![], + pre_where: vec![], + limit: None, + } + )), + children: vec![], + } + ) + ], + } + ) + } + + fn table_with_col_mock() -> LogicalPlan { + LogicalPlan::Select( + LogicalSelectPlan { + operator: Arc::new(Operator::Project( + ProjectOperator { + columns: vec![ + ColumnRef( + ColumnCatalog { + id: Some(0), + name: "c1".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: true + }, + } + ), + ColumnRef( + ColumnCatalog { + id: Some(1), + name: "c2".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: false + }, + } + ) + ], + } + )), + children: vec![ + Arc::new( + LogicalSelectPlan { + operator: Arc::new(Operator::Scan( + ScanOperator { + table_ref_id: 0, + columns: vec![], + sort_fields: vec![], + pre_where: vec![], + limit: None, + } + )), + children: vec![], + } + ) + ], + } + ) + } + + fn table_with_col_and_c1_compare_constant_mock() -> LogicalPlan { + let col_ref_1 = ColumnRef( + ColumnCatalog { + id: Some(0), + name: "c1".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: true + }, + } + ); + LogicalPlan::Select( + LogicalSelectPlan { + operator: Arc::new(Operator::Project( + ProjectOperator { + columns: vec![ + col_ref_1.clone(), + ColumnRef( + ColumnCatalog { + id: Some(1), + name: "c2".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: false + }, + } + ) + ], + } + )), + children: vec![ + Arc::new( + FilterOperator::new( + Binary { + op: Gt, + left_expr: Box::new(col_ref_1), + right_expr: Box::new(Constant(Int32(Some(2)))), + ty: Integer, + }, + LogicalSelectPlan { + operator: Arc::new(Operator::Scan( + ScanOperator { + table_ref_id: 0, + columns: vec![], + sort_fields: vec![], + pre_where: vec![], + limit: None, + } + )), + children: vec![], + }, + false + ) + ) + ], + } + ) + } + + fn table_with_col_and_c1_compare_c2_mock() -> LogicalPlan { + let col_ref_1 = ColumnRef( + ColumnCatalog { + id: Some(0), + name: "c1".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: true + }, + } + ); + let col_ref_2 = ColumnRef( + ColumnCatalog { + id: Some(1), + name: "c2".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: false + }, + } + ); + LogicalPlan::Select( + LogicalSelectPlan { + operator: Arc::new(Operator::Project( + ProjectOperator { + columns: vec![ + col_ref_1.clone(), + col_ref_2.clone() + ], + } + )), + children: vec![ + Arc::new( + FilterOperator::new( + Binary { + op: Gt, + left_expr: Box::new(col_ref_1), + right_expr: Box::new(col_ref_2), + ty: Integer, + }, + LogicalSelectPlan { + operator: Arc::new(Operator::Scan( + ScanOperator { + table_ref_id: 0, + columns: vec![], + sort_fields: vec![], + pre_where: vec![], + limit: None, + } + )), + children: vec![], + }, + false + ) + ) + ], + } + ) + } + fn table_with_col_and_c1_avg_mock() -> LogicalPlan { + LogicalPlan::Select( + LogicalSelectPlan { + operator: Arc::new(Operator::Project( + ProjectOperator { + columns: vec![InputRef { + index: 0, + ty: Integer, + }], + } + )), + children: vec![ + Arc::new( + LogicalSelectPlan { + operator: Arc::new(Operator::Aggregate( + AggregateOperator { + groupby_exprs: vec![], + agg_calls: vec![ + AggCall { + kind: AggKind::Avg, + args: vec![ + ColumnRef( + ColumnCatalog { + id: Some(0), + name: "c1".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: true + }, + } + ) + ], + ty: Integer, + } + ], + } + )), + children: vec![ + Arc::new( + LogicalSelectPlan { + operator: Arc::new(Operator::Scan( + ScanOperator { + table_ref_id: 0, + columns: vec![], + sort_fields: vec![], + pre_where: vec![], + limit: None, + } + )), + children: vec![], + } + ) + ], + } + ) + ], + } + ) + } + fn table_with_col_nested_mock() -> LogicalPlan { + let col_ref_1 = ColumnRef( + ColumnCatalog { + id: Some(0), + name: "c1".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: true + }, + } + ); + let col_ref_2 = ColumnRef( + ColumnCatalog { + id: Some(1), + name: "c2".to_string(), + nullable: false, + desc: ColumnDesc { + column_datatype: Integer, + is_primary: false + }, + } + ); + LogicalPlan::Select( + LogicalSelectPlan { + operator: Arc::new(Operator::Project( + ProjectOperator { + columns: vec![ + col_ref_1.clone(), + col_ref_2.clone() + ], + } + )), + children: vec![ + Arc::new( + FilterOperator::new( + Binary { + op: Gt, + left_expr: Box::new( + Binary { + op: Minus, + left_expr: Box::new(col_ref_1), + right_expr: Box::new(col_ref_2), + ty: Integer, + } + ), + right_expr: Box::new(Constant(Int32(Some(1)))), + ty: Integer, + }, + LogicalSelectPlan { + operator: Arc::new(Operator::Scan( + ScanOperator { + table_ref_id: 0, + columns: vec![], + sort_fields: vec![], + pre_where: vec![], + limit: None, + } + )), + children: vec![], + }, + false + ) + ) + ], + } + ) + } } \ No newline at end of file diff --git a/src/expression/evaluator.rs b/src/expression/evaluator.rs index edc75d22..9cda6723 100644 --- a/src/expression/evaluator.rs +++ b/src/expression/evaluator.rs @@ -42,7 +42,7 @@ impl ScalarExpression { ScalarExpression::InputRef { index, .. } => batch.schema().field(*index).clone(), ScalarExpression::Alias { alias, expr, .. } => { - let logic_type = expr.return_type().unwrap(); + let logic_type = expr.return_type(); Field::new(alias, logic_type.into(), true) } ScalarExpression::TypeCast { expr, ty, .. } => { @@ -64,7 +64,7 @@ impl ScalarExpression { Field::new(new_name.as_str(), data_type, true) } ScalarExpression::IsNull { expr } => { - let data_type = DataType::from(expr.return_type().unwrap()); + let data_type = DataType::from(expr.return_type()); let new_name = format!("{}", data_type); Field::new(new_name.as_str(), data_type, true) } diff --git a/src/expression/mod.rs b/src/expression/mod.rs index 7d027550..a63dad67 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -69,26 +69,26 @@ impl ScalarExpression { } } - pub fn return_type(&self) -> Option { + pub fn return_type(&self) -> LogicalType { match self { - Self::Constant(v) => Some(v.logic_type().clone()), - Self::ColumnRef(col) => Some(col.datatype().clone()), + Self::Constant(v) => v.logic_type().clone(), + Self::ColumnRef(col) => col.datatype().clone(), Self::Binary { ty: return_type, .. - } => Some(return_type.clone()), + } => return_type.clone(), Self::Unary { ty: return_type, .. - } => Some(return_type.clone()), + } => return_type.clone(), Self::TypeCast { ty: return_type, .. - } => Some(return_type.clone()), + } => return_type.clone(), Self::AggCall { ty: return_type, .. - } => Some(return_type.clone()), + } => return_type.clone(), Self::InputRef { ty: return_type, .. - } => Some(return_type.clone()), - Self::IsNull { .. } => Some(LogicalType::Boolean), + } => return_type.clone(), + Self::IsNull { .. } => LogicalType::Boolean, Self::Alias { expr, .. } => expr.return_type(), } } @@ -144,6 +144,27 @@ pub enum BinaryOperator { impl From for BinaryOperator { fn from(value: SqlBinaryOperator) -> Self { - todo!() + match value { + SqlBinaryOperator::Plus => BinaryOperator::Plus, + SqlBinaryOperator::Minus => BinaryOperator::Minus, + SqlBinaryOperator::Multiply => BinaryOperator::Multiply, + SqlBinaryOperator::Divide => BinaryOperator::Divide, + SqlBinaryOperator::Modulo => BinaryOperator::Modulo, + SqlBinaryOperator::StringConcat => BinaryOperator::StringConcat, + SqlBinaryOperator::Gt => BinaryOperator::Gt, + SqlBinaryOperator::Lt => BinaryOperator::Lt, + SqlBinaryOperator::GtEq => BinaryOperator::GtEq, + SqlBinaryOperator::LtEq => BinaryOperator::LtEq, + SqlBinaryOperator::Spaceship => BinaryOperator::Spaceship, + SqlBinaryOperator::Eq => BinaryOperator::Eq, + SqlBinaryOperator::NotEq => BinaryOperator::NotEq, + SqlBinaryOperator::And => BinaryOperator::And, + SqlBinaryOperator::Or => BinaryOperator::Or, + SqlBinaryOperator::Xor => BinaryOperator::Xor, + SqlBinaryOperator::BitwiseOr => BinaryOperator::BitwiseOr, + SqlBinaryOperator::BitwiseAnd => BinaryOperator::BitwiseAnd, + SqlBinaryOperator::BitwiseXor => BinaryOperator::BitwiseXor, + _ => todo!() + } } } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 4609eb88..1ff93972 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::sync::Arc; use arrow::record_batch::RecordBatch; From d5401693c2fe700183fe9341d343c33ad8944fc6 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Fri, 28 Jul 2023 15:51:19 +0800 Subject: [PATCH 2/3] feat(insert): support `sql insert` simple syntax --- src/binder/{create.rs => create_table.rs} | 46 +++++++------- src/binder/insert.rs | 63 +++++++++++++++++++ src/binder/mod.rs | 16 ++++- src/binder/select.rs | 2 +- src/catalog/column.rs | 4 -- src/catalog/table.rs | 18 +++++- src/db.rs | 12 +++- .../physical/physical_plan_builder.rs | 1 + src/execution_v1/physical_plan/mod.rs | 3 + .../physical_plan/physical_insert.rs | 12 ++++ .../physical_plan/physical_plan_builder.rs | 61 +++++++++++++----- src/execution_v1/volcano_executor/insert.rs | 56 +++++++++++++++++ src/execution_v1/volcano_executor/mod.rs | 10 ++- src/planner/logical_create_table_plan.rs | 6 +- src/planner/logical_insert_plan.rs | 6 ++ src/planner/mod.rs | 3 + src/planner/operator/create_table.rs | 7 +-- src/planner/operator/insert.rs | 9 +++ src/planner/operator/mod.rs | 4 +- src/storage/memory.rs | 33 +++++++--- src/storage/mod.rs | 2 + src/types/mod.rs | 2 +- 22 files changed, 304 insertions(+), 72 deletions(-) rename src/binder/{create.rs => create_table.rs} (62%) create mode 100644 src/binder/insert.rs create mode 100644 src/execution_v1/physical_plan/physical_insert.rs create mode 100644 src/execution_v1/volcano_executor/insert.rs create mode 100644 src/planner/logical_insert_plan.rs create mode 100644 src/planner/operator/insert.rs diff --git a/src/binder/create.rs b/src/binder/create_table.rs similarity index 62% rename from src/binder/create.rs rename to src/binder/create_table.rs index d1d78352..6abfa0d7 100644 --- a/src/binder/create.rs +++ b/src/binder/create_table.rs @@ -1,5 +1,4 @@ use std::collections::HashSet; - use anyhow::Result; use sqlparser::ast::{ColumnDef, ObjectName}; @@ -7,24 +6,25 @@ use super::Binder; use crate::binder::{lower_case_name, split_name}; use crate::catalog::ColumnCatalog; use crate::planner::logical_create_table_plan::LogicalCreateTablePlan; +use crate::planner::operator::create_table::CreateOperator; impl Binder { pub(crate) fn bind_create_table( &mut self, - name: ObjectName, + name: &ObjectName, columns: &[ColumnDef], ) -> Result { let name = lower_case_name(&name); - let (_, table_name) = split_name(&name)?; // check duplicated column names let mut set = HashSet::new(); for col in columns.iter() { - if !set.insert(col.name.value.clone()) { + let col_name = &col.name.value; + if !set.insert(col_name.clone()) { return Err(anyhow::Error::msg(format!( "bind duplicated column {}", - col.name.value.clone() + col_name ))); } } @@ -35,8 +35,10 @@ impl Binder { .collect(); let plan = LogicalCreateTablePlan { - table_name: table_name.to_string(), - columns, + operator: CreateOperator { + table_name: table_name.to_string(), + columns + }, }; Ok(plan) } @@ -46,7 +48,7 @@ impl Binder { mod tests { use super::*; use crate::binder::BinderContext; - use crate::catalog::{ColumnDesc, RootCatalog}; + use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog}; use crate::planner::LogicalPlan; use crate::types::LogicalType; @@ -58,19 +60,21 @@ mod tests { let plan1 = binder.bind(&stmt[0]).unwrap(); let plan2 = LogicalPlan::CreateTable(LogicalCreateTablePlan { - table_name: "t1".to_string(), - columns: vec![ - ColumnCatalog::new( - "id".to_string(), - false, - ColumnDesc::new(LogicalType::Integer, false) - ), - ColumnCatalog::new( - "name".to_string(), - false, - ColumnDesc::new(LogicalType::Varchar, false) - ) - ], + operator: CreateOperator { + table_name: "t1".to_string(), + columns: vec![ + ColumnCatalog::new( + "id".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, false) + ), + ColumnCatalog::new( + "name".to_string(), + false, + ColumnDesc::new(LogicalType::Varchar, false) + ) + ], + }, }); assert_eq!(plan1, plan2); diff --git a/src/binder/insert.rs b/src/binder/insert.rs new file mode 100644 index 00000000..95cb9a99 --- /dev/null +++ b/src/binder/insert.rs @@ -0,0 +1,63 @@ +use sqlparser::ast::{Expr, Ident, ObjectName}; +use anyhow::Result; +use itertools::Itertools; +use crate::binder::{Binder, lower_case_name, split_name}; +use crate::expression::ScalarExpression; +use crate::planner::logical_insert_plan::LogicalInsertPlan; +use crate::planner::operator::insert::InsertOperator; + +impl Binder { + pub(crate) fn bind_insert( + &mut self, + name: ObjectName, + idents: &[Ident], + rows: &Vec> + ) -> Result { + let name = lower_case_name(&name); + let (_, table_name) = split_name(&name)?; + + if let Some(table) = self.context.catalog.get_table_by_name(table_name) { + let mut col_idxs = Vec::new(); + + for ident in idents { + let col_name = &ident.value; + if let Some(col_idx) = table.get_column_id_by_name(col_name) { + col_idxs.push(col_idx.clone()); + } else { + return Err(anyhow::Error::msg(format!( + "not found column {} on table {}", + col_name, + table_name + ))) + } + } + if col_idxs.is_empty() { + col_idxs = (0..table.columns_len()).collect_vec() + } + + // 行转列 + let mut cols: Vec> = vec![Vec::new(); rows[0].len()]; + + for row in rows { + for (i, expr) in row.into_iter().enumerate() { + cols[i].push(self.bind_expr(expr)?); + } + } + + Ok(LogicalInsertPlan { + operator: InsertOperator { + table: table_name.to_string(), + col_idxs, + cols, + }, + }) + } else { + Err(anyhow::Error::msg(format!( + "not found table {}", + table_name + ))) + } + } + + +} \ No newline at end of file diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 879780c5..5df381cf 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -1,12 +1,13 @@ pub mod aggregate; -mod create; +mod create_table; pub mod expr; mod select; +mod insert; use std::collections::HashMap; use anyhow::Result; -use sqlparser::ast::{Ident, ObjectName, Statement}; +use sqlparser::ast::{Ident, ObjectName, SetExpr, Statement}; use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME, CatalogError}; use crate::expression::ScalarExpression; @@ -59,15 +60,24 @@ impl Binder { } pub fn bind(mut self, stmt: &Statement) -> Result { + println!("{:#?}", stmt); let plan = match stmt { Statement::Query(query) => { let plan = self.bind_query(query)?; LogicalPlan::Select(plan) } Statement::CreateTable { name, columns, .. } => { - let plan = self.bind_create_table(name.to_owned(), &columns)?; + let plan = self.bind_create_table(name, &columns)?; LogicalPlan::CreateTable(plan) } + Statement::Insert { table_name, columns, source, .. } => { + if let SetExpr::Values(values) = source.body.as_ref() { + let plan = self.bind_insert(table_name.to_owned(), columns, &values.rows)?; + LogicalPlan::Insert(plan) + } else { + todo!() + } + } _ => unimplemented!(), }; Ok(plan) diff --git a/src/binder/select.rs b/src/binder/select.rs index 05130e28..b3120f70 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -200,7 +200,7 @@ impl Binder { let mut exprs = vec![]; for ref_id in self.context.bind_table.values().cloned().collect_vec() { let table = self.context.catalog.get_table(ref_id).unwrap(); - for (col_id, col) in &table.get_all_columns() { + for (col_id, col) in &table.all_columns() { let column_ref_id = ColumnRefId::from_table(ref_id, *col_id); // self.record_regular_table_column( // &table.name(), diff --git a/src/catalog/column.rs b/src/catalog/column.rs index c91445f5..6ce4d542 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -75,10 +75,6 @@ impl ColumnDesc { } } - pub(crate) fn is_primary(&self) -> bool { - self.is_primary - } - pub(crate) fn get_datatype(&self) -> LogicalType { self.column_datatype.clone() } diff --git a/src/catalog/table.rs b/src/catalog/table.rs index fbfbb1c8..5024208a 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -1,10 +1,12 @@ use std::collections::HashMap; +use std::sync::Arc; +use arrow::datatypes::{Schema, SchemaRef}; use itertools::Itertools; use crate::catalog::{CatalogError, ColumnCatalog}; use crate::types::{ColumnIdx, IdGenerator, TableIdx}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct TableCatalog { pub id: Option, pub name: String, @@ -15,6 +17,10 @@ pub struct TableCatalog { } impl TableCatalog { + pub(crate) fn columns_len(&self) -> usize { + self.columns.len() + } + pub(crate) fn get_column_by_id(&self, id: ColumnIdx) -> Option<&ColumnCatalog> { self.columns.get(id) } @@ -32,13 +38,21 @@ impl TableCatalog { self.column_idxs.contains_key(name) } - pub(crate) fn get_all_columns(&self) -> Vec<(ColumnIdx, &ColumnCatalog)> { + pub(crate) fn all_columns(&self) -> Vec<(ColumnIdx, &ColumnCatalog)> { self.columns .iter() .enumerate() .collect_vec() } + // TODO: 缓存schema + pub(crate) fn schema(&self) -> SchemaRef { + let fields = self.columns.iter() + .map(ColumnCatalog::to_field) + .collect_vec(); + Arc::new(Schema::new(fields)) + } + /// Add a column to the table catalog. pub(crate) fn add_column( &mut self, diff --git a/src/db.rs b/src/db.rs index b4b65d79..37a02101 100644 --- a/src/db.rs +++ b/src/db.rs @@ -123,6 +123,7 @@ pub enum DatabaseError { mod test { use std::sync::Arc; use arrow::array::{BooleanArray, Int32Array}; + use arrow::compute::concat_batches; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use itertools::Itertools; @@ -177,9 +178,14 @@ mod test { let database = Database::new_on_mem(); tokio_test::block_on(async move { - let _batch = database.run("create table t1 (a int, b int)").await?; - let batch = database.run("select * from t1").await?; - println!("{:#?}", batch); + let _ = database.run("create table t1 (a int, b boolean)").await?; + let _ = database.run("insert into t1 values (1, true), (2, false)").await?; + let vec_batch = database.run("select * from t1").await?; + + let table = database.storage + .get_catalog() + .get_table(0).unwrap().clone(); + println!("{:#?}", concat_batches(&table.schema(), &vec_batch)); Ok(()) }) diff --git a/src/execution/physical/physical_plan_builder.rs b/src/execution/physical/physical_plan_builder.rs index 255e7c3a..87f65edd 100644 --- a/src/execution/physical/physical_plan_builder.rs +++ b/src/execution/physical/physical_plan_builder.rs @@ -30,6 +30,7 @@ impl PhysicalPlanBuilder { match plan { LogicalPlan::Select(select) => self.build_select_logical_plan(select), LogicalPlan::CreateTable(_) => todo!(), + LogicalPlan::Insert(_) => todo!(), } } diff --git a/src/execution_v1/physical_plan/mod.rs b/src/execution_v1/physical_plan/mod.rs index 656e2c29..9e760fcd 100644 --- a/src/execution_v1/physical_plan/mod.rs +++ b/src/execution_v1/physical_plan/mod.rs @@ -1,4 +1,5 @@ use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable; +use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert; use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection; use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan; @@ -6,9 +7,11 @@ pub(crate) mod physical_create_table; pub(crate) mod physical_plan_builder; pub(crate) mod physical_projection; pub(crate) mod physical_table_scan; +pub(crate) mod physical_insert; #[derive(Debug)] pub enum PhysicalOperator { + Insert(PhysicalInsert), CreateTable(PhysicalCreateTable), TableScan(PhysicalTableScan), Projection(PhysicalProjection), diff --git a/src/execution_v1/physical_plan/physical_insert.rs b/src/execution_v1/physical_plan/physical_insert.rs new file mode 100644 index 00000000..8c1d5ef4 --- /dev/null +++ b/src/execution_v1/physical_plan/physical_insert.rs @@ -0,0 +1,12 @@ +use crate::expression::ScalarExpression; +use crate::types::ColumnIdx; + +#[derive(Debug)] +pub struct PhysicalInsert { + /// Table name to insert to + pub table_name: String, + + pub col_idxs: Vec, + /// List of columns of the table + pub cols: Vec> +} \ No newline at end of file diff --git a/src/execution_v1/physical_plan/physical_plan_builder.rs b/src/execution_v1/physical_plan/physical_plan_builder.rs index 722cdb6f..24e31d5a 100644 --- a/src/execution_v1/physical_plan/physical_plan_builder.rs +++ b/src/execution_v1/physical_plan/physical_plan_builder.rs @@ -9,6 +9,10 @@ use crate::planner::operator::Operator; use crate::planner::LogicalPlan; use anyhow::anyhow; use anyhow::Result; +use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert; +use crate::planner::logical_insert_plan::LogicalInsertPlan; +use crate::planner::operator::insert::InsertOperator; +use crate::planner::operator::project::ProjectOperator; pub struct PhysicalPlanBuilder { plan_id: u32, @@ -27,33 +31,47 @@ impl PhysicalPlanBuilder { pub fn build_plan(&mut self, plan: &LogicalPlan) -> Result { match plan { - LogicalPlan::Select(select) => self.build_select_logical_plan(select), - LogicalPlan::CreateTable(create_table) => { - self.build_create_table_logic_plan(create_table) - } + LogicalPlan::Select(select) => + self.build_select_logical_plan(select), + LogicalPlan::CreateTable(create_table) => + Ok(self.build_create_table_logic_plan(create_table)), + LogicalPlan::Insert(insert) => + Ok(self.build_insert_logic_plan(insert)) } } + fn build_insert_logic_plan( + &mut self, + plan: &LogicalInsertPlan, + ) -> PhysicalOperator { + let InsertOperator { table, col_idxs, cols: rows } = plan.operator.clone(); + + PhysicalOperator::Insert( + PhysicalInsert { + table_name: table, + col_idxs, + cols: rows, + } + ) + } + fn build_create_table_logic_plan( &mut self, plan: &LogicalCreateTablePlan, - ) -> Result { - Ok(PhysicalOperator::CreateTable(PhysicalCreateTable { - table_name: plan.table_name.to_string(), - columns: plan.columns.clone(), - })) + ) -> PhysicalOperator { + let operator = &plan.operator; + + PhysicalOperator::CreateTable( + PhysicalCreateTable { + table_name: operator.table_name.to_string(), + columns: operator.columns.clone(), + } + ) } fn build_select_logical_plan(&mut self, plan: &LogicalSelectPlan) -> Result { match plan.operator.as_ref() { - Operator::Project(op) => { - let input = self.build_select_logical_plan(plan.child(0)?)?; - Ok(PhysicalOperator::Projection(PhysicalProjection { - plan_id: self.next_plan_id(), - exprs: op.columns.clone(), - input: Box::new(input), - })) - } + Operator::Project(op) => self.build_physical_projection(plan, op), Operator::Scan(scan) => Ok(self.build_physical_scan(scan.clone())), _ => Err(anyhow!(format!( "Unsupported physical plan: {:?}", @@ -62,6 +80,15 @@ impl PhysicalPlanBuilder { } } + fn build_physical_projection(&mut self, plan: &LogicalSelectPlan, op: &ProjectOperator) -> Result { + let input = self.build_select_logical_plan(plan.child(0)?)?; + Ok(PhysicalOperator::Projection(PhysicalProjection { + plan_id: self.next_plan_id(), + exprs: op.columns.clone(), + input: Box::new(input), + })) + } + fn build_physical_scan(&mut self, base: ScanOperator) -> PhysicalOperator { PhysicalOperator::TableScan(PhysicalTableScan { plan_id: self.next_plan_id(), base }) } diff --git a/src/execution_v1/volcano_executor/insert.rs b/src/execution_v1/volcano_executor/insert.rs new file mode 100644 index 00000000..b75a4b16 --- /dev/null +++ b/src/execution_v1/volcano_executor/insert.rs @@ -0,0 +1,56 @@ +use arrow::array::{ArrayRef, new_null_array}; +use arrow::datatypes::DataType; +use arrow::record_batch::RecordBatch; +use futures_async_stream::try_stream; +use itertools::Itertools; +use crate::catalog::CatalogError; +use crate::execution_v1::ExecutorError; +use crate::expression::ScalarExpression; +use crate::storage::{Storage, Table}; +use crate::types::ColumnIdx; +use crate::types::value::DataValue; + +pub struct Insert { } + +impl Insert { + #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] + pub async fn execute(table_name: String, col_idxs: Vec, mut cols: Vec>, storage: impl Storage) { + if let Some(table) = storage.get_catalog().get_table_by_name(&table_name) { + let row_len = cols[0].len(); + // 为了后继pop而倒序 + cols.reverse(); + + let vec_arr_ref = table.all_columns() + .into_iter() + .map(|(col_idx, col_catalog)| { + if col_idxs.contains(&col_idx) { + let col = cols.pop().unwrap(); + assert_eq!(col.len(), row_len); + + let mut builder = DataValue::new_builder(&col[0].return_type())?; + + for expr in col { + match expr { + ScalarExpression::Constant(value) => { + DataValue::append_for_builder(&value, &mut builder)?; + }, + _ => unreachable!() + } + } + + Ok::(builder.finish()) + } else { + Ok(new_null_array(&DataType::from(col_catalog.datatype().clone()), row_len)) + } + }) + .try_collect()?; + + + let new_batch = RecordBatch::try_new(table.schema(), vec_arr_ref)?; + + storage.get_table(table.id.unwrap())?.append(new_batch)?; + } else { + Err(CatalogError::NotFound("root", table_name.to_string()))?; + } + } +} \ No newline at end of file diff --git a/src/execution_v1/volcano_executor/mod.rs b/src/execution_v1/volcano_executor/mod.rs index 9aefaed0..41701f0b 100644 --- a/src/execution_v1/volcano_executor/mod.rs +++ b/src/execution_v1/volcano_executor/mod.rs @@ -1,6 +1,7 @@ mod create_table; mod projection; mod table_scan; +mod insert; use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection; use crate::execution_v1::physical_plan::PhysicalOperator; @@ -12,6 +13,8 @@ use crate::storage::StorageImpl; use arrow::record_batch::RecordBatch; use futures::stream::BoxStream; use futures::TryStreamExt; +use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert; +use crate::execution_v1::volcano_executor::insert::Insert; pub type BoxedExecutor = BoxStream<'static, Result>; @@ -38,8 +41,11 @@ impl VolcanoExecutor { PhysicalOperator::CreateTable(op) => match &self.storage { StorageImpl::InMemoryStorage(storage) => CreateTable::execute(op, storage.clone()), }, - _ => { - unimplemented!() + PhysicalOperator::Insert(PhysicalInsert { table_name, col_idxs, cols, }) => { + match &self.storage { + StorageImpl::InMemoryStorage(storage) => + Insert::execute(table_name, col_idxs, cols, storage.clone()), + } } } } diff --git a/src/planner/logical_create_table_plan.rs b/src/planner/logical_create_table_plan.rs index 61511028..df2c2fb4 100644 --- a/src/planner/logical_create_table_plan.rs +++ b/src/planner/logical_create_table_plan.rs @@ -1,8 +1,8 @@ -use crate::catalog::ColumnCatalog; +use crate::planner::operator::create_table::CreateOperator; + #[derive(Debug, PartialEq, Clone)] pub struct LogicalCreateTablePlan { - pub table_name: String, - pub columns: Vec, + pub operator: CreateOperator } // use sqlparser::ast::{ColumnDef, ColumnOption, Statement}; diff --git a/src/planner/logical_insert_plan.rs b/src/planner/logical_insert_plan.rs new file mode 100644 index 00000000..ff1cc8e3 --- /dev/null +++ b/src/planner/logical_insert_plan.rs @@ -0,0 +1,6 @@ +use crate::planner::operator::insert::InsertOperator; + +#[derive(Debug, PartialEq, Clone)] +pub struct LogicalInsertPlan { + pub operator: InsertOperator +} \ No newline at end of file diff --git a/src/planner/mod.rs b/src/planner/mod.rs index a2f4dbd2..d0a86885 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -3,7 +3,9 @@ pub mod logical_create_table_plan; pub mod logical_plan_builder; pub mod logical_select_plan; pub mod operator; +pub mod logical_insert_plan; +use crate::planner::logical_insert_plan::LogicalInsertPlan; use self::{ logical_create_table_plan::LogicalCreateTablePlan, logical_select_plan::LogicalSelectPlan, }; @@ -12,5 +14,6 @@ use self::{ pub enum LogicalPlan { Select(LogicalSelectPlan), CreateTable(LogicalCreateTablePlan), + Insert(LogicalInsertPlan) } pub enum LogicalPlanError {} diff --git a/src/planner/operator/create_table.rs b/src/planner/operator/create_table.rs index 00f53bb7..ca462945 100644 --- a/src/planner/operator/create_table.rs +++ b/src/planner/operator/create_table.rs @@ -1,10 +1,9 @@ -use crate::catalog::ColumnDesc; +use crate::catalog::ColumnCatalog; #[derive(Debug, PartialEq, Clone)] -#[allow(dead_code)] -pub struct CreateTableOperator { +pub struct CreateOperator { /// Table name to insert to pub table_name: String, /// List of columns of the table - pub columns: Vec<(String, bool, ColumnDesc)>, + pub columns: Vec, } diff --git a/src/planner/operator/insert.rs b/src/planner/operator/insert.rs new file mode 100644 index 00000000..7b418a5c --- /dev/null +++ b/src/planner/operator/insert.rs @@ -0,0 +1,9 @@ +use crate::expression::ScalarExpression; +use crate::types::ColumnIdx; + +#[derive(Debug, PartialEq, Clone)] +pub struct InsertOperator { + pub table: String, + pub(crate) col_idxs: Vec, + pub(crate) cols: Vec>, +} \ No newline at end of file diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 97d67b7f..0b01999f 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -6,8 +6,8 @@ pub mod limit; pub mod project; pub mod scan; pub mod sort; +pub mod insert; -use crate::planner::operator::create_table::CreateTableOperator; use std::sync::Arc; use self::{ @@ -27,6 +27,4 @@ pub enum Operator { Scan(ScanOperator), Sort(SortOperator), Limit(LimitOperator), - #[allow(dead_code)] - CreateTable(CreateTableOperator), } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 1ff93972..2c4b0016 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -9,11 +9,11 @@ use crate::types::{LogicalType, TableIdx}; #[derive(Debug)] pub struct InMemoryStorage { - inner: Arc>, + inner: Arc>, } #[derive(Debug)] -struct Inner { +struct StorageInner { catalog: RootCatalog, tables: Vec, } @@ -28,7 +28,7 @@ impl InMemoryStorage { pub fn new() -> Self { InMemoryStorage { inner: Arc::new(Mutex::new( - Inner { + StorageInner { catalog: RootCatalog::default(), tables: Vec::new(), }) @@ -58,7 +58,7 @@ impl Storage for InMemoryStorage { let table_id = inner.catalog.add_table( table_name.to_string(), - table.columns_vec.clone() + table.inner.lock().columns.clone() )?; table.table_id = table_id; @@ -89,8 +89,13 @@ impl Storage for InMemoryStorage { pub struct InMemoryTable { table_id: TableIdx, table_name: String, + inner: Arc> +} + +#[derive(Debug)] +struct TableInner { data: Vec, - columns_vec: Vec, + columns: Vec, } impl InMemoryTable { @@ -99,8 +104,13 @@ impl InMemoryTable { Ok(Self { table_id: 0, table_name: name.to_string(), - data, - columns_vec: columns, + + inner: Arc::new(Mutex::new( + TableInner { + data, + columns, + } + )), }) } @@ -133,6 +143,13 @@ impl Table for InMemoryTable { ) -> Result { InMemoryTransaction::start(self) } + + fn append(&self, record_batch: RecordBatch) -> Result<(), StorageError> { + self.inner.lock() + .data.push(record_batch); + + Ok(()) + } } pub struct InMemoryTransaction { @@ -144,7 +161,7 @@ impl InMemoryTransaction { pub fn start(table: &InMemoryTable) -> Result { Ok(Self { batch_cursor: 0, - data: table.data.clone(), + data: table.inner.lock().data.clone(), }) } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 32300f6a..a89d6b66 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -42,6 +42,8 @@ pub trait Table: Sync + Send + Clone + 'static { bounds: Bounds, projection: Projections, ) -> Result; + + fn append(&self, record_batch: RecordBatch) -> Result<(), StorageError>; } // currently we use a transaction to hold csv reader diff --git a/src/types/mod.rs b/src/types/mod.rs index 983679e5..8e6ec90f 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -9,7 +9,7 @@ use strum_macros::AsRefStr; use crate::types::errors::TypeError; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub(crate) struct IdGenerator { buf: usize } From 60ddf017cd57bb9600df6b1435b54ae21afb0268 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Fri, 28 Jul 2023 16:01:20 +0800 Subject: [PATCH 3/3] Revert "feat(insert): support `sql insert` simple syntax" This reverts commit d5401693c2fe700183fe9341d343c33ad8944fc6. --- src/binder/{create_table.rs => create.rs} | 46 +++++++------- src/binder/insert.rs | 63 ------------------- src/binder/mod.rs | 16 +---- src/binder/select.rs | 2 +- src/catalog/column.rs | 4 ++ src/catalog/table.rs | 18 +----- src/db.rs | 12 +--- .../physical/physical_plan_builder.rs | 1 - src/execution_v1/physical_plan/mod.rs | 3 - .../physical_plan/physical_insert.rs | 12 ---- .../physical_plan/physical_plan_builder.rs | 61 +++++------------- src/execution_v1/volcano_executor/insert.rs | 56 ----------------- src/execution_v1/volcano_executor/mod.rs | 10 +-- src/planner/logical_create_table_plan.rs | 6 +- src/planner/logical_insert_plan.rs | 6 -- src/planner/mod.rs | 3 - src/planner/operator/create_table.rs | 7 ++- src/planner/operator/insert.rs | 9 --- src/planner/operator/mod.rs | 4 +- src/storage/memory.rs | 33 +++------- src/storage/mod.rs | 2 - src/types/mod.rs | 2 +- 22 files changed, 72 insertions(+), 304 deletions(-) rename src/binder/{create_table.rs => create.rs} (62%) delete mode 100644 src/binder/insert.rs delete mode 100644 src/execution_v1/physical_plan/physical_insert.rs delete mode 100644 src/execution_v1/volcano_executor/insert.rs delete mode 100644 src/planner/logical_insert_plan.rs delete mode 100644 src/planner/operator/insert.rs diff --git a/src/binder/create_table.rs b/src/binder/create.rs similarity index 62% rename from src/binder/create_table.rs rename to src/binder/create.rs index 6abfa0d7..d1d78352 100644 --- a/src/binder/create_table.rs +++ b/src/binder/create.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; + use anyhow::Result; use sqlparser::ast::{ColumnDef, ObjectName}; @@ -6,25 +7,24 @@ use super::Binder; use crate::binder::{lower_case_name, split_name}; use crate::catalog::ColumnCatalog; use crate::planner::logical_create_table_plan::LogicalCreateTablePlan; -use crate::planner::operator::create_table::CreateOperator; impl Binder { pub(crate) fn bind_create_table( &mut self, - name: &ObjectName, + name: ObjectName, columns: &[ColumnDef], ) -> Result { let name = lower_case_name(&name); + let (_, table_name) = split_name(&name)?; // check duplicated column names let mut set = HashSet::new(); for col in columns.iter() { - let col_name = &col.name.value; - if !set.insert(col_name.clone()) { + if !set.insert(col.name.value.clone()) { return Err(anyhow::Error::msg(format!( "bind duplicated column {}", - col_name + col.name.value.clone() ))); } } @@ -35,10 +35,8 @@ impl Binder { .collect(); let plan = LogicalCreateTablePlan { - operator: CreateOperator { - table_name: table_name.to_string(), - columns - }, + table_name: table_name.to_string(), + columns, }; Ok(plan) } @@ -48,7 +46,7 @@ impl Binder { mod tests { use super::*; use crate::binder::BinderContext; - use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog}; + use crate::catalog::{ColumnDesc, RootCatalog}; use crate::planner::LogicalPlan; use crate::types::LogicalType; @@ -60,21 +58,19 @@ mod tests { let plan1 = binder.bind(&stmt[0]).unwrap(); let plan2 = LogicalPlan::CreateTable(LogicalCreateTablePlan { - operator: CreateOperator { - table_name: "t1".to_string(), - columns: vec![ - ColumnCatalog::new( - "id".to_string(), - false, - ColumnDesc::new(LogicalType::Integer, false) - ), - ColumnCatalog::new( - "name".to_string(), - false, - ColumnDesc::new(LogicalType::Varchar, false) - ) - ], - }, + table_name: "t1".to_string(), + columns: vec![ + ColumnCatalog::new( + "id".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, false) + ), + ColumnCatalog::new( + "name".to_string(), + false, + ColumnDesc::new(LogicalType::Varchar, false) + ) + ], }); assert_eq!(plan1, plan2); diff --git a/src/binder/insert.rs b/src/binder/insert.rs deleted file mode 100644 index 95cb9a99..00000000 --- a/src/binder/insert.rs +++ /dev/null @@ -1,63 +0,0 @@ -use sqlparser::ast::{Expr, Ident, ObjectName}; -use anyhow::Result; -use itertools::Itertools; -use crate::binder::{Binder, lower_case_name, split_name}; -use crate::expression::ScalarExpression; -use crate::planner::logical_insert_plan::LogicalInsertPlan; -use crate::planner::operator::insert::InsertOperator; - -impl Binder { - pub(crate) fn bind_insert( - &mut self, - name: ObjectName, - idents: &[Ident], - rows: &Vec> - ) -> Result { - let name = lower_case_name(&name); - let (_, table_name) = split_name(&name)?; - - if let Some(table) = self.context.catalog.get_table_by_name(table_name) { - let mut col_idxs = Vec::new(); - - for ident in idents { - let col_name = &ident.value; - if let Some(col_idx) = table.get_column_id_by_name(col_name) { - col_idxs.push(col_idx.clone()); - } else { - return Err(anyhow::Error::msg(format!( - "not found column {} on table {}", - col_name, - table_name - ))) - } - } - if col_idxs.is_empty() { - col_idxs = (0..table.columns_len()).collect_vec() - } - - // 行转列 - let mut cols: Vec> = vec![Vec::new(); rows[0].len()]; - - for row in rows { - for (i, expr) in row.into_iter().enumerate() { - cols[i].push(self.bind_expr(expr)?); - } - } - - Ok(LogicalInsertPlan { - operator: InsertOperator { - table: table_name.to_string(), - col_idxs, - cols, - }, - }) - } else { - Err(anyhow::Error::msg(format!( - "not found table {}", - table_name - ))) - } - } - - -} \ No newline at end of file diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 5df381cf..879780c5 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -1,13 +1,12 @@ pub mod aggregate; -mod create_table; +mod create; pub mod expr; mod select; -mod insert; use std::collections::HashMap; use anyhow::Result; -use sqlparser::ast::{Ident, ObjectName, SetExpr, Statement}; +use sqlparser::ast::{Ident, ObjectName, Statement}; use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME, CatalogError}; use crate::expression::ScalarExpression; @@ -60,24 +59,15 @@ impl Binder { } pub fn bind(mut self, stmt: &Statement) -> Result { - println!("{:#?}", stmt); let plan = match stmt { Statement::Query(query) => { let plan = self.bind_query(query)?; LogicalPlan::Select(plan) } Statement::CreateTable { name, columns, .. } => { - let plan = self.bind_create_table(name, &columns)?; + let plan = self.bind_create_table(name.to_owned(), &columns)?; LogicalPlan::CreateTable(plan) } - Statement::Insert { table_name, columns, source, .. } => { - if let SetExpr::Values(values) = source.body.as_ref() { - let plan = self.bind_insert(table_name.to_owned(), columns, &values.rows)?; - LogicalPlan::Insert(plan) - } else { - todo!() - } - } _ => unimplemented!(), }; Ok(plan) diff --git a/src/binder/select.rs b/src/binder/select.rs index b3120f70..05130e28 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -200,7 +200,7 @@ impl Binder { let mut exprs = vec![]; for ref_id in self.context.bind_table.values().cloned().collect_vec() { let table = self.context.catalog.get_table(ref_id).unwrap(); - for (col_id, col) in &table.all_columns() { + for (col_id, col) in &table.get_all_columns() { let column_ref_id = ColumnRefId::from_table(ref_id, *col_id); // self.record_regular_table_column( // &table.name(), diff --git a/src/catalog/column.rs b/src/catalog/column.rs index 6ce4d542..c91445f5 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -75,6 +75,10 @@ impl ColumnDesc { } } + pub(crate) fn is_primary(&self) -> bool { + self.is_primary + } + pub(crate) fn get_datatype(&self) -> LogicalType { self.column_datatype.clone() } diff --git a/src/catalog/table.rs b/src/catalog/table.rs index 5024208a..fbfbb1c8 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -1,12 +1,10 @@ use std::collections::HashMap; -use std::sync::Arc; -use arrow::datatypes::{Schema, SchemaRef}; use itertools::Itertools; use crate::catalog::{CatalogError, ColumnCatalog}; use crate::types::{ColumnIdx, IdGenerator, TableIdx}; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct TableCatalog { pub id: Option, pub name: String, @@ -17,10 +15,6 @@ pub struct TableCatalog { } impl TableCatalog { - pub(crate) fn columns_len(&self) -> usize { - self.columns.len() - } - pub(crate) fn get_column_by_id(&self, id: ColumnIdx) -> Option<&ColumnCatalog> { self.columns.get(id) } @@ -38,21 +32,13 @@ impl TableCatalog { self.column_idxs.contains_key(name) } - pub(crate) fn all_columns(&self) -> Vec<(ColumnIdx, &ColumnCatalog)> { + pub(crate) fn get_all_columns(&self) -> Vec<(ColumnIdx, &ColumnCatalog)> { self.columns .iter() .enumerate() .collect_vec() } - // TODO: 缓存schema - pub(crate) fn schema(&self) -> SchemaRef { - let fields = self.columns.iter() - .map(ColumnCatalog::to_field) - .collect_vec(); - Arc::new(Schema::new(fields)) - } - /// Add a column to the table catalog. pub(crate) fn add_column( &mut self, diff --git a/src/db.rs b/src/db.rs index 37a02101..b4b65d79 100644 --- a/src/db.rs +++ b/src/db.rs @@ -123,7 +123,6 @@ pub enum DatabaseError { mod test { use std::sync::Arc; use arrow::array::{BooleanArray, Int32Array}; - use arrow::compute::concat_batches; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use itertools::Itertools; @@ -178,14 +177,9 @@ mod test { let database = Database::new_on_mem(); tokio_test::block_on(async move { - let _ = database.run("create table t1 (a int, b boolean)").await?; - let _ = database.run("insert into t1 values (1, true), (2, false)").await?; - let vec_batch = database.run("select * from t1").await?; - - let table = database.storage - .get_catalog() - .get_table(0).unwrap().clone(); - println!("{:#?}", concat_batches(&table.schema(), &vec_batch)); + let _batch = database.run("create table t1 (a int, b int)").await?; + let batch = database.run("select * from t1").await?; + println!("{:#?}", batch); Ok(()) }) diff --git a/src/execution/physical/physical_plan_builder.rs b/src/execution/physical/physical_plan_builder.rs index 87f65edd..255e7c3a 100644 --- a/src/execution/physical/physical_plan_builder.rs +++ b/src/execution/physical/physical_plan_builder.rs @@ -30,7 +30,6 @@ impl PhysicalPlanBuilder { match plan { LogicalPlan::Select(select) => self.build_select_logical_plan(select), LogicalPlan::CreateTable(_) => todo!(), - LogicalPlan::Insert(_) => todo!(), } } diff --git a/src/execution_v1/physical_plan/mod.rs b/src/execution_v1/physical_plan/mod.rs index 9e760fcd..656e2c29 100644 --- a/src/execution_v1/physical_plan/mod.rs +++ b/src/execution_v1/physical_plan/mod.rs @@ -1,5 +1,4 @@ use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable; -use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert; use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection; use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan; @@ -7,11 +6,9 @@ pub(crate) mod physical_create_table; pub(crate) mod physical_plan_builder; pub(crate) mod physical_projection; pub(crate) mod physical_table_scan; -pub(crate) mod physical_insert; #[derive(Debug)] pub enum PhysicalOperator { - Insert(PhysicalInsert), CreateTable(PhysicalCreateTable), TableScan(PhysicalTableScan), Projection(PhysicalProjection), diff --git a/src/execution_v1/physical_plan/physical_insert.rs b/src/execution_v1/physical_plan/physical_insert.rs deleted file mode 100644 index 8c1d5ef4..00000000 --- a/src/execution_v1/physical_plan/physical_insert.rs +++ /dev/null @@ -1,12 +0,0 @@ -use crate::expression::ScalarExpression; -use crate::types::ColumnIdx; - -#[derive(Debug)] -pub struct PhysicalInsert { - /// Table name to insert to - pub table_name: String, - - pub col_idxs: Vec, - /// List of columns of the table - pub cols: Vec> -} \ No newline at end of file diff --git a/src/execution_v1/physical_plan/physical_plan_builder.rs b/src/execution_v1/physical_plan/physical_plan_builder.rs index 24e31d5a..722cdb6f 100644 --- a/src/execution_v1/physical_plan/physical_plan_builder.rs +++ b/src/execution_v1/physical_plan/physical_plan_builder.rs @@ -9,10 +9,6 @@ use crate::planner::operator::Operator; use crate::planner::LogicalPlan; use anyhow::anyhow; use anyhow::Result; -use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert; -use crate::planner::logical_insert_plan::LogicalInsertPlan; -use crate::planner::operator::insert::InsertOperator; -use crate::planner::operator::project::ProjectOperator; pub struct PhysicalPlanBuilder { plan_id: u32, @@ -31,47 +27,33 @@ impl PhysicalPlanBuilder { pub fn build_plan(&mut self, plan: &LogicalPlan) -> Result { match plan { - LogicalPlan::Select(select) => - self.build_select_logical_plan(select), - LogicalPlan::CreateTable(create_table) => - Ok(self.build_create_table_logic_plan(create_table)), - LogicalPlan::Insert(insert) => - Ok(self.build_insert_logic_plan(insert)) - } - } - - fn build_insert_logic_plan( - &mut self, - plan: &LogicalInsertPlan, - ) -> PhysicalOperator { - let InsertOperator { table, col_idxs, cols: rows } = plan.operator.clone(); - - PhysicalOperator::Insert( - PhysicalInsert { - table_name: table, - col_idxs, - cols: rows, + LogicalPlan::Select(select) => self.build_select_logical_plan(select), + LogicalPlan::CreateTable(create_table) => { + self.build_create_table_logic_plan(create_table) } - ) + } } fn build_create_table_logic_plan( &mut self, plan: &LogicalCreateTablePlan, - ) -> PhysicalOperator { - let operator = &plan.operator; - - PhysicalOperator::CreateTable( - PhysicalCreateTable { - table_name: operator.table_name.to_string(), - columns: operator.columns.clone(), - } - ) + ) -> Result { + Ok(PhysicalOperator::CreateTable(PhysicalCreateTable { + table_name: plan.table_name.to_string(), + columns: plan.columns.clone(), + })) } fn build_select_logical_plan(&mut self, plan: &LogicalSelectPlan) -> Result { match plan.operator.as_ref() { - Operator::Project(op) => self.build_physical_projection(plan, op), + Operator::Project(op) => { + let input = self.build_select_logical_plan(plan.child(0)?)?; + Ok(PhysicalOperator::Projection(PhysicalProjection { + plan_id: self.next_plan_id(), + exprs: op.columns.clone(), + input: Box::new(input), + })) + } Operator::Scan(scan) => Ok(self.build_physical_scan(scan.clone())), _ => Err(anyhow!(format!( "Unsupported physical plan: {:?}", @@ -80,15 +62,6 @@ impl PhysicalPlanBuilder { } } - fn build_physical_projection(&mut self, plan: &LogicalSelectPlan, op: &ProjectOperator) -> Result { - let input = self.build_select_logical_plan(plan.child(0)?)?; - Ok(PhysicalOperator::Projection(PhysicalProjection { - plan_id: self.next_plan_id(), - exprs: op.columns.clone(), - input: Box::new(input), - })) - } - fn build_physical_scan(&mut self, base: ScanOperator) -> PhysicalOperator { PhysicalOperator::TableScan(PhysicalTableScan { plan_id: self.next_plan_id(), base }) } diff --git a/src/execution_v1/volcano_executor/insert.rs b/src/execution_v1/volcano_executor/insert.rs deleted file mode 100644 index b75a4b16..00000000 --- a/src/execution_v1/volcano_executor/insert.rs +++ /dev/null @@ -1,56 +0,0 @@ -use arrow::array::{ArrayRef, new_null_array}; -use arrow::datatypes::DataType; -use arrow::record_batch::RecordBatch; -use futures_async_stream::try_stream; -use itertools::Itertools; -use crate::catalog::CatalogError; -use crate::execution_v1::ExecutorError; -use crate::expression::ScalarExpression; -use crate::storage::{Storage, Table}; -use crate::types::ColumnIdx; -use crate::types::value::DataValue; - -pub struct Insert { } - -impl Insert { - #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] - pub async fn execute(table_name: String, col_idxs: Vec, mut cols: Vec>, storage: impl Storage) { - if let Some(table) = storage.get_catalog().get_table_by_name(&table_name) { - let row_len = cols[0].len(); - // 为了后继pop而倒序 - cols.reverse(); - - let vec_arr_ref = table.all_columns() - .into_iter() - .map(|(col_idx, col_catalog)| { - if col_idxs.contains(&col_idx) { - let col = cols.pop().unwrap(); - assert_eq!(col.len(), row_len); - - let mut builder = DataValue::new_builder(&col[0].return_type())?; - - for expr in col { - match expr { - ScalarExpression::Constant(value) => { - DataValue::append_for_builder(&value, &mut builder)?; - }, - _ => unreachable!() - } - } - - Ok::(builder.finish()) - } else { - Ok(new_null_array(&DataType::from(col_catalog.datatype().clone()), row_len)) - } - }) - .try_collect()?; - - - let new_batch = RecordBatch::try_new(table.schema(), vec_arr_ref)?; - - storage.get_table(table.id.unwrap())?.append(new_batch)?; - } else { - Err(CatalogError::NotFound("root", table_name.to_string()))?; - } - } -} \ No newline at end of file diff --git a/src/execution_v1/volcano_executor/mod.rs b/src/execution_v1/volcano_executor/mod.rs index 41701f0b..9aefaed0 100644 --- a/src/execution_v1/volcano_executor/mod.rs +++ b/src/execution_v1/volcano_executor/mod.rs @@ -1,7 +1,6 @@ mod create_table; mod projection; mod table_scan; -mod insert; use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection; use crate::execution_v1::physical_plan::PhysicalOperator; @@ -13,8 +12,6 @@ use crate::storage::StorageImpl; use arrow::record_batch::RecordBatch; use futures::stream::BoxStream; use futures::TryStreamExt; -use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert; -use crate::execution_v1::volcano_executor::insert::Insert; pub type BoxedExecutor = BoxStream<'static, Result>; @@ -41,11 +38,8 @@ impl VolcanoExecutor { PhysicalOperator::CreateTable(op) => match &self.storage { StorageImpl::InMemoryStorage(storage) => CreateTable::execute(op, storage.clone()), }, - PhysicalOperator::Insert(PhysicalInsert { table_name, col_idxs, cols, }) => { - match &self.storage { - StorageImpl::InMemoryStorage(storage) => - Insert::execute(table_name, col_idxs, cols, storage.clone()), - } + _ => { + unimplemented!() } } } diff --git a/src/planner/logical_create_table_plan.rs b/src/planner/logical_create_table_plan.rs index df2c2fb4..61511028 100644 --- a/src/planner/logical_create_table_plan.rs +++ b/src/planner/logical_create_table_plan.rs @@ -1,8 +1,8 @@ -use crate::planner::operator::create_table::CreateOperator; - +use crate::catalog::ColumnCatalog; #[derive(Debug, PartialEq, Clone)] pub struct LogicalCreateTablePlan { - pub operator: CreateOperator + pub table_name: String, + pub columns: Vec, } // use sqlparser::ast::{ColumnDef, ColumnOption, Statement}; diff --git a/src/planner/logical_insert_plan.rs b/src/planner/logical_insert_plan.rs deleted file mode 100644 index ff1cc8e3..00000000 --- a/src/planner/logical_insert_plan.rs +++ /dev/null @@ -1,6 +0,0 @@ -use crate::planner::operator::insert::InsertOperator; - -#[derive(Debug, PartialEq, Clone)] -pub struct LogicalInsertPlan { - pub operator: InsertOperator -} \ No newline at end of file diff --git a/src/planner/mod.rs b/src/planner/mod.rs index d0a86885..a2f4dbd2 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -3,9 +3,7 @@ pub mod logical_create_table_plan; pub mod logical_plan_builder; pub mod logical_select_plan; pub mod operator; -pub mod logical_insert_plan; -use crate::planner::logical_insert_plan::LogicalInsertPlan; use self::{ logical_create_table_plan::LogicalCreateTablePlan, logical_select_plan::LogicalSelectPlan, }; @@ -14,6 +12,5 @@ use self::{ pub enum LogicalPlan { Select(LogicalSelectPlan), CreateTable(LogicalCreateTablePlan), - Insert(LogicalInsertPlan) } pub enum LogicalPlanError {} diff --git a/src/planner/operator/create_table.rs b/src/planner/operator/create_table.rs index ca462945..00f53bb7 100644 --- a/src/planner/operator/create_table.rs +++ b/src/planner/operator/create_table.rs @@ -1,9 +1,10 @@ -use crate::catalog::ColumnCatalog; +use crate::catalog::ColumnDesc; #[derive(Debug, PartialEq, Clone)] -pub struct CreateOperator { +#[allow(dead_code)] +pub struct CreateTableOperator { /// Table name to insert to pub table_name: String, /// List of columns of the table - pub columns: Vec, + pub columns: Vec<(String, bool, ColumnDesc)>, } diff --git a/src/planner/operator/insert.rs b/src/planner/operator/insert.rs deleted file mode 100644 index 7b418a5c..00000000 --- a/src/planner/operator/insert.rs +++ /dev/null @@ -1,9 +0,0 @@ -use crate::expression::ScalarExpression; -use crate::types::ColumnIdx; - -#[derive(Debug, PartialEq, Clone)] -pub struct InsertOperator { - pub table: String, - pub(crate) col_idxs: Vec, - pub(crate) cols: Vec>, -} \ No newline at end of file diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index 0b01999f..97d67b7f 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -6,8 +6,8 @@ pub mod limit; pub mod project; pub mod scan; pub mod sort; -pub mod insert; +use crate::planner::operator::create_table::CreateTableOperator; use std::sync::Arc; use self::{ @@ -27,4 +27,6 @@ pub enum Operator { Scan(ScanOperator), Sort(SortOperator), Limit(LimitOperator), + #[allow(dead_code)] + CreateTable(CreateTableOperator), } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 2c4b0016..1ff93972 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -9,11 +9,11 @@ use crate::types::{LogicalType, TableIdx}; #[derive(Debug)] pub struct InMemoryStorage { - inner: Arc>, + inner: Arc>, } #[derive(Debug)] -struct StorageInner { +struct Inner { catalog: RootCatalog, tables: Vec, } @@ -28,7 +28,7 @@ impl InMemoryStorage { pub fn new() -> Self { InMemoryStorage { inner: Arc::new(Mutex::new( - StorageInner { + Inner { catalog: RootCatalog::default(), tables: Vec::new(), }) @@ -58,7 +58,7 @@ impl Storage for InMemoryStorage { let table_id = inner.catalog.add_table( table_name.to_string(), - table.inner.lock().columns.clone() + table.columns_vec.clone() )?; table.table_id = table_id; @@ -89,13 +89,8 @@ impl Storage for InMemoryStorage { pub struct InMemoryTable { table_id: TableIdx, table_name: String, - inner: Arc> -} - -#[derive(Debug)] -struct TableInner { data: Vec, - columns: Vec, + columns_vec: Vec, } impl InMemoryTable { @@ -104,13 +99,8 @@ impl InMemoryTable { Ok(Self { table_id: 0, table_name: name.to_string(), - - inner: Arc::new(Mutex::new( - TableInner { - data, - columns, - } - )), + data, + columns_vec: columns, }) } @@ -143,13 +133,6 @@ impl Table for InMemoryTable { ) -> Result { InMemoryTransaction::start(self) } - - fn append(&self, record_batch: RecordBatch) -> Result<(), StorageError> { - self.inner.lock() - .data.push(record_batch); - - Ok(()) - } } pub struct InMemoryTransaction { @@ -161,7 +144,7 @@ impl InMemoryTransaction { pub fn start(table: &InMemoryTable) -> Result { Ok(Self { batch_cursor: 0, - data: table.inner.lock().data.clone(), + data: table.data.clone(), }) } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a89d6b66..32300f6a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -42,8 +42,6 @@ pub trait Table: Sync + Send + Clone + 'static { bounds: Bounds, projection: Projections, ) -> Result; - - fn append(&self, record_batch: RecordBatch) -> Result<(), StorageError>; } // currently we use a transaction to hold csv reader diff --git a/src/types/mod.rs b/src/types/mod.rs index 8e6ec90f..983679e5 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -9,7 +9,7 @@ use strum_macros::AsRefStr; use crate::types::errors::TypeError; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub(crate) struct IdGenerator { buf: usize }