Skip to content

Commit

Permalink
[rust] Clearer code for HttpReader (#337)
Browse files Browse the repository at this point in the history
* [rust] use Range for clarity - no change in behavior

* rename for clarity

* clearer naming

* [rust] clearer index traversal code

* [rust] simplify with `chunks`

* [rust] add clarifying debug asserts

* clarify comment

* [rust] fix off by one in debug assert

An index is one less than a count.
  • Loading branch information
michaelkirk committed Dec 21, 2023
1 parent c1a2aa8 commit 390f8dc
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 104 deletions.
201 changes: 97 additions & 104 deletions src/rust/src/packed_r_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::cmp::{max, min};
use std::collections::VecDeque;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::mem::size_of;
use std::ops::Range;

#[derive(Clone, PartialEq, Debug)]
#[repr(C)]
Expand Down Expand Up @@ -163,22 +164,20 @@ fn read_node_items<R: Read + Seek>(
async fn read_http_node_items(
client: &mut BufferedHttpRangeClient,
base: usize,
node_index: usize,
num_nodes: usize,
node_ids: &Range<usize>,
) -> Result<Vec<NodeItem>> {
let begin = base + node_index * size_of::<NodeItem>();
let length = num_nodes * size_of::<NodeItem>();
let begin = base + node_ids.start * size_of::<NodeItem>();
let length = node_ids.len() * size_of::<NodeItem>();
let bytes = client
// we've already determined precisely which nodes to fetch - no need for extra.
.min_req_size(0)
.get_range(begin, length)
.await?;

let mut node_items = Vec::with_capacity(num_nodes);
for i in 0..num_nodes {
node_items.push(NodeItem::from_bytes(
&bytes[i * size_of::<NodeItem>()..(i + 1) * size_of::<NodeItem>()],
)?);
let mut node_items = Vec::with_capacity(node_ids.len());
debug_assert_eq!(bytes.len(), length);
for node_item_bytes in bytes.chunks(size_of::<NodeItem>()) {
node_items.push(NodeItem::from_bytes(node_item_bytes)?);
}
Ok(node_items)
}
Expand Down Expand Up @@ -280,7 +279,7 @@ pub struct PackedRTree {
node_items: Vec<NodeItem>,
num_leaf_nodes: usize,
branching_factor: u16,
level_bounds: Vec<(usize, usize)>,
level_bounds: Vec<Range<usize>>,
}

impl PackedRTree {
Expand All @@ -296,12 +295,12 @@ impl PackedRTree {
.level_bounds
.first()
.expect("RTree has at least one level when node_size >= 2 and num_items > 0")
.1;
.end;
self.node_items = vec![NodeItem::create(0); num_nodes]; // Quite slow!
Ok(())
}

fn generate_level_bounds(num_items: usize, node_size: u16) -> Vec<(usize, usize)> {
fn generate_level_bounds(num_items: usize, node_size: u16) -> Vec<Range<usize>> {
assert!(node_size >= 2, "Node size must be at least 2");
assert!(num_items > 0, "Cannot create empty tree");
assert!(
Expand Down Expand Up @@ -331,23 +330,22 @@ impl PackedRTree {
}
let mut level_bounds = Vec::with_capacity(level_num_nodes.len());
for i in 0..level_num_nodes.len() {
level_bounds.push((level_offsets[i], level_offsets[i] + level_num_nodes[i]));
level_bounds.push(level_offsets[i]..level_offsets[i] + level_num_nodes[i]);
}
level_bounds
}

fn generate_nodes(&mut self) {
for level in 0..self.level_bounds.len() - 1 {
let start_of_children_level = self.level_bounds[level].0;
let end_of_children_level = self.level_bounds[level].1;
let start_of_parent_level = self.level_bounds[level + 1].0;
let children_level = &self.level_bounds[level];
let parent_level = &self.level_bounds[level + 1];

let mut parent_idx = start_of_parent_level;
let mut child_idx = start_of_children_level;
while child_idx < end_of_children_level {
let mut parent_idx = parent_level.start;
let mut child_idx = children_level.start;
while child_idx < children_level.end {
let mut parent_node = NodeItem::create(child_idx as u64);
for _j in 0..self.branching_factor {
if child_idx >= end_of_children_level {
if child_idx >= children_level.end {
break;
}
parent_node.expand(&self.node_items[child_idx]);
Expand Down Expand Up @@ -415,7 +413,7 @@ impl PackedRTree {
let num_nodes = level_bounds
.first()
.expect("RTree has at least one level when node_size >= 2 and num_items > 0")
.1;
.end;
let mut tree = PackedRTree {
extent: NodeItem::create(0),
node_items: Vec::with_capacity(num_nodes),
Expand Down Expand Up @@ -457,7 +455,7 @@ impl PackedRTree {
.level_bounds
.first()
.expect("RTree has at least one level when node_size >= 2 and num_items > 0")
.0;
.start;
let bounds = NodeItem::bounds(min_x, min_y, max_x, max_y);
let mut results = Vec::new();
let mut queue = VecDeque::new();
Expand All @@ -469,7 +467,7 @@ impl PackedRTree {
// find the end index of the node
let end = min(
node_index + self.branching_factor as usize,
self.level_bounds[level].1,
self.level_bounds[level].end,
);
// search through child nodes
for pos in node_index..end {
Expand Down Expand Up @@ -501,7 +499,10 @@ impl PackedRTree {
) -> Result<Vec<SearchResultItem>> {
let bounds = NodeItem::bounds(min_x, min_y, max_x, max_y);
let level_bounds = PackedRTree::generate_level_bounds(num_items, node_size);
let (leaf_nodes_offset, num_nodes) = level_bounds
let Range {
start: leaf_nodes_offset,
end: num_nodes,
} = level_bounds
.first()
.expect("RTree has at least one level when node_size >= 2 and num_items > 0");

Expand All @@ -519,7 +520,7 @@ impl PackedRTree {
trace!("popped next node_index: {node_index}, level: {level}");
let is_leaf_node = node_index >= num_nodes - num_items;
// find the end index of the node
let end = min(node_index + node_size as usize, level_bounds[level].1);
let end = min(node_index + node_size as usize, level_bounds[level].end);
let length = end - node_index;
let node_items = read_node_items(data, index_base, node_index, length)?;
// search through child nodes
Expand Down Expand Up @@ -555,7 +556,7 @@ impl PackedRTree {
client: &mut BufferedHttpRangeClient,
index_begin: usize,
num_items: usize,
node_size: u16,
branching_factor: u16,
min_x: f64,
min_y: f64,
max_x: f64,
Expand All @@ -566,18 +567,14 @@ impl PackedRTree {
if num_items == 0 {
return Ok(vec![]);
}
let level_bounds = PackedRTree::generate_level_bounds(num_items, node_size);
let leaf_nodes_offset = level_bounds
.first()
.expect("RTree has at least one level when node_size >= 2 and num_items > 0")
.0;
let feature_begin = index_begin + PackedRTree::index_size(num_items, node_size);
debug!("http_stream_search - index_begin: {index_begin}, feature_begin: {feature_begin} num_items: {num_items}, node_size: {node_size}, level_bounds: {level_bounds:?}, GPS bounds:[({min_x}, {min_y}), ({max_x},{max_y})]");
let level_bounds = PackedRTree::generate_level_bounds(num_items, branching_factor);
let feature_begin = index_begin + PackedRTree::index_size(num_items, branching_factor);
debug!("http_stream_search - index_begin: {index_begin}, feature_begin: {feature_begin} num_items: {num_items}, branching_factor: {branching_factor}, level_bounds: {level_bounds:?}, GPS bounds:[({min_x}, {min_y}), ({max_x},{max_y})]");

#[derive(Debug, PartialEq, Eq)]
struct NodeRange {
level: usize,
nodes: std::ops::Range<usize>,
nodes: Range<usize>,
}

let mut queue = VecDeque::new();
Expand All @@ -587,95 +584,91 @@ impl PackedRTree {
});
let mut results = Vec::new();

while let Some(next) = queue.pop_front() {
debug!(
"popped node: {next:?}, remaining queue len: {}",
queue.len()
);
let start_node = next.nodes.start;
let is_leaf_node = next.level == 0;
if is_leaf_node {
assert!(start_node >= leaf_nodes_offset);
} else {
assert!(start_node < leaf_nodes_offset);
assert!(next.nodes.end < leaf_nodes_offset);
}
// find the end index of the nodes
let mut end_node = min(
next.nodes.end + node_size as usize,
level_bounds[next.level].1,
);
if is_leaf_node && end_node < level_bounds[next.level].1 {
// We can infer the length of *this* feature by getting the start of the *next*
// feature, so we get an extra node.
// This approach doesn't work for the final node in the index,
// but in that case we know that the feature runs to the end of the FGB file and
// can make an open ended range request to get "the rest of the data".
end_node += 1;
}
let num_nodes = end_node - start_node;
let node_items =
read_http_node_items(client, index_begin, start_node, num_nodes).await?;

// search through child nodes
for node_id in start_node..end_node {
let node_pos = node_id - start_node;
let node_item = &node_items[node_pos];
while let Some(node_range) = queue.pop_front() {
debug!("next: {node_range:?}. {} items left in queue", queue.len());
let node_items = read_http_node_items(client, index_begin, &node_range.nodes).await?;
for (node_pos, node_item) in node_items.iter().enumerate() {
if !bounds.intersects(node_item) {
continue;
}
if is_leaf_node {

if node_range.level == 0 {
// leaf node
let start = feature_begin + node_item.offset as usize;
if let Some(next_node_item) = &node_items.get(node_pos + 1) {
let end = feature_begin + next_node_item.offset as usize;
results.push(HttpSearchResultItem {
range: HttpRange::Range(start..end),
});
} else {
debug_assert_eq!(node_pos, num_items);
debug_assert_eq!(node_pos, num_items - 1);
results.push(HttpSearchResultItem {
range: HttpRange::RangeFrom(start..),
});
}
continue;
}

let combine_request_node_threshold =
combine_request_threshold / size_of::<NodeItem>();

// Add node to search recursion
match (queue.back_mut(), node_item.offset as usize) {
// There is an existing node for this level, and it's close to this node.
// Merge the ranges to avoid an extra request
(Some(tail), offset)
if tail.level == next.level - 1
&& offset < tail.nodes.end + combine_request_node_threshold =>
{
debug_assert!(tail.nodes.end < offset);
tail.nodes.end = offset;
} else {
let children_level = node_range.level - 1;
let mut children_nodes = node_item.offset as usize
..(node_item.offset + branching_factor as u64) as usize;
if children_level == 0 {
// These children are leaf nodes.
//
// We can right-size our feature requests if we know the size of each feature.
//
// To infer the length of *this* feature, we need the start of the *next*
// feature, so we get an extra node here.
children_nodes.end += 1;
}
// always stay within level's bounds
children_nodes.end = min(children_nodes.end, level_bounds[children_level].end);

let children_range = NodeRange {
nodes: children_nodes,
level: children_level,
};

let Some(tail) = queue.back_mut() else {
debug!("Adding new request onto empty queue: {children_range:?}");
queue.push_back(children_range);
continue;
};

if tail.level != children_level {
debug!("Adding new request for new level: {children_range:?} (existing queue tail: {tail:?})");
queue.push_back(children_range);
continue;
}

(tail, offset) => {
let node_range = NodeRange {
nodes: offset..(offset + 1),
level: next.level - 1,
};

if tail
.as_ref()
.map(|head| head.level == next.level - 1)
.unwrap_or(false)
{
debug!("requesting new NodeRange for offset: {offset} rather than merging with distant NodeRange: {tail:?}");
let wasted_bytes = {
if children_range.nodes.start >= tail.nodes.end {
(children_range.nodes.start - tail.nodes.end) * size_of::<NodeItem>()
} else {
debug!(
"pushing new level for NodeRange: {node_range:?} onto Queue with tail: {:?}",
queue.back()
// To compute feature size, we fetch an extra leaf node, but computing
// wasted_bytes for adjacent ranges will overflow in that case, so
// we skip that computation.
//
// But let's make sure we're in the state we think we are:
debug_assert_eq!(
children_range.nodes.start + 1,
tail.nodes.end,
"we only ever fetch one extra node"
);
debug_assert_eq!(
children_level, 0,
"extra node fetching only happens with leaf nodes"
);
0
}

queue.push_back(node_range);
};
if wasted_bytes > combine_request_threshold {
debug!("Adding new request for: {children_range:?} rather than merging with distant NodeRange: {tail:?} (would waste {wasted_bytes} bytes)");
queue.push_back(children_range);
continue;
}

// Merge the ranges to avoid an extra request
debug!("Extending existing request {tail:?} with nearby children: {:?} (wastes {wasted_bytes} bytes)", &children_range.nodes);
tail.nodes.end = children_range.nodes.end;
}
}
}
Expand Down Expand Up @@ -783,7 +776,7 @@ mod inspect {
processor.dataset_begin(Some("PackedRTree"))?;
let mut fid = 0;
for (levelno, level) in self.level_bounds.iter().rev().enumerate() {
for pos in level.0..level.1 {
for pos in level.clone() {
let node = &self.node_items[pos];
processor.feature_begin(fid)?;
processor.properties_begin()?;
Expand Down
14 changes: 14 additions & 0 deletions src/rust/tests/http_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ mod http {
Ok(())
}

#[tokio::test]
async fn http_bbox_read_last_feature() -> Result<()> {
    // Query a world-spanning bbox so every feature — including the final one —
    // must be returned. The last feature is the interesting case here: its end
    // offset cannot be inferred from a following node, so the reader issues an
    // open-ended range request for it (presumably the path this test guards —
    // added alongside an off-by-one fix in that code; verify against the reader).
    let url = "https://github.com/flatgeobuf/flatgeobuf/raw/master/test/data/countries.fgb";
    let fgb = HttpFgbReader::open(url).await.unwrap();

    // Sanity-check the fixture before counting.
    assert_eq!(fgb.header().features_count(), 179);

    let mut feature_iter = fgb.select_bbox(-180.0, -90.0, 180.0, 90.0).await?;
    let mut seen = 0;
    while feature_iter.next().await?.is_some() {
        seen += 1;
    }

    // Every feature in the file, last one included, must come back.
    assert_eq!(seen, 179);
    Ok(())
}

#[tokio::test]
async fn http_read_unknown_feature_count() -> Result<()> {
let url = "https://github.com/flatgeobuf/flatgeobuf/raw/master/test/data/unknown_feature_count.fgb";
Expand Down

0 comments on commit 390f8dc

Please sign in to comment.