Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions core/core/src/raw/enum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
}
}

impl<ONE: oio::Copy, TWO: oio::Copy> oio::Copy for TwoWays<ONE, TWO> {
async fn next(&mut self) -> Result<Option<usize>> {
match self {
Self::One(v) => v.next().await,
Self::Two(v) => v.next().await,
}
}

async fn abort(&mut self) -> Result<()> {
match self {
Self::One(v) => v.abort().await,
Self::Two(v) => v.abort().await,
}
}
}

impl<ONE: oio::List, TWO: oio::List> oio::List for TwoWays<ONE, TWO> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
match self {
Expand Down
3 changes: 3 additions & 0 deletions core/core/src/raw/oio/copy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@

mod api;
pub use api::*;

mod multipart_copy;
pub use multipart_copy::*;
282 changes: 282 additions & 0 deletions core/core/src/raw/oio/copy/multipart_copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::future::Future;
use std::sync::Arc;

use futures::FutureExt;
use futures::select;

use crate::raw::oio::MultipartPart;
use crate::raw::*;
use crate::*;

/// MultipartCopy is used to implement [`oio::Copy`] based on multipart copy.
///
/// By implementing MultipartCopy, services only need to provide the
/// service-specific multipart copy operations. [`MultipartCopier`] will drive
/// the upload id, part queue, completion, and abort state.
///
/// # Call sequence
///
/// [`MultipartCopier`] uses these operations as follows:
///
/// - `copy_once` alone, when no multipart copy has been started yet and the
///   source size does not exceed the copier's copy-once threshold.
/// - Otherwise `initiate_copy` once, then `copy_part` for every source range,
///   then `complete_copy` once all scheduled parts have been collected.
/// - `abort_copy` when the copy is aborted after an upload id was obtained.
pub trait MultipartCopy: Send + Sync + Unpin + 'static {
/// copy_once is used to copy the source object at once.
///
/// MultipartCopier will call this API when the source object can be copied
/// without starting a multipart copy.
fn copy_once(&self) -> impl Future<Output = Result<()>> + MaybeSend;

/// initiate_copy starts a multipart copy and returns the upload id.
///
/// MultipartCopier caches the returned id and passes it to every later
/// `copy_part`, `complete_copy` and `abort_copy` call.
fn initiate_copy(&self) -> impl Future<Output = Result<String>> + MaybeSend;

/// copy_part copies one source range into one multipart upload part.
///
/// - upload_id is the id returned by `initiate_copy`.
/// - part_number is the index of the part, starting from 0. MultipartCopier
///   passes its own zero-based counter through without translation.
/// - range is the source byte range for this part. MultipartCopier always
///   builds it with an explicit size that is at most its part size.
///
/// Returns the [`MultipartPart`] that is later handed to `complete_copy`.
fn copy_part(
&self,
upload_id: &str,
part_number: usize,
range: BytesRange,
) -> impl Future<Output = Result<MultipartPart>> + MaybeSend;

/// complete_copy completes the multipart copy with the ordered part list.
///
/// MultipartCopier sorts `parts` by `part_number` before calling this and
/// only calls it when one part was collected for every scheduled part.
fn complete_copy(
&self,
upload_id: &str,
parts: &[MultipartPart],
) -> impl Future<Output = Result<()>> + MaybeSend;

/// abort_copy cancels the pending multipart copy and purges intermediate state.
///
/// MultipartCopier only calls this when an upload id exists.
fn abort_copy(&self, upload_id: &str) -> impl Future<Output = Result<()>> + MaybeSend;
}

/// Input handed to each part-copy task scheduled by [`MultipartCopier`].
///
/// It owns (or shares through `Arc`) everything a task needs to call
/// [`MultipartCopy::copy_part`], so a task does not borrow from the copier.
struct CopyInput<C: MultipartCopy> {
/// Shared service-specific copy implementation.
copier: Arc<C>,
/// Executor whose `timeout()` is used to bound a single part copy.
executor: Executor,
/// Upload id of the multipart copy this part belongs to.
upload_id: Arc<String>,
/// Zero-based index of the part.
part_number: usize,
/// Source byte range copied into this part.
range: BytesRange,
}

