Adaptive cleanup of mutex table
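The mutex table's background cleaner no longer sweeps on a fixed period alone. The table now tracks how many lock entries it holds; the cleaner wakes up every second and sweeps the shards once the entry count reaches a configurable cleanup_entries_threshold or cleanup_period has elapsed since the last sweep, and cleanup() now reports how many entries it removed so the running size stays accurate.

Below is a minimal usage sketch of the new constructor. The parameter values, the key string, and the sui_storage::mutex_table import path are illustrative assumptions and not part of this commit; only the new_with_cleanup signature, try_acquire_lock, and size() come from the diff.

use std::time::Duration;
use sui_storage::mutex_table::MutexTable;

#[tokio::main]
async fn main() {
    // Illustrative values: 8 shards with initial capacity 128, sweep at most
    // every 10s, first sweep check after 10s, and force a sweep once 10_000
    // entries have accumulated.
    let table = MutexTable::<String>::new_with_cleanup(
        8,
        128,
        Duration::from_secs(10),
        Duration::from_secs(10),
        10_000,
    );

    let guard = table.try_acquire_lock("some-key".to_string());
    assert!(guard.is_ok());
    // size() reports how many mutex entries are currently tracked for cleanup.
    assert_eq!(table.size(), 1);
}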
sadhansood committed Sep 8, 2022
1 parent 3b41203 commit 409b299
Showing 2 changed files with 93 additions and 10 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_server.rs
@@ -191,7 +191,7 @@ impl ValidatorService {
tokio::spawn(async move {
narwhal_node::restarter::NodeRestarter::watch(
consensus_keypair,
&*consensus_committee,
&consensus_committee,
consensus_worker_cache,
consensus_storage_base_path,
consensus_execution_state,
101 changes: 92 additions & 9 deletions crates/sui-storage/src/mutex_table.rs
@@ -6,12 +6,13 @@ use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::info;

type InnerLockTable<K> = HashMap<K, Arc<tokio::sync::Mutex<()>>>;
@@ -22,6 +23,7 @@ pub struct MutexTable<K: Hash> {
_k: std::marker::PhantomData<K>,
_cleaner: JoinHandle<()>,
stop: Arc<AtomicBool>,
size: Arc<AtomicUsize>,
}

#[derive(Debug)]
@@ -46,6 +48,7 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
shard_size: usize,
cleanup_period: Duration,
cleanup_initial_delay: Duration,
cleanup_entries_threshold: usize,
) -> Self {
let lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>> = Arc::new(
(0..num_shards)
@@ -56,19 +59,29 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
let cloned = lock_table.clone();
let stop = Arc::new(AtomicBool::new(false));
let stop_cloned = stop.clone();
let size: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let size_cloned = size.clone();
Self {
random_state: RandomState::new(),
lock_table,
_k: std::marker::PhantomData {},
_cleaner: tokio::spawn(async move {
tokio::time::sleep(cleanup_initial_delay).await;
let mut previous_cleanup_instant = Instant::now();
while !stop_cloned.load(Ordering::SeqCst) {
Self::cleanup(cloned.clone());
tokio::time::sleep(cleanup_period).await;
if size_cloned.load(Ordering::SeqCst) >= cleanup_entries_threshold
|| previous_cleanup_instant.elapsed() >= cleanup_period
{
let num_removed = Self::cleanup(cloned.clone());
size_cloned.fetch_sub(num_removed, Ordering::SeqCst);
previous_cleanup_instant = Instant::now();
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
info!("Stopping mutex table cleanup!");
}),
stop,
size,
}
}

@@ -78,10 +91,16 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
shard_size,
Duration::from_secs(10),
Duration::from_secs(10),
10_000,
)
}

pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) {
pub fn size(&self) -> usize {
self.size.load(Ordering::SeqCst)
}

pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) -> usize {
let mut num_removed: usize = 0;
for shard in lock_table.iter() {
let map = shard.try_write();
if map.is_err() {
@@ -93,12 +112,18 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
// This check alone is likely sufficient (the try_lock below may be unnecessary), but keep it just in case
if Arc::strong_count(v) == 1 {
let mutex_guard = v.try_lock();
mutex_guard.is_err()
if mutex_guard.is_ok() {
num_removed += 1;
false
} else {
true
}
} else {
true
}
});
}
num_removed
}

fn get_lock_idx(&self, key: &K) -> usize {
@@ -144,7 +169,10 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
let element = {
let mut map = self.lock_table[lock_idx].write().await;
map.entry(k)
.or_insert_with(|| Arc::new(Mutex::new(())))
.or_insert_with(|| {
self.size.fetch_add(1, Ordering::SeqCst);
Arc::new(Mutex::new(()))
})
.clone()
};
LockGuard(element.lock_owned().await)
@@ -171,7 +199,10 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
.try_write()
.map_err(|_| TryAcquireLockError::LockTableLocked)?;
map.entry(k)
.or_insert_with(|| Arc::new(Mutex::new(())))
.or_insert_with(|| {
self.size.fetch_add(1, Ordering::SeqCst);
Arc::new(Mutex::new(()))
})
.clone()
};
let lock = element.try_lock_owned();
@@ -225,8 +256,13 @@ async fn test_mutex_table_concurrent_in_same_bucket() {
#[tokio::test]
async fn test_mutex_table() {
// Disable bg cleanup by using Duration::MAX for the initial delay
let mutex_table =
MutexTable::<String>::new_with_cleanup(1, 128, Duration::from_secs(10), Duration::MAX);
let mutex_table = MutexTable::<String>::new_with_cleanup(
1,
128,
Duration::from_secs(10),
Duration::MAX,
1000,
);
let john1 = mutex_table.try_acquire_lock("john".to_string());
assert!(john1.is_ok());
let john2 = mutex_table.try_acquire_lock("john".to_string());
@@ -259,6 +295,7 @@ async fn test_mutex_table_bg_cleanup() {
128,
Duration::from_secs(5),
Duration::from_secs(1),
1000,
);
let lock1 = mutex_table.try_acquire_lock("lock1".to_string());
let lock2 = mutex_table.try_acquire_lock("lock2".to_string());
@@ -296,3 +333,49 @@ async fn test_mutex_table_bg_cleanup() {
assert!(locked.is_empty());
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_mutex_table_bg_cleanup_with_size_threshold() {
// Set up the table so cleanup is never triggered by the time period, only by the size threshold
let mutex_table =
MutexTable::<String>::new_with_cleanup(1, 128, Duration::MAX, Duration::from_secs(1), 5);
let lock1 = mutex_table.try_acquire_lock("lock1".to_string());
let lock2 = mutex_table.try_acquire_lock("lock2".to_string());
let lock3 = mutex_table.try_acquire_lock("lock3".to_string());
let lock4 = mutex_table.try_acquire_lock("lock4".to_string());
let lock5 = mutex_table.try_acquire_lock("lock5".to_string());
assert!(lock1.is_ok());
assert!(lock2.is_ok());
assert!(lock3.is_ok());
assert!(lock4.is_ok());
assert!(lock5.is_ok());
// Trigger cleanup
MutexTable::cleanup(mutex_table.lock_table.clone());
// Try acquiring the locks again; these should still fail because the locks have not been released
let lock11 = mutex_table.try_acquire_lock("lock1".to_string());
let lock22 = mutex_table.try_acquire_lock("lock2".to_string());
let lock33 = mutex_table.try_acquire_lock("lock3".to_string());
let lock44 = mutex_table.try_acquire_lock("lock4".to_string());
let lock55 = mutex_table.try_acquire_lock("lock5".to_string());
assert!(lock11.is_err());
assert!(lock22.is_err());
assert!(lock33.is_err());
assert!(lock44.is_err());
assert!(lock55.is_err());
assert_eq!(mutex_table.size(), 5);
// drop all locks
drop(lock1);
drop(lock2);
drop(lock3);
drop(lock4);
drop(lock5);
tokio::task::yield_now().await;
// Wait for bg cleanup to be triggered because of size threshold
tokio::time::advance(Duration::from_secs(5)).await;
tokio::task::yield_now().await;
assert_eq!(mutex_table.size(), 0);
for entry in mutex_table.lock_table.iter() {
let locked = entry.read().await;
assert!(locked.is_empty());
}
}
