Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ jobs:
OPENDAL_WEBSITE_NOT_LATEST: true

- name: Deploy to nightlies for tagged version
uses: burnett01/rsync-deployments@5.2
uses: burnett01/rsync-deployments@0dc935cdecc5f5e571865e60d2a6cdc673704823
if: ${{ startsWith(github.ref, 'refs/tags/') && !contains(github.ref, 'rc') }}
with:
switches: -avzr
Expand All @@ -707,7 +707,7 @@ jobs:
OPENDAL_WEBSITE_BASE_URL: /opendal/opendal-docs-stable/

- name: Deploy to nightlies for stable version
uses: burnett01/rsync-deployments@5.2
uses: burnett01/rsync-deployments@0dc935cdecc5f5e571865e60d2a6cdc673704823
if: ${{ startsWith(github.ref, 'refs/tags/') && !contains(github.ref, 'rc') }}
with:
switches: -avzr --delete
Expand Down
134 changes: 106 additions & 28 deletions integrations/object_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ use object_store::PutResult;
use object_store::{GetOptions, UploadPart};
use object_store::{GetRange, GetResultPayload};
use object_store::{GetResult, PutMode};
use opendal::options::CopyOptions;
use opendal::raw::percent_decode_path;
use opendal::Buffer;
use opendal::Writer;
use opendal::{Operator, OperatorInfo};
use std::collections::HashMap;
use tokio::sync::{Mutex, Notify};

/// OpendalStore implements ObjectStore trait by using opendal.
Expand Down Expand Up @@ -109,6 +111,41 @@ impl OpendalStore {
pub fn info(&self) -> &OperatorInfo {
self.info.as_ref()
}

/// Copy a file from one location to another
async fn copy_request(
&self,
from: &Path,
to: &Path,
if_not_exists: bool,
) -> object_store::Result<()> {
let mut copy_options = CopyOptions::default();
if if_not_exists {
copy_options.if_not_exists = true;
}

// Perform the copy operation
self.inner
.copy_options(
&percent_decode_path(from.as_ref()),
&percent_decode_path(to.as_ref()),
copy_options,
)
.into_send()
.await
.map_err(|err| {
if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
object_store::Error::AlreadyExists {
path: to.to_string(),
source: Box::new(err),
}
} else {
format_object_store_error(err, from.as_ref())
}
})?;

Ok(())
}
}

impl Debug for OpendalStore {
Expand Down Expand Up @@ -206,15 +243,59 @@ impl ObjectStore for OpendalStore {

async fn put_multipart_opts(
&self,
_location: &Path,
_opts: PutMultipartOptions,
location: &Path,
opts: PutMultipartOptions,
) -> object_store::Result<Box<dyn MultipartUpload>> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"put_multipart_opts is not implemented so far",
)),
})
const DEFAULT_CONCURRENT: usize = 8;

let mut options = opendal::options::WriteOptions {
concurrent: DEFAULT_CONCURRENT,
..Default::default()
};

// Collect user metadata separately to handle multiple entries
let mut user_metadata = HashMap::new();

// Handle attributes if provided
for (key, value) in opts.attributes.iter() {
match key {
object_store::Attribute::CacheControl => {
options.cache_control = Some(value.to_string());
}
object_store::Attribute::ContentDisposition => {
options.content_disposition = Some(value.to_string());
}
object_store::Attribute::ContentEncoding => {
options.content_encoding = Some(value.to_string());
}
object_store::Attribute::ContentLanguage => {
// no support
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create an issue for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracing in #6470

continue;
}
object_store::Attribute::ContentType => {
options.content_type = Some(value.to_string());
}
object_store::Attribute::Metadata(k) => {
user_metadata.insert(k.to_string(), value.to_string());
}
_ => {}
}
}

// Apply user metadata if any entries were collected
if !user_metadata.is_empty() {
options.user_metadata = Some(user_metadata);
}

let writer = self
.inner
.writer_options(&percent_decode_path(location.as_ref()), options)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let upload = OpendalMultipartUpload::new(writer, location.clone());

Ok(Box::new(upload))
}

async fn get_opts(
Expand Down Expand Up @@ -245,6 +326,17 @@ impl ObjectStore for OpendalStore {
.map_err(|err| format_object_store_error(err, location.as_ref()))?
};

// Convert user defined metadata from OpenDAL to object_store attributes
let mut attributes = object_store::Attributes::new();
if let Some(user_meta) = meta.user_metadata() {
for (key, value) in user_meta {
attributes.insert(
object_store::Attribute::Metadata(key.clone().into()),
value.clone().into(),
);
}
}

let meta = ObjectMeta {
location: location.clone(),
last_modified: meta.last_modified().unwrap_or_default(),
Expand All @@ -258,7 +350,7 @@ impl ObjectStore for OpendalStore {
payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
range: 0..0,
meta,
attributes: Default::default(),
attributes,
});
}

Expand Down Expand Up @@ -319,7 +411,7 @@ impl ObjectStore for OpendalStore {
payload: GetResultPayload::Stream(Box::pin(stream)),
range: read_range.start..read_range.end,
meta,
attributes: Default::default(),
attributes,
})
}

Expand Down Expand Up @@ -466,16 +558,11 @@ impl ObjectStore for OpendalStore {
}

async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner
.copy(
&percent_decode_path(from.as_ref()),
&percent_decode_path(to.as_ref()),
)
.into_send()
.await
.map_err(|err| format_object_store_error(err, from.as_ref()))?;
self.copy_request(from, to, false).await
}

Ok(())
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.copy_request(from, to, true).await
}

async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
Expand All @@ -490,15 +577,6 @@ impl ObjectStore for OpendalStore {

Ok(())
}

async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"copy_if_not_exists is not implemented so far",
)),
})
}
}

/// `MultipartUpload`'s impl based on `Writer` in opendal
Expand Down
Loading