Skip to content
This repository has been archived by the owner on Nov 29, 2022. It is now read-only.

[feature] add support to remove_all keys atomic operation #38

Merged
merged 6 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions typed-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ type StoreResult<T> = Result<T, StoreError>;

pub enum StoreCommand<Key, Value> {
Write(Key, Value),
WriteAll(Vec<(Key, Value)>, oneshot::Sender<StoreResult<()>>),
Delete(Key),
DeleteAll(Vec<Key>, oneshot::Sender<StoreResult<()>>),
Read(Key, oneshot::Sender<StoreResult<Option<Value>>>),
ReadAll(Vec<Key>, oneshot::Sender<StoreResult<Vec<Option<Value>>>>),
NotifyRead(Key, oneshot::Sender<StoreResult<Option<Value>>>),
}

Expand All @@ -48,7 +51,7 @@ where
Value: Serialize + DeserializeOwned + Send + Clone + 'static,
{
pub fn new(keyed_db: rocks::DBMap<Key, Value>) -> Self {
let mut obligations = HashMap::<_, VecDeque<oneshot::Sender<_>>>::new();
let mut obligations = HashMap::<Key, VecDeque<oneshot::Sender<_>>>::new();
let (tx, mut rx) = channel(100);
tokio::spawn(async move {
while let Some(command) = rx.recv().await {
Expand All @@ -61,6 +64,21 @@ where
}
}
}
StoreCommand::WriteAll(key_values, sender) => {
let response =
keyed_db.multi_insert(key_values.iter().map(|(k, v)| (k, v)));

if response.is_ok() {
for (key, _) in key_values {
if let Some(mut senders) = obligations.remove(&key) {
while let Some(s) = senders.pop_front() {
let _ = s.send(Ok(None));
}
}
}
}
let _ = sender.send(response);
}
StoreCommand::Delete(key) => {
let _ = keyed_db.remove(&key);
if let Some(mut senders) = obligations.remove(&key) {
Expand All @@ -69,11 +87,28 @@ where
}
}
}

StoreCommand::DeleteAll(keys, sender) => {
let response = keyed_db.multi_remove(keys.iter());
// notify the obligations only when the delete was successful
if response.is_ok() {
for key in keys {
if let Some(mut senders) = obligations.remove(&key) {
while let Some(s) = senders.pop_front() {
let _ = s.send(Ok(None));
}
}
}
}
Comment on lines +93 to +101
Copy link
Contributor Author

@akichidis akichidis Mar 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that there is value doing this (we do similar for the single Delete operation). Based on the implementation of the notify_read the obligation:

  • either will be notified immediately if the value is available
  • or will be notified once it becomes available

in order for an obligator to be notified when a value gets deleted we should have a value already in place, which means that the obligator will have already received a notification via the earlier write operation and then get popped up by the queue.

The other alternative would be to just notify irrespective of whether we do have a successful delete or not - but again don't know what's the value then.

I might be missing a case here, but if we don't see this possible I would lean towards removing this code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of the notify_read operation is indeed to implement a post-write callback signal.

The use case is to make sure you only move on when you have guarantees you'll be able to recover from a process crash through persistence, and in practice we use it as a barrier (i.e. you have X writes to perform and only move on when they've all been done). I've never seen the value of a notify_read actually used or inspected, but rather always discarded: the signal is we're done with the next write operation on those keys, rather than the details of the written semantics.

Does that make things more clear? I agree that the name notify_read and the discipline of actually returning the value when we don't use it are confusing, and that there are probably simplifications to be found there. But I think discharging an obligation on a delete makes sense to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess looking on the code of notify_read it seemed to me that the caller will want to get notified once the data are written by someone (becoming available) ....to give an analogy, like a one-off pub/sub with persistence. Ok cool, I'll leave as is then, but it seems that we might need to iterate this in the future , but probably not a priority for now.

let _ = sender.send(response);
}
StoreCommand::Read(key, sender) => {
let response = keyed_db.get(&key);
let _ = sender.send(response);
}
StoreCommand::ReadAll(keys, sender) => {
let response = keyed_db.multi_get(keys.as_slice());
let _ = sender.send(response);
}
StoreCommand::NotifyRead(key, sender) => {
let response = keyed_db.get(&key);
if let Ok(Some(_)) = response {
Expand Down Expand Up @@ -103,12 +138,52 @@ where
}
}

/// Atomically writes all the key-value pairs in storage.
/// If the operation is successful, then the result will be a non
/// error empty result. Otherwise the error is returned.
pub async fn write_all(
&self,
key_value_pairs: impl IntoIterator<Item = (Key, Value)>,
) -> StoreResult<()> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::WriteAll(
key_value_pairs.into_iter().collect(),
sender,
))
.await
{
panic!("Failed to send WriteAll command to store: {}", e);
}
receiver
.await
.expect("Failed to receive reply to WriteAll command from store")
}

pub async fn remove(&self, key: Key) {
if let Err(e) = self.channel.send(StoreCommand::Delete(key)).await {
panic!("Failed to send Delete command to store: {}", e);
}
}

