Skip to content

Commit

Permalink
fix(object_store,aws,gcp): multipart upload enforce size limit of 5 MiB not 5MB (#3234)
Browse files Browse the repository at this point in the history

* fix: use better minimum part size

* test: don't make the test larger than necessary

* Further tweaks

* Format

Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
  • Loading branch information
wjones127 and tustvold committed Dec 1, 2022
1 parent f133621 commit 8f5fd9a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
6 changes: 3 additions & 3 deletions object_store/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ Setup environment

```
export TEST_INTEGRATION=1
export AWS_DEFAULT_REGION=us-east-1
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export OBJECT_STORE_AWS_DEFAULT_REGION=us-east-1
export OBJECT_STORE_AWS_ACCESS_KEY_ID=test
export OBJECT_STORE_AWS_SECRET_ACCESS_KEY=test
export AWS_ENDPOINT=http://127.0.0.1:4566
export OBJECT_STORE_BUCKET=test-bucket
```
Expand Down
3 changes: 2 additions & 1 deletion object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,8 @@ mod tests {
assert_eq!(bytes_expected, bytes_written);

// Can overwrite some storage
let data = get_vec_of_bytes(5_000, 5);
// Sizes carefully chosen to exactly hit min limit of 5 MiB
let data = get_vec_of_bytes(242_880, 22);
let bytes_expected = data.concat();
let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
for chunk in &data {
Expand Down
15 changes: 10 additions & 5 deletions object_store/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ where
current_buffer: Vec::new(),
// TODO: Should self vary by provider?
// TODO: Should we automatically increase then when part index gets large?
min_part_size: 5_000_000,

// Minimum size of 5 MiB
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
// https://cloud.google.com/storage/quotas#requests
min_part_size: 5_242_880,
current_part_idx: 0,
completion_task: None,
}
Expand Down Expand Up @@ -113,13 +117,14 @@ where
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
) -> Poll<Result<usize, io::Error>> {
// Poll current tasks
self.as_mut().poll_tasks(cx)?;

// If adding buf to pending buffer would trigger send, check
// whether we have capacity for another task.
let enough_to_send = (buf.len() + self.current_buffer.len()) > self.min_part_size;
let enough_to_send =
(buf.len() + self.current_buffer.len()) >= self.min_part_size;
if enough_to_send && self.tasks.len() < self.max_concurrency {
// If we do, copy into the buffer and submit the task, and return ready.
self.current_buffer.extend_from_slice(buf);
Expand Down Expand Up @@ -149,7 +154,7 @@ where
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
) -> Poll<Result<(), io::Error>> {
// Poll current tasks
self.as_mut().poll_tasks(cx)?;

Expand Down Expand Up @@ -177,7 +182,7 @@ where
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
) -> Poll<Result<(), io::Error>> {
// First, poll flush
match self.as_mut().poll_flush(cx) {
Poll::Pending => return Poll::Pending,
Expand Down

0 comments on commit 8f5fd9a

Please sign in to comment.