diff --git a/contrib/azure-cmake/CMakeLists.txt b/contrib/azure-cmake/CMakeLists.txt index 1e2a4c97824f..9c361db47ca2 100644 --- a/contrib/azure-cmake/CMakeLists.txt +++ b/contrib/azure-cmake/CMakeLists.txt @@ -1,6 +1,6 @@ option (ENABLE_AZURE_BLOB_STORAGE "Enable Azure blob storage" ${ENABLE_LIBRARIES}) -if (NOT ENABLE_AZURE_BLOB_STORAGE OR BUILD_STANDALONE_KEEPER OR OS_FREEBSD) +if (NOT ENABLE_AZURE_BLOB_STORAGE OR BUILD_STANDALONE_KEEPER OR OS_FREEBSD OR ARCH_PPC64LE) message(STATUS "Not using Azure blob storage") return() endif() diff --git a/docs/en/sql-reference/table-functions/azure_blob_storage.md b/docs/en/sql-reference/table-functions/azure_blob_storage.md new file mode 100644 index 000000000000..f86307b3b85c --- /dev/null +++ b/docs/en/sql-reference/table-functions/azure_blob_storage.md @@ -0,0 +1,11 @@ +--- +slug: /en/sql-reference/table-functions/azure_blob_storage +sidebar_position: 45 +sidebar_label: azure_blob_storage +keywords: [azure blob storage] +--- + +# azure\_blob\_storage Table Function + +Provides a table-like interface to select/insert files in [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs). This table function is similar to the [s3 function](../../sql-reference/table-functions/s3.md). + diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 6394c0279a7e..84c99939f2da 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -210,6 +210,7 @@ enum class AccessType M(HDFS, "", GLOBAL, SOURCES) \ M(S3, "", GLOBAL, SOURCES) \ M(HIVE, "", GLOBAL, SOURCES) \ + M(AZURE, "", GLOBAL, SOURCES) \ M(SOURCES, "", GROUP, ALL) \ \ M(CLUSTER, "", GLOBAL, ALL) /* ON CLUSTER queries */ \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index fdee99026348..f66f7bc64655 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -348,6 +348,9 @@ The server successfully detected this situation and will download merged part fr M(S3PutObject, "Number of S3 API PutObject calls.") \ M(S3GetObject, "Number of S3 API GetObject calls.") \ \ + M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \ + M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.") \ + \ M(DiskS3DeleteObjects, "Number of DiskS3 API DeleteObject(s) calls.") \ M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \ M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 464b9168a4cd..1d7114d641f1 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -81,7 +81,9 @@ class IColumn; M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \ M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited. You ", 0) \ M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \ + M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \ M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \ + M(UInt64, azure_max_single_read_retries, 4, "The maximum number of retries during single Azure blob storage read.", 0) \ M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \ M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \ M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \ @@ -90,8 +92,11 @@ class IColumn; M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \ M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \ M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \ + M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \ M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \ + M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \ M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \ + M(Bool, azure_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in azure engine tables", 0) \ M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \ M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \ M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \ @@ -708,6 +713,7 @@ class IColumn; \ M(Bool, schema_inference_use_cache_for_file, true, "Use cache in schema inference while using file table function", 0) \ M(Bool, schema_inference_use_cache_for_s3, true, "Use cache in schema inference while using s3 table function", 0) \ + M(Bool, schema_inference_use_cache_for_azure, true, "Use cache in schema inference while using azure table function", 0) \ M(Bool, schema_inference_use_cache_for_hdfs, true, "Use cache in schema inference while using hdfs table function", 0) \ M(Bool, schema_inference_use_cache_for_url, true, "Use cache in schema inference while using url table function", 0) \ M(Bool, schema_inference_cache_require_modification_time_for_url, true, "Use schema from cache for URL with last modification time validation (for urls with Last-Modified header)", 0) \ diff --git a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp index 0f197c2ff06a..129bb97be097 100644 --- a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp @@ -35,6 +35,7 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage( size_t max_single_read_retries_, size_t max_single_download_retries_, bool use_external_buffer_, + bool restricted_seek_, size_t read_until_position_) : ReadBufferFromFileBase(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size, nullptr, 0) , blob_container_client(blob_container_client_) @@ -44,6 +45,7 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage( , read_settings(read_settings_) , tmp_buffer_size(read_settings.remote_fs_buffer_size) , use_external_buffer(use_external_buffer_) + , restricted_seek(restricted_seek_) , read_until_position(read_until_position_) { if (!use_external_buffer) @@ -54,6 +56,22 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage( } } + +void ReadBufferFromAzureBlobStorage::setReadUntilEnd() +{ + if (read_until_position) + { + read_until_position = 0; + if (initialized) + { + offset = getPosition(); + resetWorkingBuffer(); + initialized = false; + } + } + +} + void ReadBufferFromAzureBlobStorage::setReadUntilPosition(size_t position) { read_until_position = position; @@ -118,8 +136,17 @@ bool ReadBufferFromAzureBlobStorage::nextImpl() off_t ReadBufferFromAzureBlobStorage::seek(off_t offset_, int whence) { - if (initialized) - throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer."); + if (offset_ == getPosition() && whence == SEEK_SET) + return offset_; + + if (initialized && restricted_seek) + { + throw Exception( + ErrorCodes::CANNOT_SEEK_THROUGH_FILE, + "Seek is allowed only before first read attempt from the buffer (current offset: " + "{}, new offset: {}, reading until position: {}, available: {})", + getPosition(), offset_, read_until_position, available()); + } if (whence != SEEK_SET) throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed."); @@ -127,8 +154,36 @@ off_t ReadBufferFromAzureBlobStorage::seek(off_t offset_, int whence) if (offset_ < 0) throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_); - offset = offset_; + if (!restricted_seek) + { + if (!working_buffer.empty() + && static_cast(offset_) >= offset - working_buffer.size() + && offset_ < offset) + { + pos = working_buffer.end() - (offset - offset_); + assert(pos >= working_buffer.begin()); + assert(pos < working_buffer.end()); + + return getPosition(); + } + + off_t position = getPosition(); + if (initialized && offset_ > position) + { + size_t diff = offset_ - position; + if (diff < read_settings.remote_read_min_bytes_for_seek) + { + ignore(diff); + return offset_; + } + } + + resetWorkingBuffer(); + if (initialized) + initialized = false; + } + offset = offset_; return offset; } @@ -152,7 +207,8 @@ void ReadBufferFromAzureBlobStorage::initialize() download_options.Range = {static_cast(offset), length}; - blob_client = std::make_unique(blob_container_client->GetBlobClient(path)); + if (!blob_client) + blob_client = std::make_unique(blob_container_client->GetBlobClient(path)); size_t sleep_time_with_backoff_milliseconds = 100; for (size_t i = 0; i < max_single_download_retries; ++i) @@ -182,6 +238,18 @@ void ReadBufferFromAzureBlobStorage::initialize() initialized = true; } +size_t ReadBufferFromAzureBlobStorage::getFileSize() +{ + if (!blob_client) + blob_client = std::make_unique(blob_container_client->GetBlobClient(path)); + + if (file_size.has_value()) + return *file_size; + + file_size = blob_client->GetProperties().Value.BlobSize; + return *file_size; +} + } #endif diff --git a/src/Disks/IO/ReadBufferFromAzureBlobStorage.h b/src/Disks/IO/ReadBufferFromAzureBlobStorage.h index 6164a0057739..4e21f5436536 100644 --- a/src/Disks/IO/ReadBufferFromAzureBlobStorage.h +++ b/src/Disks/IO/ReadBufferFromAzureBlobStorage.h @@ -24,6 +24,7 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase size_t max_single_read_retries_, size_t max_single_download_retries_, bool use_external_buffer_ = false, + bool restricted_seek_ = false, size_t read_until_position_ = 0); off_t seek(off_t off, int whence) override; @@ -37,9 +38,12 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase String getFileName() const override { return path; } void setReadUntilPosition(size_t position) override; + void setReadUntilEnd() override; bool supportsRightBoundedReads() const override { return true; } + size_t getFileSize() override; + private: void initialize(); @@ -55,6 +59,12 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase std::vector tmp_buffer; size_t tmp_buffer_size; bool use_external_buffer; + + /// There is different seek policy for disk seek and for non-disk seek + /// (non-disk seek is applied for seekable input formats: orc, arrow, parquet). + bool restricted_seek; + + off_t read_until_position = 0; off_t offset = 0; diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp index 1e06490b5bc0..1b62b5fdb05d 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp @@ -57,14 +57,22 @@ void validateContainerName(const String & container_name) AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix) { - String storage_account_url = config.getString(config_prefix + ".storage_account_url"); - validateStorageAccountUrl(storage_account_url); + std::string storage_url; + if (config.has(config_prefix + ".storage_account_url")) + { + storage_url = config.getString(config_prefix + ".storage_account_url"); + validateStorageAccountUrl(storage_url); + } + else + { + storage_url = config.getString(config_prefix + ".connection_string"); + } String container_name = config.getString(config_prefix + ".container_name", "default-container"); validateContainerName(container_name); std::optional container_already_exists {}; if (config.has(config_prefix + ".container_already_exists")) container_already_exists = {config.getBool(config_prefix + ".container_already_exists")}; - return {storage_account_url, container_name, container_already_exists}; + return {storage_url, container_name, container_already_exists}; } @@ -136,10 +144,7 @@ std::unique_ptr getAzureBlobContainerClient( /// If container_already_exists is not set (in config), ignore already exists error. /// (Conflict - The specified container already exists) if (!endpoint.container_already_exists.has_value() && e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) - { - tryLogCurrentException("Container already exists, returning the existing container"); return getAzureBlobStorageClientWithAuth(final_url, container_name, config, config_prefix); - } throw; } } diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp index 23a0da39dd31..20c60cfe8f50 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp @@ -57,6 +57,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync private: bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override { + batch.clear(); auto outcome = client->ListBlobs(options); auto blob_list_response = client->ListBlobs(options); auto blobs_list = blob_list_response.Blobs; @@ -73,11 +74,11 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync {}}); } - options.ContinuationToken = blob_list_response.NextPageToken; - if (blob_list_response.HasPage()) - return true; + if (!blob_list_response.NextPageToken.HasValue() || blob_list_response.NextPageToken.Value().empty()) + return false; - return false; + options.ContinuationToken = blob_list_response.NextPageToken; + return true; } std::shared_ptr client; @@ -215,6 +216,7 @@ std::unique_ptr AzureObjectStorage::readObjects( /// NOL settings_ptr->max_single_read_retries, settings_ptr->max_single_download_retries, /* use_external_buffer */true, + /* restricted_seek */true, read_until_position); }; diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h index 5b08ceb80e32..5a34adb384a9 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h @@ -8,10 +8,7 @@ #include #include #include - -#if USE_AZURE_BLOB_STORAGE #include -#endif namespace Poco { @@ -37,11 +34,13 @@ struct AzureObjectStorageSettings { } - size_t max_single_part_upload_size; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset - uint64_t min_bytes_for_seek; - size_t max_single_read_retries; - size_t max_single_download_retries; - int list_object_keys_size; + AzureObjectStorageSettings() = default; + + size_t max_single_part_upload_size = 100 * 1024 * 1024; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset + uint64_t min_bytes_for_seek = 1024 * 1024; + size_t max_single_read_retries = 3; + size_t max_single_download_retries = 3; + int list_object_keys_size = 1000; }; using AzureClient = Azure::Storage::Blobs::BlobContainerClient; diff --git a/src/Disks/ObjectStorages/ObjectStorageIterator.cpp b/src/Disks/ObjectStorages/ObjectStorageIterator.cpp index 188b743958c4..72ec6e0e5006 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIterator.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageIterator.cpp @@ -9,7 +9,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -RelativePathWithMetadata ObjectStorageIteratorFromList::current() const +RelativePathWithMetadata ObjectStorageIteratorFromList::current() { if (!isValid()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator"); diff --git a/src/Disks/ObjectStorages/ObjectStorageIterator.h b/src/Disks/ObjectStorages/ObjectStorageIterator.h index c3afd395a748..2ff5ce60acc2 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIterator.h +++ b/src/Disks/ObjectStorages/ObjectStorageIterator.h @@ -10,8 +10,10 @@ class IObjectStorageIterator { public: virtual void next() = 0; - virtual bool isValid() const = 0; - virtual RelativePathWithMetadata current() const = 0; + virtual void nextBatch() = 0; + virtual bool isValid() = 0; + virtual RelativePathWithMetadata current() = 0; + virtual RelativePathsWithMetadata currentBatch() = 0; virtual size_t getAccumulatedSize() const = 0; virtual ~IObjectStorageIterator() = default; @@ -34,12 +36,22 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator ++batch_iterator; } - bool isValid() const override + void nextBatch() override + { + batch_iterator = batch.end(); + } + + bool isValid() override { return batch_iterator != batch.end(); } - RelativePathWithMetadata current() const override; + RelativePathWithMetadata current() override; + + RelativePathsWithMetadata currentBatch() override + { + return batch; + } size_t getAccumulatedSize() const override { diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp index 766071cf815b..f91c19f2fb91 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp @@ -1,5 +1,7 @@ #include +#include + namespace DB { @@ -8,6 +10,33 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +void IObjectStorageIteratorAsync::nextBatch() +{ + std::lock_guard lock(mutex); + if (!is_finished) + { + if (!is_initialized) + { + outcome_future = scheduleBatch(); + is_initialized = true; + } + + BatchAndHasNext next_batch = outcome_future.get(); + current_batch = std::move(next_batch.batch); + accumulated_size.fetch_add(current_batch.size(), std::memory_order_relaxed); + current_batch_iterator = current_batch.begin(); + if (next_batch.has_next) + outcome_future = scheduleBatch(); + else + is_finished = true; + } + else + { + current_batch.clear(); + current_batch_iterator = current_batch.begin(); + } +} + void IObjectStorageIteratorAsync::next() { std::lock_guard lock(mutex); @@ -43,19 +72,34 @@ std::future IObjectStorageIterator } -bool IObjectStorageIteratorAsync::isValid() const +bool IObjectStorageIteratorAsync::isValid() { + if (!is_initialized) + nextBatch(); + + std::lock_guard lock(mutex); return current_batch_iterator != current_batch.end(); } -RelativePathWithMetadata IObjectStorageIteratorAsync::current() const +RelativePathWithMetadata IObjectStorageIteratorAsync::current() { if (!isValid()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator"); + std::lock_guard lock(mutex); return *current_batch_iterator; } + +RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch() +{ + if (!isValid()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator"); + + std::lock_guard lock(mutex); + return current_batch; +} + size_t IObjectStorageIteratorAsync::getAccumulatedSize() const { return accumulated_size.load(std::memory_order_relaxed); diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h index 81ba9bce137c..a2b06da9a91c 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h @@ -22,8 +22,10 @@ class IObjectStorageIteratorAsync : public IObjectStorageIterator } void next() override; - bool isValid() const override; - RelativePathWithMetadata current() const override; + void nextBatch() override; + bool isValid() override; + RelativePathWithMetadata current() override; + RelativePathsWithMetadata currentBatch() override; size_t getAccumulatedSize() const override; ~IObjectStorageIteratorAsync() override @@ -43,9 +45,10 @@ class IObjectStorageIteratorAsync : public IObjectStorageIterator std::future scheduleBatch(); + bool is_initialized{false}; bool is_finished{false}; - std::mutex mutex; + mutable std::mutex mutex; ThreadPool list_objects_pool; ThreadPoolCallbackRunner list_objects_scheduler; std::future outcome_future; diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index cbf6cc1cbe3e..94bdca60e691 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -598,6 +598,8 @@ Block ActionsDAG::updateHeader(Block header) const } ColumnsWithTypeAndName result_columns; + + result_columns.reserve(outputs.size()); struct Frame diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp new file mode 100644 index 000000000000..3ee176a68b74 --- /dev/null +++ b/src/Storages/StorageAzureBlob.cpp @@ -0,0 +1,1305 @@ +#include + + +#if USE_AZURE_BLOB_STORAGE +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + + +using namespace Azure::Storage::Blobs; + +namespace CurrentMetrics +{ + extern const Metric ObjectStorageAzureThreads; + extern const Metric ObjectStorageAzureThreadsActive; +} + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; + extern const int DATABASE_ACCESS_DENIED; + extern const int CANNOT_COMPILE_REGEXP; + extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; + extern const int LOGICAL_ERROR; + extern const int NOT_IMPLEMENTED; + +} + +namespace +{ + +const std::unordered_set required_configuration_keys = { + "blob_path", + "container", +}; + +const std::unordered_set optional_configuration_keys = { + "format", + "compression", + "structure", + "compression_method", + "account_name", + "account_key", + "connection_string", + "storage_account_url", +}; + +bool isConnectionString(const std::string & candidate) +{ + return candidate.starts_with("DefaultEndpointsProtocol"); +} + +} + +void StorageAzureBlob::processNamedCollectionResult(StorageAzureBlob::Configuration & configuration, const NamedCollection & collection) +{ + validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys); + + if (collection.has("connection_string")) + { + configuration.connection_url = collection.get("connection_string"); + configuration.is_connection_string = true; + } + + if (collection.has("storage_account_url")) + { + configuration.connection_url = collection.get("storage_account_url"); + configuration.is_connection_string = false; + } + + configuration.container = collection.get("container"); + configuration.blob_path = collection.get("blob_path"); + + if (collection.has("account_name")) + configuration.account_name = collection.get("account_name"); + + if (collection.has("account_key")) + configuration.account_key = collection.get("account_key"); + + configuration.structure = collection.getOrDefault("structure", "auto"); + configuration.format = collection.getOrDefault("format", configuration.format); + configuration.compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); +} + + +StorageAzureBlob::Configuration StorageAzureBlob::getConfiguration(ASTs & engine_args, ContextPtr local_context) +{ + StorageAzureBlob::Configuration configuration; + + /// Supported signatures: + /// + /// AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression]) + /// + + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context)) + { + processNamedCollectionResult(configuration, *named_collection); + + configuration.blobs_paths = {configuration.blob_path}; + + if (configuration.format == "auto") + configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); + + return configuration; + } + + if (engine_args.size() < 3 || engine_args.size() > 7) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage AzureBlobStorage requires 3 to 7 arguments: " + "AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])"); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context); + + std::unordered_map engine_args_to_idx; + + configuration.connection_url = checkAndGetLiteralArgument(engine_args[0], "connection_string/storage_account_url"); + configuration.is_connection_string = isConnectionString(configuration.connection_url); + + configuration.container = checkAndGetLiteralArgument(engine_args[1], "container"); + configuration.blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath"); + + auto is_format_arg = [] (const std::string & s) -> bool + { + return s == "auto" || FormatFactory::instance().getAllFormats().contains(s); + }; + + if (engine_args.size() == 4) + { + //'c1 UInt64, c2 UInt64 + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); + if (is_format_arg(fourth_arg)) + { + configuration.format = fourth_arg; + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format or account name specified without account key"); + } + } + else if (engine_args.size() == 5) + { + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); + if (is_format_arg(fourth_arg)) + { + configuration.format = fourth_arg; + configuration.compression_method = checkAndGetLiteralArgument(engine_args[4], "compression"); + } + else + { + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); + } + } + else if (engine_args.size() == 6) + { + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); + if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments"); + } + else + { + configuration.account_name = fourth_arg; + + configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); + auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); + if (!is_format_arg(sixth_arg)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); + configuration.format = sixth_arg; + } + } + else if (engine_args.size() == 7) + { + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); + if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments"); + } + else + { + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); + auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); + if (!is_format_arg(sixth_arg)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); + configuration.format = sixth_arg; + configuration.compression_method = checkAndGetLiteralArgument(engine_args[6], "compression"); + } + } + + configuration.blobs_paths = {configuration.blob_path}; + + if (configuration.format == "auto") + configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); + + return configuration; +} + + +AzureObjectStorage::SettingsPtr StorageAzureBlob::createSettings(ContextPtr local_context) +{ + const auto & context_settings = local_context->getSettingsRef(); + auto settings_ptr = std::make_unique(); + settings_ptr->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size; + settings_ptr->max_single_read_retries = context_settings.azure_max_single_read_retries; + settings_ptr->list_object_keys_size = static_cast(context_settings.azure_list_object_keys_size); + + return settings_ptr; +} + +void registerStorageAzureBlob(StorageFactory & factory) +{ + factory.registerStorage("AzureBlobStorage", [](const StorageFactory::Arguments & args) + { + auto & engine_args = args.engine_args; + if (engine_args.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); + + auto configuration = StorageAzureBlob::getConfiguration(engine_args, args.getLocalContext()); + auto client = StorageAzureBlob::createClient(configuration); + // Use format settings from global server context + settings from + // the SETTINGS clause of the create query. Settings from current + // session and user are ignored. + std::optional format_settings; + if (args.storage_def->settings) + { + FormatFactorySettings user_format_settings; + + // Apply changed settings from global context, but ignore the + // unknown ones, because we only have the format settings here. + const auto & changes = args.getContext()->getSettingsRef().changes(); + for (const auto & change : changes) + { + if (user_format_settings.has(change.name)) + user_format_settings.set(change.name, change.value); + } + + // Apply changes from SETTINGS clause, with validation. + user_format_settings.applyChanges(args.storage_def->settings->changes); + format_settings = getFormatSettings(args.getContext(), user_format_settings); + } + else + { + format_settings = getFormatSettings(args.getContext()); + } + + ASTPtr partition_by; + if (args.storage_def->partition_by) + partition_by = args.storage_def->partition_by->clone(); + + auto settings = StorageAzureBlob::createSettings(args.getContext()); + + return std::make_shared( + std::move(configuration), + std::make_unique("AzureBlobStorage", std::move(client), std::move(settings)), + args.getContext(), + args.table_id, + args.columns, + args.constraints, + args.comment, + format_settings, + partition_by); + }, + { + .supports_settings = true, + .supports_sort_order = true, // for partition by + .supports_schema_inference = true, + .source_access_type = AccessType::AZURE, + }); +} + +AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration) +{ + AzureClientPtr result; + + if (configuration.is_connection_string) + { + result = std::make_unique(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container)); + result->CreateIfNotExists(); + } + else + { + if (configuration.account_name.has_value() && configuration.account_key.has_value()) + { + auto storage_shared_key_credential = std::make_shared(*configuration.account_name, *configuration.account_key); + auto blob_service_client = std::make_unique(configuration.connection_url, storage_shared_key_credential); + try + { + result = std::make_unique(blob_service_client->CreateBlobContainer(configuration.container).Value); + } + catch (const Azure::Storage::StorageException & e) + { + if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) + { + auto final_url = configuration.connection_url + + (configuration.connection_url.back() == '/' ? "" : "/") + + configuration.container; + + result = std::make_unique(final_url, storage_shared_key_credential); + } + else + { + throw; + } + } + } + else + { + auto managed_identity_credential = std::make_shared(); + auto blob_service_client = std::make_unique(configuration.connection_url, managed_identity_credential); + try + { + result = std::make_unique(blob_service_client->CreateBlobContainer(configuration.container).Value); + } + catch (const Azure::Storage::StorageException & e) + { + if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict) + { + auto final_url = configuration.connection_url + + (configuration.connection_url.back() == '/' ? "" : "/") + + configuration.container; + + result = std::make_unique(final_url, managed_identity_credential); + } + else + { + throw; + } + } + } + } + + return result; +} + +Poco::URI StorageAzureBlob::Configuration::getConnectionURL() const +{ + if (!is_connection_string) + return Poco::URI(connection_url); + + auto parsed_connection_string = Azure::Storage::_internal::ParseConnectionString(connection_url); + return Poco::URI(parsed_connection_string.BlobServiceUrl.GetAbsoluteUrl()); +} + + +StorageAzureBlob::StorageAzureBlob( + const Configuration & configuration_, + std::unique_ptr && object_storage_, + ContextPtr context, + const StorageID & table_id_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment, + std::optional format_settings_, + ASTPtr partition_by_) + : IStorage(table_id_) + , name("AzureBlobStorage") + , configuration(configuration_) + , object_storage(std::move(object_storage_)) + , distributed_processing(false) + , format_settings(format_settings_) + , partition_by(partition_by_) +{ + FormatFactory::instance().checkFormatName(configuration.format); + context->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.getConnectionURL()); + + StorageInMemoryMetadata storage_metadata; + if (columns_.empty()) + { + auto columns = getTableStructureFromData(object_storage.get(), configuration, format_settings, context); + storage_metadata.setColumns(columns); + } + else + storage_metadata.setColumns(columns_); + + storage_metadata.setConstraints(constraints_); + storage_metadata.setComment(comment); + setInMemoryMetadata(storage_metadata); + + StoredObjects objects; + for (const auto & key : configuration.blobs_paths) + objects.emplace_back(key); + + auto default_virtuals = NamesAndTypesList{ + {"_path", std::make_shared(std::make_shared())}, + {"_file", std::make_shared(std::make_shared())}}; + + auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList(); + + virtual_columns = getVirtualsForStorage(columns, default_virtuals); + for (const auto & column : virtual_columns) + virtual_block.insert({column.type->createColumn(), column.type, column.name}); +} + +void StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) +{ + if (configuration.withGlobs()) + { + throw Exception( + ErrorCodes::DATABASE_ACCESS_DENIED, + "S3 key '{}' contains globs, so the table is in readonly mode", + configuration.blob_path); + } + + StoredObjects objects; + for (const auto & key : configuration.blobs_paths) + objects.emplace_back(key); + + object_storage->removeObjectsIfExist(objects); +} + +namespace +{ + +class StorageAzureBlobSink : public SinkToStorage +{ +public: + StorageAzureBlobSink( + const String & format, + const Block & sample_block_, + ContextPtr context, + std::optional format_settings_, + const CompressionMethod compression_method, + AzureObjectStorage * object_storage, + const String & blob_path) + : SinkToStorage(sample_block_) + , sample_block(sample_block_) + , format_settings(format_settings_) + { + StoredObject object(blob_path); + write_buf = wrapWriteBufferWithCompressionMethod(object_storage->writeObject(object, WriteMode::Rewrite), compression_method, 3); + writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings); + } + + String getName() const override { return "StorageAzureBlobSink"; } + + void consume(Chunk chunk) override + { + std::lock_guard lock(cancel_mutex); + if (cancelled) + return; + writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); + } + + void onCancel() override + { + std::lock_guard lock(cancel_mutex); + finalize(); + cancelled = true; + } + + void onException() override + { + std::lock_guard lock(cancel_mutex); + finalize(); + } + + void onFinish() override + { + std::lock_guard lock(cancel_mutex); + finalize(); + } + +private: + void finalize() + { + if (!writer) + return; + + try + { + writer->finalize(); + writer->flush(); + write_buf->finalize(); + } + catch (...) + { + /// Stop ParallelFormattingOutputFormat correctly. + writer.reset(); + write_buf->finalize(); + throw; + } + } + + Block sample_block; + std::optional format_settings; + std::unique_ptr write_buf; + OutputFormatPtr writer; + bool cancelled = false; + std::mutex cancel_mutex; +}; + +class PartitionedStorageAzureBlobSink : public PartitionedSink +{ +public: + PartitionedStorageAzureBlobSink( + const ASTPtr & partition_by, + const String & format_, + const Block & sample_block_, + ContextPtr context_, + std::optional format_settings_, + const CompressionMethod compression_method_, + AzureObjectStorage * object_storage_, + const String & blob_) + : PartitionedSink(partition_by, context_, sample_block_) + , format(format_) + , sample_block(sample_block_) + , context(context_) + , compression_method(compression_method_) + , object_storage(object_storage_) + , blob(blob_) + , format_settings(format_settings_) + { + } + + SinkPtr createSinkForPartition(const String & partition_id) override + { + auto partition_key = replaceWildcards(blob, partition_id); + validateKey(partition_key); + + return std::make_shared( + format, + sample_block, + context, + format_settings, + compression_method, + object_storage, + partition_key + ); + } + +private: + const String format; + const Block sample_block; + const ContextPtr context; + const CompressionMethod compression_method; + AzureObjectStorage * object_storage; + const String blob; + const std::optional format_settings; + + ExpressionActionsPtr partition_by_expr; + + static void validateKey(const String & str) + { + validatePartitionKey(str, true); + } +}; + +} + +Pipe StorageAzureBlob::read( + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr local_context, + QueryProcessingStage::Enum /*processed_stage*/, + size_t max_block_size, + size_t num_streams) +{ + if (partition_by && configuration.withWildcard()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned Azure storage is not implemented yet"); + + Pipes pipes; + + std::unordered_set column_names_set(column_names.begin(), column_names.end()); + std::vector requested_virtual_columns; + + for (const auto & virtual_column : getVirtuals()) + { + if (column_names_set.contains(virtual_column.name)) + requested_virtual_columns.push_back(virtual_column); + } + + std::shared_ptr iterator_wrapper; + if (configuration.withGlobs()) + { + /// Iterate through disclosed globs and make a source for each file + iterator_wrapper = std::make_shared( + object_storage.get(), configuration.container, std::nullopt, + configuration.blob_path, query_info.query, virtual_block, local_context, nullptr); + } + else + { + iterator_wrapper = std::make_shared( + object_storage.get(), configuration.container, configuration.blobs_paths, + std::nullopt, query_info.query, virtual_block, local_context, nullptr); + } + + ColumnsDescription columns_description; + Block block_for_format; + if (supportsSubsetOfColumns()) + { + auto fetch_columns = column_names; + const auto & virtuals = getVirtuals(); + std::erase_if( + fetch_columns, + [&](const String & col) + { return std::any_of(virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col){ return col == virtual_col.name; }); }); + + if (fetch_columns.empty()) + fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name); + + columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns); + block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical()); + } + else + { + columns_description = storage_snapshot->metadata->getColumns(); + block_for_format = storage_snapshot->metadata->getSampleBlock(); + } + + for (size_t i = 0; i < num_streams; ++i) + { + pipes.emplace_back(std::make_shared( + requested_virtual_columns, + configuration.format, + getName(), + block_for_format, + local_context, + format_settings, + columns_description, + max_block_size, + configuration.compression_method, + object_storage.get(), + configuration.container, + iterator_wrapper)); + } + + return Pipe::unitePipes(std::move(pipes)); +} + +SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) +{ + auto sample_block = metadata_snapshot->getSampleBlock(); + auto chosen_compression_method = chooseCompressionMethod(configuration.blobs_paths.back(), configuration.compression_method); + auto insert_query = std::dynamic_pointer_cast(query); + + auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; + bool is_partitioned_implementation = partition_by_ast && configuration.withWildcard(); + + if (is_partitioned_implementation) + { + return std::make_shared( + partition_by_ast, + configuration.format, + sample_block, + local_context, + format_settings, + chosen_compression_method, + object_storage.get(), + configuration.blobs_paths.back()); + } + else + { + if (configuration.withGlobs()) + throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, + "AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path); + + bool truncate_in_insert = local_context->getSettingsRef().azure_truncate_on_insert; + + if (!truncate_in_insert && object_storage->exists(StoredObject(configuration.blob_path))) + { + + if (local_context->getSettingsRef().azure_create_new_file_on_insert) + { + size_t index = configuration.blobs_paths.size(); + const auto & first_key = configuration.blobs_paths[0]; + auto pos = first_key.find_first_of('.'); + String new_key; + + do + { + new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos)); + ++index; + } + while (object_storage->exists(StoredObject(new_key))); + + configuration.blobs_paths.push_back(new_key); + } + else + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Object in bucket {} with key {} already exists. " + "If you want to overwrite it, enable setting azure_truncate_on_insert, if you " + "want to create a new file on each insert, enable setting azure_create_new_file_on_insert", + configuration.container, configuration.blobs_paths.back()); + } + } + + return std::make_shared( + configuration.format, + sample_block, + local_context, + format_settings, + chosen_compression_method, + object_storage.get(), + configuration.blobs_paths.back()); + } +} + +NamesAndTypesList StorageAzureBlob::getVirtuals() const +{ + return virtual_columns; +} + +bool StorageAzureBlob::supportsPartitionBy() const +{ + return true; +} + +bool StorageAzureBlob::supportsSubcolumns() const +{ + return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format); +} + +bool StorageAzureBlob::supportsSubsetOfColumns() const +{ + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format); +} + +bool StorageAzureBlob::prefersLargeBlocks() const +{ + return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format); +} + +bool StorageAzureBlob::parallelizeOutputAfterReading(ContextPtr context) const +{ + return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context); +} + +static void addPathToVirtualColumns(Block & block, const String & path, size_t idx) +{ + if (block.has("_path")) + block.getByName("_path").column->assumeMutableRef().insert(path); + + if (block.has("_file")) + { + auto pos = path.find_last_of('/'); + assert(pos != std::string::npos); + + auto file = path.substr(pos + 1); + block.getByName("_file").column->assumeMutableRef().insert(file); + } + + block.getByName("_idx").column->assumeMutableRef().insert(idx); +} + +StorageAzureBlobSource::Iterator::Iterator( + AzureObjectStorage * object_storage_, + const std::string & container_, + std::optional keys_, + std::optional blob_path_with_globs_, + ASTPtr query_, + const Block & virtual_header_, + ContextPtr context_, + RelativePathsWithMetadata * outer_blobs_) + : WithContext(context_) + , object_storage(object_storage_) + , container(container_) + , keys(keys_) + , blob_path_with_globs(blob_path_with_globs_) + , query(query_) + , virtual_header(virtual_header_) + , outer_blobs(outer_blobs_) +{ + if (keys.has_value() && blob_path_with_globs.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot specify keys and glob simultaneously it's a bug"); + + if (!keys.has_value() && !blob_path_with_globs.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Both keys and glob mask are not specified"); + + if (keys) + { + Strings all_keys = *keys; + + blobs_with_metadata.emplace(); + /// Create a virtual block with one row to construct filter + if (query && virtual_header && !all_keys.empty()) + { + /// Append "idx" column as the filter result + virtual_header.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); + + auto block = virtual_header.cloneEmpty(); + addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0); + + VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast); + + if (filter_ast) + { + block = virtual_header.cloneEmpty(); + for (size_t i = 0; i < all_keys.size(); ++i) + addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i); + + VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); + const auto & idxs = typeid_cast(*block.getByName("_idx").column); + + Strings filtered_keys; + filtered_keys.reserve(block.rows()); + for (UInt64 idx : idxs.getData()) + filtered_keys.emplace_back(std::move(all_keys[idx])); + + all_keys = std::move(filtered_keys); + } + } + + for (auto && key : all_keys) + { + ObjectMetadata object_metadata = object_storage->getObjectMetadata(key); + total_size += object_metadata.size_bytes; + blobs_with_metadata->emplace_back(RelativePathWithMetadata{key, object_metadata}); + if (outer_blobs) + outer_blobs->emplace_back(blobs_with_metadata->back()); + } + } + else + { + const String key_prefix = blob_path_with_globs->substr(0, blob_path_with_globs->find_first_of("*?{")); + + /// We don't have to list bucket, because there is no asterisks. + if (key_prefix.size() == blob_path_with_globs->size()) + { + ObjectMetadata object_metadata = object_storage->getObjectMetadata(*blob_path_with_globs); + blobs_with_metadata->emplace_back(*blob_path_with_globs, object_metadata); + if (outer_blobs) + outer_blobs->emplace_back(blobs_with_metadata->back()); + return; + } + + object_storage_iterator = object_storage->iterate(key_prefix); + + matcher = std::make_unique(makeRegexpPatternFromGlobs(*blob_path_with_globs)); + + if (!matcher->ok()) + throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, + "Cannot compile regex from glob ({}): {}", *blob_path_with_globs, matcher->error()); + + recursive = *blob_path_with_globs == "/**" ? true : false; + } + +} + +RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() +{ + if (is_finished) + return {}; + + if (keys) + { + size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + if (current_index >= blobs_with_metadata->size()) + { + is_finished = true; + return {}; + } + + return (*blobs_with_metadata)[current_index]; + } + else + { + bool need_new_batch = false; + { + std::lock_guard lock(next_mutex); + need_new_batch = !blobs_with_metadata || index >= blobs_with_metadata->size(); + } + + if (need_new_batch) + { + RelativePathsWithMetadata new_batch; + while (new_batch.empty()) + { + if (object_storage_iterator->isValid()) + { + new_batch = object_storage_iterator->currentBatch(); + object_storage_iterator->nextBatch(); + } + else + { + is_finished = true; + return {}; + } + + for (auto it = new_batch.begin(); it != new_batch.end();) + { + if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher)) + it = new_batch.erase(it); + else + ++it; + } + } + + index.store(0, std::memory_order_relaxed); + if (!is_initialized) + { + createFilterAST(new_batch.front().relative_path); + is_initialized = true; + } + + if (filter_ast) + { + auto block = virtual_header.cloneEmpty(); + for (size_t i = 0; i < new_batch.size(); ++i) + addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i); + + VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); + const auto & idxs = typeid_cast(*block.getByName("_idx").column); + + std::lock_guard lock(next_mutex); + blob_path_with_globs.reset(); + blob_path_with_globs.emplace(); + for (UInt64 idx : idxs.getData()) + { + total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed); + blobs_with_metadata->emplace_back(std::move(new_batch[idx])); + if (outer_blobs) + outer_blobs->emplace_back(blobs_with_metadata->back()); + } + } + else + { + if (outer_blobs) + outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end()); + + std::lock_guard lock(next_mutex); + blobs_with_metadata = std::move(new_batch); + for (const auto & [_, info] : *blobs_with_metadata) + total_size.fetch_add(info.size_bytes, std::memory_order_relaxed); + } + } + + size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + + std::lock_guard lock(next_mutex); + return (*blobs_with_metadata)[current_index]; + } +} + +size_t StorageAzureBlobSource::Iterator::getTotalSize() const +{ + return total_size.load(std::memory_order_relaxed); +} + + +void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key) +{ + if (!query || !virtual_header) + return; + + /// Create a virtual block with one row to construct filter + /// Append "idx" column as the filter result + virtual_header.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); + + auto block = virtual_header.cloneEmpty(); + addPathToVirtualColumns(block, fs::path(container) / any_key, 0); + VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast); +} + + +Chunk StorageAzureBlobSource::generate() +{ + while (true) + { + if (isCancelled() || !reader) + { + if (reader) + reader->cancel(); + break; + } + + Chunk chunk; + if (reader->pull(chunk)) + { + UInt64 num_rows = chunk.getNumRows(); + + const auto & file_path = reader.getPath(); + size_t total_size = file_iterator->getTotalSize(); + if (num_rows && total_size) + { + updateRowsProgressApprox( + *this, chunk, total_size, total_rows_approx_accumulated, total_rows_count_times, total_rows_approx_max); + } + + for (const auto & virtual_column : requested_virtual_columns) + { + if (virtual_column.name == "_path") + { + chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst()); + } + else if (virtual_column.name == "_file") + { + size_t last_slash_pos = file_path.find_last_of('/'); + auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1)); + chunk.addColumn(column->convertToFullColumnIfConst()); + } + } + + return chunk; + } + + + assert(reader_future.valid()); + reader = reader_future.get(); + + if (!reader) + break; + + /// Even if task is finished the thread may be not freed in pool. + /// So wait until it will be freed before scheduling a new task. + create_reader_pool.wait(); + reader_future = createReaderAsync(); + } + + return {}; +} + +Block StorageAzureBlobSource::getHeader(Block sample_block, const std::vector & requested_virtual_columns) +{ + for (const auto & virtual_column : requested_virtual_columns) + sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name}); + + return sample_block; +} + +StorageAzureBlobSource::StorageAzureBlobSource( + const std::vector & requested_virtual_columns_, + const String & format_, + String name_, + const Block & sample_block_, + ContextPtr context_, + std::optional format_settings_, + const ColumnsDescription & columns_, + UInt64 max_block_size_, + String compression_hint_, + AzureObjectStorage * object_storage_, + const String & container_, + std::shared_ptr file_iterator_) + :ISource(getHeader(sample_block_, requested_virtual_columns_)) + , WithContext(context_) + , requested_virtual_columns(requested_virtual_columns_) + , format(format_) + , name(std::move(name_)) + , sample_block(sample_block_) + , format_settings(format_settings_) + , columns_desc(columns_) + , max_block_size(max_block_size_) + , compression_hint(compression_hint_) + , object_storage(std::move(object_storage_)) + , container(container_) + , file_iterator(file_iterator_) + , create_reader_pool(CurrentMetrics::ObjectStorageAzureThreads, CurrentMetrics::ObjectStorageAzureThreadsActive, 1) + , create_reader_scheduler(threadPoolCallbackRunner(create_reader_pool, "AzureReader")) +{ + reader = createReader(); + if (reader) + reader_future = createReaderAsync(); +} + + +StorageAzureBlobSource::~StorageAzureBlobSource() +{ + create_reader_pool.wait(); +} + +String StorageAzureBlobSource::getName() const +{ + return name; +} + +StorageAzureBlobSource::ReaderHolder StorageAzureBlobSource::createReader() +{ + auto [current_key, info] = file_iterator->next(); + if (current_key.empty()) + return {}; + + size_t object_size = info.size_bytes != 0 ? info.size_bytes : object_storage->getObjectMetadata(current_key).size_bytes; + auto compression_method = chooseCompressionMethod(current_key, compression_hint); + + auto read_buf = createAzureReadBuffer(current_key, object_size); + auto input_format = FormatFactory::instance().getInput( + format, *read_buf, sample_block, getContext(), max_block_size, + format_settings, std::nullopt, std::nullopt, + /* is_remote_fs */ true, compression_method); + + QueryPipelineBuilder builder; + builder.init(Pipe(input_format)); + + if (columns_desc.hasDefaults()) + { + builder.addSimpleTransform( + [&](const Block & header) + { return std::make_shared(header, columns_desc, *input_format, getContext()); }); + } + + auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); + auto current_reader = std::make_unique(*pipeline); + + return ReaderHolder{fs::path(container) / current_key, std::move(read_buf), std::move(pipeline), std::move(current_reader)}; +} + +std::future StorageAzureBlobSource::createReaderAsync() +{ + return create_reader_scheduler([this] { return createReader(); }, Priority{}); +} + +std::unique_ptr StorageAzureBlobSource::createAzureReadBuffer(const String & key, size_t object_size) +{ + auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size); + read_settings.enable_filesystem_cache = false; + auto download_buffer_size = getContext()->getSettings().max_download_buffer_size; + const bool object_too_small = object_size <= 2 * download_buffer_size; + + // Create a read buffer that will prefetch the first ~1 MB of the file. + // When reading lots of tiny files, this prefetching almost doubles the throughput. + // For bigger files, parallel reading is more useful. + if (object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) + { + LOG_TRACE(log, "Downloading object of size {} from Azure with initial prefetch", object_size); + return createAsyncAzureReadBuffer(key, read_settings, object_size); + } + + return object_storage->readObject(StoredObject(key), read_settings, {}, object_size); +} + +ColumnsDescription StorageAzureBlob::getTableStructureFromData( + AzureObjectStorage * object_storage, + const Configuration & configuration, + const std::optional & format_settings, + ContextPtr ctx) +{ + RelativePathsWithMetadata read_keys; + std::shared_ptr file_iterator; + if (configuration.withGlobs()) + { + file_iterator = std::make_shared( + object_storage, configuration.container, std::nullopt, + configuration.blob_path, nullptr, Block{}, ctx, &read_keys); + } + else + { + file_iterator = std::make_shared( + object_storage, configuration.container, configuration.blobs_paths, + std::nullopt, nullptr, Block{}, ctx, &read_keys); + } + + std::optional columns_from_cache; + size_t prev_read_keys_size = read_keys.size(); + if (ctx->getSettingsRef().schema_inference_use_cache_for_azure) + columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), configuration, format_settings, ctx); + + ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr + { + auto [key, metadata] = file_iterator->next(); + + if (key.empty()) + { + if (first) + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "Cannot extract table structure from {} format file, because there are no files with provided path " + "in AzureBlobStorage. You must specify table structure manually", configuration.format); + + return nullptr; + } + + /// S3 file iterator could get new keys after new iteration, check them in schema cache. + if (ctx->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size) + { + columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx); + prev_read_keys_size = read_keys.size(); + if (columns_from_cache) + { + cached_columns = *columns_from_cache; + return nullptr; + } + } + + first = false; + int zstd_window_log_max = static_cast(ctx->getSettingsRef().zstd_window_log_max); + return wrapReadBufferWithCompressionMethod( + object_storage->readObject(StoredObject(key), ctx->getReadSettings(), {}, metadata.size_bytes), + chooseCompressionMethod(key, configuration.compression_method), + zstd_window_log_max); + }; + + ColumnsDescription columns; + if (columns_from_cache) + columns = *columns_from_cache; + else + columns = readSchemaFromFormat(configuration.format, format_settings, read_buffer_iterator, configuration.withGlobs(), ctx); + + if (ctx->getSettingsRef().schema_inference_use_cache_for_azure) + addColumnsToCache(read_keys, columns, configuration, format_settings, configuration.format, ctx); + + return columns; + +} + +std::optional StorageAzureBlob::tryGetColumnsFromCache( + const RelativePathsWithMetadata::const_iterator & begin, + const RelativePathsWithMetadata::const_iterator & end, + const StorageAzureBlob::Configuration & configuration, + const std::optional & format_settings, + const ContextPtr & ctx) +{ + auto & schema_cache = getSchemaCache(ctx); + for (auto it = begin; it < end; ++it) + { + auto get_last_mod_time = [&] -> time_t + { + return it->metadata.last_modified->epochTime(); + }; + + auto host_and_bucket = configuration.connection_url + '/' + configuration.container; + String source = host_and_bucket + '/' + it->relative_path; + auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx); + auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); + if (columns) + return columns; + } + + return std::nullopt; + +} + +void StorageAzureBlob::addColumnsToCache( + const RelativePathsWithMetadata & keys, + const ColumnsDescription & columns, + const StorageAzureBlob::Configuration & configuration, + const std::optional & format_settings, + const String & format_name, + const ContextPtr & ctx) +{ + auto host_and_bucket = configuration.connection_url + '/' + configuration.container; + Strings sources; + sources.reserve(keys.size()); + std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket + '/' + elem.relative_path; }); + auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx); + auto & schema_cache = getSchemaCache(ctx); + schema_cache.addMany(cache_keys, columns); +} + +SchemaCache & StorageAzureBlob::getSchemaCache(const ContextPtr & ctx) +{ + static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_azure", DEFAULT_SCHEMA_CACHE_ELEMENTS)); + return schema_cache; +} + + +std::unique_ptr StorageAzureBlobSource::createAsyncAzureReadBuffer( + const String & key, const ReadSettings & read_settings, size_t object_size) +{ + auto modified_settings{read_settings}; + modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size; + auto async_reader = object_storage->readObjects(StoredObjects{StoredObject{key, object_size}}, modified_settings); + + async_reader->setReadUntilEnd(); + if (read_settings.remote_fs_prefetch) + async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY); + + return async_reader; +} + +} + +#endif diff --git a/src/Storages/StorageAzureBlob.h b/src/Storages/StorageAzureBlob.h new file mode 100644 index 000000000000..e2001fa24aec --- /dev/null +++ b/src/Storages/StorageAzureBlob.h @@ -0,0 +1,292 @@ +#pragma once + +#include "config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class StorageAzureBlob : public IStorage +{ +public: + + using AzureClient = Azure::Storage::Blobs::BlobContainerClient; + using AzureClientPtr = std::unique_ptr; + + struct Configuration : public StatelessTableEngineConfiguration + { + Configuration() = default; + + String getPath() const { return blob_path; } + + bool update(ContextPtr context); + + void connect(ContextPtr context); + + bool withGlobs() const { return blob_path.find_first_of("*?{") != std::string::npos; } + + bool withWildcard() const + { + static const String PARTITION_ID_WILDCARD = "{_partition_id}"; + return blobs_paths.back().find(PARTITION_ID_WILDCARD) != String::npos; + } + + Poco::URI getConnectionURL() const; + + std::string connection_url; + bool is_connection_string; + + std::optional account_name; + std::optional account_key; + + std::string container; + std::string blob_path; + std::vector blobs_paths; + }; + + StorageAzureBlob( + const Configuration & configuration_, + std::unique_ptr && object_storage_, + ContextPtr context_, + const StorageID & table_id_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + const String & comment, + std::optional format_settings_, + ASTPtr partition_by_); + + static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context); + static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration); + + static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context); + + static void processNamedCollectionResult(StorageAzureBlob::Configuration & configuration, const NamedCollection & collection); + + String getName() const override + { + return name; + } + + Pipe read( + const Names &, + const StorageSnapshotPtr &, + SelectQueryInfo &, + ContextPtr, + QueryProcessingStage::Enum, + size_t, + size_t) override; + + SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /* metadata_snapshot */, ContextPtr context, bool /*async_insert*/) override; + + void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override; + + NamesAndTypesList getVirtuals() const override; + + bool supportsPartitionBy() const override; + + bool supportsSubcolumns() const override; + + bool supportsSubsetOfColumns() const override; + + bool prefersLargeBlocks() const override; + + bool parallelizeOutputAfterReading(ContextPtr context) const override; + + static SchemaCache & getSchemaCache(const ContextPtr & ctx); + + static ColumnsDescription getTableStructureFromData( + AzureObjectStorage * object_storage, + const Configuration & configuration, + const std::optional & format_settings, + ContextPtr ctx); + +private: + std::string name; + Configuration configuration; + std::unique_ptr object_storage; + NamesAndTypesList virtual_columns; + Block virtual_block; + + const bool distributed_processing; + std::optional format_settings; + ASTPtr partition_by; + + + static std::optional tryGetColumnsFromCache( + const RelativePathsWithMetadata::const_iterator & begin, + const RelativePathsWithMetadata::const_iterator & end, + const StorageAzureBlob::Configuration & configuration, + const std::optional & format_settings, + const ContextPtr & ctx); + + static void addColumnsToCache( + const RelativePathsWithMetadata & keys, + const ColumnsDescription & columns, + const Configuration & configuration, + const std::optional & format_settings, + const String & format_name, + const ContextPtr & ctx); + + +}; + +class StorageAzureBlobSource : public ISource, WithContext +{ +public: + class Iterator : WithContext + { + public: + Iterator( + AzureObjectStorage * object_storage_, + const std::string & container_, + std::optional keys_, + std::optional blob_path_with_globs_, + ASTPtr query_, + const Block & virtual_header_, + ContextPtr context_, + RelativePathsWithMetadata * outer_blobs_); + + RelativePathWithMetadata next(); + size_t getTotalSize() const; + ~Iterator() = default; + + private: + AzureObjectStorage * object_storage; + std::string container; + std::optional keys; + std::optional blob_path_with_globs; + ASTPtr query; + ASTPtr filter_ast; + Block virtual_header; + + std::atomic index = 0; + std::atomic total_size = 0; + + std::optional blobs_with_metadata; + RelativePathsWithMetadata * outer_blobs; + ObjectStorageIteratorPtr object_storage_iterator; + bool recursive{false}; + + std::unique_ptr matcher; + + void createFilterAST(const String & any_key); + std::atomic is_finished = false; + std::atomic is_initialized = false; + std::mutex next_mutex; + }; + + StorageAzureBlobSource( + const std::vector & requested_virtual_columns_, + const String & format_, + String name_, + const Block & sample_block_, + ContextPtr context_, + std::optional format_settings_, + const ColumnsDescription & columns_, + UInt64 max_block_size_, + String compression_hint_, + AzureObjectStorage * object_storage_, + const String & container_, + std::shared_ptr file_iterator_); + + ~StorageAzureBlobSource() override; + + Chunk generate() override; + + String getName() const override; + + static Block getHeader(Block sample_block, const std::vector & requested_virtual_columns); + +private: + std::vector requested_virtual_columns; + String format; + String name; + Block sample_block; + std::optional format_settings; + ColumnsDescription columns_desc; + UInt64 max_block_size; + String compression_hint; + AzureObjectStorage * object_storage; + String container; + std::shared_ptr file_iterator; + + struct ReaderHolder + { + public: + ReaderHolder( + String path_, + std::unique_ptr read_buf_, + std::unique_ptr pipeline_, + std::unique_ptr reader_) + : path(std::move(path_)) + , read_buf(std::move(read_buf_)) + , pipeline(std::move(pipeline_)) + , reader(std::move(reader_)) + { + } + + ReaderHolder() = default; + ReaderHolder(const ReaderHolder & other) = delete; + ReaderHolder & operator=(const ReaderHolder & other) = delete; + + ReaderHolder(ReaderHolder && other) noexcept + { + *this = std::move(other); + } + + ReaderHolder & operator=(ReaderHolder && other) noexcept + { + /// The order of destruction is important. + /// reader uses pipeline, pipeline uses read_buf. + reader = std::move(other.reader); + pipeline = std::move(other.pipeline); + read_buf = std::move(other.read_buf); + path = std::move(other.path); + return *this; + } + + explicit operator bool() const { return reader != nullptr; } + PullingPipelineExecutor * operator->() { return reader.get(); } + const PullingPipelineExecutor * operator->() const { return reader.get(); } + const String & getPath() const { return path; } + + private: + String path; + std::unique_ptr read_buf; + std::unique_ptr pipeline; + std::unique_ptr reader; + }; + + ReaderHolder reader; + + Poco::Logger * log = &Poco::Logger::get("StorageAzureBlobSource"); + + ThreadPool create_reader_pool; + ThreadPoolCallbackRunner create_reader_scheduler; + std::future reader_future; + + UInt64 total_rows_approx_max = 0; + size_t total_rows_count_times = 0; + UInt64 total_rows_approx_accumulated = 0; + + /// Recreate ReadBuffer and Pipeline for each file. + ReaderHolder createReader(); + std::future createReaderAsync(); + + std::unique_ptr createAzureReadBuffer(const String & key, size_t object_size); + std::unique_ptr createAsyncAzureReadBuffer( + const String & key, const ReadSettings & read_settings, size_t object_size); +}; + +} + +#endif diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 8be176a53757..5606e6728d4f 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -96,6 +96,10 @@ void registerStorageSQLite(StorageFactory & factory); void registerStorageKeeperMap(StorageFactory & factory); +#if USE_AZURE_BLOB_STORAGE +void registerStorageAzureBlob(StorageFactory & factory); +#endif + void registerStorages() { auto & factory = StorageFactory::instance(); @@ -191,6 +195,10 @@ void registerStorages() #endif registerStorageKeeperMap(factory); + + #if USE_AZURE_BLOB_STORAGE + registerStorageAzureBlob(factory); + #endif } } diff --git a/src/TableFunctions/CMakeLists.txt b/src/TableFunctions/CMakeLists.txt index b1fa61a72ee0..3544c5bf8b48 100644 --- a/src/TableFunctions/CMakeLists.txt +++ b/src/TableFunctions/CMakeLists.txt @@ -17,5 +17,5 @@ add_library(clickhouse_table_functions ${clickhouse_table_functions_sources}) target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms) if (TARGET ch_contrib::hivemetastore) - target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::hivemetastore ch_contrib::hdfs ch_contrib::parquet) + target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::hivemetastore ch_contrib::hdfs ch_contrib::parquet ch_contrib::azure_sdk) endif () diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index ad88d7b54f0e..a8329684ee6a 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -2,12 +2,13 @@ #include "config.h" -#include -#include -#include -#include #include +#include +#include #include +#include +#include +#include namespace DB diff --git a/src/TableFunctions/TableFunctionAzureBlobStorage.cpp b/src/TableFunctions/TableFunctionAzureBlobStorage.cpp new file mode 100644 index 000000000000..38d9362894ac --- /dev/null +++ b/src/TableFunctions/TableFunctionAzureBlobStorage.cpp @@ -0,0 +1,255 @@ +#include "config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "registerTableFunctions.h" +#include +#include +#include + +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + +namespace +{ + +bool isConnectionString(const std::string & candidate) +{ + return candidate.starts_with("DefaultEndpointsProtocol"); +} + +} + +StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file) +{ + StorageAzureBlob::Configuration configuration; + + /// Supported signatures: + /// + /// AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]) + /// + + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context)) + { + StorageAzureBlob::processNamedCollectionResult(configuration, *named_collection); + + configuration.blobs_paths = {configuration.blob_path}; + + if (configuration.format == "auto" && get_format_from_file) + configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); + + return configuration; + } + + if (engine_args.size() < 3 || engine_args.size() > 8) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage Azure requires 3 to 7 arguments: " + "AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])"); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context); + + std::unordered_map engine_args_to_idx; + + configuration.connection_url = checkAndGetLiteralArgument(engine_args[0], "connection_string/storage_account_url"); + configuration.is_connection_string = isConnectionString(configuration.connection_url); + + configuration.container = checkAndGetLiteralArgument(engine_args[1], "container"); + configuration.blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath"); + + auto is_format_arg = [] (const std::string & s) -> bool + { + return s == "auto" || FormatFactory::instance().getAllFormats().contains(s); + }; + + if (engine_args.size() == 4) + { + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name/structure"); + if (is_format_arg(fourth_arg)) + { + configuration.format = fourth_arg; + } + else + { + configuration.structure = fourth_arg; + } + } + else if (engine_args.size() == 5) + { + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); + if (is_format_arg(fourth_arg)) + { + configuration.format = fourth_arg; + configuration.compression_method = checkAndGetLiteralArgument(engine_args[4], "compression"); + } + else + { + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); + } + } + else if (engine_args.size() == 6) + { + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); + if (is_format_arg(fourth_arg)) + { + configuration.format = fourth_arg; + configuration.compression_method = checkAndGetLiteralArgument(engine_args[4], "compression"); + configuration.structure = checkAndGetLiteralArgument(engine_args[5], "structure"); + } + else + { + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); + auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); + if (!is_format_arg(sixth_arg)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); + configuration.format = sixth_arg; + } + } + else if (engine_args.size() == 7) + { + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); + if (is_format_arg(fourth_arg)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format, compression and structure must be last arguments"); + } + else + { + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); + auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); + if (!is_format_arg(sixth_arg)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); + configuration.format = sixth_arg; + configuration.compression_method = checkAndGetLiteralArgument(engine_args[6], "compression"); + } + } + else if (engine_args.size() == 8) + { + + auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); + if (is_format_arg(fourth_arg)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments"); + } + else + { + configuration.account_name = fourth_arg; + configuration.account_key = checkAndGetLiteralArgument(engine_args[4], "account_key"); + auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); + if (!is_format_arg(sixth_arg)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); + configuration.format = sixth_arg; + configuration.compression_method = checkAndGetLiteralArgument(engine_args[6], "compression"); + configuration.structure = checkAndGetLiteralArgument(engine_args[7], "structure"); + } + } + + configuration.blobs_paths = {configuration.blob_path}; + + if (configuration.format == "auto" && get_format_from_file) + configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); + + return configuration; +} + +void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) +{ + /// Clone ast function, because we can modify its arguments like removing headers. + auto ast_copy = ast_function->clone(); + + ASTs & args_func = ast_function->children; + + if (args_func.size() != 1) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName()); + + auto & args = args_func.at(0)->children; + + configuration = parseArgumentsImpl(args, context); +} + +ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context) const +{ + if (configuration.structure == "auto") + { + context->checkAccess(getSourceAccessType()); + auto client = StorageAzureBlob::createClient(configuration); + auto settings = StorageAzureBlob::createSettings(context); + + auto object_storage = std::make_unique("AzureBlobStorageTableFunction", std::move(client), std::move(settings)); + return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context); + } + + return parseColumnsListFromString(configuration.structure, context); +} + +bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns() +{ + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format); +} + +StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const +{ + auto client = StorageAzureBlob::createClient(configuration); + auto settings = StorageAzureBlob::createSettings(context); + + ColumnsDescription columns; + if (configuration.structure != "auto") + columns = parseColumnsListFromString(configuration.structure, context); + else if (!structure_hint.empty()) + columns = structure_hint; + + StoragePtr storage = std::make_shared( + configuration, + std::make_unique(table_name, std::move(client), std::move(settings)), + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + String{}, + /// No format_settings for table function Azure + std::nullopt, + nullptr); + + storage->startup(); + + return storage; +} + +void registerTableFunctionAzureBlobStorage(TableFunctionFactory & factory) +{ + factory.registerFunction( + {.documentation + = {.description=R"(The table function can be used to read the data stored on Azure Blob Storage.)", + .examples{{"azure_blob_storage", "SELECT * FROM azure_blob_storage(connection, container, blob_path, format, structure)", ""}}}, + .allow_readonly = false}); +} + +} + +#endif diff --git a/src/TableFunctions/TableFunctionAzureBlobStorage.h b/src/TableFunctions/TableFunctionAzureBlobStorage.h new file mode 100644 index 000000000000..0bb872de3f35 --- /dev/null +++ b/src/TableFunctions/TableFunctionAzureBlobStorage.h @@ -0,0 +1,70 @@ +#pragma once + +#include "config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include +#include + + +namespace DB +{ + +class Context; + +/* AzureBlob(source, [access_key_id, secret_access_key,] [format, structure, compression]) - creates a temporary storage for a file in AzureBlob. + */ +class TableFunctionAzureBlobStorage : public ITableFunction +{ +public: + static constexpr auto name = "azure_blob_storage"; + static constexpr auto signature = "- connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]\n"; + + static size_t getMaxNumberOfArguments() { return 8; } + + String getName() const override + { + return name; + } + + virtual String getSignature() const + { + return signature; + } + + bool hasStaticStructure() const override { return configuration.structure != "auto"; } + + bool needStructureHint() const override { return configuration.structure == "auto"; } + + void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } + + bool supportsReadingSubsetOfColumns() override; + + std::unordered_set getVirtualsToCheckBeforeUsingStructureHint() const override + { + return {"_path", "_file"}; + } + + static StorageAzureBlob::Configuration parseArgumentsImpl(ASTs & args, const ContextPtr & context, bool get_format_from_file = true); + +protected: + + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns) const override; + + const char * getStorageTypeName() const override { return "Azure"; } + + ColumnsDescription getActualTableStructure(ContextPtr context) const override; + void parseArguments(const ASTPtr & ast_function, ContextPtr context) override; + + mutable StorageAzureBlob::Configuration configuration; + ColumnsDescription structure_hint; +}; + +} + +#endif diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index 4f3411df4c59..0499524a9121 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -71,6 +71,12 @@ void registerTableFunctions() registerTableFunctionFormat(factory); registerTableFunctionExplain(factory); + +#if USE_AZURE_BLOB_STORAGE + registerTableFunctionAzureBlobStorage(factory); +#endif + + } } diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index c51522a5e99c..393bc080a3da 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -69,6 +69,10 @@ void registerTableFunctionFormat(TableFunctionFactory & factory); void registerTableFunctionExplain(TableFunctionFactory & factory); +#if USE_AZURE_BLOB_STORAGE +void registerTableFunctionAzureBlobStorage(TableFunctionFactory & factory); +#endif + void registerTableFunctions(); } diff --git a/tests/integration/test_storage_azure_blob_storage/__init__.py b/tests/integration/test_storage_azure_blob_storage/__init__.py new file mode 100644 index 000000000000..e5a0d9b4834e --- /dev/null +++ b/tests/integration/test_storage_azure_blob_storage/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/tests/integration/test_storage_azure_blob_storage/configs/disable_profilers.xml b/tests/integration/test_storage_azure_blob_storage/configs/disable_profilers.xml new file mode 100644 index 000000000000..a39badbf8ec8 --- /dev/null +++ b/tests/integration/test_storage_azure_blob_storage/configs/disable_profilers.xml @@ -0,0 +1,9 @@ + + + + + 0 + 0 + + + diff --git a/tests/integration/test_storage_azure_blob_storage/configs/named_collections.xml b/tests/integration/test_storage_azure_blob_storage/configs/named_collections.xml new file mode 100644 index 000000000000..e0c18d11940a --- /dev/null +++ b/tests/integration/test_storage_azure_blob_storage/configs/named_collections.xml @@ -0,0 +1,16 @@ + + + + DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1; + cont + test_simple_write_named.csv + key UInt64, data String + CSV + + + http://azurite1:10000/devstoreaccount1 + devstoreaccount1 + Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== + + + diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py new file mode 100644 index 000000000000..f0934d3aa800 --- /dev/null +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -0,0 +1,594 @@ +#!/usr/bin/env python3 + +import gzip +import json +import logging +import os +import io +import random +import threading +import time + +from azure.storage.blob import BlobServiceClient +import helpers.client +import pytest +from helpers.cluster import ClickHouseCluster, ClickHouseInstance +from helpers.network import PartitionManager +from helpers.mock_servers import start_mock_servers +from helpers.test_tools import exec_query_with_retry + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance( + "node", + main_configs=["configs/named_collections.xml"], + user_configs=["configs/disable_profilers.xml"], + with_azurite=True, + ) + cluster.start() + + yield cluster + finally: + cluster.shutdown() + + +def azure_query(node, query, try_num=10, settings={}): + for i in range(try_num): + try: + return node.query(query, settings=settings) + except Exception as ex: + retriable_errors = [ + "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response", + "DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected", + ] + retry = False + for error in retriable_errors: + if error in str(ex): + retry = True + print(f"Try num: {i}. Having retriable error: {ex}") + time.sleep(i) + break + if not retry or i == try_num - 1: + raise Exception(ex) + continue + + +def get_azure_file_content(filename): + container_name = "cont" + connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;" + blob_service_client = BlobServiceClient.from_connection_string(connection_string) + container_client = blob_service_client.get_container_client(container_name) + blob_client = container_client.get_blob_client(filename) + download_stream = blob_client.download_blob() + return download_stream.readall().decode("utf-8") + + +def put_azure_file_content(filename, data): + container_name = "cont" + connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;" + blob_service_client = BlobServiceClient.from_connection_string(connection_string) + try: + container_client = blob_service_client.create_container(container_name) + except: + container_client = blob_service_client.get_container_client(container_name) + + blob_client = container_client.get_blob_client(filename) + buf = io.BytesIO(data) + blob_client.upload_blob(buf) + + +def test_create_table_connection_string(cluster): + node = cluster.instances["node"] + azure_query( + node, + "CREATE TABLE test_create_table_conn_string (key UInt64, data String) Engine = AzureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'cont', 'test_create_connection_string', 'CSV')", + ) + + +def test_create_table_account_string(cluster): + node = cluster.instances["node"] + azure_query( + node, + "CREATE TABLE test_create_table_account_url (key UInt64, data String) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_create_connection_string', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')", + ) + + +def test_simple_write_account_string(cluster): + node = cluster.instances["node"] + azure_query( + node, + "CREATE TABLE test_simple_write (key UInt64, data String) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')", + ) + azure_query(node, "INSERT INTO test_simple_write VALUES (1, 'a')") + print(get_azure_file_content("test_simple_write.csv")) + assert get_azure_file_content("test_simple_write.csv") == '1,"a"\n' + + +def test_simple_write_connection_string(cluster): + node = cluster.instances["node"] + azure_query( + node, + "CREATE TABLE test_simple_write_connection_string (key UInt64, data String) Engine = AzureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;', 'cont', 'test_simple_write_c.csv', 'CSV')", + ) + azure_query(node, "INSERT INTO test_simple_write_connection_string VALUES (1, 'a')") + print(get_azure_file_content("test_simple_write_c.csv")) + assert get_azure_file_content("test_simple_write_c.csv") == '1,"a"\n' + + +def test_simple_write_named_collection_1(cluster): + node = cluster.instances["node"] + azure_query( + node, + "CREATE TABLE test_simple_write_named_collection_1 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1)", + ) + azure_query( + node, "INSERT INTO test_simple_write_named_collection_1 VALUES (1, 'a')" + ) + print(get_azure_file_content("test_simple_write_named.csv")) + assert get_azure_file_content("test_simple_write_named.csv") == '1,"a"\n' + azure_query(node, "TRUNCATE TABLE test_simple_write_named_collection_1") + + +def test_simple_write_named_collection_2(cluster): + node = cluster.instances["node"] + azure_query( + node, + "CREATE TABLE test_simple_write_named_collection_2 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_simple_write_named_2.csv', format='CSV')", + ) + azure_query( + node, "INSERT INTO test_simple_write_named_collection_2 VALUES (1, 'a')" + ) + print(get_azure_file_content("test_simple_write_named_2.csv")) + assert get_azure_file_content("test_simple_write_named_2.csv") == '1,"a"\n' + + +def test_partition_by(cluster): + node = cluster.instances["node"] + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + partition_by = "column3" + values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)" + filename = "test_{_partition_id}.csv" + + azure_query( + node, + f"CREATE TABLE test_partitioned_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}", + ) + azure_query(node, f"INSERT INTO test_partitioned_write VALUES {values}") + + assert "1,2,3\n" == get_azure_file_content("test_3.csv") + assert "3,2,1\n" == get_azure_file_content("test_1.csv") + assert "78,43,45\n" == get_azure_file_content("test_45.csv") + + +def test_partition_by_string_column(cluster): + node = cluster.instances["node"] + table_format = "col_num UInt32, col_str String" + partition_by = "col_str" + values = "(1, 'foo/bar'), (3, 'йцук'), (78, '你好')" + filename = "test_{_partition_id}.csv" + azure_query( + node, + f"CREATE TABLE test_partitioned_string_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}", + ) + azure_query(node, f"INSERT INTO test_partitioned_string_write VALUES {values}") + + assert '1,"foo/bar"\n' == get_azure_file_content("test_foo/bar.csv") + assert '3,"йцук"\n' == get_azure_file_content("test_йцук.csv") + assert '78,"你好"\n' == get_azure_file_content("test_你好.csv") + + +def test_partition_by_const_column(cluster): + node = cluster.instances["node"] + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)" + partition_by = "'88'" + values_csv = "1,2,3\n3,2,1\n78,43,45\n" + filename = "test_{_partition_id}.csv" + azure_query( + node, + f"CREATE TABLE test_partitioned_const_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}", + ) + azure_query(node, f"INSERT INTO test_partitioned_const_write VALUES {values}") + assert values_csv == get_azure_file_content("test_88.csv") + + +def test_truncate(cluster): + node = cluster.instances["node"] + azure_query( + node, + "CREATE TABLE test_truncate (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_truncate.csv', format='CSV')", + ) + azure_query(node, "INSERT INTO test_truncate VALUES (1, 'a')") + assert get_azure_file_content("test_truncate.csv") == '1,"a"\n' + azure_query(node, "TRUNCATE TABLE test_truncate") + with pytest.raises(Exception): + print(get_azure_file_content("test_truncate.csv")) + + +def test_simple_read_write(cluster): + node = cluster.instances["node"] + azure_query( + node, + "CREATE TABLE test_simple_read_write (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_simple_read_write.csv', format='CSV')", + ) + + azure_query(node, "INSERT INTO test_simple_read_write VALUES (1, 'a')") + assert get_azure_file_content("test_simple_read_write.csv") == '1,"a"\n' + print(azure_query(node, "SELECT * FROM test_simple_read_write")) + assert azure_query(node, "SELECT * FROM test_simple_read_write") == "1\ta\n" + + +def test_create_new_files_on_insert(cluster): + node = cluster.instances["node"] + + azure_query( + node, + f"create table test_multiple_inserts(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_parquet', format='Parquet')", + ) + azure_query(node, "truncate table test_multiple_inserts") + azure_query( + node, + f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings azure_truncate_on_insert=1", + ) + azure_query( + node, + f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings azure_create_new_file_on_insert=1", + ) + azure_query( + node, + f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings azure_create_new_file_on_insert=1", + ) + + result = azure_query(node, f"select count() from test_multiple_inserts") + assert int(result) == 60 + + azure_query(node, f"drop table test_multiple_inserts") + + +def test_overwrite(cluster): + node = cluster.instances["node"] + + azure_query( + node, + f"create table test_overwrite(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_parquet_overwrite', format='Parquet')", + ) + azure_query(node, "truncate table test_overwrite") + + azure_query( + node, + f"insert into test_overwrite select number, randomString(100) from numbers(50) settings azure_truncate_on_insert=1", + ) + node.query_and_get_error( + f"insert into test_overwrite select number, randomString(100) from numbers(100)" + ) + azure_query( + node, + f"insert into test_overwrite select number, randomString(100) from numbers(200) settings azure_truncate_on_insert=1", + ) + + result = azure_query(node, f"select count() from test_overwrite") + assert int(result) == 200 + + +def test_insert_with_path_with_globs(cluster): + node = cluster.instances["node"] + azure_query( + node, + f"create table test_insert_globs(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_insert_with_globs*', format='Parquet')", + ) + node.query_and_get_error( + f"insert into table function test_insert_globs SELECT number, randomString(100) FROM numbers(500)" + ) + + +def test_put_get_with_globs(cluster): + # type: (ClickHouseCluster) -> None + unique_prefix = random.randint(1, 10000) + node = cluster.instances["node"] # type: ClickHouseInstance + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + max_path = "" + for i in range(10): + for j in range(10): + path = "{}/{}_{}/{}.csv".format( + unique_prefix, i, random.choice(["a", "b", "c", "d"]), j + ) + max_path = max(path, max_path) + values = f"({i},{j},{i + j})" + + azure_query( + node, + f"CREATE TABLE test_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')", + ) + + query = f"insert into test_{i}_{j} VALUES {values}" + azure_query(node, query) + + azure_query( + node, + f"CREATE TABLE test_glob_select ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV')", + ) + query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from test_glob_select" + assert azure_query(node, query).splitlines() == [ + "450\t450\t900\t0.csv\t{bucket}/{max_path}".format( + bucket="cont", max_path=max_path + ) + ] + + +def test_azure_glob_scheherazade(cluster): + node = cluster.instances["node"] # type: ClickHouseInstance + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + values = "(1, 1, 1)" + nights_per_job = 1001 // 30 + jobs = [] + for night in range(0, 1001, nights_per_job): + + def add_tales(start, end): + for i in range(start, end): + path = "night_{}/tale.csv".format(i) + unique_num = random.randint(1, 10000) + azure_query( + node, + f"CREATE TABLE test_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')", + ) + query = f"insert into test_{i}_{unique_num} VALUES {values}" + azure_query(node, query) + + jobs.append( + threading.Thread( + target=add_tales, args=(night, min(night + nights_per_job, 1001)) + ) + ) + jobs[-1].start() + + for job in jobs: + job.join() + + azure_query( + node, + f"CREATE TABLE test_glob_select_scheherazade ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='night_*/tale.csv', format='CSV')", + ) + query = "select count(), sum(column1), sum(column2), sum(column3) from test_glob_select_scheherazade" + assert azure_query(node, query).splitlines() == ["1001\t1001\t1001\t1001"] + + +@pytest.mark.parametrize( + "extension,method", + [pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")], +) +def test_storage_azure_get_gzip(cluster, extension, method): + node = cluster.instances["node"] + filename = f"test_get_gzip.{extension}" + name = f"test_get_gzip_{extension}" + data = [ + "Sophia Intrieri,55", + "Jack Taylor,71", + "Christopher Silva,66", + "Clifton Purser,35", + "Richard Aceuedo,43", + "Lisa Hensley,31", + "Alice Wehrley,1", + "Mary Farmer,47", + "Samara Ramirez,19", + "Shirley Lloyd,51", + "Santos Cowger,0", + "Richard Mundt,88", + "Jerry Gonzalez,15", + "Angela James,10", + "Norman Ortega,33", + "", + ] + azure_query(node, f"DROP TABLE IF EXISTS {name}") + + buf = io.BytesIO() + compressed = gzip.GzipFile(fileobj=buf, mode="wb") + compressed.write(("\n".join(data)).encode()) + compressed.close() + put_azure_file_content(filename, buf.getvalue()) + + azure_query( + node, + f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = AzureBlobStorage( + azure_conf2, container='cont', blob_path ='{filename}', + format='CSV', + compression='{method}')""", + ) + + assert azure_query(node, f"SELECT sum(id) FROM {name}").splitlines() == ["565"] + azure_query(node, f"DROP TABLE {name}") + + +def test_schema_inference_no_globs(cluster): + node = cluster.instances["node"] # type: ClickHouseInstance + table_format = "column1 UInt32, column2 String, column3 UInt32" + azure_query( + node, + f"CREATE TABLE test_schema_inference_src ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs.csv', format='CSVWithNames')", + ) + + query = f"insert into test_schema_inference_src SELECT number, toString(number), number * number FROM numbers(1000)" + azure_query(node, query) + + azure_query( + node, + f"CREATE TABLE test_select_inference Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs.csv')", + ) + + print(node.query("SHOW CREATE TABLE test_select_inference")) + + query = "select sum(column1), sum(length(column2)), sum(column3), min(_file), max(_path) from test_select_inference" + assert azure_query(node, query).splitlines() == [ + "499500\t2890\t332833500\ttest_schema_inference_no_globs.csv\tcont/test_schema_inference_no_globs.csv" + ] + + +def test_schema_inference_from_globs(cluster): + node = cluster.instances["node"] + unique_prefix = random.randint(1, 10000) + node = cluster.instances["node"] # type: ClickHouseInstance + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + max_path = "" + for i in range(10): + for j in range(10): + path = "{}/{}_{}/{}.csv".format( + unique_prefix, i, random.choice(["a", "b", "c", "d"]), j + ) + max_path = max(path, max_path) + values = f"({i},{j},{i + j})" + + azure_query( + node, + f"CREATE TABLE test_schema_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSVWithNames')", + ) + + query = f"insert into test_schema_{i}_{j} VALUES {values}" + azure_query(node, query) + + azure_query( + node, + f"CREATE TABLE test_glob_select_inference Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')", + ) + + print(node.query("SHOW CREATE TABLE test_glob_select_inference")) + + query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from test_glob_select_inference" + assert azure_query(node, query).splitlines() == [ + "450\t450\t900\t0.csv\t{bucket}/{max_path}".format( + bucket="cont", max_path=max_path + ) + ] + + +def test_simple_write_account_string_table_function(cluster): + node = cluster.instances["node"] + azure_query( + node, + "INSERT INTO TABLE FUNCTION azure_blob_storage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write_tf.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', 'key UInt64, data String') VALUES (1, 'a')", + ) + print(get_azure_file_content("test_simple_write_tf.csv")) + assert get_azure_file_content("test_simple_write_tf.csv") == '1,"a"\n' + + +def test_simple_write_connection_string_table_function(cluster): + node = cluster.instances["node"] + azure_query( + node, + "INSERT INTO TABLE FUNCTION azure_blob_storage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;', 'cont', 'test_simple_write_connection_tf.csv', 'CSV', 'auto', 'key UInt64, data String') VALUES (1, 'a')", + ) + print(get_azure_file_content("test_simple_write_connection_tf.csv")) + assert get_azure_file_content("test_simple_write_connection_tf.csv") == '1,"a"\n' + + +def test_simple_write_named_collection_1_table_function(cluster): + node = cluster.instances["node"] + azure_query( + node, + "INSERT INTO TABLE FUNCTION azure_blob_storage(azure_conf1) VALUES (1, 'a')", + ) + print(get_azure_file_content("test_simple_write_named.csv")) + assert get_azure_file_content("test_simple_write_named.csv") == '1,"a"\n' + + azure_query( + node, + "CREATE TABLE drop_table (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1)", + ) + + azure_query( + node, + "TRUNCATE TABLE drop_table", + ) + + +def test_simple_write_named_collection_2_table_function(cluster): + node = cluster.instances["node"] + + azure_query( + node, + "INSERT INTO TABLE FUNCTION azure_blob_storage(azure_conf2, container='cont', blob_path='test_simple_write_named_2_tf.csv', format='CSV', structure='key UInt64, data String') VALUES (1, 'a')", + ) + print(get_azure_file_content("test_simple_write_named_2_tf.csv")) + assert get_azure_file_content("test_simple_write_named_2_tf.csv") == '1,"a"\n' + + +def test_put_get_with_globs_tf(cluster): + # type: (ClickHouseCluster) -> None + unique_prefix = random.randint(1, 10000) + node = cluster.instances["node"] # type: ClickHouseInstance + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + max_path = "" + for i in range(10): + for j in range(10): + path = "{}/{}_{}/{}.csv".format( + unique_prefix, i, random.choice(["a", "b", "c", "d"]), j + ) + max_path = max(path, max_path) + values = f"({i},{j},{i + j})" + + azure_query( + node, + f"INSERT INTO TABLE FUNCTION azure_blob_storage(azure_conf2, container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}", + ) + query = f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azure_blob_storage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV', structure='{table_format}')" + assert azure_query(node, query).splitlines() == [ + "450\t450\t900\t0.csv\t{bucket}/{max_path}".format( + bucket="cont", max_path=max_path + ) + ] + + +def test_schema_inference_no_globs_tf(cluster): + node = cluster.instances["node"] # type: ClickHouseInstance + table_format = "column1 UInt32, column2 String, column3 UInt32" + + query = f"insert into table function azure_blob_storage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs_tf.csv', format='CSVWithNames', structure='{table_format}') SELECT number, toString(number), number * number FROM numbers(1000)" + azure_query(node, query) + + query = "select sum(column1), sum(length(column2)), sum(column3), min(_file), max(_path) from azure_blob_storage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs_tf.csv')" + assert azure_query(node, query).splitlines() == [ + "499500\t2890\t332833500\ttest_schema_inference_no_globs_tf.csv\tcont/test_schema_inference_no_globs_tf.csv" + ] + + +def test_schema_inference_from_globs_tf(cluster): + node = cluster.instances["node"] + unique_prefix = random.randint(1, 10000) + node = cluster.instances["node"] # type: ClickHouseInstance + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + max_path = "" + for i in range(10): + for j in range(10): + path = "{}/{}_{}/{}.csv".format( + unique_prefix, i, random.choice(["a", "b", "c", "d"]), j + ) + max_path = max(path, max_path) + values = f"({i},{j},{i + j})" + + query = f"insert into table function azure_blob_storage(azure_conf2, container='cont', blob_path='{path}', format='CSVWithNames', structure='{table_format}') VALUES {values}" + azure_query(node, query) + + query = f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azure_blob_storage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')" + assert azure_query(node, query).splitlines() == [ + "450\t450\t900\t0.csv\t{bucket}/{max_path}".format( + bucket="cont", max_path=max_path + ) + ] + + +def test_partition_by_tf(cluster): + node = cluster.instances["node"] + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + partition_by = "column3" + values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)" + filename = "test_tf_{_partition_id}.csv" + + azure_query( + node, + f"INSERT INTO TABLE FUNCTION azure_blob_storage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}", + ) + + assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv") + assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv") + assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv") diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index ec245d8b9e02..5d30da5d2ea7 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -158,6 +158,7 @@ JDBC [] GLOBAL SOURCES HDFS [] GLOBAL SOURCES S3 [] GLOBAL SOURCES HIVE [] GLOBAL SOURCES +AZURE [] GLOBAL SOURCES SOURCES [] \N ALL CLUSTER [] GLOBAL ALL ALL ['ALL PRIVILEGES'] \N \N diff --git a/tests/queries/0_stateless/02117_show_create_table_system.reference b/tests/queries/0_stateless/02117_show_create_table_system.reference index 09cc62dac003..e864ba850188 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.reference +++ b/tests/queries/0_stateless/02117_show_create_table_system.reference @@ -297,7 +297,7 @@ CREATE TABLE system.grants ( `user_name` Nullable(String), `role_name` Nullable(String), - `access_type` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE ARBITRARY TEMPORARY TABLE' = 53, 'CREATE FUNCTION' = 54, 'CREATE NAMED COLLECTION' = 55, 'CREATE' = 56, 'DROP DATABASE' = 57, 'DROP TABLE' = 58, 'DROP VIEW' = 59, 'DROP DICTIONARY' = 60, 'DROP FUNCTION' = 61, 'DROP NAMED COLLECTION' = 62, 'DROP' = 63, 'UNDROP TABLE' = 64, 'TRUNCATE' = 65, 'OPTIMIZE' = 66, 'BACKUP' = 67, 'KILL QUERY' = 68, 'KILL TRANSACTION' = 69, 'MOVE PARTITION BETWEEN SHARDS' = 70, 'CREATE USER' = 71, 'ALTER USER' = 72, 'DROP USER' = 73, 'CREATE ROLE' = 74, 'ALTER ROLE' = 75, 'DROP ROLE' = 76, 'ROLE ADMIN' = 77, 'CREATE ROW POLICY' = 78, 'ALTER ROW POLICY' = 79, 'DROP ROW POLICY' = 80, 'CREATE QUOTA' = 81, 'ALTER QUOTA' = 82, 'DROP QUOTA' = 83, 'CREATE SETTINGS PROFILE' = 84, 'ALTER SETTINGS PROFILE' = 85, 'DROP SETTINGS PROFILE' = 86, 'SHOW USERS' = 87, 'SHOW ROLES' = 88, 'SHOW ROW POLICIES' = 89, 'SHOW QUOTAS' = 90, 'SHOW SETTINGS PROFILES' = 91, 'SHOW ACCESS' = 92, 'ACCESS MANAGEMENT' = 93, 'SHOW NAMED COLLECTIONS' = 94, 'SHOW NAMED COLLECTIONS SECRETS' = 95, 'NAMED COLLECTION CONTROL' = 96, 'SYSTEM SHUTDOWN' = 97, 'SYSTEM DROP DNS CACHE' = 98, 'SYSTEM DROP MARK CACHE' = 99, 'SYSTEM DROP UNCOMPRESSED CACHE' = 100, 'SYSTEM DROP MMAP CACHE' = 101, 'SYSTEM DROP QUERY CACHE' = 102, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 103, 'SYSTEM DROP FILESYSTEM CACHE' = 104, 'SYSTEM DROP SCHEMA CACHE' = 105, 'SYSTEM DROP S3 CLIENT CACHE' = 106, 'SYSTEM DROP CACHE' = 107, 'SYSTEM RELOAD CONFIG' = 108, 'SYSTEM RELOAD USERS' = 109, 'SYSTEM RELOAD SYMBOLS' = 110, 'SYSTEM RELOAD DICTIONARY' = 111, 'SYSTEM RELOAD MODEL' = 112, 'SYSTEM RELOAD FUNCTION' = 113, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 114, 'SYSTEM RELOAD' = 115, 'SYSTEM RESTART DISK' = 116, 'SYSTEM MERGES' = 117, 'SYSTEM TTL MERGES' = 118, 'SYSTEM FETCHES' = 119, 'SYSTEM MOVES' = 120, 'SYSTEM DISTRIBUTED SENDS' = 121, 'SYSTEM REPLICATED SENDS' = 122, 'SYSTEM SENDS' = 123, 'SYSTEM REPLICATION QUEUES' = 124, 'SYSTEM DROP REPLICA' = 125, 'SYSTEM SYNC REPLICA' = 126, 'SYSTEM RESTART REPLICA' = 127, 'SYSTEM RESTORE REPLICA' = 128, 'SYSTEM WAIT LOADING PARTS' = 129, 'SYSTEM SYNC DATABASE REPLICA' = 130, 'SYSTEM SYNC TRANSACTION LOG' = 131, 'SYSTEM SYNC FILE CACHE' = 132, 'SYSTEM FLUSH DISTRIBUTED' = 133, 'SYSTEM FLUSH LOGS' = 134, 'SYSTEM FLUSH' = 135, 'SYSTEM THREAD FUZZER' = 136, 'SYSTEM UNFREEZE' = 137, 'SYSTEM FAILPOINT' = 138, 'SYSTEM' = 139, 'dictGet' = 140, 'displaySecretsInShowAndSelect' = 141, 'addressToLine' = 142, 'addressToLineWithInlines' = 143, 'addressToSymbol' = 144, 'demangle' = 145, 'INTROSPECTION' = 146, 'FILE' = 147, 'URL' = 148, 'REMOTE' = 149, 'MONGO' = 150, 'MEILISEARCH' = 151, 'MYSQL' = 152, 'POSTGRES' = 153, 'SQLITE' = 154, 'ODBC' = 155, 'JDBC' = 156, 'HDFS' = 157, 'S3' = 158, 'HIVE' = 159, 'SOURCES' = 160, 'CLUSTER' = 161, 'ALL' = 162, 'NONE' = 163), + `access_type` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE ARBITRARY TEMPORARY TABLE' = 53, 'CREATE FUNCTION' = 54, 'CREATE NAMED COLLECTION' = 55, 'CREATE' = 56, 'DROP DATABASE' = 57, 'DROP TABLE' = 58, 'DROP VIEW' = 59, 'DROP DICTIONARY' = 60, 'DROP FUNCTION' = 61, 'DROP NAMED COLLECTION' = 62, 'DROP' = 63, 'UNDROP TABLE' = 64, 'TRUNCATE' = 65, 'OPTIMIZE' = 66, 'BACKUP' = 67, 'KILL QUERY' = 68, 'KILL TRANSACTION' = 69, 'MOVE PARTITION BETWEEN SHARDS' = 70, 'CREATE USER' = 71, 'ALTER USER' = 72, 'DROP USER' = 73, 'CREATE ROLE' = 74, 'ALTER ROLE' = 75, 'DROP ROLE' = 76, 'ROLE ADMIN' = 77, 'CREATE ROW POLICY' = 78, 'ALTER ROW POLICY' = 79, 'DROP ROW POLICY' = 80, 'CREATE QUOTA' = 81, 'ALTER QUOTA' = 82, 'DROP QUOTA' = 83, 'CREATE SETTINGS PROFILE' = 84, 'ALTER SETTINGS PROFILE' = 85, 'DROP SETTINGS PROFILE' = 86, 'SHOW USERS' = 87, 'SHOW ROLES' = 88, 'SHOW ROW POLICIES' = 89, 'SHOW QUOTAS' = 90, 'SHOW SETTINGS PROFILES' = 91, 'SHOW ACCESS' = 92, 'ACCESS MANAGEMENT' = 93, 'SHOW NAMED COLLECTIONS' = 94, 'SHOW NAMED COLLECTIONS SECRETS' = 95, 'NAMED COLLECTION CONTROL' = 96, 'SYSTEM SHUTDOWN' = 97, 'SYSTEM DROP DNS CACHE' = 98, 'SYSTEM DROP MARK CACHE' = 99, 'SYSTEM DROP UNCOMPRESSED CACHE' = 100, 'SYSTEM DROP MMAP CACHE' = 101, 'SYSTEM DROP QUERY CACHE' = 102, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 103, 'SYSTEM DROP FILESYSTEM CACHE' = 104, 'SYSTEM DROP SCHEMA CACHE' = 105, 'SYSTEM DROP S3 CLIENT CACHE' = 106, 'SYSTEM DROP CACHE' = 107, 'SYSTEM RELOAD CONFIG' = 108, 'SYSTEM RELOAD USERS' = 109, 'SYSTEM RELOAD SYMBOLS' = 110, 'SYSTEM RELOAD DICTIONARY' = 111, 'SYSTEM RELOAD MODEL' = 112, 'SYSTEM RELOAD FUNCTION' = 113, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 114, 'SYSTEM RELOAD' = 115, 'SYSTEM RESTART DISK' = 116, 'SYSTEM MERGES' = 117, 'SYSTEM TTL MERGES' = 118, 'SYSTEM FETCHES' = 119, 'SYSTEM MOVES' = 120, 'SYSTEM DISTRIBUTED SENDS' = 121, 'SYSTEM REPLICATED SENDS' = 122, 'SYSTEM SENDS' = 123, 'SYSTEM REPLICATION QUEUES' = 124, 'SYSTEM DROP REPLICA' = 125, 'SYSTEM SYNC REPLICA' = 126, 'SYSTEM RESTART REPLICA' = 127, 'SYSTEM RESTORE REPLICA' = 128, 'SYSTEM WAIT LOADING PARTS' = 129, 'SYSTEM SYNC DATABASE REPLICA' = 130, 'SYSTEM SYNC TRANSACTION LOG' = 131, 'SYSTEM SYNC FILE CACHE' = 132, 'SYSTEM FLUSH DISTRIBUTED' = 133, 'SYSTEM FLUSH LOGS' = 134, 'SYSTEM FLUSH' = 135, 'SYSTEM THREAD FUZZER' = 136, 'SYSTEM UNFREEZE' = 137, 'SYSTEM FAILPOINT' = 138, 'SYSTEM' = 139, 'dictGet' = 140, 'displaySecretsInShowAndSelect' = 141, 'addressToLine' = 142, 'addressToLineWithInlines' = 143, 'addressToSymbol' = 144, 'demangle' = 145, 'INTROSPECTION' = 146, 'FILE' = 147, 'URL' = 148, 'REMOTE' = 149, 'MONGO' = 150, 'MEILISEARCH' = 151, 'MYSQL' = 152, 'POSTGRES' = 153, 'SQLITE' = 154, 'ODBC' = 155, 'JDBC' = 156, 'HDFS' = 157, 'S3' = 158, 'HIVE' = 159, 'AZURE' = 160, 'SOURCES' = 161, 'CLUSTER' = 162, 'ALL' = 163, 'NONE' = 164), `database` Nullable(String), `table` Nullable(String), `column` Nullable(String), @@ -581,10 +581,10 @@ ENGINE = SystemPartsColumns COMMENT 'SYSTEM TABLE is built on the fly.' CREATE TABLE system.privileges ( - `privilege` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE ARBITRARY TEMPORARY TABLE' = 53, 'CREATE FUNCTION' = 54, 'CREATE NAMED COLLECTION' = 55, 'CREATE' = 56, 'DROP DATABASE' = 57, 'DROP TABLE' = 58, 'DROP VIEW' = 59, 'DROP DICTIONARY' = 60, 'DROP FUNCTION' = 61, 'DROP NAMED COLLECTION' = 62, 'DROP' = 63, 'UNDROP TABLE' = 64, 'TRUNCATE' = 65, 'OPTIMIZE' = 66, 'BACKUP' = 67, 'KILL QUERY' = 68, 'KILL TRANSACTION' = 69, 'MOVE PARTITION BETWEEN SHARDS' = 70, 'CREATE USER' = 71, 'ALTER USER' = 72, 'DROP USER' = 73, 'CREATE ROLE' = 74, 'ALTER ROLE' = 75, 'DROP ROLE' = 76, 'ROLE ADMIN' = 77, 'CREATE ROW POLICY' = 78, 'ALTER ROW POLICY' = 79, 'DROP ROW POLICY' = 80, 'CREATE QUOTA' = 81, 'ALTER QUOTA' = 82, 'DROP QUOTA' = 83, 'CREATE SETTINGS PROFILE' = 84, 'ALTER SETTINGS PROFILE' = 85, 'DROP SETTINGS PROFILE' = 86, 'SHOW USERS' = 87, 'SHOW ROLES' = 88, 'SHOW ROW POLICIES' = 89, 'SHOW QUOTAS' = 90, 'SHOW SETTINGS PROFILES' = 91, 'SHOW ACCESS' = 92, 'ACCESS MANAGEMENT' = 93, 'SHOW NAMED COLLECTIONS' = 94, 'SHOW NAMED COLLECTIONS SECRETS' = 95, 'NAMED COLLECTION CONTROL' = 96, 'SYSTEM SHUTDOWN' = 97, 'SYSTEM DROP DNS CACHE' = 98, 'SYSTEM DROP MARK CACHE' = 99, 'SYSTEM DROP UNCOMPRESSED CACHE' = 100, 'SYSTEM DROP MMAP CACHE' = 101, 'SYSTEM DROP QUERY CACHE' = 102, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 103, 'SYSTEM DROP FILESYSTEM CACHE' = 104, 'SYSTEM DROP SCHEMA CACHE' = 105, 'SYSTEM DROP S3 CLIENT CACHE' = 106, 'SYSTEM DROP CACHE' = 107, 'SYSTEM RELOAD CONFIG' = 108, 'SYSTEM RELOAD USERS' = 109, 'SYSTEM RELOAD SYMBOLS' = 110, 'SYSTEM RELOAD DICTIONARY' = 111, 'SYSTEM RELOAD MODEL' = 112, 'SYSTEM RELOAD FUNCTION' = 113, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 114, 'SYSTEM RELOAD' = 115, 'SYSTEM RESTART DISK' = 116, 'SYSTEM MERGES' = 117, 'SYSTEM TTL MERGES' = 118, 'SYSTEM FETCHES' = 119, 'SYSTEM MOVES' = 120, 'SYSTEM DISTRIBUTED SENDS' = 121, 'SYSTEM REPLICATED SENDS' = 122, 'SYSTEM SENDS' = 123, 'SYSTEM REPLICATION QUEUES' = 124, 'SYSTEM DROP REPLICA' = 125, 'SYSTEM SYNC REPLICA' = 126, 'SYSTEM RESTART REPLICA' = 127, 'SYSTEM RESTORE REPLICA' = 128, 'SYSTEM WAIT LOADING PARTS' = 129, 'SYSTEM SYNC DATABASE REPLICA' = 130, 'SYSTEM SYNC TRANSACTION LOG' = 131, 'SYSTEM SYNC FILE CACHE' = 132, 'SYSTEM FLUSH DISTRIBUTED' = 133, 'SYSTEM FLUSH LOGS' = 134, 'SYSTEM FLUSH' = 135, 'SYSTEM THREAD FUZZER' = 136, 'SYSTEM UNFREEZE' = 137, 'SYSTEM FAILPOINT' = 138, 'SYSTEM' = 139, 'dictGet' = 140, 'displaySecretsInShowAndSelect' = 141, 'addressToLine' = 142, 'addressToLineWithInlines' = 143, 'addressToSymbol' = 144, 'demangle' = 145, 'INTROSPECTION' = 146, 'FILE' = 147, 'URL' = 148, 'REMOTE' = 149, 'MONGO' = 150, 'MEILISEARCH' = 151, 'MYSQL' = 152, 'POSTGRES' = 153, 'SQLITE' = 154, 'ODBC' = 155, 'JDBC' = 156, 'HDFS' = 157, 'S3' = 158, 'HIVE' = 159, 'SOURCES' = 160, 'CLUSTER' = 161, 'ALL' = 162, 'NONE' = 163), + `privilege` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE ARBITRARY TEMPORARY TABLE' = 53, 'CREATE FUNCTION' = 54, 'CREATE NAMED COLLECTION' = 55, 'CREATE' = 56, 'DROP DATABASE' = 57, 'DROP TABLE' = 58, 'DROP VIEW' = 59, 'DROP DICTIONARY' = 60, 'DROP FUNCTION' = 61, 'DROP NAMED COLLECTION' = 62, 'DROP' = 63, 'UNDROP TABLE' = 64, 'TRUNCATE' = 65, 'OPTIMIZE' = 66, 'BACKUP' = 67, 'KILL QUERY' = 68, 'KILL TRANSACTION' = 69, 'MOVE PARTITION BETWEEN SHARDS' = 70, 'CREATE USER' = 71, 'ALTER USER' = 72, 'DROP USER' = 73, 'CREATE ROLE' = 74, 'ALTER ROLE' = 75, 'DROP ROLE' = 76, 'ROLE ADMIN' = 77, 'CREATE ROW POLICY' = 78, 'ALTER ROW POLICY' = 79, 'DROP ROW POLICY' = 80, 'CREATE QUOTA' = 81, 'ALTER QUOTA' = 82, 'DROP QUOTA' = 83, 'CREATE SETTINGS PROFILE' = 84, 'ALTER SETTINGS PROFILE' = 85, 'DROP SETTINGS PROFILE' = 86, 'SHOW USERS' = 87, 'SHOW ROLES' = 88, 'SHOW ROW POLICIES' = 89, 'SHOW QUOTAS' = 90, 'SHOW SETTINGS PROFILES' = 91, 'SHOW ACCESS' = 92, 'ACCESS MANAGEMENT' = 93, 'SHOW NAMED COLLECTIONS' = 94, 'SHOW NAMED COLLECTIONS SECRETS' = 95, 'NAMED COLLECTION CONTROL' = 96, 'SYSTEM SHUTDOWN' = 97, 'SYSTEM DROP DNS CACHE' = 98, 'SYSTEM DROP MARK CACHE' = 99, 'SYSTEM DROP UNCOMPRESSED CACHE' = 100, 'SYSTEM DROP MMAP CACHE' = 101, 'SYSTEM DROP QUERY CACHE' = 102, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 103, 'SYSTEM DROP FILESYSTEM CACHE' = 104, 'SYSTEM DROP SCHEMA CACHE' = 105, 'SYSTEM DROP S3 CLIENT CACHE' = 106, 'SYSTEM DROP CACHE' = 107, 'SYSTEM RELOAD CONFIG' = 108, 'SYSTEM RELOAD USERS' = 109, 'SYSTEM RELOAD SYMBOLS' = 110, 'SYSTEM RELOAD DICTIONARY' = 111, 'SYSTEM RELOAD MODEL' = 112, 'SYSTEM RELOAD FUNCTION' = 113, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 114, 'SYSTEM RELOAD' = 115, 'SYSTEM RESTART DISK' = 116, 'SYSTEM MERGES' = 117, 'SYSTEM TTL MERGES' = 118, 'SYSTEM FETCHES' = 119, 'SYSTEM MOVES' = 120, 'SYSTEM DISTRIBUTED SENDS' = 121, 'SYSTEM REPLICATED SENDS' = 122, 'SYSTEM SENDS' = 123, 'SYSTEM REPLICATION QUEUES' = 124, 'SYSTEM DROP REPLICA' = 125, 'SYSTEM SYNC REPLICA' = 126, 'SYSTEM RESTART REPLICA' = 127, 'SYSTEM RESTORE REPLICA' = 128, 'SYSTEM WAIT LOADING PARTS' = 129, 'SYSTEM SYNC DATABASE REPLICA' = 130, 'SYSTEM SYNC TRANSACTION LOG' = 131, 'SYSTEM SYNC FILE CACHE' = 132, 'SYSTEM FLUSH DISTRIBUTED' = 133, 'SYSTEM FLUSH LOGS' = 134, 'SYSTEM FLUSH' = 135, 'SYSTEM THREAD FUZZER' = 136, 'SYSTEM UNFREEZE' = 137, 'SYSTEM FAILPOINT' = 138, 'SYSTEM' = 139, 'dictGet' = 140, 'displaySecretsInShowAndSelect' = 141, 'addressToLine' = 142, 'addressToLineWithInlines' = 143, 'addressToSymbol' = 144, 'demangle' = 145, 'INTROSPECTION' = 146, 'FILE' = 147, 'URL' = 148, 'REMOTE' = 149, 'MONGO' = 150, 'MEILISEARCH' = 151, 'MYSQL' = 152, 'POSTGRES' = 153, 'SQLITE' = 154, 'ODBC' = 155, 'JDBC' = 156, 'HDFS' = 157, 'S3' = 158, 'HIVE' = 159, 'AZURE' = 160, 'SOURCES' = 161, 'CLUSTER' = 162, 'ALL' = 163, 'NONE' = 164), `aliases` Array(String), `level` Nullable(Enum8('GLOBAL' = 0, 'DATABASE' = 1, 'TABLE' = 2, 'DICTIONARY' = 3, 'VIEW' = 4, 'COLUMN' = 5, 'NAMED_COLLECTION' = 6)), - `parent_group` Nullable(Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE ARBITRARY TEMPORARY TABLE' = 53, 'CREATE FUNCTION' = 54, 'CREATE NAMED COLLECTION' = 55, 'CREATE' = 56, 'DROP DATABASE' = 57, 'DROP TABLE' = 58, 'DROP VIEW' = 59, 'DROP DICTIONARY' = 60, 'DROP FUNCTION' = 61, 'DROP NAMED COLLECTION' = 62, 'DROP' = 63, 'UNDROP TABLE' = 64, 'TRUNCATE' = 65, 'OPTIMIZE' = 66, 'BACKUP' = 67, 'KILL QUERY' = 68, 'KILL TRANSACTION' = 69, 'MOVE PARTITION BETWEEN SHARDS' = 70, 'CREATE USER' = 71, 'ALTER USER' = 72, 'DROP USER' = 73, 'CREATE ROLE' = 74, 'ALTER ROLE' = 75, 'DROP ROLE' = 76, 'ROLE ADMIN' = 77, 'CREATE ROW POLICY' = 78, 'ALTER ROW POLICY' = 79, 'DROP ROW POLICY' = 80, 'CREATE QUOTA' = 81, 'ALTER QUOTA' = 82, 'DROP QUOTA' = 83, 'CREATE SETTINGS PROFILE' = 84, 'ALTER SETTINGS PROFILE' = 85, 'DROP SETTINGS PROFILE' = 86, 'SHOW USERS' = 87, 'SHOW ROLES' = 88, 'SHOW ROW POLICIES' = 89, 'SHOW QUOTAS' = 90, 'SHOW SETTINGS PROFILES' = 91, 'SHOW ACCESS' = 92, 'ACCESS MANAGEMENT' = 93, 'SHOW NAMED COLLECTIONS' = 94, 'SHOW NAMED COLLECTIONS SECRETS' = 95, 'NAMED COLLECTION CONTROL' = 96, 'SYSTEM SHUTDOWN' = 97, 'SYSTEM DROP DNS CACHE' = 98, 'SYSTEM DROP MARK CACHE' = 99, 'SYSTEM DROP UNCOMPRESSED CACHE' = 100, 'SYSTEM DROP MMAP CACHE' = 101, 'SYSTEM DROP QUERY CACHE' = 102, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 103, 'SYSTEM DROP FILESYSTEM CACHE' = 104, 'SYSTEM DROP SCHEMA CACHE' = 105, 'SYSTEM DROP S3 CLIENT CACHE' = 106, 'SYSTEM DROP CACHE' = 107, 'SYSTEM RELOAD CONFIG' = 108, 'SYSTEM RELOAD USERS' = 109, 'SYSTEM RELOAD SYMBOLS' = 110, 'SYSTEM RELOAD DICTIONARY' = 111, 'SYSTEM RELOAD MODEL' = 112, 'SYSTEM RELOAD FUNCTION' = 113, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 114, 'SYSTEM RELOAD' = 115, 'SYSTEM RESTART DISK' = 116, 'SYSTEM MERGES' = 117, 'SYSTEM TTL MERGES' = 118, 'SYSTEM FETCHES' = 119, 'SYSTEM MOVES' = 120, 'SYSTEM DISTRIBUTED SENDS' = 121, 'SYSTEM REPLICATED SENDS' = 122, 'SYSTEM SENDS' = 123, 'SYSTEM REPLICATION QUEUES' = 124, 'SYSTEM DROP REPLICA' = 125, 'SYSTEM SYNC REPLICA' = 126, 'SYSTEM RESTART REPLICA' = 127, 'SYSTEM RESTORE REPLICA' = 128, 'SYSTEM WAIT LOADING PARTS' = 129, 'SYSTEM SYNC DATABASE REPLICA' = 130, 'SYSTEM SYNC TRANSACTION LOG' = 131, 'SYSTEM SYNC FILE CACHE' = 132, 'SYSTEM FLUSH DISTRIBUTED' = 133, 'SYSTEM FLUSH LOGS' = 134, 'SYSTEM FLUSH' = 135, 'SYSTEM THREAD FUZZER' = 136, 'SYSTEM UNFREEZE' = 137, 'SYSTEM FAILPOINT' = 138, 'SYSTEM' = 139, 'dictGet' = 140, 'displaySecretsInShowAndSelect' = 141, 'addressToLine' = 142, 'addressToLineWithInlines' = 143, 'addressToSymbol' = 144, 'demangle' = 145, 'INTROSPECTION' = 146, 'FILE' = 147, 'URL' = 148, 'REMOTE' = 149, 'MONGO' = 150, 'MEILISEARCH' = 151, 'MYSQL' = 152, 'POSTGRES' = 153, 'SQLITE' = 154, 'ODBC' = 155, 'JDBC' = 156, 'HDFS' = 157, 'S3' = 158, 'HIVE' = 159, 'SOURCES' = 160, 'CLUSTER' = 161, 'ALL' = 162, 'NONE' = 163)) + `parent_group` Nullable(Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE ARBITRARY TEMPORARY TABLE' = 53, 'CREATE FUNCTION' = 54, 'CREATE NAMED COLLECTION' = 55, 'CREATE' = 56, 'DROP DATABASE' = 57, 'DROP TABLE' = 58, 'DROP VIEW' = 59, 'DROP DICTIONARY' = 60, 'DROP FUNCTION' = 61, 'DROP NAMED COLLECTION' = 62, 'DROP' = 63, 'UNDROP TABLE' = 64, 'TRUNCATE' = 65, 'OPTIMIZE' = 66, 'BACKUP' = 67, 'KILL QUERY' = 68, 'KILL TRANSACTION' = 69, 'MOVE PARTITION BETWEEN SHARDS' = 70, 'CREATE USER' = 71, 'ALTER USER' = 72, 'DROP USER' = 73, 'CREATE ROLE' = 74, 'ALTER ROLE' = 75, 'DROP ROLE' = 76, 'ROLE ADMIN' = 77, 'CREATE ROW POLICY' = 78, 'ALTER ROW POLICY' = 79, 'DROP ROW POLICY' = 80, 'CREATE QUOTA' = 81, 'ALTER QUOTA' = 82, 'DROP QUOTA' = 83, 'CREATE SETTINGS PROFILE' = 84, 'ALTER SETTINGS PROFILE' = 85, 'DROP SETTINGS PROFILE' = 86, 'SHOW USERS' = 87, 'SHOW ROLES' = 88, 'SHOW ROW POLICIES' = 89, 'SHOW QUOTAS' = 90, 'SHOW SETTINGS PROFILES' = 91, 'SHOW ACCESS' = 92, 'ACCESS MANAGEMENT' = 93, 'SHOW NAMED COLLECTIONS' = 94, 'SHOW NAMED COLLECTIONS SECRETS' = 95, 'NAMED COLLECTION CONTROL' = 96, 'SYSTEM SHUTDOWN' = 97, 'SYSTEM DROP DNS CACHE' = 98, 'SYSTEM DROP MARK CACHE' = 99, 'SYSTEM DROP UNCOMPRESSED CACHE' = 100, 'SYSTEM DROP MMAP CACHE' = 101, 'SYSTEM DROP QUERY CACHE' = 102, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 103, 'SYSTEM DROP FILESYSTEM CACHE' = 104, 'SYSTEM DROP SCHEMA CACHE' = 105, 'SYSTEM DROP S3 CLIENT CACHE' = 106, 'SYSTEM DROP CACHE' = 107, 'SYSTEM RELOAD CONFIG' = 108, 'SYSTEM RELOAD USERS' = 109, 'SYSTEM RELOAD SYMBOLS' = 110, 'SYSTEM RELOAD DICTIONARY' = 111, 'SYSTEM RELOAD MODEL' = 112, 'SYSTEM RELOAD FUNCTION' = 113, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 114, 'SYSTEM RELOAD' = 115, 'SYSTEM RESTART DISK' = 116, 'SYSTEM MERGES' = 117, 'SYSTEM TTL MERGES' = 118, 'SYSTEM FETCHES' = 119, 'SYSTEM MOVES' = 120, 'SYSTEM DISTRIBUTED SENDS' = 121, 'SYSTEM REPLICATED SENDS' = 122, 'SYSTEM SENDS' = 123, 'SYSTEM REPLICATION QUEUES' = 124, 'SYSTEM DROP REPLICA' = 125, 'SYSTEM SYNC REPLICA' = 126, 'SYSTEM RESTART REPLICA' = 127, 'SYSTEM RESTORE REPLICA' = 128, 'SYSTEM WAIT LOADING PARTS' = 129, 'SYSTEM SYNC DATABASE REPLICA' = 130, 'SYSTEM SYNC TRANSACTION LOG' = 131, 'SYSTEM SYNC FILE CACHE' = 132, 'SYSTEM FLUSH DISTRIBUTED' = 133, 'SYSTEM FLUSH LOGS' = 134, 'SYSTEM FLUSH' = 135, 'SYSTEM THREAD FUZZER' = 136, 'SYSTEM UNFREEZE' = 137, 'SYSTEM FAILPOINT' = 138, 'SYSTEM' = 139, 'dictGet' = 140, 'displaySecretsInShowAndSelect' = 141, 'addressToLine' = 142, 'addressToLineWithInlines' = 143, 'addressToSymbol' = 144, 'demangle' = 145, 'INTROSPECTION' = 146, 'FILE' = 147, 'URL' = 148, 'REMOTE' = 149, 'MONGO' = 150, 'MEILISEARCH' = 151, 'MYSQL' = 152, 'POSTGRES' = 153, 'SQLITE' = 154, 'ODBC' = 155, 'JDBC' = 156, 'HDFS' = 157, 'S3' = 158, 'HIVE' = 159, 'AZURE' = 160, 'SOURCES' = 161, 'CLUSTER' = 162, 'ALL' = 163, 'NONE' = 164)) ) ENGINE = SystemPrivileges COMMENT 'SYSTEM TABLE is built on the fly.'