From 8da2f97bfd9a613c02acbd4b329d11937ca6257f Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Mon, 10 Jul 2023 20:04:50 +0200 Subject: [PATCH] object_store: Implement `ObjectStore` for `Arc` (#4502) * object_store: Add `Box` tests * object_store: Extract `as_ref_impl!()` macro * object_store: Implement `ObjectStore` for `Arc` --- object_store/src/lib.rs | 179 ++++++++++++++++++++----------------- object_store/src/memory.rs | 26 ++++++ 2 files changed, 125 insertions(+), 80 deletions(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 864cabc4a8c..97e6aae9713 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -270,6 +270,7 @@ use std::fmt::{Debug, Formatter}; #[cfg(not(target_arch = "wasm32"))] use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; +use std::sync::Arc; use tokio::io::AsyncWrite; #[cfg(any(feature = "azure", feature = "aws", feature = "gcp", feature = "http"))] @@ -526,105 +527,123 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { } } -#[async_trait] -impl ObjectStore for Box { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.as_ref().put(location, bytes).await - } +macro_rules! as_ref_impl { + ($type:ty) => { + #[async_trait] + impl ObjectStore for $type { + async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + self.as_ref().put(location, bytes).await + } - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - self.as_ref().put_multipart(location).await - } + async fn put_multipart( + &self, + location: &Path, + ) -> Result<(MultipartId, Box)> { + self.as_ref().put_multipart(location).await + } - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> Result<()> { - self.as_ref().abort_multipart(location, multipart_id).await - } + async fn abort_multipart( + &self, + location: &Path, + multipart_id: &MultipartId, + ) -> Result<()> { + self.as_ref().abort_multipart(location, multipart_id).await + } - async fn append( - &self, - location: &Path, - ) -> Result> { - self.as_ref().append(location).await - } + async fn append( + &self, + location: &Path, + ) -> Result> { + self.as_ref().append(location).await + } - async fn get(&self, location: &Path) -> Result { - self.as_ref().get(location).await - } + async fn get(&self, location: &Path) -> Result { + self.as_ref().get(location).await + } - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - self.as_ref().get_opts(location, options).await - } + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> Result { + self.as_ref().get_opts(location, options).await + } - async fn get_range(&self, location: &Path, range: Range) -> Result { - self.as_ref().get_range(location, range).await - } + async fn get_range( + &self, + location: &Path, + range: Range, + ) -> Result { + self.as_ref().get_range(location, range).await + } - async fn get_ranges( - &self, - location: &Path, - ranges: &[Range], - ) -> Result> { - self.as_ref().get_ranges(location, ranges).await - } + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> Result> { + self.as_ref().get_ranges(location, ranges).await + } - async fn head(&self, location: &Path) -> Result { - self.as_ref().head(location).await - } + async fn head(&self, location: &Path) -> Result { + self.as_ref().head(location).await + } - async fn delete(&self, location: &Path) -> Result<()> { - self.as_ref().delete(location).await - } + async fn delete(&self, location: &Path) -> Result<()> { + self.as_ref().delete(location).await + } - fn delete_stream<'a>( - &'a self, - locations: BoxStream<'a, Result>, - ) -> BoxStream<'a, Result> { - self.as_ref().delete_stream(locations) - } + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, Result>, + ) -> BoxStream<'a, Result> { + self.as_ref().delete_stream(locations) + } - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - self.as_ref().list(prefix).await - } + async fn list( + &self, + prefix: Option<&Path>, + ) -> Result>> { + self.as_ref().list(prefix).await + } - async fn list_with_offset( - &self, - prefix: Option<&Path>, - offset: &Path, - ) -> Result>> { - self.as_ref().list_with_offset(prefix, offset).await - } + async fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> Result>> { + self.as_ref().list_with_offset(prefix, offset).await + } - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - self.as_ref().list_with_delimiter(prefix).await - } + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> Result { + self.as_ref().list_with_delimiter(prefix).await + } - async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - self.as_ref().copy(from, to).await - } + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + self.as_ref().copy(from, to).await + } - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { - self.as_ref().rename(from, to).await - } + async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + self.as_ref().rename(from, to).await + } - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - self.as_ref().copy_if_not_exists(from, to).await - } + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.as_ref().copy_if_not_exists(from, to).await + } - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - self.as_ref().rename_if_not_exists(from, to).await - } + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.as_ref().rename_if_not_exists(from, to).await + } + } + }; } +as_ref_impl!(Arc); +as_ref_impl!(Box); + /// Result of a list call that includes objects, prefixes (directories) and a /// token for the next set of results. Individual result sets may be limited to /// 1,000 objects based on the underlying object storage's limitations. diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 98b3a15eecb..cfc2ac82303 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -415,6 +415,32 @@ mod tests { stream_get(&integration).await; } + #[tokio::test] + async fn box_test() { + let integration: Box = Box::new(InMemory::new()); + + put_get_delete_list(&integration).await; + get_opts(&integration).await; + list_uses_directories_correctly(&integration).await; + list_with_delimiter(&integration).await; + rename_and_copy(&integration).await; + copy_if_not_exists(&integration).await; + stream_get(&integration).await; + } + + #[tokio::test] + async fn arc_test() { + let integration: Arc = Arc::new(InMemory::new()); + + put_get_delete_list(&integration).await; + get_opts(&integration).await; + list_uses_directories_correctly(&integration).await; + list_with_delimiter(&integration).await; + rename_and_copy(&integration).await; + copy_if_not_exists(&integration).await; + stream_get(&integration).await; + } + #[tokio::test] async fn unknown_length() { let integration = InMemory::new();