diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 8be31555533c..6659ee238f61 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -106,19 +106,6 @@ impl DropTableExecutor { ctx: &DdlContext, table_route_value: &TableRouteValue, ) -> Result<()> { - let table_name_key = TableNameKey::new( - &self.table.catalog_name, - &self.table.schema_name, - &self.table.table_name, - ); - if !ctx - .table_metadata_manager - .table_name_manager() - .exists(table_name_key) - .await? - { - return Ok(()); - } ctx.table_metadata_manager .delete_table_metadata(self.table_id, &self.table, table_route_value) .await @@ -152,19 +139,6 @@ impl DropTableExecutor { ctx: &DdlContext, table_route_value: &TableRouteValue, ) -> Result<()> { - let table_name_key = TableNameKey::new( - &self.table.catalog_name, - &self.table.schema_name, - &self.table.table_name, - ); - if ctx - .table_metadata_manager - .table_name_manager() - .exists(table_name_key) - .await? - { - return Ok(()); - } ctx.table_metadata_manager .restore_table_metadata(self.table_id, &self.table, table_route_value) .await diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 0d23a1dd26f7..7346ae780d23 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -421,8 +421,8 @@ pub enum Error { #[snafu(display("Invalid role: {}", role))] InvalidRole { role: i32, location: Location }, - #[snafu(display("Atomic key changed: {err_msg}"))] - CasKeyChanged { err_msg: String, location: Location }, + #[snafu(display("Failed to move values: {err_msg}"))] + MoveValues { err_msg: String, location: Location }, #[snafu(display("Failed to parse {} from utf8", name))] FromUtf8 { @@ -444,7 +444,7 @@ impl ErrorExt for Error { | EtcdFailed { .. } | EtcdTxnFailed { .. } | ConnectEtcd { .. } - | CasKeyChanged { .. } => StatusCode::Internal, + | MoveValues { .. 
} => StatusCode::Internal, SerdeJson { .. } | ParseOption { .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index a38387198b09..18004c71daff 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -88,13 +88,13 @@ use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; use self::tombstone::TombstoneManager; use crate::ddl::utils::region_storage_path; -use crate::error::{self, Result, SerdeJsonSnafu, UnexpectedSnafu}; +use crate::error::{self, Result, SerdeJsonSnafu}; use crate::key::table_route::TableRouteKey; -use crate::key::tombstone::Key; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus}; +use crate::rpc::store::BatchDeleteRequest; use crate::table_name::TableName; use crate::DatanodeId; @@ -582,7 +582,7 @@ impl TableMetadataManager { table_id: TableId, table_name: &TableName, table_route_value: &TableRouteValue, - ) -> Result> { + ) -> Result>> { // Builds keys let datanode_ids = if table_route_value.is_physical() { region_distribution(table_route_value.region_routes()?) 
@@ -604,11 +604,11 @@ impl TableMetadataManager { .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id)) .collect::>(); - keys.push(Key::compare_and_swap(table_name.as_raw_key())); - keys.push(Key::new(table_info_key.as_raw_key())); - keys.push(Key::new(table_route_key.as_raw_key())); + keys.push(table_name.as_raw_key()); + keys.push(table_info_key.as_raw_key()); + keys.push(table_route_key.as_raw_key()); for key in &datanode_table_keys { - keys.push(Key::new(key.as_raw_key())); + keys.push(key.as_raw_key()); } Ok(keys) } @@ -622,8 +622,7 @@ impl TableMetadataManager { table_route_value: &TableRouteValue, ) -> Result<()> { let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; - self.tombstone_manager.create(keys).await?; - Ok(()) + self.tombstone_manager.create(keys).await } /// Deletes metadata tombstone for table **permanently**. @@ -634,11 +633,7 @@ impl TableMetadataManager { table_name: &TableName, table_route_value: &TableRouteValue, ) -> Result<()> { - let keys = self - .table_metadata_keys(table_id, table_name, table_route_value)? - .into_iter() - .map(|key| key.into_bytes()) - .collect::>(); + let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; self.tombstone_manager.delete(keys).await } @@ -651,8 +646,7 @@ impl TableMetadataManager { table_route_value: &TableRouteValue, ) -> Result<()> { let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; - self.tombstone_manager.restore(keys).await?; - Ok(()) + self.tombstone_manager.restore(keys).await } /// Deletes metadata for table **permanently**. @@ -663,21 +657,11 @@ impl TableMetadataManager { table_name: &TableName, table_route_value: &TableRouteValue, ) -> Result<()> { - let operations = self - .table_metadata_keys(table_id, table_name, table_route_value)? 
- .into_iter() - .map(|key| TxnOp::Delete(key.into_bytes())) - .collect::>(); - - let txn = Txn::new().and_then(operations); - let resp = self.kv_backend.txn(txn).await?; - ensure!( - resp.succeeded, - UnexpectedSnafu { - err_msg: format!("Failed to destroy table metadata: {table_id}") - } - ); - + let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; + let _ = self + .kv_backend + .batch_delete(BatchDeleteRequest::new().with_keys(keys)) + .await?; Ok(()) } @@ -1286,14 +1270,17 @@ mod tests { .delete_table_metadata(table_id, &table_name, table_route_value) .await .unwrap(); - + // Should be ignored. + table_metadata_manager + .delete_table_metadata(table_id, &table_name, table_route_value) + .await + .unwrap(); assert!(table_metadata_manager .table_info_manager() .get(table_id) .await .unwrap() .is_none()); - assert!(table_metadata_manager .table_route_manager() .table_route_storage() @@ -1301,7 +1288,6 @@ mod tests { .await .unwrap() .is_none()); - assert!(table_metadata_manager .datanode_table_manager() .tables(datanode_id) @@ -1316,7 +1302,6 @@ mod tests { .await .unwrap(); assert!(table_info.is_none()); - let table_route = table_metadata_manager .table_route_manager() .table_route_storage() @@ -1792,5 +1777,12 @@ mod tests { .unwrap(); let kvs = mem_kv.dump(); assert_eq!(kvs, expected_result); + // Should be ignored. + table_metadata_manager + .restore_table_metadata(table_id, &table_name, &table_route_value) + .await + .unwrap(); + let kvs = mem_kv.dump(); + assert_eq!(kvs, expected_result); } } diff --git a/src/common/meta/src/key/tombstone.rs b/src/common/meta/src/key/tombstone.rs index 045959b6828f..38648f269560 100644 --- a/src/common/meta/src/key/tombstone.rs +++ b/src/common/meta/src/key/tombstone.rs @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use snafu::{ensure, OptionExt}; +use std::collections::HashMap; + +use snafu::ensure; -use super::TableMetaKeyGetTxnOp; use crate::error::{self, Result}; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; +use crate::rpc::store::BatchGetRequest; /// [TombstoneManager] provides the ability to: /// - logically delete values @@ -29,307 +31,160 @@ pub(crate) struct TombstoneManager { const TOMBSTONE_PREFIX: &str = "__tombstone/"; -pub(crate) struct TombstoneKey(T); +pub(crate) struct TombstoneKeyValue { + pub(crate) origin_key: Vec, + pub(crate) tombstone_key: Vec, + pub(crate) value: Vec, +} fn to_tombstone(key: &[u8]) -> Vec { [TOMBSTONE_PREFIX.as_bytes(), key].concat() } -impl TombstoneKey<&Vec> { - /// Returns the origin key and tombstone key. - fn to_keys(&self) -> (Vec, Vec) { - let key = self.0; - let tombstone_key = to_tombstone(key); - (key.clone(), tombstone_key) - } - - /// Returns the origin key and tombstone key. - fn into_keys(self) -> (Vec, Vec) { - self.to_keys() - } - - /// Returns the tombstone key. - fn to_tombstone_key(&self) -> Vec { - let key = self.0; - to_tombstone(key) - } -} - -impl TableMetaKeyGetTxnOp for TombstoneKey<&Vec> { - fn build_get_op( - &self, - ) -> ( - TxnOp, - impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option>, - ) { - TxnOpGetResponseSet::build_get_op(to_tombstone(self.0)) - } -} - -/// The key used in the [TombstoneManager]. -pub(crate) struct Key { - bytes: Vec, - // Atomic Key: - // The value corresponding to the key remains consistent between two transactions. - atomic: bool, -} - -impl Key { - /// Returns a new atomic key. - pub(crate) fn compare_and_swap>>(key: T) -> Self { - Self { - bytes: key.into(), - atomic: true, - } - } - - /// Returns a new normal key. 
- pub(crate) fn new>>(key: T) -> Self { - Self { - bytes: key.into(), - atomic: false, - } - } - - /// Into bytes - pub(crate) fn into_bytes(self) -> Vec { - self.bytes - } - - fn get_inner(&self) -> &Vec { - &self.bytes - } - - fn is_atomic(&self) -> bool { - self.atomic - } -} - -impl TableMetaKeyGetTxnOp for Key { - fn build_get_op( - &self, - ) -> ( - TxnOp, - impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option>, - ) { - let key = self.get_inner().clone(); - (TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key)) - } -} - -fn format_on_failure_error_message Option>>( - mut set: TxnOpGetResponseSet, - on_failure_kv_and_filters: Vec<(Vec, Vec, F)>, -) -> String { - on_failure_kv_and_filters - .into_iter() - .flat_map(|(key, value, mut filter)| { - let got = filter(&mut set); - let Some(got) = got else { - return Some(format!( - "For key: {} was expected: {}, but value does not exists", - String::from_utf8_lossy(&key), - String::from_utf8_lossy(&value), - )); - }; - - if got != value { - Some(format!( - "For key: {} was expected: {}, but got: {}", - String::from_utf8_lossy(&key), - String::from_utf8_lossy(&value), - String::from_utf8_lossy(&got), - )) - } else { - None - } - }) - .collect::>() - .join("; ") -} - -fn format_keys(keys: &[Key]) -> String { - keys.iter() - .map(|key| String::from_utf8_lossy(&key.bytes)) - .collect::>() - .join(", ") -} - impl TombstoneManager { /// Returns [TombstoneManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend } } - /// Creates tombstones for keys. + /// Moves value to `dest_key`. /// - /// Preforms to: - /// - retrieve all values corresponding `keys`. - /// - stores tombstone values. - pub(crate) async fn create(&self, keys: Vec) -> Result<()> { - // Builds transaction to retrieve all values - let (operations, mut filters): (Vec<_>, Vec<_>) = - keys.iter().map(|key| key.build_get_op()).unzip(); + /// Puts `value` to `dest_key` if the value of `src_key` equals `value`. 
+ /// + /// Otherwise retrieves the value of `src_key`. + fn build_move_value_txn( + &self, + src_key: Vec, + value: Vec, + dest_key: Vec, + ) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option>) { + let txn = Txn::new() + .when(vec![Compare::with_value( + src_key.clone(), + CompareOp::Equal, + value.clone(), + )]) + .and_then(vec![ + TxnOp::Put(dest_key.clone(), value.clone()), + TxnOp::Delete(src_key.clone()), + ]) + .or_else(vec![TxnOp::Get(src_key.clone())]); + + (txn, TxnOpGetResponseSet::filter(src_key)) + } - let txn = Txn::new().and_then(operations); - let mut resp = self.kv_backend.txn(txn).await?; + async fn move_values_inner(&self, keys: &[Vec], dest_keys: &[Vec]) -> Result<()> { ensure!( - resp.succeeded, + keys.len() == dest_keys.len(), error::UnexpectedSnafu { - err_msg: format!( - "Failed to retrieves the metadata, keys: {}", - format_keys(&keys) - ), + err_msg: "The length of keys does not match the length of dest_keys." } ); - - let mut set = TxnOpGetResponseSet::from(&mut resp.responses); - // Builds the create tombstone transaction. - let mut tombstone_operations = Vec::with_capacity(keys.len() * 2); - let mut tombstone_comparison = vec![]; - let mut on_failure_operations = vec![]; - let mut on_failure_kv_and_filters = vec![]; - for (idx, key) in keys.iter().enumerate() { - let filter = &mut filters[idx]; - let value = filter(&mut set).with_context(|| error::UnexpectedSnafu { - err_msg: format!( - "Missing value, key: {}", - String::from_utf8_lossy(key.get_inner()) - ), - })?; - let (origin_key, tombstone_key) = TombstoneKey(key.get_inner()).into_keys(); - // Compares the atomic key. 
- if key.is_atomic() { - tombstone_comparison.push(Compare::with_not_exist_value( - tombstone_key.clone(), - CompareOp::Equal, - )); - tombstone_comparison.push(Compare::with_value( - origin_key.clone(), - CompareOp::Equal, - value.clone(), - )); - let (op, filter) = TxnOpGetResponseSet::build_get_op(origin_key.clone()); - on_failure_operations.push(op); - on_failure_kv_and_filters.push((origin_key.clone(), value.clone(), filter)); + // The key -> dest key mapping. + let lookup_table = keys.iter().zip(dest_keys.iter()).collect::>(); + + let resp = self + .kv_backend + .batch_get(BatchGetRequest::new().with_keys(keys.to_vec())) + .await?; + let mut results = resp + .kvs + .into_iter() + .map(|kv| (kv.key, kv.value)) + .collect::>(); + + const MAX_RETRIES: usize = 8; + for _ in 0..MAX_RETRIES { + let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results + .iter() + .map(|(key, value)| { + let (txn, filter) = self.build_move_value_txn( + key.clone(), + value.clone(), + lookup_table[&key].clone(), + ); + (txn, (key.clone(), filter)) + }) + .unzip(); + let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?; + if resp.succeeded { + return Ok(()); + } + let mut set = TxnOpGetResponseSet::from(&mut resp.responses); + // Updates results. + for (idx, mut filter) in filters.into_iter().enumerate() { + if let Some(value) = filter(&mut set) { + results.insert(keys[idx].clone(), value); + } else { + results.remove(&keys[idx]); + } } - tombstone_operations.push(TxnOp::Delete(origin_key)); - tombstone_operations.push(TxnOp::Put(tombstone_key, value)); } - let txn = if !tombstone_comparison.is_empty() { - Txn::new().when(tombstone_comparison) - } else { - Txn::new() + error::MoveValuesSnafu { + err_msg: format!( + "keys: {:?}", + keys.iter().map(|key| String::from_utf8_lossy(key)).collect::<Vec<_>>(), + ), } - .and_then(tombstone_operations); + .fail() + } - let txn = if !on_failure_operations.is_empty() { - txn.or_else(on_failure_operations) + /// Moves values to `dest_key`. 
+ async fn move_values(&self, keys: Vec>, dest_keys: Vec>) -> Result<()> { + let chunk_size = self.kv_backend.max_txn_ops() / 2; + if keys.len() > chunk_size { + let keys_chunks = keys.chunks(chunk_size).collect::>(); + let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::>(); + for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) { + self.move_values_inner(keys, dest_keys).await?; + } + + Ok(()) } else { - txn - }; - - let mut resp = self.kv_backend.txn(txn).await?; - // TODO(weny): add tests for atomic key changed. - if !resp.succeeded { - let set = TxnOpGetResponseSet::from(&mut resp.responses); - let err_msg = format_on_failure_error_message(set, on_failure_kv_and_filters); - return error::CasKeyChangedSnafu { err_msg }.fail(); + self.move_values_inner(&keys, &dest_keys).await } - Ok(()) } - /// Restores tombstones for keys. + /// Creates tombstones for keys. /// /// Preforms to: - /// - retrieve all tombstone values corresponding `keys`. + /// - deletes origin values. /// - stores tombstone values. - pub(crate) async fn restore(&self, keys: Vec) -> Result<()> { - // Builds transaction to retrieve all tombstone values - let tombstone_keys = keys - .iter() - .map(|key| TombstoneKey(key.get_inner())) - .collect::>(); - let (operations, mut filters): (Vec<_>, Vec<_>) = - tombstone_keys.iter().map(|key| key.build_get_op()).unzip(); - - let txn = Txn::new().and_then(operations); - let mut resp = self.kv_backend.txn(txn).await?; - ensure!( - resp.succeeded, - error::UnexpectedSnafu { - err_msg: format!( - "Failed to retrieves the metadata, keys: {}", - format_keys(&keys) - ), - } - ); - - let mut set = TxnOpGetResponseSet::from(&mut resp.responses); - - // Builds the restore tombstone transaction. 
- let mut tombstone_operations = Vec::with_capacity(keys.len() * 2); - let mut tombstone_comparison = vec![]; - let mut on_failure_operations = vec![]; - let mut on_failure_kv_and_filters = vec![]; - for (idx, key) in keys.iter().enumerate() { - let filter = &mut filters[idx]; - let value = filter(&mut set).with_context(|| error::UnexpectedSnafu { - err_msg: format!( - "Missing value, key: {}", - String::from_utf8_lossy(key.get_inner()) - ), - })?; - let (origin_key, tombstone_key) = tombstone_keys[idx].to_keys(); - // Compares the atomic key. - if key.is_atomic() { - tombstone_comparison.push(Compare::with_not_exist_value( - origin_key.clone(), - CompareOp::Equal, - )); - tombstone_comparison.push(Compare::with_value( - tombstone_key.clone(), - CompareOp::Equal, - value.clone(), - )); - let (op, filter) = tombstone_keys[idx].build_get_op(); - on_failure_operations.push(op); - on_failure_kv_and_filters.push((tombstone_key.clone(), value.clone(), filter)); - } - tombstone_operations.push(TxnOp::Delete(tombstone_key)); - tombstone_operations.push(TxnOp::Put(origin_key, value)); - } - - let txn = if !tombstone_comparison.is_empty() { - Txn::new().when(tombstone_comparison) - } else { - Txn::new() - } - .and_then(tombstone_operations); - - let txn = if !on_failure_operations.is_empty() { - txn.or_else(on_failure_operations) - } else { - txn - }; - - let mut resp = self.kv_backend.txn(txn).await?; - // TODO(weny): add tests for atomic key changed. 
- if !resp.succeeded { - let set = TxnOpGetResponseSet::from(&mut resp.responses); - let err_msg = format_on_failure_error_message(set, on_failure_kv_and_filters); - return error::CasKeyChangedSnafu { err_msg }.fail(); - } + pub(crate) async fn create(&self, keys: Vec>) -> Result<()> { + let (keys, dest_keys): (Vec<_>, Vec<_>) = keys + .into_iter() + .map(|key| { + let tombstone_key = to_tombstone(&key); + (key, tombstone_key) + }) + .unzip(); + + self.move_values(keys, dest_keys).await + } - Ok(()) + /// Restores tombstones for keys. + /// + /// Preforms to: + /// - restore origin value. + /// - deletes tombstone values. + pub(crate) async fn restore(&self, keys: Vec>) -> Result<()> { + let (keys, dest_keys): (Vec<_>, Vec<_>) = keys + .into_iter() + .map(|key| { + let tombstone_key = to_tombstone(&key); + (tombstone_key, key) + }) + .unzip(); + + self.move_values(keys, dest_keys).await } - /// Deletes tombstones for keys. + /// Deletes tombstones values for the specified `keys`. pub(crate) async fn delete(&self, keys: Vec>) -> Result<()> { let operations = keys .iter() - .map(|key| TxnOp::Delete(TombstoneKey(key).to_tombstone_key())) + .map(|key| TxnOp::Delete(to_tombstone(key))) .collect::>(); let txn = Txn::new().and_then(operations); @@ -342,54 +197,43 @@ impl TombstoneManager { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::sync::Arc; - use crate::key::tombstone::{Key, TombstoneKey, TombstoneManager}; + use super::to_tombstone; + use crate::error::Error; + use crate::key::tombstone::TombstoneManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackend; use crate::rpc::store::PutRequest; - #[tokio::test] - async fn test_create_tombstone() { - let kv_backend = Arc::new(MemoryKvBackend::default()); - let tombstone_manager = TombstoneManager::new(kv_backend.clone()); - kv_backend - .put(PutRequest::new().with_key("bar").with_value("baz")) - .await - .unwrap(); - kv_backend - 
.put(PutRequest::new().with_key("foo").with_value("hi")) - .await - .unwrap(); - tombstone_manager - .create(vec![Key::compare_and_swap("bar"), Key::new("foo")]) - .await - .unwrap(); - assert!(!kv_backend.exists(b"bar").await.unwrap()); - assert!(!kv_backend.exists(b"foo").await.unwrap()); - assert_eq!( - kv_backend - .get(&TombstoneKey(&"bar".into()).to_tombstone_key()) - .await - .unwrap() - .unwrap() - .value, - b"baz" - ); - assert_eq!( - kv_backend - .get(&TombstoneKey(&"foo".into()).to_tombstone_key()) - .await - .unwrap() - .unwrap() - .value, - b"hi" - ); - assert_eq!(kv_backend.len(), 2); + #[derive(Debug, Clone)] + struct MoveValue { + key: Vec, + dest_key: Vec, + value: Vec, + } + + async fn check_moved_values( + kv_backend: Arc>, + move_values: &[MoveValue], + ) { + for MoveValue { + key, + dest_key, + value, + } in move_values + { + assert!(kv_backend.get(key).await.unwrap().is_none()); + assert_eq!( + &kv_backend.get(dest_key).await.unwrap().unwrap().value, + value, + ); + } } #[tokio::test] - async fn test_create_tombstone_without_atomic_key() { + async fn test_create_tombstone() { let kv_backend = Arc::new(MemoryKvBackend::default()); let tombstone_manager = TombstoneManager::new(kv_backend.clone()); kv_backend @@ -401,14 +245,14 @@ mod tests { .await .unwrap(); tombstone_manager - .create(vec![Key::new("bar"), Key::new("foo")]) + .create(vec![b"bar".to_vec(), b"foo".to_vec()]) .await .unwrap(); assert!(!kv_backend.exists(b"bar").await.unwrap()); assert!(!kv_backend.exists(b"foo").await.unwrap()); assert_eq!( kv_backend - .get(&TombstoneKey(&"bar".into()).to_tombstone_key()) + .get(&to_tombstone(b"bar")) .await .unwrap() .unwrap() @@ -417,7 +261,7 @@ mod tests { ); assert_eq!( kv_backend - .get(&TombstoneKey(&"foo".into()).to_tombstone_key()) + .get(&to_tombstone(b"foo")) .await .unwrap() .unwrap() @@ -428,7 +272,7 @@ mod tests { } #[tokio::test] - async fn test_create_tombstone_origin_value_not_found_err() { + async fn 
test_create_tombstone_with_non_exist_values() { let kv_backend = Arc::new(MemoryKvBackend::default()); let tombstone_manager = TombstoneManager::new(kv_backend.clone()); @@ -441,11 +285,19 @@ mod tests { .await .unwrap(); - let err = tombstone_manager - .create(vec![Key::compare_and_swap("bar"), Key::new("baz")]) + tombstone_manager + .create(vec![b"bar".to_vec(), b"baz".to_vec()]) .await - .unwrap_err(); - assert!(err.to_string().contains("Missing value")); + .unwrap(); + check_moved_values( + kv_backend.clone(), + &[MoveValue { + key: b"bar".to_vec(), + dest_key: to_tombstone(b"bar"), + value: b"baz".to_vec(), + }], + ) + .await; } #[tokio::test] @@ -462,18 +314,18 @@ mod tests { .unwrap(); let expected_kvs = kv_backend.dump(); tombstone_manager - .create(vec![Key::compare_and_swap("bar"), Key::new("foo")]) + .create(vec![b"bar".to_vec(), b"foo".to_vec()]) .await .unwrap(); tombstone_manager - .restore(vec![Key::compare_and_swap("bar"), Key::new("foo")]) + .restore(vec![b"bar".to_vec(), b"foo".to_vec()]) .await .unwrap(); assert_eq!(expected_kvs, kv_backend.dump()); } #[tokio::test] - async fn test_restore_tombstone_without_atomic_key() { + async fn test_delete_tombstone() { let kv_backend = Arc::new(MemoryKvBackend::default()); let tombstone_manager = TombstoneManager::new(kv_backend.clone()); kv_backend @@ -484,61 +336,226 @@ mod tests { .put(PutRequest::new().with_key("foo").with_value("hi")) .await .unwrap(); - let expected_kvs = kv_backend.dump(); tombstone_manager - .create(vec![Key::compare_and_swap("bar"), Key::new("foo")]) + .create(vec![b"bar".to_vec(), b"foo".to_vec()]) .await .unwrap(); tombstone_manager - .restore(vec![Key::new("bar"), Key::new("foo")]) + .delete(vec![b"bar".to_vec(), b"foo".to_vec()]) .await .unwrap(); - assert_eq!(expected_kvs, kv_backend.dump()); + assert!(kv_backend.is_empty()); } #[tokio::test] - async fn test_restore_tombstone_origin_value_not_found_err() { + async fn test_move_values() { let kv_backend = 
Arc::new(MemoryKvBackend::default()); let tombstone_manager = TombstoneManager::new(kv_backend.clone()); - kv_backend - .put(PutRequest::new().with_key("bar").with_value("baz")) + let kvs = HashMap::from([ + (b"bar".to_vec(), b"baz".to_vec()), + (b"foo".to_vec(), b"hi".to_vec()), + (b"baz".to_vec(), b"hello".to_vec()), + ]); + for (key, value) in &kvs { + kv_backend + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + } + let move_values = kvs + .iter() + .map(|(key, value)| MoveValue { + key: key.clone(), + dest_key: to_tombstone(key), + value: value.clone(), + }) + .collect::>(); + let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values + .clone() + .into_iter() + .map(|kv| (kv.key, kv.dest_key)) + .unzip(); + tombstone_manager + .move_values(keys.clone(), dest_keys.clone()) .await .unwrap(); - kv_backend - .put(PutRequest::new().with_key("foo").with_value("hi")) + check_moved_values(kv_backend.clone(), &move_values).await; + // Moves again + tombstone_manager + .move_values(keys.clone(), dest_keys.clone()) .await .unwrap(); + check_moved_values(kv_backend.clone(), &move_values).await; + } + + #[tokio::test] + async fn test_move_values_with_non_exists_values() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let tombstone_manager = TombstoneManager::new(kv_backend.clone()); + let kvs = HashMap::from([ + (b"bar".to_vec(), b"baz".to_vec()), + (b"foo".to_vec(), b"hi".to_vec()), + (b"baz".to_vec(), b"hello".to_vec()), + ]); + for (key, value) in &kvs { + kv_backend + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + } + let move_values = kvs + .iter() + .map(|(key, value)| MoveValue { + key: key.clone(), + dest_key: to_tombstone(key), + value: value.clone(), + }) + .collect::>(); + let (mut keys, mut dest_keys): (Vec<_>, Vec<_>) = move_values + .clone() + .into_iter() + .map(|kv| (kv.key, kv.dest_key)) + .unzip(); + 
keys.push(b"non-exists".to_vec()); + dest_keys.push(b"hi/non-exists".to_vec()); tombstone_manager - .create(vec![Key::compare_and_swap("bar"), Key::new("foo")]) + .move_values(keys.clone(), dest_keys.clone()) .await .unwrap(); - let err = tombstone_manager - .restore(vec![Key::new("bar"), Key::new("baz")]) + check_moved_values(kv_backend.clone(), &move_values).await; + // Moves again + tombstone_manager + .move_values(keys.clone(), dest_keys.clone()) .await - .unwrap_err(); - assert!(err.to_string().contains("Missing value")); + .unwrap(); + check_moved_values(kv_backend.clone(), &move_values).await; } #[tokio::test] - async fn test_delete_tombstone() { + async fn test_move_values_changed() { let kv_backend = Arc::new(MemoryKvBackend::default()); let tombstone_manager = TombstoneManager::new(kv_backend.clone()); + let kvs = HashMap::from([ + (b"bar".to_vec(), b"baz".to_vec()), + (b"foo".to_vec(), b"hi".to_vec()), + (b"baz".to_vec(), b"hello".to_vec()), + ]); + for (key, value) in &kvs { + kv_backend + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + } + kv_backend - .put(PutRequest::new().with_key("bar").with_value("baz")) + .put(PutRequest::new().with_key("baz").with_value("changed")) .await .unwrap(); - kv_backend - .put(PutRequest::new().with_key("foo").with_value("hi")) + + let move_values = kvs + .iter() + .map(|(key, value)| MoveValue { + key: key.clone(), + dest_key: to_tombstone(key), + value: value.clone(), + }) + .collect::>(); + let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values + .clone() + .into_iter() + .map(|kv| (kv.key, kv.dest_key)) + .unzip(); + tombstone_manager + .move_values(keys, dest_keys) .await .unwrap(); + } + + #[tokio::test] + async fn test_move_values_overwrite_dest_values() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let tombstone_manager = TombstoneManager::new(kv_backend.clone()); + let kvs = HashMap::from([ + (b"bar".to_vec(), b"baz".to_vec()), + 
(b"foo".to_vec(), b"hi".to_vec()), + (b"baz".to_vec(), b"hello".to_vec()), + ]); + for (key, value) in &kvs { + kv_backend + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + } + + // Prepares + let move_values = kvs + .iter() + .map(|(key, value)| MoveValue { + key: key.clone(), + dest_key: to_tombstone(key), + value: value.clone(), + }) + .collect::>(); + let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values + .clone() + .into_iter() + .map(|kv| (kv.key, kv.dest_key)) + .unzip(); tombstone_manager - .create(vec![Key::compare_and_swap("bar"), Key::new("foo")]) + .move_values(keys, dest_keys) .await .unwrap(); + check_moved_values(kv_backend.clone(), &move_values).await; + + // Overwrites existing dest keys. + let kvs = HashMap::from([ + (b"bar".to_vec(), b"new baz".to_vec()), + (b"foo".to_vec(), b"new hi".to_vec()), + (b"baz".to_vec(), b"new baz".to_vec()), + ]); + for (key, value) in &kvs { + kv_backend + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + } + let move_values = kvs + .iter() + .map(|(key, value)| MoveValue { + key: key.clone(), + dest_key: to_tombstone(key), + value: value.clone(), + }) + .collect::>(); + let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values + .clone() + .into_iter() + .map(|kv| (kv.key, kv.dest_key)) + .unzip(); tombstone_manager - .delete(vec![b"bar".to_vec(), b"foo".to_vec()]) + .move_values(keys, dest_keys) .await .unwrap(); - assert!(kv_backend.is_empty()); + check_moved_values(kv_backend.clone(), &move_values).await; } } diff --git a/src/common/meta/src/key/txn_helper.rs b/src/common/meta/src/key/txn_helper.rs index b06c8acee98c..e7e0174e8c38 100644 --- a/src/common/meta/src/key/txn_helper.rs +++ b/src/common/meta/src/key/txn_helper.rs @@ -24,18 +24,6 @@ use crate::rpc::KeyValue; pub(crate) struct TxnOpGetResponseSet(Vec); impl TxnOpGetResponseSet { - /// Returns a [TxnOp] to retrieve the value corresponding 
`key` and - /// a filter to consume corresponding [KeyValue] from [TxnOpGetResponseSet]. - pub(crate) fn build_get_op>>( - key: T, - ) -> ( - TxnOp, - impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option>, - ) { - let key = key.into(); - (TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key)) - } - /// Returns a filter to consume a [KeyValue] where the key equals `key`. pub(crate) fn filter(key: Vec) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option> { move |set| { diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index f2b43a9c3a28..f763d6b4430d 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -97,7 +97,6 @@ impl Display for RangeRequest { } impl RangeRequest { - #[inline] pub fn new() -> Self { Self { key: vec![], @@ -114,7 +113,6 @@ impl RangeRequest { /// key is the first key for the range, If range_end is not given, the /// request only looks up key. - #[inline] pub fn with_key(mut self, key: impl Into>) -> Self { self.key = key.into(); self @@ -129,7 +127,6 @@ impl RangeRequest { /// then the range request gets all keys prefixed with key. /// If both key and range_end are '\0', then the range request returns all /// keys. - #[inline] pub fn with_range(mut self, key: impl Into>, range_end: impl Into>) -> Self { self.key = key.into(); self.range_end = range_end.into(); @@ -138,7 +135,6 @@ impl RangeRequest { /// Gets all keys prefixed with key. /// range_end is the key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), - #[inline] pub fn with_prefix(mut self, key: impl Into>) -> Self { self.key = key.into(); self.range_end = util::get_prefix_end_key(&self.key); @@ -147,14 +143,12 @@ impl RangeRequest { /// limit is a limit on the number of keys returned for the request. When /// limit is set to 0, it is treated as no limit. - #[inline] pub fn with_limit(mut self, limit: i64) -> Self { self.limit = limit; self } /// keys_only when set returns only the keys and not the values. 
- #[inline] pub fn with_keys_only(mut self) -> Self { self.keys_only = true; self @@ -204,7 +198,6 @@ impl RangeResponse { } } - #[inline] pub fn take_kvs(&mut self) -> Vec { self.kvs.drain(..).collect() } @@ -244,7 +237,6 @@ impl From for PutRequest { } impl PutRequest { - #[inline] pub fn new() -> Self { Self { key: vec![], @@ -254,7 +246,6 @@ impl PutRequest { } /// key is the key, in bytes, to put into the key-value store. - #[inline] pub fn with_key(mut self, key: impl Into>) -> Self { self.key = key.into(); self @@ -262,7 +253,6 @@ impl PutRequest { /// value is the value, in bytes, to associate with the key in the /// key-value store. - #[inline] pub fn with_value(mut self, value: impl Into>) -> Self { self.value = value.into(); self @@ -270,7 +260,6 @@ impl PutRequest { /// If prev_kv is set, gets the previous key-value pair before changing it. /// The previous key-value pair will be returned in the put response. - #[inline] pub fn with_prev_kv(mut self) -> Self { self.prev_kv = true; self @@ -330,18 +319,15 @@ impl Default for BatchGetRequest { } impl BatchGetRequest { - #[inline] pub fn new() -> Self { Self { keys: vec![] } } - #[inline] pub fn with_keys(mut self, keys: Vec>) -> Self { self.keys = keys; self } - #[inline] pub fn add_key(mut self, key: impl Into>) -> Self { self.keys.push(key.into()); self @@ -416,7 +402,6 @@ impl From for BatchPutRequest { } impl BatchPutRequest { - #[inline] pub fn new() -> Self { Self { kvs: vec![], @@ -424,7 +409,6 @@ impl BatchPutRequest { } } - #[inline] pub fn add_kv(mut self, key: impl Into>, value: impl Into>) -> Self { self.kvs.push(KeyValue { key: key.into(), @@ -435,7 +419,6 @@ impl BatchPutRequest { /// If prev_kv is set, gets the previous key-value pair before changing it. /// The previous key-value pair will be returned in the put response. 
- #[inline] pub fn with_prev_kv(mut self) -> Self { self.prev_kv = true; self @@ -467,7 +450,6 @@ impl BatchPutResponse { } } - #[inline] pub fn take_prev_kvs(&mut self) -> Vec { self.prev_kvs.drain(..).collect() } @@ -501,7 +483,6 @@ impl From for BatchDeleteRequest { } impl BatchDeleteRequest { - #[inline] pub fn new() -> Self { Self { keys: vec![], @@ -509,7 +490,12 @@ impl BatchDeleteRequest { } } - #[inline] + /// Sets `keys`. + pub fn with_keys(mut self, keys: Vec>) -> Self { + self.keys = keys; + self + } + pub fn add_key(mut self, key: impl Into>) -> Self { self.keys.push(key.into()); self @@ -517,7 +503,6 @@ impl BatchDeleteRequest { /// If prev_kv is set, gets the previous key-value pair before deleting it. /// The previous key-value pair will be returned in the batch delete response. - #[inline] pub fn with_prev_kv(mut self) -> Self { self.prev_kv = true; self @@ -582,7 +567,6 @@ impl From for CompareAndPutRequest { } impl CompareAndPutRequest { - #[inline] pub fn new() -> Self { Self { key: vec![], @@ -592,14 +576,12 @@ impl CompareAndPutRequest { } /// key is the key, in bytes, to put into the key-value store. - #[inline] pub fn with_key(mut self, key: impl Into>) -> Self { self.key = key.into(); self } /// expect is the previous value, in bytes - #[inline] pub fn with_expect(mut self, expect: impl Into>) -> Self { self.expect = expect.into(); self @@ -607,7 +589,6 @@ impl CompareAndPutRequest { /// value is the value, in bytes, to associate with the key in the /// key-value store. 
- #[inline] pub fn with_value(mut self, value: impl Into>) -> Self { self.value = value.into(); self @@ -649,12 +630,10 @@ impl CompareAndPutResponse { } } - #[inline] pub fn is_success(&self) -> bool { self.success } - #[inline] pub fn take_prev_kv(&mut self) -> Option { self.prev_kv.take() } @@ -703,7 +682,6 @@ impl From for DeleteRangeRequest { } impl DeleteRangeRequest { - #[inline] pub fn new() -> Self { Self { key: vec![], @@ -719,7 +697,6 @@ impl DeleteRangeRequest { /// key is the first key to delete in the range. If range_end is not given, /// the range is defined to contain only the key argument. - #[inline] pub fn with_key(mut self, key: impl Into>) -> Self { self.key = key.into(); self @@ -735,7 +712,6 @@ impl DeleteRangeRequest { /// the keys with the prefix (the given key). /// If range_end is '\0', the range is all keys greater than or equal to the /// key argument. - #[inline] pub fn with_range(mut self, key: impl Into>, range_end: impl Into>) -> Self { self.key = key.into(); self.range_end = range_end.into(); @@ -744,7 +720,6 @@ impl DeleteRangeRequest { /// Deletes all keys prefixed with key. /// range_end is one bit larger than the given key. - #[inline] pub fn with_prefix(mut self, key: impl Into>) -> Self { self.key = key.into(); self.range_end = util::get_prefix_end_key(&self.key); @@ -753,7 +728,6 @@ impl DeleteRangeRequest { /// If prev_kv is set, gets the previous key-value pairs before deleting it. /// The previous key-value pairs will be returned in the delete response. - #[inline] pub fn with_prev_kv(mut self) -> Self { self.prev_kv = true; self @@ -788,12 +762,10 @@ impl DeleteRangeResponse { } } - #[inline] pub fn deleted(&self) -> i64 { self.deleted } - #[inline] pub fn take_prev_kvs(&mut self) -> Vec { self.prev_kvs.drain(..).collect() }