Skip to content

Commit

Permalink
feat: refactor admission policy interface, fix rated random with mult…
Browse files Browse the repository at this point in the history
…iple policies (#77)

* feat: refactor admission interfaces

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix drop

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix writer inserted or dropped

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

---------

Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed Jul 20, 2023
1 parent f7953be commit 9f523b3
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 103 deletions.
2 changes: 1 addition & 1 deletion foyer-storage-bench/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::path::{Path, PathBuf};
use itertools::Itertools;
use nix::{fcntl::readlink, sys::stat::stat};

#[allow(unused_variables)]
#[allow(dead_code)]
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum FsType {
Xfs,
Expand Down
1 change: 1 addition & 0 deletions foyer-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license = "Apache-2.0"
async-channel = "1.8"
async-trait = "0.1"
bitflags = "2.3.1"
bitmaps = "3.2"
bytes = "1"
cmsketch = "0.1"
foyer-common = { path = "../foyer-common" }
Expand Down
15 changes: 10 additions & 5 deletions foyer-storage/src/admission/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use bitmaps::Bitmap;
use foyer_common::code::{Key, Value};

use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};

use crate::metrics::Metrics;

pub type Judges = Bitmap<64>;

#[allow(unused_variables)]
#[async_trait]
pub trait AdmissionPolicy: Send + Sync + 'static + Debug {
type Key: Key;
type Value: Value;

async fn judge(&self, key: &Self::Key, weight: usize) -> bool;
fn judge(&self, key: &Self::Key, weight: usize, metrics: &Arc<Metrics>) -> bool;

fn on_insert(&self, key: &Self::Key, weight: usize, metrics: &Arc<Metrics>, judge: bool);

async fn admit(&self, key: &Self::Key, weight: usize);
fn on_drop(&self, key: &Self::Key, weight: usize, metrics: &Arc<Metrics>, judge: bool);
}

pub mod rated_random;
204 changes: 144 additions & 60 deletions foyer-storage/src/admission/rated_random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,49 @@
use std::{
fmt::Debug,
marker::PhantomData,
sync::atomic::{AtomicUsize, Ordering},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};

use async_trait::async_trait;
use foyer_common::code::{Key, Value};
use parking_lot::{Mutex, MutexGuard};
use rand::{thread_rng, Rng};

use crate::metrics::Metrics;

use super::AdmissionPolicy;

const PRECISION: usize = 100000;

/// p : admit probability among △t
/// w(t) : weight at time t
/// w_t : entry weight at time t
/// r : actual admitted rate
/// E(w) : expected admitted weight
/// E(r) : expceted admitted rate
///
/// E(w(t)) = p * w(t)
/// r = sum(△t){ admitted w(t) } / △t
/// E(r) = sum(△t){ E(w(t)) } / △t
/// = sum(△t){ p * w(t) } / △t
/// = p / △t * sum(△t){ w(t) }
/// p = E(r) * △t / sum(△t){ w(t) }
/// E(w_t) = {
/// p * w ( if judge && insert )
/// p * w ( if !judge && !insert )
/// w ( if !judge && insert )
/// 0 ( if judge && !insert )
/// }
///
/// E(r) = sum_{△t}{E(w_t)} / △t
/// => E(r) * △t = sum_{△t}{ E(w_t) }
/// => E(r) * △t = sum_{△t}^{(judge && insert) || (!judge && !insert)}{ p * w_t }
/// + sum_{△t}^{(!judge && insert)}{ w_t }
/// + sum_{△t}^{(judge && !insert){ 0 }
/// => E(r) * △t) - sum_{△t}^{(!judge && insert)}{ w_t } = p * sum_{△t}^{(judge && insert) || (!judge && !insert)}{ w_t }
/// => p = (E(r) * △t) - sum_{△t}^{(!judge && insert)}{ w_t }) / (sum_{△t}^{(judge && insert) || (!judge && !insert)}{ w_t })
///
/// p = (E(r) * △t) - sum_{△t}^{(!judge && insert)}{ w_t }) / (sum_{△t}^{(judge && insert) || (!judge && !insert)}{ w_t })
/// ↑ rate ↑ △force_insert_bytes ↑ △obey_bytes
///
/// p = ( rate * △t - △force_insert_bytes ) / △obey_bytes
#[derive(Debug)]
pub struct RatedRandom<K, V>
where
K: Key,
Expand All @@ -48,34 +66,21 @@ where
rate: usize,
update_interval: Duration,

