Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate IO scheduler with buffers for remote reads and writes #45711

Merged
merged 8 commits into from
Feb 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/Disks/IO/WriteBufferFromAzureBlobStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>
#include <IO/ResourceGuard.h>


namespace ProfileEvents
Expand Down Expand Up @@ -40,10 +41,13 @@ WriteBufferFromAzureBlobStorage::~WriteBufferFromAzureBlobStorage()
finalize();
}

void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func, size_t num_tries)
void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func, size_t num_tries, size_t cost)
{
auto handle_exception = [&](const auto & e, size_t i)
auto handle_exception = [&, this](const auto & e, size_t i)
{
if (cost)
write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it
serxa marked this conversation as resolved.
Show resolved Hide resolved

if (i == num_tries - 1)
throw;

Expand All @@ -54,6 +58,7 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
{
try
{
ResourceGuard rlock(write_settings.resource_link, cost); // Note that zero-cost requests are ignored
func();
break;
}
Expand All @@ -65,6 +70,12 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
{
handle_exception(e, i);
}
catch (...)
{
if (cost)
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw;
}
}
}

Expand All @@ -87,7 +98,7 @@ void WriteBufferFromAzureBlobStorage::uploadBlock(const char * data, size_t size
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));

Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(data), size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM, size);
tmp_buffer_write_offset = 0;

LOG_TRACE(log, "Staged block (id: {}) of size {} (blob path: {}).", block_id, size, blob_path);
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IO/WriteBufferFromAzureBlobStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class WriteBufferFromAzureBlobStorage : public BufferWithOwnMemory<WriteBuffer>

private:
void finalizeImpl() override;
void execWithRetry(std::function<void()> func, size_t num_tries);
void execWithRetry(std::function<void()> func, size_t num_tries, size_t cost = 0);
void uploadBlock(const char * data, size_t size);

Poco::Logger * log;
Expand Down
2 changes: 1 addition & 1 deletion src/IO/IResourceManager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <IO/ResourceRequest.h>
#include <IO/ResourceLink.h>

#include <Poco/Util/AbstractConfiguration.h>

Expand Down
36 changes: 35 additions & 1 deletion src/IO/ISchedulerQueue.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <IO/ISchedulerNode.h>
#include <IO/ResourceBudget.h>
#include <IO/ResourceRequest.h>

#include <memory>

Expand All @@ -10,17 +12,49 @@ namespace DB

