Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure table function #50604

Merged
merged 62 commits into from Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
8e4a2a4
Some code
alesapin Jun 2, 2023
a0df856
Able to insert
alesapin Jun 3, 2023
0292313
Merge branch 'master' into azure
alesapin Jun 4, 2023
8d8d062
Add integration test
alesapin Jun 4, 2023
c9c9e1e
Merge branch 'master' into azure
alesapin Jun 5, 2023
3bd23f1
Merge branch 'master' into azure
alesapin Jun 5, 2023
2866bac
Add named collections and remove host filter support
alesapin Jun 5, 2023
bc8ee56
Support settings, test truncate
alesapin Jun 5, 2023
93e9752
Merge branch 'azure' into azure_table_function
SmitaRKulkarni Jun 5, 2023
dedb906
WIP : Azure Table Function, added read and StorageAzureSource
SmitaRKulkarni Jun 5, 2023
aa6f4e4
Fixed COLUMN_NOT_FOUND in block issue
SmitaRKulkarni Jun 5, 2023
bd6b0ff
Updated to read only 1st object
SmitaRKulkarni Jun 5, 2023
9342370
Merge remote-tracking branch 'origin/azure_table_function' into azure
alesapin Jun 5, 2023
b22e664
Implement blob-read
alesapin Jun 5, 2023
cf9936a
Automatic style fix
robot-clickhouse Jun 5, 2023
6144519
Fixed build by adding StorageAzureSource constructor and getHeader fu…
SmitaRKulkarni Jun 6, 2023
1506545
Fix merge conflicts
alesapin Jun 6, 2023
8028184
Fix read
alesapin Jun 6, 2023
3d99abe
Remove async reads
alesapin Jun 6, 2023
d497562
Copy and paste
alesapin Jun 6, 2023
18decb0
Automatic style fix
robot-clickhouse Jun 6, 2023
a1f3bd9
Fix reads
alesapin Jun 6, 2023
6caf808
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
e054fbc
Automatic style fix
robot-clickhouse Jun 6, 2023
88f2f4f
Added createAsyncAzureReadBuffer
SmitaRKulkarni Jun 6, 2023
75d0f9f
Updated to use readObjects for async
SmitaRKulkarni Jun 6, 2023
cbe4ea6
Removed unwanted code & debug lines
SmitaRKulkarni Jun 6, 2023
ebae79f
Schema inference
alesapin Jun 6, 2023
14470b4
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
3bda231
Automatic style fix
robot-clickhouse Jun 6, 2023
c910f00
Some code for table function
alesapin Jun 6, 2023
e76a702
Add some tests
alesapin Jun 6, 2023
a6185da
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
ae97f45
Automatic style fix
robot-clickhouse Jun 6, 2023
6a96cf4
Renamed to azure_blob_storage
SmitaRKulkarni Jun 6, 2023
b2db6b4
Renamed to azure_blob_storage
SmitaRKulkarni Jun 6, 2023
7100bc5
Fixes for azure table function
alesapin Jun 6, 2023
454c23f
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
d902592
Fix new tests
alesapin Jun 6, 2023
934df5e
Rename to AzureBlobStorage
alesapin Jun 6, 2023
95b054b
Automatic style fix
robot-clickhouse Jun 6, 2023
49b019b
Refactored TableFunction name to TableFunctionAzureBlobStorage
SmitaRKulkarni Jun 6, 2023
ceab511
Fxi style
alesapin Jun 6, 2023
8eaa32e
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 6, 2023
5637858
Fix the most important check in the world
alesapin Jun 6, 2023
99f0be8
Refactored to StorageAzureBlob
SmitaRKulkarni Jun 6, 2023
6ab2a50
Fix two tests and build
alesapin Jun 6, 2023
5d52c41
Merge branch 'master' into azure_table_function
alesapin Jun 7, 2023
71ae54f
Fix args
alesapin Jun 7, 2023
a67dd6e
Readuntilend
alesapin Jun 7, 2023
cf65ac4
Fix iterator
alesapin Jun 7, 2023
b78e330
Better test
alesapin Jun 7, 2023
8d67296
Automatic style fix
robot-clickhouse Jun 7, 2023
7d5b98f
Remove logging add sleeps
alesapin Jun 7, 2023
278543c
Merge branch 'azure_table_function' of github.com:ClickHouse/ClickHou…
alesapin Jun 7, 2023
d35573a
Removed logs and small name fixes
SmitaRKulkarni Jun 8, 2023
a918f88
Fixes
alesapin Jun 8, 2023
caabbfd
Fix one more race
alesapin Jun 8, 2023
79edd06
Merge remote-tracking branch 'origin/master' into azure_table_function
alesapin Jun 8, 2023
116df09
Fix build
alesapin Jun 8, 2023
96d9b88
Fix build
alesapin Jun 9, 2023
7ac4349
Hacking azure function
alesapin Jun 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Expand Up @@ -210,6 +210,7 @@ enum class AccessType
M(HDFS, "", GLOBAL, SOURCES) \
M(S3, "", GLOBAL, SOURCES) \
M(HIVE, "", GLOBAL, SOURCES) \
M(AZURE, "", GLOBAL, SOURCES) \
M(SOURCES, "", GROUP, ALL) \
\
M(CLUSTER, "", GLOBAL, ALL) /* ON CLUSTER queries */ \
Expand Down
3 changes: 3 additions & 0 deletions src/Common/ProfileEvents.cpp
Expand Up @@ -348,6 +348,9 @@ The server successfully detected this situation and will download merged part fr
M(S3PutObject, "Number of S3 API PutObject calls.") \
M(S3GetObject, "Number of S3 API GetObject calls.") \
\
M(AzureDeleteObjects, "Number of S3 API DeleteObject(s) calls.") \
M(AzureListObjects, "Number of S3 API ListObjects calls.") \
\
M(DiskS3DeleteObjects, "Number of DiskS3 API DeleteObject(s) calls.") \
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
Expand Down
5 changes: 5 additions & 0 deletions src/Core/Settings.h
Expand Up @@ -81,7 +81,9 @@ class IColumn;
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited. You ", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, azure_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
Expand All @@ -90,8 +92,11 @@ class IColumn;
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, azure_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in azure engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
Expand Down
17 changes: 11 additions & 6 deletions src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp
Expand Up @@ -57,14 +57,22 @@ void validateContainerName(const String & container_name)

