diff --git a/foyer-storage/src/admission/mod.rs b/foyer-storage/src/admission/mod.rs index dca37f26..0be895b7 100644 --- a/foyer-storage/src/admission/mod.rs +++ b/foyer-storage/src/admission/mod.rs @@ -12,39 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::marker::PhantomData; - +use async_trait::async_trait; use foyer_common::code::{Key, Value}; use std::fmt::Debug; #[allow(unused_variables)] +#[async_trait] pub trait AdmissionPolicy: Send + Sync + 'static + Debug { type Key: Key; type Value: Value; - fn judge(&self, key: &Self::Key, value: &Self::Value) -> bool; - - fn admit(&self, key: &Self::Key, value: &Self::Value) {} -} - -#[derive(Debug)] -pub struct AdmitAll(PhantomData<(K, V)>); - -impl Default for AdmitAll { - fn default() -> Self { - Self(PhantomData) - } -} - -impl AdmissionPolicy for AdmitAll { - type Key = K; - - type Value = V; + async fn judge(&self, key: &Self::Key, weight: usize) -> bool; - fn judge(&self, _key: &Self::Key, _value: &Self::Value) -> bool { - true - } + async fn admit(&self, key: &Self::Key, weight: usize); } pub mod rated_random; diff --git a/foyer-storage/src/admission/rated_random.rs b/foyer-storage/src/admission/rated_random.rs index 31b71648..7bb714d5 100644 --- a/foyer-storage/src/admission/rated_random.rs +++ b/foyer-storage/src/admission/rated_random.rs @@ -19,6 +19,7 @@ use std::{ time::{Duration, Instant}, }; +use async_trait::async_trait; use foyer_common::code::{Key, Value}; use parking_lot::{Mutex, MutexGuard}; use rand::{thread_rng, Rng}; @@ -96,18 +97,18 @@ where } } - fn judge(&self, key: &K, value: &V) -> bool { + fn judge(&self, _key: &K, weight: usize) -> bool { if let Some(inner) = self.inner.try_lock() { self.update(inner); } - // TODO(MrCroxx): unify weighter? - let weight = key.serialized_len() + value.serialized_len(); self.bytes.fetch_add(weight, Ordering::Relaxed); thread_rng().gen_range(0..PRECISION) < self.probability.load(Ordering::Relaxed) } + fn admit(&self, _key: &K, _weight: usize) {} + fn update(&self, mut inner: MutexGuard<'_, Inner>) { let now = Instant::now(); @@ -132,6 +133,7 @@ where } } +#[async_trait] impl AdmissionPolicy for RatedRandom where K: Key, @@ -141,8 +143,12 @@ where type Value = V; - fn judge(&self, key: &Self::Key, value: &Self::Value) -> bool { - self.judge(key, value) + async fn judge(&self, key: &Self::Key, weight: usize) -> bool { + self.judge(key, weight) + } + + async fn admit(&self, key: &Self::Key, weight: usize) { + self.admit(key, weight) } } @@ -168,9 +174,10 @@ mod tests { async fn submit(rr: Arc>>, score: Arc) { loop { tokio::time::sleep(Duration::from_millis(10)).await; - let size = thread_rng().gen_range(1000..10000); - if rr.judge(&0, &vec![0; size]) { - score.fetch_add(size, Ordering::Relaxed); + let weight = thread_rng().gen_range(1000..10000); + if rr.judge(&0, weight) { + score.fetch_add(weight, Ordering::Relaxed); + rr.admit(&0, weight); } } } @@ -184,6 +191,6 @@ mod tests { let error = (s as isize - RATE as isize * 10).unsigned_abs(); let eratio = error as f64 / (RATE as f64 * 10.0); - assert!(eratio < ERATIO); + assert!(eratio < ERATIO, "eratio: {} < ERATIO: {}", eratio, ERATIO); } } diff --git a/foyer-storage/src/metrics.rs b/foyer-storage/src/metrics.rs index c5b700a0..ad36fef2 100644 --- a/foyer-storage/src/metrics.rs +++ b/foyer-storage/src/metrics.rs @@ -20,7 +20,8 @@ use prometheus::{ #[derive(Debug)] pub struct Metrics { - pub latency_insert: Histogram, + pub latency_insert_admitted: Histogram, + pub latency_insert_rejected: Histogram, pub latency_lookup_hit: Histogram, pub latency_lookup_miss: Histogram, pub latency_remove: Histogram, @@ -68,7 +69,8 @@ impl Metrics { register_int_counter_vec_with_registry!(opts, &["op", "extra"], registry).unwrap() }; - let latency_insert = latency.with_label_values(&["insert", ""]); + let latency_insert_admitted = latency.with_label_values(&["insert", "admitted"]); + let latency_insert_rejected = latency.with_label_values(&["insert", "rejected"]); let latency_lookup_hit = latency.with_label_values(&["lookup", "hit"]); let latency_lookup_miss = latency.with_label_values(&["lookup", "miss"]); let latency_remove = latency.with_label_values(&["remove", ""]); @@ -86,7 +88,8 @@ impl Metrics { }; Self { - latency_insert, + latency_insert_admitted, + latency_insert_rejected, latency_lookup_hit, latency_lookup_miss, latency_remove, diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index c26ecaba..535ee6d4 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -12,7 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt::Debug, marker::PhantomData, sync::Arc, time::Instant}; +use std::{ + fmt::Debug, + marker::PhantomData, + sync::Arc, + time::{Duration, Instant}, +}; use bytes::{Buf, BufMut}; use foyer_common::{bits, queue::AsyncQueue, rate::RateLimiter}; @@ -243,56 +248,15 @@ where #[tracing::instrument(skip(self))] pub async fn insert(&self, key: K, value: V) -> Result { - let _timer = self.metrics.latency_insert.start_timer(); - - for admission in &self.admissions { - if !admission.judge(&key, &value) { - return Ok(false); - } - } - for admission in &self.admissions { - admission.admit(&key, &value); - } - - let serialized_len = self.serialized_len(&key, &value); - self.metrics.bytes_insert.inc_by(serialized_len as u64); - - let mut slice = match self.region_manager.allocate(serialized_len).await { - crate::region::AllocateResult::Ok(slice) => slice, - crate::region::AllocateResult::Full { mut slice, remain } => { - // current region is full, write region footer and try allocate again - let footer = RegionFooter { - magic: REGION_MAGIC, - padding: remain as u64, - }; - footer.write(slice.as_mut()); - slice.destroy().await; - self.region_manager.allocate(serialized_len).await.unwrap() - } - }; - - write_entry(slice.as_mut(), &key, &value); - - let index = Index { - region: slice.region_id(), - version: slice.version(), - offset: slice.offset() as u32, - len: slice.len() as u32, - key_len: key.serialized_len() as u32, - value_len: value.serialized_len() as u32, - - key: key.clone(), - }; - - slice.destroy().await; - - self.indices.insert(index); - - for listener in self.event_listeners.iter() { - listener.on_insert(&key).await?; - } + let weight = self.serialized_len(&key, &value); + let writer = StoreWriter::new(self, key, weight); + writer.finish(value).await + } - Ok(true) + /// `weight` MUST be equal to `key.serialized_len() + value.serialized_len()` + #[tracing::instrument(skip(self))] + pub fn writer(&self, key: K, weight: usize) -> StoreWriter<'_, K, V, BA, D, EP, EL> { + StoreWriter::new(self, key, weight) } #[tracing::instrument(skip(self))] @@ -462,6 +426,185 @@ where }; Ok(res) } + + async fn judge_inner(&self, writer: &StoreWriter<'_, K, V, BA, D, EP, EL>) -> bool { + for admission in &self.admissions { + if !admission.judge(&writer.key, writer.weight).await { + return false; + } + } + true + } + + async fn apply_writer( + &self, + mut writer: StoreWriter<'_, K, V, BA, D, EP, EL>, + value: V, + ) -> Result { + let now = Instant::now(); + + if !writer.judge().await { + let duration = now.elapsed() + writer.duration; + self.metrics + .latency_insert_rejected + .observe(duration.as_secs_f64()); + return Ok(false); + } + + assert!(writer.admitted.unwrap()); + + let key = &writer.key; + + for admission in &self.admissions { + admission.admit(key, writer.weight).await; + } + + let serialized_len = self.serialized_len(key, &value); + assert_eq!(serialized_len, writer.weight); + + self.metrics.bytes_insert.inc_by(serialized_len as u64); + + let mut slice = match self.region_manager.allocate(serialized_len).await { + crate::region::AllocateResult::Ok(slice) => slice, + crate::region::AllocateResult::Full { mut slice, remain } => { + // current region is full, write region footer and try allocate again + let footer = RegionFooter { + magic: REGION_MAGIC, + padding: remain as u64, + }; + footer.write(slice.as_mut()); + slice.destroy().await; + self.region_manager.allocate(serialized_len).await.unwrap() + } + }; + + write_entry(slice.as_mut(), key, &value); + + let index = Index { + region: slice.region_id(), + version: slice.version(), + offset: slice.offset() as u32, + len: slice.len() as u32, + key_len: key.serialized_len() as u32, + value_len: value.serialized_len() as u32, + + key: key.clone(), + }; + + slice.destroy().await; + + self.indices.insert(index); + + for listener in self.event_listeners.iter() { + listener.on_insert(key).await?; + } + + let duration = now.elapsed() + writer.duration; + self.metrics + .latency_insert_admitted + .observe(duration.as_secs_f64()); + + Ok(true) + } +} + +pub struct StoreWriter<'a, K, V, BA, D, EP, EL> +where + K: Key, + V: Value, + BA: BufferAllocator, + D: Device, + EP: EvictionPolicy, Link = EL>, + EL: Link, +{ + store: &'a Store, + key: K, + weight: usize, + + admitted: Option, + + /// judge duration + duration: Duration, + + applied: bool, +} + +impl<'a, K, V, BA, D, EP, EL> StoreWriter<'a, K, V, BA, D, EP, EL> +where + K: Key, + V: Value, + BA: BufferAllocator, + D: Device, + EP: EvictionPolicy, Link = EL>, + EL: Link, +{ + fn new(store: &'a Store, key: K, weight: usize) -> Self { + Self { + store, + key, + weight, + admitted: None, + duration: Duration::from_nanos(0), + applied: false, + } + } + + /// Judge if the entry can be admitted by configured admission policies. + pub async fn judge(&mut self) -> bool { + let now = Instant::now(); + if let Some(admitted) = self.admitted { + self.duration += now.elapsed(); + return admitted; + } + let admitted = self.store.judge_inner(self).await; + self.admitted = Some(admitted); + self.duration += now.elapsed(); + admitted + } + + pub async fn finish(mut self, value: V) -> Result { + self.applied = true; + self.store.apply_writer(self, value).await + } +} + +impl<'a, K, V, BA, D, EP, EL> Debug for StoreWriter<'a, K, V, BA, D, EP, EL> +where + K: Key, + V: Value, + BA: BufferAllocator, + D: Device, + EP: EvictionPolicy, Link = EL>, + EL: Link, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StoreWriter") + .field("key", &self.key) + .field("weight", &self.weight) + .field("admitted", &self.admitted) + .field("duration", &self.duration) + .field("applied", &self.applied) + .finish() + } +} + +impl<'a, K, V, BA, D, EP, EL> Drop for StoreWriter<'a, K, V, BA, D, EP, EL> +where + K: Key, + V: Value, + BA: BufferAllocator, + D: Device, + EP: EvictionPolicy, Link = EL>, + EL: Link, +{ + fn drop(&mut self) { + if !self.applied { + self.store + .metrics + .latency_insert_rejected + .observe(self.duration.as_secs_f64()); + } + } } #[derive(Debug)]