/*
* Queue for pending requests for specific resource, leaf of hierarchy.
* Note that every queue has budget associated with it.
*/
class ISchedulerQueue : public ISchedulerNode
{
public:
ISchedulerQueue(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
explicit ISchedulerQueue(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: ISchedulerNode(event_queue_, config, config_prefix)
{}

// Wrapper for `enqueueRequest()` that should be used to account for available resource budget
void enqueueRequestUsingBudget(ResourceRequest * request)
{
request->cost = budget.ask(request->cost);
enqueueRequest(request);
}

// Should be called to account for difference between real and estimated costs
void adjustBudget(ResourceCost estimated_cost, ResourceCost real_cost)
{
budget.adjust(estimated_cost, real_cost);
}

// Adjust budget to account for extra consumption of `cost` resource units
void consumeBudget(ResourceCost cost)
{
adjustBudget(0, cost);
}

// Adjust budget to account for requested, but not consumed `cost` resource units
void accumulateBudget(ResourceCost cost)
{
adjustBudget(cost, 0);
}

/// Enqueue new request to be executed using underlying resource.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;

private:
// Allows multiple consumers to synchronize with common "debit/credit" balance.
// 1) (positive) to avoid wasting of allocated but not used resource (e.g in case of a failure);
// 2) (negative) to account for overconsumption (e.g. if cost is not know in advance and estimation from below is applied).
ResourceBudget budget;
};

}
13 changes: 10 additions & 3 deletions src/IO/ReadBufferFromS3.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include "config.h"
#include "IO/S3Common.h"
#include <IO/S3Common.h>

#if USE_AWS_S3

#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ResourceGuard.h>
#include <IO/S3/getObjectInfo.h>

#include <IO/S3/Requests.h>

#include <Common/Stopwatch.h>
Expand Down Expand Up @@ -323,16 +323,23 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
if (read_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3GetObject);

// We do not know in advance how many bytes we are going to consume, to avoid blocking estimated it from below
kssenii marked this conversation as resolved.
Show resolved Hide resolved
constexpr ResourceCost estimated_cost = 1;
ResourceGuard rlock(read_settings.resource_link, estimated_cost);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
rlock.unlock();

if (outcome.IsSuccess())
{
read_result = outcome.GetResultWithOwnership();
ResourceCost bytes_read = outcome.GetResult().GetContentLength();
read_settings.resource_link.adjust(estimated_cost, bytes_read);
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
read_result = outcome.GetResultWithOwnership();
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
}
else
{
read_settings.resource_link.accumulate(estimated_cost);
const auto & error = outcome.GetError();
throw S3Exception(error.GetMessage(), error.GetErrorType());
}
Expand Down
4 changes: 4 additions & 0 deletions src/IO/ReadSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Core/Defines.h>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Common/Throttler_fwd.h>
#include <IO/ResourceLink.h>

namespace DB
{
Expand Down Expand Up @@ -107,6 +108,9 @@ struct ReadSettings
/// Bandwidth throttler to use during reading
ThrottlerPtr remote_throttler;

// Resource to be used during reading
ResourceLink resource_link;

size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 1600;
Expand Down
1 change: 1 addition & 0 deletions src/IO/Resource/tests/gtest_resource_manager_static.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ TEST(IOResourceStaticResourceManager, Smoke)
for (int i = 0; i < 10; i++)
{
ResourceGuard ga(ca->get("res1"));
ga.unlock();
ResourceGuard gb(cb->get("res1"));
}
}
Expand Down
55 changes: 55 additions & 0 deletions src/IO/ResourceBudget.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#pragma once

#include <IO/ResourceRequest.h>
#include <atomic>

namespace DB
{

/*
* Helper class to keep track of requested and consumed amount of resource.
* Useful if real amount of consumed resource can differ from requested amount of resource (e.g. in case of failures).
* Can be safely used from multiple threads.
* Usage example:
* ResourceBudget budget;
* while (!stop) {
* ResourceCost est_cost = myEstimateOfCostOrJustUseOne();
* myAllocateResource(budget.ask(est_cost)); // Ask external system to allocate resource for you
* ResourceCost real_cost = mySynchronousConsumptionOfResource(); // Real consumption can differ from est_cost
* budget.adjust(est_cost, real_cost); // Adjust balance according to the actual cost, may affect the next iteration
* }
*/
class ResourceBudget
{
public:
// Returns amount of resource to be requested according to current balance and estimated cost of new consumption
ResourceCost ask(ResourceCost estimated_cost)
{
ResourceCost budget = available.load();
while (true)
{
// Valid resource request must have positive `cost`. Also takes consumption history into account.
ResourceCost cost = std::max<ResourceCost>(1ll, estimated_cost - budget);

// Assume every request is satisfied (no resource request cancellation is possible now)
// So we requested additional `cost` units and are going to consume `estimated_cost`
ResourceCost new_budget = budget + cost - estimated_cost;

// Try to commit this transaction
if (new_budget == budget || available.compare_exchange_strong(budget, new_budget))
return cost;
}
}

// Should be called to account for difference between real and estimated costs
// Optional. May be skipped if `real_cost` is known in advance (equals `estimated_cost`).
void adjust(ResourceCost estimated_cost, ResourceCost real_cost)
{
available.fetch_add(estimated_cost - real_cost);
}

private:
std::atomic<ResourceCost> available = 0; // requested - consumed
};

}