diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index 7d0774bd1f05..0e49bd7a9a87 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -176,7 +176,32 @@ void PocoHTTPClient::makeRequestInternal( Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1); - poco_request.setURI(target_uri.getPathAndQuery()); + /** Aws::Http::URI will encode URL in appropriate way for AWS S3 server. + * Poco::URI also does that correctly but it's not compatible with AWS. + * For example, `+` symbol will not be converted to `%2B` by Poco and would + * be received as space symbol. + * + * References: + * https://github.com/aws/aws-sdk-java/issues/1946 + * https://forums.aws.amazon.com/thread.jspa?threadID=55746 + * + * Example: + * Suppose we are requesting a file: abc+def.txt + * To correctly do it, we need to construct an URL containing either: + * - abc%2Bdef.txt + * this is also technically correct: + * - abc+def.txt + * but AWS servers don't support it properly, interpreting plus character as whitespace + * although it is in path part, not in query string. + * e.g. this is not correct: + * - abc%20def.txt + * + * Poco will keep plus character as is (which is correct) while AWS servers will treat it as whitespace, which is not what is intended. + * To overcome this limitation, we encode URL with "Aws::Http::URI" and then pass already prepared URL to Poco. + */ + + Aws::Http::URI aws_target_uri(uri); + poco_request.setURI(aws_target_uri.GetPath() + aws_target_uri.GetQueryString()); switch (request.GetMethod()) { diff --git a/tests/integration/test_storage_s3/s3_mock/echo.py b/tests/integration/test_storage_s3/s3_mock/echo.py new file mode 100644 index 000000000000..ced84e54d62b --- /dev/null +++ b/tests/integration/test_storage_s3/s3_mock/echo.py @@ -0,0 +1,33 @@ +import http.server +import sys + + +class RequestHandler(http.server.BaseHTTPRequestHandler): + def do_HEAD(self): + if self.path.startswith("/get-my-path/"): + self.send_response(200) + self.send_header("Content-Type", "text/plain") + self.end_headers() + + elif self.path == "/": + self.send_response(200) + self.send_header("Content-Type", "text/plain") + self.end_headers() + + else: + self.send_response(404) + self.send_header("Content-Type", "text/plain") + self.end_headers() + + + def do_GET(self): + self.do_HEAD() + if self.path.startswith("/get-my-path/"): + self.wfile.write(b'/' + self.path.split('/', maxsplit=2)[2].encode()) + + elif self.path == "/": + self.wfile.write(b"OK") + + +httpd = http.server.HTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler) +httpd.serve_forever() diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index 8baa1cd64b0f..60c7ae20fbcd 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -96,7 +96,7 @@ def cluster(): prepare_s3_bucket(cluster) logging.info("S3 bucket created") - run_s3_mock(cluster) + run_s3_mocks(cluster) yield cluster finally: @@ -141,6 +141,47 @@ def test_put(cluster, maybe_auth, positive): assert values_csv == get_s3_file_content(cluster, bucket, filename) +@pytest.mark.parametrize("special", [ + "space", + "plus" +]) +def test_get_file_with_special(cluster, special): + symbol = {"space": " ", "plus": "+"}[special] + urlsafe_symbol = {"space": "%20", "plus": "%2B"}[special] + auth = "'minio','minio123'," + bucket = cluster.minio_restricted_bucket + instance = cluster.instances["dummy"] + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + values = [[12549, 2463, 19893], [64021, 38652, 66703], [81611, 39650, 83516], [11079, 59507, 61546], [51764, 69952, 6876], [41165, 90293, 29095], [40167, 78432, 48309], [81629, 81327, 11855], [55852, 21643, 98507], [6738, 54643, 41155]] + values_csv = ('\n'.join((','.join(map(str, row)) for row in values)) + '\n').encode() + filename = f"get_file_with_{special}_{symbol}two.csv" + put_s3_file_content(cluster, bucket, filename, values_csv) + + get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}_{urlsafe_symbol}two.csv', {auth}'CSV', '{table_format}') FORMAT TSV" + assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values + + get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}*.csv', {auth}'CSV', '{table_format}') FORMAT TSV" + assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values + + get_query = f"SELECT * FROM s3('http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/get_file_with_{special}_{urlsafe_symbol}*.csv', {auth}'CSV', '{table_format}') FORMAT TSV" + assert [list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()] == values + + +@pytest.mark.parametrize("special", [ + "space", + "plus", + "plus2" +]) +def test_get_path_with_special(cluster, special): + symbol = {"space": "%20", "plus": "%2B", "plus2": "%2B"}[special] + safe_symbol = {"space": "%20", "plus": "+", "plus2": "%2B"}[special] + auth = "'minio','minio123'," + table_format = "column1 String" + instance = cluster.instances["dummy"] + get_query = f"SELECT * FROM s3('http://resolver:8082/get-my-path/{safe_symbol}.csv', {auth}'CSV', '{table_format}') FORMAT TSV" + assert run_query(instance, get_query).splitlines() == [f"/{symbol}.csv"] + + # Test put no data to S3. @pytest.mark.parametrize("auth", [ "'minio','minio123'," @@ -379,26 +420,33 @@ def add_tales(start, end): assert run_query(instance, query).splitlines() == ["1001\t1001\t1001\t1001"] -def run_s3_mock(cluster): - logging.info("Starting s3 mock") - container_id = cluster.get_container_id('resolver') - current_dir = os.path.dirname(__file__) - cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mock", "mock_s3.py"), "mock_s3.py") - cluster.exec_in_container(container_id, ["python", "mock_s3.py"], detach=True) - - # Wait for S3 mock start - for attempt in range(10): - ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'), - ["curl", "-s", "http://resolver:8080/"], nothrow=True) - if ping_response != 'OK': - if attempt == 9: - assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response) +def run_s3_mocks(cluster): + logging.info("Starting s3 mocks") + mocks = ( + ("mock_s3.py", "resolver", "8080"), + ("unstable_server.py", "resolver", "8081"), + ("echo.py", "resolver", "8082"), + ) + for mock_filename, container, port in mocks: + container_id = cluster.get_container_id(container) + current_dir = os.path.dirname(__file__) + cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mocks", mock_filename), mock_filename) + cluster.exec_in_container(container_id, ["python", mock_filename, port], detach=True) + + # Wait for S3 mocks to start + for mock_filename, container, port in mocks: + for attempt in range(10): + ping_response = cluster.exec_in_container(cluster.get_container_id(container), + ["curl", "-s", f"http://{container}:{port}/"], nothrow=True) + if ping_response != 'OK': + if attempt == 9: + assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response) + else: + time.sleep(1) else: - time.sleep(1) - else: - break + break - logging.info("S3 mock started") + logging.info("S3 mocks started") def replace_config(old, new):