Skip to content

Commit

Permalink
Add disktree merge
Browse files Browse the repository at this point in the history
  • Loading branch information
JayKickliter committed Feb 16, 2024
1 parent bbc9fcf commit 99bedf0
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 3 deletions.
21 changes: 20 additions & 1 deletion src/disktree/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use byteorder::ReadBytesExt;
use memmap::MmapOptions;
use std::{
fs::File,
io::{Cursor, Read, Seek, SeekFrom},
io::{Cursor, Read, Seek, SeekFrom, Write},
marker::Send,
ops::Range,
path::Path,
Expand Down Expand Up @@ -60,6 +60,25 @@ impl DiskTreeMap {
}
}

/// Merges several `DiskTreeMap`s into a single disktree written to `wtr`.
///
/// Each map in `subtrees` must contain exactly one of:
/// - a single res0 node, or
/// - any number of nodes that are all children of exactly one res0 node.
///
/// When `f` is `Some`, it is handed to the writer and invoked with the
/// output sink and each stored value. When `f` is `None`, every value's
/// bytes are copied to the output unchanged via `write_all`.
pub fn merge<W, F, E>(wtr: W, subtrees: &[DiskTreeMap], f: Option<F>) -> Result
where
    W: Write + Seek,
    F: FnMut(&mut dyn Write, &&[u8]) -> std::result::Result<(), E>,
    E: std::error::Error + Sync + Send + 'static,
{
    let mut writer = crate::disktree::writer::DiskTreeWriter::new(wtr);
    match f {
        Some(encode) => writer.merge(subtrees, encode),
        None => {
            // No encoder supplied: pass the stored bytes straight through.
            let passthrough = |out: &mut dyn Write, bytes: &&[u8]| out.write_all(bytes);
            writer.merge(subtrees, passthrough)
        }
    }
}

/// Returns `(Cell, &[u8])`, if present.
pub fn get(&self, cell: Cell) -> Result<Option<(Cell, &[u8])>> {
let base_cell_pos = Self::base_cell_dptr(cell);
Expand Down
74 changes: 72 additions & 2 deletions src/disktree/writer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{
compaction::Compactor,
disktree::{dptr::Dp, dtseek::DtSeek, tree::HDR_MAGIC, varint},
disktree::{dptr::Dp, dtseek::DtSeek, tree::HDR_MAGIC, varint, DiskTreeMap},
error::{Error, Result},
node::Node,
HexTreeMap,
};
use byteorder::WriteBytesExt;
use std::io::Write;
use std::io::{Cursor, Write};

impl<V, C> HexTreeMap<V, C>
where
Expand Down Expand Up @@ -72,6 +72,76 @@ where
Ok(())
}

/// Writes a new disktree to the underlying writer by merging `subtrees`.
///
/// The output starts with the magic string and version byte, followed by
/// one pointer slot per res0 base cell. Each subtree is assigned to the
/// slot named by the first base node it reports; slots with no subtree
/// get a null pointer. For an occupied slot, the subtree is loaded into
/// an in-memory `HexTreeMap`, its base node is written with `write_node`
/// (using `f` to serialize values), and the slot is then back-patched
/// with the node's position.
///
/// # Errors
///
/// Propagates any error from the underlying writer, from reading or
/// iterating a subtree, or from `write_node`.
///
/// # Panics
///
/// Panics if two subtrees report the same first base cell, or if a
/// reported base-cell index does not fit in the base-cell table.
pub fn merge<F, E>(&mut self, subtrees: &[DiskTreeMap], mut f: F) -> Result
where
    F: FnMut(&mut dyn Write, &&[u8]) -> std::result::Result<(), E>,
    E: std::error::Error + Sync + Send + 'static,
{
    // Number of base-cell pointer slots in the header.
    const NUM_BASE_CELLS: usize = 122;

    {
        // Write magic string
        self.wtr.write_all(HDR_MAGIC)?;
        // Write version field
        const VERSION: u8 = 0;
        self.wtr.write_u8(0xFE - VERSION)?;
    }

    {
        // We currently need to read the incoming disktrees to memory
        // before writing them out. This creates a lifetime problem
        // where we have to do res0 fixups immediately after writing a
        // res0 node. Therefore, to ensure offsets are correct, we pad
        // out the file to make sure we don't write over
        // not-yet-written res0 base offsets.
        let pos = self.pos()?;
        self.wtr.write_all(&vec![0; NUM_BASE_CELLS * Dp::size()])?;
        self.seek(pos)?;
    }

    // Map each base-cell slot to the subtree (if any) that provides it.
    let root_disk_trees = {
        let mut root_disk_trees: Box<[Option<&DiskTreeMap>]> =
            (0..NUM_BASE_CELLS).map(|_| None).collect();
        for disktree in subtrees {
            let tree_roots = crate::disktree::iter::Iter::read_base_nodes(&mut Cursor::new(
                (*disktree.0).as_ref(),
            ))?;
            // Only the first reported base node decides the slot; a
            // subtree reporting no base nodes contributes nothing.
            if let Some(first_root) = tree_roots.first() {
                let slot = first_root.0 as usize;
                assert!(
                    root_disk_trees[slot].is_none(),
                    "multiple subtrees share the same res0 base cell"
                );
                root_disk_trees[slot] = Some(disktree);
            }
        }
        root_disk_trees
    };

    // Invariant: at the top of every iteration the writer is positioned
    // at slot `idx`, so each arm must advance by exactly one pointer.
    for (idx, maybe_disktree) in root_disk_trees.iter().enumerate() {
        match maybe_disktree {
            None => Dp::null().write(&mut self.wtr)?,
            Some(disktree) => {
                let mut hextree: HexTreeMap<&[u8]> = HexTreeMap::new();
                for res in disktree.iter()? {
                    let (cell, val) = res?;
                    hextree.insert(cell, val);
                }
                match hextree
                    .nodes
                    .get(idx)
                    .expect("we already determined this node should exist")
                    .as_deref()
                {
                    // The subtree yielded nothing under this base cell.
                    // Still emit a null pointer so the following slots
                    // stay aligned.
                    None => Dp::null().write(&mut self.wtr)?,
                    Some(node) => {
                        // Reserve the slot, write the node, then seek
                        // back and patch the slot with the node's
                        // position. Patching leaves the writer at the
                        // start of the next slot.
                        let fixee_dptr = self.pos()?;
                        Dp::null().write(&mut self.wtr)?;
                        let node_dptr = self.write_node(node, &mut f)?;
                        self.seek(fixee_dptr)?;
                        node_dptr.write(&mut self.wtr)?;
                    }
                }
            }
        }
    }

    Ok(())
}

fn write_node<V, F, E>(&mut self, node: &Node<V>, f: &mut F) -> Result<Dp>
where
F: FnMut(&mut dyn Write, &V) -> std::result::Result<(), E>,
Expand Down

0 comments on commit 99bedf0

Please sign in to comment.