diff --git a/src/Backups/BackupIO_AzureBlobStorage.cpp b/src/Backups/BackupIO_AzureBlobStorage.cpp index b9b208e321cf..af443c133a9c 100644 --- a/src/Backups/BackupIO_AzureBlobStorage.cpp +++ b/src/Backups/BackupIO_AzureBlobStorage.cpp @@ -89,7 +89,7 @@ std::unique_ptr BackupReaderAzureBlobStorage::readFile(const key = file_name; } return std::make_unique( - client, key, read_settings, settings->max_single_read_retries, + client, key, read_settings, settings->max_single_download_retries); } @@ -258,7 +258,7 @@ std::unique_ptr BackupWriterAzureBlobStorage::readFile(const String } return std::make_unique( - client, key, read_settings, settings->max_single_read_retries, + client, key, read_settings, settings->max_single_download_retries); } diff --git a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp index 5947b742339e..9cd17061722b 100644 --- a/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp +++ b/src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp @@ -23,7 +23,6 @@ namespace ErrorCodes { extern const int CANNOT_SEEK_THROUGH_FILE; extern const int SEEK_POSITION_OUT_OF_BOUND; - extern const int RECEIVED_EMPTY_DATA; extern const int LOGICAL_ERROR; } @@ -31,7 +30,6 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage( std::shared_ptr blob_container_client_, const String & path_, const ReadSettings & read_settings_, - size_t max_single_read_retries_, size_t max_single_download_retries_, bool use_external_buffer_, bool restricted_seek_, @@ -39,7 +37,6 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage( : ReadBufferFromFileBase(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size, nullptr, 0) , blob_container_client(blob_container_client_) , path(path_) - , max_single_read_retries(max_single_read_retries_) , max_single_download_retries(max_single_download_retries_) , read_settings(read_settings_) , tmp_buffer_size(read_settings.remote_fs_buffer_size) @@ -73,11 +70,14 @@ void ReadBufferFromAzureBlobStorage::setReadUntilEnd() void ReadBufferFromAzureBlobStorage::setReadUntilPosition(size_t position) { read_until_position = position; + offset = getPosition(); + resetWorkingBuffer(); initialized = false; } bool ReadBufferFromAzureBlobStorage::nextImpl() { + size_t bytes_read = 0; if (read_until_position) { if (read_until_position == offset) @@ -87,46 +87,16 @@ bool ReadBufferFromAzureBlobStorage::nextImpl() throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1); } - if (!initialized) - initialize(); - if (use_external_buffer) { data_ptr = internal_buffer.begin(); data_capacity = internal_buffer.size(); } - size_t to_read_bytes = std::min(static_cast(total_size - offset), data_capacity); - size_t bytes_read = 0; - - size_t sleep_time_with_backoff_milliseconds = 100; + if (static_cast(offset) >= getFileSize()) + return false; - auto handle_exception = [&, this](const auto & e, size_t i) - { - LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message); - if (i + 1 == max_single_read_retries) - throw; - - sleepForMilliseconds(sleep_time_with_backoff_milliseconds); - sleep_time_with_backoff_milliseconds *= 2; - initialized = false; - initialize(); - }; - - for (size_t i = 0; i < max_single_read_retries; ++i) - { - try - { - bytes_read = data_stream->ReadToCount(reinterpret_cast(data_ptr), to_read_bytes); - if (read_settings.remote_throttler) - read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); - break; - } - catch (const Azure::Core::RequestFailedException & e) - { - handle_exception(e, i); - } - } + bytes_read = readBytes(); if (bytes_read == 0) return false; @@ -195,18 +165,28 @@ off_t ReadBufferFromAzureBlobStorage::getPosition() return offset - available(); } -void ReadBufferFromAzureBlobStorage::initialize() +size_t ReadBufferFromAzureBlobStorage::readBytes() { - if (initialized) - return; - - Azure::Storage::Blobs::DownloadBlobOptions download_options; + Azure::Storage::Blobs::DownloadBlobToOptions download_options; Azure::Nullable length {}; + + /// 0 means read full file + int64_t to_read_bytes = 0; if (read_until_position != 0) - length = {static_cast(read_until_position - offset)}; + { + to_read_bytes = read_until_position - offset; + to_read_bytes = std::min(to_read_bytes, static_cast(data_capacity)); + } + if (!to_read_bytes && (getFileSize() > data_capacity)) + to_read_bytes = data_capacity; + + if (to_read_bytes) + length = {static_cast(to_read_bytes)}; download_options.Range = {static_cast(offset), length}; + LOG_INFO(log, "Read bytes range is offset = {}, read_until_position = {}, to_read_bytes = {}, data_capacity = {}, file_size = {}", offset, read_until_position, to_read_bytes, data_capacity, getFileSize()); + if (!blob_client) blob_client = std::make_unique(blob_container_client->GetBlobClient(path)); @@ -223,12 +203,18 @@ void ReadBufferFromAzureBlobStorage::initialize() sleep_time_with_backoff_milliseconds *= 2; }; + long read_bytes = 0; + for (size_t i = 0; i < max_single_download_retries; ++i) { try { - auto download_response = blob_client->Download(download_options); - data_stream = std::move(download_response.Value.BodyStream); + auto download_response = blob_client->DownloadTo(reinterpret_cast(data_ptr), data_capacity, download_options); + read_bytes = download_response.Value.ContentRange.Length.Value(); + + if (read_settings.remote_throttler) + read_settings.remote_throttler->add(read_bytes, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds); + break; } catch (const Azure::Core::RequestFailedException & e) @@ -237,12 +223,8 @@ void ReadBufferFromAzureBlobStorage::initialize() } } - if (data_stream == nullptr) - throw Exception(ErrorCodes::RECEIVED_EMPTY_DATA, "Null data stream obtained while downloading file {} from Blob Storage", path); - - total_size = data_stream->Length() + offset; - initialized = true; + return read_bytes; } size_t ReadBufferFromAzureBlobStorage::getFileSize() @@ -268,12 +250,10 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran size_t bytes_copied = 0; try { - Azure::Storage::Blobs::DownloadBlobOptions download_options; + Azure::Storage::Blobs::DownloadBlobToOptions download_options; download_options.Range = {static_cast(range_begin), n}; - auto download_response = blob_client->Download(download_options); - - std::unique_ptr body_stream = std::move(download_response.Value.BodyStream); - bytes_copied = body_stream->ReadToCount(reinterpret_cast(to), body_stream->Length()); + auto download_response = blob_client->DownloadTo(reinterpret_cast(to), n, download_options); + bytes_copied = download_response.Value.ContentRange.Length.Value(); LOG_TEST(log, "AzureBlobStorage readBigAt read bytes {}", bytes_copied); diff --git a/src/Disks/IO/ReadBufferFromAzureBlobStorage.h b/src/Disks/IO/ReadBufferFromAzureBlobStorage.h index d328195cc26e..04b022ecbbea 100644 --- a/src/Disks/IO/ReadBufferFromAzureBlobStorage.h +++ b/src/Disks/IO/ReadBufferFromAzureBlobStorage.h @@ -21,7 +21,6 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase std::shared_ptr blob_container_client_, const String & path_, const ReadSettings & read_settings_, - size_t max_single_read_retries_, size_t max_single_download_retries_, bool use_external_buffer_ = false, bool restricted_seek_ = false, @@ -38,6 +37,7 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase String getFileName() const override { return path; } void setReadUntilPosition(size_t position) override; + void setReadUntilEnd() override; bool supportsRightBoundedReads() const override { return true; } @@ -50,14 +50,12 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase private: - void initialize(); + size_t readBytes(); - std::unique_ptr data_stream; std::shared_ptr blob_container_client; std::unique_ptr blob_client; const String path; - size_t max_single_read_retries; size_t max_single_download_retries; ReadSettings read_settings; std::vector tmp_buffer; @@ -72,7 +70,6 @@ class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase off_t read_until_position = 0; off_t offset = 0; - size_t total_size; bool initialized = false; char * data_ptr; size_t data_capacity; diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp index b5c2bfab8b9c..f8d42f8e1002 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp @@ -190,7 +190,7 @@ std::unique_ptr AzureObjectStorage::readObject( /// NOLI auto settings_ptr = settings.get(); return std::make_unique( - client.get(), object.remote_path, patchSettings(read_settings), settings_ptr->max_single_read_retries, + client.get(), object.remote_path, patchSettings(read_settings), settings_ptr->max_single_download_retries); } @@ -212,7 +212,6 @@ std::unique_ptr AzureObjectStorage::readObjects( /// NOL client.get(), path, disk_read_settings, - settings_ptr->max_single_read_retries, settings_ptr->max_single_download_retries, /* use_external_buffer */true, restricted_seek); diff --git a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp index 4714c7959278..fe07327c732f 100644 --- a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp +++ b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp @@ -326,7 +326,7 @@ void copyAzureBlobStorageFile( LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob); auto create_read_buffer = [&] { - return std::make_unique(src_client, src_blob, read_settings, settings->max_single_read_retries, + return std::make_unique(src_client, src_blob, read_settings, settings->max_single_download_retries); };