Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(drop_table): support to rollback table metadata #3692

Merged
merged 24 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f7833db
feat: support to rollback table metadata
WenyXu Apr 10, 2024
9becd6a
refactor: store table route value instead of physical table route
WenyXu Apr 10, 2024
18ac6a9
feat(drop_table): support to rollback table metadata
WenyXu Apr 10, 2024
f48d8ea
test: add rollback tests for drop table
WenyXu Apr 10, 2024
c5581b6
fix: do not set region to readonly
WenyXu Apr 10, 2024
b7957bb
test: add sqlness tests
WenyXu Apr 10, 2024
ad3ddd2
feat: implement TombstoneManager
WenyXu Apr 12, 2024
27815de
test: add tests for TombstoneManager
WenyXu Apr 12, 2024
9e51067
refactor: using TombstoneManager
WenyXu Apr 12, 2024
b90f07d
chore: remove unused code
WenyXu Apr 12, 2024
141f0a9
fix: fix typo
WenyXu Apr 12, 2024
fb5066d
refactor: using `on_restore_metadata`
WenyXu Apr 12, 2024
762be6d
refactor: add `executor` to `DropTableProcedure`
WenyXu Apr 12, 2024
995c8b6
refactor: simplify the `TombstoneManager`
WenyXu Apr 14, 2024
47eadd1
refactor: refactor `Key`
WenyXu Apr 15, 2024
830d22f
refactor: carry more info
WenyXu Apr 15, 2024
d73f2c6
feat: add `destroy_table_metadata`
WenyXu Apr 15, 2024
4d78fd7
refactor: remove redundant table_route_value
WenyXu Apr 15, 2024
0f1ca48
feat: ensure the key is empty
WenyXu Apr 16, 2024
b73b137
feat: introduce `table_metadata_keys`
WenyXu Apr 16, 2024
dda1389
chore: carry more info
WenyXu Apr 16, 2024
2468509
chore: remove clone
WenyXu Apr 16, 2024
0d5257f
chore: apply suggestions from CR
WenyXu Apr 16, 2024
3a51e39
feat: delete metadata tombstone
WenyXu Apr 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ mod metadata;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use common_telemetry::info;
use common_telemetry::tracing::warn;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use strum::AsRefStr;
Expand All @@ -31,6 +33,7 @@ use self::executor::DropTableExecutor;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
Expand Down Expand Up @@ -81,10 +84,7 @@ impl DropTableProcedure {

/// Register dropping regions if doesn't exist.
fn register_dropping_regions(&mut self) -> Result<()> {
// Safety: filled in `on_prepare`.
let region_routes = self.data.region_routes().unwrap()?;

let dropping_regions = operating_leader_regions(region_routes);
let dropping_regions = operating_leader_regions(&self.data.region_routes);

if self.dropping_regions.len() == dropping_regions.len() {
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
return Ok(());
Expand All @@ -109,7 +109,10 @@ impl DropTableProcedure {
}

/// Removes the table metadata.
async fn on_remove_metadata(&mut self, executor: &DropTableExecutor) -> Result<Status> {
pub(crate) async fn on_remove_metadata(
&mut self,
executor: &DropTableExecutor,
) -> Result<Status> {
self.register_dropping_regions()?;
// NOTES: If the meta server is crashed after the `RemoveMetadata`,
// Corresponding regions of this table on the Datanode will be closed automatically.
Expand All @@ -118,11 +121,7 @@ impl DropTableProcedure {
// TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping.
let table_id = self.data.table_id();
executor
.on_remove_metadata(
&self.context,
// Safety: filled in `on_prepare`.
self.data.region_routes().unwrap()?,
)
.on_remove_metadata(&self.context, &self.data.region_routes)
.await?;
info!("Deleted table metadata for table {table_id}");
self.data.state = DropTableState::InvalidateTableCache;
Expand All @@ -139,11 +138,7 @@ impl DropTableProcedure {

pub async fn on_datanode_drop_regions(&self, executor: &DropTableExecutor) -> Result<Status> {
executor
.on_drop_regions(
&self.context,
// Safety: filled in `on_prepare`.
self.data.region_routes().unwrap()?,
)
.on_drop_regions(&self.context, &self.data.region_routes)
.await?;
Ok(Status::done())
}
Expand Down Expand Up @@ -194,15 +189,42 @@ impl Procedure for DropTableProcedure {

LockKey::new(lock_key)
}

fn rollback_supported(&self) -> bool {
!matches!(self.data.state, DropTableState::Prepare)
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
}

async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
warn!(
"Rolling back the drop table procedure, table: {}",
self.data.table_id()
);

// Safety: fetched in `DropTableState::Prepare` step.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
let table_route_value = self.data.table_route_value.as_ref().unwrap();
let datanode_table_values = &self.data.datanode_table_value;

self.context
.table_metadata_manager
.restore_table_metadata(table_info_value, table_route_value, datanode_table_values)
.await
.map_err(ProcedureError::external)
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DropTableData {
pub state: DropTableState,
pub cluster_id: u64,
pub task: DropTableTask,
pub region_routes: Vec<RegionRoute>,
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
pub table_route_value: Option<DeserializedValueWithBytes<TableRouteValue>>,
pub table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
pub datanode_table_value: Vec<(
DatanodeTableKey,
DeserializedValueWithBytes<DatanodeTableValue>,
)>,
}

impl DropTableData {
Expand All @@ -211,19 +233,17 @@ impl DropTableData {
state: DropTableState::Prepare,
cluster_id,
task,
region_routes: vec![],
table_route_value: None,
table_info_value: None,
datanode_table_value: vec![],
}
}

fn table_ref(&self) -> TableReference {
self.task.table_ref()
}

fn region_routes(&self) -> Option<Result<&Vec<RegionRoute>>> {
self.table_route_value.as_ref().map(|v| v.region_routes())
}

fn table_id(&self) -> TableId {
self.task.table_id
}
Expand Down
5 changes: 5 additions & 0 deletions src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ impl DropTableExecutor {
.await
}

/// Restores the table metadata.
pub async fn on_restore_metadata(&self) -> Result<()> {
todo!()
}

/// Invalidates frontend caches
pub async fn invalidate_table_cache(&self, ctx: &DdlContext) -> Result<()> {
let cache_invalidator = &ctx.cache_invalidator;
Expand Down
52 changes: 48 additions & 4 deletions src/common/meta/src/ddl/drop_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use common_catalog::format_full_table_name;
use snafu::OptionExt;
use snafu::{ensure, OptionExt};

use crate::ddl::drop_table::DropTableProcedure;
use crate::error::{self, Result};
use crate::key::datanode_table::DatanodeTableKey;
use crate::rpc::router::region_distribution;

impl DropTableProcedure {
/// Fetches the table info and table route.
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -31,16 +33,58 @@ impl DropTableProcedure {
.with_context(|| error::TableInfoNotFoundSnafu {
table: format_full_table_name(&task.catalog, &task.schema, &task.table),
})?;
let (_, table_route_value) = self
let (_, physical_table_route_value) = self
.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw_physical_table_route(task.table_id)
.get_physical_table_route(task.table_id)
.await?;
let table_route_value = self
.context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_raw(task.table_id)
.await?
.context(error::TableRouteNotFoundSnafu {
table_id: task.table_id,
})?;

let distributions = region_distribution(&physical_table_route_value.region_routes);
let datanode_table_keys = distributions
.into_keys()
.map(|datanode_id| DatanodeTableKey::new(datanode_id, task.table_id))
.collect::<Vec<_>>();

// Only for physical table.
if table_route_value.is_physical() {
let datanode_table_values = self
.context
.table_metadata_manager
.datanode_table_manager()
.batch_get(&datanode_table_keys)
.await?;
ensure!(
datanode_table_keys.len() == datanode_table_values.len(),
error::UnexpectedSnafu {
err_msg: format!(
"Dropping table: {}, num({}) of datanode table values should equal num({}) of keys",
task.table_id,
datanode_table_values.len(),
datanode_table_keys.len()
)
}
);
self.data.datanode_table_value = datanode_table_keys
.into_iter()
.zip(datanode_table_values)
.collect();
}

self.data.region_routes = physical_table_route_value.region_routes;
self.data.table_info_value = Some(table_info_value);
self.data.table_route_value = Some(table_route_value);

Ok(())
}
}
104 changes: 102 additions & 2 deletions src/common/meta/src/ddl/tests/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@ use api::v1::region::{region_request, RegionRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use store_api::storage::RegionId;
use tokio::sync::mpsc;

use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::datanode_handler::DatanodeWatcher;
use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler};
use crate::ddl::test_util::{
create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task,
};
use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
use crate::key::table_route::TableRouteValue;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::ddl::DropTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDatanodeManager};

#[tokio::test]
async fn test_on_prepare_table_not_exists_err() {
Expand Down Expand Up @@ -191,3 +199,95 @@ async fn test_on_datanode_drop_regions() {
let (peer, request) = results.remove(0);
check(peer, request, 3, RegionId::new(table_id, 3));
}

#[tokio::test]
async fn test_on_rollback() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let kv_backend = Arc::new(MemoryKvBackend::new());
let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend.clone());
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
TableRouteValue::Physical(table_route),
)
.await;
// The create logical table procedure.
let physical_table_id = table_id;
// Creates the logical table metadata.
let task = test_create_logical_table_task("foo");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task],
physical_table_id,
ddl_context.clone(),
);
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Triggers procedure to create table metadata
let status = procedure.execute(&ctx).await.unwrap();
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
assert_eq!(*table_ids, vec![1025]);

let expected_kvs = kv_backend.dump();
// Drops the physical table
{
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: "phy_table".to_string(),
table_id: physical_table_id,
drop_if_exists: false,
};
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
let executor = procedure.executor();
procedure.on_prepare(&executor).await.unwrap();
procedure.on_remove_metadata(&executor).await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.rollback(&ctx).await.unwrap();
let kvs = kv_backend.dump();
assert_eq!(kvs, expected_kvs);
}

// Drops the logical table
let task = DropTableTask {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: "foo".to_string(),
table_id: table_ids[0],
drop_if_exists: false,
};
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
let executor = procedure.executor();
procedure.on_prepare(&executor).await.unwrap();
procedure.on_remove_metadata(&executor).await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.rollback(&ctx).await.unwrap();
let kvs = kv_backend.dump();
assert_eq!(kvs, expected_kvs);
}
Loading