Skip to content

Commit

Permalink
refactor: remove in-mem cache listener, advance disk cache insert (#463)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed May 10, 2024
1 parent baebadb commit 03f205c
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 376 deletions.
207 changes: 81 additions & 126 deletions foyer-memory/src/cache.rs

Large diffs are not rendered by default.

110 changes: 40 additions & 70 deletions foyer-memory/src/generic.rs

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion foyer-memory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ mod eviction;
mod generic;
mod handle;
mod indexer;
mod listener;
mod metrics;
mod prelude;

Expand Down
54 changes: 0 additions & 54 deletions foyer-memory/src/listener.rs

This file was deleted.

1 change: 0 additions & 1 deletion foyer-memory/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub use crate::{
context::CacheContext,
eviction::{fifo::FifoConfig, lfu::LfuConfig, lru::LruConfig, s3fifo::S3FifoConfig},
generic::Weighter,
listener::{CacheEventListener, DefaultCacheEventListener},
metrics::Metrics,
};
pub use ahash::RandomState;
12 changes: 6 additions & 6 deletions foyer-storage/src/large/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::Compression;
use foyer_common::async_batch_pipeline::{AsyncBatchPipeline, LeaderToken};
use foyer_common::bits;
use foyer_common::code::{StorageKey, StorageValue};
use foyer_memory::{CacheEntry, DefaultCacheEventListener};
use foyer_memory::CacheEntry;
use futures::future::{try_join, try_join_all};

use tokio::sync::oneshot;
Expand Down Expand Up @@ -86,7 +86,7 @@ where
indices: Vec<(u64, EntryAddress)>,
txs: Vec<oneshot::Sender<Result<bool>>>,
// hold the entries to avoid memory cache lookup miss?
entries: Vec<CacheEntry<K, V, DefaultCacheEventListener<K, V>, S>>,
entries: Vec<CacheEntry<K, V, S>>,
}

impl<K, V, S, D> Debug for WriteGroup<K, V, S, D>
Expand Down Expand Up @@ -149,17 +149,17 @@ where
V: StorageValue,
S: BuildHasher + Send + Sync + 'static + Debug,
{
CacheEntry(CacheEntry<K, V, DefaultCacheEventListener<K, V>, S>),
CacheEntry(CacheEntry<K, V, S>),
Tombstone(Tombstone),
}

impl<K, V, S> From<CacheEntry<K, V, DefaultCacheEventListener<K, V>, S>> for Submission<K, V, S>
impl<K, V, S> From<CacheEntry<K, V, S>> for Submission<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: BuildHasher + Send + Sync + 'static + Debug,
{
fn from(entry: CacheEntry<K, V, DefaultCacheEventListener<K, V>, S>) -> Self {
fn from(entry: CacheEntry<K, V, S>) -> Self {
Self::CacheEntry(entry)
}
}
Expand Down Expand Up @@ -244,7 +244,7 @@ where
EnqueueFuture::new(rx)
}

fn entry(&self, entry: CacheEntry<K, V, DefaultCacheEventListener<K, V>, S>, sequence: Sequence) -> EnqueueFuture {
fn entry(&self, entry: CacheEntry<K, V, S>, sequence: Sequence) -> EnqueueFuture {
tracing::trace!("[flusher]: submit entry with sequence: {sequence}");

let (tx, rx) = oneshot::channel();
Expand Down
9 changes: 3 additions & 6 deletions foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
};

use foyer_common::code::{StorageKey, StorageValue};
use foyer_memory::{CacheEntry, DefaultCacheEventListener};
use foyer_memory::CacheEntry;
use futures::future::{join_all, try_join_all};

use tokio::sync::{oneshot, Semaphore};
Expand Down Expand Up @@ -219,7 +219,7 @@ where
Ok(())
}

fn enqueue(&self, entry: CacheEntry<K, V, DefaultCacheEventListener<K, V>, S>) -> EnqueueFuture {
fn enqueue(&self, entry: CacheEntry<K, V, S>) -> EnqueueFuture {
if !self.inner.active.load(Ordering::Relaxed) {
let (tx, rx) = oneshot::channel();
tx.send(Err(anyhow::anyhow!("cannot enqueue new entry after closed").into()))
Expand Down Expand Up @@ -339,10 +339,7 @@ where
self.close().await
}

fn enqueue(
&self,
entry: CacheEntry<Self::Key, Self::Value, DefaultCacheEventListener<Self::Key, Self::Value>, Self::BuildHasher>,
) -> EnqueueFuture {
fn enqueue(&self, entry: CacheEntry<Self::Key, Self::Value, Self::BuildHasher>) -> EnqueueFuture {
self.enqueue(entry)
}

Expand Down
7 changes: 2 additions & 5 deletions foyer-storage/src/large/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
};

use foyer_common::code::{StorageKey, StorageValue};
use foyer_memory::{CacheEntry, DefaultCacheEventListener};
use foyer_memory::CacheEntry;
use pin_project::pin_project;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -63,10 +63,7 @@ pub trait Storage: Send + Sync + 'static + Clone {
#[must_use]
fn close(&self) -> impl Future<Output = Result<()>> + Send;

fn enqueue(
&self,
entry: CacheEntry<Self::Key, Self::Value, DefaultCacheEventListener<Self::Key, Self::Value>, Self::BuildHasher>,
) -> EnqueueFuture;
fn enqueue(&self, entry: CacheEntry<Self::Key, Self::Value, Self::BuildHasher>) -> EnqueueFuture;

#[must_use]
fn lookup<Q>(&self, key: &Q) -> impl Future<Output = Result<Option<Self::Value>>> + Send
Expand Down
Loading

0 comments on commit 03f205c

Please sign in to comment.