Skip to content
Merged
35 changes: 29 additions & 6 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ class S3Client(NativeResource):
for each connection, unless `tls_mode` is :attr:`S3RequestTlsMode.DISABLED`

part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
If not set, a dynamic default part size will be used based on the throughput target and memory_limit_in_bytes.
Note: for :attr:`S3RequestType.PUT_OBJECT` request, client will adjust the part size to meet the service limits.
(max number of parts per upload is 10,000, minimum upload part size is 5 MiB)

Expand All @@ -245,10 +246,11 @@ class S3Client(NativeResource):

memory_limit (Optional[int]): Memory limit, in bytes, of how much memory
client can use for buffering data for requests.
If not set, the client will try to read the value from an environment variable before choosing the default.
Default values scale with target throughput and are currently
between 2GiB and 8GiB (may change in future)
between 2GiB and 24GiB (may change in future)

network_interface_names: (Optional[Sequence(str)])
network_interface_names (Optional[Sequence(str)]):
**THIS IS AN EXPERIMENTAL AND UNSTABLE API.**
A sequence of network interface names. The client will distribute the
connections across network interfaces. If any interface name is invalid, goes down,
Expand All @@ -257,10 +259,14 @@ class S3Client(NativeResource):
is not supported on Windows. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on unsupported platforms. On
Linux, SO_BINDTODEVICE is used and requires kernel version >= 5.7 or root privileges.

fio_options: (Optional[S3FileIoOptions])
fio_options (Optional[S3FileIoOptions]):
If set, this controls how the client interacts with file I/O.
If not set, default options will be created based on the size of the file to avoid memory issues.
Note: only applies when the request is created with `send_filepath`.