bytes: AtomicUsize,
obey_bytes: AtomicUsize,
force_inser_bytes: AtomicUsize,

probability: AtomicUsize,

inner: Mutex<Inner<K, V>>,
}
inner: Mutex<Inner>,

impl<K, V> Debug for RatedRandom<K, V>
where
K: Key,
V: Value,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DynamicRandom")
.field("rate", &self.rate)
.field("probability", &self.probability.load(Ordering::Relaxed))
.finish()
}
_marker: PhantomData<(K, V)>,
}

struct Inner<K, V>
where
K: Key,
V: Value,
{
#[derive(Debug)]
struct Inner {
last_update_time: Option<Instant>,
last_bytes: usize,

_marker: PhantomData<(K, V)>,
last_obey_bytes: usize,
last_force_insert_bytes: usize,
}

impl<K, V> RatedRandom<K, V>
Expand All @@ -87,29 +92,48 @@ where
Self {
rate,
update_interval,
bytes: AtomicUsize::new(0),

obey_bytes: AtomicUsize::new(0),
force_inser_bytes: AtomicUsize::new(0),

probability: AtomicUsize::new(0),
inner: Mutex::new(Inner {
last_update_time: None,
last_bytes: 0,
_marker: PhantomData,
last_obey_bytes: 0,
last_force_insert_bytes: 0,
}),
_marker: PhantomData,
}
}

