-
Notifications
You must be signed in to change notification settings - Fork 6.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update ReadBuffer for AzureBlobStorage #61884
Changes from all commits
5c6f1ca
35cc40b
e5cbc9f
67bbf88
32ee02a
f8e75a0
689d0ae
ccb055c
5c75f90
54e86b3
2dfe6b2
fa4d205
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,23 +23,20 @@ namespace ErrorCodes | |
{ | ||
extern const int CANNOT_SEEK_THROUGH_FILE; | ||
extern const int SEEK_POSITION_OUT_OF_BOUND; | ||
extern const int RECEIVED_EMPTY_DATA; | ||
extern const int LOGICAL_ERROR; | ||
} | ||
|
||
ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage( | ||
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, | ||
const String & path_, | ||
const ReadSettings & read_settings_, | ||
size_t max_single_read_retries_, | ||
size_t max_single_download_retries_, | ||
bool use_external_buffer_, | ||
bool restricted_seek_, | ||
size_t read_until_position_) | ||
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size, nullptr, 0) | ||
, blob_container_client(blob_container_client_) | ||
, path(path_) | ||
, max_single_read_retries(max_single_read_retries_) | ||
, max_single_download_retries(max_single_download_retries_) | ||
, read_settings(read_settings_) | ||
, tmp_buffer_size(read_settings.remote_fs_buffer_size) | ||
|
@@ -73,11 +70,14 @@ void ReadBufferFromAzureBlobStorage::setReadUntilEnd() | |
void ReadBufferFromAzureBlobStorage::setReadUntilPosition(size_t position) | ||
{ | ||
read_until_position = position; | ||
offset = getPosition(); | ||
resetWorkingBuffer(); | ||
initialized = false; | ||
} | ||
|
||
bool ReadBufferFromAzureBlobStorage::nextImpl() | ||
{ | ||
size_t bytes_read = 0; | ||
if (read_until_position) | ||
{ | ||
if (read_until_position == offset) | ||
|
@@ -87,46 +87,16 @@ bool ReadBufferFromAzureBlobStorage::nextImpl() | |
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1); | ||
} | ||
|
||
if (!initialized) | ||
initialize(); | ||
|
||
if (use_external_buffer) | ||
{ | ||
data_ptr = internal_buffer.begin(); | ||
data_capacity = internal_buffer.size(); | ||
} | ||
|
||
size_t to_read_bytes = std::min(static_cast<size_t>(total_size - offset), data_capacity); | ||
size_t bytes_read = 0; | ||
|
||
size_t sleep_time_with_backoff_milliseconds = 100; | ||
if (static_cast<size_t>(offset) >= getFileSize()) | ||
return false; | ||
|
||
auto handle_exception = [&, this](const auto & e, size_t i) | ||
{ | ||
LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message); | ||
if (i + 1 == max_single_read_retries) | ||
throw; | ||
|
||
sleepForMilliseconds(sleep_time_with_backoff_milliseconds); | ||
sleep_time_with_backoff_milliseconds *= 2; | ||
initialized = false; | ||
initialize(); | ||
}; | ||
|
||
for (size_t i = 0; i < max_single_read_retries; ++i) | ||
{ | ||
try | ||
{ | ||
bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes); | ||
if (read_settings.remote_throttler) | ||
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); | ||
break; | ||
} | ||
catch (const Azure::Core::RequestFailedException & e) | ||
{ | ||
handle_exception(e, i); | ||
} | ||
} | ||
bytes_read = readBytes(); | ||
|
||
if (bytes_read == 0) | ||
return false; | ||
|
@@ -195,18 +165,28 @@ off_t ReadBufferFromAzureBlobStorage::getPosition() | |
return offset - available(); | ||
} | ||
|
||
void ReadBufferFromAzureBlobStorage::initialize() | ||
size_t ReadBufferFromAzureBlobStorage::readBytes() | ||
{ | ||
if (initialized) | ||
return; | ||
|
||
Azure::Storage::Blobs::DownloadBlobOptions download_options; | ||
Azure::Storage::Blobs::DownloadBlobToOptions download_options; | ||
|
||
Azure::Nullable<int64_t> length {}; | ||
|
||
/// 0 means read full file | ||
int64_t to_read_bytes = 0; | ||
if (read_until_position != 0) | ||
length = {static_cast<int64_t>(read_until_position - offset)}; | ||
{ | ||
to_read_bytes = read_until_position - offset; | ||
to_read_bytes = std::min(to_read_bytes, static_cast<int64_t>(data_capacity)); | ||
} | ||
|
||
if (!to_read_bytes && (getFileSize() > data_capacity)) | ||
to_read_bytes = data_capacity; | ||
|
||
if (to_read_bytes) | ||
length = {static_cast<int64_t>(to_read_bytes)}; | ||
download_options.Range = {static_cast<int64_t>(offset), length}; | ||
LOG_INFO(log, "Read bytes range is offset = {}, read_until_position = {}, to_read_bytes = {}, data_capacity = {}, file_size = {}", offset, read_until_position, to_read_bytes, data_capacity, getFileSize()); | ||
|
||
|
||
if (!blob_client) | ||
blob_client = std::make_unique<Azure::Storage::Blobs::BlobClient>(blob_container_client->GetBlobClient(path)); | ||
|
@@ -223,12 +203,18 @@ void ReadBufferFromAzureBlobStorage::initialize() | |
sleep_time_with_backoff_milliseconds *= 2; | ||
}; | ||
|
||
long read_bytes = 0; | ||
|
||
for (size_t i = 0; i < max_single_download_retries; ++i) | ||
{ | ||
try | ||
{ | ||
auto download_response = blob_client->Download(download_options); | ||
data_stream = std::move(download_response.Value.BodyStream); | ||
auto download_response = blob_client->DownloadTo(reinterpret_cast<uint8_t *>(data_ptr), data_capacity, download_options); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it will not improve performance but will decrease it instead because it will make a GET request for each call of |
||
read_bytes = download_response.Value.ContentRange.Length.Value(); | ||
|
||
if (read_settings.remote_throttler) | ||
read_settings.remote_throttler->add(read_bytes, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); | ||
|
||
break; | ||
} | ||
catch (const Azure::Core::RequestFailedException & e) | ||
|
@@ -237,12 +223,8 @@ void ReadBufferFromAzureBlobStorage::initialize() | |
} | ||
} | ||
|
||
if (data_stream == nullptr) | ||
throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Null data stream obtained while downloading file {} from Blob Storage", path); | ||
|
||
total_size = data_stream->Length() + offset; | ||
|
||
initialized = true; | ||
return read_bytes; | ||
} | ||
|
||
size_t ReadBufferFromAzureBlobStorage::getFileSize() | ||
|
@@ -268,12 +250,10 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran | |
size_t bytes_copied = 0; | ||
try | ||
{ | ||
Azure::Storage::Blobs::DownloadBlobOptions download_options; | ||
Azure::Storage::Blobs::DownloadBlobToOptions download_options; | ||
download_options.Range = {static_cast<int64_t>(range_begin), n}; | ||
auto download_response = blob_client->Download(download_options); | ||
|
||
std::unique_ptr<Azure::Core::IO::BodyStream> body_stream = std::move(download_response.Value.BodyStream); | ||
bytes_copied = body_stream->ReadToCount(reinterpret_cast<uint8_t *>(to), body_stream->Length()); | ||
auto download_response = blob_client->DownloadTo(reinterpret_cast<uint8_t *>(to), n, download_options); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it really give a performance improvement? Seems like just a different api (and this new way with |
||
bytes_copied = download_response.Value.ContentRange.Length.Value(); | ||
|
||
LOG_TEST(log, "AzureBlobStorage readBigAt read bytes {}", bytes_copied); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,6 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase | |
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_, | ||
const String & path_, | ||
const ReadSettings & read_settings_, | ||
size_t max_single_read_retries_, | ||
size_t max_single_download_retries_, | ||
bool use_external_buffer_ = false, | ||
bool restricted_seek_ = false, | ||
|
@@ -38,6 +37,7 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase | |
String getFileName() const override { return path; } | ||
|
||
void setReadUntilPosition(size_t position) override; | ||
|
||
void setReadUntilEnd() override; | ||
|
||
bool supportsRightBoundedReads() const override { return true; } | ||
|
@@ -50,14 +50,12 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase | |
|
||
private: | ||
|
||
void initialize(); | ||
size_t readBytes(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. renamed this function, as it also reads data now. But we still need |
||
|
||
std::unique_ptr<Azure::Core::IO::BodyStream> data_stream; | ||
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client; | ||
std::unique_ptr<Azure::Storage::Blobs::BlobClient> blob_client; | ||
|
||
const String path; | ||
size_t max_single_read_retries; | ||
size_t max_single_download_retries; | ||
ReadSettings read_settings; | ||
std::vector<char> tmp_buffer; | ||
|
@@ -72,7 +70,6 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase | |
off_t read_until_position = 0; | ||
|
||
off_t offset = 0; | ||
size_t total_size; | ||
bool initialized = false; | ||
char * data_ptr; | ||
size_t data_capacity; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is an Azure disk read, it will be better not to execute an additional request to Azure just to get the file size (getFileSize is only used for external table engines), as it is already known before we create the read buffer. So maybe just pass file_size as an optional argument in the constructor of
ReadBufferFromAzureBlobStorage
? Btw, why do we need this check? There is no such check for other buffers AFAIK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For other buffers, I see we send the request to read the full intended length of the file.
Here we are using the readBytes function to read bytes into a buffer, and we only read as much as its capacity (data_capacity). So every time we read, we extend the offset; at some point it will reach the intended size or file_size.
In the getFileSize() function we only fetch the value once; it is just returned for future calls.
Please let me know if you have any suggestions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I saw that it is fetched only once, but if it is avoidable - better to avoid, we already have a file size when we create this read buffer, better to use it.
But isn't it the same as here? Here we have a buffer and try to read
data_capacity
bytes into it, but if we reach the end of the file while reading into the buffer, we will just read nothing and the buffer will be empty, so we return `false`
as well. Or am I missing something? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DownloadTo function takes Range as a parameter (offset, length) if length is zero it reads full file, so when we reach end of file, the range will be (offset=file_size, length), even if length is zero we see error message that the "The range specified is invalid for the current size of the resource".
Previously when we used to Download & then ReadToBuffer, there we had a check to_read_bytes = min(total_size-offset, length), and we would just read 0 bytes & return false.
Hope that clarifies the need. Please suggest if you have better options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, ok understood. Also what if there are remaining N bytes to read from file and N is less that buffer_capacity, shouldn't it fail as well with incorrect range error? Seems currently code is not protected against this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is checked here : https://github.com/ClickHouse/ClickHouse/pull/61884/files/fa4d205ad943c7db334dc8788538d08b5cbdaa32#diff-3def5d979078691f30c3473edb5cadb9bdce981ec015f43c46588000f3d66281R175-R188
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But
read_until_position
is not always set. It is not set forremote_filesystem_read_method=read