Skip to content

Commit

Permalink
Merge pull request #54931 from yariks5s/s3_style_url
Browse files Browse the repository at this point in the history
s3-style URL fix
  • Loading branch information
alexey-milovidov committed Sep 29, 2023
2 parents 2ce9251 + 2b95d13 commit 8f9a227
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 1 deletion.
22 changes: 22 additions & 0 deletions docs/en/sql-reference/table-functions/s3.md
Expand Up @@ -162,6 +162,28 @@ The below get data from all `test-data.csv.gz` files from any folder inside `my-
SELECT * FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');
```

Note: custom URL schemes can be mapped to full endpoint URLs in the server configuration file. Example:
``` sql
SELECT * FROM s3('s3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip');
```
The URL `'s3://clickhouse-public-datasets/my-test-bucket-768/**/test-data.csv.gz'` is replaced with `'http://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/**/test-data.csv.gz'`


A custom mapper can be added to `config.xml`:
``` xml
<url_scheme_mappers>
<s3>
<to>https://{bucket}.s3.amazonaws.com</to>
</s3>
<gs>
<to>https://{bucket}.storage.googleapis.com</to>
</gs>
<oss>
<to>https://{bucket}.oss.aliyuncs.com</to>
</oss>
</url_scheme_mappers>
```

## Partitioned Write

If you specify `PARTITION BY` expression when inserting data into `S3` table, a separate file is created for each partition value. Splitting the data into separate files helps to improve reading operations efficiency.
Expand Down
12 changes: 12 additions & 0 deletions programs/server/config.xml
Expand Up @@ -91,6 +91,18 @@
</formatting> -->
</logger>

<url_scheme_mappers>
<s3>
<to>https://{bucket}.s3.amazonaws.com</to>
</s3>
<gs>
<to>https://{bucket}.storage.googleapis.com</to>
</gs>
<oss>
<to>https://{bucket}.oss.aliyuncs.com</to>
</oss>
</url_scheme_mappers>

<!-- Add headers to response in options request. OPTIONS method is used in CORS preflight requests. -->
<!-- It is off by default. Next headers are obligate for CORS.-->
<!-- http_options_response>
Expand Down
7 changes: 7 additions & 0 deletions src/Common/Macros.cpp
@@ -1,3 +1,5 @@
#include <algorithm>
#include <unordered_map>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Macros.h>
#include <Common/Exception.h>
Expand Down Expand Up @@ -36,6 +38,11 @@ Macros::Macros(const Poco::Util::AbstractConfiguration & config, const String &
}
}

/// Construct directly from an explicit name -> value map, bypassing server
/// configuration. Used for ad-hoc expansions such as substituting {bucket}
/// when rewriting s3-style URLs via url_scheme_mappers.
/// The map is taken by value so callers can move it in without a copy.
Macros::Macros(std::map<String, String> map) : macros(std::move(map))
{
}

