Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: register regions during procedure recovery #3859

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ pub mod start;
use std::any::Any;
use std::fmt::Debug;

use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu};
use common_error::ext::BoxedError;
use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
Expand Down Expand Up @@ -68,6 +69,11 @@ pub(crate) trait State: Send + Debug {
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)>;

/// The hook is called during the recovery.
fn on_recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
Ok(())
}

/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
}
Expand All @@ -88,6 +94,11 @@ impl DropDatabaseProcedure {
}
}

fn on_recover(&mut self) -> Result<()> {
let state = &mut self.state;
state.on_recover(&self.runtime_context)
}

pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult<Self> {
let DropDatabaseOwnedData {
catalog,
Expand All @@ -96,7 +107,7 @@ impl DropDatabaseProcedure {
state,
} = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self {
let mut procedure = Self {
runtime_context,
context: DropDatabaseContext {
catalog,
Expand All @@ -105,7 +116,13 @@ impl DropDatabaseProcedure {
tables: None,
},
state,
})
};
procedure
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
.on_recover()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Ok(procedure)
}
}

Expand Down
44 changes: 43 additions & 1 deletion src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ impl DropDatabaseExecutor {
}

impl DropDatabaseExecutor {
fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
/// Registers the operating regions.
pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
if !self.dropping_regions.is_empty() {
return Ok(());
}
let dropping_regions = operating_leader_regions(&self.physical_region_routes);
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
for (region_id, datanode_id) in dropping_regions {
Expand All @@ -85,6 +89,10 @@ impl DropDatabaseExecutor {
#[async_trait::async_trait]
#[typetag::serde]
impl State for DropDatabaseExecutor {
fn on_recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
self.register_dropping_regions(ddl_ctx)
}

async fn next(
&mut self,
ddl_ctx: &DdlContext,
Expand Down Expand Up @@ -338,4 +346,38 @@ mod tests {
let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
assert!(err.is_retry_later());
}

#[tokio::test]
async fn test_on_recovery() {
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(node_manager);
let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await;
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
physical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes.clone(),
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
state.on_recover(&ddl_context).unwrap();
assert_eq!(state.dropping_regions.len(), 1);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
}
}
}
21 changes: 18 additions & 3 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub(crate) mod executor;
mod metadata;

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_error::ext::BoxedError;
use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
Expand Down Expand Up @@ -68,12 +69,26 @@ impl DropTableProcedure {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let executor = data.build_executor();
Ok(Self {
// Only registers regions if the metadata is deleted.
let register_operating_regions = matches!(
data.state,
DropTableState::DeleteMetadata
| DropTableState::InvalidateTableCache
| DropTableState::DatanodeDropRegions
);
let mut procedure = Self {
context,
data,
dropping_regions: vec![],
executor,
})
};
if register_operating_regions {
procedure
.register_dropping_regions()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}
Ok(procedure)
}

pub(crate) async fn on_prepare<'a>(&mut self) -> Result<Status> {
Expand Down
64 changes: 64 additions & 0 deletions src/common/meta/src/ddl/tests/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,67 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
inner_test(new_drop_table_task("s", logical_table_id, false)).await;
inner_test(new_drop_table_task("t", physical_table_id, false)).await;
}

#[tokio::test]
async fn test_from_json() {
for (state, num_operating_regions, num_operating_regions_after_recovery) in [
(DropTableState::DeleteMetadata, 0, 1),
(DropTableState::InvalidateTableCache, 1, 1),
(DropTableState::DatanodeDropRegions, 1, 1),
(DropTableState::DeleteTombstone, 1, 0),
] {
let cluster_id = 1;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let kv_backend = Arc::new(MemoryKvBackend::new());
let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);

let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await;
let task = new_drop_table_task("t", physical_table_id, false);
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
execute_procedure_until(&mut procedure, |p| p.data.state == state).await;
let data = procedure.dump().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
// Cleans up the keeper.
ddl_context.memory_region_keeper.clear();
let procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions_after_recovery
);
assert_eq!(
procedure.dropping_regions.len(),
num_operating_regions_after_recovery
);
}

let num_operating_regions = 0;
let num_operating_regions_after_recovery = 0;
let cluster_id = 1;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let kv_backend = Arc::new(MemoryKvBackend::new());
let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);

let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await;
let task = new_drop_table_task("t", physical_table_id, false);
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
execute_procedure_until_done(&mut procedure).await;
let data = procedure.dump().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
// Cleans up the keeper.
ddl_context.memory_region_keeper.clear();
let procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions_after_recovery
);
assert_eq!(
procedure.dropping_regions.len(),
num_operating_regions_after_recovery
);
}
7 changes: 7 additions & 0 deletions src/common/meta/src/region_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,16 @@ impl MemoryRegionKeeper {
inner.len()
}

/// Returns true if it's empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

#[cfg(test)]
pub fn clear(&self) {
let mut inner = self.inner.write().unwrap();
inner.clear();
}
}

#[cfg(test)]
Expand Down
Loading