Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7710,6 +7710,9 @@ Multiple algorithms can be specified, e.g. 'dpsize,greedy'.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_paimon_rest_catalog, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_rest'
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_s3_tables, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 's3tables' (Amazon S3 Tables Iceberg REST with SigV4)
)", EXPERIMENTAL) \
\
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"optimize_read_in_window_order", true, false, "Disable this logic by default."},
{"correlated_subqueries_use_in_memory_buffer", false, true, "Use in-memory buffer for input of correlated subqueries by default."},
{"allow_experimental_database_paimon_rest_catalog", false, false, "New setting"},
{"allow_experimental_database_s3_tables", false, false, "New setting"},
{"allow_experimental_object_storage_queue_hive_partitioning", false, false, "New setting."},
{"type_json_use_partial_match_to_skip_paths_by_regexp", false, true, "Add new setting that allows to use partial match in regexp paths skip in JSON type parsing"},
{"max_insert_block_size_bytes", 0, 0, "New setting that allows to control the size of blocks in bytes during parsing of data in Row Input Format."},
Expand Down
3 changes: 2 additions & 1 deletion src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ IMPLEMENT_SETTING_ENUM(
{"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE},
{"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE},
{"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE},
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}})
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST},
{"s3tables", DatabaseDataLakeCatalogType::S3_TABLES}})

IMPLEMENT_SETTING_ENUM(
FileCachePolicy,
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t
ICEBERG_ONELAKE,
ICEBERG_BIGLAKE,
PAIMON_REST,
S3_TABLES,
};

DECLARE_SETTING_ENUM(DatabaseDataLakeCatalogType)
Expand Down
110 changes: 110 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Databases/DataLake/AWSV4Signer.h>

#include <Common/Exception.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/String.h>

#include <aws/core/auth/signer/AWSAuthV4Signer.h>
#include <aws/core/http/standard/StandardHttpRequest.h>
#include <aws/core/http/URI.h>
#include <aws/core/utils/memory/AWSMemory.h>

#include <sstream>
#include <utility>

namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
}

namespace DataLake
{
namespace
{

/// Translate a Poco HTTP method name into the AWS SDK method enum.
/// Only the verbs listed below are supported; anything else is rejected,
/// since the SigV4 canonical request cannot be built for an unknown verb.
Aws::Http::HttpMethod mapPocoMethodToAws(const String & method)
{
    using Aws::Http::HttpMethod;
    using Poco::Net::HTTPRequest;

    if (method == HTTPRequest::HTTP_GET)
        return HttpMethod::HTTP_GET;
    if (method == HTTPRequest::HTTP_POST)
        return HttpMethod::HTTP_POST;
    if (method == HTTPRequest::HTTP_PUT)
        return HttpMethod::HTTP_PUT;
    if (method == HTTPRequest::HTTP_DELETE)
        return HttpMethod::HTTP_DELETE;
    if (method == HTTPRequest::HTTP_HEAD)
        return HttpMethod::HTTP_HEAD;
    if (method == HTTPRequest::HTTP_PATCH)
        return HttpMethod::HTTP_PATCH;

    throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported HTTP method for AWS SigV4 signing: {}", method);
}

}

void signRequestWithAWSV4(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Perhaps name it like signHeaders as the output of this method is a list of headers and not a request?

const String & method,
const Poco::URI & uri,
const DB::HTTPHeaderEntries & extra_headers,
const String & payload,
Aws::Client::AWSAuthV4Signer & signer,
const String & region,
const String & service,
DB::HTTPHeaderEntries & out_headers)
{
const Aws::Http::URI aws_uri(uri.toString().c_str());
Aws::Http::Standard::StandardHttpRequest request(aws_uri, mapPocoMethodToAws(method));

for (const auto & h : extra_headers)
{
if (Poco::icompare(h.name, "authorization") == 0)
continue;
request.SetHeaderValue(Aws::String(h.name.c_str(), h.name.size()), Aws::String(h.value.c_str(), h.value.size()));
}

if (!payload.empty())
{
auto body_stream = Aws::MakeShared<std::stringstream>("AWSV4Signer");
body_stream->write(payload.data(), static_cast<std::streamsize>(payload.size()));
body_stream->seekg(0);
request.AddContentBody(body_stream);
}

static constexpr bool sign_body = true;
if (!signer.SignRequest(request, region.c_str(), service.c_str(), sign_body))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "AWS SigV4 signing failed");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOGICAL_ERROR will crash ClickHouse, it should be something else


bool has_authorization = false;
for (const auto & [key, value] : request.GetHeaders())
{
if (Poco::icompare(key, "authorization") == 0 && !value.empty())
has_authorization = true;
}
if (!has_authorization)
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS,
"AWS credentials are missing or incomplete; cannot sign S3 Tables REST request");

out_headers.clear();
for (const auto & [key, value] : request.GetHeaders())
{
if (Poco::icompare(key, "host") == 0)
continue;
out_headers.emplace_back(String(key.c_str(), key.size()), String(value.c_str(), value.size()));
}
}

}

#endif
34 changes: 34 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Core/Types.h>
#include <IO/HTTPHeaderEntries.h>
#include <Poco/URI.h>

namespace Aws::Client
{
class AWSAuthV4Signer;
}

