Skip to content

Commit

Permalink
fix: refactor future interface in insert with (#80)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed Jul 24, 2023
1 parent a8c1c78 commit 21e6ace
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
3 changes: 3 additions & 0 deletions foyer-storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use prometheus::{
#[derive(Debug)]
pub struct Metrics {
pub latency_insert_inserted: Histogram,
pub latency_insert_filtered: Histogram,
pub latency_insert_dropped: Histogram,
pub latency_lookup_hit: Histogram,
pub latency_lookup_miss: Histogram,
Expand Down Expand Up @@ -70,6 +71,7 @@ impl Metrics {
};

let latency_insert_inserted = latency.with_label_values(&["insert", "inserted"]);
let latency_insert_filtered = latency.with_label_values(&["insert", "filtered"]);
let latency_insert_dropped = latency.with_label_values(&["insert", "dropped"]);
let latency_lookup_hit = latency.with_label_values(&["lookup", "hit"]);
let latency_lookup_miss = latency.with_label_values(&["lookup", "miss"]);
Expand All @@ -89,6 +91,7 @@ impl Metrics {

Self {
latency_insert_inserted,
latency_insert_filtered,
latency_insert_dropped,
latency_lookup_hit,
latency_lookup_miss,
Expand Down
39 changes: 26 additions & 13 deletions foyer-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{
fmt::Debug,
marker::PhantomData,
ops::{BitAnd, BitOr},
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -48,6 +47,8 @@ use std::hash::Hasher;

const REGION_MAGIC: u64 = 0x19970327;

pub trait FetchValueFuture<V> = Future<Output = anyhow::Result<V>> + Send + 'static;

#[derive(Debug, Default)]
pub struct PrometheusConfig {
pub registry: Option<prometheus::Registry>,
Expand Down Expand Up @@ -273,13 +274,13 @@ where
#[tracing::instrument(skip(self, f))]
pub async fn insert_with<F>(&self, key: K, f: F, weight: usize) -> Result<bool>
where
F: Fn(&K) -> anyhow::Result<V>,
F: FnOnce() -> anyhow::Result<V>,
{
let mut writer = self.writer(key, weight);
if !writer.judge() {
return Ok(false);
}
let value = f(&writer.key).map_err(Error::fetch_value)?;
let value = f().map_err(Error::fetch_value)?;
writer.finish(value).await
}

Expand All @@ -290,22 +291,23 @@ where
///
/// `weight` MUST be equal to `key.serialized_len() + value.serialized_len()`
#[tracing::instrument(skip(self, f))]
pub async fn insert_with_future<F>(&self, key: K, f: F, weight: usize) -> Result<bool>
pub async fn insert_with_future<F, FU>(&self, key: K, f: F, weight: usize) -> Result<bool>
where
F: Fn(&K) -> Pin<Box<dyn Future<Output = anyhow::Result<V>>>>,
F: FnOnce() -> FU,
FU: FetchValueFuture<V>,
{
let mut writer = self.writer(key, weight);
if !writer.judge() {
return Ok(false);
}
let value = f(&writer.key).await.map_err(Error::fetch_value)?;
let value = f().await.map_err(Error::fetch_value)?;
writer.finish(value).await
}

#[tracing::instrument(skip(self, f))]
pub async fn insert_if_not_exists_with<F>(&self, key: K, f: F, weight: usize) -> Result<bool>
where
F: Fn(&K) -> anyhow::Result<V>,
F: FnOnce() -> anyhow::Result<V>,
{
if !self.exists(&key)? {
return Ok(false);
Expand All @@ -314,14 +316,15 @@ where
}

#[tracing::instrument(skip(self, f))]
pub async fn insert_if_not_exists_with_future<F>(
pub async fn insert_if_not_exists_with_future<F, FU>(
&self,
key: K,
f: F,
weight: usize,
) -> Result<bool>
where
F: Fn(&K) -> Pin<Box<dyn Future<Output = anyhow::Result<V>>>>,
F: FnOnce() -> FU,
FU: FetchValueFuture<V>,
{
if !self.exists(&key)? {
return Ok(false);
Expand Down Expand Up @@ -522,10 +525,7 @@ where
let now = Instant::now();

if !writer.judge() {
let duration = now.elapsed() + writer.duration;
self.metrics
.latency_insert_dropped
.observe(duration.as_secs_f64());
writer.duration += now.elapsed();
return Ok(false);
}

Expand Down Expand Up @@ -711,11 +711,24 @@ where
.metrics
.latency_insert_dropped
.observe(self.duration.as_secs_f64());
let mut filtered = false;
if let Some(judge) = self.judges.as_ref() {
for (i, admission) in self.store.admissions.iter().enumerate() {
let judge = judge.get(i);
admission.on_drop(&self.key, self.weight, &self.store.metrics, judge);
}
filtered = !self.judge();
}
if filtered {
self.store
.metrics
.latency_insert_filtered
.observe(self.duration.as_secs_f64());
} else {
self.store
.metrics
.latency_insert_dropped
.observe(self.duration.as_secs_f64());
}
}
}
Expand Down

0 comments on commit 21e6ace

Please sign in to comment.