Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous WriteBuffer for AzureBlobStorage #59929

Merged
merged 27 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d12ecdc
Asynchronous WriteBuffer for AzureBlobStorage
SmitaRKulkarni Feb 13, 2024
26fd3d0
Removed offset check
SmitaRKulkarni Feb 14, 2024
7bf42fd
Fix upgrade check
SmitaRKulkarni Feb 16, 2024
62dfa4c
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Feb 16, 2024
3eb1f24
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Feb 22, 2024
3c00d19
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Feb 29, 2024
07b407a
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Feb 29, 2024
5641fd8
Fix build after merge
SmitaRKulkarni Feb 29, 2024
0f2d47e
Renamed WriteBufferFromS3TaskTracker to ThreadPoolTaskTracker
SmitaRKulkarni Mar 1, 2024
10b5ce8
Updated BufferAllocationPolicy
SmitaRKulkarni Mar 7, 2024
c724d36
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Mar 11, 2024
8a11afe
Updated settings changed history
SmitaRKulkarni Mar 11, 2024
0530055
Updated names in BufferAllocationPolicy
SmitaRKulkarni Mar 18, 2024
45e15d1
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Mar 19, 2024
8494e73
Update setting names
SmitaRKulkarni Mar 19, 2024
c169224
Removed detachPart & reallocateBuffer functions
SmitaRKulkarni Mar 19, 2024
824092b
Updated to use single part upload for single block of small size and …
SmitaRKulkarni Mar 19, 2024
2c0e266
Addressed review comments
SmitaRKulkarni Mar 20, 2024
9f2d44f
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Mar 20, 2024
e0d14a1
Updated lambda and name of BufferAllocationPolicy
SmitaRKulkarni Mar 21, 2024
6c5e881
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Mar 21, 2024
71eee6a
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Mar 21, 2024
929173c
Fix S3 buffer allocation
SmitaRKulkarni Mar 21, 2024
a08c16e
Fixed clang tidy build
SmitaRKulkarni Mar 22, 2024
19e7022
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Mar 23, 2024
a41c005
Merge branch 'master' into Azure_write_buffer
alesapin Mar 24, 2024
0d54dbf
Merge branch 'master' into Azure_write_buffer
SmitaRKulkarni Mar 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/Backups/BackupIO_AzureBlobStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,9 @@ std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const Strin
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client,
key,
settings->max_single_part_upload_size,
settings->max_unexpected_write_error_retries,
DBMS_DEFAULT_BUFFER_SIZE,
write_settings);
write_settings,
settings);
}

void BackupWriterAzureBlobStorage::removeFile(const String & file_name)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
#include "config.h"

#if USE_AWS_S3

#include <IO/WriteBufferFromS3.h>
#include "BufferAllocationPolicy.h"

#include <memory>

namespace
namespace DB
{

class FixedSizeBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
class FixedSizeBufferAllocationPolicy : public IBufferAllocationPolicy
{
const size_t buffer_size = 0;
size_t buffer_number = 0;

public:
explicit FixedSizeBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
explicit FixedSizeBufferAllocationPolicy(const BufferAllocationSettings & settings_)
: buffer_size(settings_.strict_upload_part_size)
{
chassert(buffer_size > 0);
Expand All @@ -36,7 +32,7 @@ class FixedSizeBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAll
};


class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
class ExpBufferAllocationPolicy : public DB::IBufferAllocationPolicy
{
const size_t first_size = 0;
const size_t second_size = 0;
Expand All @@ -49,7 +45,7 @@ class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocatio
size_t buffer_number = 0;

public:
explicit ExpBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
explicit ExpBufferAllocationPolicy(const BufferAllocationSettings & settings_)
: first_size(std::max(settings_.max_single_part_upload_size, settings_.min_upload_part_size))
, second_size(settings_.min_upload_part_size)
, multiply_factor(settings_.upload_part_size_multiply_factor)
Expand Down Expand Up @@ -92,14 +88,10 @@ class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocatio
}
};

}

namespace DB
{

WriteBufferFromS3::IBufferAllocationPolicy::~IBufferAllocationPolicy() = default;
IBufferAllocationPolicy::~IBufferAllocationPolicy() = default;

WriteBufferFromS3::IBufferAllocationPolicyPtr WriteBufferFromS3::ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_)
IBufferAllocationPolicyPtr ChooseBufferPolicy(BufferAllocationSettings settings_)
{
if (settings_.strict_upload_part_size > 0)
return std::make_unique<FixedSizeBufferAllocationPolicy>(settings_);
Expand All @@ -109,4 +101,3 @@ WriteBufferFromS3::IBufferAllocationPolicyPtr WriteBufferFromS3::ChooseBufferPol

}

#endif
35 changes: 35 additions & 0 deletions src/Common/BufferAllocationPolicy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include "config.h"

#include "logger_useful.h"

#include <list>

