Skip to content

Commit

Permalink
use binary heap for fewer comparisons when merging speedy-kv segments
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed May 13, 2024
1 parent 19dab37 commit 7e8781f
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 56 deletions.
28 changes: 14 additions & 14 deletions crates/core/src/crawler/planner.rs
Expand Up @@ -407,23 +407,21 @@ impl CrawlPlanner {
.finalize()?;
}

let mut metadata = Metadata {
stats: stats.into_inner().unwrap_or_else(|e| e.into_inner()),
};
let mut stats = stats.into_inner().unwrap_or_else(|e| e.into_inner());
stats.sort_by(|a, b| b.schedule_budget.cmp(&a.schedule_budget));

tracing::info!(
"total scheduled urls: {}",
metadata.stats.iter().map(|d| d.scheduled_urls).sum::<u64>()
);
let total_scheduled_urls: u64 = stats.iter().map(|d| d.scheduled_urls).sum();
let total_wander_budget: u64 = stats.iter().map(|d| d.wander_budget).sum();

tracing::info!(
"total wander budget: {}",
metadata.stats.iter().map(|d| d.wander_budget).sum::<u64>()
);
let metadata = Metadata {
stats,
total_scheduled_urls,
total_wander_budget,
};

tracing::info!("total scheduled urls: {}", metadata.total_scheduled_urls);

metadata
.stats
.sort_by(|a, b| b.schedule_budget.cmp(&a.schedule_budget));
tracing::info!("total wander budget: {}", metadata.total_wander_budget);

let metadata_path = output.as_ref().join("metadata.json");

