From 17f79a2c59bdbf9f6d4ab0531d439f479e62acd6 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 11 Jul 2025 14:33:16 +0200
Subject: [PATCH] Send iceberg_metadata_file_path setting to swarm node

---
 .../DataLakes/DataLakeConfiguration.cpp       | 396 ++++++++++++++++++
 .../DataLakes/DataLakeConfiguration.h         | 312 +-------------
 2 files changed, 412 insertions(+), 296 deletions(-)
 create mode 100644 src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.cpp

diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.cpp b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.cpp
new file mode 100644
index 000000000000..ecd3e07a64ed
--- /dev/null
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.cpp
@@ -0,0 +1,396 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int BAD_ARGUMENTS;
+extern const int FORMAT_VERSION_TOO_OLD;
+extern const int LOGICAL_ERROR;
+}
+
+namespace StorageObjectStorageSetting
+{
+extern const StorageObjectStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
+extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
+}
+
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+void DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::update(ObjectStoragePtr object_storage, ContextPtr local_context)
+{
+    BaseStorageConfiguration::update(object_storage, local_context);
+
+    bool existed = current_metadata != nullptr;
+
+    if (updateMetadataObjectIfNeeded(object_storage, local_context))
+    {
+        if (hasExternalDynamicMetadata() && existed)
+        {
+            throw Exception(
+                ErrorCodes::FORMAT_VERSION_TOO_OLD,
+                "Metadata is not consistent with the one which was used to infer table schema. Please retry the query.");
+        }
+    }
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+std::optional<ColumnsDescription> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::tryGetTableStructureFromMetadata() const
+{
+    if (!current_metadata)
+        return std::nullopt;
+    auto schema_from_metadata = current_metadata->getTableSchema();
+    if (!schema_from_metadata.empty())
+    {
+        return ColumnsDescription(std::move(schema_from_metadata));
+    }
+    return std::nullopt;
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+std::optional<String> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::tryGetSamplePathFromMetadata() const
+{
+    if (!current_metadata)
+        return std::nullopt;
+    auto data_files = current_metadata->getDataFiles();
+    if (!data_files.empty())
+        return data_files[0];
+    return std::nullopt;
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+std::optional<UInt64> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::totalRows()
+{
+    if (!current_metadata)
+        return {};
+
+    return current_metadata->totalRows();
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+std::shared_ptr<NamesAndTypesList> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::getInitialSchemaByPath(const String & data_path) const
+{
+    if (!current_metadata)
+        return {};
+    return current_metadata->getInitialSchemaByPath(data_path);
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+std::shared_ptr<const ActionsDAG> DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::getSchemaTransformer(const String & data_path) const
+{
+    if (!current_metadata)
+        return {};
+    return current_metadata->getSchemaTransformer(data_path);
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+bool DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::hasExternalDynamicMetadata()
+{
+    return BaseStorageConfiguration::getSettingsRef()[StorageObjectStorageSetting::allow_dynamic_metadata_for_data_lakes]
+        && current_metadata
+        && current_metadata->supportsExternalMetadataChange();
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+ColumnsDescription DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::updateAndGetCurrentSchema(
+    ObjectStoragePtr object_storage,
+    ContextPtr context)
+{
+    BaseStorageConfiguration::update(object_storage, context);
+    updateMetadataObjectIfNeeded(object_storage, context);
+    return ColumnsDescription{current_metadata->getTableSchema()};
+}
+
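+/// Build the format-reading info for a data file. If the "read schema" taken
+/// from the data files differs from the "table schema" taken from table
+/// metadata, requested column names are remapped positionally onto the read schema.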
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+ReadFromFormatInfo DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::prepareReadingFromFormat(
+    ObjectStoragePtr object_storage,
+    const Strings & requested_columns,
+    const StorageSnapshotPtr & storage_snapshot,
+    bool supports_subset_of_columns,
+    ContextPtr local_context)
+{
+    auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
+    if (!current_metadata)
+    {
+        current_metadata = DataLakeMetadata::create(
+            object_storage,
+            weak_from_this(),
+            local_context);
+    }
+    auto read_schema = current_metadata->getReadSchema();
+    if (!read_schema.empty())
+    {
+        /// There is a difference between "table schema" and "read schema".
+        /// "table schema" is a schema from data lake table metadata,
+        /// while "read schema" is a schema from data files.
+        /// In most cases they would be the same.
+        /// TODO: Try to hide this logic inside IDataLakeMetadata.
+
+        const auto read_schema_names = read_schema.getNames();
+        const auto table_schema_names = current_metadata->getTableSchema().getNames();
+        chassert(read_schema_names.size() == table_schema_names.size());
+
+        if (read_schema_names != table_schema_names)
+        {
+            LOG_TEST(log, "Read schema: {}, table schema: {}, requested columns: {}",
+                fmt::join(read_schema_names, ", "),
+                fmt::join(table_schema_names, ", "),
+                fmt::join(info.requested_columns.getNames(), ", "));
+
+            auto column_name_mapping = [&]()
+            {
+                std::map<String, String> result;
+                for (size_t i = 0; i < read_schema_names.size(); ++i)
+                    result[table_schema_names[i]] = read_schema_names[i];
+                return result;
+            }();
+
+            /// Go through requested columns and change column name
+            /// from table schema to column name from read schema.
+
+            std::vector<NameAndTypePair> read_columns;
+            for (const auto & column_name : info.requested_columns)
+            {
+                const auto pos = info.format_header.getPositionByName(column_name.name);
+                auto column = info.format_header.getByPosition(pos);
+                column.name = column_name_mapping.at(column_name.name);
+                info.format_header.setColumn(pos, column);
+
+                read_columns.emplace_back(column.name, column.type);
+            }
+            info.requested_columns = NamesAndTypesList(read_columns.begin(), read_columns.end());
+        }
+    }
+
+    return info;
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+bool DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::updateMetadataObjectIfNeeded(
+    ObjectStoragePtr object_storage,
+    ContextPtr context)
+{
+    if (!current_metadata)
+    {
+        current_metadata = DataLakeMetadata::create(
+            object_storage,
+            weak_from_this(),
+            context);
+        return true;
+    }
+
+    if (current_metadata->supportsUpdate())
+    {
+        return current_metadata->update(context);
+    }
+
+    auto new_metadata = DataLakeMetadata::create(
+        object_storage,
+        weak_from_this(),
+        context);
+
+    if (*current_metadata != *new_metadata)
+    {
+        current_metadata = std::move(new_metadata);
+        return true;
+    }
+    else
+    {
+        return false;
+    }
+}
+
+template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
+ASTPtr DataLakeConfiguration<BaseStorageConfiguration, DataLakeMetadata>::createArgsWithAccessData() const
+{
+    auto res = BaseStorageConfiguration::createArgsWithAccessData();
+
+    auto iceberg_metadata_file_path = BaseStorageConfiguration::getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path];
+
+    if (iceberg_metadata_file_path.changed)
+    {
+        auto * arguments = res->template as<ASTExpressionList>();
+        if (!arguments)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list");
+
+        bool has_settings = false;
+
+        for (auto & arg : arguments->children)
+        {
+            if (auto * settings = arg->template as<ASTSetQuery>())
+            {
+                has_settings = true;
+                settings->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
+                break;
+            }
+        }
+
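+        /// No SETTINGS clause among the arguments yet: append one, so that the
+        /// remote (swarm) node receives the same metadata file path.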
+        if (!has_settings)
+        {
+            std::shared_ptr<ASTSetQuery> settings_ast = std::make_shared<ASTSetQuery>();
+            settings_ast->is_standalone = false;
+            settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
+            arguments->children.push_back(settings_ast);
+        }
+    }
+
+    return res;
+}
+
+#if USE_AVRO
+#    if USE_AWS_S3
+template class DataLakeConfiguration<StorageS3Configuration, IcebergMetadata>;
+#    endif
+#    if USE_AZURE_BLOB_STORAGE
+template class DataLakeConfiguration<StorageAzureConfiguration, IcebergMetadata>;
+#    endif
+#    if USE_HDFS
+template class DataLakeConfiguration<StorageHDFSConfiguration, IcebergMetadata>;
+#    endif
+template class DataLakeConfiguration<StorageLocalConfiguration, IcebergMetadata>;
+#endif
+#if USE_PARQUET && USE_AWS_S3
+template class DataLakeConfiguration<StorageS3Configuration, DeltaLakeMetadata>;
+#endif
+#if USE_PARQUET
+template class DataLakeConfiguration<StorageLocalConfiguration, DeltaLakeMetadata>;
+#endif
+#if USE_AWS_S3
+template class DataLakeConfiguration<StorageS3Configuration, HudiMetadata>;
+#endif
+
+ObjectStorageType StorageIcebergConfiguration::extractDynamicStorageType(
+    ASTs & args, ContextPtr context, ASTPtr * type_arg) const
+{
+    static const auto storage_type_name = "storage_type";
+
+    if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context))
+    {
+        if (named_collection->has(storage_type_name))
+        {
+            return objectStorageTypeFromString(named_collection->get<String>(storage_type_name));
+        }
+    }
+
+    auto type_it = args.end();
+
+    /// S3 by default for backward compatibility:
+    /// Iceberg without storage_type == IcebergS3.
+    ObjectStorageType type = ObjectStorageType::S3;
+
+    for (auto arg_it = args.begin(); arg_it != args.end(); ++arg_it)
+    {
+        const auto * type_ast_function = (*arg_it)->as<ASTFunction>();
+
+        if (type_ast_function && type_ast_function->name == "equals"
+            && type_ast_function->arguments && type_ast_function->arguments->children.size() == 2)
+        {
+            auto name = type_ast_function->arguments->children[0]->as<ASTIdentifier>();
+
+            if (name && name->name() == storage_type_name)
+            {
+                if (type_it != args.end())
+                {
+                    throw Exception(
+                        ErrorCodes::BAD_ARGUMENTS,
+                        "DataLake can have only one key-value argument: storage_type='type'.");
+                }
+
+                auto value = type_ast_function->arguments->children[1]->as<ASTLiteral>();
+
+                if (!value)
+                {
+                    throw Exception(
+                        ErrorCodes::BAD_ARGUMENTS,
+                        "DataLake parameter 'storage_type' has wrong type, string literal expected.");
+                }
+
+                if (value->value.getType() != Field::Types::String)
+                {
+                    throw Exception(
+                        ErrorCodes::BAD_ARGUMENTS,
+                        "DataLake parameter 'storage_type' has wrong value type, string expected.");
+                }
+
+                type = objectStorageTypeFromString(value->value.safeGet<String>());
+
+                type_it = arg_it;
+            }
+        }
+    }
+
+    if (type_it != args.end())
+    {
+        if (type_arg)
+            *type_arg = *type_it;
+        args.erase(type_it);
+    }
+
+    return type;
+}
+
+void StorageIcebergConfiguration::createDynamicStorage(ObjectStorageType type)
+{
+    if (impl)
+    {
+        if (impl->getType() == type)
+            return;
+
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't change data lake engine storage");
+    }
+
+    switch (type)
+    {
+#    if USE_AWS_S3
+        case ObjectStorageType::S3:
+            impl = std::make_unique<StorageS3IcebergConfiguration>();
+            break;
+#    endif
+#    if USE_AZURE_BLOB_STORAGE
+        case ObjectStorageType::Azure:
+            impl = std::make_unique<StorageAzureIcebergConfiguration>();
+            break;
+#    endif
+#    if USE_HDFS
+        case ObjectStorageType::HDFS:
+            impl = std::make_unique<StorageHDFSIcebergConfiguration>();
+            break;
+#    endif
+        case ObjectStorageType::Local:
+            impl = std::make_unique<StorageLocalIcebergConfiguration>();
+            break;
+        default:
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported DataLake storage {}", type);
+    }
+}
+
+StorageObjectStorage::Configuration & StorageIcebergConfiguration::getImpl() const
+{
+    if (!impl)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Dynamic DataLake storage not initialized");
+
+    return *impl;
+}
+
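+/// Argument serialization is forwarded to the dynamically selected storage
+/// configuration, which attaches iceberg_metadata_file_path (see above).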
+ASTPtr StorageIcebergConfiguration::createArgsWithAccessData() const
+{
+    return getImpl().createArgsWithAccessData();
+}
+
+}
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index 0dedb5d388f5..f2d6b6f2df91 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -1,7 +1,6 @@
 #pragma once
 
 #include
-#include
 #include
 #include
 #include
@@ -12,37 +11,11 @@
 #include
 #include
 #include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-#include
-#include
-
-#include
-
-#include
 
 namespace DB
 {
 
-namespace ErrorCodes
-{
-extern const int FORMAT_VERSION_TOO_OLD;
-extern const int LOGICAL_ERROR;
-}
-
-namespace StorageObjectStorageSetting
-{
-extern const StorageObjectStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
-}
-
-
 template <typename T>
 concept StorageConfiguration = std::derived_from<T, StorageObjectStorage::Configuration>;
 
@@ -56,82 +29,23 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this<StorageObjectStorage::Configuration>
 
     std::string getEngineName() const override { return DataLakeMetadata::name + BaseStorageConfiguration::getEngineName(); }
 
-    void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
-    {
-        BaseStorageConfiguration::update(object_storage, local_context);
-
-        bool existed = current_metadata != nullptr;
-
-        if (updateMetadataObjectIfNeeded(object_storage, local_context))
-        {
-            if (hasExternalDynamicMetadata() && existed)
-            {
-                throw Exception(
-                    ErrorCodes::FORMAT_VERSION_TOO_OLD,
-                    "Metadata is not consinsent with the one which was used to infer table schema. Please, retry the query.");
-            }
-        }
-    }
+    void update(ObjectStoragePtr object_storage, ContextPtr local_context) override;
 
-    std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override
-    {
-        if (!current_metadata)
-            return std::nullopt;
-        auto schema_from_metadata = current_metadata->getTableSchema();
-        if (!schema_from_metadata.empty())
-        {
-            return ColumnsDescription(std::move(schema_from_metadata));
-        }
-        return std::nullopt;
-    }
+    std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override;
 
-    std::optional<String> tryGetSamplePathFromMetadata() const override
-    {
-        if (!current_metadata)
-            return std::nullopt;
-        auto data_files = current_metadata->getDataFiles();
-        if (!data_files.empty())
-            return data_files[0];
-        return std::nullopt;
-    }
+    std::optional<String> tryGetSamplePathFromMetadata() const override;
 
-    std::optional<UInt64> totalRows() override
-    {
-        if (!current_metadata)
-            return {};
+    std::optional<UInt64> totalRows() override;
 
-        return current_metadata->totalRows();
-    }
+    std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String & data_path) const override;
 
-    std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String & data_path) const override
-    {
-        if (!current_metadata)
-            return {};
-        return current_metadata->getInitialSchemaByPath(data_path);
-    }
+    std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String & data_path) const override;
 
-    std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String & data_path) const override
-    {
-        if (!current_metadata)
-            return {};
-        return current_metadata->getSchemaTransformer(data_path);
-    }
-
-    bool hasExternalDynamicMetadata() override
-    {
-        return BaseStorageConfiguration::getSettingsRef()[StorageObjectStorageSetting::allow_dynamic_metadata_for_data_lakes]
-            && current_metadata
-            && current_metadata->supportsExternalMetadataChange();
-    }
+    bool hasExternalDynamicMetadata() override;
 
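+    /// Refreshes the metadata object and returns the current table schema.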
     ColumnsDescription updateAndGetCurrentSchema(
         ObjectStoragePtr object_storage,
-        ContextPtr context) override
-    {
-        BaseStorageConfiguration::update(object_storage, context);
-        updateMetadataObjectIfNeeded(object_storage, context);
-        return ColumnsDescription{current_metadata->getTableSchema()};
-    }
+        ContextPtr context) override;
 
     bool supportsFileIterator() const override { return true; }
 
@@ -159,6 +73,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this<StorageObjectStorage::Configuration>
     }
 #endif
 
+    ASTPtr createArgsWithAccessData() const override;
+
 private:
     DataLakeMetadataPtr current_metadata;
     LoggerPtr log = getLogger("DataLakeConfiguration");
@@ -168,97 +84,11 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this<StorageObjectStorage::Configuration>
         const Strings & requested_columns,
         const StorageSnapshotPtr & storage_snapshot,
         bool supports_subset_of_columns,
-        ContextPtr local_context) override
-    {
-        auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
-        if (!current_metadata)
-        {
-            current_metadata = DataLakeMetadata::create(
-                object_storage,
-                weak_from_this(),
-                local_context);
-        }
-        auto read_schema = current_metadata->getReadSchema();
-        if (!read_schema.empty())
-        {
-            /// There is a difference between "table schema" and "read schema".
-            /// "table schema" is a schema from data lake table metadata,
-            /// while "read schema" is a schema from data files.
-            /// In most cases they would be the same.
-            /// TODO: Try to hide this logic inside IDataLakeMetadata.
-
-            const auto read_schema_names = read_schema.getNames();
-            const auto table_schema_names = current_metadata->getTableSchema().getNames();
-            chassert(read_schema_names.size() == table_schema_names.size());
-
-            if (read_schema_names != table_schema_names)
-            {
-                LOG_TEST(log, "Read schema: {}, table schema: {}, requested columns: {}",
-                    fmt::join(read_schema_names, ", "),
-                    fmt::join(table_schema_names, ", "),
-                    fmt::join(info.requested_columns.getNames(), ", "));
-
-                auto column_name_mapping = [&]()
-                {
-                    std::map<String, String> result;
-                    for (size_t i = 0; i < read_schema_names.size(); ++i)
-                        result[table_schema_names[i]] = read_schema_names[i];
-                    return result;
-                }();
-
-                /// Go through requested columns and change column name
-                /// from table schema to column name from read schema.
-
-                std::vector<NameAndTypePair> read_columns;
-                for (const auto & column_name : info.requested_columns)
-                {
-                    const auto pos = info.format_header.getPositionByName(column_name.name);
-                    auto column = info.format_header.getByPosition(pos);
-                    column.name = column_name_mapping.at(column_name.name);
-                    info.format_header.setColumn(pos, column);
-
-                    read_columns.emplace_back(column.name, column.type);
-                }
-                info.requested_columns = NamesAndTypesList(read_columns.begin(), read_columns.end());
-            }
-        }
-
-        return info;
-    }
+        ContextPtr local_context) override;
 
     bool updateMetadataObjectIfNeeded(
         ObjectStoragePtr object_storage,
-        ContextPtr context)
-    {
-        if (!current_metadata)
-        {
-            current_metadata = DataLakeMetadata::create(
-                object_storage,
-                weak_from_this(),
-                context);
-            return true;
-        }
-
-        if (current_metadata->supportsUpdate())
-        {
-            return current_metadata->update(context);
-        }
-
-        auto new_metadata = DataLakeMetadata::create(
-            object_storage,
-            weak_from_this(),
-            context);
-
-        if (*current_metadata != *new_metadata)
-        {
-            current_metadata = std::move(new_metadata);
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    }
+        ContextPtr context);
 };
 
 
@@ -380,10 +210,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
         getImpl().initialize(engine_args, local_context, with_table_structure, settings);
     }
 
-    ASTPtr createArgsWithAccessData() const override
-    {
-        return getImpl().createArgsWithAccessData();
-    }
+    ASTPtr createArgsWithAccessData() const override;
 
     const String & getFormat() const override { return getImpl().getFormat(); }
     const String & getCompressionMethod() const override { return getImpl().getCompressionMethod(); }
@@ -400,74 +227,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
 
     /// Find storage_type argument and remove it from args if exists.
     /// Return storage type.
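+    /// Defaults to S3 when no 'storage_type' argument is given, for backward compatibility.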
-    ObjectStorageType extractDynamicStorageType(ASTs & args, ContextPtr context, ASTPtr * type_arg = nullptr) const override
-    {
-        static const auto storage_type_name = "storage_type";
-
-        if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context))
-        {
-            if (named_collection->has(storage_type_name))
-            {
-                return objectStorageTypeFromString(named_collection->get<String>(storage_type_name));
-            }
-        }
-
-        auto type_it = args.end();
-
-        /// S3 by default for backward compatibility
-        /// Iceberg without storage_type == IcebergS3
-        ObjectStorageType type = ObjectStorageType::S3;
-
-        for (auto arg_it = args.begin(); arg_it != args.end(); ++arg_it)
-        {
-            const auto * type_ast_function = (*arg_it)->as<ASTFunction>();
-
-            if (type_ast_function && type_ast_function->name == "equals"
-                && type_ast_function->arguments && type_ast_function->arguments->children.size() == 2)
-            {
-                auto name = type_ast_function->arguments->children[0]->as<ASTIdentifier>();
-
-                if (name && name->name() == storage_type_name)
-                {
-                    if (type_it != args.end())
-                    {
-                        throw Exception(
-                            ErrorCodes::BAD_ARGUMENTS,
-                            "DataLake can have only one key-value argument: storage_type='type'.");
-                    }
-
-                    auto value = type_ast_function->arguments->children[1]->as<ASTLiteral>();
-
-                    if (!value)
-                    {
-                        throw Exception(
-                            ErrorCodes::BAD_ARGUMENTS,
-                            "DataLake parameter 'storage_type' has wrong type, string literal expected.");
-                    }
-
-                    if (value->value.getType() != Field::Types::String)
-                    {
-                        throw Exception(
-                            ErrorCodes::BAD_ARGUMENTS,
-                            "DataLake parameter 'storage_type' has wrong value type, string expected.");
-                    }
-
-                    type = objectStorageTypeFromString(value->value.safeGet<String>());
-
-                    type_it = arg_it;
-                }
-            }
-        }
-
-        if (type_it != args.end())
-        {
-            if (type_arg)
-                *type_arg = *type_it;
-            args.erase(type_it);
-        }
-
-        return type;
-    }
+    ObjectStorageType extractDynamicStorageType(ASTs & args, ContextPtr context, ASTPtr * type_arg = nullptr) const override;
 
     void createDynamicConfiguration(ASTs & args, ContextPtr context)
     {
@@ -482,50 +242,10 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
 
     virtual void assertInitialized() const override { return getImpl().assertInitialized(); }
 
-private:
-    inline StorageObjectStorage::Configuration & getImpl() const
-    {
-        if (!impl)
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Dynamic DataLake storage not initialized");
+    StorageObjectStorage::Configuration & getImpl() const;
 
-        return *impl;
-    }
-
-    void createDynamicStorage(ObjectStorageType type)
-    {
-        if (impl)
-        {
-            if (impl->getType() == type)
-                return;
-
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't change datalake engine storage");
-        }
-
-        switch (type)
-        {
-#    if USE_AWS_S3
-            case ObjectStorageType::S3:
-                impl = std::make_unique<StorageS3IcebergConfiguration>();
-                break;
-#    endif
-#    if USE_AZURE_BLOB_STORAGE
-            case ObjectStorageType::Azure:
-                impl = std::make_unique<StorageAzureIcebergConfiguration>();
-                break;
-#    endif
-#    if USE_HDFS
-            case ObjectStorageType::HDFS:
-                impl = std::make_unique<StorageHDFSIcebergConfiguration>();
-                break;
-#    endif
-            case ObjectStorageType::Local:
-                impl = std::make_unique<StorageLocalIcebergConfiguration>();
-                break;
-            default:
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsuported DataLake storage {}", type);
-        }
-    }
+    void createDynamicStorage(ObjectStorageType type);
 
     std::shared_ptr<StorageObjectStorage::Configuration> impl;
 };