namespace DataLake
{

/// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer (SigV4).
/// Builds a temporary Aws::Http::StandardHttpRequest from `method`, `uri`,
/// `extra_headers` and `payload`, signs it for the given `region`/`service`, then extracts
/// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI).
/// Any pre-existing Authorization header in `extra_headers` is ignored and replaced
/// by the one produced by the signer. Throws if the HTTP method is unsupported
/// or signing does not yield an Authorization header (e.g. missing credentials).
void signRequestWithAWSV4(
const String & method,
const Poco::URI & uri,
const DB::HTTPHeaderEntries & extra_headers,
const String & payload,
Aws::Client::AWSAuthV4Signer & signer,
const String & region,
const String & service,
DB::HTTPHeaderEntries & out_headers);

}

#endif
45 changes: 41 additions & 4 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include <Databases/DataLake/RestCatalog.h>
#include <Databases/DataLake/GlueCatalog.h>
#include <Databases/DataLake/PaimonRestCatalog.h>
#if USE_AWS_S3 && USE_SSL
#include <Databases/DataLake/S3TablesCatalog.h>
#endif
#include <DataTypes/DataTypeString.h>

#include <Storages/ObjectStorage/S3/Configuration.h>
Expand Down Expand Up @@ -90,6 +93,7 @@ namespace Setting
extern const SettingsBool allow_experimental_database_glue_catalog;
extern const SettingsBool allow_experimental_database_hms_catalog;
extern const SettingsBool allow_experimental_database_paimon_rest_catalog;
extern const SettingsBool allow_experimental_database_s3_tables;
extern const SettingsBool use_hive_partitioning;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString cluster_for_parallel_replicas;
Expand Down Expand Up @@ -137,11 +141,12 @@ DatabaseDataLake::DatabaseDataLake(

void DatabaseDataLake::validateSettings()
{
if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE)
if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::GLUE
|| settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Enforce non-empty warehouse for s3tables

validateSettings() now groups S3_TABLES with Glue and skips the warehouse check, but this catalog still relies on warehouse for loadConfig() (/v1/config?warehouse=...) and for deriving config.prefix when the server does not return one; allowing an empty warehouse defers this to runtime failures on namespace/table requests instead of rejecting invalid configuration at CREATE DATABASE time.

Useful? React with 👍 / 👎.

{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. "
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue or S3 Tables catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
Expand Down Expand Up @@ -336,6 +341,23 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
}
break;
}
case DB::DatabaseDataLakeCatalogType::S3_TABLES:
{
#if USE_AWS_S3 && USE_SSL
catalog_impl = std::make_shared<DataLake::S3TablesCatalog>(
settings[DatabaseDataLakeSetting::warehouse].value,
url,
settings[DatabaseDataLakeSetting::region].value,
catalog_parameters,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
#else
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Amazon S3 Tables catalog requires ClickHouse built with USE_AWS_S3 and USE_SSL");
#endif
break;
}
}
return catalog_impl;
}
Expand Down Expand Up @@ -368,6 +390,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
case DatabaseDataLakeCatalogType::ICEBERG_HIVE:
case DatabaseDataLakeCatalogType::ICEBERG_REST:
case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE:
case DatabaseDataLakeCatalogType::S3_TABLES:
{
switch (type)
{
Expand Down Expand Up @@ -962,9 +985,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}

if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST
&& catalog_type != DatabaseDataLakeCatalogType::S3_TABLES)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only");
}

for (auto & engine_arg : engine_args)
Expand Down Expand Up @@ -1050,6 +1074,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
engine_func->name = "Paimon";
break;
}
case DatabaseDataLakeCatalogType::S3_TABLES:
{
if (!args.create_query.attach
&& !args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables])
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"DatabaseDataLake with S3 Tables catalog is experimental. "
"To allow its usage, enable setting allow_experimental_database_s3_tables");
}

engine_func->name = "Iceberg";
break;
}
case DatabaseDataLakeCatalogType::NONE:
break;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ bool TableMetadata::hasStorageCredentials() const
return storage_credentials != nullptr;
}

/// Whether optional data-lake-specific metadata has been attached to this table
/// (i.e. data_lake_specific_metadata holds a value).
bool TableMetadata::hasDataLakeSpecificProperties() const
{
return data_lake_specific_metadata.has_value();
}

std::string TableMetadata::getMetadataLocation(const std::string & iceberg_metadata_file_location) const
{
std::string metadata_location = iceberg_metadata_file_location;
Expand Down
4 changes: 2 additions & 2 deletions src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class RestCatalog : public ICatalog, public DB::WithContext

Poco::Net::HTTPBasicCredentials credentials{};

DB::ReadWriteBufferFromHTTPPtr createReadBuffer(
virtual DB::ReadWriteBufferFromHTTPPtr createReadBuffer(
const std::string & endpoint,
const Poco::URI::QueryParameters & params = {},
const DB::HTTPHeaderEntries & headers = {}) const;
Expand Down Expand Up @@ -183,7 +183,7 @@ class RestCatalog : public ICatalog, public DB::WithContext
AccessToken retrieveAccessTokenOAuth() const;
static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result);

void sendRequest(
virtual void sendRequest(
const String & endpoint,
Poco::JSON::Object::Ptr request_body,
const String & method = Poco::Net::HTTPRequest::HTTP_POST,
Expand Down
Loading
Loading