Skip to content

Commit

Permalink
refactor: replaced some Vecs with boxed slices
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jan 18, 2024
1 parent 38e50f9 commit b03f063
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 57 deletions.
5 changes: 4 additions & 1 deletion benches/lsmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ fn load_block_from_disk(c: &mut Criterion) {
}
}

let mut block = ValueBlock { items, crc: 0 };
let mut block = ValueBlock {
items: items.into_boxed_slice(),
crc: 0,
};
let mut file = tempfile::tempfile().unwrap();

let mut bytes = Vec::with_capacity(u16::MAX.into());
Expand Down
18 changes: 12 additions & 6 deletions src/disk_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::io::{Cursor, Read, Write};
/// The integrity of a block can be checked using the CRC value that is saved in it.
#[derive(Clone, Debug)]
pub struct DiskBlock<T: Clone + Serializable + Deserializable> {
pub items: Vec<T>,
pub items: Box<[T]>,
pub crc: u32,
}

Expand Down Expand Up @@ -37,7 +37,7 @@ impl<T: Clone + Serializable + Deserializable> DiskBlock<T> {

impl<T: Clone + Serializable + Deserializable> DiskBlock<T> {
/// Calculates the CRC from a list of values
pub fn create_crc(items: &Vec<T>) -> crate::Result<u32> {
pub fn create_crc(items: &[T]) -> crate::Result<u32> {
let mut hasher = crc32fast::Hasher::new();

// NOTE: Truncation is okay and actually needed
Expand Down Expand Up @@ -73,7 +73,7 @@ impl<T: Clone + Serializable + Deserializable> Serializable for DiskBlock<T> {
writer.write_all(&(self.items.len() as u32).to_be_bytes())?;

// Serialize each value
for value in &self.items {
for value in self.items.iter() {
value.serialize(writer)?;
}

Expand All @@ -95,7 +95,10 @@ impl<T: Clone + Serializable + Deserializable> Deserializable for DiskBlock<T> {
items.push(T::deserialize(reader)?);
}

Ok(Self { items, crc })
Ok(Self {
items: items.into_boxed_slice(),
crc,
})
}
}

Expand All @@ -113,7 +116,10 @@ mod tests {
let items = vec![item1.clone(), item2.clone()];
let crc = DiskBlock::create_crc(&items)?;

let block = DiskBlock { items, crc };
let block = DiskBlock {
items: items.into_boxed_slice(),
crc,
};

// Serialize to bytes
let mut serialized = Vec::new();
Expand Down Expand Up @@ -141,7 +147,7 @@ mod tests {
let item2 = Value::new(vec![7, 8, 9], vec![10, 11, 12], 43, ValueType::Value);

let block = DiskBlock {
items: vec![item1, item2],
items: [item1, item2].into(),
crc: 12345,
};

Expand Down
3 changes: 2 additions & 1 deletion src/segment/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ impl BlockIndex {

let mut tree = BTreeMap::new();

for item in index.items {
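// TODO: `into_vec()` is only needed because the boxed slice cannot be iterated by value directly;
// remove it once that is possible, see https://github.com/rust-lang/rust/issues/59878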
for item in index.items.into_vec() {
tree.insert(
item.start_key,
BlockHandleBlockHandle {
Expand Down
57 changes: 27 additions & 30 deletions src/segment/index/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub struct Writer {
index_writer: BufWriter<File>,
block_size: u32,
block_counter: u32,
block_chunk: DiskBlock<BlockHandle>,
index_chunk: DiskBlock<BlockHandle>,
block_chunk: Vec<BlockHandle>,
index_chunk: Vec<BlockHandle>,
}

impl Writer {
Expand All @@ -48,35 +48,30 @@ impl Writer {
let index_writer = File::create(path.as_ref().join(TOP_LEVEL_INDEX_FILE))?;
let index_writer = BufWriter::new(index_writer);

let block_chunk = DiskBlock {
items: vec![],
crc: 0,
};

let index_chunk = DiskBlock {
items: vec![],
crc: 0,
};

Ok(Self {
path: path.as_ref().into(),
file_pos: 0,
block_writer: Some(block_writer),
index_writer,
block_counter: 0,
block_size,
block_chunk,
index_chunk,
block_chunk: Vec::with_capacity(1_000),
index_chunk: Vec::with_capacity(1_000),
})
}

fn write_block(&mut self) -> crate::Result<()> {
// Prepare block
let mut block = DiskBlock::<BlockHandle> {
items: std::mem::replace(&mut self.block_chunk, Vec::with_capacity(1_000))
.into_boxed_slice(),
crc: 0,
};

// Serialize block
let mut bytes = Vec::with_capacity(u16::MAX.into());
self.block_chunk.crc = DiskBlock::<BlockHandle>::create_crc(&self.block_chunk.items)?;
self.block_chunk
.serialize(&mut bytes)
.expect("should serialize block");
block.crc = DiskBlock::<BlockHandle>::create_crc(&block.items)?;
block.serialize(&mut bytes).expect("should serialize block");

// Compress using LZ4
let bytes = compress_prepend_size(&bytes);
Expand All @@ -88,22 +83,17 @@ impl Writer {
.write_all(&bytes)?;

// Expect is fine, because the chunk is not empty
let first = self
.block_chunk
.items
.first()
.expect("Chunk should not be empty");
let first = block.items.first().expect("Chunk should not be empty");

let bytes_written = bytes.len();

self.index_chunk.items.push(BlockHandle {
self.index_chunk.push(BlockHandle {
start_key: first.start_key.clone(),
offset: self.file_pos,
size: bytes_written as u32,
});

self.block_counter = 0;
self.block_chunk.items.clear();
self.file_pos += bytes_written as u64;

Ok(())
Expand All @@ -122,7 +112,7 @@ impl Writer {
offset,
size,
};
self.block_chunk.items.push(reference);
self.block_chunk.push(reference);

self.block_counter += block_handle_size;

Expand All @@ -146,14 +136,21 @@ impl Writer {

log::trace!("Concatted index blocks onto blocks file");

for item in &mut self.index_chunk.items {
for item in &mut self.index_chunk {
item.offset += block_file_size;
}

// Prepare block
let mut block = DiskBlock::<BlockHandle> {
items: std::mem::replace(&mut self.index_chunk, Vec::with_capacity(1_000))
.into_boxed_slice(),
crc: 0,
};

// Serialize block
let mut bytes = Vec::with_capacity(u16::MAX.into());
self.index_chunk.crc = DiskBlock::<BlockHandle>::create_crc(&self.index_chunk.items)?;
self.index_chunk
block.crc = DiskBlock::<BlockHandle>::create_crc(&block.items)?;
block
.serialize(&mut bytes)
.expect("should serialize index block");

Expand All @@ -167,7 +164,7 @@ impl Writer {
log::trace!(
"Written top level index to {}, with {} pointers ({} bytes)",
self.path.join(TOP_LEVEL_INDEX_FILE).display(),
self.index_chunk.items.len(),
block.items.len(),
bytes.len(),
);

Expand Down
5 changes: 3 additions & 2 deletions src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Reader {
&self.segment_id,
key,
)? {
let items = block.items.clone().into();
let items = block.items.clone().to_vec().into();
self.blocks.insert(key.to_vec().into(), items);

Some(())
Expand All @@ -109,7 +109,8 @@ impl Reader {

drop(file_guard);

self.blocks.insert(key.to_vec().into(), block.items.into());
self.blocks
.insert(key.to_vec().into(), block.items.to_vec().into());

Ok(Some(()))
} else {
Expand Down
35 changes: 18 additions & 17 deletions src/segment/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub struct Writer {

block_writer: BufWriter<File>,
index_writer: IndexWriter,
chunk: ValueBlock,
chunk: Vec<Value>,

pub block_count: usize,
pub item_count: usize,
Expand Down Expand Up @@ -181,10 +181,7 @@ impl Writer {

let index_writer = IndexWriter::new(&opts.path, opts.block_size)?;

let chunk = ValueBlock {
items: Vec::with_capacity(1_000),
crc: 0,
};
let chunk = Vec::with_capacity(10_000);

Ok(Self {
opts,
Expand All @@ -210,31 +207,36 @@ impl Writer {
key_count: 0,

#[cfg(feature = "bloom")]
bloom_hash_buffer: Vec::with_capacity(1_000),
bloom_hash_buffer: Vec::with_capacity(10_000),
})
}

/// Writes a compressed block to disk
///
/// This is triggered when a call to `Writer::write` causes the buffer to grow to the configured `block_size`
fn write_block(&mut self) -> crate::Result<()> {
debug_assert!(!self.chunk.items.is_empty());
debug_assert!(!self.chunk.is_empty());

let uncompressed_chunk_size = self
.chunk
.items
.iter()
.map(|item| item.size() as u64)
.sum::<u64>();

self.uncompressed_size += uncompressed_chunk_size;

// Prepare block
let mut block = ValueBlock {
items: std::mem::replace(&mut self.chunk, Vec::with_capacity(10_000))
.into_boxed_slice(),
crc: 0,
};
block.crc = ValueBlock::create_crc(&block.items)?;

// Serialize block
let mut bytes = Vec::with_capacity(u16::MAX.into());
self.chunk.crc = ValueBlock::create_crc(&self.chunk.items)?;
self.chunk
.serialize(&mut bytes)
.expect("should serialize block");

block.serialize(&mut bytes).expect("should serialize block");

// Compress using LZ4
let bytes = compress_prepend_size(&bytes);
Expand All @@ -248,16 +250,15 @@ impl Writer {
let bytes_written = bytes.len() as u32;

// Expect is fine, because the chunk is not empty
let first = self.chunk.items.first().expect("Chunk should not be empty");
let first = block.items.first().expect("Chunk should not be empty");

self.index_writer
.register_block(first.key.clone(), self.file_pos, bytes_written)?;

// Adjust metadata
self.file_pos += u64::from(bytes_written);
self.item_count += self.chunk.items.len();
self.item_count += block.items.len();
self.block_count += 1;
self.chunk.items.clear();

Ok(())
}
Expand Down Expand Up @@ -288,7 +289,7 @@ impl Writer {
let seqno = item.seqno;

self.chunk_size += item.size();
self.chunk.items.push(item);
self.chunk.push(item);

if self.chunk_size >= self.opts.block_size as usize {
self.write_block()?;
Expand All @@ -313,7 +314,7 @@ impl Writer {

/// Finishes the segment, making sure all data is written durably
pub fn finish(&mut self) -> crate::Result<()> {
if !self.chunk.items.is_empty() {
if !self.chunk.is_empty() {
self.write_block()?;
}

Expand Down

0 comments on commit b03f063

Please sign in to comment.