Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3-style URL fix #54931

Merged
merged 18 commits into from Sep 29, 2023
22 changes: 22 additions & 0 deletions docs/en/sql-reference/table-functions/s3.md
Expand Up @@ -162,6 +162,28 @@ The below get data from all `test-data.csv.gz` files from any folder inside `my-
SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');
```

Note: custom URL scheme mappers can be specified in the server configuration file. Example:
``` sql
SELECT * FROM s3('s3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');
```
The URL `'s3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz'` would be replaced with `'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz'`.


A custom mapper can be added to `config.xml`:
``` xml
<url_scheme_mappers>
<s3>
<to>https://{bucket}.s3.amazonaws.com</to>
</s3>
<gs>
<to>https://{bucket}.storage.googleapis.com</to>
</gs>
<oss>
<to>https://{bucket}.oss.aliyuncs.com</to>
</oss>
</url_scheme_mappers>
```

## Partitioned Write

If you specify `PARTITION BY` expression when inserting data into `S3` table, a separate file is created for each partition value. Splitting the data into separate files helps to improve reading operations efficiency.
Expand Down
12 changes: 12 additions & 0 deletions programs/server/config.xml
Expand Up @@ -91,6 +91,18 @@
</formatting> -->
</logger>

<url_scheme_mappers>
<s3>
<to>https://{bucket}.s3.amazonaws.com</to>
</s3>
<gs>
<to>https://{bucket}.storage.googleapis.com</to>
</gs>
<oss>
<to>https://{bucket}.oss.aliyuncs.com</to>
</oss>
</url_scheme_mappers>

<!-- Add headers to response in options request. OPTIONS method is used in CORS preflight requests. -->
<!-- It is off by default. The following headers are required for CORS. -->
<!-- http_options_response>
Expand Down
7 changes: 7 additions & 0 deletions src/Common/Macros.cpp
@@ -1,3 +1,5 @@
#include <algorithm>
#include <unordered_map>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Macros.h>
#include <Common/Exception.h>
Expand Down Expand Up @@ -36,6 +38,11 @@ Macros::Macros(const Poco::Util::AbstractConfiguration & config, const String &
}
}

/// Builds a Macros instance directly from a ready-made name -> value map.
/// The map is taken by value and moved into the `macros` member.
Macros::Macros(std::map<String, String> map)
    : macros(std::move(map))
{
}

