Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ async-backtrace = "0.2.6"
futures = "0.3.25"
futures-lite = "1.12.0"


[dev-dependencies]
tokio-test = "0.4.2"
ctor = "0.2.0"
env_logger = "0.10"
paste = "^1.0"
Expand Down
1 change: 0 additions & 1 deletion rust-toolchain

This file was deleted.

4 changes: 3 additions & 1 deletion src/binder/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Binder {
table_name: table_name.to_string(),
columns: columns
.into_iter()
.map(|col| (col.name.to_string(), col.desc.clone()))
.map(|col| (col.name.to_string(), col.nullable, col.desc.clone()))
.collect(),
};
Ok(plan)
Expand Down Expand Up @@ -68,10 +68,12 @@ mod tests {
columns: vec![
(
"id".to_string(),
false,
ColumnDesc::new(LogicalType::Integer, false),
),
(
"name".to_string(),
false,
ColumnDesc::new(LogicalType::Varchar, false),
),
],
Expand Down
4 changes: 3 additions & 1 deletion src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use anyhow::Result;
use sqlparser::ast::{Ident, ObjectName, Statement};

use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME};
use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME, CatalogError};
use crate::expression::ScalarExpression;
use crate::planner::LogicalPlan;
use crate::types::TableId;
Expand Down Expand Up @@ -109,4 +109,6 @@ pub enum BindError {
BinaryOpTypeMismatch(String, String),
#[error("subquery in FROM must have an alias")]
SubqueryMustHaveAlias,
#[error("catalog error")]
CatalogError(#[from] CatalogError),
}
42 changes: 37 additions & 5 deletions src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,7 @@ impl Binder {
// *col_id,
// col.desc().clone(),
// );
let expr = ScalarExpression::ColumnRef {
column_ref_id,
primary_key: col.desc.is_primary(),
desc: col.desc.clone(),
};
let expr = ScalarExpression::ColumnRef((*col).clone());
exprs.push(expr);
}
}
Expand Down Expand Up @@ -317,3 +313,39 @@ impl Binder {
Ok(LimitOperator::new(offset, limit, children))
}
}

#[cfg(test)]
mod tests {
use sqlparser::ast::CharacterLength;

use super::*;
use crate::binder::{BinderContext, BindError};
use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog};
use crate::planner::LogicalPlan;
use crate::types::LogicalType;
use crate::types::LogicalType::{Boolean, Integer};

fn test_root_catalog() -> Result<RootCatalog, BindError> {
let mut root = RootCatalog::new();
let cols = vec![
ColumnCatalog::new("c1".to_string(), false, ColumnDesc::new(Integer, true)),
ColumnCatalog::new("c2".to_string(), false, ColumnDesc::new(Boolean, false)),
];
let _ = root.add_table("t1".to_string(), cols)?;
Ok(root)
}

#[test]
fn test_select_bind() -> Result<(), BindError> {
let sql = "select * from t1";
let root = test_root_catalog()?;

let binder = Binder::new(BinderContext::new(root));
let stmt = crate::parser::parse_sql(sql).unwrap();
let plan = binder.bind(&stmt[0]).unwrap();

println!("{:#?}", plan);

Ok(())
}
}
34 changes: 24 additions & 10 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use arrow::datatypes::{DataType, Field};
use sqlparser::ast::ColumnDef;
use sqlparser::ast::{ColumnDef, ColumnOption};

use crate::types::{ColumnId, IdGenerator, LogicalType};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnCatalog {
pub id: ColumnId,
pub name: String,
pub nullable: bool,
pub desc: ColumnDesc,
}

