Skip to content

Commit

Permalink
Memory map disktrees
Browse files Browse the repository at this point in the history
  • Loading branch information
JayKickliter committed Jan 4, 2024
1 parent 701affc commit 5927299
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 202 deletions.
9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@ all-features = true

[features]
default = ["disktree"]
disktree = ["serde", "byteorder"]
disktree = [
"byteorder",
"leb128",
"memmap",
"serde",
]
serde = ["dep:serde"]

[dependencies]
byteorder = { version = "1", optional = true }
leb128 = { version = "0.2.5", optional = true }
memmap = { version = "0.7", optional = true }
serde = { version = "1", optional = true, features = ["derive"] }

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn set_lookup(c: &mut Criterion) {
fn disk_set_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("US915 DiskTreeSet lookup");

let mut us915_disk_set = {
let us915_disk_set = {
let us915_set: HexTreeSet = PLAIN_US915_INDICES
.iter()
.map(|&idx| Cell::try_from(idx).unwrap())
Expand All @@ -60,7 +60,7 @@ fn disk_set_lookup(c: &mut Criterion) {
us915_set
.to_disktree(&mut file, |_, _| Ok::<(), std::io::Error>(()))
.unwrap();
DiskTree::from_reader(file).unwrap()
DiskTree::memmap(file).unwrap()
};

let tarpon_springs = coord! {x: -82.753822, y: 28.15215};
Expand Down
97 changes: 50 additions & 47 deletions src/disktree/iter.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use crate::{
cell::CellStack,
disktree::{dptr::Dptr, tree::HDR_SZ, ReadVal},
disktree::{dptr::Dptr, tree::HDR_SZ},
error::Result,
Cell,
};
use byteorder::ReadBytesExt;
use std::io::{Read, Seek, SeekFrom};
use std::io::{Cursor, Seek, SeekFrom};

pub(crate) struct Iter<'a, R, F> {
pub(crate) struct Iter<'a> {
cell_stack: CellStack,
curr: Option<(u8, Dptr)>,
rdr: &'a mut R,
curr_node: Option<(u8, Dptr)>,
disktree_buf: &'a [u8],
disktree_csr: Cursor<&'a [u8]>,
node_stack: Vec<Vec<(u8, Dptr)>>,
recycle_bin: Vec<Vec<(u8, Dptr)>>,
stack: Vec<Vec<(u8, Dptr)>>,
f: F,
}

enum Node {
Expand All @@ -22,15 +23,14 @@ enum Node {
Parent(Vec<(u8, Dptr)>),
}

impl<'a, R, F> Iter<'a, R, F>
where
R: Seek + Read,
{
impl<'a> Iter<'a> {
fn seek_to(&mut self, dptr: Dptr) -> Result<Dptr> {
Ok(Dptr::from(self.rdr.seek(SeekFrom::Start(u64::from(dptr)))?))
Ok(Dptr::from(
self.disktree_csr.seek(SeekFrom::Start(u64::from(dptr)))?,
))
}

fn read_base_nodes(rdr: &mut R) -> Result<Vec<(u8, Dptr)>> {
fn read_base_nodes(rdr: &mut Cursor<&[u8]>) -> Result<Vec<(u8, Dptr)>> {
let mut buf = Vec::with_capacity(122);
rdr.seek(SeekFrom::Start(HDR_SZ))?;
for digit in 0..122 {
Expand All @@ -46,16 +46,19 @@ where
// `pos` is a position in the file of this node's tag.
fn read_node(&mut self, dptr: Dptr) -> Result<Node> {
let dptr = self.seek_to(dptr)?;
let node_tag = self.rdr.read_u8()?;
let node_tag = self.disktree_csr.read_u8()?;
let base_pos = Dptr::from(u64::from(dptr) + std::mem::size_of_val(&node_tag) as u64);
debug_assert_eq!(base_pos, Dptr::from(self.rdr.stream_position().unwrap()));
debug_assert_eq!(
base_pos,
Dptr::from(self.disktree_csr.stream_position().unwrap())
);
assert!(node_tag == 0 || node_tag > 0b1000_0000);
if node_tag == 0 {
Ok(Node::Leaf(base_pos))
} else {
let mut children = self.node_buf();
let n_children = (node_tag & 0b0111_1111).count_ones() as usize;
let child_dptrs = Dptr::read_n(&mut self.rdr, n_children)?;
let child_dptrs = Dptr::read_n(&mut self.disktree_csr, n_children)?;
children.extend(
(0..7)
.rev()
Expand Down Expand Up @@ -91,79 +94,79 @@ where
// `Some` with the contents of the user's deserializer, but let's
// make sure we never yeild another value by clearing stack.
fn stop_yeilding(&mut self) {
self.stack.clear();
self.curr = None;
self.node_stack.clear();
self.curr_node = None;
}

pub(crate) fn new(rdr: &'a mut R, f: F) -> Result<Iter<'a, R, F>> {
pub(crate) fn new(disktree_buf: &'a [u8]) -> Result<Iter<'a>> {
let mut disktree_csr = Cursor::new(disktree_buf);
let mut cell_stack = CellStack::new();
let mut stack = Vec::new();
let mut node_stack = Vec::new();
let recycle_bin = Vec::new();
let mut base_nodes = Self::read_base_nodes(rdr)?;
let curr = base_nodes.pop();
stack.push(base_nodes);
if let Some((digit, _)) = curr {
let mut base_nodes = Self::read_base_nodes(&mut disktree_csr)?;
let curr_node = base_nodes.pop();
node_stack.push(base_nodes);
if let Some((digit, _)) = curr_node {
cell_stack.push(digit);
}
Ok(Self {
cell_stack,
curr,
rdr,
curr_node,
disktree_buf,
disktree_csr,
recycle_bin,
stack,
f,
node_stack,
})
}
}

impl<'a, R, F> Iterator for Iter<'a, R, F>
where
R: Read + Seek,
F: ReadVal<R>,
{
type Item = <F as ReadVal<R>>::T;
impl<'a> Iterator for Iter<'a> {
type Item = Result<(Cell, &'a [u8])>;

fn next(&mut self) -> Option<Self::Item> {
while self.curr.is_none() {
if let Some(mut dptrs) = self.stack.pop() {
while self.curr_node.is_none() {
if let Some(mut dptrs) = self.node_stack.pop() {
self.cell_stack.pop();
if let Some((digit, dptr)) = dptrs.pop() {
self.cell_stack.push(digit);
self.curr = Some((digit, dptr));
self.stack.push(dptrs);
self.curr_node = Some((digit, dptr));
self.node_stack.push(dptrs);
} else {
self.recycle_node_buf(dptrs);
}
} else {
break;
}
}
while let Some((digit, dptr)) = self.curr {
while let Some((digit, dptr)) = self.curr_node {
self.cell_stack.swap(digit);
match self.read_node(dptr) {
Err(e) => {
self.stop_yeilding();
return Some(self.f.read(Err(e)));
return Some(Err(e));
}
Ok(Node::Parent(mut children)) => {
if let Some((digit, dptr)) = children.pop() {
self.cell_stack.push(digit);
self.curr = Some((digit, dptr));
self.stack.push(children);
self.curr_node = Some((digit, dptr));
self.node_stack.push(children);
} else {
self.recycle_node_buf(children);
}
}
Ok(Node::Leaf(dptr)) => {
self.curr = None;
self.curr_node = None;
if let Err(e) = self.seek_to(dptr) {
self.stop_yeilding();
return Some(self.f.read(Err(e)));
return Some(Err(e));
}
return Some(self.f.read(Ok((
let val_len = leb128::read::unsigned(&mut self.disktree_csr).unwrap() as usize;
let pos = self.disktree_csr.position() as usize;
let val_buf = &self.disktree_buf[pos..][..val_len];
return Some(Ok((
*self.cell_stack.cell().expect("corrupted cell-stack"),
self.rdr,
))));
val_buf,
)));
}
};
}
Expand Down
39 changes: 10 additions & 29 deletions src/disktree/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
//! An on-disk hextree.

pub use tree::DiskTree;
pub use value::ReadVal;

mod dptr;
mod iter;
mod node;
mod tree;
mod value;
mod writer;

#[cfg(test)]
Expand Down Expand Up @@ -51,15 +49,15 @@ mod tests {
monaco
.to_disktree(&mut file, |wtr, val| bincode::serialize_into(wtr, val))
.unwrap();
let mut monaco_disktree = DiskTree::from_reader(file).unwrap();
let monaco_disktree = DiskTree::open(path).unwrap();

assert_eq!(monaco.get(point_2).unzip().1, None);
assert_eq!(monaco.get(point_1).unzip().1, Some(&Region::Monaco));

for (ht_cell, &ht_val) in monaco.iter() {
let now = std::time::Instant::now();
let (dt_cell, dt_val_rdr) = monaco_disktree.seek_to_cell(ht_cell).unwrap().unwrap();
let dt_val = bincode::deserialize_from(dt_val_rdr).unwrap();
let (dt_cell, val_buf) = monaco_disktree.get(ht_cell).unwrap().unwrap();
let dt_val = bincode::deserialize_from(val_buf).unwrap();
let lookup_duration = now.elapsed();
println!("loookup of {dt_cell} took {lookup_duration:?}");
assert_eq!(ht_val, dt_val);
Expand Down Expand Up @@ -90,33 +88,16 @@ mod tests {
monaco
.to_disktree(&mut file, |wtr, val| bincode::serialize_into(wtr, val))
.unwrap();
let mut monaco_disktree = DiskTree::from_reader(file).unwrap();

// Error type for user-defined deserializer.
#[derive(Debug)]
enum RdrErr {
Bincode(bincode::Error),
Disktree(crate::error::Error),
}

// Our function for deserializing `Cell` values from the
// disktree.
fn deserialze_cell<R: std::io::Read>(
res: crate::error::Result<(Cell, &mut R)>,
) -> Result<(Cell, Cell), RdrErr> {
match res {
Ok((cell, rdr)) => match bincode::deserialize_from(rdr) {
Ok(val) => Ok((cell, val)),
Err(e) => Err(RdrErr::Bincode(e)),
},
Err(e) => Err(RdrErr::Disktree(e)),
}
}
let monaco_disktree = DiskTree::open(path).unwrap();

// Create the iterator with the user-defined deserialzer.
let disktree_iter = monaco_disktree.iter(deserialze_cell).unwrap();
let disktree_iter = monaco_disktree.iter().unwrap();
let start = std::time::Instant::now();
let disktree_collection: Vec<_> = disktree_iter.collect::<Result<Vec<_>, _>>().unwrap();
let mut disktree_collection = Vec::new();
for res in disktree_iter {
let (cell, val_buf) = res.unwrap();
disktree_collection.push((cell, bincode::deserialize_from(val_buf).unwrap()));
}
let elapsed = start.elapsed();
println!("{elapsed:?}");
let start = std::time::Instant::now();
Expand Down
Loading

0 comments on commit 5927299

Please sign in to comment.