diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 673dbe8a..9ffb632e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2022-12-02-aarch64-apple-darwin + toolchain: nightly-2023-04-07 components: rustfmt, clippy - name: Check code format uses: actions-rs/cargo@v1 @@ -31,7 +31,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2022-12-02-aarch64-apple-darwin + toolchain: nightly-2023-04-07 - uses: actions/checkout@v2 - name: Build uses: actions-rs/cargo@v1 @@ -45,7 +45,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2022-12-02-aarch64-apple-darwin + toolchain: nightly-2023-04-07 - uses: actions/checkout@v2 - name: Test uses: actions-rs/cargo@v1 diff --git a/.github/workflows/cr.yml b/.github/workflows/cr.yml new file mode 100644 index 00000000..c7a80868 --- /dev/null +++ b/.github/workflows/cr.yml @@ -0,0 +1,28 @@ +name: Code Review + +permissions: + contents: read + pull-requests: write + +on: + pull_request: + types: [opened, reopened, synchronize] + +jobs: + test: + # if: ${{ contains(github.event.*.labels.*.name, 'gpt review') }} # Optional; to run only when a label is attached + runs-on: ubuntu-latest + steps: + - uses: anc95/ChatGPT-CodeReview@main + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + # Optional + LANGUAGE: Chinese + OPENAI_API_ENDPOINT: https://api.openai.com/v1 + MODEL: gpt-3.5-turbo + PROMPT: + top_p: 1 + temperature: 1 + max_tokens: 10000 + MAX_PATCH_LENGTH: 10000 # if the patch/diff length is large than MAX_PATCH_LENGTH, will be ignored and won't review. By default, with no MAX_PATCH_LENGTH set, there is also no limit for the patch/diff length. 
\ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index a3c02fda..c37c228f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,3 @@ - - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [package] @@ -10,7 +8,6 @@ edition = "2021" [lib] doctest = false - [dependencies] log = "^0.4" sqlparser = "0.34.0" @@ -33,6 +30,13 @@ tokio-process = "0.2.5" serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" async-trait = "0.1.68" +integer-encoding = "3.0.4" +petgraph = "0.6.3" +futures-async-stream = "0.2.6" +async-channel = "1.8.0" +async-backtrace = "0.2.6" +futures = "0.3.25" +futures-lite = "1.12.0" [dev-dependencies] ctor = "0.2.0" diff --git a/src/binder/create.rs b/src/binder/create.rs index 8b137891..70c84e8b 100644 --- a/src/binder/create.rs +++ b/src/binder/create.rs @@ -1 +1,87 @@ +use super::Binder; +use crate::binder::{lower_case_name, split_name}; +use crate::catalog::{Column, ColumnDesc}; +use crate::planner::logical_create_table_plan::LogicalCreateTablePlan; +use crate::planner::LogicalPlan; +use crate::types::ColumnId; +use anyhow::Result; +use sqlparser::ast::{ColumnDef, ObjectName}; +use std::collections::HashSet; +impl Binder { + pub(crate) fn bind_create_table( + &mut self, + name: ObjectName, + columns: &[ColumnDef], + ) -> Result { + let name = lower_case_name(&name); + + let (_, table_name) = split_name(&name)?; + + // check duplicated column names + let mut set = HashSet::new(); + for col in columns.iter() { + if !set.insert(col.name.value.clone()) { + return Err(anyhow::Error::msg(format!( + "bind duplicated column {}", + col.name.value.clone() + ))); + } + } + + let mut columns: Vec = columns + .iter() + .enumerate() + .map(|(_, col)| Column::from(col)) + .collect(); + + let plan = LogicalCreateTablePlan { + table_name: table_name.to_string(), + columns: columns + .into_iter() + .map(|col| (col.name.to_string(), col.desc.clone())) + .collect(), + }; + Ok(plan) + } +} + 
+#[cfg(test)] +mod tests { + use super::*; + use crate::binder::BinderContext; + use crate::catalog::Root; + use crate::types::{DataTypeExt, DataTypeKind}; + use sqlparser::ast::CharacterLength; + use std::sync::Arc; + + #[test] + fn test_create_bind() { + let sql = "create table t1 (id int , name varchar(10))"; + let mut binder = Binder::new(BinderContext::new(Arc::new(Root::new()))); + let stmt = crate::parser::parse_sql(sql).unwrap(); + let plan1 = binder.bind(&stmt[0]).unwrap(); + + let character_length = CharacterLength { + length: 10, + unit: None, + }; + let plan2 = LogicalPlan::CreateTable(LogicalCreateTablePlan { + table_name: "t1".to_string(), + columns: vec![ + ( + "id".to_string(), + DataTypeKind::Int(None).nullable().to_column(), + ), + ( + "name".to_string(), + DataTypeKind::Varchar(Option::from(character_length)) + .nullable() + .to_column(), + ), + ], + }); + + assert_eq!(plan1, plan2); + } +} diff --git a/src/binder/mod.rs b/src/binder/mod.rs index 0ad74c2a..c7c696a7 100644 --- a/src/binder/mod.rs +++ b/src/binder/mod.rs @@ -5,18 +5,16 @@ mod select; use std::collections::HashMap; -use crate::{ - catalog::{CatalogRef, TableRefId}, - expression::ScalarExpression, - planner::LogicalPlan, -}; +use crate::{catalog::CatalogRef, expression::ScalarExpression, planner::LogicalPlan}; +use crate::catalog::DEFAULT_SCHEMA_NAME; +use crate::types::TableId; use anyhow::Result; -use sqlparser::ast::Statement; - +use sqlparser::ast::{Ident, ObjectName, Statement}; +#[derive(Clone)] pub struct BinderContext { catalog: CatalogRef, - bind_table: HashMap, + bind_table: HashMap, aliases: HashMap, group_by_exprs: Vec, agg_calls: Vec, @@ -65,8 +63,31 @@ impl Binder { let plan = self.bind_query(query)?; LogicalPlan::Select(plan) } + Statement::CreateTable { name, columns, .. 
} => { + let plan = self.bind_create_table(name.to_owned(), &columns)?; + LogicalPlan::CreateTable(plan) + } _ => unimplemented!(), }; Ok(plan) } } + +/// Convert an object name into lower case +fn lower_case_name(name: &ObjectName) -> ObjectName { + ObjectName( + name.0 + .iter() + .map(|ident| Ident::new(ident.value.to_lowercase())) + .collect(), + ) +} + +/// Split an object name into `(schema name, table name)`. +fn split_name(name: &ObjectName) -> Result<(&str, &str)> { + Ok(match name.0.as_slice() { + [table] => (DEFAULT_SCHEMA_NAME, &table.value), + [schema, table] => (&schema.value, &table.value), + _ => return Err(anyhow::anyhow!("Invalid table name: {:?}", name)), + }) +} diff --git a/src/binder/select.rs b/src/binder/select.rs index df7ae730..dd6453db 100644 --- a/src/binder/select.rs +++ b/src/binder/select.rs @@ -1,7 +1,7 @@ use std::{borrow::Borrow, sync::Arc}; use crate::{ - catalog::{ColumnRefId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME}, + catalog::ColumnRefId, expression::ScalarExpression, planner::{ logical_select_plan::LogicalSelectPlan, @@ -16,6 +16,7 @@ use crate::{ use super::Binder; +use crate::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME}; use anyhow::Result; use itertools::Itertools; use sqlparser::ast::{ @@ -24,7 +25,7 @@ use sqlparser::ast::{ }; impl Binder { - pub(super) fn bind_query(&mut self, query: &Query) -> Result { + pub(crate) fn bind_query(&mut self, query: &Query) -> Result { if let Some(_with) = &query.with { // TODO support with clause. 
} @@ -127,7 +128,8 @@ impl Binder { .map(|ident| Ident::new(ident.value.to_lowercase())) .collect_vec(); - let (database, schema, mut table): (&str, &str, &str) = match obj_name.as_slice() { + let (_database, _schema, mut table): (&str, &str, &str) = match obj_name.as_slice() + { [table] => (DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, &table.value), [schema, table] => (DEFAULT_DATABASE_NAME, &schema.value, &table.value), [database, schema, table] => (&database.value, &schema.value, &table.value), @@ -147,7 +149,7 @@ impl Binder { let table_ref_id = self .context .catalog - .get_table_id_by_name(database, schema, table) + .get_table_id_by_name(table) .ok_or_else(|| anyhow::Error::msg(format!("bind table {}", table)))?; self.context.bind_table.insert(table.into(), table_ref_id); @@ -198,7 +200,7 @@ impl Binder { fn bind_all_column_refs(&mut self) -> Result> { let mut exprs = vec![]; for ref_id in self.context.bind_table.values().cloned().collect_vec() { - let table = self.context.catalog.get_table(&ref_id).unwrap(); + let table = self.context.catalog.get_table(ref_id).unwrap(); for (col_id, col) in &table.get_all_columns() { let column_ref_id = ColumnRefId::from_table(ref_id, *col_id); // self.record_regular_table_column( @@ -209,8 +211,8 @@ impl Binder { // ); let expr = ScalarExpression::ColumnRef { column_ref_id, - primary_key: col.is_primary(), - desc: col.desc().clone(), + primary_key: col.desc.is_primary(), + desc: col.desc.clone(), }; exprs.push(expr); } diff --git a/src/binder/statement/create_table.rs b/src/binder/statement/create_table.rs new file mode 100644 index 00000000..d2767e72 --- /dev/null +++ b/src/binder/statement/create_table.rs @@ -0,0 +1,121 @@ +use crate::binder::{split_name, BindError, Binder}; +use crate::catalog::ColumnDesc; +use crate::parser::{ColumnDef, ColumnOption, Statement}; +use crate::types::{DataType, DatabaseIdT, SchemaIdT}; +use std::collections::HashSet; + +/// A bound `CREATE TABLE` statement. 
+#[derive(Debug, PartialEq, Clone)] +pub struct BoundCreateTable { + pub database_id: DatabaseIdT, + pub schema_id: SchemaIdT, + pub table_name: String, + pub columns: Vec<(String, ColumnDesc)>, +} + +impl Binder { + pub fn bind_create_table(&mut self, stmt: &Statement) -> Result { + match stmt { + Statement::CreateTable { name, columns, .. } => { + let (database_name, schema_name, table_name) = split_name(name)?; + + let db = self + .catalog + .get_database_by_name(database_name) + .ok_or_else(|| BindError::InvalidDatabase(database_name.into()))?; + + let schema = db + .get_schema_by_name(schema_name) + .ok_or_else(|| BindError::SchemaNotFound(schema_name.into()))?; + if schema.get_table_by_name(table_name).is_some() { + return Err(BindError::DuplicatedTable(table_name.into())); + } + + // check duplicated column names + let mut set = HashSet::new(); + for col in columns.iter() { + if !set.insert(col.name.value.clone()) { + return Err(BindError::DuplicatedColumn(col.name.value.clone())); + } + } + let columns = columns + .iter() + .map(|col| (col.name.value.clone(), ColumnDesc::from(col))) + .collect(); + Ok(BoundCreateTable { + database_id: db.id(), + schema_id: schema.id(), + table_name: table_name.into(), + columns, + }) + } + _ => panic!("mismatched statement type"), + } + } +} + +impl From<&ColumnDef> for ColumnDesc { + fn from(cdf: &ColumnDef) -> Self { + let mut is_nullable = true; + let mut is_primary = false; + for opt in cdf.options.iter() { + match opt.option { + ColumnOption::Null => is_nullable = true, + ColumnOption::NotNull => is_nullable = false, + ColumnOption::Unique { is_primary: v } => is_primary = v, + _ => todo!("column options"), + } + } + ColumnDesc::new( + DataType::new(cdf.data_type.clone(), is_nullable), + is_primary, + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::catalog::RootCatalog; + use crate::parser; + use crate::parser::SQLParser; + use crate::types::{DataTypeExt, DataTypeKind}; + use std::sync::Arc; + + 
#[test] + fn bind_create_table() { + let catalog = Arc::new(RootCatalog::new()); + let mut binder = Binder::new(catalog.clone()); + let sql = " + create table t1 (v1 int not null, v2 int); + create table t2 (a int not null, a int not null); + create table t3 (v1 int not null);"; + let stmts = parser::RSParser::parse_sql(sql).unwrap(); + + assert_eq!( + binder.bind_create_table(&stmts[0]), + Ok(BoundCreateTable { + database_id: 0, + schema_id: 0, + table_name: "t1".into(), + columns: vec![ + ("v1".into(), DataTypeKind::Int(None).not_null().to_column()), + ("v2".into(), DataTypeKind::Int(None).nullable().to_column()), + ], + }) + ); + + assert_eq!( + binder.bind_create_table(&stmts[1]), + Err(BindError::DuplicatedColumn("a".into())) + ); + + let database = catalog.get_database_by_id(0).unwrap(); + let schema = database.get_schema_by_id(0).unwrap(); + schema.add_table("t3".into(), vec![], false).unwrap(); + assert_eq!( + binder.bind_create_table(&stmts[2]), + Err(BindError::DuplicatedTable("t3".into())) + ); + } +} diff --git a/src/binder/statement/mod.rs b/src/binder/statement/mod.rs new file mode 100644 index 00000000..2f949013 --- /dev/null +++ b/src/binder/statement/mod.rs @@ -0,0 +1,5 @@ +mod create_table; +mod select; + +pub use self::create_table::*; +pub use self::select::*; diff --git a/src/binder/statement/select.rs b/src/binder/statement/select.rs new file mode 100644 index 00000000..5d879f41 --- /dev/null +++ b/src/binder/statement/select.rs @@ -0,0 +1,7 @@ +use crate::parser::Value; + +/// A bound `SELECT` statement. +#[derive(Debug, PartialEq, Clone)] +pub struct BoundSelect { + pub values: Vec, +} diff --git a/src/catalog/column.rs b/src/catalog/column.rs index cb4388ce..7e6aab37 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -1,34 +1,24 @@ -use crate::types::{ColumnIdT, DataType}; +use crate::types::{ColumnId, DataType, IdGenerator}; +use sqlparser::ast::{ColumnDef, ColumnOption}; -/// The descriptor of a column. 
-#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ColumnDesc { - column_datatype: DataType, - is_primary: bool, +#[derive(Clone)] +pub struct Column { + pub id: ColumnId, + pub name: String, + pub desc: ColumnDesc, } -impl ColumnDesc { - pub(crate) const fn new(column_datatype: DataType, is_primary: bool) -> ColumnDesc { - ColumnDesc { - column_datatype, - is_primary, +impl Column { + pub(crate) fn new(column_name: String, column_desc: ColumnDesc) -> Column { + Column { + id: IdGenerator::build(), + name: column_name, + desc: column_desc, } } - pub(crate) fn is_primary(&self) -> bool { - self.is_primary - } - - pub(crate) fn set_primary(&mut self, is_primary: bool) { - self.is_primary = is_primary; - } - - pub(crate) fn is_nullable(&self) -> bool { - self.column_datatype.is_nullable() - } - - pub(crate) fn get_datatype(&self) -> DataType { - self.column_datatype.clone() + pub(crate) fn datatype(&self) -> &DataType { + &self.desc.column_datatype } } @@ -43,56 +33,53 @@ impl DataType { } } -/// Column catalog +/// The descriptor of a column. 
#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Column { - id: ColumnIdT, - name: String, - desc: ColumnDesc, +pub struct ColumnDesc { + column_datatype: DataType, + is_primary: bool, } -impl Column { - pub(crate) fn new( - column_id: ColumnIdT, - column_name: String, - column_desc: ColumnDesc, - ) -> Column { - Column { - id: column_id, - name: column_name, - desc: column_desc, +impl ColumnDesc { + pub(crate) const fn new(column_datatype: DataType, is_primary: bool) -> ColumnDesc { + ColumnDesc { + column_datatype, + is_primary, } } - pub(crate) fn id(&self) -> ColumnIdT { - self.id - } - pub fn set_id(&mut self, column_id: ColumnIdT) { - self.id = column_id - } - - pub(crate) fn name(&self) -> &str { - &self.name - } - - pub fn desc(&self) -> &ColumnDesc { - &self.desc - } - - pub fn datatype(&self) -> DataType { - self.desc.column_datatype.clone() + pub(crate) fn is_primary(&self) -> bool { + self.is_primary } - pub(crate) fn set_primary(&mut self, is_primary: bool) { - self.desc.set_primary(is_primary) + pub(crate) fn is_nullable(&self) -> bool { + self.column_datatype.is_nullable() } - pub(crate) fn is_primary(&self) -> bool { - self.desc.is_primary() + pub(crate) fn get_datatype(&self) -> DataType { + self.column_datatype.clone() } +} - pub(crate) fn is_nullable(&self) -> bool { - self.desc.is_nullable() +impl From<&ColumnDef> for Column { + fn from(cdef: &ColumnDef) -> Self { + let mut is_nullable = true; + let mut is_primary_ = false; + for opt in &cdef.options { + match opt.option { + ColumnOption::Null => is_nullable = true, + ColumnOption::NotNull => is_nullable = false, + ColumnOption::Unique { is_primary } => is_primary_ = is_primary, + _ => todo!("column options"), + } + } + Column::new( + cdef.name.value.clone(), + ColumnDesc::new( + DataType::new(cdef.data_type.clone(), is_nullable), + is_primary_, + ), + ) } } @@ -104,16 +91,12 @@ mod tests { #[test] fn test_column_catalog() { let mut col_catalog = Column::new( - 0, "test".to_string(), 
DataTypeKind::Int(None).not_null().to_column(), ); - assert_eq!(col_catalog.id(), 0); - assert_eq!(col_catalog.is_primary(), false); - assert_eq!(col_catalog.is_nullable(), false); - assert_eq!(col_catalog.name(), "test"); - col_catalog.set_primary(true); - assert_eq!(col_catalog.is_primary(), true); + assert_eq!(col_catalog.desc.is_primary(), false); + assert_eq!(col_catalog.desc.is_nullable(), false); + assert_eq!(col_catalog.name, "test"); } } diff --git a/src/catalog/database.rs b/src/catalog/database.rs index a519847f..f6e8dfbc 100644 --- a/src/catalog/database.rs +++ b/src/catalog/database.rs @@ -1,10 +1,10 @@ -use crate::catalog::{CatalogError, Schema, SchemaCatalogRef, DEFAULT_SCHEMA_NAME}; +use crate::catalog::{CatalogError, SchemaCatalog, SchemaCatalogRef, DEFAULT_SCHEMA_NAME}; use crate::types::{DatabaseIdT, SchemaIdT}; use parking_lot::Mutex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -pub(crate) struct Database { +pub(crate) struct DatabaseCatalog { database_id: DatabaseIdT, inner: Mutex, } @@ -14,13 +14,13 @@ struct Inner { /// schema_name -> schema_id schema_idxs: HashMap, /// schema_id -> schema_catalog - schemas: BTreeMap>, + schemas: BTreeMap>, next_schema_id: SchemaIdT, } -impl Database { +impl DatabaseCatalog { pub(crate) fn new(database_id: DatabaseIdT, database_name: String) -> Self { - let db_catalog = Database { + let db_catalog = DatabaseCatalog { database_id, inner: Mutex::new(Inner { database_name, @@ -40,7 +40,7 @@ impl Database { } let schema_id = inner.next_schema_id; inner.next_schema_id += 1; - let schema_catalog = Arc::new(Schema::new(schema_id, schema_name.clone())); + let schema_catalog = Arc::new(SchemaCatalog::new(schema_id, schema_name.clone())); inner.schema_idxs.insert(schema_name, schema_id); inner.schemas.insert(schema_id, schema_catalog); Ok(schema_id) @@ -66,12 +66,12 @@ impl Database { inner.schema_idxs.get(name).cloned() } - pub(crate) fn get_schema_by_id(&self, schema_id: SchemaIdT) -> Option> { + 
pub(crate) fn get_schema_by_id(&self, schema_id: SchemaIdT) -> Option> { let inner = self.inner.lock(); inner.schemas.get(&schema_id).cloned() } - pub(crate) fn get_schema_by_name(&self, name: &str) -> Option> { + pub(crate) fn get_schema_by_name(&self, name: &str) -> Option> { let inner = self.inner.lock(); inner .schema_idxs @@ -92,26 +92,26 @@ impl Database { #[cfg(test)] mod test { - use crate::catalog::{Column, Database, Schema, Table}; + use crate::catalog::{ColumnCatalog, DatabaseCatalog, SchemaCatalog, TableCatalog}; use crate::types::{DataTypeExt, DataTypeKind}; #[test] fn test_database_catalog() { - let col0 = Column::new( + let col0 = ColumnCatalog::new( 0, "a".to_string(), DataTypeKind::Int(Some(32)).not_null().to_column(), ); - let col1 = Column::new( + let col1 = ColumnCatalog::new( 1, "b".to_string(), DataTypeKind::Boolean.not_null().to_column(), ); let col_catalogs = vec![col0, col1]; - let mut _schema_catalog = Schema::new(0, "test_scheme".to_string()); - let _table_catalog = Table::new(0, "test_table".to_string(), col_catalogs, false); + let mut _schema_catalog = SchemaCatalog::new(0, "test_scheme".to_string()); + let _table_catalog = TableCatalog::new(0, "test_table".to_string(), col_catalogs, false); - let database_catalog = Database::new(0, "test_database".to_string()); + let database_catalog = DatabaseCatalog::new(0, "test_database".to_string()); let schema_id = database_catalog.add_schema("test_schema".into()).unwrap(); assert_eq!(schema_id, 1); diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 6f671ffc..b1d7443a 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -1,69 +1,33 @@ // Module: catalog pub(crate) use self::column::*; -pub(crate) use self::database::*; pub(crate) use self::root::*; -pub(crate) use self::schema::*; pub(crate) use self::table::*; -use crate::types::DatabaseIdT; -use crate::types::{ColumnIdT, SchemaIdT, TableIdT}; +use crate::types::{ColumnId, TableId}; use std::sync::Arc; -pub(crate) type 
ColumnCatalogRef = Arc; -pub(crate) type TableCatalogRef = Arc; -pub(crate) type SchemaCatalogRef = Arc; -pub(crate) type DatabaseCatalogRef = Arc; -pub(crate) type RootCatalogRef = Arc; /// The type of catalog reference. -pub type CatalogRef = Arc; +pub type CatalogRef = Arc; +pub(crate) type TableRef = Arc
; +pub(crate) type ColumnRef = Arc; pub(crate) static DEFAULT_DATABASE_NAME: &str = "kipsql"; pub(crate) static DEFAULT_SCHEMA_NAME: &str = "kipsql"; mod column; -mod database; mod root; -mod schema; mod table; -/// The reference ID of a table. -#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)] -pub struct TableRefId { - pub database_id: DatabaseIdT, - pub schema_id: SchemaIdT, - pub table_id: TableIdT, -} - -impl TableRefId { - pub const fn new(database_id: DatabaseIdT, schema_id: SchemaIdT, table_id: TableIdT) -> Self { - TableRefId { - schema_id, - table_id, - database_id, - } - } -} - /// The reference ID of a column. #[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)] pub struct ColumnRefId { - pub schema_id: SchemaIdT, - pub table_id: TableIdT, - pub column_id: ColumnIdT, + pub table_id: TableId, + pub column_id: ColumnId, } impl ColumnRefId { - pub const fn from_table(table: TableRefId, column_id: ColumnIdT) -> Self { - ColumnRefId { - schema_id: table.schema_id, - table_id: table.table_id, - column_id, - } - } - - pub const fn new(schema_id: SchemaIdT, table_id: TableIdT, column_id: ColumnIdT) -> Self { + pub const fn from_table(table_id: TableId, column_id: ColumnId) -> Self { ColumnRefId { - schema_id, table_id, column_id, } diff --git a/src/catalog/root.rs b/src/catalog/root.rs index 9f2abedc..7e534721 100644 --- a/src/catalog/root.rs +++ b/src/catalog/root.rs @@ -1,147 +1,80 @@ -use crate::catalog::{CatalogError, Database, DatabaseCatalogRef, DEFAULT_DATABASE_NAME}; -use crate::types::DatabaseIdT; -use parking_lot::Mutex; -use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; +use crate::catalog::{CatalogError, Column, Table}; +use crate::types::TableId; +use std::collections::BTreeMap; -use super::{Table, TableRefId}; - -pub struct RootCatalog { - inner: Mutex, -} - -#[derive(Default)] -pub struct Inner { - /// Database name to database id mapping - database_idxs: HashMap, - /// Database id to database catalog mapping - databases: BTreeMap>, 
- next_database_id: DatabaseIdT, +pub struct Root { + table_idxs: BTreeMap, + tables: BTreeMap, } -impl Default for RootCatalog { - fn default() -> Self { - Self::new() - } -} - -impl RootCatalog { - pub(crate) fn new() -> RootCatalog { - let root_catalog = RootCatalog { - inner: Mutex::new(Inner::default()), - }; - let _ = root_catalog - .add_database(DEFAULT_DATABASE_NAME.into()) - .is_ok(); - root_catalog - } - - pub(crate) fn add_database(&self, database_name: String) -> Result { - let mut inner = self.inner.lock(); - if inner.database_idxs.contains_key(&database_name) { - return Err(CatalogError::Duplicated("database", database_name)); +impl Root { + #[allow(dead_code)] + pub fn new() -> Self { + Root { + table_idxs: Default::default(), + tables: Default::default(), } - let database_id = inner.next_database_id; - inner.next_database_id += 1; - let database_catalog = Arc::new(Database::new(database_id, database_name.clone())); - inner.database_idxs.insert(database_name, database_id); - inner.databases.insert(database_id, database_catalog); - Ok(database_id) - } - - pub(crate) fn delete_database(&mut self, database_name: &str) -> Result<(), CatalogError> { - let mut inner = self.inner.lock(); - let id = inner - .database_idxs - .remove(database_name) - .ok_or_else(|| CatalogError::NotFound("database", database_name.into()))?; - inner.databases.remove(&id); - Ok(()) - } - - pub(crate) fn get_all_databases(&self) -> BTreeMap { - let inner = self.inner.lock(); - inner.databases.clone() } - pub fn get_database_id_by_name(&self, name: &str) -> Option { - let inner = self.inner.lock(); - inner.database_idxs.get(name).cloned() + pub(crate) fn get_table_id_by_name(&self, name: &str) -> Option { + self.table_idxs.get(name).cloned() } - pub(crate) fn get_database_by_id(&self, database_id: DatabaseIdT) -> Option> { - let inner = self.inner.lock(); - inner.databases.get(&database_id).cloned() + pub(crate) fn get_table(&self, table_id: TableId) -> Option<&Table> { + 
self.tables.get(&table_id) } - pub(crate) fn get_database_by_name(&self, name: &str) -> Option> { - let inner = self.inner.lock(); - inner - .database_idxs - .get(name) - .and_then(|id| inner.databases.get(id)) - .cloned() + pub(crate) fn get_table_by_name(&self, name: &str) -> Option<&Table> { + let id = self.table_idxs.get(name)?; + self.tables.get(id) } - pub fn get_table(&self, table_ref_id: &TableRefId) -> Option> { - let db = self.get_database_by_id(table_ref_id.database_id)?; - let schema = db.get_schema_by_id(table_ref_id.schema_id)?; - schema.get_table_by_id(table_ref_id.table_id) - } + pub(crate) fn add_table( + &mut self, + table_name: String, + columns: Vec, + ) -> Result { + if self.table_idxs.contains_key(&table_name) { + return Err(CatalogError::Duplicated("column", table_name)); + } + let table = Table::new(table_name.to_owned(), columns)?; + let table_id = table.id; - pub fn get_table_id_by_name( - &self, - database_name: &str, - schema_name: &str, - table_name: &str, - ) -> Option { - let db = self.get_database_by_name(database_name)?; - let schema = db.get_schema_by_name(schema_name)?; - let table = schema.get_table_by_name(table_name)?; + self.table_idxs.insert(table_name, table_id); + self.tables.insert(table_id, table); - Some(TableRefId { - schema_id: schema.id(), - table_id: table.id(), - database_id: db.id(), - }) + Ok(table_id) } } #[cfg(test)] mod tests { use super::*; - use crate::catalog::{Column, DEFAULT_SCHEMA_NAME}; + use crate::catalog::Column; use crate::types::{DataTypeExt, DataTypeKind}; #[test] fn test_root_catalog() { - let root_catalog = RootCatalog::new(); - let database_id = root_catalog - .get_database_id_by_name(DEFAULT_DATABASE_NAME) - .unwrap(); - let database_catalog = root_catalog.get_database_by_id(database_id).unwrap(); - let schema_catalog = database_catalog - .get_schema_by_name(DEFAULT_SCHEMA_NAME) - .unwrap(); + let mut root_catalog = Root::new(); let col0 = Column::new( - 0, "a".to_string(), 
DataTypeKind::Int(None).not_null().to_column(), ); let col1 = Column::new( - 1, "b".to_string(), DataTypeKind::Boolean.not_null().to_column(), ); let col_catalogs = vec![col0, col1]; - let table_id = schema_catalog - .add_table("test_table".into(), col_catalogs, false) + let table_id_1 = root_catalog + .add_table("test_table_1".into(), col_catalogs.clone()) + .unwrap(); + + let table_id_2 = root_catalog + .add_table("test_table_2".into(), col_catalogs) .unwrap(); - assert_eq!(table_id, 0); - assert_eq!(database_catalog.name(), DEFAULT_DATABASE_NAME); - assert_eq!(schema_catalog.name(), DEFAULT_SCHEMA_NAME); + assert_ne!(table_id_1, table_id_2); } } diff --git a/src/catalog/schema.rs b/src/catalog/schema.rs index 0cad15b3..4b6dba2c 100644 --- a/src/catalog/schema.rs +++ b/src/catalog/schema.rs @@ -1,10 +1,10 @@ -use crate::catalog::{CatalogError, Column, Table}; +use crate::catalog::{CatalogError, ColumnCatalog, TableCatalog}; use crate::types::{SchemaIdT, TableIdT}; use parking_lot::Mutex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -pub(crate) struct Schema { +pub(crate) struct SchemaCatalog { schema_id: SchemaIdT, inner: Mutex, } @@ -12,13 +12,13 @@ pub(crate) struct Schema { struct Inner { schema_name: String, table_idxs: HashMap, - tables: BTreeMap>, + tables: BTreeMap>, next_table_id: TableIdT, } -impl Schema { - pub(crate) fn new(schema_id: SchemaIdT, schema_name: String) -> Schema { - Schema { +impl SchemaCatalog { + pub(crate) fn new(schema_id: SchemaIdT, schema_name: String) -> SchemaCatalog { + SchemaCatalog { schema_id, inner: Mutex::new(Inner { schema_name, @@ -32,7 +32,7 @@ impl Schema { pub(crate) fn add_table( &self, table_name: String, - columns: Vec, + columns: Vec, is_materialized_view: bool, ) -> Result { let mut inner = self.inner.lock(); @@ -41,7 +41,7 @@ impl Schema { } let table_id = inner.next_table_id; inner.next_table_id += 1; - let table_catalog = Arc::new(Table::new( + let table_catalog = Arc::new(TableCatalog::new( 
table_id, table_name.clone(), columns, @@ -63,7 +63,7 @@ impl Schema { Ok(()) } - pub(crate) fn get_all_tables(&self) -> BTreeMap> { + pub(crate) fn get_all_tables(&self) -> BTreeMap> { let inner = self.inner.lock(); inner.tables.clone() } @@ -73,12 +73,12 @@ impl Schema { inner.table_idxs.get(name).cloned() } - pub(crate) fn get_table_by_id(&self, table_id: TableIdT) -> Option> { + pub(crate) fn get_table_by_id(&self, table_id: TableIdT) -> Option> { let inner = self.inner.lock(); inner.tables.get(&table_id).cloned() } - pub(crate) fn get_table_by_name(&self, name: &str) -> Option> { + pub(crate) fn get_table_by_name(&self, name: &str) -> Option> { let inner = self.inner.lock(); inner .table_idxs @@ -104,14 +104,14 @@ mod tests { #[test] fn test_schema_catalog() { - let col0 = Column::new( + let col0 = ColumnCatalog::new( 0, "a".into(), DataTypeKind::Int(None).not_null().to_column(), ); - let col1 = Column::new(1, "b".into(), DataTypeKind::Boolean.not_null().to_column()); + let col1 = ColumnCatalog::new(1, "b".into(), DataTypeKind::Boolean.not_null().to_column()); let col_catalogs = vec![col0, col1]; - let mut schema_catalog = Schema::new(0, "test_scheme".to_string()); + let mut schema_catalog = SchemaCatalog::new(0, "test_scheme".to_string()); let table_id = schema_catalog .add_table("test_table".to_string(), col_catalogs, false) .unwrap(); diff --git a/src/catalog/table.rs b/src/catalog/table.rs index 546bfa46..d3f0b829 100644 --- a/src/catalog/table.rs +++ b/src/catalog/table.rs @@ -1,114 +1,67 @@ use crate::catalog::{CatalogError, Column}; -use crate::types::{ColumnIdT, TableIdT}; -use parking_lot::Mutex; +use crate::types::{ColumnId, IdGenerator, TableId}; +use itertools::Itertools; use std::collections::{BTreeMap, HashMap}; pub struct Table { - table_id: TableIdT, - inner: Mutex, -} - -struct Inner { - name: String, + pub id: TableId, + pub name: String, /// Mapping from column names to column ids - column_idxs: HashMap, - /// Mapping from column ids to 
column catalogs - columns: BTreeMap, - - #[allow(dead_code)] - /// The next column id to be assigned - is_materialized_view: bool, - /// Whether the table is a materialized view - next_column_id: ColumnIdT, + column_idxs: HashMap, + columns: BTreeMap, } impl Table { - /// Create a new table catalog with the given table id and table name. - pub(crate) fn new( - table_id: TableIdT, - table_name: String, - columns: Vec, - is_materialized_view: bool, - ) -> Table { - let table_catalog = Table { - table_id, - inner: Mutex::new(Inner { - name: table_name, - column_idxs: HashMap::new(), - columns: BTreeMap::new(), - is_materialized_view, - next_column_id: 0, - }), - }; - for col_catalog in columns.into_iter() { - let _ = table_catalog.add_column(col_catalog).is_ok(); - } - table_catalog + pub(crate) fn get_column_by_id(&self, id: ColumnId) -> Option<&Column> { + self.columns.get(&id) } - /// Add a column to the table catalog. - pub(crate) fn add_column(&self, col_catalog: Column) -> Result { - let mut inner = self.inner.lock(); - - if inner.column_idxs.contains_key(col_catalog.name()) { - return Err(CatalogError::Duplicated( - "column", - col_catalog.name().into(), - )); - } - inner.next_column_id += 1; - let id = col_catalog.id(); - - inner - .column_idxs - .insert(col_catalog.name().to_owned(), col_catalog.id()); - inner.columns.insert(id, col_catalog); - Ok(id) + pub(crate) fn get_column_id_by_name(&self, name: &str) -> Option { + self.column_idxs.get(name).cloned() } - /// Check if the table catalog contains a column with the given name. pub(crate) fn contains_column(&self, name: &str) -> bool { - let inner = self.inner.lock(); - inner.column_idxs.contains_key(name) + self.column_idxs.contains_key(name) } - /// Get all columns in the table catalog. 
- pub fn get_all_columns(&self) -> BTreeMap { - let inner = self.inner.lock(); - inner.columns.clone() + pub(crate) fn get_all_columns(&self) -> Vec<(ColumnId, &Column)> { + self.columns + .iter() + .map(|(col_id, col)| (*col_id, col)) + .collect_vec() } - /// Get the column id of the column with the given name. - pub(crate) fn get_column_id_by_name(&self, name: &str) -> Option { - let inner = self.inner.lock(); - inner.column_idxs.get(name).cloned() - } + /// Add a column to the table catalog. + pub(crate) fn add_column(&mut self, col_catalog: Column) -> Result { + if self.column_idxs.contains_key(&col_catalog.name) { + return Err(CatalogError::Duplicated("column", col_catalog.name.into())); + } - /// Get the column catalog of the column with the given id. - pub(crate) fn get_column_by_id(&self, column_id: ColumnIdT) -> Option { - let inner = self.inner.lock(); - inner.columns.get(&column_id).cloned() - } + let col_id = col_catalog.id; - /// Get the column catalog of the column with the given name. - pub(crate) fn get_column_by_name(&self, name: &String) -> Option { - let inner = self.inner.lock(); - let column_id = inner.column_idxs.get(name)?; - inner.columns.get(column_id).cloned() - } + self.column_idxs.insert(col_catalog.name.to_owned(), col_id); + self.columns.insert(col_id, col_catalog); - /// Get the table id of the table. - pub(crate) fn id(&self) -> TableIdT { - self.table_id + Ok(col_id) } - /// Get the table name of the table. 
- pub(crate) fn name(&self) -> String { - let inner = self.inner.lock(); - inner.name.clone() + pub(crate) fn new(table_name: String, columns: Vec) -> Result { + let mut table_catalog = Table { + id: IdGenerator::build(), + name: table_name, + column_idxs: HashMap::new(), + columns: BTreeMap::new(), + }; + + for col_catalog in columns.into_iter() { + let _ = table_catalog.add_column(col_catalog)?; + } + + Ok(table_catalog) } } +#[cfg(test)] mod tests { use super::*; use crate::types::{DataType, DataTypeExt, DataTypeKind}; @@ -119,34 +72,31 @@ mod tests { // | 1 | true | // | 2 | false | fn test_table_catalog() { - let col0 = Column::new( - 0, - "a".into(), - DataTypeKind::Int(None).not_null().to_column(), - ); - let col1 = Column::new(1, "b".into(), DataTypeKind::Boolean.not_null().to_column()); + let col0 = Column::new("a".into(), DataTypeKind::Int(None).not_null().to_column()); + let col1 = Column::new("b".into(), DataTypeKind::Boolean.not_null().to_column()); let col_catalogs = vec![col0, col1]; - let table_catalog = Table::new(0, "test".to_string(), col_catalogs, false); + let table_catalog = Table::new("test".to_string(), col_catalogs).unwrap(); assert_eq!(table_catalog.contains_column("a"), true); assert_eq!(table_catalog.contains_column("b"), true); assert_eq!(table_catalog.contains_column("c"), false); - assert_eq!(table_catalog.get_column_id_by_name("a"), Some(0)); - assert_eq!(table_catalog.get_column_id_by_name("b"), Some(1)); + let col_a_id = table_catalog.get_column_id_by_name("a").unwrap(); + let col_b_id = table_catalog.get_column_id_by_name("b").unwrap(); + assert!(col_a_id < col_b_id); - let column_catalog = table_catalog.get_column_by_id(0).unwrap(); - assert_eq!(column_catalog.name(), "a"); + let column_catalog = table_catalog.get_column_by_id(col_a_id).unwrap(); + assert_eq!(column_catalog.name, "a"); assert_eq!( column_catalog.datatype(), - DataType::new(DataTypeKind::Int(None), false) + &DataType::new(DataTypeKind::Int(None), false) ); - let 
column_catalog = table_catalog.get_column_by_id(1).unwrap(); - assert_eq!(column_catalog.name(), "b"); + let column_catalog = table_catalog.get_column_by_id(col_b_id).unwrap(); + assert_eq!(column_catalog.name, "b"); assert_eq!( column_catalog.datatype(), - DataType::new(DataTypeKind::Boolean, false) + &DataType::new(DataTypeKind::Boolean, false) ); } } diff --git a/src/execution/executor.rs b/src/execution/executor.rs new file mode 100644 index 00000000..81d4d958 --- /dev/null +++ b/src/execution/executor.rs @@ -0,0 +1,104 @@ +use std::{collections::VecDeque, sync::Arc}; + +use anyhow::Result; + +use super::executor_graph::ExecutorGraph; +use super::executor_task::ExecutionTask; +use super::parallel::pipeline_builder::PipelineBuilder; +use super::physical::PhysicalOperatorRef; +use super::runtime::thread::Thread; +use super::runtime::thread::ThreadJoinHandle; +use super::runtime::ExecutionRuntime; +use super::runtime::TrySpawn; + +pub struct ExecutionQueue(pub VecDeque); + +impl ExecutionQueue { + pub fn push_task(&mut self, task: ExecutionTask) { + self.0.push_back(task); + } +} + +pub struct ExecutionContext { + pub global_execution_queue: ExecutionQueue, +} + +impl ExecutionContext { + pub fn get_global_execution_queue(&mut self) -> &mut ExecutionQueue { + &mut self.global_execution_queue + } +} + +pub struct Executor { + pub context: ExecutionContext, + pub graph: ExecutorGraph, + pub threads_num: usize, + pub runtime: Arc, +} + +impl Executor { + pub async fn initialize(plan: PhysicalOperatorRef) -> Result { + let mut builder = PipelineBuilder::new(); + let meta_pipeline = builder.finalize(plan)?; + + let graph = ExecutorGraph::from_pipeline(meta_pipeline)?; + + Ok(Executor { + context: ExecutionContext { + global_execution_queue: ExecutionQueue(VecDeque::new()), + }, + graph, + threads_num: 0, + runtime: Arc::new(ExecutionRuntime::with_default_worker_threads()?), + }) + } + + pub async fn execute(&mut self) -> Result<()> { + // Schedule the pipelines that do 
not have dependencies. + self.graph + .init_schedule_event_queue(&mut self.context.global_execution_queue) + .await; + + // Pull execution task from `GlobalExecutionQueue`. + self.runtime.spawn(ExecutionTask::empty()); + Ok(()) + } + + fn execute_threads(self: &Arc) -> Vec>> { + let mut thread_join_handles = Vec::with_capacity(self.threads_num); + for thread_num in 0..self.threads_num { + let this = self.clone(); + #[allow(unused_mut)] + let mut name = Some(format!("PipelineTask-{}", thread_num)); + + thread_join_handles.push(Thread::named_spawn(name, move || unsafe { + // let this_clone = this.clone(); + + // let try_result = catch_unwind(move || -> Result<()> { + // match this_clone.execute_single_thread(thread_num) { + // Ok(_) => Ok(()), + // Err(cause) => Err(cause), + // } + // }); + + // // finish the pipeline executor when has error or panic + // if let Err(cause) = try_result.flatten() { + // // this.finish(Some(cause)); + // } + + Ok(()) + })); + } + thread_join_handles + } + + unsafe fn execute_single_thread(&self, thread_num: usize) -> Result<()> { + Ok(()) + } + + fn complete_pipeline(&self) {} + + fn finish_task(&self) -> Result<()> { + Ok(()) + } +} diff --git a/src/execution/executor_graph.rs b/src/execution/executor_graph.rs new file mode 100644 index 00000000..577e656f --- /dev/null +++ b/src/execution/executor_graph.rs @@ -0,0 +1,250 @@ +use std::{collections::HashMap, sync::Arc}; + +use petgraph::{ + stable_graph::{NodeIndex, StableGraph}, + visit::EdgeRef, + Direction, +}; + +use super::{ + executor::ExecutionQueue, + parallel::{ + meta_pipeline::MetaPipeline, + pipeline::Pipeline, + pipeline_event::{PipelineEvent, PipelineEventStack}, + }, + physical::PhysicalOperator, +}; +use anyhow::Result; + +pub struct ExecutorGraph { + graph: StableGraph, ()>, +} + +impl ExecutorGraph { + pub fn from_pipeline(mut meta_pipeline: MetaPipeline) -> Result { + let mut graph: StableGraph, ()> = StableGraph::new(); + Self::init_graph(&mut meta_pipeline, &mut 
graph)?; + Ok(ExecutorGraph { graph }) + } + + fn init_graph( + meta_pipeline: &mut MetaPipeline, + graph: &mut StableGraph, ()>, + ) -> Result<()> { + // Get the all pipelines. + let all_pipelines = meta_pipeline.get_pipelines(true)?; + + let to_schedue = meta_pipeline.get_meta_pipelines(true, true)?; + + // Value -> initialize_event_id, running_event_id, finish_event_id, + // complete_event_id. + let mut pipeline_graph_mapping: HashMap = HashMap::new(); + + for meta_pipe in to_schedue { + // let mut pipes_edges: Vec> = Vec::new(); + let base_pipeline = Arc::new(meta_pipe.get_base_pipeline()?); + let base_initialize_event = Arc::new(PipelineEvent::create_initialize_event( + base_pipeline.clone(), + )); + let base_running_event = + Arc::new(PipelineEvent::create_running_event(base_pipeline.clone())); + let base_finish_event = + Arc::new(PipelineEvent::create_finish_event(base_pipeline.clone())); + let base_complete_event = Arc::new(PipelineEvent::create_complete_event( + true, + base_pipeline.clone(), + )); + + let base_init_event_node_id = graph.add_node(base_initialize_event); + let base_running_event_node_id = graph.add_node(base_running_event); + let base_finish_event_node_id = graph.add_node(base_finish_event); + let base_complete_event_node_id = graph.add_node(base_complete_event); + + // Add base stack. + pipeline_graph_mapping.insert( + base_pipeline.get_pipeline_id(), + PipelineEventStack { + pipeline_initialize_event: base_init_event_node_id, + pipeline_event: base_running_event_node_id, + pipeline_finish_event: base_finish_event_node_id, + pipeline_complete_event: base_complete_event_node_id, + }, + ); + + // Dependencies: initialize -> running -> finish -> complete. 
+ graph.add_edge(base_init_event_node_id, base_running_event_node_id, ()); + graph.add_edge(base_running_event_node_id, base_finish_event_node_id, ()); + graph.add_edge(base_finish_event_node_id, base_complete_event_node_id, ()); + + let pipelines = meta_pipe.get_pipelines(false)?; + for idx in 1..pipelines.len() { + let pipeline = &pipelines[idx]; + + let pipeline_running_event = Arc::new(PipelineEvent::create_running_event( + Arc::from(pipeline.clone()), + )); + + let running_event_node_id = graph.add_node(pipeline_running_event); + + match meta_pipe.get_finish_group(pipeline.get_pipeline_id()) { + Some(finish_group) => { + let mapping_stack = pipeline_graph_mapping.get(finish_group).unwrap(); + let stack = PipelineEventStack { + pipeline_initialize_event: base_init_event_node_id, + pipeline_event: base_running_event_node_id, + pipeline_finish_event: mapping_stack.pipeline_finish_event, + pipeline_complete_event: base_complete_event_node_id, + }; + + // Dependencies: base_finish -> pipeline_event -> group_finish + graph.add_edge(base_running_event_node_id, running_event_node_id, ()); + graph.add_edge( + running_event_node_id, + mapping_stack.pipeline_finish_event, + (), + ); + + pipeline_graph_mapping.insert(pipeline.get_pipeline_id(), stack); + } + None => match meta_pipe.has_finish_event(pipeline.get_pipeline_id()) { + true => { + // Dependencies: base_finish -> pipeline_event -> + // pipeline_finish -> base_complete + let pipeline_finish_event = Arc::new( + PipelineEvent::create_finish_event(Arc::from(pipeline.clone())), + ); + + let finish_event_node_id = graph.add_node(pipeline_finish_event); + + graph.add_edge(base_finish_event_node_id, running_event_node_id, ()); + graph.add_edge(running_event_node_id, finish_event_node_id, ()); + + pipeline_graph_mapping.insert( + pipeline.get_pipeline_id(), + PipelineEventStack { + pipeline_initialize_event: base_init_event_node_id, + pipeline_event: running_event_node_id, + pipeline_finish_event: finish_event_node_id, + 
pipeline_complete_event: base_complete_event_node_id, + }, + ); + } + false => { + // Dependencies: base_initialize -> pipeline_event -> base_finish. + graph.add_edge(base_init_event_node_id, running_event_node_id, ()); + + pipeline_graph_mapping.insert( + pipeline.get_pipeline_id(), + PipelineEventStack { + pipeline_initialize_event: base_init_event_node_id, + pipeline_event: running_event_node_id, + pipeline_finish_event: base_finish_event_node_id, + pipeline_complete_event: base_complete_event_node_id, + }, + ); + } + }, + } + } + + // Set up the dependencies within this `MetaPipeline`. + for pipeline in pipelines.iter() { + if let Some(source) = pipeline.get_source() { + // if (source->type == + // PhysicalOperatorType::TABLE_SCAN) { // + // we have to reset the source here (in the main thread), + // because some of our clients (looking at you, R) + // // do not like it when threads other than the main thread + // call into R, for e.g., arrow scans + // pipeline->ResetSource(true); + // } + } + + match meta_pipe.find_dependencies(pipeline.get_pipeline_id())? { + Some(dep_pipes) => dep_pipes.iter().for_each(|dep_pipe| { + if let Some(dependency_stack) = pipeline_graph_mapping.get(dep_pipe) { + let stack = pipeline_graph_mapping + .get(&pipeline.get_pipeline_id()) + .unwrap(); + graph.add_edge( + dependency_stack.pipeline_event, + stack.pipeline_event, + (), + ); + } + }), + None => continue, + } + } + } + + // Set up the dependencies across `MetaPipeline`. + for pipe in all_pipelines.iter() { + for pipe_ix in pipe.get_dependencies().iter() { + let from = pipeline_graph_mapping + .get(pipe_ix) + .unwrap() + .pipeline_complete_event; + let to = pipeline_graph_mapping + .get(&pipe.get_pipeline_id()) + .unwrap() + .pipeline_event; + if !graph.contains_edge(from, to) { + graph.add_edge(from, to, ()); + } + } + } + Ok(()) + } + + pub async fn init_schedule_event_queue(&self, global_execution_queue: &mut ExecutionQueue) { + // Schedule source node. 
+ for node_idx in self.graph.externals(Direction::Outgoing) { + let node = self.get_node_by_index(node_idx); + // todo: fix deref pointer. + let pipeline_event = + unsafe { &mut *(node.as_ref() as *const PipelineEvent as *mut PipelineEvent) }; + pipeline_event.schedule(global_execution_queue).await; + } + } + + fn get_node_by_index(&self, index: NodeIndex) -> Arc { + self.graph[index].clone() + } + + fn get_prev_nodes(&self, index: NodeIndex) -> Vec> { + let mut prev_nodes = vec![]; + for edge in self.graph.edges_directed(index, Direction::Incoming) { + prev_nodes.push(self.graph[edge.source()].clone()); + } + prev_nodes + } + + fn get_next_nodes(&self, index: NodeIndex) -> Vec> { + let mut next_nodes = vec![]; + for edge in self.graph.edges_directed(index, Direction::Outgoing) { + next_nodes.push(self.graph[edge.target()].clone()); + } + next_nodes + } + + fn get_all_nodes(&self) -> Vec> { + self.graph.node_weights().cloned().collect() + } + + fn get_last_node(&self) -> Arc { + let mut last_node = None; + for node in self.graph.node_indices() { + if self + .graph + .edges_directed(node, petgraph::Direction::Outgoing) + .count() + == 0 + { + last_node = Some(self.graph[node].clone()); + } + } + last_node.unwrap() + } +} diff --git a/src/execution/executor_task.rs b/src/execution/executor_task.rs new file mode 100644 index 00000000..4a97abcf --- /dev/null +++ b/src/execution/executor_task.rs @@ -0,0 +1,97 @@ +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use anyhow::anyhow; +use anyhow::Result; +use futures::{future::BoxFuture, Future}; + +pub enum TaskExecutionMode { + Complete, + Partial, +} + +pub enum TaskExecutionResult { + Finished, + NotFinished, + Error, + Blocked, +} + +#[async_trait::async_trait] +pub trait Task { + /// The name of task. 
+ fn name(&self) -> String; + + /// Execute the task in the specified execution mode + /// * If mode is Complete, Execute should always finish processing and + /// return Finished + /// * If mode is Partial, Execute can return not_finished, in which case + /// Execute will be called again + /// * In case of an error, error is returned + /// * In case the task has interrupted, blocked is returned. + async fn execute(&mut self, mode: TaskExecutionMode) -> Result; +} + +pub struct ExecutionTask { + inner: BoxFuture<'static, Result<()>>, +} + +impl ExecutionTask { + // pub fn create(inner: Inner) -> ExecutionTask + // where + // Inner: Future> + Send + 'static, + // { + // ExecutionTask { + // inner: inner.boxed(), + // } + // } + + pub fn create(inner: Arc) -> ExecutionTask { + todo!() + } + + pub fn empty() -> ExecutionTask { + todo!() + } +} + +impl Future for ExecutionTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.inner.as_mut(); + let try_result = + std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || -> Poll> { + inner.poll(cx) + })); + + match try_result { + Ok(Poll::Pending) => Poll::Pending, + Ok(Poll::Ready(_res)) => { + // self.queue.completed_async_task( + // self.workers_condvar.clone(), + // CompletedAsyncTask::create(self.processor_id, self.worker_id, res), + // ); + Poll::Ready(()) + } + Err(cause) => { + let _res: Result<()> = match cause.downcast_ref::<&'static str>() { + Some(msg) => Err(anyhow!(msg.to_string())), + None => match cause.downcast_ref::() { + Some(msg) => Err(anyhow!(msg.to_string())), + None => Err(anyhow!("unknown panic message".to_string())), + }, + }; + + // self.queue.completed_async_task( + // self.workers_condvar.clone(), + // CompletedAsyncTask::create(self.processor_id, self.worker_id, res), + // ); + Poll::Ready(()) + } + } + } +} diff --git a/src/execution/mod.rs b/src/execution/mod.rs new file mode 100644 index 00000000..6b9ad375 --- /dev/null +++ 
b/src/execution/mod.rs @@ -0,0 +1,6 @@ +pub mod executor; +pub mod executor_graph; +pub mod executor_task; +pub mod parallel; +pub mod physical; +pub mod runtime; diff --git a/src/execution/parallel/meta_pipeline.rs b/src/execution/parallel/meta_pipeline.rs new file mode 100644 index 00000000..23f9953f --- /dev/null +++ b/src/execution/parallel/meta_pipeline.rs @@ -0,0 +1,146 @@ +use std::collections::HashMap; + +use crate::execution::executor::Executor; +use crate::execution::physical::{PhysicalOperator, PhysicalOperatorRef}; + +use super::pipeline::Pipeline; +use anyhow::Result; + +pub type PipelineIx = usize; + +/// MetaPipeline represents a set of pipelines that have the same sink. +#[derive(Clone)] +pub struct MetaPipeline { + /// The executor for all MetaPipeline in the query plan. + // pub executor: Executor, + + /// The sink of all pipelines within this MetaPipeline. + pub sink: Option, + + /// All pipelines with the different source, but the same sink. + pub pipelines: Vec, + + /// Dependencies within this MetaPipeline. + pub dependencies: HashMap>, + + /// Other MetaPipelines that this MetaPipeline depends on. + pub children: Vec, + + /// Pipelines (other than the base pipeline) that need their own + /// PipelineFinishEvent (e.g., for IEJoin) + pub finish_pipelines: Vec, + + /// Next batch index. + pub next_batch_index: u64, + + /// Mapping from pipeline(e.g., child or union) to finish pipeline. + pub finish_map: HashMap, +} + +pub const BATCH_INCREMENT: u64 = 10000000000000; + +impl MetaPipeline { + pub fn create(sink: Option) -> Self { + MetaPipeline { + // executor: todo!(), + sink, + pipelines: Vec::new(), + dependencies: HashMap::new(), + children: Vec::new(), + finish_pipelines: Vec::new(), + next_batch_index: 0, + finish_map: HashMap::new(), + } + } + + /// Returns the Executor for this MetaPipeline. + pub fn get_executor(&self) -> Result { + todo!() + } + + /// Returns the sink operator for this MetaPipeline. 
+ pub fn get_sink(&self) -> Result { + todo!() + } + + pub fn find_dependencies(&self, pipeline_ix: PipelineIx) -> Result>> { + Ok(self.dependencies.get(&pipeline_ix).cloned()) + } + + /// Returns the initial pipeline of this MetaPipeline. + pub fn get_base_pipeline(&self) -> Result { + Ok(self.pipelines[0].clone()) + } + + /// Returns the pipelines for this MetaPipeline. + pub fn get_pipelines(&self, recursive: bool) -> Result> { + let mut pipelines = vec![]; + pipelines.extend_from_slice(&self.pipelines); + + if recursive { + for child in &self.children { + pipelines.extend_from_slice(&child.get_pipelines(recursive)?); + } + } + Ok(pipelines) + } + + /// Returns the MetaPipeline children of this MetaPipeline. + pub fn get_meta_pipelines(&self, recursive: bool, skip: bool) -> Result> { + let mut meta_pipelines = vec![]; + if !skip { + meta_pipelines.push(self.clone()); + } + + if recursive { + for child in &self.children { + child.get_meta_pipelines(true, false)?; + } + } + Ok(meta_pipelines) + } + + /// Create an empty pipeline within this MetaPipeline. + pub fn create_pipeline(&mut self, pipeline_id: PipelineIx) -> Result { + let mut pipeline = Pipeline::new(pipeline_id); + pipeline.sink = self.sink.clone(); + pipeline.base_batch_index = BATCH_INCREMENT * self.next_batch_index; + self.next_batch_index += 1; + let ix = self.pipelines.len(); + self.pipelines.push(pipeline); + Ok(ix) + } + + /// Create a union pipeline. + pub fn create_union_pipeline(&self, pipeline_id: PipelineIx) -> Result { + todo!() + } + + /// Create a child pipeline operator `starting` at `operator`. + /// where 'last_pipeline' is the last pipeline added before building out + /// 'current'. + pub fn create_child_pipeline(&self, pipeline_id: PipelineIx) {} + + /// Create a MetaPipeline child that `current` deponds on. 
+ pub fn create_child_meta_pipeline( + &mut self, + pipeline_id: PipelineIx, + current: usize, + operator: PhysicalOperatorRef, + ) -> Result<&mut MetaPipeline> { + let mut child_meta_pipe = MetaPipeline::create(Some(operator)); + child_meta_pipe.create_pipeline(pipeline_id)?; + self.children.push(child_meta_pipe.clone()); + + self.pipelines[current].dependencies.push(pipeline_id); + Ok(self.children.last_mut().unwrap()) + } + + pub fn get_finish_group(&self, pipeline_ix: PipelineIx) -> Option<&PipelineIx> { + self.finish_map.get(&pipeline_ix) + } + + pub fn has_finish_event(&self, pipeline_ix: PipelineIx) -> bool { + self.finish_pipelines.contains(&pipeline_ix) + } +} diff --git a/src/execution/parallel/mod.rs b/src/execution/parallel/mod.rs new file mode 100644 index 00000000..7d166a0d --- /dev/null +++ b/src/execution/parallel/mod.rs @@ -0,0 +1,8 @@ +pub mod meta_pipeline; +pub mod pipeline; +pub mod pipeline_builder; +pub mod pipeline_complete_event; +pub mod pipeline_event; +pub mod pipeline_finish_event; +pub mod pipeline_initialize_event; +pub mod pipeline_running_event; diff --git a/src/execution/parallel/pipeline.rs b/src/execution/parallel/pipeline.rs new file mode 100644 index 00000000..8a4d7c0e --- /dev/null +++ b/src/execution/parallel/pipeline.rs @@ -0,0 +1,207 @@ +use std::sync::Arc; + +use super::{meta_pipeline::PipelineIx, pipeline_event::PipelineEvent}; +use crate::execution::{ + executor_task::{Task, TaskExecutionMode, TaskExecutionResult}, + physical::PhysicalOperatorRef, +}; +use anyhow::Result; +use parking_lot::Mutex; + +/// Pipeline represent a chain of physical operators that are executed in +/// sequence that include `source`, `operator` and `sink. +/// +/// To improve performance. The pipeline can be split into multiple +/// sub-pipelines that are executed in parallel. +/// +/// # Define a physical operator whether sink operator. 
+/// If any operator that need to digest the data of all child nodes before they +/// can proceed to the next step that called **Pipeline Breaker** +/// +/// # How to split into multiple sub-pipelines depends on whether physical operator is `Pipeline breaker`. +/// * If a physical operator is `Pipeline breaker` pull it out and use it as new +/// sub pipeline source, +/// * And use it as prev sub pipeline sink. +/// +/// For Example: +/// +/// SELECT * FROM t1; +/// Pipeline: TableScan (push to) Project. +/// +/// SELECT * FROM t1 GROUP BY a LIMIT 10; +/// Pipeline0: Table Scan (push to) Project (push to) GROUP BY a. +/// Pipeline1: (Depends on Pipeline0 GROUP BY a) Project (push to) TOP10. +/// +/// SELECT * FROM t1 ORDER BY a LIMIT 10; +/// Pipeline0: Table Scan (push to) Project (push to) ORDER BY a. +/// Pipeline1: (Depends on Pipeline0 ORDER BY a) Project (push to) TOP10. +/// +/// SELECT * FROM t1 UNION All SELECT * FROM t2 +/// Pipeline0: Table Scan t1(push to) Project +/// Pipeline1: Table Scan t2(push to) Project +/// Pipeline2: (Depends on Pipeline0,Pipeline1 Project) Project. +/// So that Pipeline0 and Pipeline1 not any dependencies so can concurrency +/// execute. +/// +/// # How to execution. +/// * Constructor physical operators to pipeline +/// * Pick no dependency pipeline to executed first(pipeline 0). +/// * If pipeline 0 is complete, pick another pipeline that only reply on it to +/// execute. +/// * When all operations in a pipeline support parallelization, the pipeline is +/// executed in parallel. +#[derive(Clone, Debug)] +pub struct Pipeline { + pub pipeline_id: PipelineIx, + + /// The source of this pipeline. + pub source: Option, + + /// THe chain of intermediate operators. + pub operators: Vec, + + /// The sink of this pipeline. + pub sink: Option, + + /// The parent pipelines. + pub parents: Vec, + + /// The dependencies of this pipeline. 
+ pub dependencies: Vec, + + pub base_batch_index: u64, +} + +impl Pipeline { + pub fn new(pipeline_id: PipelineIx) -> Pipeline { + Pipeline { + source: None, + operators: Vec::new(), + sink: None, + parents: Vec::new(), + dependencies: Vec::new(), + base_batch_index: 0, + pipeline_id, + } + } + + pub fn get_pipeline_id(&self) -> PipelineIx { + self.pipeline_id + } + + pub fn get_dependencies(&self) -> &[PipelineIx] { + &self.dependencies + } + + pub fn reset(&self) {} + + pub fn reset_sink(&self) {} + + pub fn finalize(&self) { + // if (executor.HasError()) { + // return; + // } + // D_ASSERT(ready); + // try { + // auto sink_state = sink->Finalize(*this, event, executor.context, + // *sink->sink_state); sink->sink_state->state = sink_state; + // } catch (Exception &ex) { // LCOV_EXCL_START + // executor.PushError(PreservedError(ex)); + // } catch (std::exception &ex) { + // executor.PushError(PreservedError(ex)); + // } catch (...) { + // executor.PushError(PreservedError("Unknown exception in Finalize!")); + // } // L + } + + pub fn get_source(&self) -> Option { + self.source.clone() + } + + pub fn can_parallel(&self) -> bool { + true + } + + pub fn schedule(&self, event: Mutex) -> Vec> { + let mut tasks: Vec> = vec![]; + + // Check if the sink, source and all intermediate operators support parallelism. 
+ let threads_num = if self.schedule_parallel() { 4 } else { 1 }; + + tasks.push(Arc::new(PipelineRunningTask { + pipeline: Arc::new(self.clone()), + event, + threads_num, + })); + tasks + } + + fn schedule_parallel(&self) -> bool { + if let Some(sink) = self.sink.as_ref() { + if !sink.parallel_sink() { + return false; + } + } + if let Some(source) = self.source.as_ref() { + if !source.parallel_source() { + return false; + } + } + for operator in self.operators.iter() { + if !operator.parallel_operator() { + return false; + } + } + + // auto &scheduler = TaskScheduler::GetScheduler(executor.context); + // idx_t active_threads = scheduler.NumberOfThreads(); + // if (max_threads > active_threads) { + // max_threads = active_threads; + // } + // if (max_threads <= 1) { + // // too small to parallelize + // return false; + // } + + true + } +} + +pub struct PipelineRunningTask { + pipeline: Arc, + event: Mutex, + threads_num: usize, +} + +#[async_trait::async_trait] +impl Task for PipelineRunningTask { + /// The name of task. + fn name(&self) -> String { + "Pipeline".to_string() + } + + async fn execute(&mut self, mode: TaskExecutionMode) -> Result { + // let executor = PipelineExecutor::create(self.clone())?; + + // match mode { + // TaskExecutionMode::Complete => match executor.execute()? { + // PipelineExecuteResult::Finished => Err(Error::Corrupted( + // "Execute without limit should not return + // NOT_FINISHED" + // .to_string(), + // )), + // PipelineExecuteResult::NotFinished => + // Ok(TaskExecutionResult::NotFinished), + // PipelineExecuteResult::Interrupted => + // Ok(TaskExecutionResult::Blocked), }, + // TaskExecutionMode::Partial => match + // executor.execute_parital(50_usize)? 
{ + // PipelineExecuteResult::Finished => Ok(TaskExecutionResult::Finished), + // PipelineExecuteResult::NotFinished => + // Ok(TaskExecutionResult::NotFinished), + // PipelineExecuteResult::Interrupted => + // Ok(TaskExecutionResult::Blocked), }, + // } + todo!() + } +} diff --git a/src/execution/parallel/pipeline_builder.rs b/src/execution/parallel/pipeline_builder.rs new file mode 100644 index 00000000..76c37152 --- /dev/null +++ b/src/execution/parallel/pipeline_builder.rs @@ -0,0 +1,191 @@ +use super::meta_pipeline::{MetaPipeline, PipelineIx}; +use crate::execution::physical::{PhysicalOperator, PhysicalOperatorRef}; +use anyhow::Result; + +pub struct PipelineBuilder { + current_pipeline: usize, + last_pipeline_id: PipelineIx, +} + +impl PipelineBuilder { + pub fn new() -> PipelineBuilder { + PipelineBuilder { + current_pipeline: 0, + last_pipeline_id: 0, + } + } + + pub fn finalize(&mut self, plan: PhysicalOperatorRef) -> Result { + let mut meta_pipeline = MetaPipeline::create(None); + let pipe_ix = meta_pipeline.create_pipeline(self.get_next_pipeline_id())?; + self.current_pipeline = pipe_ix; + self.build_meta_pipeline(&mut meta_pipeline, plan)?; + Ok(meta_pipeline) + } + + fn get_next_pipeline_id(&mut self) -> PipelineIx { + let ix = self.last_pipeline_id; + self.last_pipeline_id += 1; + ix + } + + fn build_meta_pipeline( + &mut self, + meta_pipeline: &mut MetaPipeline, + plan: PhysicalOperatorRef, + ) -> Result<()> { + match plan.as_ref() { + PhysicalOperator::Join => self.build_join_pipeline(meta_pipeline, plan)?, + _ => { + self.build_pipeline(meta_pipeline, plan)?; + } + } + Ok(()) + } + + fn build_pipeline( + &mut self, + meta_pipeline: &mut MetaPipeline, + plan: PhysicalOperatorRef, + ) -> Result<()> { + let current = self.current_pipeline; + if plan.is_sink() { + // Operator is a sink, build a pipeline. 
+ // meta_pipeline.pipelines[current].sink = None; + assert!(plan.children().len() == 1); + + meta_pipeline.pipelines[current].source = Some(plan.clone()); + + // Create a new pipeline starting from the child. + let child_meta_pipe = meta_pipeline.create_child_meta_pipeline( + self.get_next_pipeline_id(), + current, + plan.clone(), + )?; + + self.build_meta_pipeline(child_meta_pipe, plan.children()[0].clone())?; + } else { + // Operator is not a sink, recurse in children. + let children = plan.children(); + if children.is_empty() { + meta_pipeline.pipelines[current].source = Some(plan); + } else { + assert!(children.len() == 1); + + let children = plan.children(); + meta_pipeline.pipelines[current].operators.push(plan); + + self.build_pipeline(meta_pipeline, children[0].clone())?; + } + } + + Ok(()) + } + + // todo + fn build_join_pipeline( + &mut self, + meta_pipeline: &mut MetaPipeline, + plan: PhysicalOperatorRef, + ) -> Result<()> { + assert!(plan.children().len() == 2); + + // current is the probe pipeline: add this operator. + let current = self.current_pipeline; + meta_pipeline.pipelines[current] + .operators + .push(plan.clone()); + + let _pipelines_so_far = meta_pipeline.get_pipelines(false)?; + + // Build side(RHS). + let child_meta_pipeline = meta_pipeline.create_child_meta_pipeline( + self.get_next_pipeline_id(), + current, + plan.clone(), + )?; + self.build_meta_pipeline(child_meta_pipeline, plan.children()[1].clone())?; + + // Build probe side(LHS). 
+ self.build_pipeline(meta_pipeline, plan.children()[0].clone())?; + + // match plan.as_ref() { + // PhysicalOperator::Join => { + // todo!() + // } + // _ => return Ok(()), + // } + + Ok(()) + } +} + +#[cfg(test)] +mod pipeline_builder_tests { + use std::sync::Arc; + + use super::*; + use crate::{ + execution::physical::{ + physical_filter::PhysicalFilter, physical_limit::PhysicalLimit, + physical_projection::PhysicalProjection, physical_scan::PhysicalTableScan, + physical_sort::PhysicalSort, + }, + expression::ScalarExpression, + planner::operator::sort::SortField, + types::value::DataValue, + }; + + fn build_simple_physical_op_tree() -> PhysicalOperatorRef { + let scan = PhysicalTableScan { plan_id: 1 }; + let filter = PhysicalFilter { + plan_id: 2, + input: Arc::new(PhysicalOperator::TableScan(scan)), + predicates: ScalarExpression::Constant(DataValue::Null), + }; + + let sort = PhysicalSort { + plan_id: 3, + input: Arc::new(PhysicalOperator::Filter(filter)), + order_by: vec![SortField::new( + ScalarExpression::Constant(DataValue::Null), + true, + true, + )], + limit: None, + }; + + let sort_project = PhysicalProjection { + plan_id: 4, + input: Arc::new(PhysicalOperator::Sort(sort)), + }; + + let limit = PhysicalLimit { + plan_id: 5, + input: Arc::new(PhysicalOperator::Prjection(sort_project)), + limit: 1, + offset: 1, + }; + + let project = PhysicalProjection { + plan_id: 6, + input: Arc::new(PhysicalOperator::Limit(limit)), + }; + + Arc::new(PhysicalOperator::Prjection(project)) + } + + #[test] + fn test_build_pipeline() { + let mut builder = PipelineBuilder::new(); + + let plan = build_simple_physical_op_tree(); + + let meta_pipe = builder.finalize(plan).unwrap(); + + let pipelines = meta_pipe.get_pipelines(true).unwrap(); + for pipeline in pipelines { + println!("{:#?}", pipeline); + } + } +} diff --git a/src/execution/parallel/pipeline_complete_event.rs b/src/execution/parallel/pipeline_complete_event.rs new file mode 100644 index 00000000..d9e89461 --- 
/dev/null +++ b/src/execution/parallel/pipeline_complete_event.rs @@ -0,0 +1,14 @@ +use super::pipeline_event::Event; + +pub struct PipelineCompleteEvent { + // pub executor: Arc, + pub complete_pipeline: bool, +} + +impl Event for PipelineCompleteEvent { + fn finalize_finish(&self) { + if self.complete_pipeline { + // self.executor.complete_pipeline(); + } + } +} diff --git a/src/execution/parallel/pipeline_event.rs b/src/execution/parallel/pipeline_event.rs new file mode 100644 index 00000000..faa595ad --- /dev/null +++ b/src/execution/parallel/pipeline_event.rs @@ -0,0 +1,189 @@ +use std::sync::Arc; + +use parking_lot::Mutex; +use petgraph::stable_graph::NodeIndex; + +use super::{ + pipeline_complete_event::PipelineCompleteEvent, pipeline_finish_event::PipelineFinishEvent, + pipeline_initialize_event::PipelineInitializeEvent, + pipeline_running_event::PipelineRunningEvent, +}; +use crate::execution::{ + executor::ExecutionQueue, + executor_task::{ExecutionTask, Task}, + parallel::pipeline::Pipeline, +}; + +pub trait Event: Sync + Send + 'static { + fn schedule( + &self, + _pipeline: Arc, + _event: Mutex, + ) -> Vec> { + vec![] + } + + /// Called right after the event is finished. + fn finish_event(&self) {} + + /// Called after the event is entirely finished. + fn finalize_finish(&self) {} +} + +pub struct PipelineEventStack { + pub pipeline_initialize_event: NodeIndex, + pub pipeline_event: NodeIndex, + pub pipeline_finish_event: NodeIndex, + pub pipeline_complete_event: NodeIndex, +} + +#[derive(Clone)] +pub struct PipelineEvent { + pub pipeline: Arc, + /// The current threads working on the event. + pub finished_tasks: usize, + + /// The maximum amount of threads that can work on the event. + pub total_tasks: usize, + + /// The amount of completed dependencies + /// The event can only be started after the dependencies have finished + /// executing. + pub finished_dependencies: usize, + + /// The total amount of dependencies. 
+ pub total_dependencies: usize, + + /// The events that depend on this event to run + pub parents: Vec>, + + /// Whether or not the event is finished executing. + pub finished: bool, + + event: Arc, +} + +pub struct PipelineEventGuard(Mutex); + +impl PipelineEvent { + pub fn create_initialize_event( + // executor: Arc, + pipeline: Arc, + ) -> PipelineEvent { + PipelineEvent::create(Arc::new(PipelineInitializeEvent), pipeline) + } + + pub fn create_running_event(pipeline: Arc) -> PipelineEvent { + PipelineEvent::create(Arc::new(PipelineRunningEvent), pipeline) + } + + pub fn create_finish_event(pipeline: Arc) -> PipelineEvent { + PipelineEvent::create(Arc::new(PipelineFinishEvent(pipeline.clone())), pipeline) + } + + pub fn create_complete_event( + complete_pipeline: bool, + pipeline: Arc, + ) -> PipelineEvent { + PipelineEvent::create( + Arc::new(PipelineCompleteEvent { + // executor: executor.clone(), + complete_pipeline, + }), + pipeline, + ) + } + + fn create(event: Arc, pipeline: Arc) -> Self { + Self { + pipeline, + finished_tasks: 0, + total_tasks: 0, + finished_dependencies: 0, + total_dependencies: 0, + parents: vec![], + finished: false, + event, + } + } + + fn finish(&mut self) { + assert!(!self.finished); + + self.event.finish_event(); + self.finished = true; + + for entry in self.parents.iter_mut() { + entry.complete_dependency(); + } + self.event.finalize_finish(); + } + + pub fn complete_dependency(&mut self) { + self.finished_dependencies += 1; + let cur_finished = self.finished_dependencies; + assert!(cur_finished <= self.total_dependencies); + if cur_finished == self.total_dependencies { + if self.total_tasks == 0 { + self.event + .schedule(self.pipeline.clone(), Mutex::new(self.clone())); + self.finish() + } + } + } + + pub fn finish_task(&mut self) { + let cur_tasks = self.total_tasks; + self.finished_tasks += 1; + let cur_finished = self.finished_tasks; + assert!(cur_finished <= cur_tasks); + if cur_finished == cur_tasks { + self.finish(); + } + 
} + + fn has_dependencies(&self) -> bool { + self.total_dependencies != 0 + } + + fn is_finished(&self) -> bool { + self.finished + } + + fn add_dependency(&mut self, event: Box) { + self.total_dependencies += 1; + + self.parents.push(event); + } + + fn insert_event(&mut self, _replacement_event: Arc) { + // let mut parents = self.parents.replace(vec![]); + + // replacement_event.add_dependency(self); + // parents.push(Arc::downgrade(&replacement_event)); + + // self.executor.add_event(replacement_event); // Implement + // // Executor::add_event() as per your requirements + + // self.parents.replace(parents); + } + + pub async fn schedule(&mut self, global_execution_queue: &mut ExecutionQueue) { + let inner_tasks = self + .event + .schedule(self.pipeline.clone(), Mutex::new(self.clone())); + + self.total_tasks = inner_tasks.len(); + inner_tasks.iter().for_each(|inner| { + global_execution_queue.push_task(ExecutionTask::create(inner.clone())); + }); + } + + pub fn finish_event(&mut self) { + self.event.finish_event(); + } + + pub fn finalize_finish(&mut self) { + self.event.finalize_finish(); + } +} diff --git a/src/execution/parallel/pipeline_finish_event.rs b/src/execution/parallel/pipeline_finish_event.rs new file mode 100644 index 00000000..c8f60246 --- /dev/null +++ b/src/execution/parallel/pipeline_finish_event.rs @@ -0,0 +1,13 @@ +use std::sync::Arc; + + + +use super::{pipeline::Pipeline, pipeline_event::Event}; + +pub struct PipelineFinishEvent(pub Arc); + +impl Event for PipelineFinishEvent { + fn finish_event(&self) { + self.0.finalize(); + } +} diff --git a/src/execution/parallel/pipeline_initialize_event.rs b/src/execution/parallel/pipeline_initialize_event.rs new file mode 100644 index 00000000..c664c2db --- /dev/null +++ b/src/execution/parallel/pipeline_initialize_event.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; + +use parking_lot::Mutex; + +use crate::execution::executor_task::{Task, TaskExecutionMode, TaskExecutionResult}; + +use super::{ + 
pipeline::Pipeline, + pipeline_event::{Event, PipelineEvent}, +}; + +use anyhow::Result; + +pub struct PipelineInitializeEvent; + +impl Event for PipelineInitializeEvent { + fn schedule(&self, pipeline: Arc, event: Mutex) -> Vec> { + vec![Arc::new(PipelineInitializeTask { pipeline, event })] + } +} + +pub struct PipelineInitializeTask { + pipeline: Arc, + event: Mutex, +} + +#[async_trait::async_trait] +impl Task for PipelineInitializeTask { + /// The name of task. + fn name(&self) -> String { + "PipelineInitializeTask".to_string() + } + + async fn execute(&mut self, _mode: TaskExecutionMode) -> Result { + self.pipeline.reset_sink(); + let mut guard = self.event.lock(); + + guard.finish_task(); + + drop(guard); + + Ok(TaskExecutionResult::Finished) + } +} diff --git a/src/execution/parallel/pipeline_running_event.rs b/src/execution/parallel/pipeline_running_event.rs new file mode 100644 index 00000000..dde17761 --- /dev/null +++ b/src/execution/parallel/pipeline_running_event.rs @@ -0,0 +1,17 @@ +use std::sync::Arc; + +use parking_lot::Mutex; + +use super::{ + pipeline::Pipeline, + pipeline_event::{Event, PipelineEvent}, +}; +use crate::execution::executor_task::Task; + +pub struct PipelineRunningEvent; + +impl Event for PipelineRunningEvent { + fn schedule(&self, pipeline: Arc, event: Mutex) -> Vec> { + pipeline.schedule(event) + } +} diff --git a/src/execution/physical/mod.rs b/src/execution/physical/mod.rs new file mode 100644 index 00000000..64f12b46 --- /dev/null +++ b/src/execution/physical/mod.rs @@ -0,0 +1,63 @@ +pub mod physical_filter; +pub mod physical_limit; +pub mod physical_plan_builder; +pub mod physical_projection; +pub mod physical_result_collector; +pub mod physical_scan; +pub mod physical_sort; +pub mod physical_topn; + +use std::sync::Arc; + +use self::{ + physical_filter::PhysicalFilter, physical_limit::PhysicalLimit, + physical_projection::PhysicalProjection, physical_scan::PhysicalTableScan, + physical_sort::PhysicalSort, +}; + +pub type 
PhysicalOperatorRef = Arc; + +#[derive(Debug)] +pub enum PhysicalOperator { + TableScan(PhysicalTableScan), + Prjection(PhysicalProjection), + Sort(PhysicalSort), + Filter(PhysicalFilter), + Limit(PhysicalLimit), + Join, +} + +impl PhysicalOperator { + pub fn is_sink(&self) -> bool { + match self { + PhysicalOperator::TableScan(_) => false, + PhysicalOperator::Prjection(_) => false, + PhysicalOperator::Sort(_) => true, + PhysicalOperator::Filter(_) => false, + PhysicalOperator::Limit(_) => true, + PhysicalOperator::Join => true, + } + } + + pub fn parallel_operator(&self) -> bool { + todo!() + } + + pub fn parallel_source(&self) -> bool { + todo!() + } + + pub fn parallel_sink(&self) -> bool { + todo!() + } + pub fn children(&self) -> Vec { + match self { + PhysicalOperator::TableScan(_) => vec![], + PhysicalOperator::Prjection(project) => vec![project.input.clone()], + PhysicalOperator::Sort(sort) => vec![sort.input.clone()], + PhysicalOperator::Filter(filter) => vec![filter.input.clone()], + PhysicalOperator::Limit(limit) => vec![limit.input.clone()], + PhysicalOperator::Join => vec![], + } + } +} diff --git a/src/executor/physical/physical_filter.rs b/src/execution/physical/physical_filter.rs similarity index 61% rename from src/executor/physical/physical_filter.rs rename to src/execution/physical/physical_filter.rs index c6b12de4..51e2e37f 100644 --- a/src/executor/physical/physical_filter.rs +++ b/src/execution/physical/physical_filter.rs @@ -1,9 +1,10 @@ use crate::expression::ScalarExpression; -use super::PhysicalPlanBoxed; +use super::PhysicalOperatorRef; +#[derive(Debug)] pub struct PhysicalFilter { pub plan_id: u32, - pub input: PhysicalPlanBoxed, + pub input: PhysicalOperatorRef, pub predicates: ScalarExpression, } diff --git a/src/execution/physical/physical_hash_join.rs b/src/execution/physical/physical_hash_join.rs new file mode 100644 index 00000000..7edcd9ed --- /dev/null +++ b/src/execution/physical/physical_hash_join.rs @@ -0,0 +1 @@ +pub struct 
PhysicalHashJoin {} diff --git a/src/execution/physical/physical_join.rs b/src/execution/physical/physical_join.rs new file mode 100644 index 00000000..3aaa4285 --- /dev/null +++ b/src/execution/physical/physical_join.rs @@ -0,0 +1 @@ +pub struct PhysicalNLJoin {} diff --git a/src/executor/physical/physical_limit.rs b/src/execution/physical/physical_limit.rs similarity index 53% rename from src/executor/physical/physical_limit.rs rename to src/execution/physical/physical_limit.rs index ab5f0fcf..ff6b76d7 100644 --- a/src/executor/physical/physical_limit.rs +++ b/src/execution/physical/physical_limit.rs @@ -1,9 +1,10 @@ -use super::PhysicalPlanBoxed; +use super::PhysicalOperatorRef; +#[derive(Debug)] pub struct PhysicalLimit { pub plan_id: u32, - pub input: PhysicalPlanBoxed, + pub input: PhysicalOperatorRef, pub limit: usize, pub offset: usize, } diff --git a/src/execution/physical/physical_merge_join.rs b/src/execution/physical/physical_merge_join.rs new file mode 100644 index 00000000..8eb34313 --- /dev/null +++ b/src/execution/physical/physical_merge_join.rs @@ -0,0 +1 @@ +pub struct PhysicalMergeJoin {} diff --git a/src/execution/physical/physical_plan_builder.rs b/src/execution/physical/physical_plan_builder.rs new file mode 100644 index 00000000..255e7c3a --- /dev/null +++ b/src/execution/physical/physical_plan_builder.rs @@ -0,0 +1,92 @@ +use std::sync::Arc; + +use crate::planner::operator::scan::ScanOperator; +use crate::planner::{logical_select_plan::LogicalSelectPlan, operator::Operator, LogicalPlan}; + +use super::{ + physical_filter::PhysicalFilter, physical_limit::PhysicalLimit, + physical_projection::PhysicalProjection, physical_sort::PhysicalSort, PhysicalOperator, +}; + +use anyhow::anyhow; +use anyhow::Result; + +pub struct PhysicalPlanBuilder { + plan_id: u32, +} + +impl PhysicalPlanBuilder { + pub fn new() -> Self { + PhysicalPlanBuilder { plan_id: 0 } + } + + fn next_plan_id(&mut self) -> u32 { + let id = self.plan_id; + self.plan_id += 1; + id 
+ } + + pub fn build_plan(&mut self, plan: &LogicalPlan) -> Result { + match plan { + LogicalPlan::Select(select) => self.build_select_logical_plan(select), + LogicalPlan::CreateTable(_) => todo!(), + } + } + + fn build_select_logical_plan(&mut self, plan: &LogicalSelectPlan) -> Result { + match plan.operator.as_ref() { + Operator::Project(_) => { + let input = self.build_select_logical_plan(plan.child(0)?)?; + Ok(PhysicalOperator::Prjection(PhysicalProjection { + plan_id: self.next_plan_id(), + input: Arc::new(input), + })) + } + Operator::Scan(scan) => self.build_physical_scan(scan), + Operator::Sort(sort) => { + let input = self.build_select_logical_plan(plan.child(0)?)?; + Ok(PhysicalOperator::Sort(PhysicalSort { + plan_id: self.next_plan_id(), + input: Arc::new(input), + order_by: sort.sort_fields.clone(), + limit: sort.limit, + })) + } + Operator::Limit(limit) => { + let input = self.build_select_logical_plan(plan.child(0)?)?; + + Ok(PhysicalOperator::Limit(PhysicalLimit { + plan_id: self.next_plan_id(), + input: Arc::new(input), + limit: limit.count, + offset: limit.offset, + })) + } + Operator::Filter(filter) => { + let input = self.build_select_logical_plan(plan.child(0)?)?; + Ok(PhysicalOperator::Filter(PhysicalFilter { + plan_id: self.next_plan_id(), + input: Arc::new(input), + predicates: filter.predicate.clone(), + })) + } + _ => Err(anyhow!(format!( + "Unsupported physical plan: {:?}", + plan.operator + ))), + } + } + + fn build_physical_scan(&mut self, _scan: &ScanOperator) -> Result { + // let ScanOperator { + // table_ref_id, + // columns, + // sort_fields, + // predicates, + // limit, + // } = scan; + + // Get table impl use `table_ref_id`. 
+ todo!() + } +} diff --git a/src/execution/physical/physical_projection.rs b/src/execution/physical/physical_projection.rs new file mode 100644 index 00000000..95b1e24d --- /dev/null +++ b/src/execution/physical/physical_projection.rs @@ -0,0 +1,7 @@ +use super::PhysicalOperatorRef; +#[derive(Debug)] +pub struct PhysicalProjection { + pub plan_id: u32, + + pub input: PhysicalOperatorRef, +} diff --git a/src/execution/physical/physical_result_collector.rs b/src/execution/physical/physical_result_collector.rs new file mode 100644 index 00000000..aa99bb3e --- /dev/null +++ b/src/execution/physical/physical_result_collector.rs @@ -0,0 +1,4 @@ +pub struct PipelineResultCollector { + plan_id: u32, + +} diff --git a/src/execution/physical/physical_scan.rs b/src/execution/physical/physical_scan.rs new file mode 100644 index 00000000..3176578e --- /dev/null +++ b/src/execution/physical/physical_scan.rs @@ -0,0 +1,8 @@ +#[derive(Debug)] +pub struct PhysicalTableScan { + pub plan_id: u32, + // pub context: Arc, + // pub operator: ScanOperator, +} + +impl PhysicalTableScan {} diff --git a/src/executor/physical/physical_sort.rs b/src/execution/physical/physical_sort.rs similarity index 57% rename from src/executor/physical/physical_sort.rs rename to src/execution/physical/physical_sort.rs index 8189e500..2838fa5d 100644 --- a/src/executor/physical/physical_sort.rs +++ b/src/execution/physical/physical_sort.rs @@ -1,11 +1,11 @@ use crate::planner::operator::sort::SortField; -use super::PhysicalPlanBoxed; -// use crate::planner::operator::logical_sort::SortField; +use super::PhysicalOperatorRef; +#[derive(Debug)] pub struct PhysicalSort { pub plan_id: u32, - pub input: PhysicalPlanBoxed, + pub input: PhysicalOperatorRef, pub order_by: Vec, pub limit: Option, } diff --git a/src/execution/physical/physical_topn.rs b/src/execution/physical/physical_topn.rs new file mode 100644 index 00000000..1b2b1ae1 --- /dev/null +++ b/src/execution/physical/physical_topn.rs @@ -0,0 +1,3 @@ +pub 
struct PhysicalTopN { + +} diff --git a/src/execution/runtime/catch_unwind.rs b/src/execution/runtime/catch_unwind.rs new file mode 100644 index 00000000..aae0cbd9 --- /dev/null +++ b/src/execution/runtime/catch_unwind.rs @@ -0,0 +1,44 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{future::BoxFuture, FutureExt}; + +use anyhow::anyhow; +use anyhow::Result; + +pub fn catch_unwind R>(f: F) -> Result { + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) { + Ok(res) => Ok(res), + Err(cause) => match cause.downcast_ref::<&'static str>() { + Some(err) => Err(anyhow!(err.to_string())), + None => Err(anyhow!("unknown panic message.".to_string())), + }, + } +} + +pub struct CatchUnwindFuture { + inner: BoxFuture<'static, F::Output>, +} + +impl CatchUnwindFuture { + pub fn create(f: F) -> CatchUnwindFuture { + CatchUnwindFuture { inner: f.boxed() } + } +} + +impl Future for CatchUnwindFuture { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = &mut self.inner; + + match catch_unwind(move || inner.poll_unpin(cx)) { + Ok(Poll::Pending) => Poll::Pending, + Ok(Poll::Ready(value)) => Poll::Ready(Ok(value)), + Err(cause) => Poll::Ready(Err(cause)), + } + } +} diff --git a/src/execution/runtime/mod.rs b/src/execution/runtime/mod.rs new file mode 100644 index 00000000..cf2a72d8 --- /dev/null +++ b/src/execution/runtime/mod.rs @@ -0,0 +1,252 @@ +pub mod catch_unwind; +pub mod thread; +pub mod thread_pool; + +use anyhow::anyhow; +use anyhow::Result; +use std::{ + future::Future, + sync::Arc, + time::{Duration, Instant}, +}; + +use tokio::{ + runtime::Handle, + sync::{oneshot, OwnedSemaphorePermit, Semaphore}, + task::JoinHandle, +}; + +use self::catch_unwind::CatchUnwindFuture; + +pub trait TrySpawn { + #[track_caller] + fn try_spawn(&self, task: T) -> Result> + where + T: Future + Send + 'static, + T::Output: Send + 'static; + + #[track_caller] + fn spawn(&self, task: T) 
-> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + self.try_spawn(task).unwrap() + } +} + +impl TrySpawn for Arc { + #[track_caller] + fn try_spawn(&self, task: T) -> Result> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + self.as_ref().try_spawn(task) + } + + #[track_caller] + fn spawn(&self, task: T) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + self.as_ref().spawn(task) + } +} + +/// Simple tokio runtime wrapper. +pub struct ExecutionRuntime { + handle: Handle, + + /// Use to receive a drop signal when dropper is dropped. + _dropper: Dropper, +} + +impl ExecutionRuntime { + fn builder() -> tokio::runtime::Builder { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.enable_all().thread_stack_size(20 * 1024 * 1024); + + builder + } + + pub fn with_default_worker_threads() -> Result { + let rt = Self::builder() + .build() + .map_err(|e| anyhow!(e.to_string()))?; + let (send_stop, recv_stop) = oneshot::channel(); + + let handle = rt.handle().clone(); + let join_handler = std::thread::spawn(move || { + let _ = rt.block_on(recv_stop); + let instant = Instant::now(); + + rt.shutdown_timeout(Duration::from_secs(3)); + + instant.elapsed() >= Duration::from_secs(3) + }); + + Ok(ExecutionRuntime { + handle, + _dropper: Dropper { + close: Some(send_stop), + name: Some("UnnamedRuntime".to_owned()), + join_handler: Some(join_handler), + }, + }) + } + + pub fn with_work_threads(num_thread: usize, thread_name: Option) -> Result { + let mut rt_builder = Self::builder(); + rt_builder.worker_threads(num_thread); + + if let Some(thread_name) = thread_name.as_ref() { + rt_builder.thread_name(thread_name); + } + + let rt = rt_builder.build().map_err(|e| anyhow!(e.to_string()))?; + let (send_stop, recv_stop) = oneshot::channel(); + + let handle = rt.handle().clone(); + let join_handler = std::thread::spawn(move || { + let _ = rt.block_on(recv_stop); + let 
instant = Instant::now(); + + rt.shutdown_timeout(Duration::from_secs(3)); + + instant.elapsed() >= Duration::from_secs(3) + }); + + Ok(ExecutionRuntime { + handle, + _dropper: Dropper { + close: Some(send_stop), + name: Some(format!( + "{}Runtime", + thread_name.unwrap_or("Unnamed".to_owned()) + )), + join_handler: Some(join_handler), + }, + }) + } + + pub fn inner(&self) -> tokio::runtime::Handle { + self.handle.clone() + } + + pub fn block_on(&self, future: F) -> F::Output + where + F: Future> + Send + 'static, + { + let future = CatchUnwindFuture::create(future); + self.handle.block_on(future).flatten() + } + + pub async fn try_spawn_batch( + &self, + semaphore: Semaphore, + futures: impl IntoIterator, + ) -> Result>> + where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + let semaphore = Arc::new(semaphore); + let iter = futures.into_iter().map(|v| { + |permit| async { + let r = v.await; + drop(permit); + r + } + }); + self.try_spawn_batch_with_owned_semaphore(semaphore, iter) + .await + } + + pub async fn try_spawn_batch_with_owned_semaphore( + &self, + semaphore: Arc, + futures: impl IntoIterator, + ) -> Result>> + where + F: FnOnce(OwnedSemaphorePermit) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + let iter = futures.into_iter(); + let mut handlers = + Vec::with_capacity(iter.size_hint().1.unwrap_or_else(|| iter.size_hint().0)); + + for fut in iter { + let semaphore = semaphore.clone(); + + let permit = semaphore + .acquire_owned() + .await + .map_err(|e| anyhow!(format!("semaphore closed, acquire permit failure. 
{}", e)))?; + + let handler = self + .handle + .spawn(async_backtrace::location!().frame(async move { fut(permit).await })); + handlers.push(handler) + } + Ok(handlers) + } +} + +impl TrySpawn for ExecutionRuntime { + fn try_spawn(&self, task: T) -> Result> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + Ok(self.handle.spawn(async_backtrace::location!().frame(task))) + } +} + +struct Dropper { + name: Option, + close: Option>, + join_handler: Option>, +} + +impl Drop for Dropper { + fn drop(&mut self) { + if let Some(name) = self.name.take() { + if let Some(close) = self.close.take() { + let _ = close.send(()); + } + + if let Some(join_handler) = self.join_handler.take() { + if join_handler.join().unwrap_or(false) { + tracing::warn!("{} shutdown timeout", name); + } else { + tracing::info!("{} shutdown success", name); + } + } + } + } +} + +pub async fn execute_futures_in_parallel( + futures: impl IntoIterator, + thread_nums: usize, + permit_nums: usize, + thread_name: String, +) -> Result> +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + let semaphore = Semaphore::new(permit_nums); + let runtime = Arc::new(ExecutionRuntime::with_work_threads( + thread_nums, + Some(thread_name), + )?); + + let join_handlers = runtime.try_spawn_batch(semaphore, futures).await?; + futures::future::try_join_all(join_handlers) + .await + .map_err(|err| anyhow!(format!("try join all futures failure, {}", err))) +} diff --git a/src/execution/runtime/thread.rs b/src/execution/runtime/thread.rs new file mode 100644 index 00000000..d0bbb186 --- /dev/null +++ b/src/execution/runtime/thread.rs @@ -0,0 +1,53 @@ +use anyhow::anyhow; +use anyhow::Result; +use std::thread::{Builder, JoinHandle}; +pub struct Thread; + +pub struct ThreadJoinHandle { + inner: JoinHandle, +} + +impl ThreadJoinHandle { + pub fn create(inner: JoinHandle) -> Self { + ThreadJoinHandle { inner } + } + + pub fn join(self) -> Result { + match self.inner.join() { + Ok(res) 
=> Ok(res), + Err(cause) => match cause.downcast_ref::<&'static str>() { + Some(msg) => Err(anyhow!(msg.to_string())), + None => match cause.downcast_ref::() { + Some(msg) => Err(anyhow!(msg.to_string())), + None => Err(anyhow!("unknown panic message".to_string())), + }, + }, + } + } +} + +impl Thread { + pub fn named_spawn(mut name: Option, f: F) -> ThreadJoinHandle + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, + { + let mut thread_builder = Builder::new(); + + if let Some(named) = name.take() { + thread_builder = thread_builder.name(named); + } + + ThreadJoinHandle::create(thread_builder.spawn(f).unwrap()) + } + + pub fn spawn(f: F) -> ThreadJoinHandle + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, + { + Self::named_spawn(None, f) + } +} diff --git a/src/execution/runtime/thread_pool.rs b/src/execution/runtime/thread_pool.rs new file mode 100644 index 00000000..fc1f6f92 --- /dev/null +++ b/src/execution/runtime/thread_pool.rs @@ -0,0 +1,51 @@ +use async_channel::{Receiver, Sender}; + +use super::thread::Thread; +use anyhow::Result; + +pub struct ThreadPool { + tx: Sender>, +} + +pub struct TaskJoinHandler { + rx: Receiver, +} + +impl TaskJoinHandler { + pub fn create(rx: Receiver) -> TaskJoinHandler { + TaskJoinHandler:: { rx } + } + + pub fn join(&self) -> T { + self.rx.recv_blocking().unwrap() + } +} + +impl ThreadPool { + pub fn create(threads: usize) -> Result { + let (tx, rx) = async_channel::unbounded::>(); + + for _ in 0..threads { + let thread_rx = rx.clone(); + Thread::spawn(move || { + while let Ok(task) = thread_rx.recv_blocking() { + task(); + } + }); + } + + Ok(ThreadPool { tx }) + } + + pub fn execute(&self, f: F) -> TaskJoinHandler + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = async_channel::bounded(1); + let _ = self.tx.send_blocking(Box::new(move || { + let _ = tx.send_blocking(f()); + })); + TaskJoinHandler::create(rx) + } +} diff --git 
a/src/executor/execute_builder.rs b/src/executor/execute_builder.rs deleted file mode 100644 index 8b137891..00000000 --- a/src/executor/execute_builder.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/executor/execute_runtime.rs b/src/executor/execute_runtime.rs deleted file mode 100644 index 48501ae0..00000000 --- a/src/executor/execute_runtime.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::{ - thread, - time::{Duration, Instant}, -}; - -use anyhow::Result; -use tokio::{runtime::Handle, sync::oneshot}; - -/// Simple tokio runtime wrapper. -pub struct ExecuteRuntime { - handle: Handle, - _dropper: Dropper, -} - -impl ExecuteRuntime { - fn builder() -> tokio::runtime::Builder { - let mut builder = tokio::runtime::Builder::new_multi_thread(); - builder.enable_all().thread_stack_size(20 * 1024 * 1024); - - builder - } - - pub fn with_default_worker_threads() -> Result { - let rt = Self::builder() - .build() - .map_err(|e| anyhow::Error::msg(e.to_string()))?; - let (send_stop, recv_stop) = oneshot::channel(); - - let handle = rt.handle().clone(); - let join_handler = thread::spawn(move || { - let _ = rt.block_on(recv_stop); - let instant = Instant::now(); - - rt.shutdown_timeout(Duration::from_secs(3)); - - instant.elapsed() >= Duration::from_secs(3) - }); - - Ok(ExecuteRuntime { - handle, - _dropper: Dropper { - close: Some(send_stop), - name: Some("UnnamedRuntime".to_owned()), - join_handler: Some(join_handler), - }, - }) - } - - pub fn with_work_threads(num_thread: usize, thread_name: Option) -> Result { - let mut rt_builder = Self::builder(); - rt_builder.worker_threads(num_thread); - - if let Some(thread_name) = thread_name.as_ref() { - rt_builder.thread_name(thread_name); - } - - let rt = rt_builder - .build() - .map_err(|e| anyhow::Error::msg(e.to_string()))?; - let (send_stop, recv_stop) = oneshot::channel(); - - let handle = rt.handle().clone(); - let join_handler = thread::spawn(move || { - let _ = rt.block_on(recv_stop); - let instant = Instant::now(); - - 
rt.shutdown_timeout(Duration::from_secs(3)); - - instant.elapsed() >= Duration::from_secs(3) - }); - - Ok(ExecuteRuntime { - handle, - _dropper: Dropper { - close: Some(send_stop), - name: Some(format!( - "{}Runtime", - thread_name.unwrap_or("Unnamed".to_owned()) - )), - join_handler: Some(join_handler), - }, - }) - } -} - -struct Dropper { - name: Option, - close: Option>, - join_handler: Option>, -} - -impl Drop for Dropper { - fn drop(&mut self) { - if let Some(name) = self.name.take() { - if let Some(close) = self.close.take() { - let _ = close.send(()); - } - - if let Some(join_handler) = self.join_handler.take() { - if join_handler.join().unwrap_or(false) { - log::warn!("{} shutdown timeout", name); - } else { - log::info!("{} shutdown success", name); - } - } - } - } -} diff --git a/src/executor/execute_task.rs b/src/executor/execute_task.rs deleted file mode 100644 index 8b137891..00000000 --- a/src/executor/execute_task.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/executor/mod.rs b/src/executor/mod.rs deleted file mode 100644 index 0e78099e..00000000 --- a/src/executor/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod execute_builder; -pub mod execute_runtime; -pub mod execute_task; -pub mod physical; -pub mod pipeline; diff --git a/src/executor/physical/mod.rs b/src/executor/physical/mod.rs deleted file mode 100644 index d3f7d3dc..00000000 --- a/src/executor/physical/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -pub mod physical_filter; -pub mod physical_limit; -pub mod physical_plan_builder; -pub mod physical_project; -pub mod physical_scan; -pub mod physical_sort; - -use serde::{Deserialize, Serialize}; - -use self::{ - physical_filter::PhysicalFilter, physical_limit::PhysicalLimit, - physical_project::PhysicalProject, physical_scan::PhysicalTableScan, - physical_sort::PhysicalSort, -}; - -pub type PhysicalPlanBoxed = Box; - -pub enum PhysicalPlan { - TableScan(PhysicalTableScan), - Prjection(PhysicalProject), - Sort(PhysicalSort), - Filter(PhysicalFilter), - 
Limit(PhysicalLimit), -} diff --git a/src/executor/physical/physical_plan_builder.rs b/src/executor/physical/physical_plan_builder.rs deleted file mode 100644 index 480d35ca..00000000 --- a/src/executor/physical/physical_plan_builder.rs +++ /dev/null @@ -1,82 +0,0 @@ -use crate::{ - executor::physical::physical_project::PhysicalProject, - planner::{logical_select_plan::LogicalSelectPlan, operator::Operator, LogicalPlan}, -}; - -use super::{ - physical_filter::PhysicalFilter, physical_limit::PhysicalLimit, - physical_scan::PhysicalTableScan, physical_sort::PhysicalSort, PhysicalPlan, -}; -use anyhow::Result; - -pub struct PhysicalPlanBuilder { - plan_id: u32, -} - -impl PhysicalPlanBuilder { - pub fn new() -> Self { - PhysicalPlanBuilder { plan_id: 0 } - } - - fn next_plan_id(&mut self) -> u32 { - let id = self.plan_id; - self.plan_id += 1; - id - } - - // todo: all physical plan need add statistic. - // todo: - pub fn build_plan(&mut self, plan: &LogicalPlan) -> Result { - match plan { - LogicalPlan::Select(select) => self.build_select(select), - LogicalPlan::CreateTable(_) => todo!(), - } - } - - fn build_select(&mut self, select_plan: &LogicalSelectPlan) -> Result { - match select_plan.operator.as_ref() { - Operator::Project(_) => { - let input = self.build_select(select_plan.child(0)?)?; - Ok(PhysicalPlan::Prjection(PhysicalProject { - plan_id: self.next_plan_id(), - input: Box::new(input), - })) - } - Operator::Scan(scan) => Ok(PhysicalPlan::TableScan(PhysicalTableScan { - plan_id: self.next_plan_id(), - operator: scan.clone(), - })), - Operator::Sort(sort) => { - let input = self.build_select(select_plan.child(0)?)?; - Ok(PhysicalPlan::Sort(PhysicalSort { - plan_id: self.next_plan_id(), - input: Box::new(input), - order_by: sort.sort_fields.clone(), - limit: sort.limit, - })) - } - Operator::Limit(limit) => { - let input = self.build_select(select_plan.child(0)?)?; - - Ok(PhysicalPlan::Limit(PhysicalLimit { - plan_id: self.next_plan_id(), - input: 
Box::new(input), - limit: limit.count, - offset: limit.offset, - })) - } - Operator::Filter(filter) => { - let input = self.build_select(select_plan.child(0)?)?; - Ok(PhysicalPlan::Filter(PhysicalFilter { - plan_id: self.next_plan_id(), - input: Box::new(input), - predicates: filter.predicate.clone(), - })) - } - _ => Err(anyhow::Error::msg(format!( - "Unsupported physical plan: {:?}", - select_plan.operator - ))), - } - } -} diff --git a/src/executor/physical/physical_project.rs b/src/executor/physical/physical_project.rs deleted file mode 100644 index 85d6624a..00000000 --- a/src/executor/physical/physical_project.rs +++ /dev/null @@ -1,8 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use super::PhysicalPlanBoxed; -pub struct PhysicalProject { - pub plan_id: u32, - - pub input: PhysicalPlanBoxed, -} diff --git a/src/executor/physical/physical_scan.rs b/src/executor/physical/physical_scan.rs deleted file mode 100644 index 41d6bb37..00000000 --- a/src/executor/physical/physical_scan.rs +++ /dev/null @@ -1,9 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use crate::planner::operator::scan::ScanOperator; - -pub struct PhysicalTableScan { - pub plan_id: u32, - pub operator: ScanOperator, -} diff --git a/src/executor/pipeline/mod.rs b/src/executor/pipeline/mod.rs deleted file mode 100644 index 8ede1704..00000000 --- a/src/executor/pipeline/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod pipeline_builer; diff --git a/src/executor/pipeline/pipeline_builer.rs b/src/executor/pipeline/pipeline_builer.rs deleted file mode 100644 index 8b137891..00000000 --- a/src/executor/pipeline/pipeline_builer.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/lib.rs b/src/lib.rs index 7f821fe4..a4900f0b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,10 +87,11 @@ // clippy::panic, // allow debug_assert, panic in production code // clippy::multiple_crate_versions, // caused by the dependency, can't be fixed // )] - +#![feature(result_flattening)] 
+#![allow(cast_ref_to_mut)] pub mod binder; pub mod catalog; -pub mod executor; +pub mod execution; pub mod expression; pub mod parser; pub mod planner; diff --git a/src/main.rs b/src/main.rs index 84a7edd2..a72d2702 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,29 +4,31 @@ use kip_sql::planner::{display::display_plan, logical_plan_builder::PlanBuilder} #[tokio::main] async fn main() -> Result<(), Box> { - println!(":) Welcome to the KIPSQL, Please input sql."); - - let mut input = String::new(); - io::stdin().read_line(&mut input)?; - - let output = handle_query(input); - - println!("KIPSQL Output: {output}"); + let builder = PlanBuilder::new(); - Ok(()) + loop { + println!(":) Welcome to the KIPSQL, Please input sql."); + let mut input = String::new(); + io::stdin().read_line(&mut input)?; + let output = handle_query(input, &builder); + println!("KIPSQL Output: {output}"); + } } -fn handle_query(sql: String) -> String { - let builder = PlanBuilder::new(); - let plan = builder.build_sql(&sql).unwrap(); +fn handle_query(sql: String, builder: &PlanBuilder) -> String { + let plan = builder.build_sql(&sql); + if plan.is_err() { + return plan.err().unwrap().to_string() + " ERROR"; + } + return "OK".to_string(); - let logical_graph = display_plan(&plan); - tracing::info!("logical plan {}", logical_graph); + //let logical_graph = display_plan(&plan); + //tracing::info!("logical plan {}", logical_graph); // todo optimize. // let mut optimize = Optimizer::new(); // let _optmized_plan = optimize.optimize(&plan).unwrap(); // todo executor. 
- logical_graph + //let logical_graph = display_plan(&plan); } diff --git a/src/planner/logical_create_table_plan.rs b/src/planner/logical_create_table_plan.rs index c13185dc..6199fa7c 100644 --- a/src/planner/logical_create_table_plan.rs +++ b/src/planner/logical_create_table_plan.rs @@ -1,8 +1,8 @@ +use crate::catalog::ColumnDesc; +#[derive(Debug, PartialEq, Clone)] pub struct LogicalCreateTablePlan { - // pub database_id: DatabaseId, - // pub schema_id: SchemaId, - // pub table_name: String, - // pub columns: Vec<(String, ColumnDesc)>, + pub table_name: String, + pub columns: Vec<(String, ColumnDesc)>, } // use sqlparser::ast::{ColumnDef, ColumnOption, Statement}; diff --git a/src/planner/logical_plan_builder.rs b/src/planner/logical_plan_builder.rs index 5e3fcd7f..d38ec889 100644 --- a/src/planner/logical_plan_builder.rs +++ b/src/planner/logical_plan_builder.rs @@ -1,15 +1,25 @@ -use std::sync::Arc; - +use crate::binder::{Binder, BinderContext}; +use crate::catalog::Root; use crate::parser; use anyhow::Result; +use serde::de::Unexpected::Option; +use sqlparser::ast::Statement; +use std::sync::Arc; + +use crate::planner::logical_select_plan::LogicalSelectPlan; use super::LogicalPlan; -pub struct PlanBuilder {} +#[derive(Clone)] +pub struct PlanBuilder { + context: BinderContext, +} impl PlanBuilder { pub fn new() -> Self { - PlanBuilder {} + PlanBuilder { + context: BinderContext::new(Arc::new(Root::new())), + } } /// Build a logical plan. @@ -22,16 +32,15 @@ impl PlanBuilder { pub fn build_sql<'a>(&self, sql: &'a str) -> Result { let stmts = parser::parse_sql(sql)?; + println!("stmt:{:#}", stmts[0]); + // TODO: add plan-cache for fast return. 
- // let mut binder = Binder::new(self.context.clone()); + let mut binder = Binder::new(self.context.clone()); - // let mut plan = match &stmts[0] { - // Statement::Query(query) => binder.bind_query(query)?, - // _ => unreachable!(), - // }; + let logical_plan = binder.bind(&stmts[0])?; - // tracing::info!("optimize before {:?}", plan); + println!("logical_plan:{:?}", logical_plan); // let mut optimizer = Optimizer::new(self.context.clone()); @@ -39,8 +48,8 @@ impl PlanBuilder { // tracing::info!("optimize after {:?}", plan); - // Ok(plan) + Ok(logical_plan) - todo!() + //todo!() } } diff --git a/src/planner/logical_select_plan.rs b/src/planner/logical_select_plan.rs index 182e7c8a..79484686 100644 --- a/src/planner/logical_select_plan.rs +++ b/src/planner/logical_select_plan.rs @@ -4,10 +4,9 @@ use super::operator::OperatorRef; use anyhow::Result; /// LogicalSelectPlan is a tree of operators that represent a logical query plan. - +#[derive(Debug, PartialEq, Clone)] pub struct LogicalSelectPlan { pub operator: OperatorRef, - pub children: Vec>, } diff --git a/src/planner/mod.rs b/src/planner/mod.rs index 95dd0e8e..17c9420d 100644 --- a/src/planner/mod.rs +++ b/src/planner/mod.rs @@ -8,6 +8,7 @@ use self::{ logical_create_table_plan::LogicalCreateTablePlan, logical_select_plan::LogicalSelectPlan, }; +#[derive(Debug, PartialEq, Clone)] pub enum LogicalPlan { Select(LogicalSelectPlan), CreateTable(LogicalCreateTablePlan), diff --git a/src/planner/operator/aggregate.rs b/src/planner/operator/aggregate.rs index 44e6e2fc..d5417029 100644 --- a/src/planner/operator/aggregate.rs +++ b/src/planner/operator/aggregate.rs @@ -5,7 +5,7 @@ use crate::{ planner::{logical_select_plan::LogicalSelectPlan, operator::Operator}, }; -#[derive(Debug)] +#[derive(Debug, PartialEq, Clone)] pub struct AggregateOperator { pub groupby_exprs: Vec, diff --git a/src/planner/operator/filter.rs b/src/planner/operator/filter.rs index b5fae94b..8daf1005 100644 --- a/src/planner/operator/filter.rs +++ 
b/src/planner/operator/filter.rs @@ -4,7 +4,7 @@ use crate::{expression::ScalarExpression, planner::logical_select_plan::LogicalS use super::Operator; -#[derive(Debug)] +#[derive(Debug, PartialEq, Clone)] pub struct FilterOperator { pub predicate: ScalarExpression, having: bool, diff --git a/src/planner/operator/join.rs b/src/planner/operator/join.rs index e0439d66..797f9b58 100644 --- a/src/planner/operator/join.rs +++ b/src/planner/operator/join.rs @@ -1,13 +1,10 @@ use std::sync::Arc; -use crate::{ - expression::ScalarExpression, - planner::{logical_select_plan::LogicalSelectPlan, LogicalPlan}, -}; +use crate::{expression::ScalarExpression, planner::logical_select_plan::LogicalSelectPlan}; use super::Operator; -#[derive(Debug)] +#[derive(Debug, PartialEq, Clone)] pub enum JoinType { Inner, LeftOuter, @@ -20,7 +17,7 @@ pub enum JoinType { RightAnti, } -#[derive(Debug)] +#[derive(Debug, PartialEq, Clone)] pub struct JoinOperator { pub on: Option, pub join_type: JoinType, diff --git a/src/planner/operator/limit.rs b/src/planner/operator/limit.rs index 8ea39482..0cd087e3 100644 --- a/src/planner/operator/limit.rs +++ b/src/planner/operator/limit.rs @@ -4,7 +4,7 @@ use crate::planner::logical_select_plan::LogicalSelectPlan; use super::Operator; -#[derive(Debug)] +#[derive(Debug, PartialEq, Clone)] pub struct LimitOperator { pub offset: usize, pub count: usize, diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs index dd8a67b2..fe99bcf3 100644 --- a/src/planner/operator/mod.rs +++ b/src/planner/operator/mod.rs @@ -15,7 +15,7 @@ use self::{ pub type OperatorRef = Arc; -#[derive(Debug)] +#[derive(Debug, PartialEq, Clone)] pub enum Operator { Dummy, Aggregate(AggregateOperator), diff --git a/src/planner/operator/project.rs b/src/planner/operator/project.rs index 05c1bcdd..4ea4b341 100644 --- a/src/planner/operator/project.rs +++ b/src/planner/operator/project.rs @@ -6,7 +6,7 @@ use crate::{expression::ScalarExpression, 
planner::logical_select_plan::LogicalS use super::Operator; -#[derive(Debug)] +#[derive(Debug, PartialEq, Clone)] pub struct ProjectOperator { pub columns: Vec, } diff --git a/src/planner/operator/scan.rs b/src/planner/operator/scan.rs index a704599c..c5f3743f 100644 --- a/src/planner/operator/scan.rs +++ b/src/planner/operator/scan.rs @@ -1,16 +1,16 @@ use std::sync::Arc; +use crate::types::TableId; use crate::{ - catalog::{ColumnRefId, TableRefId}, - expression::ScalarExpression, + catalog::ColumnRefId, expression::ScalarExpression, planner::logical_select_plan::LogicalSelectPlan, }; use super::{sort::SortField, Operator}; -#[derive(Debug, Clone)] +#[derive(Debug, PartialEq, Clone)] pub struct ScanOperator { - pub table_ref_id: TableRefId, + pub table_ref_id: TableId, pub columns: Vec, pub sort_fields: Vec, // Support push down predicate. @@ -20,7 +20,7 @@ pub struct ScanOperator { pub limit: Option, } impl ScanOperator { - pub fn new(table_ref_id: TableRefId) -> LogicalSelectPlan { + pub fn new(table_ref_id: TableId) -> LogicalSelectPlan { LogicalSelectPlan { operator: Arc::new(Operator::Scan(ScanOperator { table_ref_id, diff --git a/src/planner/operator/sort.rs b/src/planner/operator/sort.rs index 067db418..e78489d4 100644 --- a/src/planner/operator/sort.rs +++ b/src/planner/operator/sort.rs @@ -1,6 +1,6 @@ use crate::expression::ScalarExpression; -#[derive(Debug, Clone)] +#[derive(Debug, PartialEq, Clone)] pub struct SortField { expr: ScalarExpression, desc: bool, @@ -17,7 +17,7 @@ impl SortField { } } -#[derive(Debug)] +#[derive(Debug, PartialEq, Clone)] pub struct SortOperator { pub sort_fields: Vec, /// Support push down limit to sort plan. 
diff --git a/src/types/mod.rs b/src/types/mod.rs index c92f8980..73543e3d 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,6 +1,12 @@ pub mod value; +use integer_encoding::FixedInt; pub use sqlparser::ast::DataType as DataTypeKind; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering::{Acquire, Release}; + +static ID_BUF: AtomicU32 = AtomicU32::new(0); + /// Inner data type #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct DataType { @@ -39,7 +45,53 @@ impl DataTypeExt for DataTypeKind { } } -pub type DatabaseIdT = u32; -pub type SchemaIdT = u32; -pub type TableIdT = u32; -pub type ColumnIdT = u32; +pub(crate) struct IdGenerator {} + +impl IdGenerator { + pub(crate) fn encode_to_raw() -> Vec { + ID_BUF.load(Acquire).encode_fixed_vec() + } + + pub(crate) fn from_raw(buf: &[u8]) { + Self::init(u32::decode_fixed(buf)) + } + + pub(crate) fn init(init_value: u32) { + ID_BUF.store(init_value, Release) + } + + pub(crate) fn build() -> u32 { + ID_BUF.fetch_add(1, Release) + } +} + +pub type TableId = u32; +pub type ColumnId = u32; + +#[cfg(test)] +mod test { + use crate::types::{IdGenerator, ID_BUF}; + use std::sync::atomic::Ordering::Release; + + /// Tips: 由于IdGenerator为static全局性质生成的id,因此需要单独测试避免其他测试方法干扰 + #[test] + #[ignore] + fn test_id_generator() { + assert_eq!(IdGenerator::build(), 0); + assert_eq!(IdGenerator::build(), 1); + + let buf = IdGenerator::encode_to_raw(); + test_id_generator_reset(); + + assert_eq!(IdGenerator::build(), 0); + + IdGenerator::from_raw(&buf); + + assert_eq!(IdGenerator::build(), 2); + assert_eq!(IdGenerator::build(), 3); + } + + fn test_id_generator_reset() { + ID_BUF.store(0, Release) + } +}