Adaptive cleanup of mutex table #4414

Merged 1 commit on Sep 9, 2022
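This PR makes the mutex table's background cleanup adaptive: instead of sleeping for a fixed period between sweeps, the cleaner task now wakes every second and sweeps when either the configured period has elapsed or the entry count has crossed a new `cleanup_entries_threshold` parameter. A minimal usage sketch based on the constructor signature in this diff (the module path and key string are assumptions for illustration):

```rust
use std::time::Duration;
use sui_storage::mutex_table::MutexTable; // assumed module path

#[tokio::main]
async fn main() {
    // num_shards, shard_size, cleanup_period, cleanup_initial_delay,
    // and the cleanup_entries_threshold added by this PR.
    let table = MutexTable::<String>::new_with_cleanup(
        8,
        128,
        Duration::from_secs(10),
        Duration::from_secs(10),
        10_000,
    );
    let guard = table.try_acquire_lock("some-key".to_string());
    assert!(guard.is_ok());
    drop(guard); // the now-idle entry is removable on the next cleanup sweep
}
```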
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_server.rs
@@ -191,7 +191,7 @@ impl ValidatorService {
tokio::spawn(async move {
narwhal_node::restarter::NodeRestarter::watch(
consensus_keypair,
- &*consensus_committee,
+ &consensus_committee,
consensus_worker_cache,
consensus_storage_base_path,
consensus_execution_state,
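The one-line change above replaces `&*consensus_committee` (which dereferences the `Arc` and yields a plain reference to the committee) with `&consensus_committee` (a reference to the `Arc` itself), which suggests `NodeRestarter::watch` now accepts the `Arc`-wrapped committee. A toy illustration of the type difference, with hypothetical function names:

```rust
use std::sync::Arc;

fn borrows_inner(_: &String) {}    // hypothetical: wants &T
fn borrows_arc(_: &Arc<String>) {} // hypothetical: wants &Arc<T>

fn main() {
    let committee = Arc::new(String::from("committee"));
    borrows_inner(&*committee); // &*Arc<T> dereferences to &T
    borrows_arc(&committee);    // &Arc<T> keeps the reference-counted wrapper
}
```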
101 changes: 92 additions & 9 deletions crates/sui-storage/src/mutex_table.rs
@@ -6,12 +6,13 @@ use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::hash::{BuildHasher, Hash, Hasher};
- use std::sync::atomic::{AtomicBool, Ordering};
+ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
+ use tokio::time::Instant;
use tracing::info;

type InnerLockTable<K> = HashMap<K, Arc<tokio::sync::Mutex<()>>>;
@@ -22,6 +23,7 @@ pub struct MutexTable<K: Hash> {
_k: std::marker::PhantomData<K>,
_cleaner: JoinHandle<()>,
stop: Arc<AtomicBool>,
+ size: Arc<AtomicUsize>,
}

#[derive(Debug)]
@@ -46,6 +48,7 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
shard_size: usize,
cleanup_period: Duration,
cleanup_initial_delay: Duration,
+ cleanup_entries_threshold: usize,
) -> Self {
let lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>> = Arc::new(
(0..num_shards)
@@ -56,19 +59,29 @@
let cloned = lock_table.clone();
let stop = Arc::new(AtomicBool::new(false));
let stop_cloned = stop.clone();
+ let size: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+ let size_cloned = size.clone();
Self {
random_state: RandomState::new(),
lock_table,
_k: std::marker::PhantomData {},
_cleaner: tokio::spawn(async move {
tokio::time::sleep(cleanup_initial_delay).await;
+ let mut previous_cleanup_instant = Instant::now();
while !stop_cloned.load(Ordering::SeqCst) {
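+ // Adaptive trigger: sweep once the entry count reaches cleanup_entries_threshold
+ // or cleanup_period has elapsed, polling once per second.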
- Self::cleanup(cloned.clone());
- tokio::time::sleep(cleanup_period).await;
+ if size_cloned.load(Ordering::SeqCst) >= cleanup_entries_threshold
+     || previous_cleanup_instant.elapsed() >= cleanup_period
+ {
+     let num_removed = Self::cleanup(cloned.clone());
+     size_cloned.fetch_sub(num_removed, Ordering::SeqCst);
+     previous_cleanup_instant = Instant::now();
+ }
+ tokio::time::sleep(Duration::from_secs(1)).await;
}
info!("Stopping mutex table cleanup!");
}),
stop,
+ size,
}
}

@@ -78,10 +91,16 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
shard_size,
Duration::from_secs(10),
Duration::from_secs(10),
+ 10_000,
)
}

+ pub fn size(&self) -> usize {
+     self.size.load(Ordering::SeqCst)
+ }
+
- pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) {
+ pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) -> usize {
+ let mut num_removed: usize = 0;
for shard in lock_table.iter() {
let map = shard.try_write();
if map.is_err() {
@@ -93,12 +112,18 @@
// This check is also likely sufficient e.g. you don't even need try_lock below, but keeping it just in case
if Arc::strong_count(v) == 1 {
let mutex_guard = v.try_lock();
- mutex_guard.is_err()
+ if mutex_guard.is_ok() {
+     num_removed += 1;
+     false
+ } else {
+     true
+ }
} else {
true
}
});
}
+ num_removed
}
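The retain predicate relies on `Arc::strong_count`: the table holds one reference to each entry and every outstanding lock guard holds another, so a count of 1 means no task is using that mutex; the `try_lock` is kept as an extra safety check, as the in-code comment notes. A toy sketch of the counting invariant (requires tokio's `sync` feature):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let entry = Arc::new(Mutex::new(())); // the table's own reference
    assert_eq!(Arc::strong_count(&entry), 1); // idle: eligible for removal

    let guard = entry.clone().lock_owned().await; // a holder keeps a second Arc alive
    assert_eq!(Arc::strong_count(&entry), 2); // in use: must be retained
    drop(guard);
    assert_eq!(Arc::strong_count(&entry), 1); // idle again
}
```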

fn get_lock_idx(&self, key: &K) -> usize {
@@ -144,7 +169,10 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
let element = {
let mut map = self.lock_table[lock_idx].write().await;
map.entry(k)
- .or_insert_with(|| Arc::new(Mutex::new(())))
+ .or_insert_with(|| {
+     self.size.fetch_add(1, Ordering::SeqCst);
+     Arc::new(Mutex::new(()))
+ })
.clone()
};
LockGuard(element.lock_owned().await)
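The `size` counter is incremented inside `or_insert_with`, under the shard's write lock, so it counts only genuine insertions, never repeat acquisitions of an existing key. A standalone sketch of that pattern:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let size = AtomicUsize::new(0);
    let mut map: HashMap<&str, u32> = HashMap::new();
    // The closure runs only when the key is absent.
    map.entry("k").or_insert_with(|| {
        size.fetch_add(1, Ordering::SeqCst);
        0
    });
    map.entry("k").or_insert_with(|| {
        size.fetch_add(1, Ordering::SeqCst); // not reached: key already present
        0
    });
    assert_eq!(size.load(Ordering::SeqCst), 1); // distinct entries, not acquisitions
}
```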
@@ -171,7 +199,10 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
.try_write()
.map_err(|_| TryAcquireLockError::LockTableLocked)?;
map.entry(k)
- .or_insert_with(|| Arc::new(Mutex::new(())))
+ .or_insert_with(|| {
+     self.size.fetch_add(1, Ordering::SeqCst);
+     Arc::new(Mutex::new(()))
+ })
.clone()
};
let lock = element.try_lock_owned();
@@ -225,8 +256,13 @@ async fn test_mutex_table_concurrent_in_same_bucket()
#[tokio::test]
async fn test_mutex_table() {
// Disable bg cleanup with Duration::MAX for initial delay
- let mutex_table =
-     MutexTable::<String>::new_with_cleanup(1, 128, Duration::from_secs(10), Duration::MAX);
+ let mutex_table = MutexTable::<String>::new_with_cleanup(
+     1,
+     128,
+     Duration::from_secs(10),
+     Duration::MAX,
+     1000,
+ );
let john1 = mutex_table.try_acquire_lock("john".to_string());
assert!(john1.is_ok());
let john2 = mutex_table.try_acquire_lock("john".to_string());
@@ -259,6 +295,7 @@ async fn test_mutex_table_bg_cleanup() {
128,
Duration::from_secs(5),
Duration::from_secs(1),
+ 1000,
);
let lock1 = mutex_table.try_acquire_lock("lock1".to_string());
let lock2 = mutex_table.try_acquire_lock("lock2".to_string());
@@ -296,3 +333,49 @@ assert!(locked.is_empty());
assert!(locked.is_empty());
}
}

+ #[tokio::test(flavor = "current_thread", start_paused = true)]
+ async fn test_mutex_table_bg_cleanup_with_size_threshold() {
+ // Set up the table so cleanup is never triggered by the time period, only by the size threshold
+ let mutex_table =
+     MutexTable::<String>::new_with_cleanup(1, 128, Duration::MAX, Duration::from_secs(1), 5);
+ let lock1 = mutex_table.try_acquire_lock("lock1".to_string());
+ let lock2 = mutex_table.try_acquire_lock("lock2".to_string());
+ let lock3 = mutex_table.try_acquire_lock("lock3".to_string());
+ let lock4 = mutex_table.try_acquire_lock("lock4".to_string());
+ let lock5 = mutex_table.try_acquire_lock("lock5".to_string());
+ assert!(lock1.is_ok());
+ assert!(lock2.is_ok());
+ assert!(lock3.is_ok());
+ assert!(lock4.is_ok());
+ assert!(lock5.is_ok());
+ // Trigger cleanup
+ MutexTable::cleanup(mutex_table.lock_table.clone());
+ // Try acquiring the locks again; these should still fail because the locks have not been released
+ let lock11 = mutex_table.try_acquire_lock("lock1".to_string());
+ let lock22 = mutex_table.try_acquire_lock("lock2".to_string());
+ let lock33 = mutex_table.try_acquire_lock("lock3".to_string());
+ let lock44 = mutex_table.try_acquire_lock("lock4".to_string());
+ let lock55 = mutex_table.try_acquire_lock("lock5".to_string());
+ assert!(lock11.is_err());
+ assert!(lock22.is_err());
+ assert!(lock33.is_err());
+ assert!(lock44.is_err());
+ assert!(lock55.is_err());
+ assert_eq!(mutex_table.size(), 5);
+ // drop all locks
+ drop(lock1);
+ drop(lock2);
+ drop(lock3);
+ drop(lock4);
+ drop(lock5);
+ tokio::task::yield_now().await;
+ // Wait for bg cleanup to be triggered by the size threshold
+ tokio::time::advance(Duration::from_secs(5)).await;
+ tokio::task::yield_now().await;
+ assert_eq!(mutex_table.size(), 0);
+ for entry in mutex_table.lock_table.iter() {
+     let locked = entry.read().await;
+     assert!(locked.is_empty());
+ }
+ }
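The new test drives the one-second polling loop deterministically with tokio's paused clock: `start_paused = true` (which needs tokio's `test-util` feature) stops time, and `tokio::time::advance` moves virtual time forward so pending sleeps fire without real waiting. A minimal illustration:

```rust
use std::time::Duration;

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn paused_clock_demo() {
    let started = tokio::time::Instant::now();
    tokio::time::advance(Duration::from_secs(5)).await; // no wall-clock delay
    assert!(started.elapsed() >= Duration::from_secs(5));
}
```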