From 7c590b30ca2c4bb3ef9ac6d9cbfc181f322de14c Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Mon, 16 Nov 2020 22:17:40 -0800 Subject: [PATCH] fix(cubestore): CREATE SCHEMA IF NOT EXISTS support --- rust/cubestore/src/bin/cubestored.rs | 10 +- rust/cubestore/src/config/mod.rs | 40 +++- rust/cubestore/src/metastore/mod.rs | 28 ++- rust/cubestore/src/scheduler/mod.rs | 42 ++-- rust/cubestore/src/sql/mod.rs | 266 +++++++++++-------------- rust/cubestore/src/sql/parser.rs | 79 ++++++++ rust/cubestore/src/store/compaction.rs | 2 +- rust/cubestore/src/store/mod.rs | 2 +- 8 files changed, 274 insertions(+), 195 deletions(-) create mode 100644 rust/cubestore/src/sql/parser.rs diff --git a/rust/cubestore/src/bin/cubestored.rs b/rust/cubestore/src/bin/cubestored.rs index 81b508d170aa..a0dc64e50623 100644 --- a/rust/cubestore/src/bin/cubestored.rs +++ b/rust/cubestore/src/bin/cubestored.rs @@ -1,5 +1,4 @@ use cubestore::mysql::MySqlServer; -use futures::future::{join3}; use cubestore::config::Config; use simple_logger::SimpleLogger; use log::Level; @@ -43,13 +42,6 @@ fn main() { let services = config.configure().await; services.start_processing_loops().await.unwrap(); - let (r1, r2, r3) = join3( - MySqlServer::listen("0.0.0.0:3306".to_string(), services.sql_service.clone()), - services.scheduler.write().await.run_scheduler(), - services.listener.run_listener(), - ).await; - r1.unwrap(); - r2.unwrap(); - r3.unwrap(); + MySqlServer::listen("0.0.0.0:3306".to_string(), services.sql_service.clone()).await.unwrap(); }); } diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index e7328f294e91..1f62e999ab55 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -1,6 +1,5 @@ use crate::remotefs::{LocalDirRemoteFs, RemoteFs}; -use std::{env}; -use tokio::sync::{RwLock}; +use std::{env, fs}; use crate::metastore::RocksMetaStore; use std::sync::Arc; use crate::store::{WALStore, ChunkStore}; @@ -14,16 +13,16 @@ use crate::scheduler::SchedulerImpl; use std::path::PathBuf; use mockall::automock; use tokio::sync::broadcast; -use crate::metastore::listener::{MetastoreListenerImpl}; use crate::CubeError; use crate::remotefs::s3::S3RemoteFs; use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl}; +use rocksdb::{DB, Options}; +use std::future::Future; #[derive(Clone)] pub struct CubeServices { pub sql_service: Arc, - pub scheduler: Arc>, - pub listener: Arc, + pub scheduler: Arc, pub meta_store: Arc, pub cluster: Arc } @@ -38,6 +37,8 @@ impl CubeServices { self.cluster.start_processing_loops().await; let meta_store = self.meta_store.clone(); tokio::spawn(async move { meta_store.run_upload_loop().await }); + let scheduler = self.scheduler.clone(); + tokio::spawn(async move { scheduler.run_scheduler().await }); Ok(()) } @@ -45,6 +46,7 @@ impl CubeServices { pub async fn stop_processing_loops(&self) -> Result<(), CubeError> { self.cluster.stop_processing_loops().await?; self.meta_store.stop_processing_loops().await; + self.scheduler.stop_processing_loops()?; Ok(()) } } @@ -117,6 +119,30 @@ impl Config { } } + pub async fn run_test(name: &str, test_fn: impl FnOnce(CubeServices) -> T) + where + T: Future + Send + 'static, + T::Output: Send + 'static + { + let config = Self::test(name); + + let store_path = config.local_dir().clone(); + let remote_store_path = config.remote_dir().clone(); + let _ = fs::remove_dir_all(store_path.clone()); + let _ = fs::remove_dir_all(remote_store_path.clone()); + { + let services = config.configure().await; + services.start_processing_loops().await.unwrap(); + + test_fn(services.clone()).await; + + services.stop_processing_loops().await.unwrap(); + } + let _ = DB::destroy(&Options::default(), config.meta_store_path()); + let _ = fs::remove_dir_all(store_path.clone()); + let _ = fs::remove_dir_all(remote_store_path.clone()); + } + pub fn config_obj(&self) -> Arc { self.config_obj.clone() } @@ -155,7 +181,6 @@ impl Config { let remote_fs = self.remote_fs().unwrap(); let (event_sender, event_receiver) = broadcast::channel(10000); // TODO config - let listener = MetastoreListenerImpl::new(event_sender.subscribe()); let meta_store = RocksMetaStore::load_from_remote(self.meta_store_path().to_str().unwrap(), remote_fs.clone()).await.unwrap(); meta_store.add_listener(event_sender).await; let wal_store = WALStore::new(meta_store.clone(), remote_fs.clone(), 500000); @@ -182,8 +207,7 @@ impl Config { CubeServices { sql_service, - scheduler: Arc::new(RwLock::new(scheduler)), - listener, + scheduler: Arc::new(scheduler), meta_store, cluster } diff --git a/rust/cubestore/src/metastore/mod.rs b/rust/cubestore/src/metastore/mod.rs index 9d6d55ace871..7801691fccb7 100644 --- a/rust/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/src/metastore/mod.rs @@ -473,7 +473,7 @@ impl) -> R + Send + Sync + Clone + 'stati pub trait MetaStore: Send + Sync { async fn wait_for_current_seq_to_sync(&self) -> Result<(), CubeError>; fn schemas_table(&self) -> Box>; - async fn create_schema(&self, schema_name: String) -> Result, CubeError>; + async fn create_schema(&self, schema_name: String, if_not_exists: bool) -> Result, CubeError>; async fn get_schemas(&self) -> Result>, CubeError>; async fn get_schema_by_id(&self, schema_id: u64) -> Result, CubeError>; //TODO Option @@ -1376,10 +1376,16 @@ impl MetaStore for RocksMetaStore { }) } - async fn create_schema(&self, schema_name: String) -> Result, CubeError> { + async fn create_schema(&self, schema_name: String, if_not_exists: bool) -> Result, CubeError> { self.write_operation(move |db_ref, batch_pipe| { let table = SchemaRocksTable::new(db_ref.clone()); - let schema = Schema { name: schema_name }; + if if_not_exists { + let rows = table.get_rows_by_index(&schema_name, &SchemaRocksIndex::Name)?; + if let Some(row) = rows.into_iter().nth(0) { + return Ok(row); + } + } + let schema = Schema { name: schema_name.clone() }; Ok(table.insert(schema, batch_pipe)?) }).await } @@ -1859,18 +1865,18 @@ mod tests { { let meta_store = RocksMetaStore::new(store_path.join("metastore").as_path(), remote_fs); - let schema_1 = meta_store.create_schema("foo".to_string()).await.unwrap(); + let schema_1 = meta_store.create_schema("foo".to_string(), false).await.unwrap(); println!("New id: {}", schema_1.id); - let schema_2 = meta_store.create_schema("bar".to_string()).await.unwrap(); + let schema_2 = meta_store.create_schema("bar".to_string(), false).await.unwrap(); println!("New id: {}", schema_2.id); - let schema_3 = meta_store.create_schema("boo".to_string()).await.unwrap(); + let schema_3 = meta_store.create_schema("boo".to_string(), false).await.unwrap(); println!("New id: {}", schema_3.id); let schema_1_id = schema_1.id; let schema_2_id = schema_2.id; let schema_3_id = schema_3.id; - assert!(meta_store.create_schema("foo".to_string()).await.is_err()); + assert!(meta_store.create_schema("foo".to_string(), false).await.is_err()); assert_eq!(meta_store.get_schema("foo".to_string()).await.unwrap(), schema_1); assert_eq!(meta_store.get_schema("bar".to_string()).await.unwrap(), schema_2); @@ -1928,7 +1934,7 @@ mod tests { { let meta_store = RocksMetaStore::new(store_path.clone().join("metastore").as_path(), remote_fs); - let schema_1 = meta_store.create_schema( "foo".to_string()).await.unwrap(); + let schema_1 = meta_store.create_schema( "foo".to_string(), false).await.unwrap(); let mut columns = Vec::new(); columns.push(Column::new("col1".to_string(), ColumnType::Int, 0)); columns.push(Column::new("col2".to_string(), ColumnType::String, 1)); @@ -1962,11 +1968,11 @@ mod tests { { let services = config.configure().await; services.start_processing_loops().await.unwrap(); - services.meta_store.create_schema("foo1".to_string()).await.unwrap(); + services.meta_store.create_schema("foo1".to_string(), false).await.unwrap(); services.meta_store.run_upload().await.unwrap(); - services.meta_store.create_schema("foo".to_string()).await.unwrap(); + services.meta_store.create_schema("foo".to_string(), false).await.unwrap(); services.meta_store.upload_check_point().await.unwrap(); - services.meta_store.create_schema("bar".to_string()).await.unwrap(); + services.meta_store.create_schema("bar".to_string(), false).await.unwrap(); services.meta_store.run_upload().await.unwrap(); services.stop_processing_loops().await.unwrap(); } diff --git a/rust/cubestore/src/scheduler/mod.rs b/rust/cubestore/src/scheduler/mod.rs index b3cded9e536c..69bb849918c0 100644 --- a/rust/cubestore/src/scheduler/mod.rs +++ b/rust/cubestore/src/scheduler/mod.rs @@ -7,12 +7,15 @@ use crate::metastore::job::{JobType, Job}; use log::{error}; use crate::remotefs::RemoteFs; use crate::store::{WALStore, ChunkStore}; +use tokio::sync::{Mutex, watch}; pub struct SchedulerImpl { meta_store: Arc, cluster: Arc, remote_fs: Arc, - event_receiver: Receiver + event_receiver: Mutex>, + stop_sender: watch::Sender, + stop_receiver: Mutex>, } impl SchedulerImpl { @@ -22,17 +25,33 @@ impl SchedulerImpl { remote_fs: Arc, event_receiver: Receiver ) -> SchedulerImpl { + let (tx,rx) = watch::channel(false); SchedulerImpl { meta_store, cluster, remote_fs, - event_receiver + event_receiver: Mutex::new(event_receiver), + stop_sender: tx, + stop_receiver: Mutex::new(rx) } } - pub async fn run_scheduler(&mut self) -> Result<(), CubeError> { + pub async fn run_scheduler(&self) -> Result<(), CubeError> { loop { - let event = self.event_receiver.recv().await?; + let mut stop_receiver = self.stop_receiver.lock().await; + let mut event_receiver = self.event_receiver.lock().await; + let event = tokio::select! { + Some(stopped) = stop_receiver.recv() => { + if stopped { + return Ok(()); + } else { + continue; + } + } + event = event_receiver.recv() => { + event? + } + }; let res = self.process_event(event.clone()).await; if let Err(e) = res { error!("Error processing event {:?}: {}", event, e); @@ -40,20 +59,11 @@ impl SchedulerImpl { } } - pub async fn run_scheduler_until(&mut self, last_event_fn: impl Fn(MetaStoreEvent) -> bool) -> Result<(), CubeError> { - loop { - let event = self.event_receiver.recv().await?; - let res = self.process_event(event.clone()).await; - if let Err(e) = res { - error!("Error processing event {:?}: {}", event, e); - } - if last_event_fn(event) { - return Ok(()) - } - } + pub fn stop_processing_loops(&self) -> Result<(), CubeError> { + Ok(self.stop_sender.broadcast(true)?) } - async fn process_event(&mut self, event: MetaStoreEvent) -> Result<(), CubeError> { + async fn process_event(&self, event: MetaStoreEvent) -> Result<(), CubeError> { if let MetaStoreEvent::Insert(TableId::WALs, row_id) | MetaStoreEvent::Update(TableId::WALs, row_id) = event { diff --git a/rust/cubestore/src/sql/mod.rs b/rust/cubestore/src/sql/mod.rs index 67945461e3d6..4c5946412000 100644 --- a/rust/cubestore/src/sql/mod.rs +++ b/rust/cubestore/src/sql/mod.rs @@ -1,3 +1,5 @@ +mod parser; + use log::{trace}; use sqlparser::dialect::{Dialect}; @@ -15,12 +17,13 @@ use crate::queryplanner::{QueryPlanner, QueryPlan}; use crate::cluster::{Cluster, JobEvent}; -use datafusion::sql::parser::{DFParser, CreateExternalTable}; +use parser::{Statement as CubeStoreStatement}; use datafusion::sql::parser::{Statement as DFStatement}; use futures::future::join_all; use crate::metastore::job::JobType; use datafusion::physical_plan::datetime_expressions::string_to_timestamp_nanos; use crate::queryplanner::query_executor::QueryExecutor; +use crate::sql::parser::CubeStoreParser; #[async_trait] pub trait SqlService: Send + Sync { @@ -46,8 +49,8 @@ impl SqlServiceImpl { Arc::new(SqlServiceImpl { db, wal_store, query_planner, query_executor, cluster }) } - async fn create_schema(&self, name: String) -> Result, CubeError> { - self.db.create_schema(name).await + async fn create_schema(&self, name: String, if_not_exists: bool) -> Result, CubeError> { + self.db.create_schema(name, if_not_exists).await } async fn create_table(&self, schema_name: String, table_name: String, columns: &Vec, external: bool, location: Option, indexes: Vec) -> Result, CubeError> { @@ -153,100 +156,97 @@ impl SqlService for SqlServiceImpl { if let Some(data_frame) = SqlServiceImpl::handle_workbench_queries(q) { return Ok(data_frame); } - let dialect = &MySqlDialectWithBackTicks {}; let replaced_quote = q.replace("\\'", "''"); - let ast = DFParser::parse_sql_with_dialect(&replaced_quote, dialect)?; + let mut parser = CubeStoreParser::new(&replaced_quote)?; + let ast = parser.parse_statement()?; // trace!("AST is: {:?}", ast); - for query in ast { - match query { - DFStatement::Statement(Statement::ShowVariable { variable }) => { - return match variable.value.to_lowercase() { - s if s == "schemas" => Ok(DataFrame::from(self.db.get_schemas().await?)), - s if s == "tables" => Ok(DataFrame::from(self.db.get_tables().await?)), - s if s == "chunks" => Ok(DataFrame::from(self.db.chunks_table().all_rows().await?)), - s if s == "indexes" => Ok(DataFrame::from(self.db.index_table().all_rows().await?)), - s if s == "partitions" => Ok(DataFrame::from(self.db.partition_table().all_rows().await?)), - x => Err(CubeError::user(format!("Unknown SHOW: {}", x))) - }; - } - DFStatement::Statement(Statement::SetVariable { .. }) => { - return Ok(DataFrame::new(vec![], vec![])); + match ast { + CubeStoreStatement::Statement(Statement::ShowVariable { variable }) => { + match variable.value.to_lowercase() { + s if s == "schemas" => Ok(DataFrame::from(self.db.get_schemas().await?)), + s if s == "tables" => Ok(DataFrame::from(self.db.get_tables().await?)), + s if s == "chunks" => Ok(DataFrame::from(self.db.chunks_table().all_rows().await?)), + s if s == "indexes" => Ok(DataFrame::from(self.db.index_table().all_rows().await?)), + s if s == "partitions" => Ok(DataFrame::from(self.db.partition_table().all_rows().await?)), + x => Err(CubeError::user(format!("Unknown SHOW: {}", x))) } - DFStatement::Statement(Statement::CreateSchema { schema_name }) => { - let name = schema_name.to_string(); - let res = self.create_schema(name).await?; - return Ok(DataFrame::from(vec![res])); + } + CubeStoreStatement::Statement(Statement::SetVariable { .. }) => { + Ok(DataFrame::new(vec![], vec![])) + } + CubeStoreStatement::CreateSchema { schema_name, if_not_exists } => { + let name = schema_name.to_string(); + let res = self.create_schema(name, if_not_exists).await?; + Ok(DataFrame::from(vec![res])) + } + CubeStoreStatement::Statement(Statement::CreateTable { name, columns, external, location, .. }) => { + let nv = &name.0; + if nv.len() != 2 { + return Err(CubeError::user(format!("Schema's name should be present in query (boo.table1). Your query was '{}'", q))); } - DFStatement::Statement(Statement::CreateTable { name, columns, external, location, .. }) => { - let nv = &name.0; - if nv.len() != 2 { - return Err(CubeError::user(format!("Schema's name should be present in query (boo.table1). Your query was '{}'", q))); - } - let schema_name = &nv[0].value; - let table_name = &nv[1].value; + let schema_name = &nv[0].value; + let table_name = &nv[1].value; - let res = self.create_table(schema_name.clone(), table_name.clone(), &columns, external, location, vec![]).await?; - return Ok(DataFrame::from(vec![res])); - } - DFStatement::Statement(Statement::Drop { object_type, names, .. }) => { - match object_type { - ObjectType::Schema => { - self.db.delete_schema(names[0].to_string()).await?; - } - ObjectType::Table => { - let table = self.db.get_table(names[0].0[0].to_string(), names[0].0[1].to_string()).await?; - self.db.drop_table(table.get_id()).await?; - } - _ => return Err(CubeError::user("Unsupported drop operation".to_string())) + let res = self.create_table(schema_name.clone(), table_name.clone(), &columns, external, location, vec![]).await?; + Ok(DataFrame::from(vec![res])) + } + CubeStoreStatement::Statement(Statement::Drop { object_type, names, .. }) => { + match object_type { + ObjectType::Schema => { + self.db.delete_schema(names[0].to_string()).await?; } - return Ok(DataFrame::new(vec![], vec![])) - } - DFStatement::CreateExternalTable(CreateExternalTable { name, columns, location, indexes, .. }) => { - let ObjectName(table_ident) = name.clone(); - if table_ident.len() != 2 { - return Err(CubeError::user(format!("Schema name expected in table name but '{}' found", name.to_string()))); + ObjectType::Table => { + let table = self.db.get_table(names[0].0[0].to_string(), names[0].0[1].to_string()).await?; + self.db.drop_table(table.get_id()).await?; } - - let res = self.create_table( - table_ident[0].value.to_string(), - table_ident[1].value.to_string(), - &columns, - true, - Some(location), - indexes - ).await?; - return Ok(DataFrame::from(vec![res])); + _ => return Err(CubeError::user("Unsupported drop operation".to_string())) } - DFStatement::Statement(Statement::Insert { table_name, columns, source }) => { - let data = if let SetExpr::Values(Values(data_series)) = &source.body { - data_series - } else { - return Err(CubeError::user(format!("Data should be present in query. Your query was '{}'", q))); - }; - - let nv = &table_name.0; - if nv.len() != 2 { - return Err(CubeError::user(format!("Schema's name should be present in query (boo.table1). Your query was '{}'", q))); - } - let schema_name = &nv[0].value; - let table_name = &nv[1].value; - - self.insert_data(schema_name.clone(), table_name.clone(), &columns, data).await?; - return Ok(DataFrame::new(vec![], vec![])); + Ok(DataFrame::new(vec![], vec![])) + } + /*CubeStoreStatement::CreateExternalTable(CreateExternalTable { name, columns, location, indexes, .. }) => { + let ObjectName(table_ident) = name.clone(); + if table_ident.len() != 2 { + return Err(CubeError::user(format!("Schema name expected in table name but '{}' found", name.to_string()))); } - DFStatement::Statement(Statement::Query(_)) => { - let logical_plan = self.query_planner.logical_plan(query.clone()).await?; - // TODO distribute and combine - let res = match logical_plan { - QueryPlan::Meta(logical_plan) => self.query_planner.execute_meta_plan(logical_plan).await?, - QueryPlan::Select(serialized) => self.query_executor.execute_router_plan(serialized, self.cluster.clone()).await? - }; - return Ok(res); + + let res = self.create_table( + table_ident[0].value.to_string(), + table_ident[1].value.to_string(), + &columns, + true, + Some(location), + indexes + ).await?; + Ok(DataFrame::from(vec![res])); + }*/ + CubeStoreStatement::Statement(Statement::Insert { table_name, columns, source }) => { + let data = if let SetExpr::Values(Values(data_series)) = &source.body { + data_series + } else { + return Err(CubeError::user(format!("Data should be present in query. Your query was '{}'", q))); + }; + + let nv = &table_name.0; + if nv.len() != 2 { + return Err(CubeError::user(format!("Schema's name should be present in query (boo.table1). Your query was '{}'", q))); } - _ => return Err(CubeError::user(format!("Unsupported SQL: '{}'", q))) - }; + let schema_name = &nv[0].value; + let table_name = &nv[1].value; + + self.insert_data(schema_name.clone(), table_name.clone(), &columns, data).await?; + Ok(DataFrame::new(vec![], vec![])) + } + CubeStoreStatement::Statement(Statement::Query(q)) => { + let logical_plan = self.query_planner.logical_plan(DFStatement::Statement(Statement::Query(q))).await?; + // TODO distribute and combine + let res = match logical_plan { + QueryPlan::Meta(logical_plan) => self.query_planner.execute_meta_plan(logical_plan).await?, + QueryPlan::Select(serialized) => self.query_executor.execute_router_plan(serialized, self.cluster.clone()).await? + }; + Ok(res) + } + _ => Err(CubeError::user(format!("Unsupported SQL: '{}'", q))) } - Err(CubeError::user(format!("Unsupported SQL: '{}'", q))) } } @@ -374,14 +374,9 @@ mod tests { use crate::queryplanner::MockQueryPlanner; use crate::queryplanner::query_executor::MockQueryExecutor; use crate::config::Config; - use crate::metastore::{MetaStoreEvent, RocksMetaStore}; - use crate::metastore::job::JobType; - use std::borrow::BorrowMut; + use crate::metastore::{RocksMetaStore}; use crate::cluster::MockCluster; - use crate::metastore::listener::{MetastoreListenerImpl}; - use futures::future::{join3}; use std::fs; - use crate::scheduler::SchedulerImpl; use crate::store::WALStore; #[actix_rt::test] @@ -514,73 +509,46 @@ mod tests { // let _ = fs::remove_dir_all(remote_store_path.clone()); } - #[actix_rt::test] + #[tokio::test] async fn select_test() { - let config = Config::test("select"); + Config::run_test("select", async move |services| { + let service = services.sql_service; - let store_path = config.local_dir().clone(); - let remote_store_path = config.remote_dir().clone(); - let _ = fs::remove_dir_all(store_path.clone()); - let _ = fs::remove_dir_all(remote_store_path.clone()); - { - let services = config.configure().await; - services.start_processing_loops().await.unwrap(); - - select_tests(services.scheduler.write().await.borrow_mut(), services.listener.clone(), services.sql_service.clone()).await; - services.stop_processing_loops().await.unwrap(); - } - let _ = DB::destroy(&Options::default(), config.meta_store_path()); - let _ = fs::remove_dir_all(store_path.clone()); - let _ = fs::remove_dir_all(remote_store_path.clone()); - } - - async fn select_tests(scheduler: &mut SchedulerImpl, listener: Arc, service: Arc) { - let _ = service.exec_query("CREATE SCHEMA Foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA Foo").await.unwrap(); - let query = "CREATE TABLE Foo.Persons ( + let _ = service.exec_query( + "CREATE TABLE Foo.Persons ( PersonID int, LastName varchar(255), FirstName varchar(255), Address varchar(255), City varchar(255) - );"; - - let _ = service.exec_query(query).await.unwrap(); - - let query = "INSERT INTO Foo.Persons - (LastName, PersonID, FirstName, Address, City) - VALUES - ('LastName 1', 23, 'FirstName 1', 'Address 1', 'City 1'), ('LastName 2', 22, 'FirstName 2', 'Address 2', 'City 2');"; - - let (r1, r2, r3) = join3( - service.exec_query(query), - listener.run_listener_until(|e| match e { - MetaStoreEvent::DeleteJob(job) => { - match job.get_row().job_type() { - JobType::WalPartitioning => true, - _ => false - } - } - _ => false - }), - scheduler.run_scheduler_until(|e| match e { - MetaStoreEvent::DeleteJob(job) => { - match job.get_row().job_type() { - JobType::WalPartitioning => true, - _ => false - } - } - _ => false - }), - ).await; - r1.unwrap(); - r2.unwrap(); - r3.unwrap(); + );" + ).await.unwrap(); + + service.exec_query( + "INSERT INTO Foo.Persons + (LastName, PersonID, FirstName, Address, City) + VALUES + ('LastName 1', 23, 'FirstName 1', 'Address 1', 'City 1'), + ('LastName 2', 22, 'FirstName 2', 'Address 2', 'City 2');" + ).await.unwrap(); + + let result = service.exec_query("SELECT PersonID person_id from Foo.Persons").await.unwrap(); + + assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(22)])); + assert_eq!(result.get_rows()[1], Row::new(vec![TableValue::Int(23)])); + }).await; + } - let result = service.exec_query("SELECT PersonID person_id from Foo.Persons").await.unwrap(); + #[tokio::test] + async fn create_schema_if_not_exists() { + Config::run_test("create_schema_if_not_exists", async move |services| { + let service = services.sql_service; - assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(22)])); - assert_eq!(result.get_rows()[1], Row::new(vec![TableValue::Int(23)])); + let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap(); + }).await; } } diff --git a/rust/cubestore/src/sql/parser.rs b/rust/cubestore/src/sql/parser.rs new file mode 100644 index 000000000000..c839a098b59e --- /dev/null +++ b/rust/cubestore/src/sql/parser.rs @@ -0,0 +1,79 @@ +use sqlparser::dialect::Dialect; +use sqlparser::ast::{ObjectName, Statement as SQLStatement}; +use sqlparser::parser::{Parser, ParserError}; +use sqlparser::tokenizer::{Tokenizer, Token}; +use sqlparser::dialect::keywords::Keyword; + +#[derive(Debug)] +pub struct MySqlDialectWithBackTicks {} + +impl Dialect for MySqlDialectWithBackTicks { + fn is_delimited_identifier_start(&self, ch: char) -> bool { + ch == '"' || ch == '`' + } + + fn is_identifier_start(&self, ch: char) -> bool { + (ch >= 'a' && ch <= 'z') + || (ch >= 'A' && ch <= 'Z') + || ch == '_' + || ch == '$' + || (ch >= '\u{0080}' && ch <= '\u{ffff}') + } + + fn is_identifier_part(&self, ch: char) -> bool { + self.is_identifier_start(ch) || (ch >= '0' && ch <= '9') + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum Statement { + Statement(SQLStatement), + CreateSchema { schema_name: ObjectName, if_not_exists: bool }, +} + +pub struct CubeStoreParser { + parser: Parser, +} + +impl CubeStoreParser { + pub fn new(sql: &str) -> Result { + let dialect = &MySqlDialectWithBackTicks {}; + let mut tokenizer = Tokenizer::new(dialect, sql); + let tokens = tokenizer.tokenize()?; + Ok(CubeStoreParser { + parser: Parser::new(tokens), + }) + } + + pub fn parse_statement(&mut self) -> Result { + match self.parser.peek_token() { + Token::Word(w) => match w.keyword { + Keyword::CREATE => { + self.parser.next_token(); + self.parse_create() + } + _ => Ok(Statement::Statement(self.parser.parse_statement()?)) + }, + _ => Ok(Statement::Statement(self.parser.parse_statement()?)) + } + } + + pub fn parse_create(&mut self) -> Result { + if self.parser.parse_keyword(Keyword::SCHEMA) { + self.parse_create_schema() + } else { + Ok(Statement::Statement(self.parser.parse_create()?)) + } + } + + fn parse_create_schema(&mut self) -> Result { + let if_not_exists = self + .parser + .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); + let schema_name = self.parser.parse_object_name()?; + Ok(Statement::CreateSchema { + schema_name, + if_not_exists + }) + } +} \ No newline at end of file diff --git a/rust/cubestore/src/store/compaction.rs b/rust/cubestore/src/store/compaction.rs index 8670f6a83b18..c537ff3dd0e4 100644 --- a/rust/cubestore/src/store/compaction.rs +++ b/rust/cubestore/src/store/compaction.rs @@ -117,7 +117,7 @@ mod tests { let (remote_fs, metastore) = RocksMetaStore::prepare_test_metastore("compaction"); let mut chunk_store = MockChunkDataStore::new(); let mut config = MockConfigObj::new(); - metastore.create_schema("foo".to_string()).await.unwrap(); + metastore.create_schema("foo".to_string(), false).await.unwrap(); let cols = vec![Column::new("name".to_string(), ColumnType::String, 0)]; metastore.create_table( "foo".to_string(), diff --git a/rust/cubestore/src/store/mod.rs b/rust/cubestore/src/store/mod.rs index 358dc7b76911..43a9356d1662 100644 --- a/rust/cubestore/src/store/mod.rs +++ b/rust/cubestore/src/store/mod.rs @@ -386,7 +386,7 @@ mod tests { let first_rows = (0..35).map(|i| Row::new(vec![TableValue::Int(34 - i), TableValue::String(format!("Foo {}", 34 - i)), TableValue::String(format!("Boo {}", 34 - i))])).collect::>(); let data_frame = DataFrame::new(col.clone(), first_rows); - meta_store.create_schema("foo".to_string()).await.unwrap(); + meta_store.create_schema("foo".to_string(), false).await.unwrap(); let table = meta_store.create_table("foo".to_string(), "bar".to_string(), col.clone(), None, None, vec![]).await.unwrap(); let _ = wal_store.add_wal(table.clone(), data_frame).await;