Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/IO/S3/PocoHTTPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,32 @@ void PocoHTTPClient::makeRequestInternal(

Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);

poco_request.setURI(target_uri.getPathAndQuery());
/** Aws::Http::URI will encode URL in appropriate way for AWS S3 server.
* Poco::URI also does that correctly but it's not compatible with AWS.
* For example, `+` symbol will not be converted to `%2B` by Poco and would
* be received as space symbol.
*
* References:
* https://github.com/aws/aws-sdk-java/issues/1946
* https://forums.aws.amazon.com/thread.jspa?threadID=55746
*
* Example:
* Suppose we are requesting a file: abc+def.txt
* To correctly do it, we need to construct an URL containing either:
* - abc%2Bdef.txt
* this is also technically correct:
* - abc+def.txt
* but AWS servers don't support it properly, interpreting plus character as whitespace
* although it is in path part, not in query string.
* e.g. this is not correct:
* - abc%20def.txt
*
* Poco will keep plus character as is (which is correct) while AWS servers will treat it as whitespace, which is not what is intended.
* To overcome this limitation, we encode URL with "Aws::Http::URI" and then pass already prepared URL to Poco.
*/

Aws::Http::URI aws_target_uri(uri);
poco_request.setURI(aws_target_uri.GetPath() + aws_target_uri.GetQueryString());

switch (request.GetMethod())
{
Expand Down
33 changes: 33 additions & 0 deletions tests/integration/test_storage_s3/s3_mock/echo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import http.server
import sys


class RequestHandler(http.server.BaseHTTPRequestHandler):
def do_HEAD(self):
if self.path.startswith("/get-my-path/"):
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()

elif self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()

else:
self.send_response(404)
self.send_header("Content-Type", "text/plain")
self.end_headers()


def do_GET(self):
self.do_HEAD()
if self.path.startswith("/get-my-path/"):
self.wfile.write(b'/' + self.path.split('/', maxsplit=2)[2].encode())

elif self.path == "/":
self.wfile.write(b"OK")


httpd = http.server.HTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()
86 changes: 67 additions & 19 deletions tests/integration/test_storage_s3/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def cluster():

prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
run_s3_mock(cluster)
run_s3_mocks(cluster)

yield cluster
finally:
Expand Down Expand Up @@ -141,6 +141,47 @@ def test_put(cluster, maybe_auth, positive):
assert values_csv == get_s3_file_content(cluster, bucket, filename)


@pytest.mark.parametrize("special", [
"space",
"plus"
])
def test_get_file_with_special(cluster, special):
symbol = {"space": " ", "plus": "+"}[special]
urlsafe_symbol = {"space": "%20", "plus": "%2B"}[special]
auth = "'minio','minio123',"
bucket = cluster.minio_restricted_bucket
instance = cluster.instances["dummy"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [[12549, 2463, 19893], [64021, 38652, 66703], [81611, 39650, 83516], [11079, 59507, 61546], [51764, 69952, 6876], [41165, 90293, 29095], [40167, 78432, 48309], [81629, 81327, 11855], [55852, 21643, 98507], [6738, 54643, 41155]]
values_csv = ('\n'.join((','.join(map(str, row)) for row in values)) + '\n').encode()
filename = f"get_file_with_{special}_{symbol}two.csv"
put_s3_file_content(cluster, bucket, filename, values_csv)

get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}_{urlsafe_symbol}two.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values

get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}*.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values

get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}_{urlsafe_symbol}*.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values


@pytest.mark.parametrize("special", [
"space",
"plus",
"plus2"
])
def test_get_path_with_special(cluster, special):
symbol = {"space": "%20", "plus": "%2B", "plus2": "%2B"}[special]
safe_symbol = {"space": "%20", "plus": "+", "plus2": "%2B"}[special]
auth = "'minio','minio123',"
table_format = "column1 String"
instance = cluster.instances["dummy"]
get_query = f"SELECT * FROM s3('http://resolver:8082/get-my-path/{safe_symbol}.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
assert run_query(instance, get_query).splitlines() == [f"/{symbol}.csv"]


# Test put no data to S3.
@pytest.mark.parametrize("auth", [
"'minio','minio123',"
Expand Down Expand Up @@ -379,26 +420,33 @@ def add_tales(start, end):
assert run_query(instance, query).splitlines() == ["1001\t1001\t1001\t1001"]


def run_s3_mock(cluster):
logging.info("Starting s3 mock")
container_id = cluster.get_container_id('resolver')
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mock", "mock_s3.py"), "mock_s3.py")
cluster.exec_in_container(container_id, ["python", "mock_s3.py"], detach=True)

# Wait for S3 mock start
for attempt in range(10):
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/"], nothrow=True)
if ping_response != 'OK':
if attempt == 9:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
def run_s3_mocks(cluster):
logging.info("Starting s3 mocks")
mocks = (
("mock_s3.py", "resolver", "8080"),
("unstable_server.py", "resolver", "8081"),
("echo.py", "resolver", "8082"),
)
for mock_filename, container, port in mocks:
container_id = cluster.get_container_id(container)
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mocks", mock_filename), mock_filename)
cluster.exec_in_container(container_id, ["python", mock_filename, port], detach=True)

# Wait for S3 mocks to start
for mock_filename, container, port in mocks:
for attempt in range(10):
ping_response = cluster.exec_in_container(cluster.get_container_id(container),
["curl", "-s", f"http://{container}:{port}/"], nothrow=True)
if ping_response != 'OK':
if attempt == 9:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
else:
time.sleep(1)
else:
time.sleep(1)
else:
break
break

logging.info("S3 mock started")
logging.info("S3 mocks started")


def replace_config(old, new):
Expand Down