Skip to content

Commit

Permalink
fix: fix create table ddl return incorrect table id (#3232)
Browse files Browse the repository at this point in the history
* fix: fix create table ddl return incorrect table id

* refactor: refactor param of Status::done_with_output
  • Loading branch information
WenyXu committed Jan 25, 2024
1 parent 6c2f0c9 commit 8bade8f
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 24 deletions.
11 changes: 6 additions & 5 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,27 @@ impl CreateTableProcedure {
/// Checks whether the table exists.
async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
let exist = self
let table_name_value = self
.context
.table_metadata_manager
.table_name_manager()
.exists(TableNameKey::new(
.get(TableNameKey::new(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
))
.await?;

if exist {
if let Some(value) = table_name_value {
ensure!(
self.creator.data.task.create_table.create_if_not_exists,
error::TableAlreadyExistsSnafu {
table_name: self.creator.data.table_ref().to_string(),
}
);

return Ok(Status::done());
let table_id = value.table_id();
return Ok(Status::done_with_output(table_id));
}

self.creator.data.state = CreateTableState::DatanodeCreateRegions;
Expand Down Expand Up @@ -315,7 +316,7 @@ impl CreateTableProcedure {
.await?;
info!("Created table metadata for table {table_id}");

Ok(Status::done())
Ok(Status::done_with_output(table_id))
}
}

Expand Down
31 changes: 18 additions & 13 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, tracing};
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -163,7 +163,7 @@ impl DdlManager {
alter_table_task: AlterTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
physical_table_info: Option<(TableId, TableName)>,
) -> Result<ProcedureId> {
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();

let procedure = AlterTableProcedure::new(
Expand All @@ -187,7 +187,7 @@ impl DdlManager {
create_table_task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<ProcedureId> {
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();

let procedure = CreateTableProcedure::new(
Expand All @@ -211,7 +211,7 @@ impl DdlManager {
drop_table_task: DropTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
table_route_value: DeserializedValueWithBytes<TableRouteValue>,
) -> Result<ProcedureId> {
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();

let procedure = DropTableProcedure::new(
Expand All @@ -235,7 +235,7 @@ impl DdlManager {
truncate_table_task: TruncateTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
) -> Result<ProcedureId> {
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = TruncateTableProcedure::new(
cluster_id,
Expand All @@ -250,7 +250,10 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
async fn submit_procedure(
&self,
procedure_with_id: ProcedureWithId,
) -> Result<(ProcedureId, Option<Output>)> {
let procedure_id = procedure_with_id.id;

let mut watcher = self
Expand All @@ -259,11 +262,11 @@ impl DdlManager {
.await
.context(SubmitProcedureSnafu)?;

watcher::wait(&mut watcher)
let output = watcher::wait(&mut watcher)
.await
.context(WaitProcedureSnafu)?;

Ok(procedure_id)
Ok((procedure_id, output))
}
}

Expand All @@ -288,7 +291,7 @@ async fn handle_truncate_table_task(

let table_route = table_route_value.into_inner().region_routes()?.clone();

let id = ddl_manager
let (id, _) = ddl_manager
.submit_truncate_table_task(
cluster_id,
truncate_table_task,
Expand Down Expand Up @@ -363,7 +366,7 @@ async fn handle_alter_table_task(
))
};

let id = ddl_manager
let (id, _) = ddl_manager
.submit_alter_table_task(
cluster_id,
alter_table_task,
Expand Down Expand Up @@ -405,7 +408,7 @@ async fn handle_drop_table_task(
let table_route_value =
DeserializedValueWithBytes::from_inner(TableRouteValue::Physical(table_route_value));

let id = ddl_manager
let (id, _) = ddl_manager
.submit_drop_table_task(
cluster_id,
drop_table_task,
Expand Down Expand Up @@ -443,16 +446,18 @@ async fn handle_create_table_task(

create_table_task.table_info.ident.table_id = table_id;

let id = ddl_manager
let (id, output) = ddl_manager
.submit_create_table_task(
cluster_id,
create_table_task,
table_route,
region_wal_options,
)
.await?;
let output = output.context(error::ProcedureOutputSnafu)?;

info!("Table: {table_id:?} is created via procedure_id {id:?}");
let table_id = *(output.downcast_ref::<u32>().unwrap());
info!("Table: {table_id} is created via procedure_id {id:?}");

Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
Expand Down
6 changes: 5 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ pub enum Error {
source: datatypes::Error,
},

#[snafu(display("Failed to get procedure output"))]
ProcedureOutput { location: Location },

#[snafu(display("Primary key '{key}' not found when creating region request"))]
PrimaryKeyNotFound { key: String, location: Location },

Expand Down Expand Up @@ -396,7 +399,8 @@ impl ErrorExt for Error {
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }
| UnexpectedLogicalRouteTable { .. } => StatusCode::Unexpected,
| UnexpectedLogicalRouteTable { .. }
| ProcedureOutput { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
Expand Down
4 changes: 2 additions & 2 deletions src/common/procedure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod watcher;

pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
BoxedProcedure, Context, ContextProvider, LockKey, Output, Procedure, ProcedureId,
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;
4 changes: 2 additions & 2 deletions src/common/procedure/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ impl Status {
}

/// Returns a [Status::Done] with output.
pub fn done_with_output(output: Output) -> Status {
pub fn done_with_output<T: Any + Send + Sync>(output: T) -> Status {
Status::Done {
output: Some(output),
output: Some(Arc::new(output)),
}
}
/// Returns `true` if the procedure is done.
Expand Down
2 changes: 1 addition & 1 deletion src/common/procedure/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ mod tests {
self.error = !self.error;
Err(Error::retry_later(MockError::new(StatusCode::Internal)))
} else {
Ok(Status::done_with_output(Arc::new("hello")))
Ok(Status::done_with_output("hello"))
}
}

Expand Down

0 comments on commit 8bade8f

Please sign in to comment.