Skip to content

Commit

Permalink
fix(cubestore): CREATE SCHEMA IF NOT EXISTS support
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Nov 17, 2020
1 parent 5eaf369 commit 7c590b3
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 195 deletions.
10 changes: 1 addition & 9 deletions rust/cubestore/src/bin/cubestored.rs
@@ -1,5 +1,4 @@
use cubestore::mysql::MySqlServer;
use futures::future::{join3};
use cubestore::config::Config;
use simple_logger::SimpleLogger;
use log::Level;
Expand Down Expand Up @@ -43,13 +42,6 @@ fn main() {
let services = config.configure().await;
services.start_processing_loops().await.unwrap();

let (r1, r2, r3) = join3(
MySqlServer::listen("0.0.0.0:3306".to_string(), services.sql_service.clone()),
services.scheduler.write().await.run_scheduler(),
services.listener.run_listener(),
).await;
r1.unwrap();
r2.unwrap();
r3.unwrap();
MySqlServer::listen("0.0.0.0:3306".to_string(), services.sql_service.clone()).await.unwrap();
});
}
40 changes: 32 additions & 8 deletions rust/cubestore/src/config/mod.rs
@@ -1,6 +1,5 @@
use crate::remotefs::{LocalDirRemoteFs, RemoteFs};
use std::{env};
use tokio::sync::{RwLock};
use std::{env, fs};
use crate::metastore::RocksMetaStore;
use std::sync::Arc;
use crate::store::{WALStore, ChunkStore};
Expand All @@ -14,16 +13,16 @@ use crate::scheduler::SchedulerImpl;
use std::path::PathBuf;
use mockall::automock;
use tokio::sync::broadcast;
use crate::metastore::listener::{MetastoreListenerImpl};
use crate::CubeError;
use crate::remotefs::s3::S3RemoteFs;
use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl};
use rocksdb::{DB, Options};
use std::future::Future;

#[derive(Clone)]
pub struct CubeServices {
pub sql_service: Arc<dyn SqlService>,
pub scheduler: Arc<RwLock<SchedulerImpl>>,
pub listener: Arc<MetastoreListenerImpl>,
pub scheduler: Arc<SchedulerImpl>,
pub meta_store: Arc<RocksMetaStore>,
pub cluster: Arc<ClusterImpl>
}
Expand All @@ -38,13 +37,16 @@ impl CubeServices {
self.cluster.start_processing_loops().await;
let meta_store = self.meta_store.clone();
tokio::spawn(async move { meta_store.run_upload_loop().await });
let scheduler = self.scheduler.clone();
tokio::spawn(async move { scheduler.run_scheduler().await });
Ok(())
}


/// Stops all background loops owned by these services, in order:
/// cluster first, then the metastore, then the scheduler event loop.
pub async fn stop_processing_loops(&self) -> Result<(), CubeError> {
self.cluster.stop_processing_loops().await?;
// Metastore stop is infallible here (no `?`).
self.meta_store.stop_processing_loops().await;
// Signals the scheduler's run loop to exit (see SchedulerImpl).
self.scheduler.stop_processing_loops()?;
Ok(())
}
}
Expand Down Expand Up @@ -117,6 +119,30 @@ impl Config {
}
}

/// Runs `test_fn` against a fully started set of services and cleans up
/// all on-disk state before and after.
///
/// `name` is forwarded to `Config::test` to build the per-test
/// configuration. The local and remote store directories are wiped up
/// front (best effort, in case a previous run crashed) and again after
/// the test, together with the RocksDB metastore files.
pub async fn run_test<T>(name: &str, test_fn: impl FnOnce(CubeServices) -> T)
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static
{
    let config = Self::test(name);

    let local_path = config.local_dir().clone();
    let remote_path = config.remote_dir().clone();

    // Best-effort removal of leftovers from an earlier run; errors are
    // deliberately ignored (the directories may simply not exist).
    let _ = fs::remove_dir_all(&local_path);
    let _ = fs::remove_dir_all(&remote_path);

    {
        let services = config.configure().await;
        services.start_processing_loops().await.unwrap();

        test_fn(services.clone()).await;

        // Shut everything down before tearing the storage away.
        services.stop_processing_loops().await.unwrap();
    }

    // Drop the RocksDB metastore files, then the data directories.
    let _ = DB::destroy(&Options::default(), config.meta_store_path());
    let _ = fs::remove_dir_all(local_path);
    let _ = fs::remove_dir_all(remote_path);
}

