Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): Ensure all moving values remain unchanged between two transactions #3727

Merged
merged 12 commits into from
Apr 18, 2024
26 changes: 0 additions & 26 deletions src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,6 @@ impl DropTableExecutor {
ctx: &DdlContext,
table_route_value: &TableRouteValue,
) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.table.catalog_name,
&self.table.schema_name,
&self.table.table_name,
);
if !ctx
.table_metadata_manager
.table_name_manager()
.exists(table_name_key)
.await?
{
return Ok(());
}
ctx.table_metadata_manager
.delete_table_metadata(self.table_id, &self.table, table_route_value)
.await
Expand Down Expand Up @@ -152,19 +139,6 @@ impl DropTableExecutor {
ctx: &DdlContext,
table_route_value: &TableRouteValue,
) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.table.catalog_name,
&self.table.schema_name,
&self.table.table_name,
);
if ctx
.table_metadata_manager
.table_name_manager()
.exists(table_name_key)
.await?
{
return Ok(());
}
ctx.table_metadata_manager
.restore_table_metadata(self.table_id, &self.table, table_route_value)
.await
Expand Down
6 changes: 5 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ pub enum Error {
#[snafu(display("Atomic key changed: {err_msg}"))]
CasKeyChanged { err_msg: String, location: Location },

#[snafu(display("Failed to move values: {err_msg}"))]
MoveValues { err_msg: String, location: Location },

#[snafu(display("Failed to parse {} from utf8", name))]
FromUtf8 {
name: String,
Expand All @@ -444,7 +447,8 @@ impl ErrorExt for Error {
| EtcdFailed { .. }
| EtcdTxnFailed { .. }
| ConnectEtcd { .. }
| CasKeyChanged { .. } => StatusCode::Internal,
| CasKeyChanged { .. }
| MoveValues { .. } => StatusCode::Internal,

SerdeJson { .. }
| ParseOption { .. }
Expand Down
94 changes: 74 additions & 20 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ use self::tombstone::TombstoneManager;
use crate::ddl::utils::region_storage_path;
use crate::error::{self, Result, SerdeJsonSnafu, UnexpectedSnafu};
use crate::key::table_route::TableRouteKey;
use crate::key::tombstone::Key;
use crate::key::tombstone::TombstoneKeyValue;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
use crate::rpc::store::BatchGetRequest;
use crate::rpc::KeyValue;
use crate::table_name::TableName;
use crate::DatanodeId;

Expand Down Expand Up @@ -582,7 +584,7 @@ impl TableMetadataManager {
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<Vec<Key>> {
) -> Result<Vec<Vec<u8>>> {
// Builds keys
let datanode_ids = if table_route_value.is_physical() {
region_distribution(table_route_value.region_routes()?)
Expand All @@ -604,11 +606,11 @@ impl TableMetadataManager {
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<HashSet<_>>();

keys.push(Key::compare_and_swap(table_name.as_raw_key()));
keys.push(Key::new(table_info_key.as_raw_key()));
keys.push(Key::new(table_route_key.as_raw_key()));
keys.push(table_name.as_raw_key());
keys.push(table_info_key.as_raw_key());
keys.push(table_route_key.as_raw_key());
for key in &datanode_table_keys {
keys.push(Key::new(key.as_raw_key()));
keys.push(key.as_raw_key());
}
Ok(keys)
}
Expand All @@ -622,8 +624,32 @@ impl TableMetadataManager {
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.create(keys).await?;
Ok(())
let num_keys = keys.len();
let resp = self
.kv_backend
.batch_get(BatchGetRequest::new().with_keys(keys))
.await?;
if resp.kvs.is_empty() {
return Ok(());
}
ensure!(
resp.kvs.len() == num_keys,
error::UnexpectedSnafu {
err_msg: format!(
"Read incomplete metadata during delete table metadata, table: {}({}); Expected num of keys: {}, but got: {}",
table_name,
table_id,
num_keys,
resp.kvs.len()
)
}
);
let kvs = resp
.kvs
.into_iter()
.map(|KeyValue { key, value }| (key, value))
.collect();
self.tombstone_manager.create(kvs).await
}

/// Deletes metadata tombstone for table **permanently**.
Expand All @@ -634,11 +660,7 @@ impl TableMetadataManager {
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self
.table_metadata_keys(table_id, table_name, table_route_value)?
.into_iter()
.map(|key| key.into_bytes())
.collect::<Vec<_>>();
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.delete(keys).await
}

Expand All @@ -651,8 +673,32 @@ impl TableMetadataManager {
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.restore(keys).await?;
Ok(())
let num_keys = keys.len();
let kvs = self.tombstone_manager.batch_get(keys).await?;
if kvs.is_empty() {
return Ok(());
}
ensure!(
kvs.len() == num_keys,
error::UnexpectedSnafu {
err_msg: format!(
"Read incomplete metadata during restore table metadata, table: {}({}); Expected num of keys: {}, but got: {}",
table_name,
table_id,
num_keys,
kvs.len()
)
}
);
let kvs = kvs
.into_iter()
.map(
|TombstoneKeyValue {
origin_key, value, ..
}| (origin_key, value),
)
.collect();
self.tombstone_manager.restore(kvs).await
}

/// Deletes metadata for table **permanently**.
Expand All @@ -666,7 +712,7 @@ impl TableMetadataManager {
let operations = self
.table_metadata_keys(table_id, table_name, table_route_value)?
.into_iter()
.map(|key| TxnOp::Delete(key.into_bytes()))
.map(TxnOp::Delete)
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
.collect::<Vec<_>>();

let txn = Txn::new().and_then(operations);
Expand Down Expand Up @@ -1286,22 +1332,24 @@ mod tests {
.delete_table_metadata(table_id, &table_name, table_route_value)
.await
.unwrap();

// Should be ignored.
table_metadata_manager
.delete_table_metadata(table_id, &table_name, table_route_value)
.await
.unwrap();
assert!(table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.unwrap()
.is_none());

assert!(table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.unwrap()
.is_none());

assert!(table_metadata_manager
.datanode_table_manager()
.tables(datanode_id)
Expand All @@ -1316,7 +1364,6 @@ mod tests {
.await
.unwrap();
assert!(table_info.is_none());

let table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
Expand Down Expand Up @@ -1792,5 +1839,12 @@ mod tests {
.unwrap();
let kvs = mem_kv.dump();
assert_eq!(kvs, expected_result);
// Should be ignored.
table_metadata_manager
.restore_table_metadata(table_id, &table_name, &table_route_value)
.await
.unwrap();
let kvs = mem_kv.dump();
assert_eq!(kvs, expected_result);
}
}
Loading
Loading