Skip to content

Commit

Permalink
refactor: extract key value serde code to serde mod (#453)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx committed Apr 30, 2024
1 parent cefb41f commit f65fb2d
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 181 deletions.
64 changes: 9 additions & 55 deletions foyer-storage/src/buffer.rs
Expand Up @@ -22,11 +22,10 @@ use foyer_common::{
};

use crate::{
compress::Compression,
device::{allocator::WritableVecA, Device, DeviceError},
flusher::Entry,
generic::{checksum, EntryHeader},
region::{RegionHeader, RegionId, Version, REGION_MAGIC},
serde::{EntrySerializer, SerdeError},
};

#[derive(thiserror::Error, Debug)]
Expand All @@ -37,6 +36,8 @@ pub enum BufferError {
Device(#[from] DeviceError),
#[error("bincode error: {0}")]
Bincode(#[from] bincode::Error),
#[error("serde error: {0}")]
Serde(#[from] SerdeError),
#[error("other error: {0}")]
Other(#[from] anyhow::Error),
}
Expand Down Expand Up @@ -236,58 +237,7 @@ where
let old = self.buffer.len();
debug_assert!(is_aligned(self.device.align(), old));

let mut cursor = old;

// TODO(MrCroxx): reserve buffer capacity for entry

// reserve space for header, header will be filled after the serialized len is known
cursor += EntryHeader::serialized_len();
unsafe { self.buffer.set_len(cursor) };

// write value
match compression {
Compression::None => {
bincode::serialize_into(WritableVecA(&mut self.buffer), &value).map_err(BufferError::from)?;
}
Compression::Zstd => {
let encoder = zstd::Encoder::new(WritableVecA(&mut self.buffer), 0)
.map_err(BufferError::from)?
.auto_finish();
bincode::serialize_into(encoder, &value).map_err(BufferError::from)?;
}

Compression::Lz4 => {
let encoder = lz4::EncoderBuilder::new()
.checksum(lz4::ContentChecksum::NoChecksum)
.auto_flush(true)
.build(WritableVecA(&mut self.buffer))
.map_err(BufferError::from)?;
bincode::serialize_into(encoder, &value).map_err(BufferError::from)?;
}
}

let compressed_value_len = self.buffer.len() - cursor;
cursor = self.buffer.len();

// write key
bincode::serialize_into(WritableVecA(&mut self.buffer), &key).map_err(BufferError::from)?;
let encoded_key_len = self.buffer.len() - cursor;
cursor = self.buffer.len();

// calculate checksum
cursor -= compressed_value_len + encoded_key_len;
let checksum = checksum(&self.buffer[cursor..cursor + compressed_value_len + encoded_key_len]);

// write entry header
cursor -= EntryHeader::serialized_len();
let header = EntryHeader {
key_len: encoded_key_len as u32,
value_len: compressed_value_len as u32,
sequence,
compression,
checksum,
};
header.write(&mut self.buffer[cursor..cursor + EntryHeader::serialized_len()]);
EntrySerializer::serialize(&key, &value, &sequence, &compression, WritableVecA(&mut self.buffer))?;

// (*) if size exceeds region limit, rollback write and return
if self.offset + self.buffer.len() > self.device.region_size() {
Expand Down Expand Up @@ -335,7 +285,11 @@ mod tests {
use tempfile::tempdir;

use super::*;
use crate::device::fs::{FsDevice, FsDeviceConfig};
use crate::{
device::fs::{FsDevice, FsDeviceConfig},
serde::EntryHeader,
Compression,
};

fn ent(size: usize) -> Entry<(), Vec<u8>> {
Entry {
Expand Down
17 changes: 17 additions & 0 deletions foyer-storage/src/device/allocator.rs
Expand Up @@ -12,14 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Deref, DerefMut};

use allocator_api2::{
alloc::{AllocError, Allocator, Global},
vec::Vec as VecA,
};
use foyer_common::bits;

#[derive(Debug)]
pub struct WritableVecA<'a, T, A: Allocator>(pub &'a mut VecA<T, A>);

impl<'a, T, A: Allocator> Deref for WritableVecA<'a, T, A> {
type Target = VecA<T, A>;

fn deref(&self) -> &Self::Target {
self.0
}
}

impl<'a, T, A: Allocator> DerefMut for WritableVecA<'a, T, A> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
}
}

impl<'a, A: Allocator> std::io::Write for WritableVecA<'a, u8, A> {
#[inline]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Expand Down
4 changes: 3 additions & 1 deletion foyer-storage/src/error.rs
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{buffer::BufferError, device::DeviceError};
use crate::{buffer::BufferError, device::DeviceError, serde::SerdeError};
use std::fmt::Debug;

#[derive(thiserror::Error, Debug)]
Expand All @@ -21,6 +21,8 @@ pub enum Error {
Device(#[from] DeviceError),
#[error("buffer error: {0}")]
Buffer(#[from] BufferError),
#[error("serde error: {0}")]
Serde(#[from] SerdeError),
#[error("other error: {0}")]
Other(#[from] anyhow::Error),
}
Expand Down
18 changes: 3 additions & 15 deletions foyer-storage/src/flusher.rs
Expand Up @@ -29,22 +29,14 @@ use crate::{
region_manager::RegionManager,
};

pub struct Entry<K, V>
where
K: StorageKey,
V: StorageValue,
{
pub struct Entry<K, V> {
pub key: Arc<K>,
pub value: Arc<V>,
pub sequence: Sequence,
pub compression: Compression,
}

impl<K, V> Debug for Entry<K, V>
where
K: StorageKey,
V: StorageValue,
{
impl<K, V> Debug for Entry<K, V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Entry")
.field("sequence", &self.sequence)
Expand All @@ -53,11 +45,7 @@ where
}
}

impl<K, V> Clone for Entry<K, V>
where
K: StorageKey,
V: StorageValue,
{
impl<K, V> Clone for Entry<K, V> {
fn clone(&self) -> Self {
Self {
key: self.key.clone(),
Expand Down
118 changes: 8 additions & 110 deletions foyer-storage/src/generic.rs
Expand Up @@ -16,7 +16,7 @@ use std::{
borrow::Borrow,
collections::HashSet,
fmt::Debug,
hash::{Hash, Hasher},
hash::Hash,
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering},
Expand All @@ -25,9 +25,7 @@ use std::{
time::{Duration, Instant},
};

use anyhow::anyhow;
use bitmaps::Bitmap;
use bytes::{Buf, BufMut};
use foyer_common::{
bits,
code::{StorageKey, StorageValue},
Expand All @@ -42,11 +40,9 @@ use tokio::{
sync::{broadcast, mpsc, Semaphore},
task::JoinHandle,
};
use twox_hash::XxHash64;

use crate::{
admission::{AdmissionContext, AdmissionPolicy},
buffer::BufferError,
catalog::{AtomicSequence, Catalog, Index, Item, Sequence},
compress::Compression,
device::Device,
Expand All @@ -58,6 +54,7 @@ use crate::{
region::{Region, RegionHeader, RegionId},
region_manager::RegionManager,
reinsertion::{ReinsertionContext, ReinsertionPolicy},
serde::{EntryDeserializer, EntryHeader},
storage::{CachedEntry, Storage, StorageWriter},
Error,
};
Expand Down Expand Up @@ -444,8 +441,8 @@ where
}
};

let res = match read_entry::<K, V>(buf.as_ref()) {
Ok((key, value)) => {
let res = match EntryDeserializer::deserialize(buf.as_ref()) {
Ok((_header, key, value)) => {
inner.metrics.op_bytes_get.inc_by(buf.len() as u64);
Ok(Some(CachedEntry::Owned {
key: Box::new(key),
Expand All @@ -455,7 +452,7 @@ where
Err(e) => {
// Remove index if the storage layer fails to get it (because of entry magic mismatch).
inner.catalog.remove::<K>(key.as_ref());
Err(e)
Err(Error::Serde(e))
}
};

Expand Down Expand Up @@ -801,106 +798,6 @@ where
}
}

const ENTRY_MAGIC: u32 = 0x97_03_27_00;
const ENTRY_MAGIC_MASK: u32 = 0xFF_FF_FF_00;

#[derive(Debug)]
pub struct EntryHeader {
pub key_len: u32,
pub value_len: u32,
pub sequence: Sequence,
pub checksum: u64,
pub compression: Compression,
}

impl EntryHeader {
pub const fn serialized_len() -> usize {
4 + 4 + 8 + 8 + 4 /* magic & compression */
}

pub fn write(&self, mut buf: &mut [u8]) {
buf.put_u32(self.key_len);
buf.put_u32(self.value_len);
buf.put_u64(self.sequence);
buf.put_u64(self.checksum);

let v = ENTRY_MAGIC | self.compression.to_u8() as u32;
buf.put_u32(v);
}

pub fn read(mut buf: &[u8]) -> Result<Self> {
let key_len = buf.get_u32();
let value_len = buf.get_u32();
let sequence = buf.get_u64();
let checksum = buf.get_u64();

let v = buf.get_u32();
let magic = v & ENTRY_MAGIC_MASK;
if magic != ENTRY_MAGIC {
return Err(anyhow!("magic mismatch, expected: {}, got: {}", ENTRY_MAGIC, magic).into());
}
let compression = Compression::try_from(v as u8)?;

Ok(Self {
key_len,
value_len,
sequence,
compression,
checksum,
})
}
}

/// | header | value (compressed) | key | <padding> |
///
/// # Safety
///
/// `buf.len()` must exactly fit entry size
fn read_entry<K, V>(buf: &[u8]) -> Result<(K, V)>
where
K: StorageKey,
V: StorageValue,
{
// read entry header
let header = EntryHeader::read(buf)?;

// TODO(MrCroxx): optimize buffer copy here.

// read value
let mut offset = EntryHeader::serialized_len();
let compressed = &buf[offset..offset + header.value_len as usize];
offset += header.value_len as usize;
let value = match header.compression {
Compression::None => bincode::deserialize_from(compressed).map_err(BufferError::from)?,
Compression::Zstd => {
let decoder = zstd::Decoder::new(compressed).map_err(BufferError::from)?;
bincode::deserialize_from(decoder).map_err(BufferError::from)?
}
Compression::Lz4 => {
let decoder = lz4::Decoder::new(compressed).map_err(BufferError::from)?;
bincode::deserialize_from(decoder).map_err(BufferError::from)?
}
};

// read key
let compressed = &buf[offset..offset + header.key_len as usize];
let key = bincode::deserialize_from(compressed).map_err(BufferError::from)?;
offset += header.key_len as usize;

let checksum = checksum(&buf[EntryHeader::serialized_len()..offset]);
if checksum != header.checksum {
return Err(anyhow!("magic mismatch, expected: {}, got: {}", header.checksum, checksum).into());
}

Ok((key, value))
}

pub fn checksum(buf: &[u8]) -> u64 {
let mut hasher = XxHash64::with_seed(0);
hasher.write(buf);
hasher.finish()
}

pub struct RegionEntryIter<K, V, D>
where
K: StorageKey,
Expand Down Expand Up @@ -1027,9 +924,9 @@ where
let Some(slice) = self.region.load_range(start..end).await? else {
return Ok(None);
};
let res = read_entry::<K, V>(slice.as_ref())
let res = EntryDeserializer::deserialize(slice.as_ref())
.ok()
.map(|(k, v)| (k, v, slice.len()));
.map(|(_header, k, v)| (k, v, slice.len()));
drop(slice);

Ok(res)
Expand Down Expand Up @@ -1131,6 +1028,7 @@ where
mod tests {
use std::ops::Range;

use bytes::BufMut;
use foyer_common::{bits::align_up, range::RangeBoundsExt};
use foyer_memory::FifoConfig;

Expand Down
1 change: 1 addition & 0 deletions foyer-storage/src/lib.rs
Expand Up @@ -31,6 +31,7 @@ mod region;
mod region_manager;
mod reinsertion;
mod runtime;
mod serde;
mod storage;
mod store;

Expand Down

0 comments on commit f65fb2d

Please sign in to comment.