impl<C: MultipartCopy> Clone for CopyInput<C> {
fn clone(&self) -> Self {
Self {
copier: self.copier.clone(),
executor: self.executor.clone(),
upload_id: self.upload_id.clone(),
part_number: self.part_number,
range: self.range,
}
}
}

/// Output of one finished part-copy task.
struct CopiedPart {
/// Part descriptor returned by [`MultipartCopy::copy_part`].
part: MultipartPart,
/// Size in bytes of the source range that was copied into this part.
size: u64,
}

/// MultipartCopier implements [`oio::Copy`] based on multipart copy.
///
/// It walks the source from offset 0 to `source_size` in `part_size` steps,
/// schedules one [`MultipartCopy::copy_part`] call per step on `tasks`, and
/// completes the multipart copy once every scheduled part has been collected.
pub struct MultipartCopier<C: MultipartCopy> {
/// Shared service-specific copy implementation.
copier: Arc<C>,
/// Executor cloned into every task input.
executor: Executor,

/// Upload id of the multipart copy; `None` until `initiate_copy` succeeded.
upload_id: Option<Arc<String>>,
/// Parts collected so far, in completion order until sorted for completion.
parts: Vec<MultipartPart>,
/// Zero-based number assigned to the next scheduled part.
next_part_number: usize,
/// Source offset at which the next scheduled part starts.
next_offset: u64,
/// Total size of the source object in bytes.
source_size: u64,
/// Sources up to this size are copied with `copy_once` instead of multipart.
copy_once_threshold: u64,
/// Maximum size of a single part in bytes.
part_size: u64,
/// Upper bound on parts scheduled per `fill_tasks` call (at least 1).
concurrent: usize,
/// Set once the copy finished or was aborted; `next` then returns `Ok(None)`.
completed: bool,

/// Task queue running the part copies.
tasks: ConcurrentTasks<CopyInput<C>, CopiedPart>,
}

