Skip to content

Commit

Permalink
object_store: Implement ObjectStore for Arc (#4502)
Browse files Browse the repository at this point in the history
* object_store: Add `Box<dyn ObjectStore>` tests

* object_store: Extract `as_ref_impl!()` macro

* object_store: Implement `ObjectStore` for `Arc`
  • Loading branch information
Turbo87 committed Jul 10, 2023
1 parent 6bbf2f0 commit 8da2f97
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 80 deletions.
179 changes: 99 additions & 80 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ use std::fmt::{Debug, Formatter};
#[cfg(not(target_arch = "wasm32"))]
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
use tokio::io::AsyncWrite;

#[cfg(any(feature = "azure", feature = "aws", feature = "gcp", feature = "http"))]
Expand Down Expand Up @@ -526,105 +527,123 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
}
}

#[async_trait]
impl ObjectStore for Box<dyn ObjectStore> {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.as_ref().put(location, bytes).await
}
macro_rules! as_ref_impl {
($type:ty) => {
#[async_trait]
impl ObjectStore for $type {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.as_ref().put(location, bytes).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.as_ref().put_multipart(location).await
}
async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.as_ref().put_multipart(location).await
}

async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
self.as_ref().abort_multipart(location, multipart_id).await
}
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
self.as_ref().abort_multipart(location, multipart_id).await
}

async fn append(
&self,
location: &Path,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
self.as_ref().append(location).await
}
async fn append(
&self,
location: &Path,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
self.as_ref().append(location).await
}

async fn get(&self, location: &Path) -> Result<GetResult> {
self.as_ref().get(location).await
}
async fn get(&self, location: &Path) -> Result<GetResult> {
self.as_ref().get(location).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.as_ref().get_opts(location, options).await
}
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> Result<GetResult> {
self.as_ref().get_opts(location, options).await
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
self.as_ref().get_range(location, range).await
}
async fn get_range(
&self,
location: &Path,
range: Range<usize>,
) -> Result<Bytes> {
self.as_ref().get_range(location, range).await
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
self.as_ref().get_ranges(location, ranges).await
}
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
self.as_ref().get_ranges(location, ranges).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.as_ref().head(location).await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.as_ref().head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.as_ref().delete(location).await
}
async fn delete(&self, location: &Path) -> Result<()> {
self.as_ref().delete(location).await
}

fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<'a, Result<Path>> {
self.as_ref().delete_stream(locations)
}
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<'a, Result<Path>> {
self.as_ref().delete_stream(locations)
}

async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.as_ref().list(prefix).await
}
async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.as_ref().list(prefix).await
}

async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.as_ref().list_with_offset(prefix, offset).await
}
async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
self.as_ref().list_with_offset(prefix, offset).await
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.as_ref().list_with_delimiter(prefix).await
}
async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> Result<ListResult> {
self.as_ref().list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().copy(from, to).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().copy(from, to).await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename(from, to).await
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().copy_if_not_exists(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().copy_if_not_exists(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename_if_not_exists(from, to).await
}
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename_if_not_exists(from, to).await
}
}
};
}

as_ref_impl!(Arc<dyn ObjectStore>);
as_ref_impl!(Box<dyn ObjectStore>);

/// Result of a list call that includes objects, prefixes (directories) and a
/// token for the next set of results. Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
Expand Down
26 changes: 26 additions & 0 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,32 @@ mod tests {
stream_get(&integration).await;
}

#[tokio::test]
async fn box_test() {
let integration: Box<dyn ObjectStore> = Box::new(InMemory::new());

put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
}

#[tokio::test]
async fn arc_test() {
let integration: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
}

#[tokio::test]
async fn unknown_length() {
let integration = InMemory::new();
Expand Down

0 comments on commit 8da2f97

Please sign in to comment.