namespace DB
{

struct BufferAllocationSettings
SmitaRKulkarni marked this conversation as resolved.
Show resolved Hide resolved
{
size_t strict_upload_part_size = 0;
SmitaRKulkarni marked this conversation as resolved.
Show resolved Hide resolved
size_t min_upload_part_size = 16 * 1024 * 1024;
size_t max_upload_part_size = 5ULL * 1024 * 1024 * 1024;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;
size_t max_single_part_upload_size = 32 * 1024 * 1024;
};

class IBufferAllocationPolicy
SmitaRKulkarni marked this conversation as resolved.
Show resolved Hide resolved
{
public:
SmitaRKulkarni marked this conversation as resolved.
Show resolved Hide resolved
virtual size_t getBufferNumber() const = 0;
virtual size_t getBufferSize() const = 0;
virtual void nextBuffer() = 0;
virtual ~IBufferAllocationPolicy() = 0;
};

using IBufferAllocationPolicyPtr = std::unique_ptr<IBufferAllocationPolicy>;

IBufferAllocationPolicyPtr ChooseBufferPolicy(BufferAllocationSettings settings_);
SmitaRKulkarni marked this conversation as resolved.
Show resolved Hide resolved
SmitaRKulkarni marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#include "config.h"

#if USE_AWS_S3

#include <IO/WriteBufferFromS3TaskTracker.h>
#include "ThreadPoolTaskTracker.h"

namespace ProfileEvents
{
Expand All @@ -12,19 +10,19 @@ namespace ProfileEvents
namespace DB
{

WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_)
TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_, LogSeriesLimiterPtr limitedLog_)
: is_async(bool(scheduler_))
, scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
, max_tasks_inflight(max_tasks_inflight_)
, limitedLog(limitedLog_)
{}

WriteBufferFromS3::TaskTracker::~TaskTracker()
TaskTracker::~TaskTracker()
{
safeWaitAll();
}

ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
ThreadPoolCallbackRunner<void> TaskTracker::syncRunner()
{
return [](Callback && callback, int64_t) mutable -> std::future<void>
{
Expand All @@ -35,7 +33,7 @@ ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
};
}

void WriteBufferFromS3::TaskTracker::waitAll()
void TaskTracker::waitAll()
{
/// Exceptions are propagated
for (auto & future : futures)
Expand All @@ -48,7 +46,7 @@ void WriteBufferFromS3::TaskTracker::waitAll()
finished_futures.clear();
}

void WriteBufferFromS3::TaskTracker::safeWaitAll()
void TaskTracker::safeWaitAll()
{
for (auto & future : futures)
{
Expand All @@ -71,7 +69,7 @@ void WriteBufferFromS3::TaskTracker::safeWaitAll()
finished_futures.clear();
}

void WriteBufferFromS3::TaskTracker::waitIfAny()
void TaskTracker::waitIfAny()
{
if (futures.empty())
return;
Expand Down Expand Up @@ -99,7 +97,7 @@ void WriteBufferFromS3::TaskTracker::waitIfAny()
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
}

void WriteBufferFromS3::TaskTracker::add(Callback && func)
void TaskTracker::add(Callback && func)
{
/// All this fuzz is about 2 things. This is the most critical place of TaskTracker.
/// The first is not to fail insertion in the list `futures`.
Expand Down Expand Up @@ -134,7 +132,7 @@ void WriteBufferFromS3::TaskTracker::add(Callback && func)
waitTilInflightShrink();
}

void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
void TaskTracker::waitTilInflightShrink()
{
if (!max_tasks_inflight)
return;
Expand Down Expand Up @@ -166,11 +164,10 @@ void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
}

bool WriteBufferFromS3::TaskTracker::isAsync() const
bool TaskTracker::isAsync() const
{
return is_async;
}

}

#endif
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
#pragma once

#include "config.h"
#include "threadPoolCallbackRunner.h"
#include "IO/WriteBufferFromS3.h"

#if USE_AWS_S3

#include "WriteBufferFromS3.h"

#include <Common/logger_useful.h>
#include "logger_useful.h"

#include <list>

namespace DB
{

/// That class is used only in WriteBufferFromS3 for now.
/// Therefore it declared as a part of WriteBufferFromS3.
/// TaskTracker takes a Callback which is run by scheduler in some external shared ThreadPool.
/// TaskTracker brings the methods waitIfAny, waitAll/safeWaitAll
/// to help with coordination of the running tasks.

/// Basic exception safety is provided. If exception occurred the object has to be destroyed.
/// No thread safety is provided. Use this object with no concurrency.

class WriteBufferFromS3::TaskTracker
class TaskTracker
{
public:
using Callback = std::function<void()>;
Expand Down Expand Up @@ -68,5 +64,3 @@ class WriteBufferFromS3::TaskTracker
};

}

#endif
8 changes: 7 additions & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ class IColumn;
M(UInt64, distributed_connections_pool_size, 1024, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \
M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3, s3_min_upload_part_size is multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited. You ", 0) \
M(UInt64, azure_upload_part_size_multiply_factor, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage.", 0) \
M(UInt64, azure_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
M(UInt64, azure_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, azure_max_single_part_upload_size, 100*1024*1024, "The maximum size of object to upload using singlepart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_single_part_copy_size, 256*1024*1024, "The maximum size of object to copy using single part copy to Azure blob storage.", 0) \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"default_normal_view_sql_security", "INVOKER", "INVOKER", "Allows to set default `SQL SECURITY` option while creating a normal view"},
{"mysql_map_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."},
{"mysql_map_fixed_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."},
{"azure_max_inflight_parts_for_one_file", 20, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited."},
}},
{"24.1", {{"print_pretty_type_names", false, true, "Better user experience."},
{"input_format_json_read_bools_as_strings", false, true, "Allow to read bools as strings in JSON formats by default"},
Expand Down