Skip to content

Commit

Permalink
refactor: move create database to procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
CookiePieWw committed Apr 1, 2024
1 parent 6c316d2 commit eb10f0a
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu

pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
Expand Down
140 changes: 140 additions & 0 deletions src/common/meta/src/ddl/create_database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;

use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::schema_name::SchemaNameKey;
use crate::lock_key::CatalogLock;

pub struct CreateDatabaseProcedure {
pub context: DdlContext,
pub data: CreateDatabaseData,
}

impl CreateDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateDatabase";

pub fn new(
catalog: String,
schema: String,
create_if_not_exists: bool,
context: DdlContext,
) -> Self {
Self {
context: context,
data: CreateDatabaseData {
state: CreateDatabaseState::Prepare,
catalog: catalog,
schema: schema,
create_if_not_exists: create_if_not_exists,
},
}
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self {
context: context,
data: data,
})
}

pub async fn on_prepare(&mut self) -> Result<Status> {
let exists = self
.context
.table_metadata_manager
.schema_manager()
.exists(SchemaNameKey::new(&self.data.catalog, &self.data.schema))
.await?;

if exists && self.data.create_if_not_exists {
return Ok(Status::done());
}

ensure!(
!exists,
error::SchemaAlreadyExistsSnafu {
catalog: &self.data.catalog,
schema: &self.data.schema,
}
);

self.data.state = CreateDatabaseState::CreateMetadata;
Ok(Status::executing(true))
}

pub async fn on_create_metadata(&mut self) -> Result<Status> {
self.context
.table_metadata_manager
.schema_manager()
.create(
SchemaNameKey::new(&self.data.catalog, &self.data.schema),
None,
false,
)
.await?;

Ok(Status::done())
}
}

#[async_trait]
impl Procedure for CreateDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;

match state {
CreateDatabaseState::Prepare => self.on_prepare().await,
CreateDatabaseState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let lock_key = vec![CatalogLock::Read(&self.data.catalog).into()];

LockKey::new(lock_key)
}
}

#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
pub enum CreateDatabaseState {
Prepare,
CreateMetadata,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateDatabaseData {
pub state: CreateDatabaseState,
pub catalog: String,
pub schema: String,
pub create_if_not_exists: bool,
}
59 changes: 56 additions & 3 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_database::DropDatabaseProcedure;
Expand All @@ -45,12 +46,12 @@ use crate::key::table_route::TableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateLogicalTables, CreateTable, DropDatabase,
AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase,
DropLogicalTables, DropTable, TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropDatabaseTask, DropTableTask, SubmitDdlTaskRequest,
SubmitDdlTaskResponse, TruncateTableTask,
AlterTableTask, CreateDatabaseTask, CreateTableTask, DropDatabaseTask, DropTableTask,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
Expand Down Expand Up @@ -170,6 +171,15 @@ impl DdlManager {
})
},
),
(
CreateDatabaseProcedure::TYPE_NAME,
&|context: DdlContext| -> BoxedProcedureLoader {
Box::new(move |json: &str| {
let context = context.clone();
CreateDatabaseProcedure::from_json(json, context).map(|p| Box::new(p) as _)
})
},
),
(
DropDatabaseProcedure::TYPE_NAME,
&|context: DdlContext| -> BoxedProcedureLoader {
Expand Down Expand Up @@ -293,6 +303,25 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a create database task.
pub async fn submit_create_database(
&self,
_cluster_id: ClusterId,
CreateDatabaseTask {
catalog,
schema,
create_if_not_exists,
}: CreateDatabaseTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure =
CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

self.submit_procedure(procedure_with_id).await
}

#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
pub async fn submit_drop_database(
Expand Down Expand Up @@ -557,6 +586,27 @@ async fn handle_create_logical_table_tasks(
})
}

async fn handle_create_database_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
create_database_task: CreateDatabaseTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, _) = ddl_manager
.submit_create_database(cluster_id, create_database_task.clone())
.await?;

let procedure_id = id.to_string();
info!(
"Database {}.{} is created via procedure_id {id:?}",
create_database_task.catalog, create_database_task.schema
);

Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}

async fn handle_drop_database_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
Expand Down Expand Up @@ -651,6 +701,9 @@ impl ProcedureExecutor for DdlManager {
handle_alter_logical_table_tasks(self, cluster_id, alter_table_tasks).await
}
DropLogicalTables(_) => todo!(),
CreateDatabase(create_database_task) => {
handle_create_database_task(self, cluster_id, create_database_task).await
}
DropDatabase(drop_database_task) => {
handle_drop_database_task(self, cluster_id, drop_database_task).await
}
Expand Down
21 changes: 21 additions & 0 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum DdlTask {
CreateLogicalTables(Vec<CreateTableTask>),
DropLogicalTables(Vec<DropTableTask>),
AlterLogicalTables(Vec<AlterTableTask>),
CreateDatabase(CreateDatabaseTask),
DropDatabase(DropDatabaseTask),
}

Expand Down Expand Up @@ -90,6 +91,18 @@ impl DdlTask {
})
}

pub fn new_create_database(
catalog: String,
schema: String,
create_if_not_exists: bool,
) -> Self {
DdlTask::CreateDatabase(CreateDatabaseTask {
catalog,
schema,
create_if_not_exists,
})
}

pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
DdlTask::DropDatabase(DropDatabaseTask {
catalog,
Expand Down Expand Up @@ -201,6 +214,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {

Task::AlterTableTasks(PbAlterTableTasks { tasks })
}
DdlTask::CreateDatabase(_) => todo!(),
DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
};

Expand Down Expand Up @@ -588,6 +602,13 @@ impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
}
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
pub catalog: String,
pub schema: String,
pub create_if_not_exists: bool,
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
pub catalog: String,
Expand Down
49 changes: 30 additions & 19 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,30 +707,41 @@ impl StatementExecutor {
}
);

// TODO(weny): considers executing it in the procedures.
let schema_key = SchemaNameKey::new(catalog, database);
let exists = self
.table_metadata_manager
.schema_manager()
.exists(schema_key)
if !self
.catalog_manager
.schema_exists(&catalog, &database)
.await
.context(TableMetadataManagerSnafu)?;
.context(CatalogSnafu)?
{
self.create_database_procedure(
catalog.to_string(),
database.to_string(),
create_if_not_exists,
)
.await?;

if exists {
return if create_if_not_exists {
Ok(Output::new_with_affected_rows(1))
} else {
error::SchemaExistsSnafu { name: database }.fail()
};
Ok(Output::new_with_affected_rows(1))
} else if create_if_not_exists {
Ok(Output::new_with_affected_rows(1))
} else {
error::SchemaExistsSnafu { name: database }.fail()
}
}

self.table_metadata_manager
.schema_manager()
.create(schema_key, None, false)
.await
.context(TableMetadataManagerSnafu)?;
async fn create_database_procedure(
&self,
catalog: String,
database: String,
create_if_not_exists: bool,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_create_database(catalog, database, create_if_not_exists),
};

Ok(Output::new_with_affected_rows(1))
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
}

Expand Down

0 comments on commit eb10f0a

Please sign in to comment.