fn judge(&self, _key: &K, weight: usize) -> bool {
fn judge(&self, _key: &K, _weight: usize, _metrics: &Arc<Metrics>) -> bool {
if let Some(inner) = self.inner.try_lock() {
self.update(inner);
}

self.bytes.fetch_add(weight, Ordering::Relaxed);

thread_rng().gen_range(0..PRECISION) < self.probability.load(Ordering::Relaxed)
}

fn admit(&self, _key: &K, _weight: usize) {}
fn on_insert(&self, _key: &K, weight: usize, _metrics: &Arc<Metrics>, judge: bool) {
if judge {
// obey
self.obey_bytes.fetch_add(weight, Ordering::Relaxed);
} else {
// force insert
self.force_inser_bytes.fetch_add(weight, Ordering::Relaxed);
}
}

fn on_drop(&self, _key: &K, weight: usize, _metrics: &Arc<Metrics>, judge: bool) {
if !judge {
// obey
self.obey_bytes.fetch_add(weight, Ordering::Relaxed);
}
}

fn update(&self, mut inner: MutexGuard<'_, Inner>) {
// p = ( rate * △t - △force_insert_bytes ) / △obey_bytes

fn update(&self, mut inner: MutexGuard<'_, Inner<K, V>>) {
let now = Instant::now();

let elapsed = match inner.last_update_time {
Expand All @@ -121,19 +145,35 @@ where
return;
}

let bytes = self.bytes.load(Ordering::Relaxed);
let bytes_delta = std::cmp::max(bytes - inner.last_bytes, 1);
let now_obey_bytes = self.obey_bytes.load(Ordering::Relaxed);
let now_force_insert_bytes = self.force_inser_bytes.load(Ordering::Relaxed);

let delta_obey_bytes = now_obey_bytes - inner.last_obey_bytes;
let delta_force_insert_bytes = now_force_insert_bytes - inner.last_force_insert_bytes;

inner.last_update_time = Some(now);
inner.last_obey_bytes = now_obey_bytes;
inner.last_force_insert_bytes = now_force_insert_bytes;

let p = if delta_obey_bytes == 0 {
1.0
} else {
let numerator =
self.rate as f64 * elapsed.as_secs_f64() - delta_force_insert_bytes as f64;
let numerator = numerator.abs();
let p = numerator / delta_obey_bytes as f64;
p.min(1.0)
};

debug_assert!((0.0..=1.0).contains(&p), "p out of range 0..=1: {}", p);

let p = self.rate as f64 * elapsed.as_secs_f64() / bytes_delta as f64;
let p = (p * PRECISION as f64) as usize;
self.probability.store(p, Ordering::Relaxed);

inner.last_update_time = Some(now);
inner.last_bytes = bytes;
tracing::debug!("probability: {}", p as f64 / PRECISION as f64);
}
}

#[async_trait]
impl<K, V> AdmissionPolicy for RatedRandom<K, V>
where
K: Key,
Expand All @@ -143,54 +183,98 @@ where

type Value = V;

async fn judge(&self, key: &Self::Key, weight: usize) -> bool {
self.judge(key, weight)
fn judge(&self, key: &Self::Key, weight: usize, metrics: &Arc<Metrics>) -> bool {
self.judge(key, weight, metrics)
}

fn on_insert(&self, key: &Self::Key, weight: usize, metrics: &Arc<Metrics>, judge: bool) {
self.on_insert(key, weight, metrics, judge)
}

async fn admit(&self, key: &Self::Key, weight: usize) {
self.admit(key, weight)
fn on_drop(&self, key: &Self::Key, weight: usize, metrics: &Arc<Metrics>, judge: bool) {
self.on_drop(key, weight, metrics, judge)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use itertools::Itertools;

use super::*;

#[ignore]
#[tokio::test]
async fn test_rated_random() {
const RATE: usize = 1_000_000;
const CASES: usize = 10;
const ERATIO: f64 = 0.1;

let handles = (0..CASES).map(|_| tokio::spawn(case())).collect_vec();
let mut eratios = vec![];
for handle in handles {
let eratio = handle.await.unwrap();
assert!(eratio < ERATIO, "eratio: {} < ERATIO: {}", eratio, ERATIO);
eratios.push(eratio);
}
println!("========== RatedRandom error ratio begin ==========");
for eratio in eratios {
println!("eratio: {eratio}");
}
println!("=========== RatedRandom error ratio end ===========");
}

async fn case() -> f64 {
const RATE: usize = 1_000_000;
const CONCURRENCY: usize = 10;

const P_OTHER: f64 = 0.8;
const P_FORCE: f64 = 0.1;

let metrics = Arc::new(Metrics::default());

let score = Arc::new(AtomicUsize::new(0));

let rr = Arc::new(RatedRandom::<u64, Vec<u8>>::new(
RATE,
Duration::from_millis(100),
));

async fn submit(rr: Arc<RatedRandom<u64, Vec<u8>>>, score: Arc<AtomicUsize>) {
// scope: CONCURRENCY * (1 / interval) * range
// [1_000_000, 10_000_000]
// FORCE: [100_000, 1_000_000]

async fn submit(
rr: Arc<RatedRandom<u64, Vec<u8>>>,
score: Arc<AtomicUsize>,
metrics: Arc<Metrics>,
) {
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
let weight = thread_rng().gen_range(1000..10000);
if rr.judge(&0, weight) {
tokio::time::sleep(Duration::from_millis(1)).await;
let weight = thread_rng().gen_range(100..1000);

let judge = rr.judge(&0, weight, &metrics);
let p_other = thread_rng().gen_range(0.0..=1.0);
let p_force = thread_rng().gen_range(0.0..=1.0);

let insert = (p_force <= P_FORCE) || (p_other <= P_OTHER && judge);

if insert {
score.fetch_add(weight, Ordering::Relaxed);
rr.admit(&0, weight);
rr.on_insert(&0, weight, &metrics, judge);
} else {
rr.on_drop(&0, weight, &metrics, judge);
}
}
}

for _ in 0..10 {
tokio::spawn(submit(rr.clone(), score.clone()));
for _ in 0..CONCURRENCY {
tokio::spawn(submit(rr.clone(), score.clone(), metrics.clone()));
}

tokio::time::sleep(Duration::from_secs(10)).await;
let s = score.load(Ordering::Relaxed);
let error = (s as isize - RATE as isize * 10).unsigned_abs();
let eratio = error as f64 / (RATE as f64 * 10.0);

assert!(eratio < ERATIO, "eratio: {} < ERATIO: {}", eratio, ERATIO);
error as f64 / (RATE as f64 * 10.0)
}
}
2 changes: 1 addition & 1 deletion foyer-storage/src/device/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl FsDeviceConfig {
struct FsDeviceInner {
config: FsDeviceConfig,

#[allow(unused_variables)]
#[allow(dead_code)]
dir: File,

files: Vec<File>,
Expand Down
Loading

0 comments on commit 9f523b3

Please sign in to comment.