Skip to content

Commit

Permalink
refactor: levels
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jun 27, 2024
1 parent ff4c0bd commit 825026f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 33 deletions.
19 changes: 14 additions & 5 deletions src/compaction/levelled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,21 @@ impl CompactionStrategy for Strategy {
// Get overlapping segments in same level
let key_range = aggregate_key_range(&segments_to_compact);

let curr_level_overlapping_segment_ids = level.get_overlapping_segments(&key_range);
let curr_level_overlapping_segment_ids: Vec<_> = level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.collect();

segment_ids.extend(&curr_level_overlapping_segment_ids);

// Get overlapping segments in next level
let key_range = aggregate_key_range(&segments_to_compact);

let next_level_overlapping_segment_ids =
next_level.get_overlapping_segments(&key_range);
let next_level_overlapping_segment_ids: Vec<_> = next_level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.collect();

segment_ids.extend(&next_level_overlapping_segment_ids);

let choice = CompactionInput {
Expand Down Expand Up @@ -183,8 +190,10 @@ impl CompactionStrategy for Strategy {
// Get overlapping segments in next level
let key_range = aggregate_key_range(&level);

let next_level_overlapping_segment_ids =
next_level.get_overlapping_segments(&key_range);
let next_level_overlapping_segment_ids: Vec<_> = next_level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.collect();

segment_ids.extend(next_level_overlapping_segment_ids);

Expand Down
33 changes: 18 additions & 15 deletions src/levels/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,16 @@ impl Level {
pub fn insert(&mut self, segment: Arc<Segment>) {
self.segments.push(segment);
self.set_disjoint_flag();

if self.is_disjoint {
self.sort_by_key_range();
} else {
self.sort_by_seqno();
}
self.sort();
}

pub fn remove(&mut self, segment_id: SegmentId) {
self.segments.retain(|x| segment_id != x.metadata.id);
self.set_disjoint_flag();
self.sort();
}

pub(super) fn sort(&mut self) {
if self.is_disjoint {
self.sort_by_key_range();
} else {
Expand All @@ -69,30 +67,32 @@ impl Level {
/// [key:asd:2] [key:asd:1]
///
/// point read ----------->
pub fn sort_by_seqno(&mut self) {
fn sort_by_seqno(&mut self) {
self.segments
.sort_by(|a, b| b.metadata.seqnos.1.cmp(&a.metadata.seqnos.1));
}

pub fn ids(&self) -> Vec<SegmentId> {
self.segments.iter().map(|x| x.metadata.id).collect()
/// Returns an iterator over the level's segment IDs.
pub fn ids(&self) -> impl Iterator<Item = SegmentId> + '_ {
self.segments.iter().map(|x| x.metadata.id)
}

/// Returns `true` if the level contains no segments.
pub fn is_empty(&self) -> bool {
self.segments.is_empty()
}

/// Gets the number of segments
/// Returns the number of segments.
pub fn len(&self) -> usize {
self.segments.len()
}

/// Gets the level size in bytes
/// Returns the level size in bytes.
pub fn size(&self) -> u64 {
self.segments.iter().map(|x| x.metadata.file_size).sum()
}

/// Checks if the level is disjoint and caches the result in `is_disjoint`
/// Checks if the level is disjoint and caches the result in `is_disjoint`.
fn set_disjoint_flag(&mut self) {
// TODO: calculate without heap allocation? possible?

Expand All @@ -105,12 +105,15 @@ impl Level {
self.is_disjoint = KeyRange::is_disjoint(&ranges);
}

pub fn get_overlapping_segments(&self, key_range: &KeyRange) -> Vec<SegmentId> {
/// Returns an iterator over segments in the level that have a key range
/// overlapping the input key range.
pub fn overlapping_segments<'a>(
&'a self,
key_range: &'a KeyRange,
) -> impl Iterator<Item = &'a Arc<Segment>> {
self.segments
.iter()
.filter(|x| x.metadata.key_range.overlaps_with_key_range(key_range))
.map(|x| x.metadata.id)
.collect()
}

/// Returns the segment that possibly contains the key.
Expand Down
30 changes: 17 additions & 13 deletions src/levels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ impl LevelManifest {
Ok(())
}

// NOTE: Used in tests
#[allow(unused)]
pub(crate) fn add(&mut self, segment: Arc<Segment>) {
self.insert_into_level(0, segment);
}
Expand All @@ -304,10 +306,12 @@ impl LevelManifest {
/// point read ----------->
pub(crate) fn sort_levels(&mut self) {
for level in &mut self.levels {
level.sort_by_seqno();
level.sort();
}
}

// NOTE: Used in tests
#[allow(unused)]
pub(crate) fn insert_into_level(&mut self, level_no: u8, segment: Arc<Segment>) {
let last_level_index = self.depth() - 1;
let index = level_no.clamp(0, last_level_index);
Expand Down Expand Up @@ -677,26 +681,26 @@ mod tests {

assert_eq!(
Vec::<SegmentId>::new(),
level.get_overlapping_segments(&KeyRange::new((
b"a".to_vec().into(),
b"b".to_vec().into()
))),
level
.overlapping_segments(&KeyRange::new((b"a".to_vec().into(), b"b".to_vec().into())))
.map(|x| x.metadata.id)
.collect::<Vec<_>>(),
);

assert_eq!(
vec![1],
level.get_overlapping_segments(&KeyRange::new((
b"d".to_vec().into(),
b"k".to_vec().into()
))),
level
.overlapping_segments(&KeyRange::new((b"d".to_vec().into(), b"k".to_vec().into())))
.map(|x| x.metadata.id)
.collect::<Vec<_>>(),
);

assert_eq!(
vec![1, 2],
level.get_overlapping_segments(&KeyRange::new((
b"f".to_vec().into(),
b"x".to_vec().into()
))),
level
.overlapping_segments(&KeyRange::new((b"f".to_vec().into(), b"x".to_vec().into())))
.map(|x| x.metadata.id)
.collect::<Vec<_>>(),
);
}
}

0 comments on commit 825026f

Please sign in to comment.