Skip to content

Commit

Permalink
refactor: simplify the TombstoneManager
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 14, 2024
1 parent 762be6d commit b36ce6d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 50 deletions.
20 changes: 10 additions & 10 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ impl TableMetadataManager {
} else {
vec![]
};
let mut keys: Vec<Key<_>> = Vec::with_capacity(3 + datanode_ids.len());
let mut keys = Vec::with_capacity(3 + datanode_ids.len());
let table_name_key = TableNameKey::new(
&table_name.catalog_name,
&table_name.schema_name,
Expand All @@ -606,11 +606,11 @@ impl TableMetadataManager {
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<Vec<_>>();

keys.push(Key::atomic(table_name_key.to_string()));
keys.push(Key::other(table_info_key.to_string()));
keys.push(Key::other(table_route_key.to_string()));
keys.push(Key::atomic(table_name_key.as_raw_key()));
keys.push(Key::other(table_info_key.as_raw_key()));
keys.push(Key::other(table_route_key.as_raw_key()));
for key in &datanode_table_keys {
keys.push(Key::other(key.to_string()));
keys.push(Key::other(key.as_raw_key()));
}
ensure!(
self.tombstone_manager.create(keys).await?,
Expand Down Expand Up @@ -638,7 +638,7 @@ impl TableMetadataManager {
} else {
vec![]
};
let mut keys: Vec<Key<_>> = Vec::with_capacity(3 + datanode_ids.len());
let mut keys = Vec::with_capacity(3 + datanode_ids.len());
let table_name = TableNameKey::new(
&table_name.catalog_name,
&table_name.schema_name,
Expand All @@ -651,11 +651,11 @@ impl TableMetadataManager {
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<Vec<_>>();

keys.push(Key::atomic(table_name.to_string()));
keys.push(Key::other(table_info_key.to_string()));
keys.push(Key::other(table_route_key.to_string()));
keys.push(Key::atomic(table_name.as_raw_key()));
keys.push(Key::other(table_info_key.as_raw_key()));
keys.push(Key::other(table_route_key.as_raw_key()));
for key in &datanode_table_keys {
keys.push(Key::other(key.to_string()));
keys.push(Key::other(key.as_raw_key()));
}
ensure!(
self.tombstone_manager.restore(keys).await?,
Expand Down
90 changes: 50 additions & 40 deletions src/common/meta/src/key/tombstone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;

use snafu::{ensure, OptionExt};

use super::TableMetaKeyGetTxnOp;
use crate::error::{self, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::TableMetaKey;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;

Expand All @@ -38,12 +35,12 @@ fn to_tombstone(key: &[u8]) -> Vec<u8> {
[TOMBSTONE_PREFIX.as_bytes(), key].concat()
}

impl<T: TableMetaKey> TombstoneKey<&T> {
impl TombstoneKey<&Vec<u8>> {
/// Returns the origin key and tombstone key.
fn to_keys(&self) -> (Vec<u8>, Vec<u8>) {
let key = self.0.as_raw_key();
let tombstone_key = to_tombstone(&key);
(key, tombstone_key)
let key = self.0;
let tombstone_key = to_tombstone(key);
(key.clone(), tombstone_key)
}

/// Returns the origin key and tombstone key.
Expand All @@ -53,42 +50,42 @@ impl<T: TableMetaKey> TombstoneKey<&T> {

/// Returns the tombstone key.
fn to_tombstone_key(&self) -> Vec<u8> {
let key = self.0.as_raw_key();
to_tombstone(&key)
let key = self.0;
to_tombstone(key)
}
}

impl<T: TableMetaKey> TableMetaKeyGetTxnOp for TombstoneKey<&T> {
impl TableMetaKeyGetTxnOp for TombstoneKey<&Vec<u8>> {
fn build_get_op(
&self,
) -> (
TxnOp,
impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = to_tombstone(&self.0.as_raw_key());
let key = to_tombstone(self.0);
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
}

/// Atomic Key:
/// The value corresponding to the key remains consistent between two transactions.
pub(crate) enum Key<T> {
Atomic(T),
Other(T),
pub(crate) enum Key {
Atomic(Vec<u8>),
Other(Vec<u8>),
}

impl<T> Key<T> {
impl Key {
/// Returns a new [Key::Atomic].
pub(crate) fn atomic(key: T) -> Self {
Self::Atomic(key)
pub(crate) fn atomic<T: Into<Vec<u8>>>(key: T) -> Self {
Self::Atomic(key.into())
}

/// Returns a new [Key::Other].
pub(crate) fn other(key: T) -> Self {
Self::Other(key)
pub(crate) fn other<T: Into<Vec<u8>>>(key: T) -> Self {
Self::Other(key.into())
}

fn get_inner(&self) -> &T {
fn get_inner(&self) -> &Vec<u8> {
match self {
Key::Atomic(key) => key,
Key::Other(key) => key,
Expand All @@ -100,6 +97,18 @@ impl<T> Key<T> {
}
}

impl TableMetaKeyGetTxnOp for Key {
fn build_get_op(
&self,
) -> (
TxnOp,
impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = self.get_inner().clone();
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
}

impl TombstoneManager {
/// Returns [TombstoneManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Expand All @@ -110,15 +119,10 @@ impl TombstoneManager {
/// Preforms to:
/// - retrieve all values corresponding `keys`.
/// - stores tombstone values.
pub(crate) async fn create<T: TableMetaKey + TableMetaKeyGetTxnOp + Display>(
&self,
keys: Vec<Key<T>>,
) -> Result<bool> {
pub(crate) async fn create(&self, keys: Vec<Key>) -> Result<bool> {
// Builds transaction to retrieve all values
let (operations, mut filters): (Vec<_>, Vec<_>) = keys
.iter()
.map(|key| key.get_inner().build_get_op())
.unzip();
let (operations, mut filters): (Vec<_>, Vec<_>) =
keys.iter().map(|key| key.build_get_op()).unzip();

let txn = Txn::new().and_then(operations);
let mut resp = self.kv_backend.txn(txn).await?;
Expand All @@ -136,7 +140,10 @@ impl TombstoneManager {
for (idx, key) in keys.iter().enumerate() {
let filter = &mut filters[idx];
let value = filter(&mut set).with_context(|| error::UnexpectedSnafu {
err_msg: format!("Missing value, key: {}", key.get_inner()),
err_msg: format!(
"Missing value, key: {}",
String::from_utf8_lossy(key.get_inner())
),
})?;
let (origin_key, tombstone_key) = TombstoneKey(key.get_inner()).into_keys();
// Compares the atomic key.
Expand Down Expand Up @@ -167,10 +174,7 @@ impl TombstoneManager {
/// Preforms to:
/// - retrieve all tombstone values corresponding `keys`.
/// - stores tombstone values.
pub(crate) async fn restore<T: TableMetaKey + TableMetaKeyGetTxnOp + Display>(
&self,
keys: Vec<Key<T>>,
) -> Result<bool> {
pub(crate) async fn restore(&self, keys: Vec<Key>) -> Result<bool> {
// Builds transaction to retrieve all tombstone values
let tombstone_keys = keys
.iter()
Expand All @@ -196,7 +200,10 @@ impl TombstoneManager {
for (idx, key) in keys.iter().enumerate() {
let filter = &mut filters[idx];
let value = filter(&mut set).with_context(|| error::UnexpectedSnafu {
err_msg: format!("Missing value, key: {}", key.get_inner()),
err_msg: format!(
"Missing value, key: {}",
String::from_utf8_lossy(key.get_inner())
),
})?;
let (origin_key, tombstone_key) = tombstone_keys[idx].to_keys();
// Compares the atomic key.
Expand All @@ -223,7 +230,7 @@ impl TombstoneManager {
}

/// Deletes tombstones for keys.
pub(crate) async fn delete<T: TableMetaKey>(&self, keys: Vec<T>) -> Result<()> {
pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
let operations = keys
.iter()
.map(|key| TxnOp::Delete(TombstoneKey(key).to_tombstone_key()))
Expand Down Expand Up @@ -290,7 +297,7 @@ mod tests {
assert!(!kv_backend.exists(b"foo").await.unwrap());
assert_eq!(
kv_backend
.get(&TombstoneKey(&"bar").to_tombstone_key())
.get(&TombstoneKey(&"bar".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
Expand All @@ -299,7 +306,7 @@ mod tests {
);
assert_eq!(
kv_backend
.get(&TombstoneKey(&"foo").to_tombstone_key())
.get(&TombstoneKey(&"foo".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
Expand Down Expand Up @@ -329,7 +336,7 @@ mod tests {
assert!(!kv_backend.exists(b"foo").await.unwrap());
assert_eq!(
kv_backend
.get(&TombstoneKey(&"bar").to_tombstone_key())
.get(&TombstoneKey(&"bar".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
Expand All @@ -338,7 +345,7 @@ mod tests {
);
assert_eq!(
kv_backend
.get(&TombstoneKey(&"foo").to_tombstone_key())
.get(&TombstoneKey(&"foo".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
Expand Down Expand Up @@ -456,7 +463,10 @@ mod tests {
.create(vec![Key::atomic("bar"), Key::other("foo")])
.await
.unwrap());
tombstone_manager.delete(vec!["bar", "foo"]).await.unwrap();
tombstone_manager
.delete(vec![b"bar".to_vec(), b"foo".to_vec()])
.await
.unwrap();
assert!(kv_backend.is_empty());
}
}

0 comments on commit b36ce6d

Please sign in to comment.