/// Atomically removes all the data referenced by the provided keys.
/// If the operation is successful, then the result will be a non
/// error empty result. Otherwise the error is returned.
pub async fn remove_all(&self, keys: impl IntoIterator<Item = Key>) -> StoreResult<()> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::DeleteAll(keys.into_iter().collect(), sender))
.await
{
panic!("Failed to send DeleteAll command to store: {}", e);
}
receiver
.await
.expect("Failed to receive reply to RemoveAll command from store")
}

pub async fn read(&self, key: Key) -> StoreResult<Option<Value>> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self.channel.send(StoreCommand::Read(key, sender)).await {
Expand All @@ -119,6 +194,24 @@ where
.expect("Failed to receive reply to Read command from store")
}

/// Fetches all the values for the provided keys.
pub async fn read_all(
&self,
keys: impl IntoIterator<Item = Key>,
) -> StoreResult<Vec<Option<Value>>> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
.channel
.send(StoreCommand::ReadAll(keys.into_iter().collect(), sender))
.await
{
panic!("Failed to send ReadAll command to store: {}", e);
}
receiver
.await
.expect("Failed to receive reply to ReadAll command from store")
}

pub async fn notify_read(&self, key: Key) -> StoreResult<Option<Value>> {
let (sender, receiver) = oneshot::channel();
if let Err(e) = self
Expand Down
14 changes: 11 additions & 3 deletions typed-store/src/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,19 @@ where
}

/// Returns a vector of values corresponding to the keys provided.
fn multi_get(&self, keys: &[K]) -> Result<Vec<Option<V>>, TypedStoreError> {
fn multi_get<J>(
&self,
keys: impl IntoIterator<Item = J>,
) -> Result<Vec<Option<V>>, TypedStoreError>
where
J: Borrow<K>,
{
let cf = self.cf();

let keys_bytes: Result<Vec<_>, TypedStoreError> =
keys.iter().map(|k| Ok((&cf, be_fix_int_ser(k)?))).collect();
let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
.into_iter()
.map(|k| Ok((&cf, be_fix_int_ser(k.borrow())?)))
.collect();

let results = self.rocksdb.multi_get_cf(keys_bytes?);

Expand Down
64 changes: 64 additions & 0 deletions typed-store/src/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,67 @@ async fn read_notify() {
store.write(key, value).await;
assert!(handle.await.is_ok());
}

#[tokio::test]
async fn remove_all_successfully() {
// GIVEN Create new store.
let db = rocks::DBMap::<Vec<u8>, Vec<u8>>::open(temp_dir(), None, None).unwrap();
let store = Store::new(db);

// AND Write values to the store.
let keys = vec![
vec![0u8, 1u8, 2u8, 1u8],
vec![0u8, 1u8, 2u8, 2u8],
vec![0u8, 1u8, 2u8, 3u8],
];
let value = vec![4u8, 5u8, 6u8, 7u8];

for key in keys.clone() {
store.write(key.clone(), value.clone()).await;
}

// WHEN multi remove values
let result = store.remove_all(keys.clone().into_iter()).await;

// THEN
assert!(result.is_ok());

// AND values doesn't exist any more
for key in keys {
let result = store.read(key).await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());
}
}

#[tokio::test]
async fn write_and_read_all_successfully() {
// GIVEN Create new store.
let db = rocks::DBMap::<Vec<u8>, Vec<u8>>::open(temp_dir(), None, None).unwrap();
let store = Store::new(db);

// AND key-values to store.
let key_values = vec![
(vec![0u8, 1u8, 2u8, 1u8], vec![4u8, 5u8, 6u8, 7u8]),
(vec![0u8, 1u8, 2u8, 2u8], vec![4u8, 5u8, 6u8, 7u8]),
(vec![0u8, 1u8, 2u8, 3u8], vec![4u8, 5u8, 6u8, 7u8]),
];

// WHEN
let result = store.write_all(key_values.clone()).await;

// THEN
assert!(result.is_ok());

// AND read_all to ensure that values have been written
let keys: Vec<Vec<u8>> = key_values.clone().into_iter().map(|(key, _)| key).collect();
let result = store.read_all(keys).await;

assert!(result.is_ok());
assert_eq!(result.as_ref().unwrap().len(), 3);

for (i, value) in result.unwrap().into_iter().enumerate() {
assert!(value.is_some());
assert_eq!(value.unwrap(), key_values[i].1);
}
}
7 changes: 6 additions & 1 deletion typed-store/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ where
fn values(&'a self) -> Self::Values;

/// Returns a vector of values corresponding to the keys provided.
fn multi_get(&self, keys: &[K]) -> Result<Vec<Option<V>>, Self::Error>;
fn multi_get<J>(
&self,
keys: impl IntoIterator<Item = J>,
) -> Result<Vec<Option<V>>, Self::Error>
where
J: Borrow<K>;

/// Inserts key-value pairs.
fn multi_insert<J, U>(
Expand Down