impl<C: MultipartCopy> MultipartCopier<C> {
/// Create a new MultipartCopier.
///
/// - `info` supplies the executor used to run and time-limit part copies.
/// - `inner` is the service-specific [`MultipartCopy`] implementation.
/// - `source_size` is the total size of the source object in bytes.
/// - `copy_once_threshold`: sources up to this size are copied with
///   `copy_once` instead of a multipart copy.
/// - `part_size` is the maximum size of a single part in bytes.
/// - `concurrent` is clamped to at least 1.
///
/// No request is sent here; the copy only starts when `next` is called.
pub fn new(
info: Arc<AccessorInfo>,
inner: C,
source_size: u64,
copy_once_threshold: u64,
part_size: u64,
concurrent: usize,
) -> Self {
let copier = Arc::new(inner);
let executor = info.executor();
// A concurrency of 0 would never schedule anything, so treat it as 1.
let concurrent = concurrent.max(1);

Self {
copier,
executor: executor.clone(),
upload_id: None,
parts: Vec::new(),
next_part_number: 0,
next_offset: 0,
source_size,
copy_once_threshold,
part_size,
concurrent,
completed: false,

// NOTE(review): the third argument (8192) is forwarded as-is to
// `ConcurrentTasks::new`; its meaning is defined there — confirm.
// The closure captures nothing: every task works only on its `input`.
tasks: ConcurrentTasks::new(executor, concurrent, 8192, |input| {
Box::pin(async move {
// Ranges are always built with an explicit size by `fill_tasks`,
// so a missing size is a bug rather than a runtime condition.
let size = input
.range
.size()
.expect("multipart copy range must be sized");
let fut =
input
.copier
.copy_part(&input.upload_id, input.part_number, input.range);

// Without an executor timeout just await the copy; otherwise race the
// copy against the timeout and report a temporary error if the
// timeout wins.
let result = match input.executor.timeout() {
None => fut.await.map(|part| CopiedPart { part, size }),
Some(timeout) => {
select! {
result = fut.fuse() => {
result.map(|part| CopiedPart { part, size })
}
_ = timeout.fuse() => {
Err(Error::new(
ErrorKind::Unexpected, "copy part timeout")
.with_context("upload_id", input.upload_id.to_string())
.with_context("part_number", input.part_number.to_string())
.set_temporary())
}
}
}
};

// Hand the input back together with the outcome.
(input, result)
})
}),
}
}

async fn upload_id(&mut self) -> Result<Arc<String>> {
match self.upload_id.clone() {
Some(upload_id) => Ok(upload_id),
None => {
let upload_id = self.copier.initiate_copy().await?;
let upload_id = Arc::new(upload_id);
self.upload_id = Some(upload_id.clone());
Ok(upload_id)
}
}
}

/// Schedule further part copies for the not yet scheduled tail of the source.
///
/// At most `self.concurrent` parts are scheduled per call. Scheduling stops
/// early when the whole source has been scheduled, when the task queue has
/// no room left, or as soon as the queue holds a finished result.
///
/// Temporary errors from `execute` are retried with the same input; any
/// other error is returned and the offset / part counters are not advanced
/// for that part.
async fn fill_tasks(&mut self, upload_id: Arc<String>) -> Result<()> {
    for _ in 0..self.concurrent {
        let source_exhausted = self.next_offset >= self.source_size;
        if source_exhausted || !self.tasks.has_remaining() {
            break;
        }

        // The last part may be shorter than `part_size`.
        let remaining = self.source_size - self.next_offset;
        let size = remaining.min(self.part_size);

        let input = CopyInput {
            copier: Arc::clone(&self.copier),
            executor: self.executor.clone(),
            upload_id: Arc::clone(&upload_id),
            part_number: self.next_part_number,
            range: BytesRange::new(self.next_offset, Some(size)),
        };

        while let Err(err) = self.tasks.execute(input.clone()).await {
            if !err.is_temporary() {
                return Err(err);
            }
        }

        self.next_offset += size;
        self.next_part_number += 1;

        if self.tasks.has_result() {
            break;
        }
    }

    Ok(())
}
}

impl<C> oio::Copy for MultipartCopier<C>
where
C: MultipartCopy,
{
async fn next(&mut self) -> Result<Option<usize>> {
if self.completed {
return Ok(None);
}

if self.upload_id.is_none() && self.source_size <= self.copy_once_threshold {
self.copier.copy_once().await?;
self.completed = true;
return Ok(None);
}

let upload_id = self.upload_id().await?;
self.fill_tasks(upload_id.clone()).await?;

loop {
match self.tasks.next().await {
Some(Ok(result)) => {
let size = result.size.try_into().map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"multipart copy part size exceeds usize",
)
})?;
self.parts.push(result.part);
return Ok(Some(size));
}
Some(Err(err)) if err.is_temporary() => continue,
Some(Err(err)) => return Err(err),
None => break,
}
}

if self.parts.len() != self.next_part_number {
return Err(Error::new(
ErrorKind::Unexpected,
"multipart copy part numbers mismatch, please report bug to opendal",
)
.with_context("expected", self.next_part_number)
.with_context("actual", self.parts.len())
.with_context("upload_id", upload_id.to_string()));
}

self.parts.sort_by_key(|part| part.part_number);
self.copier.complete_copy(&upload_id, &self.parts).await?;
self.completed = true;
Ok(None)
}

