Skip to content

Commit

Permalink
Merge pull request #59001 from ClickHouse/Add_configurable_write_retr…
Browse files Browse the repository at this point in the history
…y_azure

Add settings max_unexpected_write_error_retries for Azure Blob Storage
  • Loading branch information
kssenii committed Jan 29, 2024
2 parents a29ab8f + d22fc3a commit 6903957
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/Core/Settings.h
Expand Up @@ -85,6 +85,7 @@ class IColumn;
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, azure_max_single_read_retries, 4, "The maximum number of retries during single Azure blob storage read.", 0) \
M(UInt64, azure_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
Expand Down
10 changes: 5 additions & 5 deletions src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
Expand Up @@ -18,17 +18,17 @@ namespace ProfileEvents
namespace DB
{

static constexpr auto DEFAULT_RETRY_NUM = 3;

WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & blob_path_,
size_t max_single_part_upload_size_,
size_t max_unexpected_write_error_retries_,
size_t buf_size_,
const WriteSettings & write_settings_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, log(getLogger("WriteBufferFromAzureBlobStorage"))
, max_single_part_upload_size(max_single_part_upload_size_)
, max_unexpected_write_error_retries(max_unexpected_write_error_retries_)
, blob_path(blob_path_)
, write_settings(write_settings_)
, blob_container_client(blob_container_client_)
Expand Down Expand Up @@ -77,13 +77,13 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,

void WriteBufferFromAzureBlobStorage::finalizeImpl()
{
execWithRetry([this](){ next(); }, DEFAULT_RETRY_NUM);
execWithRetry([this](){ next(); }, max_unexpected_write_error_retries);

if (tmp_buffer_write_offset > 0)
uploadBlock(tmp_buffer->data(), tmp_buffer_write_offset);

auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, DEFAULT_RETRY_NUM);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, max_unexpected_write_error_retries);

LOG_TRACE(log, "Committed {} blocks for blob `{}`", block_ids.size(), blob_path);
}
Expand All @@ -94,7 +94,7 @@ void WriteBufferFromAzureBlobStorage::uploadBlock(const char * data, size_t size
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));

Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(data), size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM, size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, max_unexpected_write_error_retries, size);
tmp_buffer_write_offset = 0;

LOG_TRACE(log, "Staged block (id: {}) of size {} (blob path: {}).", block_id, size, blob_path);
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/IO/WriteBufferFromAzureBlobStorage.h
Expand Up @@ -30,6 +30,7 @@ class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
AzureClientPtr blob_container_client_,
const String & blob_path_,
size_t max_single_part_upload_size_,
size_t max_unexpected_write_error_retries_,
size_t buf_size_,
const WriteSettings & write_settings_);

Expand All @@ -48,6 +49,7 @@ class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
LoggerPtr log;

const size_t max_single_part_upload_size;
const size_t max_unexpected_write_error_retries;
const std::string blob_path;
const WriteSettings write_settings;

Expand Down
Expand Up @@ -7,6 +7,7 @@
#include <optional>
#include <azure/identity/managed_identity_credential.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>

using namespace Azure::Storage::Blobs;

Expand Down Expand Up @@ -157,14 +158,15 @@ std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
}
}

std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/)
std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
{
return std::make_unique<AzureObjectStorageSettings>(
config.getUInt64(config_prefix + ".max_single_part_upload_size", 100 * 1024 * 1024),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".max_single_read_retries", 3),
config.getInt(config_prefix + ".max_single_download_retries", 3),
config.getInt(config_prefix + ".list_object_keys_size", 1000)
config.getInt(config_prefix + ".list_object_keys_size", 1000),
config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries)
);
}

Expand Down
Expand Up @@ -264,6 +264,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
client.get(),
object.remote_path,
settings.get()->max_single_part_upload_size,
settings.get()->max_unexpected_write_error_retries,
buf_size,
patchSettings(write_settings));
}
Expand Down
Expand Up @@ -23,12 +23,14 @@ struct AzureObjectStorageSettings
uint64_t min_bytes_for_seek_,
int max_single_read_retries_,
int max_single_download_retries_,
int list_object_keys_size_)
int list_object_keys_size_,
size_t max_unexpected_write_error_retries_)
: max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
, list_object_keys_size(list_object_keys_size_)
, max_unexpected_write_error_retries (max_unexpected_write_error_retries_)
{
}

Expand All @@ -39,6 +41,7 @@ struct AzureObjectStorageSettings
size_t max_single_read_retries = 3;
size_t max_single_download_retries = 3;
int list_object_keys_size = 1000;
size_t max_unexpected_write_error_retries = 4;
};

using AzureClient = Azure::Storage::Blobs::BlobContainerClient;
Expand Down

0 comments on commit 6903957

Please sign in to comment.