Skip to content

Commit

Permalink
feat: add config to enable/disable direct io (#446)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed Apr 28, 2024
1 parent 7cac4a7 commit 8337a0c
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 54 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -118,6 +118,7 @@ async fn main() -> Result<()> {
.with_file_size(4 * 1024 * 1024)
.with_align(4 * 1024)
.with_io_size(16 * 1024)
.with_direct(true)
.build(),
)
.with_catalog_shards(4)
Expand Down
1 change: 1 addition & 0 deletions examples/hybrid_full.rs
Expand Up @@ -49,6 +49,7 @@ async fn main() -> Result<()> {
.with_file_size(4 * 1024 * 1024)
.with_align(4 * 1024)
.with_io_size(16 * 1024)
.with_direct(true)
.build(),
)
.with_catalog_shards(4)
Expand Down
4 changes: 4 additions & 0 deletions foyer-storage-bench/src/main.rs
Expand Up @@ -101,6 +101,9 @@ pub struct Args {
#[arg(long, default_value_t = 4)]
reclaimers: usize,

#[arg(long, default_value_t = false)]
disable_direct: bool,

#[arg(long, default_value_t = 4096)]
align: usize,

Expand Down Expand Up @@ -369,6 +372,7 @@ async fn main() {
FsDeviceConfigBuilder::new(&args.dir)
.with_capacity(args.capacity * 1024 * 1024)
.with_file_size(args.file_size * 1024 * 1024)
.with_direct(!args.disable_direct)
.with_align(args.align)
.with_io_size(args.io_size)
.build(),
Expand Down
6 changes: 4 additions & 2 deletions foyer-storage/src/buffer.rs
Expand Up @@ -142,13 +142,14 @@ where
self.offset = 0;

// write region header
unsafe { self.buffer.set_len(self.device.align()) };
let len = std::cmp::max(self.device.align(), RegionHeader::serialized_len());
unsafe { self.buffer.set_len(len) };
let header = RegionHeader {
magic: REGION_MAGIC,
version: Version::latest(),
};
header.write(&mut self.buffer[..]);
debug_assert_eq!(self.buffer.len(), self.device.align());
debug_assert_eq!(self.buffer.len(), len);

Ok(entries)
}
Expand Down Expand Up @@ -367,6 +368,7 @@ mod tests {
file_size: 64 * 1024, // 64 KiB
align: 4 * 1024, // 4 KiB
io_size: 16 * 1024, // 16 KiB
direct: true,
})
.await
.unwrap();
Expand Down
34 changes: 32 additions & 2 deletions foyer-storage/src/device/fs.rs
Expand Up @@ -33,6 +33,7 @@ pub struct FsDeviceConfigBuilder {
pub file_size: Option<usize>,
pub align: Option<usize>,
pub io_size: Option<usize>,
pub direct: bool,
}

impl FsDeviceConfigBuilder {
Expand All @@ -48,6 +49,7 @@ impl FsDeviceConfigBuilder {
file_size: None,
align: None,
io_size: None,
direct: true,
}
}

Expand All @@ -71,12 +73,20 @@ impl FsDeviceConfigBuilder {
self
}

pub fn with_direct(mut self, direct: bool) -> Self {
self.direct = direct;
self
}

pub fn build(self) -> FsDeviceConfig {
let align_v = |value: usize, align: usize| value - value % align;

let dir = self.dir;

let align = self.align.unwrap_or(Self::DEFAULT_ALIGN);
let mut align = self.align.unwrap_or(Self::DEFAULT_ALIGN);
if !self.direct {
align = 1;
}

let capacity = self.capacity.unwrap_or({
// Create an empty directory before to get freespace.
Expand All @@ -99,6 +109,7 @@ impl FsDeviceConfigBuilder {
file_size,
align,
io_size,
direct: self.direct,
}
}
}
Expand All @@ -119,6 +130,9 @@ pub struct FsDeviceConfig {

/// recommended optimized io block size
pub io_size: usize,

/// enable direct i/o
pub direct: bool,
}

impl FsDeviceConfig {
Expand Down Expand Up @@ -286,7 +300,9 @@ impl FsDevice {
#[cfg(target_os = "linux")]
{
use std::os::unix::fs::OpenOptionsExt;
opts.custom_flags(libc::O_DIRECT);
if config.direct {
opts.custom_flags(libc::O_DIRECT);
}
}

let file = opts.open(path)?;
Expand Down Expand Up @@ -339,6 +355,7 @@ mod tests {
file_size: FILE_CAPACITY,
align: ALIGN,
io_size: ALIGN,
direct: true,
};
let dev = FsDevice::open(config).await.unwrap();

Expand Down Expand Up @@ -379,4 +396,17 @@ mod tests {

config.assert();
}

#[test]
fn test_config_wo_direct() {
let dir = tempfile::tempdir().unwrap();

let config = FsDeviceConfigBuilder::new(dir.path()).with_direct(false).build();

println!("{config:?}");

assert_eq!(config.align, 1);

config.assert();
}
}
107 changes: 60 additions & 47 deletions foyer-storage/src/generic.rs
Expand Up @@ -334,6 +334,7 @@ where
config.clean_region_threshold,
store.clone(),
region_manager.clone(),
config.flush,
metrics.clone(),
stop_rx,
)
Expand Down Expand Up @@ -920,9 +921,9 @@ where
D: Device,
{
pub async fn open(region: Region<D>) -> Result<Option<Self>> {
let align = region.device().align();
let cursor = std::cmp::max(region.device().align(), RegionHeader::serialized_len());

let slice = match region.load_range(0..align).await? {
let slice = match region.load_range(0..cursor).await? {
Some(slice) => slice,
None => return Ok(None),
};
Expand All @@ -933,20 +934,21 @@ where

Ok(Some(Self {
region,
cursor: align,
cursor,
_marker: PhantomData,
}))
}

pub async fn next(&mut self) -> Result<Option<(Arc<K>, Item<K, V>)>> {
let region_size = self.region.device().region_size();
let align = self.region.device().align();
let step = std::cmp::max(align, EntryHeader::serialized_len());

if self.cursor + align >= region_size {
if self.cursor + step >= region_size {
return Ok(None);
}

let Some(slice) = self.region.load_range(self.cursor..self.cursor + align).await? else {
let Some(slice) = self.region.load_range(self.cursor..self.cursor + step).await? else {
return Ok(None);
};

Expand Down Expand Up @@ -1127,26 +1129,20 @@ where

#[cfg(test)]
mod tests {
use std::{ops::Range, path::PathBuf};
use std::ops::Range;

use foyer_common::{bits::align_up, range::RangeBoundsExt};
use foyer_memory::FifoConfig;

use super::*;
use crate::{
device::fs::{FsDevice, FsDeviceConfig},
storage::StorageExt,
test_utils::JudgeRecorder,
};
use crate::{device::fs::FsDevice, storage::StorageExt, test_utils::JudgeRecorder, FsDeviceConfigBuilder};

type TestStore = GenericStore<u64, Vec<u8>, FsDevice>;
type TestStoreConfig = GenericStoreConfig<u64, Vec<u8>, FsDevice>;

#[tokio::test]
// TODO(MrCroxx): use `expect` after `lint_reasons` is stable.
#[allow(clippy::identity_op)]
async fn test_recovery() {
const KB: usize = 1024;
async fn case_recovery(direct: bool) {
const MB: usize = 1024 * 1024;

let tempdir = tempfile::tempdir().unwrap();
Expand All @@ -1158,13 +1154,11 @@ mod tests {
let config = TestStoreConfig {
name: "".to_string(),
eviction_config: FifoConfig {}.into(),
device_config: FsDeviceConfig {
dir: PathBuf::from(tempdir.path()),
capacity: 16 * MB,
file_size: 4 * MB,
align: 4 * KB,
io_size: 4 * KB,
},
device_config: FsDeviceConfigBuilder::new(tempdir.path())
.with_capacity(16 * MB)
.with_file_size(4 * MB)
.with_direct(direct)
.build(),
catalog_shards: 1,
admissions,
reinsertions,
Expand Down Expand Up @@ -1206,13 +1200,11 @@ mod tests {
let config = TestStoreConfig {
name: "".to_string(),
eviction_config: FifoConfig {}.into(),
device_config: FsDeviceConfig {
dir: PathBuf::from(tempdir.path()),
capacity: 16 * MB,
file_size: 4 * MB,
align: 4096,
io_size: 4096 * KB,
},
device_config: FsDeviceConfigBuilder::new(tempdir.path())
.with_capacity(16 * MB)
.with_file_size(4 * MB)
.with_direct(direct)
.build(),
catalog_shards: 1,
admissions: vec![],
reinsertions: vec![],
Expand Down Expand Up @@ -1241,8 +1233,7 @@ mod tests {

// TODO(MrCroxx): use `expect` after `lint_reasons` is stable.
#[allow(clippy::identity_op)]
async fn case_corrupt(recover_mode: RecoverMode) {
const KB: usize = 1024;
async fn case_corrupt(recover_mode: RecoverMode, direct: bool) {
const MB: usize = 1024 * 1024;
const RANGE: Range<u64> = 0..21;

Expand All @@ -1255,13 +1246,11 @@ mod tests {
let config = TestStoreConfig {
name: "".to_string(),
eviction_config: FifoConfig {}.into(),
device_config: FsDeviceConfig {
dir: PathBuf::from(tempdir.path()),
capacity: 16 * MB,
file_size: 4 * MB,
align: 4 * KB,
io_size: 4 * KB,
},
device_config: FsDeviceConfigBuilder::new(tempdir.path())
.with_capacity(16 * MB)
.with_file_size(4 * MB)
.with_direct(direct)
.build(),
catalog_shards: 1,
admissions,
reinsertions,
Expand Down Expand Up @@ -1336,13 +1325,11 @@ mod tests {
let config = TestStoreConfig {
name: "".to_string(),
eviction_config: FifoConfig {}.into(),
device_config: FsDeviceConfig {
dir: PathBuf::from(tempdir.path()),
capacity: 16 * MB,
file_size: 4 * MB,
align: 4096,
io_size: 4096 * KB,
},
device_config: FsDeviceConfigBuilder::new(tempdir.path())
.with_capacity(16 * MB)
.with_file_size(4 * MB)
.with_direct(direct)
.build(),
catalog_shards: 1,
admissions: vec![],
reinsertions: vec![],
Expand Down Expand Up @@ -1381,19 +1368,45 @@ mod tests {
drop(store);
}

#[tokio::test]
async fn test_recovery() {
case_recovery(true).await
}

#[tokio::test]
async fn test_recovery_wo_direct() {
case_recovery(false).await
}

#[tokio::test]
async fn test_recovery_corrupt_no_recovery() {
case_corrupt(RecoverMode::NoRecovery).await
case_corrupt(RecoverMode::NoRecovery, true).await
}

#[tokio::test]
async fn test_recovery_corrupt_quiet_recovery() {
case_corrupt(RecoverMode::QuietRecovery).await
case_corrupt(RecoverMode::QuietRecovery, true).await
}

#[tokio::test]
#[should_panic]
async fn test_recovery_corrupt_strict_recovery() {
case_corrupt(RecoverMode::StrictRecovery).await
case_corrupt(RecoverMode::StrictRecovery, true).await
}

#[tokio::test]
async fn test_recovery_corrupt_no_recovery_wo_direct() {
case_corrupt(RecoverMode::NoRecovery, false).await
}

#[tokio::test]
async fn test_recovery_corrupt_quiet_recovery_wo_direct() {
case_corrupt(RecoverMode::QuietRecovery, false).await
}

#[tokio::test]
#[should_panic]
async fn test_recovery_corrupt_strict_recovery_wo_direct() {
case_corrupt(RecoverMode::StrictRecovery, false).await
}
}
2 changes: 2 additions & 0 deletions foyer-storage/src/lazy.rs
Expand Up @@ -264,6 +264,7 @@ mod tests {
file_size: 4 * MB,
align: 4096,
io_size: 4096 * KB,
direct: true,
},
catalog_shards: 1,
admissions: vec![],
Expand Down Expand Up @@ -298,6 +299,7 @@ mod tests {
file_size: 4 * MB,
align: 4096,
io_size: 4096 * KB,
direct: true,
},
catalog_shards: 1,
admissions: vec![],
Expand Down

0 comments on commit 8337a0c

Please sign in to comment.