/// Build the Azure Blob Storage endpoint description from server config.
///
/// Exactly one of two keys under `config_prefix` selects the endpoint:
///   - `.storage_account_url`  — validated account URL, or
///   - `.connection_string`    — full connection string (fallback when the URL is absent).
/// `.container_name` defaults to "default-container" and is always validated.
/// `.container_already_exists` is optional; when unset, callers treat a
/// "container already exists" conflict as non-fatal.
///
/// Throws if neither endpoint key is present (getString on a missing key).
AzureBlobStorageEndpoint processAzureBlobStorageEndpoint(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
    std::string storage_url;
    if (config.has(config_prefix + ".storage_account_url"))
    {
        storage_url = config.getString(config_prefix + ".storage_account_url");
        validateStorageAccountUrl(storage_url);
    }
    else
    {
        /// NOTE(review): connection strings are not validated here — presumably
        /// the Azure SDK rejects malformed ones when the client is built; confirm.
        storage_url = config.getString(config_prefix + ".connection_string");
    }
    String container_name = config.getString(config_prefix + ".container_name", "default-container");
    validateContainerName(container_name);
    std::optional<bool> container_already_exists {};
    if (config.has(config_prefix + ".container_already_exists"))
        container_already_exists = {config.getBool(config_prefix + ".container_already_exists")};
    return {storage_url, container_name, container_already_exists};
}


Expand Down Expand Up @@ -136,10 +144,7 @@ std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
/// If container_already_exists is not set (in config), ignore already exists error.
/// (Conflict - The specified container already exists)
if (!endpoint.container_already_exists.has_value() && e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
{
tryLogCurrentException("Container already exists, returning the existing container");
return getAzureBlobStorageClientWithAuth<BlobContainerClient>(final_url, container_name, config, config_prefix);
}
throw;
}
}
Expand Down
12 changes: 7 additions & 5 deletions src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h
Expand Up @@ -37,11 +37,13 @@ struct AzureObjectStorageSettings
{
}

size_t max_single_part_upload_size; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
uint64_t min_bytes_for_seek;
size_t max_single_read_retries;
size_t max_single_download_retries;
int list_object_keys_size;
AzureObjectStorageSettings() = default;

