instead of skipping, let's prepare the parts and mark it noop #356

Merged
merged 70 commits into from
Oct 16, 2023
Changes from 62 commits
ac0c9e6
WIP
TingDaoK Sep 19, 2023
43d792b
hook it up with s3
TingDaoK Sep 19, 2023
6c23fd6
io/future.h
TingDaoK Sep 19, 2023
55d025d
fix compile error
TingDaoK Sep 19, 2023
f36e660
fix use after free
TingDaoK Sep 19, 2023
88e36fc
fix tests
TingDaoK Sep 19, 2023
15b1dc4
disable the test for now
TingDaoK Sep 19, 2023
bc350f0
only allow one read per time
TingDaoK Sep 20, 2023
392ee38
close the file handler on success as well
TingDaoK Sep 20, 2023
71caee4
type case
TingDaoK Sep 20, 2023
b5754ff
add a test about mpu from file path
TingDaoK Sep 20, 2023
c7c16ed
Merge branch 'main' into parallel-read
TingDaoK Sep 20, 2023
f75c45a
use another key to avoid override
TingDaoK Sep 20, 2023
771369d
hack to prepare parts in parallel
TingDaoK Sep 20, 2023
15026da
updates
TingDaoK Sep 21, 2023
54401cd
use after free
TingDaoK Sep 21, 2023
7fc8dce
add one more test for multiple meta requests
TingDaoK Sep 22, 2023
0b8c393
const
TingDaoK Sep 22, 2023
2a0626c
use define
TingDaoK Sep 22, 2023
7630d1c
update comments
TingDaoK Sep 22, 2023
e6a4ae1
try to use mmap
TingDaoK Sep 29, 2023
d6d6318
update the length of the buffer
TingDaoK Sep 29, 2023
524c568
unused lable
TingDaoK Sep 29, 2023
2106654
forgot to destroy the string
TingDaoK Sep 29, 2023
8fc1bda
a wrapper around mmap
TingDaoK Oct 2, 2023
6565dba
add windows impl
TingDaoK Oct 2, 2023
04667c6
a bit more error handling
TingDaoK Oct 2, 2023
8b18679
renaming and comments
TingDaoK Oct 2, 2023
c9b56b2
map and unmap
TingDaoK Oct 3, 2023
d74a4ee
let's fix windows later
TingDaoK Oct 3, 2023
e08da93
windows impl
TingDaoK Oct 3, 2023
b1b0a98
fix warning
TingDaoK Oct 3, 2023
4d75985
add a comment about how that the in-page-offset is not needed
TingDaoK Oct 4, 2023
c6a1011
rcbc
TingDaoK Oct 4, 2023
afe61b8
red code best code
TingDaoK Oct 4, 2023
11141c5
clean up
TingDaoK Oct 5, 2023
cd666c7
check for file modified
TingDaoK Oct 5, 2023
c9c3dce
update the use uint64_t for offset
TingDaoK Oct 5, 2023
d4f531f
one missing
TingDaoK Oct 5, 2023
88b5de8
add test for eos
TingDaoK Oct 5, 2023
b500eb7
heap use after free
TingDaoK Oct 5, 2023
84fff59
fix compile issue
TingDaoK Oct 5, 2023
3cfc0ae
instead of skipping, let's prepare the parts and mark it noop
TingDaoK Oct 5, 2023
8651866
stop updating the numbers from list part result
TingDaoK Oct 6, 2023
0b9a4da
Merge branch 'main' into para-pre-rcbc
TingDaoK Oct 6, 2023
f204570
renaming
TingDaoK Oct 6, 2023
5e3c8f5
fix the test count
TingDaoK Oct 6, 2023
8c8a5bb
add two more test with unknown content length set or not
TingDaoK Oct 6, 2023
e2718c7
delete the extra
TingDaoK Oct 6, 2023
68c62e1
Merge branch 'para-pre-rcbc' into para-pre-keep-reading-checksum
TingDaoK Oct 6, 2023
25f0140
oops, forgot to update the object size
TingDaoK Oct 6, 2023
316b8c0
Merge branch 'para-pre-rcbc' into para-pre-keep-reading-checksum
TingDaoK Oct 6, 2023
1461481
add comment
TingDaoK Oct 6, 2023
bca43d2
oops, test file was committed by accident
TingDaoK Oct 6, 2023
1cc2d01
more test
TingDaoK Oct 6, 2023
52d2d94
whatever, it's not a big deal
TingDaoK Oct 6, 2023
540e5fa
Apply suggestions from code review
TingDaoK Oct 8, 2023
9c96251
address comments
TingDaoK Oct 8, 2023
8614ea5
Merge branch 'para-pre-rcbc' of github.com:awslabs/aws-c-s3 into para…
TingDaoK Oct 8, 2023
0401db5
clean up
TingDaoK Oct 9, 2023
0f96e35
comments addressed
TingDaoK Oct 9, 2023
e712ba6
Merge branch 'para-pre-rcbc' into para-pre-keep-reading-checksum
TingDaoK Oct 9, 2023
4914373
Apply suggestions from code review
TingDaoK Oct 9, 2023
68aa652
still needs errno.h
TingDaoK Oct 9, 2023
9b78f79
why it fails??
TingDaoK Oct 10, 2023
00a533c
fix the error
TingDaoK Oct 10, 2023
122ac29
fix compile wraning
TingDaoK Oct 10, 2023
9bcdfda
Merge branch 'para-pre-rcbc' into para-pre-keep-reading-checksum
TingDaoK Oct 12, 2023
e87fdd4
address comments
TingDaoK Oct 12, 2023
80a305c
merge
TingDaoK Oct 16, 2023
13 changes: 0 additions & 13 deletions include/aws/s3/private/s3_auto_ranged_put.h
@@ -47,19 +47,6 @@ struct aws_s3_auto_ranged_put {
uint32_t next_part_number;
} threaded_update_data;

/*
* Should only be used during prepare requests. Note: stream reads must be sequential,
* so prepare currently never runs concurrently with another prepare
*/
struct {
/*
* Start index of skipping parts.
* This is used to keep track of how many parts have been read from input stream and where to try to start
* skipping parts from.
*/
uint32_t part_index_for_skipping;
} prepare_data;

/* Members to only be used when the mutex in the base type is locked. */
struct {
/* Array list of `struct aws_s3_mpu_part_info *`
11 changes: 9 additions & 2 deletions include/aws/s3/private/s3_meta_request_impl.h
@@ -138,9 +138,12 @@ struct aws_s3_meta_request {
/* Initial HTTP Message that this meta request is based on. */
struct aws_http_message *initial_request_message;

/* Async stream for meta request's body.
* NULL if using initial_request_message's synchronous body stream instead. */
/* The meta request's outgoing body comes from one of these:
* 1) request_body_async_stream: if set, then async stream 1 part at a time
* 2) request_body_parallel_stream: if set, then stream multiple parts in parallel
* 3) initial_request_message's body_stream: else synchronously stream parts */
struct aws_async_input_stream *request_body_async_stream;
struct aws_parallel_input_stream *request_body_parallel_stream;

/* Part size to use for uploads and downloads. Passed down by the creating client. */
const size_t part_size;
@@ -259,6 +262,7 @@ struct aws_s3_mpu_part_info {
uint64_t size;
struct aws_string *etag;
struct aws_byte_buf checksum_base64;
bool uploaded_before_resume;
};

AWS_EXTERN_C_BEGIN
@@ -363,10 +367,13 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r
* It may read from the underlying stream multiple times, if that's what it takes to fill the buffer.
* Returns a future whose result bool indicates whether end of stream was reached.
* This future may complete on any thread, and may complete synchronously.
*
* Read from offset to fill the buffer
*/
AWS_S3_API
struct aws_future_bool *aws_s3_meta_request_read_body(
struct aws_s3_meta_request *meta_request,
uint64_t offset,
struct aws_byte_buf *buffer);

bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request);
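
The new `offset` parameter on `aws_s3_meta_request_read_body` is what lets parts be prepared independently of one another. The sketch below illustrates the idea under stated assumptions: the source is chosen in the order the struct comment lists (async, then parallel, then synchronous), and every part except the last is exactly `part_size` bytes. The names `body_source`, `pick_body_source`, and `part_offset` are hypothetical and do not appear in the PR.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the three body sources named in the struct comment. */
enum body_source {
    BODY_SOURCE_ASYNC_STREAM,    /* request_body_async_stream is set: one part at a time */
    BODY_SOURCE_PARALLEL_STREAM, /* request_body_parallel_stream is set: parts in parallel */
    BODY_SOURCE_SYNC_STREAM,     /* otherwise: initial_request_message's body stream */
};

/* Assumption: sources are checked in the order the comment lists them. */
static enum body_source pick_body_source(const void *async_stream, const void *parallel_stream) {
    if (async_stream != NULL) {
        return BODY_SOURCE_ASYNC_STREAM;
    }
    if (parallel_stream != NULL) {
        return BODY_SOURCE_PARALLEL_STREAM;
    }
    return BODY_SOURCE_SYNC_STREAM;
}

/* Byte offset of a 1-based part number. The result is 64-bit so that
 * offsets past 4 GiB do not overflow on platforms with a 32-bit size_t. */
static uint64_t part_offset(uint32_t part_number, size_t part_size) {
    assert(part_number >= 1);
    return (uint64_t)(part_number - 1) * (uint64_t)part_size;
}
```

With an 8 MiB part size, part 1025 starts at byte 8589934592, which is why the offset is carried as a `uint64_t` rather than a `size_t`.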
105 changes: 105 additions & 0 deletions include/aws/s3/private/s3_parallel_input_stream.h
@@ -0,0 +1,105 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#ifndef AWS_S3_PARALLEL_INPUT_STREAM_H
#define AWS_S3_PARALLEL_INPUT_STREAM_H

#include <aws/s3/s3.h>

#include <aws/common/ref_count.h>

AWS_PUSH_SANE_WARNING_LEVEL

struct aws_byte_buf;
struct aws_future_bool;
struct aws_input_stream;

struct aws_event_loop_group;

struct aws_parallel_input_stream {
const struct aws_parallel_input_stream_vtable *vtable;
struct aws_allocator *alloc;
struct aws_ref_count ref_count;

void *impl;
};

struct aws_parallel_input_stream_vtable {
/**
* Destroy the stream, its refcount has reached 0.
*/
void (*destroy)(struct aws_parallel_input_stream *stream);

/**
* Read into the buffer in parallel.
* Implementations must support being invoked concurrently from multiple threads.
*/
struct aws_future_bool *(
*read)(struct aws_parallel_input_stream *stream, uint64_t offset, struct aws_byte_buf *dest);
};

AWS_EXTERN_C_BEGIN

/**
* Initialize aws_parallel_input_stream "base class"
*/
AWS_S3_API
void aws_parallel_input_stream_init_base(
struct aws_parallel_input_stream *stream,
struct aws_allocator *alloc,
const struct aws_parallel_input_stream_vtable *vtable,
void *impl);

/**
* Increment reference count.
* You may pass in NULL (has no effect).
* Returns whatever pointer was passed in.
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_acquire(struct aws_parallel_input_stream *stream);

/**
* Decrement reference count.
* You may pass in NULL (has no effect).
* Always returns NULL.
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream);

/**
* Read from the offset until dest is filled or EOS is reached.
* It is safe to call this from multiple threads without waiting for other reads to complete.
*
* @param stream The stream to read from
* @param offset The offset, from the beginning of the stream, at which to start reading
* @param dest The output buffer to read into
* @return a future, which will contain an error code if something went wrong,
* or a result bool indicating whether EOF has been reached.
*/
AWS_S3_API
struct aws_future_bool *aws_parallel_input_stream_read(
struct aws_parallel_input_stream *stream,
uint64_t offset,
struct aws_byte_buf *dest);

/**
* Create a new file based parallel input stream implementation.
*
* This implementation opens a file handle when a read happens and seeks to the offset to start reading. The
* file handle is closed when the read finishes.
*
* @param allocator memory allocator
* @param file_name The file path to read from
* @return aws_parallel_input_stream
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
struct aws_allocator *allocator,
struct aws_byte_cursor file_name);

AWS_EXTERN_C_END
AWS_POP_SANE_WARNING_LEVEL

#endif /* AWS_S3_PARALLEL_INPUT_STREAM_H */
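
The doc comment on `aws_parallel_input_stream_new_from_file` describes opening a handle per read, seeking to the offset, reading, and closing. A minimal sketch of that pattern with plain C stdio follows. It is not the library's implementation: `read_at_offset` is a hypothetical helper, it is synchronous rather than future-based, and it reports end of stream only once a read actually runs past the end of the file.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: open a private handle, seek to `offset`, read until
 * `dest` is full or the file ends, then close the handle.
 * Returns 0 on success and stores the byte count and an end-of-stream flag.
 * Each call uses its own FILE, so calls do not share a file position.
 */
static int read_at_offset(
    const char *path,
    uint64_t offset,
    unsigned char *dest,
    size_t capacity,
    size_t *out_len,
    bool *out_eos) {

    assert(dest != NULL && out_len != NULL && out_eos != NULL);

    FILE *file = fopen(path, "rb");
    if (file == NULL) {
        return -1;
    }
    /* fseek takes a long; a production version would need a 64-bit seek. */
    if (fseek(file, (long)offset, SEEK_SET) != 0) {
        fclose(file);
        return -1;
    }

    size_t total = 0;
    while (total < capacity) {
        size_t n = fread(dest + total, 1, capacity - total, file);
        total += n;
        if (n == 0) {
            break; /* end of file or read error */
        }
    }

    bool failed = ferror(file) != 0;
    bool eos = feof(file) != 0;
    fclose(file);
    if (failed) {
        return -1;
    }
    *out_len = total;
    *out_eos = eos;
    return 0;
}
```

Because no file position is shared between calls, two threads can read different parts of the same file at once, which is the property the vtable's `read` requires.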
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_request.h
@@ -202,6 +202,9 @@ struct aws_s3_request {
* requests for uploading data after the end of the stream (those requests
* will use below flag to indicate that they should not be sent). */
uint32_t is_noop : 1;

/* When true, this request has already been uploaded. We still prepare the request to check the durability. */
uint32_t uploaded_before_resume : 1;
};

AWS_EXTERN_C_BEGIN
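
The two flags above reflect the PR title: on resume, a part that was already uploaded is still prepared, then marked as a no-op instead of being skipped. The sketch below shows one way that could work, assuming "check the durability" means re-reading the local data and comparing its checksum with the one recorded for the earlier upload. `part_plan`, `toy_checksum`, and `prepare_part` are hypothetical, and the byte-sum checksum is a placeholder for a real algorithm.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-part state; flag names echo the diff, the layout is invented. */
struct part_plan {
    bool uploaded_before_resume; /* part was uploaded before the upload was paused */
    bool is_noop;                /* prepared, but must not be sent */
    bool checksum_mismatch;      /* local data no longer matches the earlier upload */
    uint32_t checksum;           /* checksum of the data read during prepare */
};

/* Placeholder checksum: sum of all bytes. */
static uint32_t toy_checksum(const unsigned char *data, size_t len) {
    uint32_t sum = 0;
    for (size_t i = 0; i < len; ++i) {
        sum += data[i];
    }
    return sum;
}

/* Prepare one part. The data is always read and checksummed; an already
 * uploaded part is marked no-op and verified against the earlier checksum. */
static void prepare_part(
    struct part_plan *plan,
    const unsigned char *data,
    size_t len,
    bool uploaded_before_resume,
    uint32_t previous_checksum) {

    assert(plan != NULL);
    plan->checksum = toy_checksum(data, len);
    plan->uploaded_before_resume = uploaded_before_resume;
    plan->is_noop = uploaded_before_resume;
    plan->checksum_mismatch = uploaded_before_resume && plan->checksum != previous_checksum;
}
```

Preparing every part the same way keeps the read path uniform whether or not the upload is a resume, which fits with the removal of `part_index_for_skipping` earlier in this diff.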
7 changes: 0 additions & 7 deletions include/aws/s3/private/s3_request_messages.h
@@ -37,13 +37,6 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he
size_t excluded_headers_size,
bool exclude_x_amz_meta);

/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */
AWS_S3_API
struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers(
struct aws_allocator *allocator,
struct aws_http_message *message,
struct aws_byte_cursor filepath);

/* Copy headers from one message to the other and exclude specific headers.
* exclude_x_amz_meta controls whether S3 user metadata headers (prefixed with "x-amz-meta") are excluded. */
AWS_S3_API
1 change: 1 addition & 0 deletions include/aws/s3/s3.h
@@ -40,6 +40,7 @@ enum aws_s3_errors {
AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE,
AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH,
AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED,
AWS_ERROR_S3_FILE_MODIFIED,
AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
};

1 change: 1 addition & 0 deletions source/s3.c
@@ -39,6 +39,7 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, "The metric data is not available, the requests ends before the metric happens."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH, "Request body length must match Content-Length header."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED, "RequestTimeTooSkewed error received from S3."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_FILE_MODIFIED, "The file was modified during upload."),
};
/* clang-format on */
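
The diff shown here adds `AWS_ERROR_S3_FILE_MODIFIED` but does not include the code that raises it. One plausible check, sketched below purely as an assumption, compares the number of bytes actually read for a part against the length expected from the known content length, and treats any difference as a modified file. `expected_part_length`, `check_part_read`, and the `SKETCH_*` codes are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result codes for this sketch only. */
enum { SKETCH_OK = 0, SKETCH_ERROR_FILE_MODIFIED = 1 };

/* Expected length of a 1-based part, given the total length known up front. */
static uint64_t expected_part_length(uint64_t content_length, uint64_t part_size, uint32_t part_number) {
    assert(part_number >= 1 && part_size > 0);
    uint64_t offset = (uint64_t)(part_number - 1) * part_size;
    if (offset >= content_length) {
        return 0; /* part lies entirely past the end of the data */
    }
    uint64_t remaining = content_length - offset;
    return remaining < part_size ? remaining : part_size;
}

/* A read that does not match the expected length suggests the file changed
 * after its length was first measured. */
static int check_part_read(uint64_t bytes_read, uint64_t expected) {
    return bytes_read == expected ? SKETCH_OK : SKETCH_ERROR_FILE_MODIFIED;
}
```

A length comparison like this only catches truncation or growth; detecting in-place edits that keep the length unchanged would need a timestamp or checksum comparison instead.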
