-
Notifications
You must be signed in to change notification settings - Fork 6.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update ReadBuffer for AzureBlobStorage #61884
Changes from 5 commits
5c6f1ca
35cc40b
e5cbc9f
67bbf88
32ee02a
f8e75a0
689d0ae
ccb055c
5c75f90
54e86b3
2dfe6b2
fa4d205
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,23 +23,20 @@ namespace ErrorCodes | |
{ | ||
extern const int CANNOT_SEEK_THROUGH_FILE; | ||
extern const int SEEK_POSITION_OUT_OF_BOUND; | ||
extern const int RECEIVED_EMPTY_DATA; | ||
extern const int LOGICAL_ERROR; | ||
} | ||
|
||
ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage( | ||
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, | ||
const String & path_, | ||
const ReadSettings & read_settings_, | ||
size_t max_single_read_retries_, | ||
size_t max_single_download_retries_, | ||
bool use_external_buffer_, | ||
bool restricted_seek_, | ||
size_t read_until_position_) | ||
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size, nullptr, 0) | ||
, blob_container_client(blob_container_client_) | ||
, path(path_) | ||
, max_single_read_retries(max_single_read_retries_) | ||
, max_single_download_retries(max_single_download_retries_) | ||
, read_settings(read_settings_) | ||
, tmp_buffer_size(read_settings.remote_fs_buffer_size) | ||
|
@@ -78,6 +75,7 @@ void ReadBufferFromAzureBlobStorage::setReadUntilPosition(size_t position) | |
|
||
bool ReadBufferFromAzureBlobStorage::nextImpl() | ||
{ | ||
size_t bytes_read = 0; | ||
if (read_until_position) | ||
{ | ||
if (read_until_position == offset) | ||
|
@@ -87,46 +85,13 @@ bool ReadBufferFromAzureBlobStorage::nextImpl() | |
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1); | ||
} | ||
|
||
if (!initialized) | ||
initialize(); | ||
|
||
if (use_external_buffer) | ||
{ | ||
data_ptr = internal_buffer.begin(); | ||
data_capacity = internal_buffer.size(); | ||
} | ||
|
||
size_t to_read_bytes = std::min(static_cast<size_t>(total_size - offset), data_capacity); | ||
size_t bytes_read = 0; | ||
|
||
size_t sleep_time_with_backoff_milliseconds = 100; | ||
|
||
auto handle_exception = [&, this](const auto & e, size_t i) | ||
{ | ||
LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message); | ||
if (i + 1 == max_single_read_retries) | ||
throw; | ||
|
||
sleepForMilliseconds(sleep_time_with_backoff_milliseconds); | ||
sleep_time_with_backoff_milliseconds *= 2; | ||
initialized = false; | ||
initialize(); | ||
}; | ||
|
||
for (size_t i = 0; i < max_single_read_retries; ++i) | ||
{ | ||
try | ||
{ | ||
bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes); | ||
if (read_settings.remote_throttler) | ||
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); | ||
break; | ||
} | ||
catch (const Azure::Core::RequestFailedException & e) | ||
{ | ||
handle_exception(e, i); | ||
} | ||
} | ||
bytes_read = readBytes(); | ||
|
||
if (bytes_read == 0) | ||
return false; | ||
|
@@ -195,17 +160,26 @@ off_t ReadBufferFromAzureBlobStorage::getPosition() | |
return offset - available(); | ||
} | ||
|
||
void ReadBufferFromAzureBlobStorage::initialize() | ||
size_t ReadBufferFromAzureBlobStorage::readBytes() | ||
{ | ||
if (initialized) | ||
return; | ||
|
||
Azure::Storage::Blobs::DownloadBlobOptions download_options; | ||
Azure::Storage::Blobs::DownloadBlobToOptions download_options; | ||
|
||
Azure::Nullable<int64_t> length {}; | ||
|
||
/// 0 means read full file | ||
size_t to_read_bytes = 0; | ||
if (read_until_position != 0) | ||
length = {static_cast<int64_t>(read_until_position - offset)}; | ||
{ | ||
to_read_bytes = read_until_position - offset; | ||
to_read_bytes = std::min(to_read_bytes, data_capacity); | ||
} | ||
|
||
auto file_size = getFileSize(); | ||
if (!to_read_bytes && (file_size > data_capacity)) | ||
to_read_bytes = std::min(file_size, data_capacity); | ||
|
||
if (to_read_bytes) | ||
length = {static_cast<int64_t>(to_read_bytes)}; | ||
download_options.Range = {static_cast<int64_t>(offset), length}; | ||
|
||
if (!blob_client) | ||
|
@@ -223,12 +197,18 @@ void ReadBufferFromAzureBlobStorage::initialize() | |
sleep_time_with_backoff_milliseconds *= 2; | ||
}; | ||
|
||
long read_bytes = 0; | ||
|
||
for (size_t i = 0; i < max_single_download_retries; ++i) | ||
{ | ||
try | ||
{ | ||
auto download_response = blob_client->Download(download_options); | ||
data_stream = std::move(download_response.Value.BodyStream); | ||
auto download_response = blob_client->DownloadTo(reinterpret_cast<uint8_t *>(data_ptr), data_capacity, download_options); | ||
read_bytes = download_response.Value.ContentRange.Length.Value(); | ||
|
||
if (read_settings.remote_throttler) | ||
read_settings.remote_throttler->add(read_bytes, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); | ||
|
||
break; | ||
} | ||
catch (const Azure::Core::RequestFailedException & e) | ||
|
@@ -237,12 +217,10 @@ void ReadBufferFromAzureBlobStorage::initialize() | |
} | ||
} | ||
|
||
if (data_stream == nullptr) | ||
throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Null data stream obtained while downloading file {} from Blob Storage", path); | ||
|
||
total_size = data_stream->Length() + offset; | ||
total_size = read_bytes + offset; | ||
|
||
initialized = true; | ||
return read_bytes; | ||
} | ||
|
||
size_t ReadBufferFromAzureBlobStorage::getFileSize() | ||
|
@@ -268,12 +246,10 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran | |
size_t bytes_copied = 0; | ||
try | ||
{ | ||
Azure::Storage::Blobs::DownloadBlobOptions download_options; | ||
Azure::Storage::Blobs::DownloadBlobToOptions download_options; | ||
download_options.Range = {static_cast<int64_t>(range_begin), n}; | ||
auto download_response = blob_client->Download(download_options); | ||
|
||
std::unique_ptr<Azure::Core::IO::BodyStream> body_stream = std::move(download_response.Value.BodyStream); | ||
bytes_copied = body_stream->ReadToCount(reinterpret_cast<uint8_t *>(to), body_stream->Length()); | ||
auto download_response = blob_client->DownloadTo(reinterpret_cast<uint8_t *>(to), n, download_options); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it really give a performance improvement? Seems like just a different api (and this new way with |
||
bytes_copied = download_response.Value.ContentRange.Length.Value(); | ||
|
||
LOG_TEST(log, "AzureBlobStorage readBigAt read bytes {}", bytes_copied); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,6 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase | |
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, | ||
const String & path_, | ||
const ReadSettings & read_settings_, | ||
size_t max_single_read_retries_, | ||
size_t max_single_download_retries_, | ||
bool use_external_buffer_ = false, | ||
bool restricted_seek_ = false, | ||
|
@@ -38,6 +37,7 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase | |
String getFileName() const override { return path; } | ||
|
||
void setReadUntilPosition(size_t position) override; | ||
|
||
void setReadUntilEnd() override; | ||
|
||
bool supportsRightBoundedReads() const override { return true; } | ||
|
@@ -50,14 +50,12 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase | |
|
||
private: | ||
|
||
void initialize(); | ||
size_t readBytes(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. renamed this function, as it also reads data now. But we still need |
||
|
||
std::unique_ptr<Azure::Core::IO::BodyStream> data_stream; | ||
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client; | ||
std::unique_ptr<Azure::Storage::Blobs::BlobClient> blob_client; | ||
|
||
const String path; | ||
size_t max_single_read_retries; | ||
size_t max_single_download_retries; | ||
ReadSettings read_settings; | ||
std::vector<char> tmp_buffer; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will not improve performance but will decrease it instead because it will make a GET request for each call of
nextImpl
instead of one request ininitialize
method, as I understand the azure sdk.DownloadTo
looks more suitable for reading small blobs that can be read with one call.