pub fn config_obj(&self) -> Arc<dyn ConfigObj> {
self.config_obj.clone()
}
Expand Down Expand Up @@ -155,7 +181,6 @@ impl Config {
let remote_fs = self.remote_fs().unwrap();
let (event_sender, event_receiver) = broadcast::channel(10000); // TODO config

let listener = MetastoreListenerImpl::new(event_sender.subscribe());
let meta_store = RocksMetaStore::load_from_remote(self.meta_store_path().to_str().unwrap(), remote_fs.clone()).await.unwrap();
meta_store.add_listener(event_sender).await;
let wal_store = WALStore::new(meta_store.clone(), remote_fs.clone(), 500000);
Expand All @@ -182,8 +207,7 @@ impl Config {

CubeServices {
sql_service,
scheduler: Arc::new(RwLock::new(scheduler)),
listener,
scheduler: Arc::new(scheduler),
meta_store,
cluster
}
Expand Down
28 changes: 17 additions & 11 deletions rust/cubestore/src/metastore/mod.rs
Expand Up @@ -473,7 +473,7 @@ impl<R: RocksTable + 'static, F: Fn(Arc<DB>) -> R + Send + Sync + Clone + 'stati
pub trait MetaStore: Send + Sync {
async fn wait_for_current_seq_to_sync(&self) -> Result<(), CubeError>;
fn schemas_table(&self) -> Box<dyn MetaStoreTable<T=Schema>>;
async fn create_schema(&self, schema_name: String) -> Result<IdRow<Schema>, CubeError>;
async fn create_schema(&self, schema_name: String, if_not_exists: bool) -> Result<IdRow<Schema>, CubeError>;
async fn get_schemas(&self) -> Result<Vec<IdRow<Schema>>, CubeError>;
async fn get_schema_by_id(&self, schema_id: u64) -> Result<IdRow<Schema>, CubeError>;
//TODO Option
Expand Down Expand Up @@ -1376,10 +1376,16 @@ impl MetaStore for RocksMetaStore {
})
}

async fn create_schema(&self, schema_name: String) -> Result<IdRow<Schema>, CubeError> {
async fn create_schema(&self, schema_name: String, if_not_exists: bool) -> Result<IdRow<Schema>, CubeError> {
self.write_operation(move |db_ref, batch_pipe| {
let table = SchemaRocksTable::new(db_ref.clone());
let schema = Schema { name: schema_name };
if if_not_exists {
let rows = table.get_rows_by_index(&schema_name, &SchemaRocksIndex::Name)?;
if let Some(row) = rows.into_iter().nth(0) {
return Ok(row);
}
}
let schema = Schema { name: schema_name.clone() };
Ok(table.insert(schema, batch_pipe)?)
}).await
}
Expand Down Expand Up @@ -1859,18 +1865,18 @@ mod tests {
{
let meta_store = RocksMetaStore::new(store_path.join("metastore").as_path(), remote_fs);

let schema_1 = meta_store.create_schema("foo".to_string()).await.unwrap();
let schema_1 = meta_store.create_schema("foo".to_string(), false).await.unwrap();
println!("New id: {}", schema_1.id);
let schema_2 = meta_store.create_schema("bar".to_string()).await.unwrap();
let schema_2 = meta_store.create_schema("bar".to_string(), false).await.unwrap();
println!("New id: {}", schema_2.id);
let schema_3 = meta_store.create_schema("boo".to_string()).await.unwrap();
let schema_3 = meta_store.create_schema("boo".to_string(), false).await.unwrap();
println!("New id: {}", schema_3.id);

let schema_1_id = schema_1.id;
let schema_2_id = schema_2.id;
let schema_3_id = schema_3.id;

assert!(meta_store.create_schema("foo".to_string()).await.is_err());
assert!(meta_store.create_schema("foo".to_string(), false).await.is_err());

assert_eq!(meta_store.get_schema("foo".to_string()).await.unwrap(), schema_1);
assert_eq!(meta_store.get_schema("bar".to_string()).await.unwrap(), schema_2);
Expand Down Expand Up @@ -1928,7 +1934,7 @@ mod tests {
{
let meta_store = RocksMetaStore::new(store_path.clone().join("metastore").as_path(), remote_fs);

let schema_1 = meta_store.create_schema( "foo".to_string()).await.unwrap();
let schema_1 = meta_store.create_schema( "foo".to_string(), false).await.unwrap();
let mut columns = Vec::new();
columns.push(Column::new("col1".to_string(), ColumnType::Int, 0));
columns.push(Column::new("col2".to_string(), ColumnType::String, 1));
Expand Down Expand Up @@ -1962,11 +1968,11 @@ mod tests {
{
let services = config.configure().await;
services.start_processing_loops().await.unwrap();
services.meta_store.create_schema("foo1".to_string()).await.unwrap();
services.meta_store.create_schema("foo1".to_string(), false).await.unwrap();
services.meta_store.run_upload().await.unwrap();
services.meta_store.create_schema("foo".to_string()).await.unwrap();
services.meta_store.create_schema("foo".to_string(), false).await.unwrap();
services.meta_store.upload_check_point().await.unwrap();
services.meta_store.create_schema("bar".to_string()).await.unwrap();
services.meta_store.create_schema("bar".to_string(), false).await.unwrap();
services.meta_store.run_upload().await.unwrap();
services.stop_processing_loops().await.unwrap();
}
Expand Down
42 changes: 26 additions & 16 deletions rust/cubestore/src/scheduler/mod.rs
Expand Up @@ -7,12 +7,15 @@ use crate::metastore::job::{JobType, Job};
use log::{error};
use crate::remotefs::RemoteFs;
use crate::store::{WALStore, ChunkStore};
use tokio::sync::{Mutex, watch};

pub struct SchedulerImpl {
meta_store: Arc<dyn MetaStore>,
cluster: Arc<dyn Cluster>,
remote_fs: Arc<dyn RemoteFs>,
event_receiver: Receiver<MetaStoreEvent>
event_receiver: Mutex<Receiver<MetaStoreEvent>>,
stop_sender: watch::Sender<bool>,
stop_receiver: Mutex<watch::Receiver<bool>>,
}

impl SchedulerImpl {
Expand All @@ -22,38 +25,45 @@ impl SchedulerImpl {
remote_fs: Arc<dyn RemoteFs>,
event_receiver: Receiver<MetaStoreEvent>
) -> SchedulerImpl {
let (tx,rx) = watch::channel(false);
SchedulerImpl {
meta_store,
cluster,
remote_fs,
event_receiver
event_receiver: Mutex::new(event_receiver),
stop_sender: tx,
stop_receiver: Mutex::new(rx)
}
}

pub async fn run_scheduler(&mut self) -> Result<(), CubeError> {
pub async fn run_scheduler(&self) -> Result<(), CubeError> {
loop {
let event = self.event_receiver.recv().await?;
let mut stop_receiver = self.stop_receiver.lock().await;
let mut event_receiver = self.event_receiver.lock().await;
let event = tokio::select! {
Some(stopped) = stop_receiver.recv() => {
if stopped {
return Ok(());
} else {
continue;
}
}
event = event_receiver.recv() => {
event?
}
};
let res = self.process_event(event.clone()).await;
if let Err(e) = res {
error!("Error processing event {:?}: {}", event, e);
}
}
}

pub async fn run_scheduler_until(&mut self, last_event_fn: impl Fn(MetaStoreEvent) -> bool) -> Result<(), CubeError> {
loop {
let event = self.event_receiver.recv().await?;
let res = self.process_event(event.clone()).await;
if let Err(e) = res {
error!("Error processing event {:?}: {}", event, e);
}
if last_event_fn(event) {
return Ok(())
}
}
/// Signals the scheduler loop (`run_scheduler`) to terminate by
/// broadcasting `true` on the stop channel.
pub fn stop_processing_loops(&self) -> Result<(), CubeError> {
    self.stop_sender.broadcast(true)?;
    Ok(())
}

async fn process_event(&mut self, event: MetaStoreEvent) -> Result<(), CubeError> {
async fn process_event(&self, event: MetaStoreEvent) -> Result<(), CubeError> {
if let
MetaStoreEvent::Insert(TableId::WALs, row_id) |
MetaStoreEvent::Update(TableId::WALs, row_id) = event {
Expand Down

0 comments on commit 7c590b3

Please sign in to comment.