Skip to content

Commit

Permalink
feat: introduce store writer (#75)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed Jul 17, 2023
1 parent 754a038 commit 6c42c7f
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 85 deletions.
27 changes: 4 additions & 23 deletions foyer-storage/src/admission/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use async_trait::async_trait;
use foyer_common::code::{Key, Value};

use std::fmt::Debug;

#[allow(unused_variables)]
#[async_trait]
pub trait AdmissionPolicy: Send + Sync + 'static + Debug {
type Key: Key;
type Value: Value;

fn judge(&self, key: &Self::Key, value: &Self::Value) -> bool;

fn admit(&self, key: &Self::Key, value: &Self::Value) {}
}

#[derive(Debug)]
pub struct AdmitAll<K: Key, V: Value>(PhantomData<(K, V)>);

impl<K: Key, V: Value> Default for AdmitAll<K, V> {
fn default() -> Self {
Self(PhantomData)
}
}

impl<K: Key, V: Value> AdmissionPolicy for AdmitAll<K, V> {
type Key = K;

type Value = V;
async fn judge(&self, key: &Self::Key, weight: usize) -> bool;

fn judge(&self, _key: &Self::Key, _value: &Self::Value) -> bool {
true
}
async fn admit(&self, key: &Self::Key, weight: usize);
}

pub mod rated_random;
25 changes: 16 additions & 9 deletions foyer-storage/src/admission/rated_random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{
time::{Duration, Instant},
};

use async_trait::async_trait;
use foyer_common::code::{Key, Value};
use parking_lot::{Mutex, MutexGuard};
use rand::{thread_rng, Rng};
Expand Down Expand Up @@ -96,18 +97,18 @@ where
}
}

fn judge(&self, key: &K, value: &V) -> bool {
fn judge(&self, _key: &K, weight: usize) -> bool {
if let Some(inner) = self.inner.try_lock() {
self.update(inner);
}

// TODO(MrCroxx): unify weighter?
let weight = key.serialized_len() + value.serialized_len();
self.bytes.fetch_add(weight, Ordering::Relaxed);

thread_rng().gen_range(0..PRECISION) < self.probability.load(Ordering::Relaxed)
}

fn admit(&self, _key: &K, _weight: usize) {}

fn update(&self, mut inner: MutexGuard<'_, Inner<K, V>>) {
let now = Instant::now();

Expand All @@ -132,6 +133,7 @@ where
}
}

#[async_trait]
impl<K, V> AdmissionPolicy for RatedRandom<K, V>
where
K: Key,
Expand All @@ -141,8 +143,12 @@ where

type Value = V;

fn judge(&self, key: &Self::Key, value: &Self::Value) -> bool {
self.judge(key, value)
async fn judge(&self, key: &Self::Key, weight: usize) -> bool {
self.judge(key, weight)
}

async fn admit(&self, key: &Self::Key, weight: usize) {
self.admit(key, weight)
}
}

Expand All @@ -168,9 +174,10 @@ mod tests {
async fn submit(rr: Arc<RatedRandom<u64, Vec<u8>>>, score: Arc<AtomicUsize>) {
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
let size = thread_rng().gen_range(1000..10000);
if rr.judge(&0, &vec![0; size]) {
score.fetch_add(size, Ordering::Relaxed);
let weight = thread_rng().gen_range(1000..10000);
if rr.judge(&0, weight) {
score.fetch_add(weight, Ordering::Relaxed);
rr.admit(&0, weight);
}
}
}
Expand All @@ -184,6 +191,6 @@ mod tests {
let error = (s as isize - RATE as isize * 10).unsigned_abs();
let eratio = error as f64 / (RATE as f64 * 10.0);

assert!(eratio < ERATIO);
assert!(eratio < ERATIO, "eratio: {} < ERATIO: {}", eratio, ERATIO);
}
}
9 changes: 6 additions & 3 deletions foyer-storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use prometheus::{

#[derive(Debug)]
pub struct Metrics {
pub latency_insert: Histogram,
pub latency_insert_admitted: Histogram,
pub latency_insert_rejected: Histogram,
pub latency_lookup_hit: Histogram,
pub latency_lookup_miss: Histogram,
pub latency_remove: Histogram,
Expand Down Expand Up @@ -68,7 +69,8 @@ impl Metrics {
register_int_counter_vec_with_registry!(opts, &["op", "extra"], registry).unwrap()
};

let latency_insert = latency.with_label_values(&["insert", ""]);
let latency_insert_admitted = latency.with_label_values(&["insert", "admitted"]);
let latency_insert_rejected = latency.with_label_values(&["insert", "rejected"]);
let latency_lookup_hit = latency.with_label_values(&["lookup", "hit"]);
let latency_lookup_miss = latency.with_label_values(&["lookup", "miss"]);
let latency_remove = latency.with_label_values(&["remove", ""]);
Expand All @@ -86,7 +88,8 @@ impl Metrics {
};

Self {
latency_insert,
latency_insert_admitted,
latency_insert_rejected,
latency_lookup_hit,
latency_lookup_miss,
latency_remove,
Expand Down

0 comments on commit 6c42c7f

Please sign in to comment.