Expand All @@ -446,6 +444,8 @@ struct DomainStats {

// Summary written to metadata.json after planning.
#[derive(serde::Serialize)]
struct Metadata {
    // Totals pre-aggregated from `stats` so consumers of the serialized
    // metadata don't need to re-sum the per-domain entries.
    total_scheduled_urls: u64,
    total_wander_budget: u64,
    // Per-domain entries; sorted by descending schedule budget before
    // this struct is built.
    stats: Vec<DomainStats>,
}

Expand Down
119 changes: 78 additions & 41 deletions crates/speedy-kv/src/segment.rs
Expand Up @@ -15,6 +15,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>

use std::{
collections::BinaryHeap,
io::{BufWriter, Write},
ops::RangeBounds,
path::{Path, PathBuf},
Expand Down Expand Up @@ -189,7 +190,7 @@ impl<K, V> Segment<K, V> {

// cleanup old segments
for segment in segments {
std::fs::remove_file(dbg!(segment.blob_index.path()))?;
std::fs::remove_file(segment.blob_index.path())?;
std::fs::remove_file(segment.id_index.path())?;
std::fs::remove_file(segment.store.path())?;
std::fs::remove_file(segment.bloom_path())?;
Expand Down Expand Up @@ -301,65 +302,101 @@ impl<K, V> Segment<K, V> {
}
}

struct SortedSegments<I>
struct SortedPeekable<'a, K, V, I>
where
I: Iterator,
I: Iterator<Item = (SerializedRef<'a, K>, SerializedRef<'a, V>)>,
{
segment_ord: usize,
iter: Peekable<I>,
}

impl<'a, K, V, I> PartialOrd for SortedPeekable<'a, K, V, I>
where
I: Iterator<Item = (SerializedRef<'a, K>, SerializedRef<'a, V>)>,
{
segments: Vec<Peekable<I>>,
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl<I> SortedSegments<I>
impl<'a, K, V, I> Ord for SortedPeekable<'a, K, V, I>
where
I: Iterator,
I: Iterator<Item = (SerializedRef<'a, K>, SerializedRef<'a, V>)>,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match (self.iter.peek(), other.iter.peek()) {
(Some((a, _)), Some((b, _))) => a
.cmp(b)
.reverse()
.then_with(|| self.segment_ord.cmp(&other.segment_ord)),
(Some(_), None) => std::cmp::Ordering::Greater,
(None, Some(_)) => std::cmp::Ordering::Less,
(None, None) => std::cmp::Ordering::Equal,
}
}
}

impl<'a, K, V, I> PartialEq for SortedPeekable<'a, K, V, I>
where
    I: Iterator<Item = (SerializedRef<'a, K>, SerializedRef<'a, V>)>,
{
    // Two cursors are equal when both peek at equal keys AND carry the same
    // segment ordinal (mirroring the tie-break in `Ord::cmp`), or when both
    // are exhausted — keeping `eq` consistent with `cmp` returning `Equal`.
    fn eq(&self, other: &Self) -> bool {
        match (self.iter.peek(), other.iter.peek()) {
            (Some((a, _)), Some((b, _))) => a == b && self.segment_ord == other.segment_ord,
            (None, None) => true,
            _ => false,
        }
    }
}

// Marker impl: equality on this type is a full equivalence relation, which
// the `Ord` bound (and thus `BinaryHeap` ordering) requires.
impl<'a, K, V, I> Eq for SortedPeekable<'a, K, V, I> where
    I: Iterator<Item = (SerializedRef<'a, K>, SerializedRef<'a, V>)>
{
}

/// Merging iterator over several sorted segment streams.
///
/// Holds one `SortedPeekable` cursor per segment in a max-heap whose reversed
/// key order surfaces the cursor with the smallest next key, so each merged
/// entry costs O(log n) comparisons instead of a linear scan over segments.
struct SortedSegments<'a, K, V, I>
where
    I: Iterator<Item = (SerializedRef<'a, K>, SerializedRef<'a, V>)>,
{
    segments: BinaryHeap<SortedPeekable<'a, K, V, I>>,
}

impl<'a, K, V, I> SortedSegments<'a, K, V, I>
where
    I: Iterator<Item = (SerializedRef<'a, K>, SerializedRef<'a, V>)>,
{
    /// Builds the merge heap, tagging each segment iterator with its index
    /// in `segments` so equal keys are ordered deterministically.
    fn new(segments: Vec<Peekable<I>>) -> Self {
        Self {
            segments: segments
                .into_iter()
                .enumerate()
                .map(|(segment_ord, iter)| SortedPeekable { segment_ord, iter })
                .collect(),
        }
    }
}

impl<'a, K, V, I> Iterator for SortedSegments<I>
impl<'a, K, V, I> Iterator for SortedSegments<'a, K, V, I>
where
I: Iterator<Item = (SerializedRef<'a, K>, SerializedRef<'a, V>)>,
{
type Item = I::Item;

fn next(&mut self) -> Option<Self::Item> {
let mut min_idx = None;

let num_segments = self.segments.len();

for idx in 0..num_segments {
let segment = &self.segments[idx];
if let Some(item) = segment.peek() {
if let Some(min) = min_idx {
let v: &Peekable<I> = &self.segments[min];
let value = &v.peek().unwrap().0;

if &item.0 <= value {
min_idx = Some(idx);
}
} else {
min_idx = Some(idx);
}
}
}
let (key, value) = {
let mut min = self.segments.peek_mut()?;
min.iter.next()?
};

if let Some(idx) = min_idx {
let res = self.segments[idx].next();

if let Some(res) = res.as_ref() {
for segment in &mut self.segments {
if let Some(peeked) = segment.peek() {
if peeked.0 == res.0 {
segment.next();
}
}
}
// advance all segments with the same key
while let Some(mut peek) = self.segments.peek_mut() {
if peek.iter.peek().map(|(k, _)| k) != Some(&key) {
break;
}

res
} else {
None
peek.iter.next();
}

Some((key, value))
}
}
23 changes: 22 additions & 1 deletion crates/speedy-kv/src/serialized.rs
Expand Up @@ -98,12 +98,33 @@ impl<T> Serialized<T> {
}
}

/// Zero-copy view over the serialized bytes of a `T`.
///
/// NOTE: no `#[derive(Debug)]` here — the manual `Debug` impl below would
/// conflict with a derived one (E0119).
pub struct SerializedRef<'a, T> {
    bytes: &'a [u8],
    // Ties the ref to the logical type `T` without storing a value of it.
    _marker: std::marker::PhantomData<T>,
}

impl<'a, T> std::fmt::Debug for SerializedRef<'a, T> {
    /// Debug-prints at most the first 16 bytes so large values don't flood
    /// logs; the field name signals when truncation happened.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let truncated = self.bytes.len() > 16;

        let truncated_bytes = if truncated {
            &self.bytes[..16]
        } else {
            self.bytes
        };

        let field = if truncated {
            "bytes (truncated)"
        } else {
            "bytes"
        };

        // Name the actual type: the previous code printed "Serialized",
        // which is a different type in this module (copy-paste slip).
        f.debug_struct("SerializedRef")
            .field(field, &truncated_bytes)
            .finish()
    }
}

impl<'a, T> Clone for SerializedRef<'a, T> {
fn clone(&self) -> Self {
*self
Expand Down

0 comments on commit 7e8781f

Please sign in to comment.