diff --git a/Cargo.toml b/Cargo.toml index b383924..9fe30cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,11 +20,18 @@ all-features = true [features] default = ["disktree"] -disktree = ["serde", "byteorder"] +disktree = [ + "byteorder", + "leb128", + "memmap", + "serde", +] serde = ["dep:serde"] [dependencies] byteorder = { version = "1", optional = true } +leb128 = { version = "0.2.5", optional = true } +memmap = { version = "0.7", optional = true } serde = { version = "1", optional = true, features = ["derive"] } [dev-dependencies] diff --git a/benches/benches.rs b/benches/benches.rs index d5586d3..a3d5945 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -51,7 +51,7 @@ fn set_lookup(c: &mut Criterion) { fn disk_set_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("US915 DiskTreeSet lookup"); - let mut us915_disk_set = { + let us915_disk_set = { let us915_set: HexTreeSet = PLAIN_US915_INDICES .iter() .map(|&idx| Cell::try_from(idx).unwrap()) @@ -60,7 +60,7 @@ fn disk_set_lookup(c: &mut Criterion) { us915_set .to_disktree(&mut file, |_, _| Ok::<(), std::io::Error>(())) .unwrap(); - DiskTree::from_reader(file).unwrap() + DiskTree::memmap(file).unwrap() }; let tarpon_springs = coord! {x: -82.753822, y: 28.15215}; diff --git a/src/disktree/iter.rs b/src/disktree/iter.rs index 4f9f8f0..a38d5fd 100644 --- a/src/disktree/iter.rs +++ b/src/disktree/iter.rs @@ -1,18 +1,19 @@ use crate::{ cell::CellStack, - disktree::{dptr::Dptr, tree::HDR_SZ, ReadVal}, + disktree::{dptr::Dptr, tree::HDR_SZ}, error::Result, + Cell, }; use byteorder::ReadBytesExt; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{Cursor, Seek, SeekFrom}; -pub(crate) struct Iter<'a, R, F> { +pub(crate) struct Iter<'a> { cell_stack: CellStack, - curr: Option<(u8, Dptr)>, - rdr: &'a mut R, + curr_node: Option<(u8, Dptr)>, + disktree_buf: &'a [u8], + disktree_csr: Cursor<&'a [u8]>, + node_stack: Vec>, recycle_bin: Vec>, - stack: Vec>, - f: F, } enum Node { @@ -22,15 +23,14 @@ enum Node { Parent(Vec<(u8, Dptr)>), } -impl<'a, R, F> Iter<'a, R, F> -where - R: Seek + Read, -{ +impl<'a> Iter<'a> { fn seek_to(&mut self, dptr: Dptr) -> Result { - Ok(Dptr::from(self.rdr.seek(SeekFrom::Start(u64::from(dptr)))?)) + Ok(Dptr::from( + self.disktree_csr.seek(SeekFrom::Start(u64::from(dptr)))?, + )) } - fn read_base_nodes(rdr: &mut R) -> Result> { + fn read_base_nodes(rdr: &mut Cursor<&[u8]>) -> Result> { let mut buf = Vec::with_capacity(122); rdr.seek(SeekFrom::Start(HDR_SZ))?; for digit in 0..122 { @@ -46,16 +46,19 @@ where // `pos` is a position in the file of this node's tag. fn read_node(&mut self, dptr: Dptr) -> Result { let dptr = self.seek_to(dptr)?; - let node_tag = self.rdr.read_u8()?; + let node_tag = self.disktree_csr.read_u8()?; let base_pos = Dptr::from(u64::from(dptr) + std::mem::size_of_val(&node_tag) as u64); - debug_assert_eq!(base_pos, Dptr::from(self.rdr.stream_position().unwrap())); + debug_assert_eq!( + base_pos, + Dptr::from(self.disktree_csr.stream_position().unwrap()) + ); assert!(node_tag == 0 || node_tag > 0b1000_0000); if node_tag == 0 { Ok(Node::Leaf(base_pos)) } else { let mut children = self.node_buf(); let n_children = (node_tag & 0b0111_1111).count_ones() as usize; - let child_dptrs = Dptr::read_n(&mut self.rdr, n_children)?; + let child_dptrs = Dptr::read_n(&mut self.disktree_csr, n_children)?; children.extend( (0..7) .rev() @@ -91,46 +94,43 @@ where // `Some` with the contents of the user's deserializer, but let's // make sure we never yeild another value by clearing stack. fn stop_yeilding(&mut self) { - self.stack.clear(); - self.curr = None; + self.node_stack.clear(); + self.curr_node = None; } - pub(crate) fn new(rdr: &'a mut R, f: F) -> Result> { + pub(crate) fn new(disktree_buf: &'a [u8]) -> Result> { + let mut disktree_csr = Cursor::new(disktree_buf); let mut cell_stack = CellStack::new(); - let mut stack = Vec::new(); + let mut node_stack = Vec::new(); let recycle_bin = Vec::new(); - let mut base_nodes = Self::read_base_nodes(rdr)?; - let curr = base_nodes.pop(); - stack.push(base_nodes); - if let Some((digit, _)) = curr { + let mut base_nodes = Self::read_base_nodes(&mut disktree_csr)?; + let curr_node = base_nodes.pop(); + node_stack.push(base_nodes); + if let Some((digit, _)) = curr_node { cell_stack.push(digit); } Ok(Self { cell_stack, - curr, - rdr, + curr_node, + disktree_buf, + disktree_csr, recycle_bin, - stack, - f, + node_stack, }) } } -impl<'a, R, F> Iterator for Iter<'a, R, F> -where - R: Read + Seek, - F: ReadVal, -{ - type Item = >::T; +impl<'a> Iterator for Iter<'a> { + type Item = Result<(Cell, &'a [u8])>; fn next(&mut self) -> Option { - while self.curr.is_none() { - if let Some(mut dptrs) = self.stack.pop() { + while self.curr_node.is_none() { + if let Some(mut dptrs) = self.node_stack.pop() { self.cell_stack.pop(); if let Some((digit, dptr)) = dptrs.pop() { self.cell_stack.push(digit); - self.curr = Some((digit, dptr)); - self.stack.push(dptrs); + self.curr_node = Some((digit, dptr)); + self.node_stack.push(dptrs); } else { self.recycle_node_buf(dptrs); } @@ -138,32 +138,35 @@ where break; } } - while let Some((digit, dptr)) = self.curr { + while let Some((digit, dptr)) = self.curr_node { self.cell_stack.swap(digit); match self.read_node(dptr) { Err(e) => { self.stop_yeilding(); - return Some(self.f.read(Err(e))); + return Some(Err(e)); } Ok(Node::Parent(mut children)) => { if let Some((digit, dptr)) = children.pop() { self.cell_stack.push(digit); - self.curr = Some((digit, dptr)); - self.stack.push(children); + self.curr_node = Some((digit, dptr)); + self.node_stack.push(children); } else { self.recycle_node_buf(children); } } Ok(Node::Leaf(dptr)) => { - self.curr = None; + self.curr_node = None; if let Err(e) = self.seek_to(dptr) { self.stop_yeilding(); - return Some(self.f.read(Err(e))); + return Some(Err(e)); } - return Some(self.f.read(Ok(( + let val_len = leb128::read::unsigned(&mut self.disktree_csr).unwrap() as usize; + let pos = self.disktree_csr.position() as usize; + let val_buf = &self.disktree_buf[pos..][..val_len]; + return Some(Ok(( *self.cell_stack.cell().expect("corrupted cell-stack"), - self.rdr, - )))); + val_buf, + ))); } }; } diff --git a/src/disktree/mod.rs b/src/disktree/mod.rs index cdd5d48..d575ef3 100644 --- a/src/disktree/mod.rs +++ b/src/disktree/mod.rs @@ -1,13 +1,11 @@ //! An on-disk hextree. pub use tree::DiskTree; -pub use value::ReadVal; mod dptr; mod iter; mod node; mod tree; -mod value; mod writer; #[cfg(test)] @@ -51,15 +49,15 @@ mod tests { monaco .to_disktree(&mut file, |wtr, val| bincode::serialize_into(wtr, val)) .unwrap(); - let mut monaco_disktree = DiskTree::from_reader(file).unwrap(); + let monaco_disktree = DiskTree::open(path).unwrap(); assert_eq!(monaco.get(point_2).unzip().1, None); assert_eq!(monaco.get(point_1).unzip().1, Some(&Region::Monaco)); for (ht_cell, &ht_val) in monaco.iter() { let now = std::time::Instant::now(); - let (dt_cell, dt_val_rdr) = monaco_disktree.seek_to_cell(ht_cell).unwrap().unwrap(); - let dt_val = bincode::deserialize_from(dt_val_rdr).unwrap(); + let (dt_cell, val_buf) = monaco_disktree.get(ht_cell).unwrap().unwrap(); + let dt_val = bincode::deserialize_from(val_buf).unwrap(); let lookup_duration = now.elapsed(); println!("loookup of {dt_cell} took {lookup_duration:?}"); assert_eq!(ht_val, dt_val); @@ -90,33 +88,16 @@ mod tests { monaco .to_disktree(&mut file, |wtr, val| bincode::serialize_into(wtr, val)) .unwrap(); - let mut monaco_disktree = DiskTree::from_reader(file).unwrap(); - - // Error type for user-defined deserializer. - #[derive(Debug)] - enum RdrErr { - Bincode(bincode::Error), - Disktree(crate::error::Error), - } - - // Our function for deserializing `Cell` values from the - // disktree. - fn deserialze_cell( - res: crate::error::Result<(Cell, &mut R)>, - ) -> Result<(Cell, Cell), RdrErr> { - match res { - Ok((cell, rdr)) => match bincode::deserialize_from(rdr) { - Ok(val) => Ok((cell, val)), - Err(e) => Err(RdrErr::Bincode(e)), - }, - Err(e) => Err(RdrErr::Disktree(e)), - } - } + let monaco_disktree = DiskTree::open(path).unwrap(); // Create the iterator with the user-defined deserialzer. - let disktree_iter = monaco_disktree.iter(deserialze_cell).unwrap(); + let disktree_iter = monaco_disktree.iter().unwrap(); let start = std::time::Instant::now(); - let disktree_collection: Vec<_> = disktree_iter.collect::, _>>().unwrap(); + let mut disktree_collection = Vec::new(); + for res in disktree_iter { + let (cell, val_buf) = res.unwrap(); + disktree_collection.push((cell, bincode::deserialize_from(val_buf).unwrap())); + } let elapsed = start.elapsed(); println!("{elapsed:?}"); let start = std::time::Instant::now(); diff --git a/src/disktree/tree.rs b/src/disktree/tree.rs old mode 100644 new mode 100755 index ce19740..00681b0 --- a/src/disktree/tree.rs +++ b/src/disktree/tree.rs @@ -1,107 +1,103 @@ use crate::{ digits::Digits, - disktree::{dptr::Dptr, iter::Iter, node::Node, ReadVal}, - error::{Error, Result}, - Cell, + disktree::{dptr::Dptr, iter::Iter, node::Node}, + error::Result, + Cell, Error, }; use byteorder::ReadBytesExt; +use memmap::{Mmap, MmapOptions}; use std::{ fs::File, - io::{Read, Seek, SeekFrom}, + io::{Cursor, Read, Seek, SeekFrom}, path::Path, }; -pub(crate) const HDR_SZ: u64 = 1; +pub(crate) const HDR_MAGIC: &[u8] = b"hextree\0"; +pub(crate) const HDR_SZ: u64 = HDR_MAGIC.len() as u64 + 1; /// An on-disk hextree map. -pub struct DiskTree(R); +pub struct DiskTree(B); -impl DiskTree { +impl DiskTree { /// Opens a `DiskTree` at the specified path. pub fn open>(path: P) -> Result { let file = File::open(path)?; - Self::from_reader(file) + Self::memmap(file) } -} -impl DiskTree { - /// Conumes `self` and returns the backing store. - pub fn into_inner(self) -> R { - self.0 + /// Memory maps the provided disktree-containing. + pub fn memmap(file: File) -> Result { + #[allow(unsafe_code)] + let mm = unsafe { MmapOptions::new().map(&file)? }; + Self::with_buf(mm) } } -impl DiskTree { - /// Opens a `DiskTree` with a provided reader. - pub fn from_reader(mut rdr: R) -> Result { - rdr.seek(SeekFrom::Start(0))?; +impl> DiskTree { + /// Opens a `DiskTree` with a provided buffer. + pub fn with_buf(buf: B) -> Result { + let mut csr = Cursor::new(buf); + let magic = { + let mut buf = [0_u8; HDR_MAGIC.len()]; + csr.read_exact(&mut buf)?; + buf + }; + if magic != HDR_MAGIC { + return Err(Error::NotDisktree); + } + let version = { // We use 0xFE as a version offset since it is much less // likely to randomly appear than 0; - 0xFE - rdr.read_u8()? + 0xFE - csr.read_u8()? }; match version { - 0 => Ok(Self(rdr)), + 0 => Ok(Self(csr.into_inner())), unsupported_version => Err(Error::Version(unsupported_version)), } } - /// Returns a reader pre-seeked to the value for cell, if present. - pub fn seek_to_cell(&mut self, cell: Cell) -> Result> { + /// Returns `(Cell, &[u8])`, if present. + pub fn get(&self, cell: Cell) -> Result> { let base_cell_pos = Self::base_cell_dptr(cell); - self.seek_to_pos(base_cell_pos)?; - let node_dptr = Dptr::read(&mut self.0)?; + let mut csr = Cursor::new(self.0.as_ref()); + csr.seek(SeekFrom::Start(base_cell_pos.into()))?; + let node_dptr = Dptr::read(&mut csr)?; if node_dptr.is_null() { return Ok(None); } let digits = Digits::new(cell); - if let Some((cell, dptr)) = self._get(0, node_dptr, cell, digits)? { - self.seek_to_pos(dptr.into())?; - Ok(Some((cell, &mut self.0))) + if let Some((cell, dptr)) = Self::_get(&mut csr, 0, node_dptr, cell, digits)? { + csr.seek(SeekFrom::Start(dptr.into()))?; + let val_len = leb128::read::unsigned(&mut csr).unwrap() as usize; + let val_start = csr.position() as usize; + let val_bytes = &self.0.as_ref()[val_start..][..val_len]; + Ok(Some((cell, val_bytes))) } else { Ok(None) } } /// Returns `true` if the tree fully contains `cell`. - pub fn contains(&mut self, cell: Cell) -> Result { - let base_cell_pos = Self::base_cell_dptr(cell); - self.seek_to_pos(base_cell_pos)?; - let node_dptr = Dptr::read(&mut self.0)?; - if node_dptr.is_null() { - return Ok(false); - } - let digits = Digits::new(cell); - self._get(0, node_dptr, cell, digits) - .map(|opt| opt.is_some()) + pub fn contains(&self, cell: Cell) -> Result { + self.get(cell).map(|opt| opt.is_some()) } - /// Returns an iterator visiting all cell-value pairs in arbitrary - /// order. - /// - /// However, insteading of returning the concrete value, the - /// iterator retuns a reader pre-seeked to the node's value. - pub fn iter<'a, F>( - &'a mut self, - f: F, - ) -> Result>::T> + 'a> - where - F: ReadVal + 'a, - { - Iter::new(&mut self.0, f) + /// Returns an iterator visiting all `(Cell, &[u8])` pairs in + /// arbitrary order. + pub fn iter(&self) -> Result>> { + Iter::new(self.0.as_ref()) } - /// Leaf: | 0_u8 | bincode bytes | - /// Parent: | 1_u8 | Dptr | Dptr | Dptr | Dptr | Dptr | Dptr | Dptr | fn _get( - &mut self, + csr: &mut Cursor<&[u8]>, res: u8, node_dptr: Dptr, cell: Cell, mut digits: Digits, ) -> Result> { - self.seek_to_pos(node_dptr)?; - let node = Node::read(&mut self.0)?; + csr.seek(SeekFrom::Start(node_dptr.into()))?; + let node = Node::read(csr)?; match (digits.next(), node) { (None, Node::Leaf(dptr)) => Ok(Some((cell, dptr))), (Some(_), Node::Leaf(dptr)) => Ok(Some(( @@ -110,7 +106,7 @@ impl DiskTree { ))), (Some(digit), Node::Parent(children)) => match children[digit as usize] { None => Ok(None), - Some(dptr) => self._get(res + 1, dptr, cell, digits), + Some(dptr) => Self::_get(csr, res + 1, dptr, cell, digits), }, // No digits left, but `self` isn't full, so this cell // can't fully contain the target. @@ -118,11 +114,6 @@ impl DiskTree { } } - fn seek_to_pos(&mut self, dptr: Dptr) -> Result { - self.0.seek(SeekFrom::Start(u64::from(dptr)))?; - Ok(()) - } - /// Returns the DPtr to a base (res0) cell dptr. fn base_cell_dptr(cell: Cell) -> Dptr { Dptr::from(HDR_SZ + Dptr::size() * (cell.base() as u64)) diff --git a/src/disktree/value.rs b/src/disktree/value.rs deleted file mode 100644 index 7582870..0000000 --- a/src/disktree/value.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::{error::Result, Cell}; -use std::io::Read; - -/// The `ReadVal` trait defines the contract for reading concrete -/// types from a disk tree. -pub trait ReadVal { - /// The associated type `T` represents the result of - /// deserialization, which may be the deserialized type or an - /// error result, depending on fallibility. - type T; - - /// Reads data from the provided reader and returns the - /// deserialized result. - /// - /// # Arguments - /// - /// * `rdr` - A `Result` containing a `Cell` and a mutable - /// reference to the reader. - /// - /// # Returns - /// - /// The deserialized result, which can be the deserialized type or - /// an error. - fn read(&self, rdr: Result<(Cell, &mut R)>) -> Self::T; -} - -impl ReadVal for F -where - R: Read, - F: Fn(Result<(Cell, &mut R)>) -> T, -{ - type T = T; - - fn read(&self, rdr: Result<(Cell, &mut R)>) -> T { - self(rdr) - } -} diff --git a/src/disktree/writer.rs b/src/disktree/writer.rs index 432a4dd..7ffb901 100644 --- a/src/disktree/writer.rs +++ b/src/disktree/writer.rs @@ -1,6 +1,6 @@ use crate::{ compaction::Compactor, - disktree::dptr::Dptr, + disktree::{dptr::Dptr, tree::HDR_MAGIC}, error::{Error, Result}, node::Node, HexTreeMap, @@ -13,35 +13,46 @@ impl> HexTreeMap { pub fn to_disktree(&self, wtr: W, f: F) -> Result where W: Write + Seek, - F: Fn(&mut W, &V) -> std::result::Result<(), E>, + F: Fn(&mut dyn Write, &V) -> std::result::Result<(), E>, E: std::error::Error + Sync + Send + 'static, { - DiskTreeWriter(wtr).write(self, f) + DiskTreeWriter::new(wtr).write(self, f) } } -pub(crate) struct DiskTreeWriter(W); +pub(crate) struct DiskTreeWriter { + scratch_pad: Vec, + wtr: W, +} + +impl DiskTreeWriter { + pub fn new(wtr: W) -> Self { + let scratch_pad = Vec::new(); + Self { wtr, scratch_pad } + } +} impl DiskTreeWriter { pub fn write(&mut self, hextree: &HexTreeMap, mut f: F) -> Result where - F: Fn(&mut W, &V) -> std::result::Result<(), E>, + F: Fn(&mut dyn Write, &V) -> std::result::Result<(), E>, E: std::error::Error + Sync + Send + 'static, { + // Write magic string + self.wtr.write_all(HDR_MAGIC)?; // Write version field const VERSION: u8 = 0; - self.0.write_u8(0xFE - VERSION)?; - // Write base cells placeholder offsets. + self.wtr.write_u8(0xFE - VERSION)?; + let mut fixups: Vec<(Dptr, &Node)> = Vec::new(); - // Empty: | DPTR_DEFAULT | - // Node: | Dptr | + // Write base cells placeholder offsets. for base in hextree.nodes.iter() { match base.as_deref() { - None => Dptr::null().write(&mut self.0)?, + None => Dptr::null().write(&mut self.wtr)?, Some(node) => { fixups.push((self.pos()?, node)); - Dptr::null().write(&mut self.0)? + Dptr::null().write(&mut self.wtr)? } } } @@ -49,30 +60,32 @@ impl DiskTreeWriter { for (fixee_dptr, node) in fixups { let node_dptr = self.write_node(node, &mut f)?; self.seek_to(fixee_dptr)?; - node_dptr.write(&mut self.0)?; + node_dptr.write(&mut self.wtr)?; } Ok(()) } - /// Leaf: | 0_u8 | bincode bytes | - /// Parent: | 1_u8 | Dptr | Dptr | Dptr | Dptr | Dptr | Dptr | Dptr | fn write_node(&mut self, node: &Node, f: &mut F) -> Result where - F: Fn(&mut W, &V) -> std::result::Result<(), E>, + F: Fn(&mut dyn Write, &V) -> std::result::Result<(), E>, E: std::error::Error + Sync + Send + 'static, { - let node_pos: Dptr = self.0.seek(SeekFrom::End(0))?.into(); + let node_pos: Dptr = self.wtr.seek(SeekFrom::End(0))?.into(); let mut node_fixups: Vec<(Dptr, &Node)> = Vec::new(); match node { Node::Leaf(val) => { - self.0.write_u8(0)?; - // bincode::serialize_into(&mut self.0, val)?; - f(&mut self.0, val).map_err(|e| Error::Writer(Box::new(e)))? + self.wtr.write_u8(0)?; + debug_assert!(self.scratch_pad.is_empty()); + f(&mut self.scratch_pad, val).map_err(|e| Error::Writer(Box::new(e)))?; + let val_len = self.scratch_pad.len() as u64; + leb128::write::unsigned(&mut self.wtr, val_len)?; + self.wtr.write_all(&self.scratch_pad)?; + self.scratch_pad.clear(); } Node::Parent(children) => { let tag_pos = self.pos()?; - self.0.write_u8(0b1000_0000)?; + self.wtr.write_u8(0b1000_0000)?; let mut tag = 0; for child in children.iter() { match child.as_deref() { @@ -86,32 +99,31 @@ impl DiskTreeWriter { // this node is empty. tag = (tag >> 1) | 0b1000_0000; node_fixups.push((self.pos()?, node)); - Dptr::null().write(&mut self.0)?; + Dptr::null().write(&mut self.wtr)?; } }; } self.seek_to(tag_pos)?; // Make the top bit 1 as a sentinel. tag = (tag >> 1) | 0b1000_0000; - // println!("{tag_pos:010x}: write tag {tag:08b}"); - self.0.write_u8(tag)?; + self.wtr.write_u8(tag)?; } }; for (fixee_dptr, node) in node_fixups { let node_dptr = self.write_node(node, f)?; self.seek_to(fixee_dptr)?; - node_dptr.write(&mut self.0)?; + node_dptr.write(&mut self.wtr)?; } Ok(node_pos) } fn pos(&mut self) -> Result { - Ok(Dptr::from(self.0.stream_position()?)) + Ok(Dptr::from(self.wtr.stream_position()?)) } fn seek_to(&mut self, dptr: Dptr) -> Result { - Ok(Dptr::from(self.0.seek(SeekFrom::Start(u64::from(dptr)))?)) + Ok(Dptr::from(self.wtr.seek(SeekFrom::Start(u64::from(dptr)))?)) } } diff --git a/src/error.rs b/src/error.rs index cdbfd8b..70b9d0f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,6 +12,10 @@ pub enum Error { #[cfg(feature = "disktree")] Io(std::io::Error), + /// Not a disktree. + #[cfg(feature = "disktree")] + NotDisktree, + /// Unsupported version. #[cfg(feature = "disktree")] Version(u8), @@ -44,6 +48,9 @@ impl std::error::Error for Error { #[cfg(feature = "disktree")] Error::Io(inner) => inner.source(), + #[cfg(feature = "disktree")] + Error::NotDisktree => None, + #[cfg(feature = "disktree")] Error::Version(_) => None, @@ -67,6 +74,11 @@ impl std::fmt::Display for Error { #[cfg(feature = "disktree")] Error::Io(io_error) => io_error.fmt(f), + #[cfg(feature = "disktree")] + Error::NotDisktree => { + write!(f, "file missing magic header") + } + #[cfg(feature = "disktree")] Error::Version(version) => { write!(f, "unsupported version, got {version}")