Skip to content

Commit

Permalink
refactor: remove unnecessary unsafe silce usage (#176)
Browse files Browse the repository at this point in the history
* modify device interface

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* refine test

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

* remove comment

Signed-off-by: MrCroxx <mrcroxx@outlook.com>

---------

Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed Oct 15, 2023
1 parent 3ebea99 commit 28c2539
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 30 deletions.
27 changes: 14 additions & 13 deletions foyer-storage/src/device/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ impl Device for FsDevice {
Self::open(config).await
}

async fn write(
async fn write<B>(
&self,
buf: impl IoBuf,
buf: B,
range: impl IoRange,
region: RegionId,
offset: u64,
) -> (DeviceResult<usize>, impl IoBuf) {
) -> (DeviceResult<usize>, B)
where
B: IoBuf,
{
let file_capacity = self.inner.config.file_capacity;

let range = range.bounds(0..buf.as_ref().len());
Expand All @@ -110,13 +113,16 @@ impl Device for FsDevice {
.await
}

async fn read(
async fn read<B>(
&self,
mut buf: impl IoBufMut,
mut buf: B,
range: impl IoRange,
region: RegionId,
offset: u64,
) -> (DeviceResult<usize>, impl IoBufMut) {
) -> (DeviceResult<usize>, B)
where
B: IoBufMut,
{
let file_capacity = self.inner.config.file_capacity;

let range = range.bounds(0..buf.as_ref().len());
Expand Down Expand Up @@ -250,8 +256,6 @@ mod tests {

use bytes::BufMut;

use crate::slice::{Slice, SliceMut};

use super::*;

const FILES: usize = 8;
Expand All @@ -276,12 +280,9 @@ mod tests {
let mut rbuffer = dev.io_buffer(ALIGN, ALIGN);
(&mut rbuffer[..]).put_slice(&[0; ALIGN]);

let wbuf = unsafe { Slice::new(&wbuffer) };
let rbuf = unsafe { SliceMut::new(&mut rbuffer) };

let (res, _wbuf) = dev.write(wbuf, .., 0, 0).await;
let (res, wbuffer) = dev.write(wbuffer, .., 0, 0).await;
res.unwrap();
let (res, _rbuf) = dev.read(rbuf, .., 0, 0).await;
let (res, rbuffer) = dev.read(rbuffer, .., 0, 0).await;
res.unwrap();

assert_eq!(&wbuffer, &rbuffer);
Expand Down
36 changes: 23 additions & 13 deletions foyer-storage/src/device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::Future;
pub trait BufferAllocator = Allocator + Clone + Send + Sync + 'static + Debug;
pub trait IoBuf = AsRef<[u8]> + Send + Sync + 'static + Debug;
pub trait IoBufMut = AsRef<[u8]> + AsMut<[u8]> + Send + Sync + 'static + Debug;
pub trait IoRange = RangeBoundsExt<usize> + Send + Sync + 'static + Debug;
pub trait IoRange = RangeBoundsExt<usize> + Sized + Send + Sync + 'static + Debug;

pub trait Device: Sized + Clone + Send + Sync + 'static + Debug {
type IoBufferAllocator: BufferAllocator;
Expand All @@ -36,22 +36,26 @@ pub trait Device: Sized + Clone + Send + Sync + 'static + Debug {
fn open(config: Self::Config) -> impl Future<Output = DeviceResult<Self>> + Send;

#[must_use]
fn write(
fn write<B>(
&self,
buf: impl IoBuf,
buf: B,
range: impl IoRange,
region: RegionId,
offset: u64,
) -> impl Future<Output = (DeviceResult<usize>, impl IoBuf)> + Send;
) -> impl Future<Output = (DeviceResult<usize>, B)> + Send
where
B: IoBuf;

#[must_use]
fn read(
fn read<B>(
&self,
buf: impl IoBufMut,
buf: B,
range: impl IoRange,
region: RegionId,
offset: u64,
) -> impl Future<Output = (DeviceResult<usize>, impl IoBufMut)> + Send;
) -> impl Future<Output = (DeviceResult<usize>, B)> + Send
where
B: IoBufMut;

#[must_use]
fn flush(&self) -> impl Future<Output = DeviceResult<()>> + Send;
Expand Down Expand Up @@ -115,23 +119,29 @@ pub mod tests {
Ok(Self::new(config))
}

async fn write(
async fn write<B>(
&self,
buf: impl IoBuf,
buf: B,
_range: impl IoRange,
_region: RegionId,
_offset: u64,
) -> (DeviceResult<usize>, impl IoBuf) {
) -> (DeviceResult<usize>, B)
where
B: IoBuf,
{
(Ok(0), buf)
}

async fn read(
async fn read<B>(
&self,
buf: impl IoBufMut,
buf: B,
_range: impl IoRange,
_region: RegionId,
_offset: u64,
) -> (DeviceResult<usize>, impl IoBufMut) {
) -> (DeviceResult<usize>, B)
where
B: IoBufMut,
{
(Ok(0), buf)
}

Expand Down
14 changes: 10 additions & 4 deletions foyer-storage/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::{
metrics::Metrics,
region::RegionId,
region_manager::{RegionEpItemAdapter, RegionManager},
slice::Slice,
};

#[derive(Debug)]
Expand Down Expand Up @@ -90,6 +89,7 @@ where

// step 1: write buffer back to device
let slice = region.load(.., 0).await?.unwrap();
let mut slice = Some(slice);

{
// wait all physical readers (from previous version) and writers done
Expand All @@ -104,17 +104,23 @@ where
let start = offset;
let end = std::cmp::min(offset + len, region.device().region_size());

let s = unsafe { Slice::new(&slice.as_ref()[start..end]) };
if let Some(limiter) = &self.rate_limiter && let Some(duration) = limiter.consume(len as f64) {
tokio::time::sleep(duration).await;
}
let (res, _s) = region
let (res, s) = region
.device()
.write(s, .., region.id(), offset as u64)
.write(
slice.take().unwrap(),
start..end,
region.id(),
offset as u64,
)
.await;
res?;
slice = Some(s);
offset += len;
}

drop(slice);

tracing::trace!("[flusher] step 2");
Expand Down

0 comments on commit 28c2539

Please sign in to comment.