String Macros::expand(const String & s,
MacroExpansionInfo & info) const
{
Expand Down
1 change: 1 addition & 0 deletions src/Common/Macros.h
Expand Up @@ -27,6 +27,7 @@ class Macros
public:
Macros() = default;
Macros(const Poco::Util::AbstractConfiguration & config, const String & key, Poco::Logger * log = nullptr);
explicit Macros(std::map<String, String> map);

struct MacroExpansionInfo
{
Expand Down
37 changes: 36 additions & 1 deletion src/IO/S3/URI.cpp
@@ -1,5 +1,8 @@
#include <IO/S3/URI.h>

#include <Poco/URI.h>
#include "Common/Macros.h"
#include <Interpreters/Context.h>
#include <Storages/NamedCollectionsHelpers.h>
#if USE_AWS_S3
#include <Common/Exception.h>
#include <Common/quoteString.h>
Expand All @@ -18,6 +21,15 @@
namespace DB
{

/// Rewrites s3-style URLs (e.g. "s3://bucket/key") into full endpoint URLs
/// using the scheme -> template mapping from url_scheme_mappers.
/// The template may reference the bucket name via the {bucket} macro,
/// e.g. "https://{bucket}.s3.amazonaws.com".
struct URIConverter
{
    static void modifyURI(Poco::URI & uri, const std::unordered_map<std::string, std::string> & mapper)
    {
        /// Use find() instead of operator[]: the latter would insert an empty
        /// entry for every unknown scheme, and we must not mutate the mapping.
        const auto it = mapper.find(uri.getScheme());
        if (it == mapper.end())
            return;

        /// The URL host position holds the bucket name in s3-style URLs.
        Macros macros({{"bucket", uri.getHost()}});

        /// Expand once and reuse; leave the URI untouched if the template
        /// expands to an empty string.
        const std::string expanded = macros.expand(it->second);
        if (!expanded.empty())
            uri = Poco::URI(expanded + "/" + uri.getPathAndQuery());
    }
};

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
Expand Down Expand Up @@ -46,6 +58,29 @@ URI::URI(const std::string & uri_)

uri = Poco::URI(uri_);

std::unordered_map<std::string, std::string> mapper;
auto context = Context::getGlobalContextInstance();
if (context)
{
const auto *config = &context->getConfigRef();
if (config->has("url_scheme_mappers"))
{
std::vector<String> config_keys;
config->keys("url_scheme_mappers", config_keys);
for (const std::string & config_key : config_keys)
mapper[config_key] = config->getString("url_scheme_mappers." + config_key + ".to");
}
else
{
mapper["s3"] = "https://{bucket}.s3.amazonaws.com";
mapper["gs"] = "https://{bucket}.storage.googleapis.com";
mapper["oss"] = "https://{bucket}.oss.aliyuncs.com";
}

if (!mapper.empty())
URIConverter::modifyURI(uri, mapper);
}

storage_name = S3;

if (uri.getHost().empty())
Expand Down
Empty file.
10 changes: 10 additions & 0 deletions tests/integration/test_s3_style_link/configs/config.d/minio.xml
@@ -0,0 +1,10 @@
<?xml version="1.0"?>

<!-- Map the custom "minio" URL scheme to the MinIO endpoint (url_scheme_mappers) -->
<clickhouse>
<url_scheme_mappers>
<minio>
<to>http://minio1:9001/root/{bucket}</to>
</minio>
</url_scheme_mappers>
</clickhouse>
@@ -0,0 +1,9 @@
<clickhouse>
<users>
<default>
<password></password>
<profile>default</profile>
<named_collection_control>1</named_collection_control>
</default>
</users>
</clickhouse>
57 changes: 57 additions & 0 deletions tests/integration/test_s3_style_link/test.py
@@ -0,0 +1,57 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster


cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=[
"configs/config.d/minio.xml",
],
user_configs=[
"configs/users.d/users.xml",
],
with_minio=True,
)


@pytest.fixture(scope="module")
def started_cluster():
    """Start the single-node cluster (with MinIO) once per test module.

    Yields the running ClickHouseCluster; the finally block guarantees
    shutdown even if startup or a test fails.
    """
    try:
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        yield cluster
    finally:
        logging.info("Stopping cluster")
        cluster.shutdown()
        logging.info("Cluster stopped")


def test_s3_table_functions(started_cluster):
    """
    Check that the custom 'minio' URL scheme declared in
    configs/config.d/minio.xml (url_scheme_mappers) is expanded by the s3
    table function: 'minio://data/...' should resolve to the MinIO endpoint
    for both INSERT and SELECT, and the row count must round-trip intact.
    """
    # Write 1M rows through the mapped s3-style URL.
    node.query(
        """
            INSERT INTO FUNCTION s3
            (
                'minio://data/test_file.tsv.gz', 'minio', 'minio123'
            )
            SELECT * FROM numbers(1000000);
        """
    )

    # Read the same object back via the mapped URL and verify the count.
    assert (
        node.query(
            """
            SELECT count(*) FROM s3
            (
                'minio://data/test_file.tsv.gz', 'minio', 'minio123'
            );
        """
        )
        == "1000000\n"
    )

0 comments on commit 8f9a227

Please sign in to comment.