From b6235b92e6ad893ac326a84e362d592dec2fcda8 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 12:37:16 +1300 Subject: [PATCH 01/10] feat: Exponential cache --- python/Cargo.lock | 7 +++ src/metadata/cache.rs | 142 ++++++++++++++++++++++++++++++++++++++++++ src/metadata/mod.rs | 2 + 3 files changed, 151 insertions(+) create mode 100644 src/metadata/cache.rs diff --git a/python/Cargo.lock b/python/Cargo.lock index 17fde9f..d27cafe 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -21,6 +21,7 @@ dependencies = [ name = "async-tiff" version = "0.1.0" dependencies = [ + "bytemuck", "byteorder", "bytes", "flate2", @@ -83,6 +84,12 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "bytemuck" +version = "1.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" + [[package]] name = "byteorder" version = "1.5.0" diff --git a/src/metadata/cache.rs b/src/metadata/cache.rs new file mode 100644 index 0000000..10a834e --- /dev/null +++ b/src/metadata/cache.rs @@ -0,0 +1,142 @@ +//! Caching strategies for metadata fetching. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::{Bytes, BytesMut}; +use futures::future::BoxFuture; +use tokio::sync::Mutex; + +use crate::error::AsyncTiffResult; +use crate::metadata::MetadataFetch; + +/// Logic for managing a cache of sequential buffers +struct SequentialCache { + /// Contiguous blocks from offset 0 + /// + /// # Invariant + /// - Buffers are contiguous from offset 0 + buffers: Vec, + + /// Total length cached (== sum of buffers lengths) + len: u64, +} + +impl SequentialCache { + /// Create a new, empty SequentialCache + fn new() -> Self { + Self { + buffers: vec![], + len: 0, + } + } + + /// Check if the given range is fully contained within the cached buffers + fn contains(&self, range: Range) -> bool { + range.end <= self.len + } + + /// Slice out the given range from the cached buffers + fn slice(&self, range: Range) -> Bytes { + let out_len = (range.end - range.start) as usize; + // guaranteed valid + let mut remaining = range; + let mut out_buffers: Vec = vec![]; + + for b in &self.buffers { + let b_len = b.len() as u64; + + // this block falls entirely before the desired range start + if remaining.start >= b_len { + remaining.start -= b_len; + remaining.end -= b_len; + continue; + } + + // we slice bytes out of *this* block + let start = remaining.start as usize; + let end = (remaining.end - remaining.start).min(b_len - remaining.start) as usize; + + let chunk = b.slice(start..end); + out_buffers.push(chunk); + + // consumed some portion; update and potentially break + remaining.start = 0; + if remaining.end <= b_len { + break; + } + remaining.end -= b_len; + } + + if out_buffers.len() == 1 { + out_buffers.into_iter().next().unwrap() + } else { + let mut out = BytesMut::with_capacity(out_len); + for b in out_buffers { + out.extend_from_slice(&b); + } + out.into() + } + } + + fn append_buffer(&mut self, buffer: Bytes) { + self.len += buffer.len() as u64; + self.buffers.push(buffer); + } +} + +/// A MetadataFetch implementation that caches fetched data in exponentially growing chunks, +/// sequentially from the beginning of the file. +pub struct ExponentialMetadataCache { + fetch: Arc, + cache: Arc>, +} + +impl ExponentialMetadataCache { + /// Create a new ExponentialMetadataCache wrapping the given MetadataFetch + pub fn new(fetch: F) -> AsyncTiffResult { + Ok(Self { + fetch: Arc::new(fetch), + cache: Arc::new(Mutex::new(SequentialCache::new())), + }) + } +} + +fn next_fetch_size(existing_len: u64) -> u64 { + let min = 64 * 1024; + if existing_len == 0 { + return min; + } + existing_len * 2 +} + +impl MetadataFetch for ExponentialMetadataCache { + fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { + let inner = self.fetch.clone(); + let cache = self.cache.clone(); + + Box::pin(async move { + let mut g = cache.lock().await; + + // First check if we already have the range cached + if g.contains(range.start..range.end) { + return Ok(g.slice(range)); + } + + // Compute the correct fetch range + let start_len = g.len; + let needed = range.end.saturating_sub(start_len); + let fetch_size = next_fetch_size(start_len).max(needed); + let fetch_range = start_len..start_len + fetch_size; + + // Perform the fetch while holding mutex + // (this is OK because the mutex is async) + let bytes = inner.fetch(fetch_range).await?; + + // Now append safely + g.append_buffer(bytes); + + Ok(g.slice(range)) + }) + } +} diff --git a/src/metadata/mod.rs b/src/metadata/mod.rs index 3592014..b566eff 100644 --- a/src/metadata/mod.rs +++ b/src/metadata/mod.rs @@ -58,8 +58,10 @@ //! fetches the first `N` bytes out of a file. //! +pub mod cache; mod fetch; mod reader; +pub use cache::ExponentialMetadataCache; pub use fetch::{MetadataFetch, PrefetchBuffer}; pub use reader::{ImageFileDirectoryReader, TiffMetadataReader}; From 12b0ebfd19d1ae015e313ddaec0696fe55a4af57 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 12:48:10 +1300 Subject: [PATCH 02/10] cleanup --- src/metadata/cache.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/metadata/cache.rs b/src/metadata/cache.rs index 10a834e..c823411 100644 --- a/src/metadata/cache.rs +++ b/src/metadata/cache.rs @@ -88,31 +88,30 @@ impl SequentialCache { /// A MetadataFetch implementation that caches fetched data in exponentially growing chunks, /// sequentially from the beginning of the file. pub struct ExponentialMetadataCache { - fetch: Arc, + inner: F, cache: Arc>, } impl ExponentialMetadataCache { /// Create a new ExponentialMetadataCache wrapping the given MetadataFetch - pub fn new(fetch: F) -> AsyncTiffResult { + pub fn new(inner: F) -> AsyncTiffResult { Ok(Self { - fetch: Arc::new(fetch), + inner, cache: Arc::new(Mutex::new(SequentialCache::new())), }) } } fn next_fetch_size(existing_len: u64) -> u64 { - let min = 64 * 1024; if existing_len == 0 { - return min; + 64 * 1024 + } else { + existing_len * 2 } - existing_len * 2 } impl MetadataFetch for ExponentialMetadataCache { fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - let inner = self.fetch.clone(); let cache = self.cache.clone(); Box::pin(async move { @@ -131,7 +130,7 @@ impl MetadataFetch for ExponentialMetadataCache< // Perform the fetch while holding mutex // (this is OK because the mutex is async) - let bytes = inner.fetch(fetch_range).await?; + let bytes = self.inner.fetch(fetch_range).await?; // Now append safely g.append_buffer(bytes); From f109f228f4ac38854c6abe326cdc56c4d5c96a2e Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 12:58:06 +1300 Subject: [PATCH 03/10] add tokio sync dependency --- Cargo.toml | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3c58d05..d05e1d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,10 +18,7 @@ num_enum = "0.7.3" object_store = { version = "0.12", optional = true } reqwest = { version = "0.12", default-features = false, optional = true } thiserror = "1" -tokio = { version = "1.43.0", optional = true, default-features = false, features = [ - "io-util", - "sync", -] } +tokio = { version = "1.43.0", default-features = false, features = ["sync"] } weezl = "0.1.0" [dev-dependencies] @@ -37,7 +34,7 @@ tokio-test = "0.4.4" [features] default = ["object_store", "reqwest"] -tokio = ["dep:tokio"] +tokio = ["tokio/io-util"] reqwest = ["dep:reqwest"] object_store = ["dep:object_store"] From 43524f9d1e301859b4bb3e5abe652756a49ee9e0 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 15:52:09 +1300 Subject: [PATCH 04/10] Replace prefetch buffer --- src/cog.rs | 16 ++++--------- src/metadata/cache.rs | 55 +++++++++++++++++++++++++++---------------- src/metadata/fetch.rs | 36 ---------------------------- src/metadata/mod.rs | 3 +-- 4 files changed, 41 insertions(+), 69 deletions(-) diff --git a/src/cog.rs b/src/cog.rs index 1021b23..4a68d90 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -27,7 +27,8 @@ mod test { use tiff::decoder::{DecodingResult, Limits}; use super::*; - use crate::metadata::{PrefetchBuffer, TiffMetadataReader}; + use crate::metadata::cache::ReadaheadMetadataCache; + use crate::metadata::TiffMetadataReader; use crate::reader::{AsyncFileReader, ObjectReader}; #[ignore = "local file"] @@ -37,16 +38,9 @@ mod test { let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap(); let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap()); let reader = Arc::new(ObjectReader::new(store, path)) as Arc; - let prefetch_reader = PrefetchBuffer::new(reader.clone(), 32 * 1024) - .await - .unwrap(); - let mut metadata_reader = TiffMetadataReader::try_open(&prefetch_reader) - .await - .unwrap(); - let ifds = metadata_reader - .read_all_ifds(&prefetch_reader) - .await - .unwrap(); + let cached_reader = ReadaheadMetadataCache::new(reader.clone()).unwrap(); + let mut metadata_reader = TiffMetadataReader::try_open(&cached_reader).await.unwrap(); + let ifds = metadata_reader.read_all_ifds(&cached_reader).await.unwrap(); let tiff = TIFF::new(ifds); let ifd = &tiff.ifds[1]; diff --git a/src/metadata/cache.rs b/src/metadata/cache.rs index c823411..b6ded4b 100644 --- a/src/metadata/cache.rs +++ b/src/metadata/cache.rs @@ -11,7 +11,7 @@ use crate::error::AsyncTiffResult; use crate::metadata::MetadataFetch; /// Logic for managing a cache of sequential buffers -struct SequentialCache { +struct SequentialBlockCache { /// Contiguous blocks from offset 0 /// /// # Invariant @@ -22,8 +22,8 @@ struct SequentialCache { len: u64, } -impl SequentialCache { - /// Create a new, empty SequentialCache +impl SequentialBlockCache { + /// Create a new, empty SequentialBlockCache fn new() -> Self { Self { buffers: vec![], @@ -55,7 +55,8 @@ impl SequentialCache { // we slice bytes out of *this* block let start = remaining.start as usize; - let end = (remaining.end - remaining.start).min(b_len - remaining.start) as usize; + let size = (remaining.end - remaining.start).min(b_len - remaining.start) as usize; + let end = start + size; let chunk = b.slice(start..end); out_buffers.push(chunk); @@ -87,35 +88,49 @@ impl SequentialCache { /// A MetadataFetch implementation that caches fetched data in exponentially growing chunks, /// sequentially from the beginning of the file. -pub struct ExponentialMetadataCache { +pub struct ReadaheadMetadataCache { inner: F, - cache: Arc>, + cache: Arc>, + initial: u64, + multiplier: f64, } -impl ExponentialMetadataCache { - /// Create a new ExponentialMetadataCache wrapping the given MetadataFetch +impl ReadaheadMetadataCache { + /// Create a new ReadaheadMetadataCache wrapping the given MetadataFetch pub fn new(inner: F) -> AsyncTiffResult { Ok(Self { inner, - cache: Arc::new(Mutex::new(SequentialCache::new())), + cache: Arc::new(Mutex::new(SequentialBlockCache::new())), + initial: 32 * 1024, + multiplier: 2.0, }) } -} -fn next_fetch_size(existing_len: u64) -> u64 { - if existing_len == 0 { - 64 * 1024 - } else { - existing_len * 2 + /// Set the initial fetch size in bytes, otherwise defaults to 32 KiB + pub fn with_initial_size(mut self, initial: u64) -> Self { + self.initial = initial; + self + } + + /// Set the multiplier for subsequent fetch sizes, otherwise defaults to 2.0 + pub fn with_multiplier(mut self, multiplier: f64) -> Self { + self.multiplier = multiplier; + self + } + + fn next_fetch_size(&self, existing_len: u64) -> u64 { + if existing_len == 0 { + self.initial + } else { + (existing_len as f64 * self.multiplier).round() as u64 + } } } -impl MetadataFetch for ExponentialMetadataCache { +impl MetadataFetch for ReadaheadMetadataCache { fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - let cache = self.cache.clone(); - Box::pin(async move { - let mut g = cache.lock().await; + let mut g = self.cache.lock().await; // First check if we already have the range cached if g.contains(range.start..range.end) { @@ -125,7 +140,7 @@ impl MetadataFetch for ExponentialMetadataCache< // Compute the correct fetch range let start_len = g.len; let needed = range.end.saturating_sub(start_len); - let fetch_size = next_fetch_size(start_len).max(needed); + let fetch_size = self.next_fetch_size(start_len).max(needed); let fetch_range = start_len..start_len + fetch_size; // Perform the fetch while holding mutex diff --git a/src/metadata/fetch.rs b/src/metadata/fetch.rs index 126b3ae..a6a5cd0 100644 --- a/src/metadata/fetch.rs +++ b/src/metadata/fetch.rs @@ -2,7 +2,6 @@ use std::ops::Range; use bytes::Bytes; use futures::future::BoxFuture; -use futures::FutureExt; use crate::error::AsyncTiffResult; use crate::reader::{AsyncFileReader, EndianAwareReader, Endianness}; @@ -25,41 +24,6 @@ impl MetadataFetch for T { } } -/// Buffering for the first `N` bytes of a file. -/// -/// This is designed so that the async requests made by the underlying tag reader get intercepted -/// here and served from the existing buffer when possible. -#[derive(Debug)] -pub struct PrefetchBuffer { - fetch: F, - buffer: Bytes, -} - -impl PrefetchBuffer { - /// Construct a new PrefetchBuffer, catching the first `prefetch` bytes of the file. - pub async fn new(fetch: F, prefetch: u64) -> AsyncTiffResult { - let buffer = fetch.fetch(0..prefetch).await?; - Ok(Self { fetch, buffer }) - } -} - -impl MetadataFetch for PrefetchBuffer { - fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { - if range.start < self.buffer.len() as _ { - if range.end < self.buffer.len() as _ { - let usize_range = range.start as usize..range.end as usize; - let result = self.buffer.slice(usize_range); - async { Ok(result) }.boxed() - } else { - // TODO: reuse partial internal buffer - self.fetch.fetch(range) - } - } else { - self.fetch.fetch(range) - } - } -} - pub(crate) struct MetadataCursor<'a, F: MetadataFetch> { fetch: &'a F, offset: u64, diff --git a/src/metadata/mod.rs b/src/metadata/mod.rs index b566eff..e977949 100644 --- a/src/metadata/mod.rs +++ b/src/metadata/mod.rs @@ -62,6 +62,5 @@ pub mod cache; mod fetch; mod reader; -pub use cache::ExponentialMetadataCache; -pub use fetch::{MetadataFetch, PrefetchBuffer}; +pub use fetch::MetadataFetch; pub use reader::{ImageFileDirectoryReader, TiffMetadataReader}; From e69626ee145d49a176912ac9037c8631e4274b85 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 15:55:16 +1300 Subject: [PATCH 05/10] make infallible --- src/cog.rs | 2 +- src/metadata/cache.rs | 6 +++--- tests/geo.rs | 7 +++---- tests/ome_tiff.rs | 16 +++++----------- 4 files changed, 12 insertions(+), 19 deletions(-) diff --git a/src/cog.rs b/src/cog.rs index 4a68d90..b58f2be 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -38,7 +38,7 @@ mod test { let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap(); let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap()); let reader = Arc::new(ObjectReader::new(store, path)) as Arc; - let cached_reader = ReadaheadMetadataCache::new(reader.clone()).unwrap(); + let cached_reader = ReadaheadMetadataCache::new(reader.clone()); let mut metadata_reader = TiffMetadataReader::try_open(&cached_reader).await.unwrap(); let ifds = metadata_reader.read_all_ifds(&cached_reader).await.unwrap(); let tiff = TIFF::new(ifds); diff --git a/src/metadata/cache.rs b/src/metadata/cache.rs index b6ded4b..0d5ddd8 100644 --- a/src/metadata/cache.rs +++ b/src/metadata/cache.rs @@ -97,13 +97,13 @@ pub struct ReadaheadMetadataCache { impl ReadaheadMetadataCache { /// Create a new ReadaheadMetadataCache wrapping the given MetadataFetch - pub fn new(inner: F) -> AsyncTiffResult { - Ok(Self { + pub fn new(inner: F) -> Self { + Self { inner, cache: Arc::new(Mutex::new(SequentialBlockCache::new())), initial: 32 * 1024, multiplier: 2.0, - }) + } } /// Set the initial fetch size in bytes, otherwise defaults to 32 KiB diff --git a/tests/geo.rs b/tests/geo.rs index 9647af4..1fd1450 100644 --- a/tests/geo.rs +++ b/tests/geo.rs @@ -1,7 +1,8 @@ use std::env; use std::sync::Arc; -use async_tiff::metadata::{PrefetchBuffer, TiffMetadataReader}; +use async_tiff::metadata::cache::ReadaheadMetadataCache; +use async_tiff::metadata::TiffMetadataReader; use async_tiff::reader::{AsyncFileReader, ObjectReader}; use object_store::local::LocalFileSystem; @@ -12,9 +13,7 @@ async fn test_parse_file_with_unknown_geokey() { .unwrap(); let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap()); let reader = Arc::new(ObjectReader::new(store, path)) as Arc; - let prefetch_reader = PrefetchBuffer::new(reader.clone(), 32 * 1024) - .await - .unwrap(); + let prefetch_reader = ReadaheadMetadataCache::new(reader.clone()); let mut metadata_reader = TiffMetadataReader::try_open(&prefetch_reader) .await .unwrap(); diff --git a/tests/ome_tiff.rs b/tests/ome_tiff.rs index 5e5920c..2864cbe 100644 --- a/tests/ome_tiff.rs +++ b/tests/ome_tiff.rs @@ -2,7 +2,8 @@ use std::sync::Arc; -use async_tiff::metadata::{PrefetchBuffer, TiffMetadataReader}; +use async_tiff::metadata::cache::ReadaheadMetadataCache; +use async_tiff::metadata::TiffMetadataReader; use async_tiff::reader::{AsyncFileReader, ObjectReader}; use async_tiff::tiff::tags::PhotometricInterpretation; use async_tiff::TIFF; @@ -13,16 +14,9 @@ async fn open_remote_tiff(url: &str) -> TIFF { let (store, path) = object_store::parse_url(&parsed_url).unwrap(); let reader = Arc::new(ObjectReader::new(Arc::new(store), path)) as Arc; - let prefetch_reader = PrefetchBuffer::new(reader.clone(), 32 * 1024) - .await - .unwrap(); - let mut metadata_reader = TiffMetadataReader::try_open(&prefetch_reader) - .await - .unwrap(); - let ifds = metadata_reader - .read_all_ifds(&prefetch_reader) - .await - .unwrap(); + let cached_reader = ReadaheadMetadataCache::new(reader.clone()); + let mut metadata_reader = TiffMetadataReader::try_open(&cached_reader).await.unwrap(); + let ifds = metadata_reader.read_all_ifds(&cached_reader).await.unwrap(); TIFF::new(ifds) } From 00a6e72676ec624f3bfe918a59df4feb3195c3a5 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 16:12:47 +1300 Subject: [PATCH 06/10] Add unit test for readahead cache --- src/metadata/cache.rs | 81 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/src/metadata/cache.rs b/src/metadata/cache.rs index 0d5ddd8..21dfb58 100644 --- a/src/metadata/cache.rs +++ b/src/metadata/cache.rs @@ -106,6 +106,11 @@ impl ReadaheadMetadataCache { } } + /// Access the inner MetadataFetch + pub fn inner(&self) -> &F { + &self.inner + } + /// Set the initial fetch size in bytes, otherwise defaults to 32 KiB pub fn with_initial_size(mut self, initial: u64) -> Self { self.initial = initial; @@ -154,3 +159,79 @@ impl MetadataFetch for ReadaheadMetadataCache }) } } + +#[cfg(test)] +mod test { + use futures::future::FutureExt; + + use super::*; + + struct TestFetch { + data: Bytes, + /// The number of fetches that actually reach the raw Fetch implementation + num_fetches: Arc>, + } + + impl TestFetch { + fn new(data: Bytes) -> Self { + Self { + data, + num_fetches: Arc::new(Mutex::new(0)), + } + } + } + + impl MetadataFetch for TestFetch { + fn fetch( + &self, + range: Range, + ) -> futures::future::BoxFuture<'_, crate::error::AsyncTiffResult> { + if range.start as usize >= self.data.len() { + return async { Ok(Bytes::new()) }.boxed(); + } + + let end = (range.end as usize).min(self.data.len()); + let slice = self.data.slice(range.start as _..end); + async move { + let mut g = self.num_fetches.lock().await; + *g += 1; + Ok(slice) + } + .boxed() + } + } + + #[tokio::test] + async fn test_readahead_cache() { + let data = Bytes::from_static(b"abcdefghijklmnopqrstuvwxyz"); + let fetch = TestFetch::new(data.clone()); + let cache = ReadaheadMetadataCache::new(fetch) + .with_initial_size(2) + .with_multiplier(3.0); + + // Make initial request + let result = cache.fetch(0..2).await.unwrap(); + assert_eq!(result.as_ref(), b"ab"); + assert_eq!(*cache.inner.num_fetches.lock().await, 1); + + // Making a request within the cached range should not trigger a new fetch + let result = cache.fetch(1..2).await.unwrap(); + assert_eq!(result.as_ref(), b"b"); + assert_eq!(*cache.inner.num_fetches.lock().await, 1); + + // Making a request that exceeds the cached range should trigger a new fetch + let result = cache.fetch(2..5).await.unwrap(); + assert_eq!(result.as_ref(), b"cde"); + assert_eq!(*cache.inner.num_fetches.lock().await, 2); + + // Multiplier should be accurate: initial was 2, next was 6 (2*3), so total cached is now 8 + let result = cache.fetch(5..8).await.unwrap(); + assert_eq!(result.as_ref(), b"fgh"); + assert_eq!(*cache.inner.num_fetches.lock().await, 2); + + // Should work even for fetch range larger than underlying buffer + let result = cache.fetch(8..20).await.unwrap(); + assert_eq!(result.as_ref(), b"ijklmnopqrst"); + assert_eq!(*cache.inner.num_fetches.lock().await, 3); + } +} From 73adace837a982f0b7c5ec61d2f8f3110b72384b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 16:17:05 +1300 Subject: [PATCH 07/10] Update python bindings --- python/Cargo.lock | 8 +------- python/README.md | 4 ++-- python/python/async_tiff/_tiff.pyi | 5 +++++ python/src/tiff.rs | 14 ++++++++------ python/tests/test_cog.py | 2 +- 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/python/Cargo.lock b/python/Cargo.lock index d27cafe..ba50592 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -21,7 +21,6 @@ dependencies = [ name = "async-tiff" version = "0.1.0" dependencies = [ - "bytemuck", "byteorder", "bytes", "flate2", @@ -31,6 +30,7 @@ dependencies = [ "object_store", "reqwest", "thiserror 1.0.69", + "tokio", "weezl", ] @@ -84,12 +84,6 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" -[[package]] -name = "bytemuck" -version = "1.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" - [[package]] name = "byteorder" version = "1.5.0" diff --git a/python/README.md b/python/README.md index eea777d..a9dc173 100644 --- a/python/README.md +++ b/python/README.md @@ -21,7 +21,7 @@ from async_tiff.store import S3Store store = S3Store("naip-visualization", region="us-west-2", request_payer=True) path = "ny/2022/60cm/rgb/40073/m_4007307_sw_18_060_20220803.tif" -tiff = await TIFF.open(path, store=store, prefetch=32768) +tiff = await TIFF.open(path, store=store) primary_ifd = tiff.ifds[0] primary_ifd.geo_key_directory.citation @@ -68,7 +68,7 @@ from async_tiff.store import S3Store store = S3Store("sentinel-cogs", region="us-west-2", skip_signature=True) path = "sentinel-s2-l2a-cogs/12/S/UF/2022/6/S2B_12SUF_20220609_0_L2A/B04.tif" -tiff = await TIFF.open(path, store=store, prefetch=32768) +tiff = await TIFF.open(path, store=store) primary_ifd = tiff.ifds[0] # Text readable citation primary_ifd.geo_key_directory.citation diff --git a/python/python/async_tiff/_tiff.pyi b/python/python/async_tiff/_tiff.pyi index dcc0f67..4c62204 100644 --- a/python/python/async_tiff/_tiff.pyi +++ b/python/python/async_tiff/_tiff.pyi @@ -17,6 +17,7 @@ class TIFF: *, store: ObjectStore | ObspecInput, prefetch: int = 32768, + multiplier: int | float = 2.0, ) -> TIFF: """Open a new TIFF. @@ -24,6 +25,10 @@ class TIFF: path: The path within the store to read from. store: The backend to use for data fetching. prefetch: The number of initial bytes to read up front. + multiplier: The multiplier to use for readahead size growth. Must be + greater than 1.0. For example, for a value of `2.0`, the first metadata + read will be of size `prefetch`, and then the next read will be of size + `prefetch * 2`. Returns: A TIFF instance. diff --git a/python/src/tiff.rs b/python/src/tiff.rs index 861c519..67ad82a 100644 --- a/python/src/tiff.rs +++ b/python/src/tiff.rs @@ -1,9 +1,10 @@ use std::sync::Arc; -use async_tiff::metadata::{PrefetchBuffer, TiffMetadataReader}; +use async_tiff::metadata::cache::ReadaheadMetadataCache; +use async_tiff::metadata::TiffMetadataReader; use async_tiff::reader::AsyncFileReader; use async_tiff::TIFF; -use pyo3::exceptions::{PyFileNotFoundError, PyIndexError, PyTypeError}; +use pyo3::exceptions::{PyIndexError, PyTypeError}; use pyo3::prelude::*; use pyo3::types::PyType; use pyo3_async_runtimes::tokio::future_into_py; @@ -21,20 +22,21 @@ pub(crate) struct PyTIFF { #[pymethods] impl PyTIFF { #[classmethod] - #[pyo3(signature = (path, *, store, prefetch=32768))] + #[pyo3(signature = (path, *, store, prefetch=32768, multiplier=2.0))] fn open<'py>( _cls: &'py Bound, py: Python<'py>, path: String, store: StoreInput, prefetch: u64, + multiplier: f64, ) -> PyResult> { let reader = store.into_async_file_reader(path); let cog_reader = future_into_py(py, async move { - let metadata_fetch = PrefetchBuffer::new(reader.clone(), prefetch) - .await - .map_err(|err| PyFileNotFoundError::new_err(err.to_string()))?; + let metadata_fetch = ReadaheadMetadataCache::new(reader.clone()) + .with_initial_size(prefetch) + .with_multiplier(multiplier); let mut metadata_reader = TiffMetadataReader::try_open(&metadata_fetch).await.unwrap(); let ifds = metadata_reader .read_all_ifds(&metadata_fetch) diff --git a/python/tests/test_cog.py b/python/tests/test_cog.py index 55029ed..75a04c9 100644 --- a/python/tests/test_cog.py +++ b/python/tests/test_cog.py @@ -11,7 +11,7 @@ async def test_cog_s3(): """ path = "sentinel-s2-l2a-cogs/12/S/UF/2022/6/S2B_12SUF_20220609_0_L2A/B04.tif" store = S3Store("sentinel-cogs", region="us-west-2", skip_signature=True) - tiff = await TIFF.open(path=path, store=store, prefetch=32768) + tiff = await TIFF.open(path=path, store=store) ifds = tiff.ifds assert len(ifds) == 5 From 6d7cb006811e58a74b08a7cc1d2980ffc9749f0c Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 16:43:56 +1300 Subject: [PATCH 08/10] minimal Python error handling --- python/DEVELOP.md | 4 ++-- python/pyproject.toml | 3 +-- python/src/error.rs | 43 +++++++++++++++++++++++++++++++++++++++++++ python/src/lib.rs | 1 + python/src/tiff.rs | 32 ++++++++++++++++++++------------ python/uv.lock | 11 ----------- 6 files changed, 67 insertions(+), 27 deletions(-) create mode 100644 python/src/error.rs diff --git a/python/DEVELOP.md b/python/DEVELOP.md index 30d0ddc..7add138 100644 --- a/python/DEVELOP.md +++ b/python/DEVELOP.md @@ -1,6 +1,6 @@ ``` uv sync --no-install-package async-tiff -uv run --no-project maturin develop +uv run --no-project maturin develop --uv uv run --no-project mkdocs serve -uv run --no-project pytest --verbose +uv run --no-project pytest tests --verbose ``` diff --git a/python/pyproject.toml b/python/pyproject.toml index b8b263f..bb3d0a3 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -32,7 +32,6 @@ dev-dependencies = [ "mkdocstrings>=0.27.0", "numpy>=1", "obstore>=0.5.1", - "pip>=24.2", "pytest-asyncio>=0.26.0", "pytest>=8.3.3", "ruff>=0.8.4", @@ -40,5 +39,5 @@ dev-dependencies = [ [tool.pytest.ini_options] addopts = "--color=yes" -asyncio_default_fixture_loop_scope="function" +asyncio_default_fixture_loop_scope = "function" asyncio_mode = "auto" diff --git a/python/src/error.rs b/python/src/error.rs new file mode 100644 index 0000000..e89ef42 --- /dev/null +++ b/python/src/error.rs @@ -0,0 +1,43 @@ +use async_tiff::error::AsyncTiffError; +use pyo3::create_exception; +use pyo3::exceptions::PyFileNotFoundError; +use pyo3::prelude::*; + +// Base exception +// Note that this is named `BaseError` instead of `ObstoreError` to not leak the name "obstore" to +// other Rust-Python libraries using pyo3-object_store. +create_exception!( + async_tiff, + AsyncTiffException, + pyo3::exceptions::PyException, + "A general error from the underlying Rust async-tiff library." +); + +#[allow(missing_docs)] +pub enum PyAsyncTiffError { + AsyncTiffError(async_tiff::error::AsyncTiffError), +} + +impl From for PyAsyncTiffError { + fn from(value: AsyncTiffError) -> Self { + Self::AsyncTiffError(value) + } +} + +impl From for PyErr { + fn from(error: PyAsyncTiffError) -> Self { + match error { + PyAsyncTiffError::AsyncTiffError(err) => match err { + AsyncTiffError::ObjectStore(err) => match err { + object_store::Error::NotFound { path: _, source: _ } => { + PyFileNotFoundError::new_err(err.to_string()) + } + _ => AsyncTiffException::new_err(err.to_string()), + }, + _ => AsyncTiffException::new_err(err.to_string()), + }, + } + } +} + +pub type PyAsyncTiffResult = std::result::Result; diff --git a/python/src/lib.rs b/python/src/lib.rs index f3cd219..7db5a0e 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -2,6 +2,7 @@ mod decoder; mod enums; +mod error; mod geo; mod ifd; mod reader; diff --git a/python/src/tiff.rs b/python/src/tiff.rs index 67ad82a..bbf9c25 100644 --- a/python/src/tiff.rs +++ b/python/src/tiff.rs @@ -9,6 +9,7 @@ use pyo3::prelude::*; use pyo3::types::PyType; use pyo3_async_runtimes::tokio::future_into_py; +use crate::error::PyAsyncTiffResult; use crate::reader::StoreInput; use crate::tile::PyTile; use crate::PyImageFileDirectory; @@ -19,6 +20,20 @@ pub(crate) struct PyTIFF { reader: Arc, } +async fn open( + reader: Arc, + prefetch: u64, + multiplier: f64, +) -> PyAsyncTiffResult { + let metadata_fetch = ReadaheadMetadataCache::new(reader.clone()) + .with_initial_size(prefetch) + .with_multiplier(multiplier); + let mut metadata_reader = TiffMetadataReader::try_open(&metadata_fetch).await?; + let ifds = metadata_reader.read_all_ifds(&metadata_fetch).await?; + let tiff = TIFF::new(ifds); + Ok(PyTIFF { tiff, reader }) +} + #[pymethods] impl PyTIFF { #[classmethod] @@ -33,18 +48,11 @@ impl PyTIFF { ) -> PyResult> { let reader = store.into_async_file_reader(path); - let cog_reader = future_into_py(py, async move { - let metadata_fetch = ReadaheadMetadataCache::new(reader.clone()) - .with_initial_size(prefetch) - .with_multiplier(multiplier); - let mut metadata_reader = TiffMetadataReader::try_open(&metadata_fetch).await.unwrap(); - let ifds = metadata_reader - .read_all_ifds(&metadata_fetch) - .await - .unwrap(); - let tiff = TIFF::new(ifds); - Ok(PyTIFF { tiff, reader }) - })?; + let cog_reader = + future_into_py( + py, + async move { Ok(open(reader, prefetch, multiplier).await?) }, + )?; Ok(cog_reader) } diff --git a/python/uv.lock b/python/uv.lock index 49bcc51..cd21206 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -40,7 +40,6 @@ dev = [ { name = "mkdocstrings-python" }, { name = "numpy" }, { name = "obstore" }, - { name = "pip" }, { name = "pytest" }, { name = "pytest-asyncio" }, { name = "ruff" }, @@ -62,7 +61,6 @@ dev = [ { name = "mkdocstrings-python", specifier = ">=1.13.0" }, { name = "numpy", specifier = ">=1" }, { name = "obstore", specifier = ">=0.5.1" }, - { name = "pip", specifier = ">=24.2" }, { name = "pytest", specifier = ">=8.3.3" }, { name = "pytest-asyncio", specifier = ">=0.26.0" }, { name = "ruff", specifier = ">=0.8.4" }, @@ -1024,15 +1022,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/52/3b/ce7a01026a7cf46e5452afa86f97a5e88ca97f562cafa76570178ab56d8d/pillow-10.4.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:0755ffd4a0c6f267cccbae2e9903d95477ca2f77c4fcf3a3a09570001856c8a5", size = 2554661, upload-time = "2024-07-01T09:48:20.293Z" }, ] -[[package]] -name = "pip" -version = "25.0.1" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/70/53/b309b4a497b09655cb7e07088966881a57d082f48ac3cb54ea729fd2c6cf/pip-25.0.1.tar.gz", hash = "sha256:88f96547ea48b940a3a385494e181e29fb8637898f88d88737c5049780f196ea", size = 1950850, upload-time = "2025-02-09T17:14:04.423Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/c9/bc/b7db44f5f39f9d0494071bddae6880eb645970366d0a200022a1a93d57f5/pip-25.0.1-py3-none-any.whl", hash = "sha256:c46efd13b6aa8279f33f2864459c8ce587ea6a1a59ee20de055868d8f7688f7f", size = 1841526, upload-time = "2025-02-09T17:14:01.463Z" }, -] - [[package]] name = "platformdirs" version = "4.3.6" From e897ba7a1b924686b5c2759cf99f77ca8d014382 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 16:49:03 +1300 Subject: [PATCH 09/10] Update docs --- src/metadata/mod.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/metadata/mod.rs b/src/metadata/mod.rs index e977949..0d72b82 100644 --- a/src/metadata/mod.rs +++ b/src/metadata/mod.rs @@ -11,7 +11,8 @@ //! //! use object_store::local::LocalFileSystem; //! -//! use async_tiff::metadata::{PrefetchBuffer, TiffMetadataReader}; +//! use async_tiff::metadata::TiffMetadataReader; +//! use async_tiff::metadata::cache::ReadaheadMetadataCache; //! use async_tiff::reader::ObjectReader; //! //! // Create new Arc @@ -23,24 +24,22 @@ //! "tests/image_tiff/images/tiled-jpeg-rgb-u8.tif".into(), //! ); //! -//! // Use PrefetchBuffer to ensure that a given number of bytes at the start of the -//! // file are prefetched. +//! // Use ReadaheadMetadataCache to ensure that a given number of bytes at the start of the +//! // file are prefetched, and to ensure that any additional fetches are made in larger chunks. //! // -//! // This or a similar caching layer should **always** be used and ensures that the -//! // underlying read calls that the TiffMetadataReader makes don't translate to actual -//! // network fetches. -//! let prefetch_reader = PrefetchBuffer::new(reader.clone(), 32 * 1024) -//! .await -//! .unwrap(); +//! // The `ReadaheadMetadataCache` or a similar caching layer should **always** be used to ensure +//! // that the underlying small read calls that the TiffMetadataReader makes don't translate to +//! // individual tiny network fetches. +//! let cached_reader = ReadaheadMetadataCache::new(reader.clone()); //! //! // Create a TiffMetadataReader wrapping some MetadataFetch -//! let mut metadata_reader = TiffMetadataReader::try_open(&prefetch_reader) +//! let mut metadata_reader = TiffMetadataReader::try_open(&cached_reader) //! .await //! .unwrap(); //! //! // Read all IFDs out of the source. //! let ifds = metadata_reader -//! .read_all_ifds(&prefetch_reader) +//! .read_all_ifds(&cached_reader) //! .await //! .unwrap(); //! # }) @@ -54,9 +53,9 @@ //! [`MetadataFetch`] implementation. //! //! Thus, it is **imperative to always supply some sort of caching, prefetching, or buffering** -//! middleware when reading metadata. [`PrefetchBuffer`] is an example of this, which -//! fetches the first `N` bytes out of a file. -//! +//! middleware when reading metadata. [`ReadaheadMetadataCache`] is an example of this, which +//! fetches the first `N` bytes out of a file, and then multiplies the size of any subsequent +//! fetches by a given `multiplier`. pub mod cache; mod fetch; From b8c3ffacf46d55390e6159e436d07053af6d7eff Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 23 Nov 2025 17:01:46 +1300 Subject: [PATCH 10/10] rename to cache --- src/metadata/cache.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/metadata/cache.rs b/src/metadata/cache.rs index 21dfb58..a9995da 100644 --- a/src/metadata/cache.rs +++ b/src/metadata/cache.rs @@ -135,15 +135,15 @@ impl ReadaheadMetadataCache { impl MetadataFetch for ReadaheadMetadataCache { fn fetch(&self, range: Range) -> BoxFuture<'_, AsyncTiffResult> { Box::pin(async move { - let mut g = self.cache.lock().await; + let mut cache = self.cache.lock().await; // First check if we already have the range cached - if g.contains(range.start..range.end) { - return Ok(g.slice(range)); + if cache.contains(range.start..range.end) { + return Ok(cache.slice(range)); } // Compute the correct fetch range - let start_len = g.len; + let start_len = cache.len; let needed = range.end.saturating_sub(start_len); let fetch_size = self.next_fetch_size(start_len).max(needed); let fetch_range = start_len..start_len + fetch_size; @@ -153,9 +153,9 @@ impl MetadataFetch for ReadaheadMetadataCache let bytes = self.inner.fetch(fetch_range).await?; // Now append safely - g.append_buffer(bytes); + cache.append_buffer(bytes); - Ok(g.slice(range)) + Ok(cache.slice(range)) }) } }