Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6880,13 +6880,17 @@ Possible values:
- '' - do not force any kind of Exchange operators, let the optimizer choose,
- 'Persisted' - use temporary files in object storage,
- 'Streaming' - stream exchange data over network.
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
\
/** Experimental timeSeries* aggregate functions. */ \
DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"(
Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation.
)", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \
\

/* ####################################################### */ \
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.6",
{
Expand Down
1 change: 1 addition & 0 deletions src/Databases/DataLake/DataLakeConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace DataLake
{

static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog";
static constexpr auto DATABASE_ALIAS_NAME = "Iceberg";
static constexpr std::string_view FILE_PATH_PREFIX = "file:/";

/// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables"
Expand Down
6 changes: 6 additions & 0 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}

if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
}

for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);

Expand Down Expand Up @@ -724,6 +729,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
std::move(engine_for_tables));
};
factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true });
factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
}

}
Expand Down
18 changes: 12 additions & 6 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,19 @@ void TableMetadata::setLocation(const std::string & location_)
auto pos_to_path = location_.substr(pos_to_bucket).find('/');

if (pos_to_path == std::string::npos)
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);

pos_to_path = pos_to_bucket + pos_to_path;
{ // empty path
location_without_path = location_;
path.clear();
bucket = location_.substr(pos_to_bucket);
}
else
{
pos_to_path = pos_to_bucket + pos_to_path;

location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
}

LOG_TEST(getLogger("TableMetadata"),
"Parsed location without path: {}, path: {}",
Expand Down
6 changes: 6 additions & 0 deletions src/IO/ReadBufferFromS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
}
else
{
LOG_TEST(
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
bucket, key, version_id.empty() ? "Latest" : version_id);
}

ProfileEvents::increment(ProfileEvents::S3GetObject);
if (client_ptr->isClientForDisk())
Expand Down
14 changes: 8 additions & 6 deletions src/IO/S3/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
auto bucket_uri = getURIForBucket(bucket);
if (!bucket_uri)
{
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
return *maybe_error;

if (auto region = getRegionForBucket(bucket); !region.empty())
Expand Down Expand Up @@ -589,7 +589,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));


bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
Expand Down Expand Up @@ -869,12 +868,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
}

// Do a list request because head requests don't have body in response
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
{
ListObjectsV2Request req;
GetObjectRequest req;
req.SetBucket(bucket);
req.SetMaxKeys(1);
auto result = ListObjectsV2(req);
req.SetKey(key);
req.SetRange("bytes=0-1");
auto result = GetObject(req);

if (result.IsSuccess())
return std::nullopt;
return result.GetError();
Expand Down
2 changes: 1 addition & 1 deletion src/IO/S3/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Client : private Aws::S3::S3Client

void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;

std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;

Expand Down
62 changes: 62 additions & 0 deletions src/IO/S3/URI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
validateKey(key, uri);
}

bool URI::isAWSRegion(std::string_view region)
{
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
static const std::unordered_set<std::string_view> regions = {
"us-east-2",
"us-east-1",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-2",
"ap-southeast-3",
"ap-southeast-5",
"ap-southeast-4",
"ap-south-1",
"ap-northeast-3",
"ap-northeast-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-east-2",
"ap-southeast-7",
"ap-northeast-1",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-west-1",
"eu-west-2",
"eu-south-1",
"eu-west-3",
"eu-south-2",
"eu-north-1",
"eu-central-2",
"il-central-1",
"mx-central-1",
"me-south-1",
"me-central-1",
"sa-east-1",
"us-gov-east-1",
"us-gov-west-1"
};

/// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2'
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
if (region.substr(0, 3) == "s3-")
region = region.substr(3);

return regions.contains(region);
}

void URI::addRegionToURI(const std::string &region)
{
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
{
if (pos > 0)
{ /// Check if region is already in endpoint to avoid add it second time
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
if (prev_pos == std::string::npos)
prev_pos = 0;
else
++prev_pos;
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
if (isAWSRegion(endpoint_region))
return;
}
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
}
}

