Skip to content

Commit

Permalink
feat: introduce tombstone log on direct fs device (#459)
Browse files Browse the repository at this point in the history
* feat: introduce tombstone log on direct fs device

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: pass builds

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: fix tombstone log bug and add unit tests

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* fix: fix tombstone log sync

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

---------

Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed May 9, 2024
1 parent c690ec9 commit 3521c65
Show file tree
Hide file tree
Showing 7 changed files with 432 additions and 55 deletions.
2 changes: 2 additions & 0 deletions foyer-storage/Cargo.toml
Expand Up @@ -15,6 +15,8 @@ ahash = "0.8"
# TODO(MrCroxx): Remove this after `allocator_api` is stable.
allocator-api2 = "0.2"
anyhow = "1.0"
# TODO(MrCroxx): use `array_chunks` after `#![feature(array_chunks)]` is stable.
array-util = "1"
async-channel = "2"
bincode = "1"
bitflags = "2.3.1"
Expand Down
137 changes: 89 additions & 48 deletions foyer-storage/src/large/device/direct_file.rs
Expand Up @@ -25,7 +25,7 @@ use std::{
sync::Arc,
};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DirectFileDeviceConfig {
pub path: PathBuf,
pub capacity: usize,
Expand Down Expand Up @@ -68,6 +68,79 @@ impl DirectFileDevice {
pub const O_DIRECT: i32 = 0x4000;
}

impl DirectFileDevice {
    /// Writes `buf` to the underlying file at the absolute byte `offset`.
    ///
    /// The buffer length is rounded up to the next multiple of `self.align()` before the
    /// write is issued. The padding bytes are zero-filled, so no uninitialized memory is
    /// ever handed to the file.
    ///
    /// # Errors
    ///
    /// Returns an error if the positional write fails, or if the number of bytes written
    /// differs from the padded length.
    ///
    /// # Panics
    ///
    /// Panics if `offset + padded length` exceeds `self.capacity()`. `offset` is also
    /// checked against the device alignment via `bits::assert_aligned`.
    pub async fn pwrite(&self, mut buf: IoBuffer, offset: u64) -> Result<()> {
        bits::assert_aligned(self.align() as u64, offset);

        let aligned = bits::align_up(self.align(), buf.len());
        // Zero-fill the alignment padding. Growing the length with `set_len` alone would
        // expose uninitialized bytes as `&[u8]` and persist them to the file.
        buf.resize(aligned, 0);

        assert!(
            offset as usize + aligned <= self.capacity(),
            "offset ({offset}) + aligned ({aligned}) = total ({total}) <= capacity ({capacity})",
            total = offset as usize + aligned,
            capacity = self.capacity,
        );

        let file = self.file.clone();
        asyncify(move || {
            #[cfg(target_family = "unix")]
            let written = {
                use std::os::unix::fs::FileExt;
                file.write_at(buf.as_ref(), offset)?
            };

            // The Windows `FileExt` trait exposes positional writes as `seek_write`;
            // it has no `write_at` method.
            #[cfg(target_family = "windows")]
            let written = {
                use std::os::windows::fs::FileExt;
                file.seek_write(buf.as_ref(), offset)?
            };

            // A short write is reported as an error rather than retried.
            if written != aligned {
                return Err(anyhow::anyhow!("written {written}, expected: {aligned}").into());
            }

            Ok(())
        })
        .await
    }

    /// Reads `len` bytes from the underlying file starting at the absolute byte `offset`.
    ///
    /// The read itself covers `len` rounded up to the next multiple of `self.align()`;
    /// the returned buffer is truncated back to `len` bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the positional read fails, or if the number of bytes read
    /// differs from the padded length.
    ///
    /// # Panics
    ///
    /// Panics if `offset + padded length` exceeds `self.capacity()`. `offset` is also
    /// checked against the device alignment via `bits::assert_aligned`.
    pub async fn pread(&self, offset: u64, len: usize) -> Result<IoBuffer> {
        bits::assert_aligned(self.align() as u64, offset);

        let aligned = bits::align_up(self.align(), len);

        assert!(
            offset as usize + aligned <= self.capacity(),
            "offset ({offset}) + aligned ({aligned}) = total ({total}) <= capacity ({capacity})",
            total = offset as usize + aligned,
            capacity = self.capacity,
        );

        let mut buf = IoBuffer::with_capacity_in(aligned, &IO_BUFFER_ALLOCATOR);
        // SAFETY: the buffer was allocated with capacity `aligned`, and it is only
        // returned to the caller after the read below reports that all `aligned` bytes
        // were filled; on any other outcome it is dropped without being inspected.
        // NOTE(review): the bytes are still uninitialized while the slice is handed to
        // the read call — confirm this is acceptable or zero-fill with `resize`.
        unsafe {
            buf.set_len(aligned);
        }

        let file = self.file.clone();
        let mut buffer = asyncify(move || {
            #[cfg(target_family = "unix")]
            let read = {
                use std::os::unix::fs::FileExt;
                file.read_at(buf.as_mut(), offset)?
            };

            // The Windows `FileExt` trait exposes positional reads as `seek_read`;
            // it has no `read_at` method.
            #[cfg(target_family = "windows")]
            let read = {
                use std::os::windows::fs::FileExt;
                file.seek_read(buf.as_mut(), offset)?
            };

            // A short read is reported as an error rather than retried.
            if read != aligned {
                return Err(anyhow::anyhow!("read {read}, expected: {aligned}").into());
            }

            Ok::<_, Error>(buf)
        })
        .await?;

        // Drop the alignment padding so the caller sees exactly `len` bytes.
        buffer.truncate(len);

        Ok(buffer)
    }
}

impl Device for DirectFileDevice {
type Config = DirectFileDeviceConfig;

Expand All @@ -79,7 +152,7 @@ impl Device for DirectFileDevice {
self.region_size
}

async fn open(config: &Self::Config) -> Result<Self> {
async fn open(config: Self::Config) -> Result<Self> {
config.verify()?;

let dir = config.path.parent().expect("path must point to a file").to_path_buf();
Expand Down Expand Up @@ -120,56 +193,24 @@ impl Device for DirectFileDevice {
region_size = self.region_size(),
);

let region_size = self.region_size();
let file = self.file.clone();
asyncify(move || {
#[cfg(target_family = "unix")]
use std::os::unix::fs::FileExt;

#[cfg(target_family = "windows")]
use std::os::windows::fs::FileExt;

let written = file.write_at(buf.as_ref(), offset + (region as u64 * region_size as u64))?;
if written != aligned {
return Err(anyhow::anyhow!("written {written}, expected: {aligned}").into());
}

Ok(())
})
.await
let poffset = offset + region as u64 * self.region_size as u64;
self.pwrite(buf, poffset).await
}

async fn read(&self, region: RegionId, offset: u64, len: usize) -> Result<IoBuffer> {
bits::assert_aligned(self.align() as u64, offset);

let aligned = bits::align_up(self.align(), len);

let mut buf = IoBuffer::with_capacity_in(aligned, &IO_BUFFER_ALLOCATOR);
unsafe {
buf.set_len(aligned);
}

let region_size = self.region_size();
let file = self.file.clone();
let mut buffer = asyncify(move || {
#[cfg(target_family = "unix")]
use std::os::unix::fs::FileExt;

#[cfg(target_family = "windows")]
use std::os::windows::fs::FileExt;

let read = file.read_at(buf.as_mut(), offset + (region as u64 * region_size as u64))?;
if read != aligned {
return Err(anyhow::anyhow!("read {read}, expected: {aligned}").into());
}

Ok::<_, Error>(buf)
})
.await?;

buffer.truncate(len);
assert!(
offset as usize + aligned <= self.region_size(),
"offset ({offset}) + aligned ({aligned}) = total ({total}) <= region size ({region_size})",
total = offset as usize + aligned,
region_size = self.region_size(),
);

Ok(buffer)
let poffset = offset + region as u64 * self.region_size as u64;
self.pread(poffset, len).await
}

async fn flush(&self, _: Option<RegionId>) -> Result<()> {
Expand All @@ -194,9 +235,9 @@ impl DirectFileDeviceConfigBuilder {
const DEFAULT_FILE_SIZE: usize = 64 * 1024 * 1024;

/// Use the given file path as the direct file device path.
pub fn new(dir: impl AsRef<Path>) -> Self {
pub fn new(path: impl AsRef<Path>) -> Self {
Self {
path: dir.as_ref().into(),
path: path.as_ref().into(),
capacity: None,
region_size: None,
}
Expand Down Expand Up @@ -289,7 +330,7 @@ mod tests {

tracing::debug!("{config:?}");

let device = DirectFileDevice::open(&config).await.unwrap();
let device = DirectFileDevice::open(config.clone()).await.unwrap();

let mut buf = IoBuffer::with_capacity_in(64 * 1024, &IO_BUFFER_ALLOCATOR);
buf.extend(repeat_n(b'x', 64 * 1024 - 100));
Expand All @@ -303,7 +344,7 @@ mod tests {

drop(device);

let device = DirectFileDevice::open(&config).await.unwrap();
let device = DirectFileDevice::open(config).await.unwrap();

let b = device.read(0, 4096, 64 * 1024 - 100).await.unwrap();
assert_eq!(buf, b);
Expand Down
15 changes: 11 additions & 4 deletions foyer-storage/src/large/device/direct_fs.rs
Expand Up @@ -28,7 +28,7 @@ use crate::{
large::device::{IoBuffer, ALIGN, IO_BUFFER_ALLOCATOR},
};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DirectFsDeviceConfig {
pub dir: PathBuf,
pub capacity: usize,
Expand Down Expand Up @@ -97,7 +97,7 @@ impl Device for DirectFsDevice {
self.inner.file_size
}

async fn open(config: &Self::Config) -> Result<Self> {
async fn open(config: Self::Config) -> Result<Self> {
config.verify()?;

// TODO(MrCroxx): write and read config to a manifest file for pinning
Expand Down Expand Up @@ -177,6 +177,13 @@ impl Device for DirectFsDevice {

let aligned = bits::align_up(self.align(), len);

assert!(
offset as usize + aligned <= self.region_size(),
"offset ({offset}) + aligned ({aligned}) = total ({total}) <= region size ({region_size})",
total = offset as usize + aligned,
region_size = self.region_size(),
);

let mut buf = IoBuffer::with_capacity_in(aligned, &IO_BUFFER_ALLOCATOR);
unsafe {
buf.set_len(aligned);
Expand Down Expand Up @@ -330,7 +337,7 @@ mod tests {

tracing::debug!("{config:?}");

let device = DirectFsDevice::open(&config).await.unwrap();
let device = DirectFsDevice::open(config.clone()).await.unwrap();

let mut buf = IoBuffer::with_capacity_in(64 * 1024, &IO_BUFFER_ALLOCATOR);
buf.extend(repeat_n(b'x', 64 * 1024 - 100));
Expand All @@ -344,7 +351,7 @@ mod tests {

drop(device);

let device = DirectFsDevice::open(&config).await.unwrap();
let device = DirectFsDevice::open(config).await.unwrap();

let b = device.read(0, 4096, 64 * 1024 - 100).await.unwrap();
assert_eq!(buf, b);
Expand Down
7 changes: 5 additions & 2 deletions foyer-storage/src/large/device/mod.rs
Expand Up @@ -35,10 +35,13 @@ pub type RegionId = u32;

pub type IoBuffer = allocator_api2::vec::Vec<u8, &'static AlignedAllocator<ALIGN>>;

pub trait DeviceConfig: Send + Sync + 'static + Debug {
pub trait DeviceConfig: Send + Sync + 'static + Debug + Clone {
fn verify(&self) -> Result<()>;
}

/// [`Device`] represents a 4K-aligned block device.
///
/// Both I/O blocks and I/O buffers must be aligned to 4K.
pub trait Device: Send + Sync + 'static + Sized + Clone {
type Config: DeviceConfig;

Expand All @@ -49,7 +52,7 @@ pub trait Device: Send + Sync + 'static + Sized + Clone {
fn region_size(&self) -> usize;

#[must_use]
fn open(config: &Self::Config) -> impl Future<Output = Result<Self>> + Send;
fn open(config: Self::Config) -> impl Future<Output = Result<Self>> + Send;

#[must_use]
fn write(&self, buf: IoBuffer, region: RegionId, offset: u64) -> impl Future<Output = Result<()>> + Send;
Expand Down
2 changes: 1 addition & 1 deletion foyer-storage/src/large/generic.rs
Expand Up @@ -150,7 +150,7 @@ where
D: Device,
{
async fn open(mut config: GenericStoreConfig<K, V, S, D>) -> Result<Self> {
let device = D::open(&config.device_config).await?;
let device = D::open(config.device_config.clone()).await?;

let indexer = Indexer::new(device.regions(), config.indexer_shards);
let eviction_pickers = std::mem::take(&mut config.eviction_pickers);
Expand Down
1 change: 1 addition & 0 deletions foyer-storage/src/large/mod.rs
Expand Up @@ -22,6 +22,7 @@ pub mod reclaimer;
pub mod recover;
pub mod region;
pub mod storage;
pub mod tombstone;

#[cfg(test)]
pub mod test_utils;

0 comments on commit 3521c65

Please sign in to comment.