Merge pull request #59929 from ClickHouse/Azure_write_buffer
Asynchronous WriteBuffer for AzureBlobStorage
SmitaRKulkarni committed Mar 25, 2024
2 parents f16dffa + 0d54dbf commit a642f4d
Showing 15 changed files with 247 additions and 137 deletions.
5 changes: 2 additions & 3 deletions src/Backups/BackupIO_AzureBlobStorage.cpp
@@ -276,10 +276,9 @@ std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const Strin
     return std::make_unique<WriteBufferFromAzureBlobStorage>(
         client,
         key,
-        settings->max_single_part_upload_size,
-        settings->max_unexpected_write_error_retries,
         DBMS_DEFAULT_BUFFER_SIZE,
-        write_settings);
+        write_settings,
+        settings);
 }

 void BackupWriterAzureBlobStorage::removeFile(const String & file_name)
src/Common/BufferAllocationPolicy.cpp
@@ -1,22 +1,18 @@
#include "config.h"

#if USE_AWS_S3

#include <IO/WriteBufferFromS3.h>
#include "BufferAllocationPolicy.h"

#include <memory>

namespace
namespace DB
{

class FixedSizeBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
class FixedSizeBufferAllocationPolicy : public BufferAllocationPolicy
{
const size_t buffer_size = 0;
size_t buffer_number = 0;

public:
explicit FixedSizeBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
: buffer_size(settings_.strict_upload_part_size)
explicit FixedSizeBufferAllocationPolicy(const BufferAllocationPolicy::Settings & settings_)
: buffer_size(settings_.strict_size)
{
chassert(buffer_size > 0);
}
@@ -36,7 +32,7 @@ class FixedSizeBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAll
 };


-class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
+class ExpBufferAllocationPolicy : public DB::BufferAllocationPolicy
 {
     const size_t first_size = 0;
     const size_t second_size = 0;
@@ -49,12 +45,12 @@ class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocatio
     size_t buffer_number = 0;

 public:
-    explicit ExpBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
-        : first_size(std::max(settings_.max_single_part_upload_size, settings_.min_upload_part_size))
-        , second_size(settings_.min_upload_part_size)
-        , multiply_factor(settings_.upload_part_size_multiply_factor)
-        , multiply_threshold(settings_.upload_part_size_multiply_parts_count_threshold)
-        , max_size(settings_.max_upload_part_size)
+    explicit ExpBufferAllocationPolicy(const BufferAllocationPolicy::Settings & settings_)
+        : first_size(std::max(settings_.max_single_size, settings_.min_size))
+        , second_size(settings_.min_size)
+        , multiply_factor(settings_.multiply_factor)
+        , multiply_threshold(settings_.multiply_parts_count_threshold)
+        , max_size(settings_.max_size)
     {
         chassert(first_size > 0);
         chassert(second_size > 0);
@@ -92,21 +88,16 @@
     }
 };

-}
-
-namespace DB
-{
-
-WriteBufferFromS3::IBufferAllocationPolicy::~IBufferAllocationPolicy() = default;
+BufferAllocationPolicy::~BufferAllocationPolicy() = default;

-WriteBufferFromS3::IBufferAllocationPolicyPtr WriteBufferFromS3::ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_)
+BufferAllocationPolicyPtr BufferAllocationPolicy::create(BufferAllocationPolicy::Settings settings_)
 {
-    if (settings_.strict_upload_part_size > 0)
+    if (settings_.strict_size > 0)
         return std::make_unique<FixedSizeBufferAllocationPolicy>(settings_);
     else
         return std::make_unique<ExpBufferAllocationPolicy>(settings_);
 }

 }
-
-#endif
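The refactor turns the allocation policy into a standalone component: the factory that was WriteBufferFromS3::ChooseBufferPolicy becomes BufferAllocationPolicy::create. A minimal usage sketch against the API in this diff (hypothetical values; the Settings struct is declared in the new header just below):

// create() picks the fixed-size policy when strict_size is non-zero,
// otherwise the exponentially growing one, mirroring the old
// strict_upload_part_size behaviour for S3.
BufferAllocationPolicy::Settings settings;
settings.strict_size = 8 * 1024 * 1024;               // every buffer exactly 8 MiB
auto fixed = BufferAllocationPolicy::create(settings);

settings.strict_size = 0;                             // 0 selects ExpBufferAllocationPolicy
auto exponential = BufferAllocationPolicy::create(settings);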
39 changes: 39 additions & 0 deletions src/Common/BufferAllocationPolicy.h
@@ -0,0 +1,39 @@
#pragma once

#include "config.h"

#include "logger_useful.h"

#include <list>
#include <memory>

namespace DB
{

class BufferAllocationPolicy;
using BufferAllocationPolicyPtr = std::unique_ptr<BufferAllocationPolicy>;

/// Buffer number starts with 0
class BufferAllocationPolicy
{
public:

struct Settings
{
size_t strict_size = 0;
size_t min_size = 16 * 1024 * 1024;
size_t max_size = 5ULL * 1024 * 1024 * 1024;
size_t multiply_factor = 2;
size_t multiply_parts_count_threshold = 500;
size_t max_single_size = 32 * 1024 * 1024; /// Max size for a single buffer/block
};

virtual size_t getBufferNumber() const = 0;
virtual size_t getBufferSize() const = 0;
virtual void nextBuffer() = 0;
virtual ~BufferAllocationPolicy() = 0;

static BufferAllocationPolicyPtr create(Settings settings_);

};

}
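The Settings defaults above imply a concrete growth schedule. The hunk for ExpBufferAllocationPolicy elides its nextBuffer() body, so the standalone sketch below reconstructs the behaviour documented in the setting descriptions (first buffer max(max_single_size, min_size); later buffers start at min_size and are multiplied by multiply_factor every multiply_parts_count_threshold buffers, capped at max_size). Treat it as an illustration, not the committed code:

#include <algorithm>
#include <cstddef>
#include <cstdio>

struct PolicySettings /// local stand-in for BufferAllocationPolicy::Settings defaults
{
    size_t min_size = 16 * 1024 * 1024;
    size_t max_size = 5ULL * 1024 * 1024 * 1024;
    size_t multiply_factor = 2;
    size_t multiply_parts_count_threshold = 500;
    size_t max_single_size = 32 * 1024 * 1024;
};

/// Size of buffer `number` (0-based), per the documented exponential schedule.
size_t bufferSize(const PolicySettings & s, size_t number)
{
    if (number == 0)
        return std::max(s.max_single_size, s.min_size);   /// first_size
    size_t size = s.min_size;                             /// second_size
    for (size_t i = s.multiply_parts_count_threshold; i <= number; i += s.multiply_parts_count_threshold)
        size *= s.multiply_factor;
    return std::min(size, s.max_size);
}

int main()
{
    PolicySettings s;
    for (size_t n : {0, 1, 500, 1000})
        std::printf("buffer %zu -> %zu MiB\n", n, bufferSize(s, n) / (1024 * 1024));
    /// prints: buffer 0 -> 32 MiB, buffer 1 -> 16 MiB, buffer 500 -> 32 MiB, buffer 1000 -> 64 MiB
}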
src/Common/ThreadPoolTaskTracker.cpp
@@ -1,8 +1,6 @@
#include "config.h"

#if USE_AWS_S3

#include <IO/WriteBufferFromS3TaskTracker.h>
#include "ThreadPoolTaskTracker.h"

namespace ProfileEvents
{
@@ -12,19 +10,19 @@ namespace ProfileEvents
 namespace DB
 {

-WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_)
+TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_)
     : is_async(bool(scheduler_))
     , scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
     , max_tasks_inflight(max_tasks_inflight_)
     , limitedLog(limitedLog_)
 {}

-WriteBufferFromS3::TaskTracker::~TaskTracker()
+TaskTracker::~TaskTracker()
 {
     safeWaitAll();
 }

-ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
+ThreadPoolCallbackRunner<void> TaskTracker::syncRunner()
 {
     return [](Callback && callback, int64_t) mutable -> std::future<void>
     {
@@ -35,7 +33,7 @@ ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
     };
 }

-void WriteBufferFromS3::TaskTracker::waitAll()
+void TaskTracker::waitAll()
 {
     /// Exceptions are propagated
     for (auto & future : futures)
@@ -48,7 +46,7 @@ void WriteBufferFromS3::TaskTracker::waitAll()
     finished_futures.clear();
 }

-void WriteBufferFromS3::TaskTracker::safeWaitAll()
+void TaskTracker::safeWaitAll()
 {
     for (auto & future : futures)
     {
@@ -71,7 +69,7 @@ void WriteBufferFromS3::TaskTracker::safeWaitAll()
     finished_futures.clear();
 }

-void WriteBufferFromS3::TaskTracker::waitIfAny()
+void TaskTracker::waitIfAny()
 {
     if (futures.empty())
         return;
@@ -99,7 +97,7 @@ void WriteBufferFromS3::TaskTracker::waitIfAny()
     ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
 }

-void WriteBufferFromS3::TaskTracker::add(Callback && func)
+void TaskTracker::add(Callback && func)
 {
     /// All this fuzz is about 2 things. This is the most critical place of TaskTracker.
     /// The first is not to fail insertion in the list `futures`.
@@ -134,7 +132,7 @@ void WriteBufferFromS3::TaskTracker::add(Callback && func)
     waitTilInflightShrink();
 }

-void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
+void TaskTracker::waitTilInflightShrink()
 {
     if (!max_tasks_inflight)
         return;
@@ -166,11 +164,10 @@ void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
     ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
 }

-bool WriteBufferFromS3::TaskTracker::isAsync() const
+bool TaskTracker::isAsync() const
 {
     return is_async;
 }

 }
-
-#endif
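syncRunner() is what lets the same write path run with or without a thread pool: when no scheduler is configured, the callback executes inline but the caller still receives a future. A self-contained sketch of that pattern (local stand-ins, not the ClickHouse types):

#include <cstdint>
#include <functional>
#include <future>
#include <iostream>

using Callback = std::function<void()>;
using Runner = std::function<std::future<void>(Callback &&, int64_t)>;

/// Run the callback inline and return an already-satisfied future, so code
/// written against the async interface works unchanged in the sync case.
Runner makeSyncRunner()
{
    return [](Callback && callback, int64_t /*priority*/) -> std::future<void>
    {
        std::promise<void> promise;
        try
        {
            callback();
            promise.set_value();
        }
        catch (...)
        {
            promise.set_exception(std::current_exception()); /// rethrown on get()
        }
        return promise.get_future();
    };
}

int main()
{
    auto runner = makeSyncRunner();
    auto future = runner([] { std::cout << "task ran inline\n"; }, 0);
    future.get(); /// already ready; rethrows if the callback threw
}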
src/Common/ThreadPoolTaskTracker.h
@@ -1,28 +1,24 @@
 #pragma once

 #include "config.h"
+#include "threadPoolCallbackRunner.h"
+#include "IO/WriteBufferFromS3.h"

-#if USE_AWS_S3
-
-#include "WriteBufferFromS3.h"
-
-#include <Common/logger_useful.h>
+#include "logger_useful.h"

 #include <list>

 namespace DB
 {

-/// That class is used only in WriteBufferFromS3 for now.
-/// Therefore it declared as a part of WriteBufferFromS3.
 /// TaskTracker takes a Callback which is run by scheduler in some external shared ThreadPool.
 /// TaskTracker brings the methods waitIfAny, waitAll/safeWaitAll
 /// to help with coordination of the running tasks.

 /// Basic exception safety is provided. If exception occurred the object has to be destroyed.
 /// No thread safety is provided. Use this object with no concurrency.

-class WriteBufferFromS3::TaskTracker
+class TaskTracker
 {
 public:
     using Callback = std::function<void()>;
@@ -68,5 +64,3 @@ class WriteBufferFromS3::TaskTracker
 };

 }
-
-#endif
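The contract spelled out in the comments above (a bounded number of in-flight tasks, waitAll/safeWaitAll for draining, no thread safety) can be illustrated with a deliberately simplified tracker. This toy blocks on the oldest task once the cap is reached, whereas the real waitTilInflightShrink() waits for whichever tasks finish first:

#include <cstddef>
#include <functional>
#include <future>
#include <list>

/// Toy illustration of the TaskTracker contract; not the ClickHouse class.
class ToyTracker
{
public:
    explicit ToyTracker(size_t max_inflight_) : max_inflight(max_inflight_) {}

    /// Schedule work; block when too many tasks are already in flight.
    void add(std::function<void()> func)
    {
        futures.push_back(std::async(std::launch::async, std::move(func)));
        if (max_inflight && futures.size() >= max_inflight)
        {
            futures.front().get();  /// simplification: wait for the oldest
            futures.pop_front();
        }
    }

    /// Wait for everything, propagating the first exception like waitAll().
    void waitAll()
    {
        for (auto & future : futures)
            future.get();
        futures.clear();
    }

private:
    const size_t max_inflight;
    std::list<std::future<void>> futures;
};

int main()
{
    ToyTracker tracker(/*max_inflight=*/4);
    for (int i = 0; i < 16; ++i)
        tracker.add([] { /* pretend to upload one part */ });
    tracker.waitAll();
}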
8 changes: 7 additions & 1 deletion src/Core/Settings.h
@@ -78,11 +78,17 @@ class IColumn;
 M(UInt64, distributed_connections_pool_size, 1024, "Maximum number of connections with one remote server in the pool.", 0) \
 M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \
 M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \
+M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \
 M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
 M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
+M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \
+M(UInt64, azure_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage.", 0) \
 M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
 M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
-M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited. You ", 0) \
+M(UInt64, azure_upload_part_size_multiply_factor, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage.", 0) \
+M(UInt64, azure_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor.", 0) \
+M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
+M(UInt64, azure_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
 M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
 M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
 M(UInt64, azure_max_single_part_copy_size, 256*1024*1024, "The maximum size of object to copy using single part copy to Azure blob storage.", 0) \
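The new azure_* settings deliberately mirror their s3_* counterparts and line up one-to-one with the BufferAllocationPolicy::Settings fields introduced earlier. A hypothetical mapping for illustration, with local stand-in types (the actual wiring lives in files not shown in this excerpt):

#include <cstddef>

/// Stand-ins for illustration; the real types live in src/Core/Settings.h
/// and src/Common/BufferAllocationPolicy.h.
struct AzureUploadSettings
{
    size_t azure_strict_upload_part_size = 0;
    size_t azure_min_upload_part_size = 16 * 1024 * 1024;
    size_t azure_max_upload_part_size = 5ULL * 1024 * 1024 * 1024;
    size_t azure_upload_part_size_multiply_factor = 2;
    size_t azure_upload_part_size_multiply_parts_count_threshold = 500;
    size_t azure_max_single_part_upload_size = 100 * 1024 * 1024;
};

struct PolicySettings /// mirrors BufferAllocationPolicy::Settings
{
    size_t strict_size, min_size, max_size, multiply_factor, multiply_parts_count_threshold, max_single_size;
};

/// One-to-one mapping from the query settings to the policy settings.
PolicySettings toPolicySettings(const AzureUploadSettings & s)
{
    return {
        s.azure_strict_upload_part_size,
        s.azure_min_upload_part_size,
        s.azure_max_upload_part_size,
        s.azure_upload_part_size_multiply_factor,
        s.azure_upload_part_size_multiply_parts_count_threshold,
        s.azure_max_single_part_upload_size,
    };
}

int main()
{
    PolicySettings p = toPolicySettings(AzureUploadSettings{});
    return p.strict_size == 0 ? 0 : 1; /// defaults select the exponential policy
}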
6 changes: 6 additions & 0 deletions src/Core/SettingsChangesHistory.h
@@ -114,6 +114,12 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"output_format_parquet_compression_method", "lz4", "zstd", "Parquet/ORC/Arrow support many compression methods, including lz4 and zstd. ClickHouse supports each and every compression method. Some inferior tools, such as 'duckdb', lack support for the faster `lz4` compression method, that's why we set zstd by default."},
{"output_format_orc_compression_method", "lz4", "zstd", "Parquet/ORC/Arrow support many compression methods, including lz4 and zstd. ClickHouse supports each and every compression method. Some inferior tools, such as 'duckdb', lack support for the faster `lz4` compression method, that's why we set zstd by default."},
{"output_format_pretty_highlight_digit_groups", false, true, "If enabled and if output is a terminal, highlight every digit corresponding to the number of thousands, millions, etc. with underline."},
{"azure_max_inflight_parts_for_one_file", 20, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited."},
{"azure_strict_upload_part_size", 0, 0, "The exact size of part to upload during multipart upload to Azure blob storage."},
{"azure_min_upload_part_size", 16*1024*1024, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage."},
{"azure_max_upload_part_size", 5ull*1024*1024*1024, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage."},
{"azure_upload_part_size_multiply_factor", 2, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage."},
{"azure_upload_part_size_multiply_parts_count_threshold", 500, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor."},
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},