/// Abort the copy and release the pending multipart upload, if any.
///
/// All queued part copies are dropped first. When no multipart copy has
/// been started there is nothing to release and `Ok(())` is returned.
///
/// # Errors
///
/// Returns the error from `abort_copy`. The upload id is kept in that case
/// so the caller (or a retry layer) can call `abort` again; it is cleared
/// only after the service confirmed the abort.
async fn abort(&mut self) -> Result<()> {
    self.tasks.clear();

    // Clone instead of `take()`: taking the id before the request succeeds
    // would make a retried `abort` a silent no-op and leak the upload.
    let Some(upload_id) = self.upload_id.clone() else {
        return Ok(());
    };

    self.copier.abort_copy(&upload_id).await?;

    self.upload_id = None;
    self.completed = true;
    Ok(())
}
}
13 changes: 13 additions & 0 deletions core/core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ impl OpCopy {
#[derive(Debug, Clone, Default)]
pub struct OpCopier {
/// Requested number of concurrent copy tasks; `concurrent()` reports at least 1.
concurrent: usize,
/// Optional chunk size for the copier; `None` when not set.
chunk: Option<usize>,
}

impl OpCopier {
Expand All @@ -923,6 +924,17 @@ impl OpCopier {
pub fn concurrent(&self) -> usize {
    // A stored value of 0 is reported as 1; anything else is returned as is.
    match self.concurrent {
        0 => 1,
        n => n,
    }
}

/// Set the chunk size for the copier.
pub fn with_chunk(mut self, chunk: usize) -> Self {
self.chunk = Some(chunk);
self
}

/// Get the chunk size for the copier.
///
/// Returns `None` when no chunk size has been set.
pub fn chunk(&self) -> Option<usize> {
self.chunk
}
}

impl From<options::CopyOptions> for (OpCopy, OpCopier) {
Expand All @@ -933,6 +945,7 @@ impl From<options::CopyOptions> for (OpCopy, OpCopier) {
},
OpCopier {
concurrent: value.concurrent.max(1),
chunk: value.chunk,
},
)
}
Expand Down
6 changes: 6 additions & 0 deletions core/core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ pub struct Capability {
pub copy: bool,
/// Indicates if conditional copy operations with if-not-exists are supported.
pub copy_with_if_not_exists: bool,
/// Indicates if copy operations can be split into multiple server-side tasks.
pub copy_can_multi: bool,
/// Maximum size supported for segmented copy tasks.
pub copy_multi_max_size: Option<usize>,
/// Minimum size required for segmented copy tasks.
pub copy_multi_min_size: Option<usize>,

/// Indicates if rename operations are supported.
pub rename: bool,
Expand Down
24 changes: 24 additions & 0 deletions core/core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,22 @@ impl<F: Future<Output = Result<()>>> FutureCopy<F> {
self.args.0.if_not_exists = v;
self
}

/// Sets concurrent copy operations for this copy.
///
/// A value of 0 is stored as 1.
///
/// Refer to [`options::CopyOptions::concurrent`] for more details.
pub fn concurrent(mut self, v: usize) -> Self {
    let options = &mut self.args.0;
    options.concurrent = v.max(1);
    self
}

/// Sets chunk size for segmented copy operations.
///
/// The value is stored as `Some(v)` in the copy options.
///
/// Refer to [`options::CopyOptions::chunk`] for more details.
pub fn chunk(mut self, v: usize) -> Self {
    let options = &mut self.args.0;
    options.chunk = Some(v);
    self
}
}

/// Future that generated by [`Operator::copier_with`].
Expand All @@ -1443,4 +1459,12 @@ impl<F: Future<Output = Result<Copier>>> FutureCopier<F> {
self.args.0.concurrent = v.max(1);
self
}

/// Sets chunk size for segmented copy operations.
///
/// The value is stored as `Some(v)` in the copier options.
///
/// Refer to [`options::CopyOptions::chunk`] for more details.
pub fn chunk(mut self, v: usize) -> Self {
    let options = &mut self.args.0;
    options.chunk = Some(v);
    self
}
}
Loading
Loading