
Azure table function #50604

Merged (62 commits) on Jun 9, 2023
Commits
8e4a2a4
Some code
alesapin Jun 2, 2023
a0df856
Able to insert
alesapin Jun 3, 2023
0292313
Merge branch 'master' into azure
alesapin Jun 4, 2023
8d8d062
Add integration test
alesapin Jun 4, 2023
c9c9e1e
Merge branch 'master' into azure
alesapin Jun 5, 2023
3bd23f1
Merge branch 'master' into azure
alesapin Jun 5, 2023
2866bac
Add named collections and remove host filter support
alesapin Jun 5, 2023
bc8ee56
Support settings, test truncate
alesapin Jun 5, 2023
93e9752
Merge branch 'azure' into azure_table_function
SmitaRKulkarni Jun 5, 2023
dedb906
WIP : Azure Table Function, added read and StorageAzureSource
SmitaRKulkarni Jun 5, 2023
aa6f4e4
Fixed COLUMN_NOT_FOUND in block issue
SmitaRKulkarni Jun 5, 2023
bd6b0ff
Updated to read only 1st object
SmitaRKulkarni Jun 5, 2023
9342370
Merge remote-tracking branch 'origin/azure_table_function' into azure
alesapin Jun 5, 2023
b22e664
Implement blob-read
alesapin Jun 5, 2023
cf9936a
Automatic style fix
robot-clickhouse Jun 5, 2023
6144519
Fixed build by adding StorageAzureSource constructor and getHeader fu…
SmitaRKulkarni Jun 6, 2023
1506545
Fix merge conflicts
alesapin Jun 6, 2023
8028184
Fix read
alesapin Jun 6, 2023
3d99abe
Remove async reads
alesapin Jun 6, 2023
d497562
Copy and paste
alesapin Jun 6, 2023
18decb0
Automatic style fix
robot-clickhouse Jun 6, 2023
a1f3bd9
Fix reads
alesapin Jun 6, 2023
6caf808
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
e054fbc
Automatic style fix
robot-clickhouse Jun 6, 2023
88f2f4f
Added createAsyncAzureReadBuffer
SmitaRKulkarni Jun 6, 2023
75d0f9f
Updated to use readObjects for async
SmitaRKulkarni Jun 6, 2023
cbe4ea6
Removed unwanted code & debug lines
SmitaRKulkarni Jun 6, 2023
ebae79f
Schema inference
alesapin Jun 6, 2023
14470b4
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
3bda231
Automatic style fix
robot-clickhouse Jun 6, 2023
c910f00
Some code for table function
alesapin Jun 6, 2023
e76a702
Add some tests
alesapin Jun 6, 2023
a6185da
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
ae97f45
Automatic style fix
robot-clickhouse Jun 6, 2023
6a96cf4
Renamed to azure_blob_storage
SmitaRKulkarni Jun 6, 2023
b2db6b4
Renamed to azure_blob_storage
SmitaRKulkarni Jun 6, 2023
7100bc5
Fixes for azure table function
alesapin Jun 6, 2023
454c23f
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
d902592
Fix new tests
alesapin Jun 6, 2023
934df5e
Rename to AzureBlobStorage
alesapin Jun 6, 2023
95b054b
Automatic style fix
robot-clickhouse Jun 6, 2023
49b019b
Refactored TableFunction name to TableFunctionAzureBlobStorage
SmitaRKulkarni Jun 6, 2023
ceab511
Fxi style
alesapin Jun 6, 2023
8eaa32e
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
5637858
Fix the most important check in the world
alesapin Jun 6, 2023
99f0be8
Refactored to StorageAzureBlob
SmitaRKulkarni Jun 6, 2023
6ab2a50
Fix two tests and build
alesapin Jun 6, 2023
5d52c41
Merge branch 'master' into azure_table_function
alesapin Jun 7, 2023
71ae54f
Fix args
alesapin Jun 7, 2023
a67dd6e
Readuntilend
alesapin Jun 7, 2023
cf65ac4
Fix iterator
alesapin Jun 7, 2023
b78e330
Better test
alesapin Jun 7, 2023
8d67296
Automatic style fix
robot-clickhouse Jun 7, 2023
7d5b98f
Remove logging add sleeps
alesapin Jun 7, 2023
278543c
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 7, 2023
d35573a
Removed logs and small name fixes
SmitaRKulkarni Jun 8, 2023
a918f88
Fixes
alesapin Jun 8, 2023
caabbfd
Fix one more race
alesapin Jun 8, 2023
79edd06
Merge remote-tracking branch 'origin/master' into azure_table_function
alesapin Jun 8, 2023
116df09
Fix build
alesapin Jun 8, 2023
96d9b88
Fix build
alesapin Jun 9, 2023
7ac4349
Hacking azure function
alesapin Jun 9, 2023
Files changed
2 changes: 1 addition & 1 deletion contrib/azure-cmake/CMakeLists.txt
@@ -1,6 +1,6 @@
option (ENABLE_AZURE_BLOB_STORAGE "Enable Azure blob storage" ${ENABLE_LIBRARIES})

if (NOT ENABLE_AZURE_BLOB_STORAGE OR BUILD_STANDALONE_KEEPER OR OS_FREEBSD)
if (NOT ENABLE_AZURE_BLOB_STORAGE OR BUILD_STANDALONE_KEEPER OR OS_FREEBSD OR ARCH_PPC64LE)
message(STATUS "Not using Azure blob storage")
return()
endif()
11 changes: 11 additions & 0 deletions docs/en/sql-reference/table-functions/azure_blob_storage.md
@@ -0,0 +1,11 @@
---
slug: /en/sql-reference/table-functions/azure_blob_storage
sidebar_position: 45
sidebar_label: azure_blob_storage
keywords: [azure blob storage]
---

# azure\_blob\_storage Table Function

Provides a table-like interface to select/insert files in [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs). This table function is similar to the [s3 function](../../sql-reference/table-functions/s3.md).
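The stub above does not yet show a call; as a rough usage sketch, assuming the argument order parallels the s3 function (connection string, container, blob path, then format, compression, and structure), with every value below a placeholder:

SELECT count(*)
FROM azure_blob_storage(
    '<connection_string>',       -- hypothetical placeholder, e.g. an Azurite connection string
    'test-container',            -- container name (placeholder)
    'data.csv',                  -- blob path (placeholder)
    'CSV',                       -- input format
    'auto',                      -- compression
    'id UInt32, name String');   -- structure; the PR adds schema inference, so this may be optional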

1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
@@ -210,6 +210,7 @@ enum class AccessType
M(HDFS, "", GLOBAL, SOURCES) \
M(S3, "", GLOBAL, SOURCES) \
M(HIVE, "", GLOBAL, SOURCES) \
M(AZURE, "", GLOBAL, SOURCES) \
M(SOURCES, "", GROUP, ALL) \
\
M(CLUSTER, "", GLOBAL, ALL) /* ON CLUSTER queries */ \
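With AZURE registered under the SOURCES group, access to the new table function should be grantable like the other external sources; a sketch assuming the same GRANT syntax used for the S3 source type:

-- Hypothetical: mirrors GRANT S3 ON *.* TO reader
GRANT AZURE ON *.* TO reader;
REVOKE AZURE ON *.* FROM reader;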
3 changes: 3 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -348,6 +348,9 @@ The server successfully detected this situation and will download merged part fr
M(S3PutObject, "Number of S3 API PutObject calls.") \
M(S3GetObject, "Number of S3 API GetObject calls.") \
\
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.") \
\
M(DiskS3DeleteObjects, "Number of DiskS3 API DeleteObject(s) calls.") \
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
6 changes: 6 additions & 0 deletions src/Core/Settings.h
@@ -81,7 +81,9 @@ class IColumn;
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited. You ", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, azure_max_single_read_retries, 4, "The maximum number of retries during single Azure blob storage read.", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
@@ -90,8 +92,11 @@ class IColumn;
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, azure_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in azure engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
@@ -708,6 +713,7 @@ class IColumn;
\
M(Bool, schema_inference_use_cache_for_file, true, "Use cache in schema inference while using file table function", 0) \
M(Bool, schema_inference_use_cache_for_s3, true, "Use cache in schema inference while using s3 table function", 0) \
M(Bool, schema_inference_use_cache_for_azure, true, "Use cache in schema inference while using azure table function", 0) \
M(Bool, schema_inference_use_cache_for_hdfs, true, "Use cache in schema inference while using hdfs table function", 0) \
M(Bool, schema_inference_use_cache_for_url, true, "Use cache in schema inference while using url table function", 0) \
M(Bool, schema_inference_cache_require_modification_time_for_url, true, "Use schema from cache for URL with last modification time validation (for urls with Last-Modified header)", 0) \
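Each azure_* setting introduced here shadows an existing s3_* setting, so presumably they can be tuned per session in the same way; a sketch under that assumption:

-- Hypothetical overrides; semantics assumed to match the s3_* counterparts.
SET azure_truncate_on_insert = 1;             -- truncate the target blob before INSERT
SET azure_create_new_file_on_insert = 0;      -- do not create a new numbered file per INSERT
SET azure_max_single_read_retries = 8;        -- allow more retries on flaky reads
SET schema_inference_use_cache_for_azure = 0; -- re-infer the schema on every query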
76 changes: 72 additions & 4 deletions src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp
@@ -35,6 +35,7 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
size_t max_single_read_retries_,
size_t max_single_download_retries_,
bool use_external_buffer_,
bool restricted_seek_,
size_t read_until_position_)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size, nullptr, 0)
, blob_container_client(blob_container_client_)
@@ -44,6 +45,7 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
, read_settings(read_settings_)
, tmp_buffer_size(read_settings.remote_fs_buffer_size)
, use_external_buffer(use_external_buffer_)
, restricted_seek(restricted_seek_)
, read_until_position(read_until_position_)
{
if (!use_external_buffer)
@@ -54,6 +56,22 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
}
}


void ReadBufferFromAzureBlobStorage::setReadUntilEnd()
{
if (read_until_position)
{
read_until_position = 0;
if (initialized)
{
offset = getPosition();
resetWorkingBuffer();
initialized = false;
}
}

}

void ReadBufferFromAzureBlobStorage::setReadUntilPosition(size_t position)
{
read_until_position = position;
@@ -118,17 +136,54 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()

off_t ReadBufferFromAzureBlobStorage::seek(off_t offset_, int whence)
{
if (initialized)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer.");
if (offset_ == getPosition() && whence == SEEK_SET)
return offset_;

if (initialized && restricted_seek)
{
throw Exception(
ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
"Seek is allowed only before first read attempt from the buffer (current offset: "
"{}, new offset: {}, reading until position: {}, available: {})",
getPosition(), offset_, read_until_position, available());
}

if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed.");

if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_);

offset = offset_;
if (!restricted_seek)
{
if (!working_buffer.empty()
&& static_cast<size_t>(offset_) >= offset - working_buffer.size()
&& offset_ < offset)
{
pos = working_buffer.end() - (offset - offset_);
assert(pos >= working_buffer.begin());
assert(pos < working_buffer.end());

return getPosition();
}

off_t position = getPosition();
if (initialized && offset_ > position)
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
{
ignore(diff);
return offset_;
}
}

resetWorkingBuffer();
if (initialized)
initialized = false;
}

offset = offset_;
return offset;
}

@@ -152,7 +207,8 @@ void ReadBufferFromAzureBlobStorage::initialize()

download_options.Range = {static_cast<int64_t>(offset), length};

blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));
if (!blob_client)
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));

size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t i = 0; i < max_single_download_retries; ++i)
@@ -182,6 +238,18 @@ void ReadBufferFromAzureBlobStorage::initialize()
initialized = true;
}

size_t ReadBufferFromAzureBlobStorage::getFileSize()
{
if (!blob_client)
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path));

if (file_size.has_value())
return *file_size;

file_size = blob_client->GetProperties().Value.BlobSize;
return *file_size;
}

}

#endif
10 changes: 10 additions & 0 deletions src/Disks/IO/ReadBufferFromAzureBlobStorage.h
@@ -24,6 +24,7 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase
size_t max_single_read_retries_,
size_t max_single_download_retries_,
bool use_external_buffer_ = false,
bool restricted_seek_ = false,
size_t read_until_position_ = 0);

off_t seek(off_t off, int whence) override;
@@ -37,9 +38,12 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase
String getFileName() const override { return path; }

void setReadUntilPosition(size_t position) override;
void setReadUntilEnd() override;

bool supportsRightBoundedReads() const override { return true; }

size_t getFileSize() override;

private:

void initialize();
@@ -55,6 +59,12 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase
std::vector<char> tmp_buffer;
size_t tmp_buffer_size;
bool use_external_buffer;

/// There is different seek policy for disk seek and for non-disk seek
/// (non-disk seek is applied for seekable input formats: orc, arrow, parquet).
bool restricted_seek;


off_t read_until_position = 0;

off_t offset = 0;
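The seek-policy comment in this header distinguishes disk reads from format-driven reads: seekable input formats (ORC, Arrow, Parquet) jump around inside a file and so presumably get a buffer with restricted_seek = false, while the remote-FS read path constructs it with restricted_seek = true (see the readObjects change below). A hypothetical query that would exercise in-blob seeks:

-- Hypothetical: a columnar format reads only the requested column, seeking within the blob.
SELECT id FROM azure_blob_storage('<connection_string>', 'test-container', 'data.parquet', 'Parquet');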
17 changes: 11 additions & 6 deletions src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp
@@ -57,14 +57,22 @@ void validateContainerName(const String & container_name)

AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
String storage_account_url = config.getString(config_prefix + ".storage_account_url");
validateStorageAccountUrl(storage_account_url);
std::string storage_url;
if (config.has(config_prefix + ".storage_account_url"))
{
storage_url = config.getString(config_prefix + ".storage_account_url");
validateStorageAccountUrl(storage_url);
}
else
{
storage_url = config.getString(config_prefix + ".connection_string");
}
String container_name = config.getString(config_prefix + ".container_name", "default-container");
validateContainerName(container_name);
std::optional<bool> container_already_exists {};
if (config.has(config_prefix + ".container_already_exists"))
container_already_exists = {config.getBool(config_prefix + ".container_already_exists")};
return {storage_account_url, container_name, container_already_exists};
return {storage_url, container_name, container_already_exists};
}


@@ -136,10 +144,7 @@ std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
/// If container_already_exists is not set (in config), ignore already exists error.
/// (Conflict - The specified container already exists)
if (!endpoint.container_already_exists.has_value() && e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
{
tryLogCurrentException("Container already exists, returning the existing container");
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
}
throw;
}
}
10 changes: 6 additions & 4 deletions src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
@@ -57,6 +57,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync
private:
bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override
{
batch.clear();
auto outcome = client->ListBlobs(options);
auto blob_list_response = client->ListBlobs(options);
auto blobs_list = blob_list_response.Blobs;
@@ -73,11 +74,11 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync
{}});
}

options.ContinuationToken = blob_list_response.NextPageToken;
if (blob_list_response.HasPage())
return true;
if (!blob_list_response.NextPageToken.HasValue() || blob_list_response.NextPageToken.Value().empty())
return false;

return false;
options.ContinuationToken = blob_list_response.NextPageToken;
return true;
}

std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
@@ -215,6 +216,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
/* use_external_buffer */true,
/* restricted_seek */true,
read_until_position);
};

15 changes: 7 additions & 8 deletions src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h
@@ -8,10 +8,7 @@
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/MultiVersion.h>

#if USE_AZURE_BLOB_STORAGE
#include <azure/storage/blobs.hpp>
#endif

namespace Poco
{
@@ -37,11 +34,13 @@ struct AzureObjectStorageSettings
{
}

size_t max_single_part_upload_size; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
uint64_t min_bytes_for_seek;
size_t max_single_read_retries;
size_t max_single_download_retries;
int list_object_keys_size;
AzureObjectStorageSettings() = default;

size_t max_single_part_upload_size = 100 * 1024 * 1024; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
uint64_t min_bytes_for_seek = 1024 * 1024;
size_t max_single_read_retries = 3;
size_t max_single_download_retries = 3;
int list_object_keys_size = 1000;
};

using AzureClient = Azure::Storage::Blobs::BlobContainerClient;
2 changes: 1 addition & 1 deletion src/Disks/ObjectStorages/ObjectStorageIterator.cpp
@@ -9,7 +9,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

RelativePathWithMetadata ObjectStorageIteratorFromList::current() const
RelativePathWithMetadata ObjectStorageIteratorFromList::current()
{
if (!isValid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator");
20 changes: 16 additions & 4 deletions src/Disks/ObjectStorages/ObjectStorageIterator.h
@@ -10,8 +10,10 @@ class IObjectStorageIterator
{
public:
virtual void next() = 0;
virtual bool isValid() const = 0;
virtual RelativePathWithMetadata current() const = 0;
virtual void nextBatch() = 0;
virtual bool isValid() = 0;
virtual RelativePathWithMetadata current() = 0;
virtual RelativePathsWithMetadata currentBatch() = 0;
virtual size_t getAccumulatedSize() const = 0;

virtual ~IObjectStorageIterator() = default;
@@ -34,12 +36,22 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator
++batch_iterator;
}

bool isValid() const override
void nextBatch() override
{
batch_iterator = batch.end();
}

bool isValid() override
{
return batch_iterator != batch.end();
}

RelativePathWithMetadata current() const override;
RelativePathWithMetadata current() override;

RelativePathsWithMetadata currentBatch() override
{
return batch;
}

size_t getAccumulatedSize() const override
{