impl ColumnCatalog {
pub(crate) fn new(column_name: String, column_desc: ColumnDesc) -> ColumnCatalog {
pub(crate) fn new(column_name: String, nullable: bool, column_desc: ColumnDesc) -> ColumnCatalog {
ColumnCatalog {
id: IdGenerator::build(),
name: column_name,
nullable,
desc: column_desc,
}
}
Expand All @@ -33,20 +35,32 @@ impl ColumnCatalog {

pub fn to_field(&self) -> Field {
Field::new(
&*self.name.clone(),
DataType::from(self.desc.column_datatype.clone()),
self.desc.is_primary(),
self.name.as_str(),
DataType::from(self.datatype().clone()),
self.nullable,
)
}
}

impl From<ColumnDef> for ColumnCatalog {
fn from(column_def: ColumnDef) -> Self {
let column_name = column_def.name.to_string();
let column_datatype = LogicalType::try_from(column_def.data_type).unwrap();
let is_primary = false;
let column_desc = ColumnDesc::new(column_datatype, is_primary);
ColumnCatalog::new(column_name, column_desc)
let column_desc = ColumnDesc::new(
LogicalType::try_from(column_def.data_type).unwrap(),
false
);
let mut nullable = false;

// TODO: 这里可以对更多字段可设置内容进行补充
for option_def in column_def.options {
match option_def.option {
ColumnOption::Null => nullable = true,
ColumnOption::NotNull => (),
_ => todo!()
}
}

ColumnCatalog::new(column_name, nullable, column_desc)
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/catalog/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ mod tests {

let col0 = ColumnCatalog::new(
"a".to_string(),
false,
ColumnDesc::new(LogicalType::Integer, false),
);
let col1 = ColumnCatalog::new(
"b".to_string(),
false,
ColumnDesc::new(LogicalType::Boolean, false),
);
let col_catalogs = vec![col0, col1];
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ mod tests {
// | 1 | true |
// | 2 | false |
fn test_table_catalog() {
let col0 = ColumnCatalog::new("a".into(), ColumnDesc::new(LogicalType::Integer, false));
let col1 = ColumnCatalog::new("b".into(), ColumnDesc::new(LogicalType::Boolean, false));
let col0 = ColumnCatalog::new("a".into(), false, ColumnDesc::new(LogicalType::Integer, false));
let col1 = ColumnCatalog::new("b".into(), false, ColumnDesc::new(LogicalType::Boolean, false));
let col_catalogs = vec![col0, col1];
let table_catalog = TableCatalog::new("test".to_string(), col_catalogs).unwrap();

Expand Down
119 changes: 87 additions & 32 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use sqlparser::parser::ParserError;

use crate::binder::{BindError, Binder, BinderContext};
use crate::catalog::ColumnCatalog;
use crate::execution_v1::physical_plan::physical_plan_builder::PhysicalPlanBuilder;
use crate::execution_v1::volcano_executor::VolcanoExecutor;
use crate::parser::parse_sql;
use crate::planner::LogicalPlan;
use crate::storage::memory::InMemoryStorage;
use crate::storage::{Storage, StorageError};
use crate::storage::{Storage, StorageError, StorageImpl};
use crate::types::IdGenerator;

#[derive(Debug)]
Expand All @@ -33,7 +35,7 @@ impl Database {
}

/// Run SQL queries.
pub fn run(&mut self, sql: &str) -> Result<()> {
pub async fn run(&self, sql: &str) -> Result<Vec<RecordBatch>> {
// parse
let stmts = parse_sql(sql)?;
// bind
Expand All @@ -49,36 +51,44 @@ impl Database {
/// Limit(1)
/// Project(a,b)
let logical_plan = binder.bind(&stmts[0])?;
println!("logic plan {:?}", logical_plan);

// let physical_planner = PhysicalPlaner::default();
// let executor_builder = ExecutorBuilder::new(self.env.clone());

// let physical_plan = physical_planner.plan(logical_plan)?;
// let executor = executor_builder.build(physical_plan)?;
// futures::executor::block_on(executor).unwrap();

/// THE FOLLOWING CODE IS FOR TESTING ONLY
/// THE FINAL CODE WILL BE IN executor MODULE
if let LogicalPlan::CreateTable(plan) = logical_plan {
let mut columns = Vec::new();
plan.columns.iter().for_each(|c| {
columns.push(ColumnCatalog::new(c.0.clone(), c.1.clone()));
});
let table_name = plan.table_name.clone();
// columns->batch record
let mut data = Vec::new();

columns.iter().for_each(|c| {
let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![c.to_field()])));
data.push(batch);
});

self.storage
.create_table(IdGenerator::build(), table_name.as_str(), data)?;
}

Ok(())
println!("logic plan {:#?}", logical_plan);

let mut builder = PhysicalPlanBuilder::new();
let operator = builder.build_plan(&logical_plan)?;

let storage = StorageImpl::InMemoryStorage(self.storage.clone());
let executor = VolcanoExecutor::new(storage);

let mut stream = executor.build(operator);

Ok(VolcanoExecutor::try_collect(&mut stream).await?)

// // let physical_planner = PhysicalPlaner::default();
// // let executor_builder = ExecutorBuilder::new(self.env.clone());
//
// // let physical_plan = physical_planner.plan(logical_plan)?;
// // let executor = executor_builder.build(physical_plan)?;
// // futures::executor::block_on(executor).unwrap();
//
// /// THE FOLLOWING CODE IS FOR TESTING ONLY
// /// THE FINAL CODE WILL BE IN executor MODULE
// if let LogicalPlan::CreateTable(plan) = logical_plan {
// let mut columns = Vec::new();
// plan.columns.iter().for_each(|c| {
// columns.push(ColumnCatalog::new(c.0.clone(), c.1, c.2.clone()));
// });
// let table_name = plan.table_name.clone();
// // columns->batch record
// let mut data = Vec::new();
//
// columns.iter().for_each(|c| {
// let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![c.to_field()])));
// data.push(batch);
// });
//
// self.storage
// .create_table(IdGenerator::build(), table_name.as_str(), data)?;
// }
}
}

Expand Down Expand Up @@ -113,3 +123,48 @@ pub enum DatabaseError {
#[error("Internal error: {0}")]
InternalError(String),
}

#[cfg(test)]
mod test {
use std::sync::Arc;
use arrow::array::Int32Array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use itertools::Itertools;
use crate::catalog::{ColumnCatalog, ColumnDesc};
use crate::db::Database;
use crate::execution_v1::ExecutorError;
use crate::storage::{Storage, StorageError};
use crate::storage::memory::InMemoryStorage;
use crate::types::{IdGenerator, LogicalType, TableId};

fn build_table(storage: &impl Storage) -> Result<TableId, StorageError> {
let fields = vec![
ColumnCatalog::new(
"c1".to_string(),
false,
ColumnDesc::new(LogicalType::Integer, true)
).to_field(),
];
let batch = RecordBatch::try_new(
Arc::new(Schema::new(fields)),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))]
).unwrap();

Ok(storage.create_table("t1", vec![batch])?)
}

#[test]
fn test_run_sql() -> anyhow::Result<()> {
let mut database = Database::new_on_mem();

let i = build_table(&database.storage)?;

tokio_test::block_on(async move {
let batch = database.run("select * from t1").await?;
println!("{:#?}", batch);

Ok(())
})
}
}
5 changes: 2 additions & 3 deletions src/execution/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ use super::{
executor::ExecutionQueue,
parallel::{
meta_pipeline::MetaPipeline,
pipeline::Pipeline,
pipeline_event::{PipelineEvent, PipelineEventStack},
},
physical::PhysicalOperator,
};
use anyhow::Result;

Expand Down Expand Up @@ -150,7 +148,7 @@ impl ExecutorGraph {

// Set up the dependencies within this `MetaPipeline`.
for pipeline in pipelines.iter() {
if let Some(source) = pipeline.get_source() {
if let Some(_source) = pipeline.get_source() {
// if (source->type ==
// PhysicalOperatorType::TABLE_SCAN) { //
// we have to reset the source here (in the main thread),
Expand Down Expand Up @@ -213,6 +211,7 @@ impl ExecutorGraph {
self.graph[index].clone()
}

#[allow(dead_code)]
fn get_prev_nodes(&self, index: NodeIndex) -> Vec<Arc<PipelineEvent>> {
let mut prev_nodes = vec![];
for edge in self.graph.edges_directed(index, Direction::Incoming) {
Expand Down
Loading