void URI::validateBucket(const String & bucket, const Poco::URI & uri)
Expand Down
4 changes: 4 additions & 0 deletions src/IO/S3/URI.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ struct URI
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
static void validateKey(const std::string & key, const Poco::URI & uri);

/// Returns true if 'region' string is an AWS S3 region
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
static bool isAWSRegion(std::string_view region);

private:
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
};
Expand Down
3 changes: 2 additions & 1 deletion src/Parsers/ASTSetQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void ASTSetQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & format,
return true;
}

if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name)
if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name
|| DataLake::DATABASE_ALIAS_NAME == state.create_engine_name)
{
if (DataLake::SETTINGS_TO_HIDE.contains(change.name))
{
Expand Down
2 changes: 1 addition & 1 deletion src/Parsers/FunctionSecretArgumentsFinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ class FunctionSecretArgumentsFinder
/// S3('url', 'access_key_id', 'secret_access_key')
findS3DatabaseSecretArguments();
}
else if (engine_name == "DataLakeCatalog")
else if (engine_name == "DataLakeCatalog" || engine_name == "Iceberg")
{
findDataLakeCatalogSecretArguments();
}
Expand Down
84 changes: 80 additions & 4 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include <Storages/IStorageCluster.h>

#include <pcg_random.hpp>
#include <Common/randomSeed.h>

#include <Common/Exception.h>
#include <Core/Settings.h>
#include <Core/QueryProcessingStage.h>
Expand All @@ -13,6 +16,7 @@
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/narrowPipe.h>
Expand All @@ -22,6 +26,9 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/extractTableFunctionFromSelectQuery.h>

#include <algorithm>
#include <memory>
Expand All @@ -40,6 +47,7 @@ namespace Setting
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsBool object_storage_remote_initiator;
}

namespace ErrorCodes
Expand Down Expand Up @@ -73,7 +81,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
if (extension)
return;

extension = storage->getTaskIteratorExtension(predicate, context, cluster);
extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster);
}

/// The code executes on initiator
Expand All @@ -97,15 +105,16 @@ void IStorageCluster::read(

storage_snapshot->check(column_names);

updateBeforeRead(context);
auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
const auto & settings = context->getSettingsRef();

auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

Block sample_block;
ASTPtr query_to_send = query_info.query;

if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
Expand All @@ -118,6 +127,17 @@ void IStorageCluster::read(

updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);

if (settings[Setting::object_storage_remote_initiator])
{
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
auto modified_query_info = query_info;
modified_query_info.cluster = src_distributed->getCluster();
auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
return;
}

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
Expand Down Expand Up @@ -145,6 +165,62 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}

IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send)
{
auto host_addresses = cluster->getShardsAddresses();
if (host_addresses.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);

static pcg64 rng(randomSeed());
size_t shard_num = rng() % host_addresses.size();
auto shard_addresses = host_addresses[shard_num];
/// After getClusterImpl each shard must have exactly 1 replica
if (shard_addresses.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
auto host_name = shard_addresses[0].toString();

LOG_INFO(log, "Choose remote initiator '{}'", host_name);

bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
std::string remote_function_name = secure ? "remoteSecure" : "remote";

/// Clean object_storage_remote_initiator setting to avoid infinite remote call
auto new_context = Context::createCopy(context);
new_context->setSetting("object_storage_remote_initiator", false);

auto * select_query = query_to_send->as<ASTSelectQuery>();
if (!select_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");

auto query_settings = select_query->settings();
if (query_settings)
{
auto & settings_ast = query_settings->as<ASTSetQuery &>();
if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
{
select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
}
}

ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
if (!table_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);

table_expression->table_function = remote_query;

auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);

auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);

return RemoteCallVariables{storage, new_context};
}

SinkToStoragePtr IStorageCluster::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
Expand Down
Loading
Loading