From 409b299b9ce9020617b157d25734d5629d0fed14 Mon Sep 17 00:00:00 2001
From: sadhan
Date: Wed, 31 Aug 2022 11:53:01 -0700
Subject: [PATCH] Adaptive cleanup of mutex table

---
 crates/sui-core/src/authority_server.rs |   2 +-
 crates/sui-storage/src/mutex_table.rs   | 101 +++++++++++++++++++++---
 2 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs
index b046ae185e1db..9f8f3a5032233 100644
--- a/crates/sui-core/src/authority_server.rs
+++ b/crates/sui-core/src/authority_server.rs
@@ -191,7 +191,7 @@ impl ValidatorService {
         tokio::spawn(async move {
             narwhal_node::restarter::NodeRestarter::watch(
                 consensus_keypair,
-                &*consensus_committee,
+                &consensus_committee,
                 consensus_worker_cache,
                 consensus_storage_base_path,
                 consensus_execution_state,
diff --git a/crates/sui-storage/src/mutex_table.rs b/crates/sui-storage/src/mutex_table.rs
index 7590771b7a3db..5dd171a315a54 100644
--- a/crates/sui-storage/src/mutex_table.rs
+++ b/crates/sui-storage/src/mutex_table.rs
@@ -6,12 +6,13 @@ use std::collections::HashMap;
 use std::error::Error;
 use std::fmt;
 use std::hash::{BuildHasher, Hash, Hasher};
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
 use tokio::sync::{Mutex, RwLock};
 use tokio::task::JoinHandle;
+use tokio::time::Instant;
 use tracing::info;
 
 type InnerLockTable<K> = HashMap<K, Arc<Mutex<()>>>;
@@ -22,6 +23,7 @@ pub struct MutexTable<K: Hash> {
     _k: std::marker::PhantomData<K>,
     _cleaner: JoinHandle<()>,
     stop: Arc<AtomicBool>,
+    size: Arc<AtomicUsize>,
 }
 
 #[derive(Debug)]
@@ -46,6 +48,7 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
         shard_size: usize,
         cleanup_period: Duration,
         cleanup_initial_delay: Duration,
+        cleanup_entries_threshold: usize,
     ) -> Self {
         let lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>> = Arc::new(
             (0..num_shards)
@@ -56,19 +59,29 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
         let cloned = lock_table.clone();
         let stop = Arc::new(AtomicBool::new(false));
         let stop_cloned = stop.clone();
+        let size: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+        let size_cloned = size.clone();
         Self {
             random_state: RandomState::new(),
             lock_table,
             _k: std::marker::PhantomData {},
             _cleaner: tokio::spawn(async move {
                 tokio::time::sleep(cleanup_initial_delay).await;
+                let mut previous_cleanup_instant = Instant::now();
                 while !stop_cloned.load(Ordering::SeqCst) {
-                    Self::cleanup(cloned.clone());
-                    tokio::time::sleep(cleanup_period).await;
+                    if size_cloned.load(Ordering::SeqCst) >= cleanup_entries_threshold
+                        || previous_cleanup_instant.elapsed() >= cleanup_period
+                    {
+                        let num_removed = Self::cleanup(cloned.clone());
+                        size_cloned.fetch_sub(num_removed, Ordering::SeqCst);
+                        previous_cleanup_instant = Instant::now();
+                    }
+                    tokio::time::sleep(Duration::from_secs(1)).await;
                 }
                 info!("Stopping mutex table cleanup!");
             }),
             stop,
+            size,
         }
     }
 
@@ -78,10 +91,16 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
             shard_size,
             Duration::from_secs(10),
             Duration::from_secs(10),
+            10_000,
         )
     }
 
-    pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) {
+    pub fn size(&self) -> usize {
+        self.size.load(Ordering::SeqCst)
+    }
+
+    pub fn cleanup(lock_table: Arc<Vec<RwLock<InnerLockTable<K>>>>) -> usize {
+        let mut num_removed: usize = 0;
         for shard in lock_table.iter() {
             let map = shard.try_write();
             if map.is_err() {
@@ -93,12 +112,18 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
                 // This check is also likely sufficient e.g. you don't even need try_lock below, but keeping it just in case
                 if Arc::strong_count(v) == 1 {
                     let mutex_guard = v.try_lock();
-                    mutex_guard.is_err()
+                    if mutex_guard.is_ok() {
+                        num_removed += 1;
+                        false
+                    } else {
+                        true
+                    }
                 } else {
                     true
                 }
             });
         }
+        num_removed
     }
 
     fn get_lock_idx(&self, key: &K) -> usize {
@@ -144,7 +169,10 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
         let element = {
             let mut map = self.lock_table[lock_idx].write().await;
             map.entry(k)
-                .or_insert_with(|| Arc::new(Mutex::new(())))
+                .or_insert_with(|| {
+                    self.size.fetch_add(1, Ordering::SeqCst);
+                    Arc::new(Mutex::new(()))
+                })
                 .clone()
         };
         LockGuard(element.lock_owned().await)
@@ -171,7 +199,10 @@ impl<K: Hash + std::cmp::Eq + Send + Sync + 'static> MutexTable<K> {
                 .try_write()
                 .map_err(|_| TryAcquireLockError::LockTableLocked)?;
             map.entry(k)
-                .or_insert_with(|| Arc::new(Mutex::new(())))
+                .or_insert_with(|| {
+                    self.size.fetch_add(1, Ordering::SeqCst);
+                    Arc::new(Mutex::new(()))
+                })
                 .clone()
         };
         let lock = element.try_lock_owned();
@@ -225,8 +256,13 @@ async fn test_mutex_table_concurrent_in_same_bucket() {
 #[tokio::test]
 async fn test_mutex_table() {
     // Disable bg cleanup with Duration.MAX for initial delay
-    let mutex_table =
-        MutexTable::<String>::new_with_cleanup(1, 128, Duration::from_secs(10), Duration::MAX);
+    let mutex_table = MutexTable::<String>::new_with_cleanup(
+        1,
+        128,
+        Duration::from_secs(10),
+        Duration::MAX,
+        1000,
+    );
     let john1 = mutex_table.try_acquire_lock("john".to_string());
     assert!(john1.is_ok());
     let john2 = mutex_table.try_acquire_lock("john".to_string());
@@ -259,6 +295,7 @@ async fn test_mutex_table_bg_cleanup() {
         128,
         Duration::from_secs(5),
         Duration::from_secs(1),
+        1000,
     );
     let lock1 = mutex_table.try_acquire_lock("lock1".to_string());
     let lock2 = mutex_table.try_acquire_lock("lock2".to_string());
@@ -296,3 +333,49 @@ async fn test_mutex_table_bg_cleanup() {
         assert!(locked.is_empty());
     }
 }
+
+#[tokio::test(flavor = "current_thread", start_paused = true)]
+async fn test_mutex_table_bg_cleanup_with_size_threshold() {
+    // set up the table to never trigger cleanup because of time period but only size threshold
+    let mutex_table =
+        MutexTable::<String>::new_with_cleanup(1, 128, Duration::MAX, Duration::from_secs(1), 5);
+    let lock1 = mutex_table.try_acquire_lock("lock1".to_string());
+    let lock2 = mutex_table.try_acquire_lock("lock2".to_string());
+    let lock3 = mutex_table.try_acquire_lock("lock3".to_string());
+    let lock4 = mutex_table.try_acquire_lock("lock4".to_string());
+    let lock5 = mutex_table.try_acquire_lock("lock5".to_string());
+    assert!(lock1.is_ok());
+    assert!(lock2.is_ok());
+    assert!(lock3.is_ok());
+    assert!(lock4.is_ok());
+    assert!(lock5.is_ok());
+    // Trigger cleanup
+    MutexTable::cleanup(mutex_table.lock_table.clone());
+    // Try acquiring locks again, these should still fail because locks have not been released
+    let lock11 = mutex_table.try_acquire_lock("lock1".to_string());
+    let lock22 = mutex_table.try_acquire_lock("lock2".to_string());
+    let lock33 = mutex_table.try_acquire_lock("lock3".to_string());
+    let lock44 = mutex_table.try_acquire_lock("lock4".to_string());
+    let lock55 = mutex_table.try_acquire_lock("lock5".to_string());
+    assert!(lock11.is_err());
+    assert!(lock22.is_err());
+    assert!(lock33.is_err());
+    assert!(lock44.is_err());
+    assert!(lock55.is_err());
+    assert_eq!(mutex_table.size(), 5);
+    // drop all locks
+    drop(lock1);
+    drop(lock2);
+    drop(lock3);
+    drop(lock4);
+    drop(lock5);
+    tokio::task::yield_now().await;
+    // Wait for bg cleanup to be triggered because of size threshold
+    tokio::time::advance(Duration::from_secs(5)).await;
+    tokio::task::yield_now().await;
+    assert_eq!(mutex_table.size(), 0);
+    for entry in mutex_table.lock_table.iter() {
+        let locked = entry.read().await;
+        assert!(locked.is_empty());
+    }
+}
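
Note (not part of the patch): a minimal usage sketch of the new new_with_cleanup()
signature introduced above. The import path, key type, and concrete numbers below are
illustrative assumptions rather than values defined by this change; the patched
MutexTable::new() itself defaults to a 10s period, 10s initial delay, and a 10_000
entry threshold.

    use std::time::Duration;
    use sui_storage::mutex_table::MutexTable; // assumed public module path

    async fn demo() {
        // The background task sweeps unused entries either every `cleanup_period`
        // or once roughly `cleanup_entries_threshold` entries have accumulated,
        // whichever comes first (the loop re-checks about once per second).
        let table = MutexTable::<String>::new_with_cleanup(
            8,                       // num_shards
            128,                     // shard_size
            Duration::from_secs(10), // cleanup_period
            Duration::from_secs(10), // cleanup_initial_delay
            10_000,                  // cleanup_entries_threshold
        );
        let guard = table.try_acquire_lock("object-key".to_string());
        assert!(guard.is_ok());
        assert_eq!(table.size(), 1);
        // Dropping the guard lets a later cleanup pass remove the idle entry
        // and decrement the size counter.
        drop(guard);
    }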