Skip to content

Commit

Permalink
Rust: Prototype of generic HttpFgbReader (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Feb 23, 2024
1 parent dbd0473 commit b2e4506
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 23 deletions.
3 changes: 2 additions & 1 deletion src/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ keywords = ["geo", "r-tree", "spatial"]

[features]
default = ["http"]
http = ["http-range-client", "bytes"]
http = ["http-range-client", "bytes", "reqwest"]

[dependencies]
flatbuffers = "23.5.26"
Expand All @@ -24,6 +24,7 @@ bytes = { version = "1.5.0", optional = true }
log = "0.4.20"
fallible-streaming-iterator = "0.1.9"
tempfile = "3.8.1"
reqwest = { version = "0.11.22", optional = true }

[dev-dependencies]
geozero = { version = "0.12.0", default-features = true }
Expand Down
55 changes: 38 additions & 17 deletions src/rust/src/http_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@ use crate::{check_magic_bytes, HEADER_MAX_BUFFER_SIZE};
use crate::{Error, Result};
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use http_range_client::BufferedHttpRangeClient;
use http_range_client::{
AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient,
};
use std::ops::Range;

// The largest request we'll speculatively make.
// If a single huge feature requires more than this, we'll necessarily exceed this limit.
const DEFAULT_HTTP_FETCH_SIZE: usize = 1_048_576; // 1MB

/// FlatGeobuf dataset HTTP reader
pub struct HttpFgbReader {
client: BufferedHttpRangeClient,
pub struct HttpFgbReader<T: AsyncHttpRangeClient = reqwest::Client> {
client: AsyncBufferedHttpRangeClient<T>,
// feature reading requires header access, therefore
// header_buf is included in the FgbFeature struct.
fbs: FgbFeature,
}

pub struct AsyncFeatureIter {
client: BufferedHttpRangeClient,
pub struct AsyncFeatureIter<T: AsyncHttpRangeClient = reqwest::Client> {
client: AsyncBufferedHttpRangeClient<T>,
// feature reading requires header access, therefore
// header_buf is included in the FgbFeature struct.
fbs: FgbFeature,
Expand All @@ -32,11 +34,20 @@ pub struct AsyncFeatureIter {
count: usize,
}

impl HttpFgbReader {
pub async fn open(url: &str) -> Result<HttpFgbReader> {
impl HttpFgbReader<reqwest::Client> {
pub async fn open(url: &str) -> Result<HttpFgbReader<reqwest::Client>> {
trace!("starting: opening http reader, reading header");
let mut client = BufferedHttpRangeClient::new(url);
let client = BufferedHttpRangeClient::new(url);
Self::_open(client).await
}
}

impl<T: AsyncHttpRangeClient> HttpFgbReader<T> {
pub async fn new(client: AsyncBufferedHttpRangeClient<T>) -> Result<HttpFgbReader<T>> {
Self::_open(client).await
}

async fn _open(mut client: AsyncBufferedHttpRangeClient<T>) -> Result<HttpFgbReader<T>> {
// Because we use a buffered HTTP reader, anything extra we fetch here can
// be utilized to skip subsequent fetches.
// Immediately following the header is the optional spatial index, we deliberately fetch
Expand Down Expand Up @@ -95,7 +106,7 @@ impl HttpFgbReader {
8 + self.fbs.header_buf.len()
}
/// Select all features.
pub async fn select_all(self) -> Result<AsyncFeatureIter> {
pub async fn select_all(self) -> Result<AsyncFeatureIter<T>> {
let header = self.fbs.header();
let count = header.features_count();
// TODO: support reading with unknown feature count
Expand Down Expand Up @@ -123,7 +134,7 @@ impl HttpFgbReader {
min_y: f64,
max_x: f64,
max_y: f64,
) -> Result<AsyncFeatureIter> {
) -> Result<AsyncFeatureIter<T>> {
trace!("starting: select_bbox, traversing index");
// Read R-Tree index and build filter for features within bbox
let header = self.fbs.header();
Expand Down Expand Up @@ -167,7 +178,7 @@ impl HttpFgbReader {
}
}

impl AsyncFeatureIter {
impl<T: AsyncHttpRangeClient> AsyncFeatureIter<T> {
pub fn header(&self) -> Header {
self.fbs.header()
}
Expand Down Expand Up @@ -203,9 +214,9 @@ enum FeatureSelection {
}

impl FeatureSelection {
async fn next_feature_buffer(
async fn next_feature_buffer<T: AsyncHttpRangeClient>(
&mut self,
client: &mut BufferedHttpRangeClient,
client: &mut AsyncBufferedHttpRangeClient<T>,
) -> Result<Option<Bytes>> {
match self {
FeatureSelection::SelectAll(select_all) => select_all.next_buffer(client).await,
Expand All @@ -223,7 +234,10 @@ struct SelectAll {
}

impl SelectAll {
async fn next_buffer(&mut self, client: &mut BufferedHttpRangeClient) -> Result<Option<Bytes>> {
async fn next_buffer<T: AsyncHttpRangeClient>(
&mut self,
client: &mut AsyncBufferedHttpRangeClient<T>,
) -> Result<Option<Bytes>> {
client.min_req_size(DEFAULT_HTTP_FETCH_SIZE);

if self.features_left == 0 {
Expand All @@ -247,7 +261,10 @@ struct SelectBbox {
}

impl SelectBbox {
async fn next_buffer(&mut self, client: &mut BufferedHttpRangeClient) -> Result<Option<Bytes>> {
async fn next_buffer<T: AsyncHttpRangeClient>(
&mut self,
client: &mut AsyncBufferedHttpRangeClient<T>,
) -> Result<Option<Bytes>> {
let mut next_buffer = None;
while next_buffer.is_none() {
let Some(feature_batch) = self.feature_batches.last_mut() else {
Expand Down Expand Up @@ -345,7 +362,10 @@ impl FeatureBatch {
}
}

async fn next_buffer(&mut self, client: &mut BufferedHttpRangeClient) -> Result<Option<Bytes>> {
async fn next_buffer<T: AsyncHttpRangeClient>(
&mut self,
client: &mut AsyncBufferedHttpRangeClient<T>,
) -> Result<Option<Bytes>> {
client.set_min_req_size(self.min_request_size);
let Some(feature_range) = self.feature_ranges.next() else {
return Ok(None);
Expand All @@ -371,8 +391,9 @@ impl FeatureBatch {
mod geozero_api {
use crate::AsyncFeatureIter;
use geozero::{error::Result, FeatureAccess, FeatureProcessor};
use http_range_client::AsyncHttpRangeClient;

impl AsyncFeatureIter {
impl<T: AsyncHttpRangeClient> AsyncFeatureIter<T> {
/// Read and process all selected features
pub async fn process_features<W: FeatureProcessor>(&mut self, out: &mut W) -> Result<()> {
out.dataset_begin(self.fbs.header().name())?;
Expand Down
12 changes: 7 additions & 5 deletions src/rust/src/packed_r_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::Result;

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
#[cfg(feature = "http")]
use http_range_client::BufferedHttpRangeClient;
use http_range_client::{
AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient,
};
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
Expand Down Expand Up @@ -161,8 +163,8 @@ fn read_node_items<R: Read + Seek>(

/// Read partial item vec from http
#[cfg(feature = "http")]
async fn read_http_node_items(
client: &mut BufferedHttpRangeClient,
async fn read_http_node_items<T: AsyncHttpRangeClient>(
client: &mut AsyncBufferedHttpRangeClient<T>,
base: usize,
node_ids: &Range<usize>,
) -> Result<Vec<NodeItem>> {
Expand Down Expand Up @@ -552,8 +554,8 @@ impl PackedRTree {

#[cfg(feature = "http")]
#[allow(clippy::too_many_arguments)]
pub async fn http_stream_search(
client: &mut BufferedHttpRangeClient,
pub async fn http_stream_search<T: AsyncHttpRangeClient>(
client: &mut AsyncBufferedHttpRangeClient<T>,
index_begin: usize,
num_items: usize,
branching_factor: u16,
Expand Down

0 comments on commit b2e4506

Please sign in to comment.