size_t max_single_part_upload_size = 100 * 1024 * 1024; /// NOTE: on 32-bit machines it will be at most 4GB, but size_t is also used in BufferBase for offset
uint64_t min_bytes_for_seek = 1024 * 1024;
size_t max_single_read_retries = 3;
size_t max_single_download_retries = 3;
int list_object_keys_size = 1000;
};

using AzureClient = Azure::Storage::Blobs::BlobContainerClient;
Expand Down
12 changes: 12 additions & 0 deletions src/Disks/ObjectStorages/ObjectStorageIterator.h
Expand Up @@ -10,8 +10,10 @@ class IObjectStorageIterator
{
public:
virtual void next() = 0;
virtual void nextBatch() = 0;
virtual bool isValid() const = 0;
virtual RelativePathWithMetadata current() const = 0;
virtual RelativePathsWithMetadata currentBatch() const = 0;
virtual size_t getAccumulatedSize() const = 0;

virtual ~IObjectStorageIterator() = default;
Expand All @@ -34,13 +36,23 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator
++batch_iterator;
}

/// Exhaust the single in-memory batch this iterator wraps: after this call
/// the iterator points past the end, so isValid() returns false.
void nextBatch() override
{
batch_iterator = batch.end();
}

/// True while the iterator still points inside the wrapped batch.
bool isValid() const override
{
return batch_iterator != batch.end();
}

RelativePathWithMetadata current() const override;

/// Return a copy of the whole wrapped list; for this eager, list-backed
/// iterator the "current batch" is always the full list, regardless of
/// where the iterator currently points.
RelativePathsWithMetadata currentBatch() const override
{
return batch;
}

size_t getAccumulatedSize() const override
{
return batch.size();
Expand Down
26 changes: 26 additions & 0 deletions src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp
Expand Up @@ -8,6 +8,25 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

/// Swap in the batch produced by the background listing task and, when the
/// storage reported that more data follows, immediately schedule the next
/// listing request so it overlaps with the caller's processing.
void IObjectStorageIteratorAsync::nextBatch()
{
    std::lock_guard lock(mutex);

    /// Nothing to do once listing ended or no request is in flight.
    if (is_finished || !outcome_future.valid())
        return;

    auto result = outcome_future.get();
    current_batch = std::move(result.batch);
    current_batch_iterator = current_batch.begin();
    accumulated_size.fetch_add(current_batch.size(), std::memory_order_relaxed);

    if (result.has_next)
        outcome_future = scheduleBatch();
    else
        is_finished = true;
}

void IObjectStorageIteratorAsync::next()
{
std::lock_guard lock(mutex);
Expand Down Expand Up @@ -56,6 +75,13 @@ RelativePathWithMetadata IObjectStorageIteratorAsync::current() const
return *current_batch_iterator;
}


/// Copy out the batch under the lock so callers get a stable snapshot even
/// while the background listing thread may be replacing it.
RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch() const
{
    std::lock_guard guard(mutex);
    auto snapshot = current_batch;
    return snapshot;
}

size_t IObjectStorageIteratorAsync::getAccumulatedSize() const
{
return accumulated_size.load(std::memory_order_relaxed);
Expand Down
5 changes: 4 additions & 1 deletion src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h
Expand Up @@ -19,11 +19,14 @@ class IObjectStorageIteratorAsync : public IObjectStorageIterator
: list_objects_pool(threads_metric, threads_active_metric, 1)
, list_objects_scheduler(threadPoolCallbackRunner<BatchAndHasNext>(list_objects_pool, thread_name))
{
nextBatch();
}

void next() override;
void nextBatch() override;
bool isValid() const override;
RelativePathWithMetadata current() const override;
RelativePathsWithMetadata currentBatch() const override;
size_t getAccumulatedSize() const override;

~IObjectStorageIteratorAsync() override
Expand All @@ -45,7 +48,7 @@ class IObjectStorageIteratorAsync : public IObjectStorageIterator

bool is_finished{false};

std::mutex mutex;
mutable std::mutex mutex;
ThreadPool list_objects_pool;
ThreadPoolCallbackRunner<BatchAndHasNext> list_objects_scheduler;
std::future<BatchAndHasNext> outcome_future;
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/ActionsDAG.cpp
Expand Up @@ -598,6 +598,8 @@ Block ActionsDAG::updateHeader(Block header) const
}

ColumnsWithTypeAndName result_columns;


result_columns.reserve(outputs.size());

struct Frame
Expand Down