Skip to content

Commit

Permalink
Backport #59630 to 23.8: Fix error "Read beyond last offset" for Asyn…
Browse files Browse the repository at this point in the history
…chronousBoundedReadBuffer
  • Loading branch information
robot-clickhouse committed Feb 19, 2024
1 parent 7a0621a commit 115b4e6
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 10 deletions.
31 changes: 22 additions & 9 deletions src/Disks/IO/AsynchronousBoundedReadBuffer.cpp
Expand Up @@ -70,12 +70,10 @@ bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
return false;

if (file_offset_of_buffer_end > *read_until_position)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
}
"Read beyond last offset ({} > {}): file size = {}, info: {}",
file_offset_of_buffer_end, *read_until_position, impl->getFileSize(), impl->getInfoForLog());
}

return true;
Expand Down Expand Up @@ -117,14 +115,15 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
if (position < file_offset_of_buffer_end)
{
/// file has been read beyond new read until position already
if (working_buffer.size() >= file_offset_of_buffer_end - position)
if (available() >= file_offset_of_buffer_end - position)
{
/// new read until position is inside working buffer
/// new read until position is after the current position in the working buffer
file_offset_of_buffer_end = position;
working_buffer.resize(working_buffer.size() - (file_offset_of_buffer_end - position));
}
else
{
/// new read until position is before working buffer begin
/// new read until position is before the current position in the working buffer
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to set read until position before already read data ({} > {}, info: {})",
Expand Down Expand Up @@ -177,6 +176,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
return false;

chassert(file_offset_of_buffer_end <= impl->getFileSize());
size_t old_file_offset_of_buffer_end = file_offset_of_buffer_end;

size_t size, offset;
if (prefetch_future.valid())
Expand Down Expand Up @@ -212,9 +212,9 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
}

bytes_to_ignore = 0;
resetWorkingBuffer();

chassert(size >= offset);

size_t bytes_read = size - offset;
if (bytes_read)
{
Expand All @@ -232,7 +232,20 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
chassert(file_offset_of_buffer_end >= impl->getFileOffsetOfBufferEnd());
chassert(file_offset_of_buffer_end <= impl->getFileSize());

return bytes_read;
if (read_until_position && (file_offset_of_buffer_end > *read_until_position))
{
size_t excessive_bytes_read = file_offset_of_buffer_end - *read_until_position;

if (excessive_bytes_read > working_buffer.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"File offset moved too far: old_file_offset = {}, new_file_offset = {}, read_until_position = {}, bytes_read = {}",
old_file_offset_of_buffer_end, file_offset_of_buffer_end, *read_until_position, bytes_read);

working_buffer.resize(working_buffer.size() - excessive_bytes_read);
file_offset_of_buffer_end = *read_until_position;
}

return !working_buffer.empty();
}


Expand Down
1 change: 0 additions & 1 deletion src/Disks/IO/AsynchronousBoundedReadBuffer.h
Expand Up @@ -93,7 +93,6 @@ class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size, Priority priority);

void resetPrefetch(FilesystemPrefetchState state);

};

}
82 changes: 82 additions & 0 deletions src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp
@@ -0,0 +1,82 @@
#include <gtest/gtest.h>

#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <Poco/TemporaryFile.h>
#include <filesystem>


using namespace DB;
namespace fs = std::filesystem;

class AsynchronousBoundedReadBufferTest : public ::testing::TestWithParam<const char *>
{
public:
AsynchronousBoundedReadBufferTest() { fs::create_directories(temp_folder.path()); }

String makeTempFile(const String & contents)
{
String path = fmt::format("{}/{}", temp_folder.path(), counter);
++counter;

WriteBufferFromFile out{path};
out.write(contents.data(), contents.size());
out.finalize();

return path;
}

private:
Poco::TemporaryFile temp_folder;
size_t counter = 0;
};

String getAlphabetWithDigits()
{
String contents;
for (char c = 'a'; c <= 'z'; ++c)
contents += c;
for (char c = '0'; c <= '9'; ++c)
contents += c;
return contents;
}


TEST_F(AsynchronousBoundedReadBufferTest, setReadUntilPosition)
{
String file_path = makeTempFile(getAlphabetWithDigits());
ThreadPoolRemoteFSReader remote_fs_reader(4, 0);

for (bool with_prefetch : {false, true})
{
AsynchronousBoundedReadBuffer read_buffer(createReadBufferFromFileBase(file_path, {}), remote_fs_reader, {});
read_buffer.setReadUntilPosition(20);

auto try_read = [&](size_t count)
{
if (with_prefetch)
read_buffer.prefetch(Priority{0});

String str;
str.resize(count);
str.resize(read_buffer.read(str.data(), str.size()));
return str;
};

EXPECT_EQ(try_read(15), "abcdefghijklmno");
EXPECT_EQ(try_read(15), "pqrst");
EXPECT_EQ(try_read(15), "");

read_buffer.setReadUntilPosition(25);

EXPECT_EQ(try_read(15), "uvwxy");
EXPECT_EQ(try_read(15), "");

read_buffer.setReadUntilEnd();

EXPECT_EQ(try_read(15), "z0123456789");
EXPECT_EQ(try_read(15), "");
}
}

0 comments on commit 115b4e6

Please sign in to comment.