feat: impl poll mode flushers and reclaimers instead of push mode (#90)
* impl poll mode flusher

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* impl poll mode reclaimer

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* clean code

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* clean code

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

---------

Signed-off-by: MrCroxx <mrcroxx@outlook.com>
MrCroxx committed Jul 28, 2023
1 parent 02b7761 commit 0b7a363
Showing 8 changed files with 253 additions and 360 deletions.
29 changes: 21 additions & 8 deletions foyer-common/src/queue.rs
@@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use parking_lot::RwLock;
use parking_lot::Mutex;
use std::{collections::VecDeque, fmt::Debug};
use tokio::sync::Notify;
use tokio::sync::{watch, Notify};

#[derive(Debug)]
pub struct AsyncQueue<T> {
queue: RwLock<VecDeque<T>>,
queue: Mutex<VecDeque<T>>,
notified: Notify,
watch_tx: watch::Sender<usize>,
watch_rx: watch::Receiver<usize>,
}

impl<T: Debug> Default for AsyncQueue<T> {
@@ -30,21 +32,25 @@ impl<T: Debug> Default for AsyncQueue<T> {

impl<T: Debug> AsyncQueue<T> {
pub fn new() -> Self {
let (watch_tx, watch_rx) = watch::channel(0);
Self {
queue: RwLock::new(VecDeque::default()),
queue: Mutex::new(VecDeque::default()),
notified: Notify::new(),
watch_tx,
watch_rx,
}
}

pub fn try_acquire(&self) -> Option<T> {
let mut guard = self.queue.write();
let mut guard = self.queue.lock();
if let Some(item) = guard.pop_front() {
if !guard.is_empty() {
// Since `release` uses `notify_one`, not all waiters
// will be woken up. Therefore, if the queue is still not empty,
// we call `notify_one` to wake the next pending `acquire`.
self.notified.notify_one();
}
self.watch_tx.send(guard.len()).unwrap();
Some(item)
} else {
None
@@ -55,14 +61,15 @@ impl<T: Debug> AsyncQueue<T> {
loop {
let notified = self.notified.notified();
{
let mut guard = self.queue.write();
let mut guard = self.queue.lock();
if let Some(item) = guard.pop_front() {
if !guard.is_empty() {
// Since `release` uses `notify_one`, not all waiters
// will be woken up. Therefore, if the queue is still not empty,
// we call `notify_one` to wake the next pending `acquire`.
self.notified.notify_one();
}
self.watch_tx.send(guard.len()).unwrap();
break item;
}
}
@@ -71,17 +78,23 @@
}

pub fn release(&self, item: T) {
self.queue.write().push_back(item);
let mut guard = self.queue.lock();
guard.push_back(item);
self.watch_tx.send(guard.len()).unwrap();
self.notified.notify_one();
}

pub fn len(&self) -> usize {
self.queue.read().len()
*self.watch_rx.borrow()
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn watch(&self) -> watch::Receiver<usize> {
self.watch_rx.clone()
}
}

#[cfg(test)]
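For context, a minimal usage sketch of the reworked AsyncQueue (not part of this commit): one task blocks on acquire, another releases an item, and a third observes the queue length through the new watch channel instead of taking the queue lock. It assumes acquire is async fn acquire(&self) -> T, as the diff context suggests, and Cargo dependencies on foyer-common and tokio.

use std::sync::Arc;

use foyer_common::queue::AsyncQueue;

#[tokio::main]
async fn main() {
    let queue: Arc<AsyncQueue<u64>> = Arc::new(AsyncQueue::new());

    // Observer: watches the queue length without taking the queue lock.
    let mut len_rx = queue.watch();
    tokio::spawn(async move {
        while len_rx.changed().await.is_ok() {
            println!("queue len = {}", *len_rx.borrow());
        }
    });

    // Consumer: `acquire` waits until an item is available.
    let consumer = {
        let queue = queue.clone();
        tokio::spawn(async move {
            let item = queue.acquire().await;
            println!("acquired {item}");
        })
    };

    // Producer: `release` pushes an item and wakes one pending `acquire`.
    queue.release(42);
    consumer.await.unwrap();
}
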
2 changes: 2 additions & 0 deletions foyer-common/src/rate.rs
@@ -16,11 +16,13 @@ use std::time::{Duration, Instant};

use parking_lot::Mutex;

#[derive(Debug)]
pub struct RateLimiter {
inner: Mutex<Inner>,
rate: f64,
}

#[derive(Debug)]
struct Inner {
quota: f64,

11 changes: 11 additions & 0 deletions foyer-storage-bench/src/main.rs
@@ -134,6 +134,10 @@ pub struct Args {
/// (MiB/s)
#[arg(long, default_value_t = 0)]
reclaim_rate_limit: usize,

/// `0` means equal to reclaimer count.
#[arg(long, default_value_t = 0)]
clean_region_threshold: usize,
}

impl Args {
@@ -243,6 +247,12 @@ async fn main() {
reinsertions.push(Arc::new(rr));
}

let clean_region_threshold = if args.clean_region_threshold == 0 {
args.reclaimers
} else {
args.clean_region_threshold
};

let config = StoreConfig {
eviction_config,
device_config,
@@ -256,6 +266,7 @@
recover_concurrency: args.recover_concurrency,
event_listeners: vec![],
prometheus_config: PrometheusConfig::default(),
clean_region_threshold,
};

println!("{:#?}", config);
4 changes: 3 additions & 1 deletion foyer-storage/src/event.rs
@@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;

use async_trait::async_trait;
use foyer_common::code::{Key, Value};

use crate::error::Result;

#[allow(unused_variables)]
#[async_trait]
pub trait EventListener: Send + Sync + 'static {
pub trait EventListener: Send + Sync + 'static + Debug {
type K: Key;
type V: Value;

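An illustrative guess at why Debug was added as a supertrait here (hypothetical Listener and NoopListener types, not the crate's API): trait objects only implement Debug when it is a supertrait, so collections of listeners, such as the event_listeners field the bench puts into StoreConfig, can be printed with println!("{:#?}", config).

use std::{fmt::Debug, sync::Arc};

// `Debug` as a supertrait makes `dyn Listener` itself implement `Debug`.
trait Listener: Debug {
    fn on_evict(&self, key: u64);
}

#[derive(Debug)]
struct NoopListener;

impl Listener for NoopListener {
    fn on_evict(&self, _key: u64) {}
}

fn main() {
    let listener: Arc<dyn Listener> = Arc::new(NoopListener);
    let listeners = vec![listener];
    // Compiles only because `Debug` is a supertrait of `Listener`.
    println!("{listeners:#?}");
}
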
149 changes: 36 additions & 113 deletions foyer-storage/src/flusher.rs
@@ -14,139 +14,62 @@

use std::sync::Arc;

use foyer_common::{queue::AsyncQueue, rate::RateLimiter};
use foyer_common::rate::RateLimiter;
use foyer_intrusive::{core::adapter::Link, eviction::EvictionPolicy};
use itertools::Itertools;
use tokio::{
sync::{broadcast, mpsc, Mutex},
task::JoinHandle,
};

use tokio::sync::broadcast;

use crate::{
device::{BufferAllocator, Device},
error::{Error, Result},
device::Device,
error::Result,
metrics::Metrics,
region::RegionId,
region_manager::{RegionEpItemAdapter, RegionManager},
slice::Slice,
};

#[derive(Debug)]
pub struct FlushTask {
pub region_id: RegionId,
}

struct FlusherInner {
sequence: usize,

task_txs: Vec<mpsc::UnboundedSender<FlushTask>>,
}

pub struct Flusher {
runners: usize,

inner: Mutex<FlusherInner>,
}

impl Flusher {
pub fn new(runners: usize) -> Self {
let inner = FlusherInner {
sequence: 0,
task_txs: Vec::with_capacity(runners),
};
Self {
runners,
inner: Mutex::new(inner),
}
}

pub async fn run<D, E, EL>(
&self,
buffers: Arc<AsyncQueue<Vec<u8, D::IoBufferAllocator>>>,
region_manager: Arc<RegionManager<D, E, EL>>,
rate_limiter: Option<Arc<RateLimiter>>,
stop_rxs: Vec<broadcast::Receiver<()>>,
metrics: Arc<Metrics>,
) -> Vec<JoinHandle<()>>
where
D: Device,
E: EvictionPolicy<Adapter = RegionEpItemAdapter<EL>>,
EL: Link,
{
let mut inner = self.inner.lock().await;

#[allow(clippy::type_complexity)]
let (mut txs, rxs): (
Vec<mpsc::UnboundedSender<FlushTask>>,
Vec<mpsc::UnboundedReceiver<FlushTask>>,
) = (0..self.runners).map(|_| mpsc::unbounded_channel()).unzip();
inner.task_txs.append(&mut txs);

let runners = rxs
.into_iter()
.zip_eq(stop_rxs.into_iter())
.map(|(task_rx, stop_rx)| Runner {
task_rx,
buffers: buffers.clone(),
region_manager: region_manager.clone(),
rate_limiter: rate_limiter.clone(),
stop_rx,
metrics: metrics.clone(),
})
.collect_vec();

let mut handles = vec![];
for runner in runners {
let handle = tokio::spawn(async move {
runner.run().await.unwrap();
});
handles.push(handle);
}
handles
}

pub fn runners(&self) -> usize {
self.runners
}

pub async fn submit(&self, task: FlushTask) -> Result<()> {
let mut inner = self.inner.lock().await;
let submittee = inner.sequence % inner.task_txs.len();
inner.sequence += 1;
inner.task_txs[submittee].send(task).map_err(Error::other)
}
}

struct Runner<D, E, EL>
pub struct Flusher<D, EP, EL>
where
D: Device,
E: EvictionPolicy<Adapter = RegionEpItemAdapter<EL>>,
EP: EvictionPolicy<Adapter = RegionEpItemAdapter<EL>>,
EL: Link,
{
task_rx: mpsc::UnboundedReceiver<FlushTask>,
buffers: Arc<AsyncQueue<Vec<u8, D::IoBufferAllocator>>>,

region_manager: Arc<RegionManager<D, E, EL>>,
region_manager: Arc<RegionManager<D, EP, EL>>,

rate_limiter: Option<Arc<RateLimiter>>,

stop_rx: broadcast::Receiver<()>,

metrics: Arc<Metrics>,

stop_rx: broadcast::Receiver<()>,
}

impl<D, E, EL> Runner<D, E, EL>
impl<D, EP, EL> Flusher<D, EP, EL>
where
D: Device,
E: EvictionPolicy<Adapter = RegionEpItemAdapter<EL>>,
EP: EvictionPolicy<Adapter = RegionEpItemAdapter<EL>>,
EL: Link,
{
async fn run(mut self) -> Result<()> {
pub fn new(
region_manager: Arc<RegionManager<D, EP, EL>>,
rate_limiter: Option<Arc<RateLimiter>>,
metrics: Arc<Metrics>,
stop_rx: broadcast::Receiver<()>,
) -> Self {
Self {
region_manager,
rate_limiter,
metrics,
stop_rx,
}
}

pub async fn run(mut self) -> Result<()> {
loop {
tokio::select! {
biased;
Some(task) = self.task_rx.recv() => {
self.handle(task).await?;
region_id = self.region_manager.dirty_regions().acquire() => {
self.handle(region_id).await?;
}
_ = self.stop_rx.recv() => {
tracing::info!("[flusher] exit");
@@ -156,10 +79,10 @@ where
}
}

async fn handle(&self, task: FlushTask) -> Result<()> {
tracing::info!("[flusher] receive flush task, region: {}", task.region_id);
async fn handle(&self, region_id: RegionId) -> Result<()> {
tracing::info!("[flusher] receive flush task, region: {}", region_id);

let region = self.region_manager.region(&task.region_id);
let region = self.region_manager.region(&region_id);

tracing::trace!("[flusher] step 1");

@@ -171,7 +94,7 @@ where
let _ = region.exclusive(false, true, false).await;
}

tracing::trace!("[flusher] write region {} back to device", task.region_id);
tracing::trace!("[flusher] write region {} back to device", region_id);

let mut offset = 0;
let len = region.device().io_size();
@@ -202,10 +125,10 @@
tracing::trace!("[flusher] step 3");

// step 3: release buffer
self.buffers.release(buffer);
self.region_manager.set_region_evictable(&region.id()).await;
self.region_manager.buffers().release(buffer);
self.region_manager.eviction_push(region.id());

tracing::info!("[flusher] finish flush task, region: {}", task.region_id);
tracing::info!("[flusher] finish flush task, region: {}", region_id);

self.metrics
.bytes_flush
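A simplified, self-contained sketch of the poll-mode pattern this rewrite adopts (PollFlusher and RegionId are illustrative names; the real Flusher is generic over Device and EvictionPolicy and performs actual device I/O): instead of receiving pushed FlushTasks over an mpsc channel, the worker polls a shared queue of dirty region ids and exits on a broadcast stop signal.

use std::sync::Arc;

use foyer_common::queue::AsyncQueue;
use tokio::sync::broadcast;

type RegionId = u32;

struct PollFlusher {
    dirty_regions: Arc<AsyncQueue<RegionId>>,
    stop_rx: broadcast::Receiver<()>,
}

impl PollFlusher {
    async fn run(mut self) {
        loop {
            tokio::select! {
                biased;
                region_id = self.dirty_regions.acquire() => {
                    // The real flusher writes the region buffer back to the
                    // device here, returns the buffer to the region manager,
                    // and pushes the region into the eviction queue.
                    println!("[flusher] flush region {region_id}");
                }
                _ = self.stop_rx.recv() => {
                    println!("[flusher] exit");
                    return;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let dirty_regions = Arc::new(AsyncQueue::new());
    let (stop_tx, stop_rx) = broadcast::channel(1);

    let flusher = PollFlusher {
        dirty_regions: dirty_regions.clone(),
        stop_rx,
    };
    let handle = tokio::spawn(flusher.run());

    // Mark a region dirty; the flusher picks it up by polling the queue.
    dirty_regions.release(0);

    // Later, shut the flusher down.
    stop_tx.send(()).unwrap();
    handle.await.unwrap();
}
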