From eb3836ac5a8c38f5d68c59c692177a49abb17a2a Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 6 Feb 2024 11:16:08 +0100 Subject: [PATCH 1/3] Fix error "Read beyond last offset" for AsynchronousBoundedReadBuffer. --- .../IO/AsynchronousBoundedReadBuffer.cpp | 43 ++++++++++++++----- src/Disks/IO/AsynchronousBoundedReadBuffer.h | 3 ++ 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp index bd19540bf446..236ea486d36a 100644 --- a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp +++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp @@ -69,12 +69,7 @@ bool AsynchronousBoundedReadBuffer::hasPendingDataToRead() return false; if (file_offset_of_buffer_end > *read_until_position) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Read beyond last offset ({} > {}, info: {})", - file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog()); - } + throwReadBeyondLastOffset(); } return true; @@ -103,6 +98,18 @@ IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data, return reader.execute(request); } +size_t AsynchronousBoundedReadBuffer::getBufferSizeForReading() const +{ + size_t buffer_size = chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()); + if (read_until_position) + { + if (file_offset_of_buffer_end > *read_until_position) + throwReadBeyondLastOffset(); + buffer_size = std::min(buffer_size, *read_until_position - file_offset_of_buffer_end); + } + return buffer_size; +} + void AsynchronousBoundedReadBuffer::prefetch(Priority priority) { if (prefetch_future.valid()) @@ -114,7 +121,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority) last_prefetch_info.submit_time = std::chrono::system_clock::now(); last_prefetch_info.priority = priority; - prefetch_buffer.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize())); + prefetch_buffer.resize(getBufferSizeForReading()); prefetch_future = readAsync(prefetch_buffer.data(), prefetch_buffer.size(), priority); ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); } @@ -126,14 +133,15 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position) if (position < file_offset_of_buffer_end) { /// file has been read beyond new read until position already - if (working_buffer.size() >= file_offset_of_buffer_end - position) + if (available() >= file_offset_of_buffer_end - position) { - /// new read until position is inside working buffer + /// new read until position is after the current position in the working buffer file_offset_of_buffer_end = position; + working_buffer.resize(working_buffer.size() - (file_offset_of_buffer_end - position)); } else { - /// new read until position is before working buffer begin + /// new read until position is before the current position in the working buffer throw Exception( ErrorCodes::LOGICAL_ERROR, "Attempt to set read until position before already read data ({} > {}, info: {})", @@ -155,6 +163,16 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position) } } +void AsynchronousBoundedReadBuffer::throwReadBeyondLastOffset() const +{ + size_t file_size = impl->getFileSize(); + size_t read_end_position = read_until_position ? *read_until_position : file_size; + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Read beyond last offset ({} > {}): file size = {}, info: {}", + file_offset_of_buffer_end, read_end_position, file_size, impl->getInfoForLog()); +} + void AsynchronousBoundedReadBuffer::appendToPrefetchLog( FilesystemPrefetchState state, int64_t size, @@ -210,7 +228,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl() } else { - memory.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize())); + memory.resize(getBufferSizeForReading()); { ProfileEventTimeIncrement watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds); @@ -238,6 +256,9 @@ bool AsynchronousBoundedReadBuffer::nextImpl() /// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()] chassert(file_offset_of_buffer_end <= impl->getFileSize()); + if (read_until_position && (file_offset_of_buffer_end > *read_until_position)) + throwReadBeyondLastOffset(); + return bytes_read; } diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.h b/src/Disks/IO/AsynchronousBoundedReadBuffer.h index e5030f37b1d9..b945aed28f01 100644 --- a/src/Disks/IO/AsynchronousBoundedReadBuffer.h +++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.h @@ -94,8 +94,11 @@ class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase IAsynchronousReader::Result readSync(char * data, size_t size); + size_t getBufferSizeForReading() const; + void resetPrefetch(FilesystemPrefetchState state); + [[noreturn]] void throwReadBeyondLastOffset() const; }; } From 8919e3b011796d837064a30400775497b5c6aeac Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 6 Feb 2024 18:28:26 +0100 Subject: [PATCH 2/3] Add test. --- ...gtest_asynchronous_bounded_read_buffer.cpp | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp diff --git a/src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp b/src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp new file mode 100644 index 000000000000..5fee295f53ac --- /dev/null +++ b/src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp @@ -0,0 +1,82 @@ +#include + +#include +#include +#include +#include +#include +#include + + +using namespace DB; +namespace fs = std::filesystem; + +class AsynchronousBoundedReadBufferTest : public ::testing::TestWithParam +{ +public: + AsynchronousBoundedReadBufferTest() { fs::create_directories(temp_folder.path()); } + + String makeTempFile(const String & contents) + { + String path = fmt::format("{}/{}", temp_folder.path(), counter); + ++counter; + + WriteBufferFromFile out{path}; + out.write(contents.data(), contents.size()); + out.finalize(); + + return path; + } + +private: + Poco::TemporaryFile temp_folder; + size_t counter = 0; +}; + +String getAlphabetWithDigits() +{ + String contents = ""; + for (char c = 'a'; c <= 'z'; ++c) + contents += c; + for (char c = '0'; c <= '9'; ++c) + contents += c; + return contents; +} + + +TEST_F(AsynchronousBoundedReadBufferTest, setReadUntilPosition) +{ + String file_path = makeTempFile(getAlphabetWithDigits()); + ThreadPoolRemoteFSReader remote_fs_reader(4, 0); + + for (bool with_prefetch : {false, true}) + { + AsynchronousBoundedReadBuffer read_buffer(createReadBufferFromFileBase(file_path, {}), remote_fs_reader, {}); + read_buffer.setReadUntilPosition(20); + + auto try_read = [&](size_t count) + { + if (with_prefetch) + read_buffer.prefetch(Priority{0}); + + String str; + str.resize(count); + str.resize(read_buffer.read(str.data(), str.size())); + return str; + }; + + EXPECT_EQ(try_read(15), "abcdefghijklmno"); + EXPECT_EQ(try_read(15), "pqrst"); + EXPECT_EQ(try_read(15), ""); + + read_buffer.setReadUntilPosition(25); + + EXPECT_EQ(try_read(15), "uvwxy"); + EXPECT_EQ(try_read(15), ""); + + read_buffer.setReadUntilEnd(); + + EXPECT_EQ(try_read(15), "z0123456789"); + EXPECT_EQ(try_read(15), ""); + } +} From d842c497e6f0de7bf5544367faae9f38d260b647 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Wed, 14 Feb 2024 18:41:25 +0100 Subject: [PATCH 3/3] Change code to pass test 02963_remote_read_small_buffer_size_bug. --- .../IO/AsynchronousBoundedReadBuffer.cpp | 50 ++++++++----------- src/Disks/IO/AsynchronousBoundedReadBuffer.h | 4 -- ...gtest_asynchronous_bounded_read_buffer.cpp | 2 +- 3 files changed, 23 insertions(+), 33 deletions(-) diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp index 236ea486d36a..2373640704b0 100644 --- a/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp +++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.cpp @@ -69,7 +69,10 @@ bool AsynchronousBoundedReadBuffer::hasPendingDataToRead() return false; if (file_offset_of_buffer_end > *read_until_position) - throwReadBeyondLastOffset(); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Read beyond last offset ({} > {}): file size = {}, info: {}", + file_offset_of_buffer_end, *read_until_position, impl->getFileSize(), impl->getInfoForLog()); } return true; @@ -98,18 +101,6 @@ IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data, return reader.execute(request); } -size_t AsynchronousBoundedReadBuffer::getBufferSizeForReading() const -{ - size_t buffer_size = chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()); - if (read_until_position) - { - if (file_offset_of_buffer_end > *read_until_position) - throwReadBeyondLastOffset(); - buffer_size = std::min(buffer_size, *read_until_position - file_offset_of_buffer_end); - } - return buffer_size; -} - void AsynchronousBoundedReadBuffer::prefetch(Priority priority) { if (prefetch_future.valid()) @@ -121,7 +112,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority) last_prefetch_info.submit_time = std::chrono::system_clock::now(); last_prefetch_info.priority = priority; - prefetch_buffer.resize(getBufferSizeForReading()); + prefetch_buffer.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize())); prefetch_future = readAsync(prefetch_buffer.data(), prefetch_buffer.size(), priority); ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); } @@ -163,16 +154,6 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position) } } -void AsynchronousBoundedReadBuffer::throwReadBeyondLastOffset() const -{ - size_t file_size = impl->getFileSize(); - size_t read_end_position = read_until_position ? *read_until_position : file_size; - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Read beyond last offset ({} > {}): file size = {}, info: {}", - file_offset_of_buffer_end, read_end_position, file_size, impl->getInfoForLog()); -} - void AsynchronousBoundedReadBuffer::appendToPrefetchLog( FilesystemPrefetchState state, int64_t size, @@ -204,6 +185,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl() return false; chassert(file_offset_of_buffer_end <= impl->getFileSize()); + size_t old_file_offset_of_buffer_end = file_offset_of_buffer_end; IAsynchronousReader::Result result; if (prefetch_future.valid()) @@ -228,7 +210,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl() } else { - memory.resize(getBufferSizeForReading()); + memory.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize())); { ProfileEventTimeIncrement watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds); @@ -239,6 +221,9 @@ bool AsynchronousBoundedReadBuffer::nextImpl() ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, result.size); } + bytes_to_ignore = 0; + resetWorkingBuffer(); + size_t bytes_read = result.size - result.offset; if (bytes_read) { @@ -249,7 +234,6 @@ bool AsynchronousBoundedReadBuffer::nextImpl() } file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd(); - bytes_to_ignore = 0; /// In case of multiple files for the same file in clickhouse (i.e. log family) /// file_offset_of_buffer_end will not match getImplementationBufferOffset() @@ -257,9 +241,19 @@ bool AsynchronousBoundedReadBuffer::nextImpl() chassert(file_offset_of_buffer_end <= impl->getFileSize()); if (read_until_position && (file_offset_of_buffer_end > *read_until_position)) - throwReadBeyondLastOffset(); + { + size_t excessive_bytes_read = file_offset_of_buffer_end - *read_until_position; + + if (excessive_bytes_read > working_buffer.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "File offset moved too far: old_file_offset = {}, new_file_offset = {}, read_until_position = {}, bytes_read = {}", + old_file_offset_of_buffer_end, file_offset_of_buffer_end, *read_until_position, bytes_read); + + working_buffer.resize(working_buffer.size() - excessive_bytes_read); + file_offset_of_buffer_end = *read_until_position; + } - return bytes_read; + return !working_buffer.empty(); } diff --git a/src/Disks/IO/AsynchronousBoundedReadBuffer.h b/src/Disks/IO/AsynchronousBoundedReadBuffer.h index b945aed28f01..6dc76352aca4 100644 --- a/src/Disks/IO/AsynchronousBoundedReadBuffer.h +++ b/src/Disks/IO/AsynchronousBoundedReadBuffer.h @@ -94,11 +94,7 @@ class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase IAsynchronousReader::Result readSync(char * data, size_t size); - size_t getBufferSizeForReading() const; - void resetPrefetch(FilesystemPrefetchState state); - - [[noreturn]] void throwReadBeyondLastOffset() const; }; } diff --git a/src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp b/src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp index 5fee295f53ac..63a39fe39c71 100644 --- a/src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp +++ b/src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp @@ -35,7 +35,7 @@ class AsynchronousBoundedReadBufferTest : public ::testing::TestWithParam