String Macros::expand(const String & s,
MacroExpansionInfo & info) const
{
Expand Down
1 change: 1 addition & 0 deletions src/Common/Macros.h
Expand Up @@ -27,6 +27,7 @@ class Macros
public:
Macros() = default;
Macros(const Poco::Util::AbstractConfiguration & config, const String & key, Poco::Logger * log = nullptr);
explicit Macros(std::map<String, String> map);

struct MacroExpansionInfo
{
Expand Down
37 changes: 36 additions & 1 deletion src/IO/S3/URI.cpp
@@ -1,5 +1,8 @@
#include <IO/S3/URI.h>

#include <Poco/URI.h>
#include "Common/Macros.h"
#include <Interpreters/Context.h>
#include <Storages/NamedCollectionsHelpers.h>
#if USE_AWS_S3
#include <Common/Exception.h>
#include <Common/quoteString.h>
Expand All @@ -18,6 +21,15 @@
namespace DB
{

/// Rewrites URIs that use a custom scheme (for example `s3://bucket/key`) into endpoint URIs
/// by expanding a per-scheme template.
struct URIConverter
{
    /// Replaces `uri` in place with the expanded template registered for its scheme.
    ///
    /// @param uri     URI to rewrite. Its host is substituted for the `{bucket}` macro and its
    ///                path and query are appended to the expanded template.
    /// @param mapper  Scheme -> template map, e.g. {"s3", "https://{bucket}.s3.amazonaws.com"}.
    ///
    /// `uri` is left unchanged when its scheme has no template or when the template expands
    /// to an empty string.
    static void modifyURI(Poco::URI & uri, const std::unordered_map<std::string, std::string> & mapper)
    {
        /// Use find() rather than operator[] so unknown schemes do not insert empty entries
        /// and the map can be passed by const reference instead of being copied.
        const auto it = mapper.find(uri.getScheme());
        if (it == mapper.end())
            return;

        Macros macros({{"bucket", uri.getHost()}});
        const std::string endpoint = macros.expand(it->second);
        if (endpoint.empty())
            return;

        /// The path-and-query part normally already begins with '/'. Add a separator only when
        /// it is missing, otherwise the rewritten URI contains a doubled slash after the endpoint.
        const std::string path_and_query = uri.getPathAndQuery();
        const bool needs_separator = path_and_query.empty() || path_and_query.front() != '/';
        uri = Poco::URI(endpoint + (needs_separator ? "/" : "") + path_and_query);
    }
};

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
Expand Down Expand Up @@ -46,6 +58,29 @@ URI::URI(const std::string & uri_)

uri = Poco::URI(uri_);

std::unordered_map<std::string, std::string> mapper;
auto context = Context::getGlobalContextInstance();
if (context)
{
const auto *config = &context->getConfigRef();
if (config->has("url_scheme_mappers"))
{
std::vector<String> config_keys;
config->keys("url_scheme_mappers", config_keys);
for (const std::string & config_key : config_keys)
mapper[config_key] = config->getString("url_scheme_mappers." + config_key + ".to");
}
else
{
mapper["s3"] = "https://{bucket}.s3.amazonaws.com";
mapper["gs"] = "https://{bucket}.storage.googleapis.com";
mapper["oss"] = "https://{bucket}.oss.aliyuncs.com";
}

if (!mapper.empty())
URIConverter::modifyURI(uri, mapper);
}

storage_name = S3;

if (uri.getHost().empty())
Expand Down
Empty file.
10 changes: 10 additions & 0 deletions tests/integration/test_s3_style_link/configs/config.d/minio.xml
@@ -0,0 +1,10 @@
<?xml version="1.0"?>

<!-- Custom URL scheme mapper: URLs of the form minio://{bucket}/... are rewritten to http://minio1:9001/root/{bucket}/... -->
<clickhouse>
<url_scheme_mappers>
<minio>
<to>http://minio1:9001/root/{bucket}</to>
</minio>
</url_scheme_mappers>
</clickhouse>
@@ -0,0 +1,9 @@
<clickhouse>
<users>
<default>
<password></password>
<profile>default</profile>
<named_collection_control>1</named_collection_control>
</default>
</users>
</clickhouse>
57 changes: 57 additions & 0 deletions tests/integration/test_s3_style_link/test.py
@@ -0,0 +1,57 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster


# One instance for the whole module. It is started with the custom URL scheme
# mapper config (minio.xml) and the user config, and with `with_minio=True`
# (presumably this makes the test helpers provide a MinIO service - confirm).
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
    "node",
    main_configs=["configs/config.d/minio.xml"],
    user_configs=["configs/users.d/users.xml"],
    with_minio=True,
)


@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, yield it to the tests, then shut it down."""
    try:
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        # Hand the running cluster to the tests of this module.
        yield cluster
    finally:
        # Runs even if start-up or a test failed, so the cluster is always shut down.
        logging.info("Stopping cluster")
        cluster.shutdown()
        logging.info("Cluster stopped")


def test_s3_table_functions(started_cluster):
    """
    Round-trip check of the s3 table function through a `minio://` URL:
    write 1000000 rows, then read them back and compare the row count.
    """
    # Query texts are kept exactly as in the original (including line layout).
    insert_query = """
INSERT INTO FUNCTION s3
(
'minio://data/test_file.tsv.gz', 'minio', 'minio123'
)
SELECT * FROM numbers(1000000);
"""
    count_query = """
SELECT count(*) FROM s3
(
'minio://data/test_file.tsv.gz', 'minio', 'minio123'
);
"""

    node.query(insert_query)

    row_count = node.query(count_query)
    assert row_count == "1000000\n"