feat: introduce reinsertion picker and impl reinsert (#469)
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
MrCroxx committed May 11, 2024
1 parent a317ffc · commit bae2863
Showing 15 changed files with 775 additions and 254 deletions.
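
The reinsertion picker named in the commit title is defined in files that are not expanded on this page. Purely as an illustration of the concept, a picker that decides whether an entry evicted during region reclamation should be written back instead of dropped might look roughly like the sketch below; the trait name, method name, and signature are assumptions, not taken from this commit.

// Hypothetical sketch only; not the actual foyer API.
// A reinsertion picker inspects an entry that is about to be dropped while a
// region is reclaimed and decides whether it should be re-enqueued to the
// flusher (reinserted) instead of being discarded.
pub trait ReinsertionPicker: Send + Sync + 'static {
    /// Return `true` if the entry identified by `hash` (with the given
    /// serialized size in bytes) should be reinserted rather than dropped.
    fn pick(&self, hash: u64, len: usize) -> bool;
}

/// A trivial picker that never reinserts anything (hypothetical example).
pub struct RejectAllPicker;

impl ReinsertionPicker for RejectAllPicker {
    fn pick(&self, _hash: u64, _len: usize) -> bool {
        false
    }
}

fn main() {
    let picker = RejectAllPicker;
    assert!(!picker.pick(42, 4096));
}
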
2 changes: 1 addition & 1 deletion foyer-memory/src/context.rs
@@ -16,7 +16,7 @@
pub enum CacheContext {
/// The default context shared by all eviction container implementations.
Default,
- LruPriorityLow,
+ LowPriority,
}

impl Default for CacheContext {
12 changes: 6 additions & 6 deletions foyer-memory/src/eviction/fifo.rs
@@ -27,17 +27,17 @@ use crate::{
};

#[derive(Debug, Clone)]
- pub struct FifoContext;
+ pub struct FifoContext(CacheContext);

impl From<CacheContext> for FifoContext {
- fn from(_: CacheContext) -> Self {
- Self
+ fn from(context: CacheContext) -> Self {
+ Self(context)
}
}

impl From<FifoContext> for CacheContext {
- fn from(_: FifoContext) -> Self {
- CacheContext::Default
+ fn from(context: FifoContext) -> Self {
+ context.0
}
}

@@ -180,7 +180,7 @@ pub mod tests {

unsafe fn new_test_fifo_handle_ptr(data: u64) -> NonNull<TestFifoHandle> {
let mut handle = Box::<TestFifoHandle>::default();
- handle.init(0, data, 1, FifoContext);
+ handle.init(0, data, 1, FifoContext(CacheContext::Default));
NonNull::new_unchecked(Box::into_raw(handle))
}

12 changes: 6 additions & 6 deletions foyer-memory/src/eviction/lfu.rs
@@ -59,17 +59,17 @@ impl Default for LfuConfig {
}

#[derive(Debug, Clone)]
- pub struct LfuContext;
+ pub struct LfuContext(CacheContext);

impl From<CacheContext> for LfuContext {
- fn from(_: CacheContext) -> Self {
- Self
+ fn from(context: CacheContext) -> Self {
+ Self(context)
}
}

impl From<LfuContext> for CacheContext {
- fn from(_: LfuContext) -> Self {
- CacheContext::Default
+ fn from(context: LfuContext) -> Self {
+ context.0
}
}

@@ -464,7 +464,7 @@ mod tests {
let ptrs = (0..100)
.map(|i| {
let mut handle = Box::<TestLfuHandle>::default();
- handle.init(i, i, 1, LfuContext);
+ handle.init(i, i, 1, LfuContext(CacheContext::Default));
NonNull::new_unchecked(Box::into_raw(handle))
})
.collect_vec();
4 changes: 2 additions & 2 deletions foyer-memory/src/eviction/lru.rs
@@ -58,7 +58,7 @@ impl From<CacheContext> for LruContext {
fn from(value: CacheContext) -> Self {
match value {
CacheContext::Default => Self::HighPriority,
- CacheContext::LruPriorityLow => Self::LowPriority,
+ CacheContext::LowPriority => Self::LowPriority,
}
}
}
@@ -67,7 +67,7 @@ impl From<LruContext> for CacheContext {
fn from(value: LruContext) -> Self {
match value {
LruContext::HighPriority => CacheContext::Default,
- LruContext::LowPriority => CacheContext::LruPriorityLow,
+ LruContext::LowPriority => CacheContext::LowPriority,
}
}
}
12 changes: 6 additions & 6 deletions foyer-memory/src/eviction/s3fifo.rs
@@ -33,17 +33,17 @@ use crate::{
const MAX_FREQ: u8 = 3;

#[derive(Debug, Clone)]
- pub struct S3FifoContext;
+ pub struct S3FifoContext(CacheContext);

impl From<CacheContext> for S3FifoContext {
- fn from(_: CacheContext) -> Self {
- Self
+ fn from(context: CacheContext) -> Self {
+ Self(context)
}
}

impl From<S3FifoContext> for CacheContext {
- fn from(_: S3FifoContext) -> Self {
- CacheContext::Default
+ fn from(context: S3FifoContext) -> Self {
+ context.0
}
}

@@ -428,7 +428,7 @@ mod tests {
let ptrs = (0..100)
.map(|i| {
let mut handle = Box::<TestS3FifoHandle>::default();
- handle.init(i, i, 1, S3FifoContext);
+ handle.init(i, i, 1, S3FifoContext(CacheContext::Default));
NonNull::new_unchecked(Box::into_raw(handle))
})
.collect_vec();
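
Before this commit, FifoContext, LfuContext, and S3FifoContext were unit structs, so converting any of them back into CacheContext always produced CacheContext::Default; after it, each wraps the CacheContext it was created from, making the round trip lossless (LruContext keeps its explicit HighPriority/LowPriority mapping). A minimal, self-contained sketch of the pattern follows; the types are simplified re-declarations for illustration, the real ones live in foyer-memory.

// Simplified, standalone sketch of the context-preservation pattern.
// The real CacheContext and FifoContext are defined in foyer-memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheContext {
    Default,
    LowPriority,
}

// Newtype wrapper: the eviction-specific context now keeps the original value.
#[derive(Debug, Clone)]
pub struct FifoContext(CacheContext);

impl From<CacheContext> for FifoContext {
    fn from(context: CacheContext) -> Self {
        Self(context)
    }
}

impl From<FifoContext> for CacheContext {
    fn from(context: FifoContext) -> Self {
        context.0
    }
}

fn main() {
    // The context survives the round trip instead of collapsing to Default.
    let ctx: FifoContext = CacheContext::LowPriority.into();
    assert_eq!(CacheContext::from(ctx), CacheContext::LowPriority);
}
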
164 changes: 132 additions & 32 deletions foyer-storage/src/large/flusher.rs
@@ -31,6 +31,7 @@ use tokio::sync::oneshot;
use super::device::{Device, DeviceExt, IoBuffer, RegionId, IO_BUFFER_ALLOCATOR};
use super::generic::GenericStoreConfig;
use super::indexer::{EntryAddress, Indexer};
use super::reclaimer::Reinsertion;
use super::region::{GetCleanRegionHandle, RegionManager};
use super::storage::EnqueueFuture;
use super::tombstone::{Tombstone, TombstoneLog};
@@ -151,6 +152,7 @@
{
CacheEntry(CacheEntry<K, V, S>),
Tombstone(Tombstone),
Reinsertion(Reinsertion),
}

impl<K, V, S> From<CacheEntry<K, V, S>> for Submission<K, V, S>
@@ -175,6 +177,17 @@
}
}

impl<K, V, S> From<Reinsertion> for Submission<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: BuildHasher + Send + Sync + 'static + Debug,
{
fn from(reinsertion: Reinsertion) -> Self {
Self::Reinsertion(reinsertion)
}
}

#[derive(Debug)]
pub struct Flusher<K, V, S, D>
where
@@ -194,6 +207,26 @@
flush: bool,
}

impl<K, V, S, D> Clone for Flusher<K, V, S, D>
where
K: StorageKey,
V: StorageValue,
S: BuildHasher + Send + Sync + 'static + Debug,
D: Device,
{
fn clone(&self) -> Self {
Self {
batch: self.batch.clone(),
indexer: self.indexer.clone(),
region_manager: self.region_manager.clone(),
device: self.device.clone(),
tombstone_log: self.tombstone_log.clone(),
compression: self.compression,
flush: self.flush,
}
}
}

impl<K, V, S, D> Flusher<K, V, S, D>
where
K: StorageKey,
@@ -225,6 +258,7 @@
match submission.into() {
Submission::CacheEntry(entry) => self.entry(entry, sequence),
Submission::Tombstone(tombstone) => self.tombstone(tombstone, sequence),
Submission::Reinsertion(reinsertion) => self.reinsertion(reinsertion, sequence),
}
}

@@ -254,24 +288,7 @@
return;
}

- let hash = entry.hash();
-
- // Attempt to get a clean region for the new group.
- if state.groups.is_empty() {
- let handle = self.region_manager.get_clean_region();
- state.groups.push(WriteGroup {
- writer: RegionHandle {
- handle,
- offset: 0,
- size: 0,
- is_full: false,
- },
- buffer: IoBuffer::with_capacity_in(self.device.region_size(), &IO_BUFFER_ALLOCATOR),
- indices: vec![],
- txs: vec![],
- entries: vec![],
- })
- }
+ self.may_init_batch_state(state);

// Attempt to pick the latest group to write.
let group = state.groups.last_mut().unwrap();
@@ -315,30 +332,19 @@
bits::debug_assert_aligned(self.device.align(), group.buffer.len());
bits::debug_assert_aligned(self.device.align(), buffer.len());

- let handle = self.region_manager.get_clean_region();
- state.groups.push(WriteGroup {
- writer: RegionHandle {
- handle,
- offset: 0,
- size: 0,
- is_full: false,
- },
- buffer,
- indices: vec![],
- txs: vec![],
- entries: vec![],
- });
+ self.append_groups_with_buffer(state, buffer);
boffset = 0;
}

// Re-reference the latest group in case it may be out-dated.
let group = state.groups.last_mut().unwrap();
group.indices.push((
- hash,
+ entry.hash(),
EntryAddress {
region: RegionId::MAX,
offset: group.writer.size as u32 + boffset as u32,
len: len as _,
sequence,
},
));
group.txs.push(tx);
@@ -354,6 +360,70 @@
EnqueueFuture::new(rx)
}

fn reinsertion(&self, mut reinsertion: Reinsertion, sequence: Sequence) -> EnqueueFuture {
tracing::trace!("[flusher]: submit reinsertion with sequence: {sequence}");

debug_assert_eq!(sequence, 0);

let (tx, rx) = oneshot::channel();
let append = |state: &mut BatchState<K, V, S, D>| {
self.may_init_batch_state(state);

if self.indexer.get(reinsertion.hash).is_none() {
let _ = tx.send(Ok(false));
return;
}

// Attempt to pick the latest group to write.
let group = state.groups.last_mut().unwrap();
bits::debug_assert_aligned(self.device.align(), group.buffer.len());

// Rotate the group early, since the length of the buffer to write is already known.
let aligned = bits::align_up(self.device.align(), reinsertion.buffer.len());
if group.writer.size + group.buffer.len() + aligned > self.device.region_size() {
tracing::trace!("[flusher]: split group at size: {size}, acc buf len: {acc_buf_len}, buf len: {buf_len}, total (if not split): {total}, exceeds region size: {region_size}",
size = group.writer.size,
acc_buf_len = group.buffer.len(),
buf_len = reinsertion.buffer.len(),
total = group.writer.size + group.buffer.len() + reinsertion.buffer.len(),
region_size = self.device.region_size(),
);

group.writer.is_full = true;

bits::debug_assert_aligned(self.device.align(), group.buffer.len());

self.append_groups(state);
}

// Re-reference the latest group in case it may be out-dated.
let group = state.groups.last_mut().unwrap();
let boffset = group.buffer.len();
let len = reinsertion.buffer.len();
group.buffer.reserve(aligned);
group.buffer.append(&mut reinsertion.buffer);
unsafe { group.buffer.set_len(boffset + aligned) };
group.indices.push((
reinsertion.hash,
EntryAddress {
region: RegionId::MAX,
offset: group.writer.size as u32 + boffset as u32,
len: len as _,
sequence: reinsertion.sequence,
},
));
group.txs.push(tx);
group.writer.size += aligned;
};

if let Some(token) = self.batch.accumulate(append) {
tracing::trace!("[flusher]: reinsertion with sequence: {sequence} becomes leader");
self.commit(token);
}

EnqueueFuture::new(rx)
}

fn commit(&self, token: LeaderToken<BatchState<K, V, S, D>, Result<()>>) {
let indexer = self.indexer.clone();
let flush = self.flush;
@@ -467,4 +537,34 @@
}
Ok(())
}

/// Initialize the batch state if needed.
fn may_init_batch_state(&self, state: &mut BatchState<K, V, S, D>) {
if state.groups.is_empty() {
self.append_groups(state);
}
}

fn append_groups(&self, state: &mut BatchState<K, V, S, D>) {
self.append_groups_with_buffer(
state,
IoBuffer::with_capacity_in(self.device.region_size(), &IO_BUFFER_ALLOCATOR),
);
}

fn append_groups_with_buffer(&self, state: &mut BatchState<K, V, S, D>, buffer: IoBuffer) {
let handle = self.region_manager.get_clean_region();
state.groups.push(WriteGroup {
writer: RegionHandle {
handle,
offset: 0,
size: 0,
is_full: false,
},
buffer,
indices: vec![],
txs: vec![],
entries: vec![],
});
}
}
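
Both the entry and reinsertion paths above rotate to a fresh write group when the device-aligned length of the incoming buffer would overflow the current region. A small standalone sketch of that check follows; align_up mirrors bits::align_up, and the alignment and region size are made-up stand-ins rather than values taken from foyer.

// Standalone illustration of the group-rotation check used above.
fn align_up(align: usize, v: usize) -> usize {
    debug_assert!(align.is_power_of_two());
    (v + align - 1) & !(align - 1)
}

fn must_rotate(group_written: usize, group_pending: usize, entry_len: usize, align: usize, region_size: usize) -> bool {
    // Round the incoming entry up to the device alignment, then check whether
    // the bytes already written to the region, plus the bytes accumulated in
    // the in-flight buffer, plus this entry would overflow the region.
    let aligned = align_up(align, entry_len);
    group_written + group_pending + aligned > region_size
}

fn main() {
    let align = 4096;           // hypothetical device alignment
    let region_size = 64 << 20; // hypothetical 64 MiB region
    assert!(!must_rotate(0, 0, 10_000, align, region_size));
    assert!(must_rotate(region_size - 4096, 0, 4097, align, region_size));
}
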