max_active_connections_override (Optional[int]):
When set, this will cap the number of active connections for the meta request.
When not set, the client will determine this value based on `throughput_target_gbps`. (Recommended)
"""

__slots__ = ('shutdown_event', '_region')
Expand All @@ -280,7 +286,8 @@ def __init__(
enable_s3express=False,
memory_limit=None,
network_interface_names: Optional[Sequence[str]] = None,
fio_options: Optional['S3FileIoOptions'] = None):
fio_options: Optional['S3FileIoOptions'] = None,
max_active_connections_override: Optional[int] = None):
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
assert isinstance(region, str)
assert isinstance(signing_config, AwsSigningConfig) or signing_config is None
Expand All @@ -295,6 +302,7 @@ def __init__(
assert isinstance(enable_s3express, bool) or enable_s3express is None
assert isinstance(network_interface_names, Sequence) or network_interface_names is None
assert isinstance(fio_options, S3FileIoOptions) or fio_options is None
assert isinstance(max_active_connections_override, int) or max_active_connections_override is None

if credential_provider and signing_config:
raise ValueError("'credential_provider' has been deprecated in favor of 'signing_config'. "
Expand Down Expand Up @@ -334,6 +342,8 @@ def on_shutdown():
# ensure this is a list, so it's simpler to process in C
if not isinstance(network_interface_names, list):
network_interface_names = list(network_interface_names)
if max_active_connections_override is None:
max_active_connections_override = 0
fio_options_set = False
should_stream = False
disk_throughput_gbps = 0.0
Expand Down Expand Up @@ -362,6 +372,7 @@ def on_shutdown():
should_stream,
disk_throughput_gbps,
direct_io,
max_active_connections_override,
s3_client_core)

def make_request(
Expand All @@ -378,6 +389,7 @@ def make_request(
part_size=None,
multipart_upload_threshold=None,
fio_options=None,
max_active_connections_override=None,
on_headers=None,
on_body=None,
on_done=None,
Expand Down Expand Up @@ -445,7 +457,8 @@ def make_request(
checksum_config (Optional[S3ChecksumConfig]): Optional checksum settings.

part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
If not set, the part size configured for the client will be used.
If not set, the part size configured for the client will be used, which defaults to a dynamic value based on
the throughput target, memory_limit_in_bytes and the requested object size.
Note: for :attr:`S3RequestType.PUT_OBJECT` request, client will adjust the part size to meet the service limits.
(max number of parts per upload is 10,000, minimum upload part size is 5 MiB)

Expand All @@ -458,11 +471,15 @@ def make_request(
If both `part_size` and `multipart_upload_threshold` are not set,
the values from `aws_s3_client_config` are used.

fio_options: (Optional[S3FileIoOptions])
fio_options (Optional[S3FileIoOptions]):
If set, this overrides the client's fio_options to control how this request interacts with file I/O.
If not set, default options will be created based on the size of the file to avoid memory issues.
Note: only applies when the request is created with `send_filepath`.

max_active_connections_override (Optional[int]):
When set, this will cap the number of active connections for the meta request.
When not set, the client will determine it based on client-side settings. (Recommended)

on_headers: Optional callback invoked as the response received, and even the API request
has been split into multiple parts, this callback will only be invoked once as
it's just making one API request to S3.
Expand Down Expand Up @@ -549,6 +566,7 @@ def make_request(
part_size=part_size,
multipart_upload_threshold=multipart_upload_threshold,
fio_options=fio_options,
max_active_connections_override=max_active_connections_override,
on_headers=on_headers,
on_body=on_body,
on_done=on_done,
Expand Down Expand Up @@ -587,6 +605,7 @@ def __init__(
part_size=None,
multipart_upload_threshold=None,
fio_options=None,
max_active_connections_override=None,
on_headers=None,
on_body=None,
on_done=None,
Expand All @@ -600,6 +619,7 @@ def __init__(
assert isinstance(part_size, int) or part_size is None
assert isinstance(multipart_upload_threshold, int) or multipart_upload_threshold is None
assert isinstance(fio_options, S3FileIoOptions) or fio_options is None
assert isinstance(max_active_connections_override, int) or max_active_connections_override is None

if type == S3RequestType.DEFAULT and not operation_name:
raise ValueError("'operation_name' must be set when using S3RequestType.DEFAULT")
Expand Down Expand Up @@ -632,6 +652,8 @@ def __init__(
should_stream = fio_options.should_stream
disk_throughput_gbps = fio_options.disk_throughput_gbps
direct_io = fio_options.direct_io
if max_active_connections_override is None:
max_active_connections_override = 0

s3_request_core = _S3RequestCore(
request,
Expand Down Expand Up @@ -664,6 +686,7 @@ def __init__(
should_stream,
disk_throughput_gbps,
direct_io,
max_active_connections_override,
s3_request_core)

@property
Expand Down
2 changes: 1 addition & 1 deletion crt/aws-c-s3
Submodule aws-c-s3 updated 40 files
+2 −0 .github/workflows/ci.yml
+1 −0 .gitignore
+1 −0 CMakeLists.txt
+45 −8 README.md
+13 −0 include/aws/s3/private/s3_auto_ranged_get.h
+11 −1 include/aws/s3/private/s3_client_impl.h
+70 −0 include/aws/s3/private/s3_default_buffer_pool.h
+23 −0 include/aws/s3/private/s3_meta_request_impl.h
+67 −4 include/aws/s3/private/s3_request.h
+73 −1 include/aws/s3/private/s3_util.h
+53 −0 include/aws/s3/s3_buffer_pool.h
+118 −5 include/aws/s3/s3_client.h
+131 −25 source/s3_auto_ranged_get.c
+47 −32 source/s3_auto_ranged_put.c
+27 −0 source/s3_buffer_pool.c
+180 −42 source/s3_client.c
+379 −129 source/s3_default_buffer_pool.c
+7 −0 source/s3_default_meta_request.c
+63 −61 source/s3_endpoint_resolver/aws_s3_endpoint_resolver_partition.c
+5,218 −4,057 source/s3_endpoint_resolver/aws_s3_endpoint_rule_set.c
+54 −4 source/s3_meta_request.c
+240 −22 source/s3_request.c
+287 −30 source/s3_util.c
+54 −1 tests/CMakeLists.txt
+373 −0 tests/fuzz/fuzz_buffer_pool_special_size.c
+282 −0 tests/fuzz/fuzz_buffer_pool_special_size_keep_pending.c
+70 −0 tests/fuzz/fuzz_extract_parts_from_etag.c
+129 −0 tests/fuzz/fuzz_optimal_range_size.c
+2 −1 tests/mock_s3_server/mock_s3_server.py
+533 −0 tests/s3_buffer_pool_special_size_tests.c
+46 −15 tests/s3_cancel_tests.c
+181 −0 tests/s3_client_memory_limit_env_var_test.c
+75 −0 tests/s3_client_test.c
+537 −36 tests/s3_data_plane_tests.c
+253 −0 tests/s3_max_active_connections_override_test.c
+1 −1 tests/s3_meta_request_test.c
+401 −64 tests/s3_mock_server_tests.c
+14 −7 tests/s3_tester.c
+10 −0 tests/s3_tester.h
+269 −6 tests/s3_util_tests.c
2 changes: 1 addition & 1 deletion crt/aws-lc
2 changes: 1 addition & 1 deletion crt/s2n
Submodule s2n updated from 30f40f to 6aefe7
41 changes: 22 additions & 19 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -245,28 +245,29 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {

struct aws_allocator *allocator = aws_py_get_allocator();

PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *network_interface_names_py; /* O */
int fio_options_set; /* p - boolean predicate */
int should_stream; /* p - boolean predicate */
double disk_throughput_gbps; /* d */
int direct_io; /* p - boolean predicate */
PyObject *py_core; /* O */
PyObject *bootstrap_py; /* O */
PyObject *signing_config_py; /* O */
PyObject *credential_provider_py; /* O */
PyObject *tls_options_py; /* O */
PyObject *on_shutdown_py; /* O */
struct aws_byte_cursor region; /* s# */
int tls_mode; /* i */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
double throughput_target_gbps; /* d */
int enable_s3express; /* p */
uint64_t mem_limit; /* K */
PyObject *network_interface_names_py; /* O */
int fio_options_set; /* p - boolean predicate */
int should_stream; /* p - boolean predicate */
double disk_throughput_gbps; /* d */
int direct_io; /* p - boolean predicate */
uint64_t max_active_connections_override; /* K */
PyObject *py_core; /* O */

if (!PyArg_ParseTuple(
args,
"OOOOOs#iKKdpKOppdpO",
"OOOOOs#iKKdpKOppdpKO",
&bootstrap_py,
&signing_config_py,
&credential_provider_py,
Expand All @@ -285,6 +286,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
&should_stream,
&disk_throughput_gbps,
&direct_io,
&max_active_connections_override,
&py_core)) {
return NULL;
}
Expand Down Expand Up @@ -397,6 +399,7 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
.num_network_interface_names = num_network_interface_names,
/* If fio options not set, let native code to decide the default instead */
.fio_opts = fio_options_set ? &fio_opts : NULL,
.max_active_connections_override = max_active_connections_override,
};

s3_client->native = aws_s3_client_new(allocator, &s3_config);
Expand Down
5 changes: 4 additions & 1 deletion source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,11 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
int should_stream; /* p - boolean predicate */
double disk_throughput_gbps; /* d */
int direct_io; /* p - boolean predicate */
uint64_t max_active_connections_override; /* K */
PyObject *py_core; /* O */
if (!PyArg_ParseTuple(
args,
"OOOizOOzzs#iipKKppdpO",
"OOOizOOzzs#iipKKppdpKO",
&py_s3_request,
&s3_client_py,
&http_request_py,
Expand All @@ -360,6 +361,7 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
&should_stream,
&disk_throughput_gbps,
&direct_io,
&max_active_connections_override,
&py_core)) {
return NULL;
}
Expand Down Expand Up @@ -441,6 +443,7 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
.multipart_upload_threshold = multipart_upload_threshold,
/* If fio options not set, let native code to decide the default instead */
.fio_opts = fio_options_set ? &fio_opts : NULL,
.max_active_connections_override = max_active_connections_override,
.user_data = meta_request,
};

Expand Down
19 changes: 14 additions & 5 deletions test/test_aiohttp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from test import NativeResourceTest
import ssl
import os
import json
from io import BytesIO
from http.server import HTTPServer, SimpleHTTPRequestHandler
from awscrt import io
Expand Down Expand Up @@ -485,10 +486,10 @@ def setUp(self):
'crt',
'aws-c-http',
'tests',
'py_localhost',
'server.py')
'mock_server',
'h2tls_mock_server.py')
python_path = sys.executable
self.mock_server_url = urlparse("https://localhost:3443/upload_test")
self.mock_server_url = urlparse("https://localhost:3443/echo")
self.p_server = subprocess.Popen([python_path, server_path])
# Wait for server to be ready
self._wait_for_server_ready()
Expand Down Expand Up @@ -572,8 +573,10 @@ async def _test_h2_mock_server_manual_write(self):
# Create an async generator for the request body
body_chunks = [b'hello', b'he123123', b'', b'hello']
total_length = 0
content = b''
for i in body_chunks:
total_length = total_length + len(i)
content += i

async def body_generator():
for i in body_chunks:
Expand All @@ -588,8 +591,14 @@ async def body_generator():
# Check result
self.assertEqual(200, status_code)
self.assertEqual(200, response.status_code)
# mock server response the total length received, check if it matches what we sent
self.assertEqual(total_length, int(response.body.decode()))
# Parse the response from mock server which has format:
# {
# "body": <str of data received>,
# "bytes": <byte count of data received>
# }
parsed_response = json.loads(response.body.decode())
self.assertEqual(total_length, int(parsed_response["bytes"]))
self.assertEqual(content.decode(), parsed_response["body"])
await connection.close()

class DelayStream:
Expand Down
28 changes: 21 additions & 7 deletions test/test_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import threading
from test import NativeResourceTest
import ssl
import json
import os
from io import BytesIO
from http.server import HTTPServer, SimpleHTTPRequestHandler
Expand Down Expand Up @@ -417,10 +418,10 @@ def setUp(self):
'crt',
'aws-c-http',
'tests',
'py_localhost',
'server.py')
'mock_server',
'h2tls_mock_server.py')
python_path = sys.executable
self.mock_server_url = urlparse("https://localhost:3443/upload_test")
self.mock_server_url = urlparse("https://localhost:3443/echo")
self.p_server = subprocess.Popen([python_path, server_path])
# Wait for server to be ready
self._wait_for_server_ready()
Expand Down Expand Up @@ -504,21 +505,34 @@ def test_h2_mock_server_manual_write(self):
stream = connection.request(request, response.on_response, response.on_body, manual_write=True)
stream.activate()
exception = None
body_chunks = [b'hello', b'he123123', b'hello']
total_length = 0
content = b''
for i in body_chunks:
total_length = total_length + len(i)
content += i
try:
# If the stream is not configured to allow manual writes, this should throw an exception directly
f = stream.write_data(BytesIO(b'hello'), False)
f = stream.write_data(BytesIO(body_chunks[0]), False)
f.result(self.timeout)
stream.write_data(BytesIO(b'he123123'), False)
stream.write_data(BytesIO(body_chunks[1]), False)
stream.write_data(None, False)
stream.write_data(BytesIO(b'hello'), True)
stream.write_data(BytesIO(body_chunks[2]), True)
except RuntimeError as e:
exception = e
self.assertIsNone(exception)
stream_completion_result = stream.completion_future.result(80)
# check result
self.assertEqual(200, response.status_code)
self.assertEqual(200, stream_completion_result)
print(response.body)
# Parse the response from mock server which has format:
# {
# "body": <str of data received>,
# "bytes": <byte count of data received>
# }
parsed_response = json.loads(response.body.decode())
self.assertEqual(total_length, int(parsed_response["bytes"]))
self.assertEqual(content.decode(), parsed_response["body"])

self.assertEqual(None, connection.close().exception(self.timeout))

Expand Down
Loading