diff --git a/Cargo.toml b/Cargo.toml index dfdf165..453bdc4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,12 +7,14 @@ edition = "2021" byteorder = "1" bytes = "1.7.0" flate2 = "1.0.20" +futures = "0.3.31" jpeg = { package = "jpeg-decoder", version = "0.3.0", default-features = false } ndarray = "*" num_enum = "*" object_store = "0.11" thiserror = "1" tiff = "0.9" +tokio = { version = "1.43.0", optional = true } weezl = "0.1.0" [dev-dependencies] diff --git a/src/async_reader.rs b/src/async_reader.rs new file mode 100644 index 0000000..eb32598 --- /dev/null +++ b/src/async_reader.rs @@ -0,0 +1,113 @@ +use bytes::Bytes; +use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +use object_store::ObjectStore; +use std::io::SeekFrom; +use std::ops::Range; +use std::sync::Arc; + +use crate::error::{AiocogeoError, Result}; + +/// The asynchronous interface used to read COG files +/// +/// This was derived from the Parquet `AsyncFileReader`: +/// https://docs.rs/parquet/latest/parquet/arrow/async_reader/trait.AsyncFileReader.html +/// +/// Notes: +/// +/// 1. There is a default implementation for types that implement [`AsyncRead`] +/// and [`AsyncSeek`], for example [`tokio::fs::File`]. +/// +/// 2. [`ObjectReader`], available when the `object_store` crate feature +/// is enabled, implements this interface for [`ObjectStore`]. +/// +/// [`ObjectStore`]: object_store::ObjectStore +/// +/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html +pub trait AsyncFileReader: Send { + /// Retrieve the bytes in `range` + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result>; + + /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + async move { + let mut result = Vec::with_capacity(ranges.len()); + + for range in ranges.into_iter() { + let data = self.get_bytes(range).await?; + result.push(data); + } + + Ok(result) + } + .boxed() + } +} + +/// This allows Box to be used as an AsyncFileReader, +impl AsyncFileReader for Box { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + self.as_mut().get_bytes(range) + } + + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + self.as_mut().get_byte_ranges(ranges) + } +} + +#[cfg(feature = "tokio")] +impl AsyncFileReader for T { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + async move { + self.seek(SeekFrom::Start(range.start as u64)).await?; + + let to_read = range.end - range.start; + let mut buffer = Vec::with_capacity(to_read); + let read = self.take(to_read as u64).read_to_end(&mut buffer).await?; + if read != to_read { + return Err(AiocogeoError::EndOfFile(to_read, read)); + } + + Ok(buffer.into()) + } + .boxed() + } +} + +#[derive(Clone, Debug)] +pub struct ObjectReader { + store: Arc, + path: object_store::path::Path, +} + +impl ObjectReader { + /// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path + /// + /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`] + pub fn new(store: Arc, path: object_store::path::Path) -> Self { + Self { store, path } + } +} + +impl AsyncFileReader for ObjectReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + self.store + .get_range(&self.path, range) + .map_err(|e| e.into()) + .boxed() + } + + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> + where + Self: Send, + { + async move { + self.store + .get_ranges(&self.path, &ranges) + .await + .map_err(|e| e.into()) + } + .boxed() + } +} diff --git a/src/cog.rs b/src/cog.rs index 3b3e6b8..febfa78 100644 --- a/src/cog.rs +++ b/src/cog.rs @@ -1,22 +1,19 @@ -use std::sync::Arc; - use bytes::Bytes; -use object_store::path::Path; -use object_store::ObjectStore; use crate::cursor::{Endianness, ObjectStoreCursor}; use crate::error::Result; use crate::ifd::ImageFileDirectories; +use crate::AsyncFileReader; pub struct COGReader { - store: Arc, - path: Path, + #[allow(dead_code)] + reader: Box, ifds: ImageFileDirectories, } impl COGReader { - pub async fn try_open(store: Arc, path: Path) -> Result { - let mut cursor = ObjectStoreCursor::new(store, path); + pub async fn try_open(reader: Box) -> Result { + let mut cursor = ObjectStoreCursor::new(reader); let magic_bytes = cursor.read(2).await; // Should be b"II" for little endian or b"MM" for big endian if magic_bytes == Bytes::from_static(b"II") { @@ -38,8 +35,8 @@ impl COGReader { .await .unwrap(); - let (store, path) = cursor.into_inner(); - Ok(Self { store, path, ifds }) + let reader = cursor.into_inner(); + Ok(Self { reader, ifds }) } /// Return the EPSG code representing the crs of the image @@ -60,6 +57,9 @@ impl COGReader { #[cfg(test)] mod test { use std::io::BufReader; + use std::sync::Arc; + + use crate::ObjectReader; use super::*; use object_store::local::LocalFileSystem; @@ -68,15 +68,13 @@ mod test { #[tokio::test] async fn tmp() { let folder = "/Users/kyle/github/developmentseed/aiocogeo-rs/"; - let path = Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap(); + let path = object_store::path::Path::parse("m_4007307_sw_18_060_20220803.tif").unwrap(); let store = Arc::new(LocalFileSystem::new_with_prefix(folder).unwrap()); - let reader = COGReader::try_open(store.clone(), path.clone()) - .await - .unwrap(); - let cursor = ObjectStoreCursor::new(store.clone(), path.clone()); - let ifd = &reader.ifds.as_ref()[4]; + let reader = ObjectReader::new(store, path); + let cog_reader = COGReader::try_open(Box::new(reader.clone())).await.unwrap(); + let ifd = &cog_reader.ifds.as_ref()[4]; dbg!(ifd.compression); - let tile = ifd.get_tile(0, 0, &cursor).await.unwrap(); + let tile = ifd.get_tile(0, 0, Box::new(reader)).await.unwrap(); std::fs::write("img.buf", tile).unwrap(); // dbg!(tile.len()); } diff --git a/src/cursor.rs b/src/cursor.rs index 4b55e1d..854e65a 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -1,11 +1,11 @@ use std::io::Cursor; use std::ops::Range; -use std::sync::Arc; use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; use bytes::Bytes; -use object_store::path::Path; -use object_store::ObjectStore; + +use crate::error::AiocogeoError; +use crate::AsyncFileReader; #[derive(Debug, Clone, Copy, Default)] pub enum Endianness { @@ -17,8 +17,7 @@ pub enum Endianness { /// A wrapper around an [ObjectStore] that provides a seek-oriented interface // TODO: in the future add buffering to this pub(crate) struct ObjectStoreCursor { - store: Arc, - path: Path, + reader: Box, offset: usize, endianness: Endianness, } @@ -37,10 +36,9 @@ macro_rules! impl_read_byteorder { } impl ObjectStoreCursor { - pub(crate) fn new(store: Arc, path: Path) -> Self { + pub(crate) fn new(reader: Box) -> Self { Self { - store, - path, + reader, offset: 0, endianness: Default::default(), } @@ -50,14 +48,14 @@ impl ObjectStoreCursor { self.endianness = endianness; } - pub(crate) fn into_inner(self) -> (Arc, Path) { - (self.store, self.path) + pub(crate) fn into_inner(self) -> Box { + self.reader } pub(crate) async fn read(&mut self, length: usize) -> Bytes { let range = self.offset..self.offset + length; self.offset += length; - self.store.get_range(&self.path, range).await.unwrap() + self.reader.get_bytes(range).await.unwrap() } /// Read a u8 from the cursor @@ -95,15 +93,13 @@ impl ObjectStoreCursor { } } - pub(crate) fn store(&self) -> &Arc { - &self.store + #[allow(dead_code)] + pub(crate) fn reader(&self) -> &dyn AsyncFileReader { + &self.reader } - pub(crate) async fn get_range( - &self, - range: Range, - ) -> Result { - Ok(self.store.get_range(&self.path, range).await?) + pub(crate) async fn get_range(&mut self, range: Range) -> Result { + self.reader.get_bytes(range).await } /// Advance cursor position by a set amount diff --git a/src/error.rs b/src/error.rs index bd45e60..9215a01 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,6 +5,9 @@ use thiserror::Error; #[derive(Error, Debug)] #[non_exhaustive] pub enum AiocogeoError { + #[error("End of File: expected to read {0} bytes, got {1}")] + EndOfFile(usize, usize), + /// General error. #[error("General error: {0}")] General(String), diff --git a/src/ifd.rs b/src/ifd.rs index 98a37d6..1db508e 100644 --- a/src/ifd.rs +++ b/src/ifd.rs @@ -16,13 +16,14 @@ use crate::cursor::ObjectStoreCursor; use crate::decoder::decode_tile; use crate::error::Result; use crate::geo_key_directory::{GeoKeyDirectory, GeoKeyTag}; +use crate::AsyncFileReader; const DOCUMENT_NAME: u16 = 269; /// A collection of all the IFD // TODO: maybe separate out the primary/first image IFD out of the vec, as that one should have // geospatial metadata? -pub(crate) struct ImageFileDirectories { +pub struct ImageFileDirectories { /// There's always at least one IFD in a TIFF. We store this separately ifds: Vec, // Is it guaranteed that if masks exist that there will be one per image IFD? Or could there be @@ -59,7 +60,7 @@ impl ImageFileDirectories { // The ordering of these tags matches the sorted order in TIFF spec Appendix A #[allow(dead_code)] #[derive(Debug, Clone)] -pub(crate) struct ImageFileDirectory { +pub struct ImageFileDirectory { pub(crate) new_subfile_type: Option, /// The number of columns in the image, i.e., the number of pixels per row. @@ -533,7 +534,14 @@ impl ImageFileDirectory { } } - pub async fn get_tile(&self, x: usize, y: usize, cursor: &ObjectStoreCursor) -> Result { + pub async fn get_tile( + &self, + x: usize, + y: usize, + reader: Box, + ) -> Result { + let mut cursor = ObjectStoreCursor::new(reader); + let idx = (y * self.tile_count().0) + x; let offset = self.tile_offsets[idx] as usize; // TODO: aiocogeo has a -1 here, but I think that was in error diff --git a/src/lib.rs b/src/lib.rs index 975e373..57bf8b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ mod affine; +mod async_reader; mod cog; mod cursor; mod decoder; @@ -9,4 +10,6 @@ mod ifd; mod partial_reads; mod tag; +pub use async_reader::{AsyncFileReader, ObjectReader}; pub use cog::COGReader; +pub use ifd::{ImageFileDirectories, ImageFileDirectory};