Skip to content

Commit

Permalink
Fix case where s3 uploads in wrong order
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Nov 5, 2021
1 parent d093fdd commit 4798fe9
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 37 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Expand Up @@ -43,12 +43,12 @@ pretty_assertions = "0.7.2"
rustfmt-nightly = { git = "https://github.com/rust-lang/rustfmt", tag = "v1.4.38" }
maplit = "1.0.2"
mock_instant = "0.2.1"
rusoto_mock = "0.46.0"
rusoto_mock = "=0.46.0"

[patch.crates-io]
rusoto_mock = { git = "https://github.com/allada/rusoto.git", branch = "with_request_checker-owned-0.46.0" }
rusoto_credential = { git = "https://github.com/allada/rusoto.git", branch = "with_request_checker-owned-0.46.0" }
rusoto_signature = { git = "https://github.com/allada/rusoto.git", branch = "with_request_checker-owned-0.46.0" }
rusoto_mock = { git = "https://github.com/allada/rusoto.git", rev = "cc9acca00dbafa41a37d75faeaf2a4baba33d42e" }
rusoto_credential = { git = "https://github.com/allada/rusoto.git", rev = "cc9acca00dbafa41a37d75faeaf2a4baba33d42e" }
rusoto_signature = { git = "https://github.com/allada/rusoto.git", rev = "cc9acca00dbafa41a37d75faeaf2a4baba33d42e" }

[package.metadata.raze.crates.json5.'*']
data_attr = "['src/json5.pest']"
Expand Down
52 changes: 26 additions & 26 deletions cas/store/s3_store.rs
Expand Up @@ -8,7 +8,8 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::stream::{unfold, FuturesUnordered, StreamExt};
use futures::future::{try_join_all, FutureExt};
use futures::stream::unfold;
use http::status::StatusCode;
use lease::{Lease, Pool as ObjectPool};
use rand::{rngs::OsRng, Rng};
Expand All @@ -22,7 +23,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::time::sleep;
use tokio_util::io::ReaderStream;

use common::{log, DigestInfo};
use common::{log, DigestInfo, JoinHandleDropGuard};
use config;
use error::{make_err, make_input_err, Code, Error, ResultExt};
use retry::{ExponentialBackoff, Retrier, RetryResult};
Expand Down Expand Up @@ -238,7 +239,7 @@ impl StoreTrait for S3Store {
let mut part_number: i64 = 1;

// We might end up with one more capacity unit than needed, but that is the worst case.
let mut completed_part_futs = FuturesUnordered::new();
let mut completed_part_futs = Vec::with_capacity((max_size / bytes_per_upload_part) + 1);
loop {
let mut write_buf = self.get_large_vec().await;
let mut take = reader.take(bytes_per_upload_part as u64);
Expand All @@ -263,31 +264,30 @@ impl StoreTrait for S3Store {
};

let s3_client = self.s3_client.clone();
completed_part_futs.push(tokio::spawn(async move {
let part_number = request.part_number;
let mut response = s3_client
.upload_part(request)
.await
.map_err(|e| make_err!(Code::Unknown, "Failed to upload part: {:?}", e))?;
let e_tag = response.e_tag.take();
// Double down to ensure our Lease<Vec<u8>> is freed up and returned to pool.
drop(response);
Result::<CompletedPart, Error>::Ok(CompletedPart {
e_tag,
part_number: Some(part_number),
})
}));
completed_part_futs.push(
JoinHandleDropGuard::new(tokio::spawn(async move {
let part_number = request.part_number;
let mut response = s3_client
.upload_part(request)
.await
.map_err(|e| make_err!(Code::Unknown, "Failed to upload part: {:?}", e))?;
let e_tag = response.e_tag.take();
// Double down to ensure our Lease<Vec<u8>> is freed up and returned to pool.
drop(response);
Result::<CompletedPart, Error>::Ok(CompletedPart {
e_tag,
part_number: Some(part_number),
})
}))
.map(|r| match r.err_tip(|| "Failed to run s3 upload spawn") {
Ok(r2) => r2.err_tip(|| "S3 upload chunk failure"),
Err(e) => Err(e),
}),
);
part_number += 1;
}

let mut completed_parts = Vec::with_capacity(completed_part_futs.len());
while let Some(result) = completed_part_futs.next().await {
completed_parts.push(
result
.err_tip(|| "Failed to join s3 chunk upload")?
.err_tip(|| "Failed to upload chunk")?,
);
}
let completed_parts = try_join_all(completed_part_futs).await?;
self.s3_client
.complete_multipart_upload(CompleteMultipartUploadRequest {
bucket: self.bucket.to_owned(),
Expand Down Expand Up @@ -346,7 +346,7 @@ impl StoreTrait for S3Store {

let get_object_output = self.s3_client.get_object(get_req).await.map_err(|e| match e {
RusotoError::Service(GetObjectError::NoSuchKey(err)) => {
return make_err!(Code::NotFound, "Error reading from S3: {:?}", err)
return make_err!(Code::NotFound, "File not found in S3: {:?}", err)
}
_ => make_err!(Code::Unknown, "Error reading from S3: {:?}", e),
})?;
Expand Down
1 change: 1 addition & 0 deletions util/BUILD
Expand Up @@ -23,6 +23,7 @@ rust_library(
"//third_party:hex",
"//third_party:lazy_init",
"//third_party:log",
"//third_party:tokio",
":error",
],
visibility = ["//visibility:public"],
Expand Down
33 changes: 31 additions & 2 deletions util/common.rs
@@ -1,14 +1,17 @@
// Copyright 2020 Nathan (Blaise) Bruer. All rights reserved.

use std::convert::TryFrom;
use std::convert::TryInto;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::pin::Pin;
use std::task::{Context, Poll};

use hex::FromHex;
use lazy_init::LazyTransform;
pub use log;
use proto::build::bazel::remote::execution::v2::Digest;
use tokio::task::{JoinError, JoinHandle};

use error::{make_input_err, Error, ResultExt};

Expand Down Expand Up @@ -107,3 +110,29 @@ impl Into<Digest> for DigestInfo {
}
}
}

/// Simple wrapper that will abort a future that is running in another spawn in the
/// event that this handle gets dropped.
///
/// The guard owns the task's `JoinHandle`; keep the guard alive (for example by
/// storing or awaiting it) for as long as the spawned work should keep running.
/// Discarding the guard right after construction drops it, and dropping it calls
/// `abort()` on the handle, hence the `must_use` marker below.
#[must_use = "dropping a JoinHandleDropGuard aborts the spawned task it wraps"]
pub struct JoinHandleDropGuard<T> {
    /// Handle of the spawned task this guard is responsible for.
    inner: JoinHandle<T>,
}

impl<T> JoinHandleDropGuard<T> {
pub fn new(inner: JoinHandle<T>) -> Self {
Self { inner }
}
}

impl<T> Future for JoinHandleDropGuard<T> {
    /// Same output as polling the wrapped `JoinHandle` directly.
    type Output = Result<T, JoinError>;

    /// Forwards the poll to the wrapped join handle and returns its result unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let guard = self.get_mut();
        let handle = Pin::new(&mut guard.inner);
        handle.poll(cx)
    }
}

impl<T> Drop for JoinHandleDropGuard<T> {
    /// Calls `abort()` on the wrapped join handle when the guard goes out of scope.
    fn drop(&mut self) {
        let Self { inner } = self;
        inner.abort();
    }
}

0 comments on commit 4798fe9

Please sign in to comment.