Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add settings max_unexpected_write_error_retries for Azure Blob Storage #59001

Merged
merged 2 commits into from Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Core/Settings.h
Expand Up @@ -86,6 +86,7 @@ class IColumn;
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, azure_max_single_read_retries, 4, "The maximum number of retries during single Azure blob storage read.", 0) \
M(UInt64, azure_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during Azure blob storage write", 0) \
kssenii marked this conversation as resolved.
Show resolved Hide resolved
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
Expand Down
10 changes: 5 additions & 5 deletions src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
Expand Up @@ -18,17 +18,17 @@ namespace ProfileEvents
namespace DB
{

static constexpr auto DEFAULT_RETRY_NUM = 3;

WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const String & blob_path_,
size_t max_single_part_upload_size_,
size_t max_unexpected_write_error_retries_,
size_t buf_size_,
const WriteSettings & write_settings_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, log(&Poco::Logger::get("WriteBufferFromAzureBlobStorage"))
, max_single_part_upload_size(max_single_part_upload_size_)
, max_unexpected_write_error_retries(max_unexpected_write_error_retries_)
, blob_path(blob_path_)
, write_settings(write_settings_)
, blob_container_client(blob_container_client_)
Expand Down Expand Up @@ -77,13 +77,13 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,

void WriteBufferFromAzureBlobStorage::finalizeImpl()
{
execWithRetry([this](){ next(); }, DEFAULT_RETRY_NUM);
execWithRetry([this](){ next(); }, max_unexpected_write_error_retries);

if (tmp_buffer_write_offset > 0)
uploadBlock(tmp_buffer->data(), tmp_buffer_write_offset);

auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, DEFAULT_RETRY_NUM);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, max_unexpected_write_error_retries);

LOG_TRACE(log, "Committed {} blocks for blob `{}`", block_ids.size(), blob_path);
}
Expand All @@ -94,7 +94,7 @@ void WriteBufferFromAzureBlobStorage::uploadBlock(const char * data, size_t size
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));

Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(data), size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM, size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, max_unexpected_write_error_retries, size);
tmp_buffer_write_offset = 0;

LOG_TRACE(log, "Staged block (id: {}) of size {} (blob path: {}).", block_id, size, blob_path);
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/IO/WriteBufferFromAzureBlobStorage.h
Expand Up @@ -30,6 +30,7 @@ class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
AzureClientPtr blob_container_client_,
const String & blob_path_,
size_t max_single_part_upload_size_,
size_t max_unexpected_write_error_retries_,
size_t buf_size_,
const WriteSettings & write_settings_);

Expand All @@ -48,6 +49,7 @@ class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
Poco::Logger * log;

const size_t max_single_part_upload_size;
const size_t max_unexpected_write_error_retries;
const std::string blob_path;
const WriteSettings write_settings;

Expand Down
Expand Up @@ -7,6 +7,7 @@
#include <optional>
#include <azure/identity/managed_identity_credential.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>

using namespace Azure::Storage::Blobs;

Expand Down Expand Up @@ -157,14 +158,15 @@ std::unique_ptr<BlobContainerClient> getAzureBlobContainerClient(
}
}

std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr /*context*/)
std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
{
return std::make_unique<AzureObjectStorageSettings>(
config.getUInt64(config_prefix + ".max_single_part_upload_size", 100 * 1024 * 1024),
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".max_single_read_retries", 3),
config.getInt(config_prefix + ".max_single_download_retries", 3),
config.getInt(config_prefix + ".list_object_keys_size", 1000)
config.getInt(config_prefix + ".list_object_keys_size", 1000),
config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries)
);
}

Expand Down
Expand Up @@ -268,6 +268,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
client.get(),
object.remote_path,
settings.get()->max_single_part_upload_size,
settings.get()->max_unexpected_write_error_retries,
buf_size,
patchSettings(write_settings));
}
Expand Down
Expand Up @@ -23,12 +23,14 @@ struct AzureObjectStorageSettings
uint64_t min_bytes_for_seek_,
int max_single_read_retries_,
int max_single_download_retries_,
int list_object_keys_size_)
int list_object_keys_size_,
size_t max_unexpected_write_error_retries_)
: max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
, list_object_keys_size(list_object_keys_size_)
, max_unexpected_write_error_retries (max_unexpected_write_error_retries_)
{
}

Expand All @@ -39,6 +41,7 @@ struct AzureObjectStorageSettings
size_t max_single_read_retries = 3;
size_t max_single_download_retries = 3;
int list_object_keys_size = 1000;
size_t max_unexpected_write_error_retries = 4;
};

using AzureClient = Azure::Storage::Blobs::BlobContainerClient;
Expand Down