From f9d8fd3bb4f0282934fb4c50eb710f8921df7d2c Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Mon, 24 Jul 2023 18:46:50 +0800 Subject: [PATCH] feat(volcano_executor): implement a simple Volcano executor `db::test` simulates a single-field table `t1` for a `select * from t1` query test --- Cargo.toml | 2 +- rust-toolchain | 1 - src/binder/create.rs | 4 +- src/binder/mod.rs | 4 +- src/binder/select.rs | 42 ++++++- src/catalog/column.rs | 34 +++-- src/catalog/root.rs | 2 + src/catalog/table.rs | 4 +- src/db.rs | 119 +++++++++++++----- src/execution/executor_graph.rs | 5 +- src/execution_v1/mod.rs | 37 ++++++ src/execution_v1/physical_plan/mod.rs | 11 ++ .../physical_plan/physical_plan_builder.rs | 55 ++++++++ .../physical_plan/physical_projection.rs | 8 ++ .../physical_plan/physical_table_scan.rs | 6 + src/execution_v1/volcano_executor/mod.rs | 46 +++++++ .../volcano_executor/projection.rs | 28 +++++ .../volcano_executor/table_scan.rs | 27 ++++ src/expression/array_compute.rs | 86 +++++++++++++ src/expression/evaluator.rs | 75 +++++++++++ src/expression/mod.rs | 31 +++-- src/lib.rs | 92 +------------- src/main.rs | 4 +- src/parser/mod.rs | 3 +- src/planner/logical_create_table_plan.rs | 2 +- src/storage/memory.rs | 76 +++++++---- src/storage/mod.rs | 12 +- src/types/mod.rs | 2 +- src/types/value.rs | 12 +- 29 files changed, 633 insertions(+), 197 deletions(-) delete mode 100644 rust-toolchain create mode 100644 src/execution_v1/mod.rs create mode 100644 src/execution_v1/physical_plan/mod.rs create mode 100644 src/execution_v1/physical_plan/physical_plan_builder.rs create mode 100644 src/execution_v1/physical_plan/physical_projection.rs create mode 100644 src/execution_v1/physical_plan/physical_table_scan.rs create mode 100644 src/execution_v1/volcano_executor/mod.rs create mode 100644 src/execution_v1/volcano_executor/projection.rs create mode 100644 src/execution_v1/volcano_executor/table_scan.rs create mode 100644 src/expression/array_compute.rs diff 
--git a/Cargo.toml b/Cargo.toml index 7ec0aff4..17adf571 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,8 +41,8 @@ async-backtrace = "0.2.6" futures = "0.3.25" futures-lite = "1.12.0" - [dev-dependencies] +tokio-test = "0.4.2" ctor = "0.2.0" env_logger = "0.10" paste = "^1.0" diff --git a/rust-toolchain b/rust-toolchain deleted file mode 100644 index ab84e227..00000000 --- a/rust-toolchain +++ /dev/null @@ -1 +0,0 @@ -nightly-2023-04-07 diff --git a/src/binder/create.rs b/src/binder/create.rs index b1c65bc3..44a56c16 100644 --- a/src/binder/create.rs +++ b/src/binder/create.rs @@ -39,7 +39,7 @@ impl Binder { table_name: table_name.to_string(), columns: columns .into_iter() - .map(|col| (col.name.to_string(), col.desc.clone())) + .map(|col| (col.name.to_string(), col.nullable, col.desc.clone())) .collect(), }; Ok(plan) @@ -68,10 +68,12 @@ mod tests { columns: vec![ ( "id".to_string(), + false, ColumnDesc::new(LogicalType::Integer, false), ), ( "name".to_string(), + false, ColumnDesc::new(LogicalType::Varchar, false), ), ], diff --git a/src/binder/mod.rs b/src/binder/mod.rs index b2a6f8f9..9205b38a 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -8,7 +8,7 @@ use std::collections::HashMap; use anyhow::Result; use sqlparser::ast::{Ident, ObjectName, Statement}; -use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME}; +use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME, CatalogError}; use crate::expression::ScalarExpression; use crate::planner::LogicalPlan; use crate::types::TableId; @@ -109,4 +109,6 @@ pub enum BindError { BinaryOpTypeMismatch(String, String), #[error("subquery in FROM must have an alias")] SubqueryMustHaveAlias, + #[error("catalog error")] + CatalogError(#[from] CatalogError), } diff --git a/src/binder/select.rs b/src/binder/select.rs index 5e7e5bed..417f4251 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -208,11 +208,7 @@ impl Binder { // *col_id, // col.desc().clone(), // ); - let expr = ScalarExpression::ColumnRef { - 
column_ref_id, - primary_key: col.desc.is_primary(), - desc: col.desc.clone(), - }; + let expr = ScalarExpression::ColumnRef((*col).clone()); exprs.push(expr); } } @@ -317,3 +313,39 @@ impl Binder { Ok(LimitOperator::new(offset, limit, children)) } } + +#[cfg(test)] +mod tests { + use sqlparser::ast::CharacterLength; + + use super::*; + use crate::binder::{BinderContext, BindError}; + use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog}; + use crate::planner::LogicalPlan; + use crate::types::LogicalType; + use crate::types::LogicalType::{Boolean, Integer}; + + fn test_root_catalog() -> Result { + let mut root = RootCatalog::new(); + let cols = vec![ + ColumnCatalog::new("c1".to_string(), false, ColumnDesc::new(Integer, true)), + ColumnCatalog::new("c2".to_string(), false, ColumnDesc::new(Boolean, false)), + ]; + let _ = root.add_table("t1".to_string(), cols)?; + Ok(root) + } + + #[test] + fn test_select_bind() -> Result<(), BindError> { + let sql = "select * from t1"; + let root = test_root_catalog()?; + + let binder = Binder::new(BinderContext::new(root)); + let stmt = crate::parser::parse_sql(sql).unwrap(); + let plan = binder.bind(&stmt[0]).unwrap(); + + println!("{:#?}", plan); + + Ok(()) + } +} \ No newline at end of file diff --git a/src/catalog/column.rs b/src/catalog/column.rs index 91badb23..d496f2cb 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -1,20 +1,22 @@ use arrow::datatypes::{DataType, Field}; -use sqlparser::ast::ColumnDef; +use sqlparser::ast::{ColumnDef, ColumnOption}; use crate::types::{ColumnId, IdGenerator, LogicalType}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct ColumnCatalog { pub id: ColumnId, pub name: String, + pub nullable: bool, pub desc: ColumnDesc, } impl ColumnCatalog { - pub(crate) fn new(column_name: String, column_desc: ColumnDesc) -> ColumnCatalog { + pub(crate) fn new(column_name: String, nullable: bool, column_desc: ColumnDesc) -> ColumnCatalog { ColumnCatalog { id: 
IdGenerator::build(), name: column_name, + nullable, desc: column_desc, } } @@ -33,9 +35,9 @@ impl ColumnCatalog { pub fn to_field(&self) -> Field { Field::new( - &*self.name.clone(), - DataType::from(self.desc.column_datatype.clone()), - self.desc.is_primary(), + self.name.as_str(), + DataType::from(self.datatype().clone()), + self.nullable, ) } } @@ -43,10 +45,22 @@ impl ColumnCatalog { impl From for ColumnCatalog { fn from(column_def: ColumnDef) -> Self { let column_name = column_def.name.to_string(); - let column_datatype = LogicalType::try_from(column_def.data_type).unwrap(); - let is_primary = false; - let column_desc = ColumnDesc::new(column_datatype, is_primary); - ColumnCatalog::new(column_name, column_desc) + let column_desc = ColumnDesc::new( + LogicalType::try_from(column_def.data_type).unwrap(), + false + ); + let mut nullable = false; + + // TODO: 这里可以对更多字段可设置内容进行补充 + for option_def in column_def.options { + match option_def.option { + ColumnOption::Null => nullable = true, + ColumnOption::NotNull => (), + _ => todo!() + } + } + + ColumnCatalog::new(column_name, nullable, column_desc) } } diff --git a/src/catalog/root.rs b/src/catalog/root.rs index 2fc3c882..0f84ad65 100644 --- a/src/catalog/root.rs +++ b/src/catalog/root.rs @@ -67,10 +67,12 @@ mod tests { let col0 = ColumnCatalog::new( "a".to_string(), + false, ColumnDesc::new(LogicalType::Integer, false), ); let col1 = ColumnCatalog::new( "b".to_string(), + false, ColumnDesc::new(LogicalType::Boolean, false), ); let col_catalogs = vec![col0, col1]; diff --git a/src/catalog/table.rs b/src/catalog/table.rs index a7224efd..9c775470 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -81,8 +81,8 @@ mod tests { // | 1 | true | // | 2 | false | fn test_table_catalog() { - let col0 = ColumnCatalog::new("a".into(), ColumnDesc::new(LogicalType::Integer, false)); - let col1 = ColumnCatalog::new("b".into(), ColumnDesc::new(LogicalType::Boolean, false)); + let col0 = ColumnCatalog::new("a".into(), 
false, ColumnDesc::new(LogicalType::Integer, false)); + let col1 = ColumnCatalog::new("b".into(), false, ColumnDesc::new(LogicalType::Boolean, false)); let col_catalogs = vec![col0, col1]; let table_catalog = TableCatalog::new("test".to_string(), col_catalogs).unwrap(); diff --git a/src/db.rs b/src/db.rs index ea84457f..c02dcfe2 100644 --- a/src/db.rs +++ b/src/db.rs @@ -8,10 +8,12 @@ use sqlparser::parser::ParserError; use crate::binder::{BindError, Binder, BinderContext}; use crate::catalog::ColumnCatalog; +use crate::execution_v1::physical_plan::physical_plan_builder::PhysicalPlanBuilder; +use crate::execution_v1::volcano_executor::VolcanoExecutor; use crate::parser::parse_sql; use crate::planner::LogicalPlan; use crate::storage::memory::InMemoryStorage; -use crate::storage::{Storage, StorageError}; +use crate::storage::{Storage, StorageError, StorageImpl}; use crate::types::IdGenerator; #[derive(Debug)] @@ -33,7 +35,7 @@ impl Database { } /// Run SQL queries. - pub fn run(&mut self, sql: &str) -> Result<()> { + pub async fn run(&self, sql: &str) -> Result> { // parse let stmts = parse_sql(sql)?; // bind @@ -49,36 +51,44 @@ impl Database { /// Limit(1) /// Project(a,b) let logical_plan = binder.bind(&stmts[0])?; - println!("logic plan {:?}", logical_plan); - - // let physical_planner = PhysicalPlaner::default(); - // let executor_builder = ExecutorBuilder::new(self.env.clone()); - - // let physical_plan = physical_planner.plan(logical_plan)?; - // let executor = executor_builder.build(physical_plan)?; - // futures::executor::block_on(executor).unwrap(); - - /// THE FOLLOWING CODE IS FOR TESTING ONLY - /// THE FINAL CODE WILL BE IN executor MODULE - if let LogicalPlan::CreateTable(plan) = logical_plan { - let mut columns = Vec::new(); - plan.columns.iter().for_each(|c| { - columns.push(ColumnCatalog::new(c.0.clone(), c.1.clone())); - }); - let table_name = plan.table_name.clone(); - // columns->batch record - let mut data = Vec::new(); - - 
columns.iter().for_each(|c| { - let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![c.to_field()]))); - data.push(batch); - }); - - self.storage - .create_table(IdGenerator::build(), table_name.as_str(), data)?; - } - - Ok(()) + println!("logic plan {:#?}", logical_plan); + + let mut builder = PhysicalPlanBuilder::new(); + let operator = builder.build_plan(&logical_plan)?; + + let storage = StorageImpl::InMemoryStorage(self.storage.clone()); + let executor = VolcanoExecutor::new(storage); + + let mut stream = executor.build(operator); + + Ok(VolcanoExecutor::try_collect(&mut stream).await?) + + // // let physical_planner = PhysicalPlaner::default(); + // // let executor_builder = ExecutorBuilder::new(self.env.clone()); + // + // // let physical_plan = physical_planner.plan(logical_plan)?; + // // let executor = executor_builder.build(physical_plan)?; + // // futures::executor::block_on(executor).unwrap(); + // + // /// THE FOLLOWING CODE IS FOR TESTING ONLY + // /// THE FINAL CODE WILL BE IN executor MODULE + // if let LogicalPlan::CreateTable(plan) = logical_plan { + // let mut columns = Vec::new(); + // plan.columns.iter().for_each(|c| { + // columns.push(ColumnCatalog::new(c.0.clone(), c.1, c.2.clone())); + // }); + // let table_name = plan.table_name.clone(); + // // columns->batch record + // let mut data = Vec::new(); + // + // columns.iter().for_each(|c| { + // let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![c.to_field()]))); + // data.push(batch); + // }); + // + // self.storage + // .create_table(IdGenerator::build(), table_name.as_str(), data)?; + // } } } @@ -113,3 +123,48 @@ pub enum DatabaseError { #[error("Internal error: {0}")] InternalError(String), } + +#[cfg(test)] +mod test { + use std::sync::Arc; + use arrow::array::Int32Array; + use arrow::datatypes::Schema; + use arrow::record_batch::RecordBatch; + use itertools::Itertools; + use crate::catalog::{ColumnCatalog, ColumnDesc}; + use crate::db::Database; + use 
crate::execution_v1::ExecutorError; + use crate::storage::{Storage, StorageError}; + use crate::storage::memory::InMemoryStorage; + use crate::types::{IdGenerator, LogicalType, TableId}; + + fn build_table(storage: &impl Storage) -> Result { + let fields = vec![ + ColumnCatalog::new( + "c1".to_string(), + false, + ColumnDesc::new(LogicalType::Integer, true) + ).to_field(), + ]; + let batch = RecordBatch::try_new( + Arc::new(Schema::new(fields)), + vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))] + ).unwrap(); + + Ok(storage.create_table("t1", vec![batch])?) + } + + #[test] + fn test_run_sql() -> anyhow::Result<()> { + let mut database = Database::new_on_mem(); + + let i = build_table(&database.storage)?; + + tokio_test::block_on(async move { + let batch = database.run("select * from t1").await?; + println!("{:#?}", batch); + + Ok(()) + }) + } +} diff --git a/src/execution/executor_graph.rs b/src/execution/executor_graph.rs index 577e656f..59c4f803 100644 --- a/src/execution/executor_graph.rs +++ b/src/execution/executor_graph.rs @@ -10,10 +10,8 @@ use super::{ executor::ExecutionQueue, parallel::{ meta_pipeline::MetaPipeline, - pipeline::Pipeline, pipeline_event::{PipelineEvent, PipelineEventStack}, }, - physical::PhysicalOperator, }; use anyhow::Result; @@ -150,7 +148,7 @@ impl ExecutorGraph { // Set up the dependencies within this `MetaPipeline`. 
for pipeline in pipelines.iter() { - if let Some(source) = pipeline.get_source() { + if let Some(_source) = pipeline.get_source() { // if (source->type == // PhysicalOperatorType::TABLE_SCAN) { // // we have to reset the source here (in the main thread), @@ -213,6 +211,7 @@ impl ExecutorGraph { self.graph[index].clone() } + #[allow(dead_code)] fn get_prev_nodes(&self, index: NodeIndex) -> Vec> { let mut prev_nodes = vec![]; for edge in self.graph.edges_directed(index, Direction::Incoming) { diff --git a/src/execution_v1/mod.rs b/src/execution_v1/mod.rs new file mode 100644 index 00000000..74982194 --- /dev/null +++ b/src/execution_v1/mod.rs @@ -0,0 +1,37 @@ +pub(crate) mod volcano_executor; +pub(crate) mod physical_plan; + +use arrow::error::ArrowError; +use crate::catalog::CatalogError; +use crate::storage::StorageError; +use crate::types::errors::TypeError; + +#[derive(thiserror::Error, Debug)] +pub enum ExecutorError { + #[error("catalog error: {0}")] + CatalogError( + #[source] + #[from] + CatalogError, + ), + #[error("arrow error: {0}")] + ArrowError( + #[source] + #[from] + ArrowError, + ), + #[error("type error: {0}")] + TypeError( + #[source] + #[from] + TypeError, + ), + #[error("storage error: {0}")] + StorageError( + #[source] + #[from] + StorageError + ), + #[error("Internal error: {0}")] + InternalError(String), +} \ No newline at end of file diff --git a/src/execution_v1/physical_plan/mod.rs b/src/execution_v1/physical_plan/mod.rs new file mode 100644 index 00000000..990ac2ec --- /dev/null +++ b/src/execution_v1/physical_plan/mod.rs @@ -0,0 +1,11 @@ +use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection; +use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan; + +pub(crate) mod physical_table_scan; +pub(crate) mod physical_projection; +pub(crate) mod physical_plan_builder; + +pub enum PhysicalOperator { + TableScan(PhysicalTableScan), + Projection(PhysicalProjection) +} \ No newline at end of file diff 
--git a/src/execution_v1/physical_plan/physical_plan_builder.rs b/src/execution_v1/physical_plan/physical_plan_builder.rs new file mode 100644 index 00000000..bd0941ce --- /dev/null +++ b/src/execution_v1/physical_plan/physical_plan_builder.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; +use anyhow::anyhow; +use anyhow::Result; +use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection; +use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan; +use crate::execution_v1::physical_plan::PhysicalOperator; +use crate::planner::logical_select_plan::LogicalSelectPlan; +use crate::planner::LogicalPlan; +use crate::planner::operator::Operator; +use crate::planner::operator::scan::ScanOperator; + +pub struct PhysicalPlanBuilder { + plan_id: u32, +} + +impl PhysicalPlanBuilder { + pub fn new() -> Self { + PhysicalPlanBuilder { plan_id: 0 } + } + + fn next_plan_id(&mut self) -> u32 { + let id = self.plan_id; + self.plan_id += 1; + id + } + + pub fn build_plan(&mut self, plan: &LogicalPlan) -> Result { + match plan { + LogicalPlan::Select(select) => self.build_select_logical_plan(select), + LogicalPlan::CreateTable(_) => todo!(), + } + } + + fn build_select_logical_plan(&mut self, plan: &LogicalSelectPlan) -> Result { + match plan.operator.as_ref() { + Operator::Project(op) => { + let input = self.build_select_logical_plan(plan.child(0)?)?; + Ok(PhysicalOperator::Projection(PhysicalProjection { + plan_id: self.next_plan_id(), + exprs: op.columns.clone(), + input: Box::new(input), + })) + } + Operator::Scan(scan) => Ok(self.build_physical_scan(scan.clone())), + _ => Err(anyhow!(format!( + "Unsupported physical plan: {:?}", + plan.operator + ))), + } + } + + fn build_physical_scan(&mut self, base: ScanOperator) -> PhysicalOperator { + PhysicalOperator::TableScan(PhysicalTableScan { plan_id: self.next_plan_id(), base }) + } +} \ No newline at end of file diff --git a/src/execution_v1/physical_plan/physical_projection.rs 
b/src/execution_v1/physical_plan/physical_projection.rs new file mode 100644 index 00000000..6a239e79 --- /dev/null +++ b/src/execution_v1/physical_plan/physical_projection.rs @@ -0,0 +1,8 @@ +use crate::execution_v1::physical_plan::PhysicalOperator; +use crate::expression::ScalarExpression; + +pub struct PhysicalProjection { + pub(crate) plan_id: u32, + pub(crate) exprs: Vec, + pub(crate) input: Box +} \ No newline at end of file diff --git a/src/execution_v1/physical_plan/physical_table_scan.rs b/src/execution_v1/physical_plan/physical_table_scan.rs new file mode 100644 index 00000000..8a0ce996 --- /dev/null +++ b/src/execution_v1/physical_plan/physical_table_scan.rs @@ -0,0 +1,6 @@ +use crate::planner::operator::scan::ScanOperator; + +pub struct PhysicalTableScan { + pub(crate) plan_id: u32, + pub(crate) base: ScanOperator +} \ No newline at end of file diff --git a/src/execution_v1/volcano_executor/mod.rs b/src/execution_v1/volcano_executor/mod.rs new file mode 100644 index 00000000..3377f96f --- /dev/null +++ b/src/execution_v1/volcano_executor/mod.rs @@ -0,0 +1,46 @@ +mod table_scan; +mod projection; + +use arrow::record_batch::RecordBatch; +use futures::stream::BoxStream; +use futures::TryStreamExt; +use crate::execution_v1::ExecutorError; +use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection; +use crate::execution_v1::physical_plan::PhysicalOperator; +use crate::execution_v1::volcano_executor::projection::Projection; +use crate::execution_v1::volcano_executor::table_scan::TableScan; +use crate::storage::StorageImpl; + +pub type BoxedExecutor = BoxStream<'static, Result>; + +pub struct VolcanoExecutor { + storage: StorageImpl +} + +impl VolcanoExecutor { + pub(crate) fn new(storage: StorageImpl) -> Self { + Self { storage } + } + + pub(crate) fn build(&self, plan: PhysicalOperator) -> BoxedExecutor { + match plan { + PhysicalOperator::TableScan(op) => { + match &self.storage { + StorageImpl::InMemoryStorage(storage) => 
TableScan::execute(op, storage.clone()) + } + } + PhysicalOperator::Projection(PhysicalProjection { input, exprs, .. }) => { + let input = self.build(*input); + Projection::execute(exprs, input) + } + } + } + + pub async fn try_collect(executor: &mut BoxedExecutor) -> Result, ExecutorError> { + let mut output = Vec::new(); + while let Some(batch) = executor.try_next().await? { + output.push(batch); + } + Ok(output) + } +} \ No newline at end of file diff --git a/src/execution_v1/volcano_executor/projection.rs b/src/execution_v1/volcano_executor/projection.rs new file mode 100644 index 00000000..aac6fb7b --- /dev/null +++ b/src/execution_v1/volcano_executor/projection.rs @@ -0,0 +1,28 @@ +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use futures_async_stream::try_stream; +use crate::execution_v1::volcano_executor::BoxedExecutor; +use crate::execution_v1::ExecutorError; +use crate::expression::ScalarExpression; + +pub struct Projection { } + +impl Projection { + #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] + pub async fn execute(exprs: Vec, input: BoxedExecutor) { + #[for_await] + for batch in input { + let batch = batch?; + let columns = exprs + .iter() + .map(|e| e.eval_column(&batch)) + .try_collect(); + let fields = exprs.iter().map(|e| e.eval_field(&batch)).collect(); + let schema = SchemaRef::new(Schema::new_with_metadata( + fields, + batch.schema().metadata().clone(), + )); + yield RecordBatch::try_new(schema, columns?)?; + } + } +} \ No newline at end of file diff --git a/src/execution_v1/volcano_executor/table_scan.rs b/src/execution_v1/volcano_executor/table_scan.rs new file mode 100644 index 00000000..f949daee --- /dev/null +++ b/src/execution_v1/volcano_executor/table_scan.rs @@ -0,0 +1,27 @@ + +use arrow::record_batch::RecordBatch; +use futures_async_stream::try_stream; +use crate::execution_v1::ExecutorError; +use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan; +use 
crate::planner::operator::scan::ScanOperator; +use crate::storage::{Storage, Table, Transaction}; + +pub struct TableScan { } + +impl TableScan { + #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)] + pub async fn execute(plan: PhysicalTableScan, storage: impl Storage) { + // TODO: sort_fields, pre_where, limit + let ScanOperator { table_ref_id, .. } = plan.base; + println!("ref id: {}", table_ref_id); + let table = storage.get_table(table_ref_id)?; + let mut transaction = table.read( + None, + None + )?; + + while let Some(batch) = transaction.next_batch()? { + yield batch; + } + } +} \ No newline at end of file diff --git a/src/expression/array_compute.rs b/src/expression/array_compute.rs new file mode 100644 index 00000000..7a7ee414 --- /dev/null +++ b/src/expression/array_compute.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; +use arrow::array::ArrayRef; +use arrow::compute::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn}; +use arrow::datatypes::DataType; +use arrow::array::*; +use arrow::compute::*; +use crate::execution_v1::ExecutorError; +use crate::expression::BinaryOperator; +/// Copied from datafusion binary.rs +macro_rules! compute_op { + // invoke binary operator + ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{ + let ll = $LEFT + .as_any() + .downcast_ref::<$DT>() + .expect("compute_op failed to downcast array"); + let rr = $RIGHT + .as_any() + .downcast_ref::<$DT>() + .expect("compute_op failed to downcast array"); + Ok(Arc::new($OP(&ll, &rr)?)) + }}; + // invoke unary operator + ($OPERAND:expr, $OP:ident, $DT:ident) => {{ + let operand = $OPERAND + .as_any() + .downcast_ref::<$DT>() + .expect("compute_op failed to downcast array"); + Ok(Arc::new($OP(&operand)?)) + }}; +} + +macro_rules! 
arithmetic_op { + ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ + match $LEFT.data_type() { + DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array), + DataType::Int64 => compute_op!($LEFT, $RIGHT, $OP, Int64Array), + DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array), + _ => todo!("unsupported data type"), + } + }}; +} + +macro_rules! boolean_op { + ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ + if *$LEFT.data_type() != DataType::Boolean || *$RIGHT.data_type() != DataType::Boolean { + return Err(ExecutorError::InternalError(format!( + "Cannot evaluate binary expression with types {:?} and {:?}, only Boolean supported", + $LEFT.data_type(), + $RIGHT.data_type() + ))); + } + + let ll = $LEFT + .as_any() + .downcast_ref::() + .expect("boolean_op failed to downcast array"); + let rr = $RIGHT + .as_any() + .downcast_ref::() + .expect("boolean_op failed to downcast array"); + Ok(Arc::new($OP(&ll, &rr)?)) + }}; +} + +pub fn binary_op( + left: &ArrayRef, + right: &ArrayRef, + op: &BinaryOperator, +) -> Result { + match op { + BinaryOperator::Plus => arithmetic_op!(left, right, add), + BinaryOperator::Minus => arithmetic_op!(left, right, subtract), + BinaryOperator::Multiply => arithmetic_op!(left, right, multiply), + BinaryOperator::Divide => arithmetic_op!(left, right, divide), + BinaryOperator::Gt => Ok(Arc::new(gt_dyn(left, right)?)), + BinaryOperator::Lt => Ok(Arc::new(lt_dyn(left, right)?)), + BinaryOperator::GtEq => Ok(Arc::new(gt_eq_dyn(left, right)?)), + BinaryOperator::LtEq => Ok(Arc::new(lt_eq_dyn(left, right)?)), + BinaryOperator::Eq => Ok(Arc::new(eq_dyn(left, right)?)), + BinaryOperator::NotEq => Ok(Arc::new(neq_dyn(left, right)?)), + BinaryOperator::And => boolean_op!(left, right, and_kleene), + BinaryOperator::Or => boolean_op!(left, right, or_kleene), + _ => todo!(), + } +} diff --git a/src/expression/evaluator.rs b/src/expression/evaluator.rs index 8b137891..12bc912c 100644 --- a/src/expression/evaluator.rs +++ 
b/src/expression/evaluator.rs @@ -1 +1,76 @@ +use std::sync::Arc; +use arrow::array::{ArrayRef, BooleanArray, new_null_array}; +use arrow::compute::cast; +use arrow::datatypes::{DataType, Field}; +use arrow::record_batch::RecordBatch; +use crate::execution_v1::ExecutorError; +use crate::expression::array_compute::binary_op; +use crate::expression::ScalarExpression; +use crate::types::value::DataValue; +impl ScalarExpression { + pub fn eval_column(&self, batch: &RecordBatch) -> Result { + match &self { + ScalarExpression::Constant(val) => + Ok(val.to_array_of_size(batch.num_rows())), + ScalarExpression::ColumnRef(col) => { + // FIXME: 此处ColumnRef理应被优化器下推至TableScan中, + // 并且无法实现eval_column,因为ColumnCatalog并无对应的index + Ok(batch.column(0).clone()) + } + ScalarExpression::InputRef{ index, .. } => + Ok(batch.column(*index).clone()), + ScalarExpression::Alias{ expr, .. } => + expr.eval_column(batch), + ScalarExpression::TypeCast{ expr, ty, .. } => + Ok(cast(&expr.eval_column(batch)?, &DataType::from(ty.clone()))?), + ScalarExpression::Binary{ left_expr, right_expr, op, .. } => { + let left = left_expr.eval_column(batch)?; + let right = right_expr.eval_column(batch)?; + binary_op(&left, &right, op) + } + ScalarExpression::IsNull{ expr } => + Ok(DataValue::bool_array(batch.num_rows(), &Some(expr.nullable()))), + ScalarExpression::Unary{ .. } => todo!(), + ScalarExpression::AggCall{ .. } => todo!() + } + } + + pub fn eval_field(&self, batch: &RecordBatch) -> Field { + match self { + ScalarExpression::Constant(val) => + Field::new(format!("{}", val).as_str(), val.datatype(), true), + ScalarExpression::ColumnRef(col) => col.to_field(), + ScalarExpression::InputRef { index, .. } => + batch.schema().field(*index).clone(), + ScalarExpression::Alias { alias, expr, .. } => { + let logic_type = expr.return_type().unwrap(); + Field::new(alias, logic_type.into(), true) + } + ScalarExpression::TypeCast { expr, ty, .. 
} => { + let inner_field = expr.eval_field(batch); + let data_type = DataType::from(ty.clone()); + let new_name = format!("{}({})", data_type, inner_field.name()); + Field::new(new_name.as_str(), data_type, true) + } + ScalarExpression::AggCall { args, kind, ty } => { + let inner_name = args[0].eval_field(batch).name().clone(); + let new_name = format!("{:?}({})", kind, inner_name); + Field::new(new_name.as_str(), ty.clone().into(), true) + } + ScalarExpression::Binary { left_expr, right_expr, op, ty } => { + let left = left_expr.eval_field(batch); + let right = right_expr.eval_field(batch); + let new_name = format!("{}{:?}{}", left.name(), op, right.name()); + let data_type = DataType::from(ty.clone()); + Field::new(new_name.as_str(), data_type, true) + } + ScalarExpression::IsNull { expr } => { + let data_type = DataType::from(expr.return_type().unwrap()); + let new_name = format!("{}", data_type); + Field::new(new_name.as_str(), data_type, true) + } + ScalarExpression::Unary { .. } => todo!() + } + } +} \ No newline at end of file diff --git a/src/expression/mod.rs b/src/expression/mod.rs index cf258a75..7d027550 100644 --- a/src/expression/mod.rs +++ b/src/expression/mod.rs @@ -3,12 +3,13 @@ use std::fmt::Display; use sqlparser::ast::{BinaryOperator as SqlBinaryOperator, UnaryOperator as SqlUnaryOperator}; use self::agg::AggKind; -use crate::catalog::{ColumnDesc, ColumnRefId}; +use crate::catalog::ColumnCatalog; use crate::types::value::DataValue; use crate::types::LogicalType; pub mod agg; mod evaluator; +mod array_compute; /// ScalarExpression represnet all scalar expression in SQL. /// SELECT a+1, b FROM t1. 
@@ -17,11 +18,7 @@ mod evaluator; #[derive(Debug, PartialEq, Clone)] pub enum ScalarExpression { Constant(DataValue), - ColumnRef { - column_ref_id: ColumnRefId, - primary_key: bool, - desc: ColumnDesc, - }, + ColumnRef(ColumnCatalog), InputRef { index: usize, ty: LogicalType, @@ -57,10 +54,25 @@ pub enum ScalarExpression { } impl ScalarExpression { + pub fn nullable(&self) -> bool { + match self { + ScalarExpression::Constant(_) => false, + ScalarExpression::ColumnRef(col) => col.nullable, + ScalarExpression::InputRef { .. } => unreachable!(), + ScalarExpression::Alias { expr, .. } => expr.nullable(), + ScalarExpression::TypeCast { expr, .. } => expr.nullable(), + ScalarExpression::IsNull { expr } => expr.nullable(), + ScalarExpression::Unary { expr, .. } => expr.nullable(), + ScalarExpression::Binary { left_expr, right_expr, .. } => + left_expr.nullable() && right_expr.nullable(), + ScalarExpression::AggCall { args, .. } => args[0].nullable() + } + } + pub fn return_type(&self) -> Option { match self { - Self::Constant(v) => Some(v.get_logic_type().clone()), - Self::ColumnRef { desc, .. } => Some(desc.get_datatype().clone()), + Self::Constant(v) => Some(v.logic_type().clone()), + Self::ColumnRef(col) => Some(col.datatype().clone()), Self::Binary { ty: return_type, .. 
} => Some(return_type.clone()), @@ -96,11 +108,8 @@ impl Display for ScalarExpression { pub enum UnaryOperator { Plus, Minus, - Not, - True, - False, } diff --git a/src/lib.rs b/src/lib.rs index 69963779..12fb50b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,96 +1,9 @@ #![feature(error_generic_member_access)] #![feature(provide_any)] #![allow(unused_doc_comments)] -// #![deny( -// // The following are allowed by default lints according to -// // https://doc.rust-lang.org/rustc/lints/listing/allowed-by-default.html -// absolute_paths_not_starting_with_crate, -// // box_pointers, async trait must use it -// // elided_lifetimes_in_paths, // allow anonymous lifetime -// explicit_outlives_requirements, -// keyword_idents, -// macro_use_extern_crate, -// meta_variable_misuse, -// missing_abi, -// missing_copy_implementations, -// // must_not_suspend, unstable -// non_ascii_idents, -// // non_exhaustive_omitted_patterns, unstable -// noop_method_call, -// pointer_structural_match, -// rust_2021_incompatible_closure_captures, -// rust_2021_incompatible_or_patterns, -// rust_2021_prefixes_incompatible_syntax, -// rust_2021_prelude_collisions, -// single_use_lifetimes, -// trivial_casts, -// trivial_numeric_casts, -// //unreachable_pub, -// unsafe_op_in_unsafe_fn, -// // unstable_features, -// // unused_crate_dependencies, the false positive case blocks us -// unused_import_braces, -// unused_lifetimes, -// unused_qualifications, -// //unused_results, -// //warnings, // treat all wanings as errors -// clippy::all, -// clippy::cargo, -// // The followings are selected restriction lints for rust 1.57 -// clippy::clone_on_ref_ptr, -// clippy::create_dir, -// clippy::dbg_macro, -// clippy::decimal_literal_representation, -// // clippy::default_numeric_fallback, too verbose when dealing with numbers -// clippy::disallowed_script_idents, -// clippy::else_if_without_else, -// clippy::exit, -// clippy::expect_used, -// clippy::filetype_is_file, -// clippy::float_arithmetic, -// 
clippy::float_cmp_const, -// clippy::get_unwrap, -// clippy::if_then_some_else_none, -// // clippy::implicit_return, it's idiomatic Rust code. -// // clippy::inline_asm_x86_att_syntax, stick to intel syntax -// clippy::inline_asm_x86_intel_syntax, -// // clippy::integer_division, required in the project -// clippy::let_underscore_must_use, -// clippy::lossy_float_literal, -// clippy::mem_forget, -// clippy::missing_enforced_import_renames, -// clippy::missing_inline_in_public_items, -// // clippy::mod_module_files, mod.rs file is used -// clippy::modulo_arithmetic, -// clippy::multiple_inherent_impl, -// // clippy::panic, allow in application code -// // clippy::panic_in_result_fn, not necessary as panic is banned -// clippy::print_stderr, -// clippy::print_stdout, -// clippy::rc_mutex, -// clippy::rest_pat_in_fully_bound_structs, -// clippy::same_name_method, -// clippy::self_named_module_files, -// // clippy::shadow_reuse, it’s a common pattern in Rust code -// // clippy::shadow_same, it’s a common pattern in Rust code -// clippy::str_to_string, -// clippy::string_add, -// clippy::string_to_string, -// clippy::todo, -// clippy::unimplemented, -// clippy::unnecessary_self_imports, -// clippy::unneeded_field_pattern, -// // clippy::unreachable, allow unreachable panic, which is out of expectation -// clippy::unwrap_in_result, -// clippy::unwrap_used, -// // clippy::use_debug, debug is allow for debug log -// clippy::verbose_file_reads, -// )] -// #![allow( -// clippy::panic, // allow debug_assert, panic in production code -// clippy::multiple_crate_versions, // caused by the dependency, can't be fixed -// )] #![feature(result_flattening)] +#![feature(generators)] +#![feature(iterator_try_collect)] #![allow(cast_ref_to_mut)] pub mod binder; pub mod catalog; @@ -101,3 +14,4 @@ pub mod parser; pub mod planner; pub mod storage; pub mod types; +mod execution_v1; diff --git a/src/main.rs b/src/main.rs index 7997477b..5ba834d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ 
-13,7 +13,7 @@ async fn main() -> Result<(), Box> { println!("RootCatalog: {:?}", db.storage.get_catalog()); let mut input = String::new(); io::stdin().read_line(&mut input)?; - let ret = db.run(&input); - println!("{:?}", ret); + let ret = db.run(&input).await; + println!("{:?}", ret); } } diff --git a/src/parser/mod.rs b/src/parser/mod.rs index 43420ddb..3c681623 100644 --- a/src/parser/mod.rs +++ b/src/parser/mod.rs @@ -16,6 +16,5 @@ use anyhow::Result; /// ``` pub fn parse_sql(sql: &str) -> Result> { let dialect = PostgreSqlDialect {}; - let statements = Parser::parse_sql(&dialect, sql)?; - Ok(statements) + Ok(Parser::parse_sql(&dialect, sql)?) } diff --git a/src/planner/logical_create_table_plan.rs b/src/planner/logical_create_table_plan.rs index 6199fa7c..ef60a138 100644 --- a/src/planner/logical_create_table_plan.rs +++ b/src/planner/logical_create_table_plan.rs @@ -2,7 +2,7 @@ use crate::catalog::ColumnDesc; #[derive(Debug, PartialEq, Clone)] pub struct LogicalCreateTablePlan { pub table_name: String, - pub columns: Vec<(String, ColumnDesc)>, + pub columns: Vec<(String, bool, ColumnDesc)>, } // use sqlparser::ast::{ColumnDef, ColumnOption, Statement}; diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 83a87a55..198d5a4b 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; -use std::sync::Mutex; +use std::sync::Arc; use arrow::record_batch::RecordBatch; +use parking_lot::Mutex; use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog}; use crate::storage::{Bounds, Projections, Storage, StorageError, Table, Transaction}; @@ -9,8 +10,13 @@ use crate::types::{LogicalType, TableId}; #[derive(Debug)] pub struct InMemoryStorage { - catalog: Mutex, - tables: Mutex>, + inner: Arc>, +} + +#[derive(Debug)] +struct Inner { + catalog: RootCatalog, + tables: HashMap, } impl Default for InMemoryStorage { @@ -22,8 +28,20 @@ impl Default for InMemoryStorage { impl InMemoryStorage { pub fn new() -> 
Self { InMemoryStorage { - catalog: Mutex::new(RootCatalog::default()), - tables: Mutex::new(HashMap::new()), + inner: Arc::new(Mutex::new( + Inner { + catalog: RootCatalog::default(), + tables: HashMap::new(), + }) + ) + } + } +} + +impl Clone for InMemoryStorage { + fn clone(&self) -> Self { + InMemoryStorage { + inner: Arc::clone(&self.inner), } } } @@ -32,32 +50,35 @@ impl Storage for InMemoryStorage { type TableType = InMemoryTable; fn create_table( - &mut self, - id: TableId, + &self, table_name: &str, data: Vec, - ) -> Result<(), StorageError> { - let table = InMemoryTable::new(id.clone(), table_name, data)?; - self.catalog - .lock() - .unwrap() - .add_table(table.table_name.clone(), table.columns_vec.clone()) - .unwrap(); - self.tables.lock().unwrap().insert(id, table); - Ok(()) + ) -> Result { + let mut table = InMemoryTable::new(table_name, data)?; + let mut inner = self.inner.lock(); + + let table_id = inner.catalog.add_table( + table_name.to_string(), + table.columns_vec.clone() + )?; + + table.table_id = table_id; + inner.tables.insert(table_id, table); + + Ok(table_id) } fn get_table(&self, id: TableId) -> Result { - self.tables - .lock() - .unwrap() + self.inner.lock() + .tables .get(&id) .cloned() .ok_or(StorageError::TableNotFound(id)) } fn get_catalog(&self) -> RootCatalog { - self.catalog.lock().unwrap().clone() + self.inner.lock() + .catalog.clone() } fn show_tables(&self) -> Result { @@ -74,10 +95,10 @@ pub struct InMemoryTable { } impl InMemoryTable { - pub fn new(id: TableId, name: &str, data: Vec) -> Result { + pub fn new(name: &str, data: Vec) -> Result { let columns = Self::infer_catalog(data.first().cloned()); Ok(Self { - table_id: id, + table_id: 0, table_name: name.to_string(), data, columns_vec: columns, @@ -89,9 +110,13 @@ impl InMemoryTable { if let Some(batch) = batch { for f in batch.schema().fields().iter() { let field_name = f.name().to_string(); - let column_dec = + let column_desc = 
ColumnDesc::new(LogicalType::try_from(f.data_type()).unwrap(), false); - let column_catalog = ColumnCatalog::new(field_name, column_dec); + let column_catalog = ColumnCatalog::new( + field_name, + f.is_nullable(), + column_desc + ); columns.push(column_catalog) } } @@ -165,10 +190,9 @@ mod storage_test { #[test] fn test_in_memory_storage_works_with_data() -> Result<(), StorageError> { - let id = IdGenerator::build(); let mut storage = InMemoryStorage::new(); - storage.create_table(id.clone(), "test", build_record_batch()?)?; + let id = storage.create_table("test", build_record_batch()?)?; let catalog = storage.get_catalog(); println!("{:?}", catalog); let table_catalog = catalog.get_table_by_name("test"); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2d3071cf..6d668aaf 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -5,7 +5,7 @@ use std::io; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use crate::catalog::RootCatalog; +use crate::catalog::{CatalogError, RootCatalog}; use crate::storage::memory::InMemoryStorage; use crate::types::TableId; @@ -14,15 +14,14 @@ pub enum StorageImpl { InMemoryStorage(InMemoryStorage), } -pub trait Storage: Sync + Send { +pub trait Storage: Sync + Send + 'static { type TableType: Table; fn create_table( - &mut self, - id: TableId, + &self, table_name: &str, columns: Vec, - ) -> Result<(), StorageError>; + ) -> Result; fn get_table(&self, id: TableId) -> Result; fn get_catalog(&self) -> RootCatalog; fn show_tables(&self) -> Result; @@ -60,4 +59,7 @@ pub enum StorageError { #[error("table not found: {0}")] TableNotFound(TableId), + + #[error("catalog error")] + CatalogError(#[from] CatalogError), } diff --git a/src/types/mod.rs b/src/types/mod.rs index 42cbf3e1..88bfb516 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,4 +1,4 @@ -mod errors; +pub mod errors; pub mod value; use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering::{Acquire, Release}; diff --git 
a/src/types/value.rs b/src/types/value.rs index e819edfd..35224443 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -253,7 +253,7 @@ impl DataValue { }) } - pub fn get_logical_type(&self) -> LogicalType { + pub fn logical_type(&self) -> LogicalType { match self { DataValue::Null => LogicalType::SqlNull, DataValue::Boolean(_) => LogicalType::Boolean, @@ -279,10 +279,14 @@ impl DataValue { self.to_array_of_size(1) } + pub fn bool_array(size: usize, e: &Option) -> ArrayRef { + Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef + } + /// Converts a scalar value into an array of `size` rows. pub fn to_array_of_size(&self, size: usize) -> ArrayRef { match self { - DataValue::Boolean(e) => Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef, + DataValue::Boolean(e) => Self::bool_array(size, e), DataValue::Float64(e) => { build_array_from_option!(Float64, Float64Array, e, size) } @@ -449,7 +453,7 @@ impl DataValue { Ok(()) } - pub fn get_logic_type(&self) -> LogicalType { + pub fn logic_type(&self) -> LogicalType { match self { DataValue::Boolean(_) => LogicalType::Boolean, DataValue::UInt8(_) => LogicalType::UTinyint, @@ -470,7 +474,7 @@ impl DataValue { } } - pub fn get_datatype(&self) -> DataType { + pub fn datatype(&self) -> DataType { match self { DataValue::Boolean(_) => DataType::Boolean, DataValue::UInt8(_) => DataType::UInt8,