Skip to content

Commit

Permalink
Merge pull request #60073 from ClickHouse/cherrypick/24.1/0f9cf007406…
Browse files Browse the repository at this point in the history
…a1bd3516e864290903bcfe5dc2c04

Cherry pick #59630 to 24.1: Fix error "Read beyond last offset" for AsynchronousBoundedReadBuffer
  • Loading branch information
robot-ch-test-poll2 committed Feb 16, 2024
2 parents 324f9f4 + 0f9cf00 commit 5dcb148
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 10 deletions.
33 changes: 24 additions & 9 deletions src/Disks/IO/AsynchronousBoundedReadBuffer.cpp
Expand Up @@ -69,12 +69,10 @@ bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
return false;

if (file_offset_of_buffer_end > *read_until_position)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
}
"Read beyond last offset ({} > {}): file size = {}, info: {}",
file_offset_of_buffer_end, *read_until_position, impl->getFileSize(), impl->getInfoForLog());
}

return true;
Expand Down Expand Up @@ -126,14 +124,15 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
if (position < file_offset_of_buffer_end)
{
/// file has been read beyond new read until position already
if (working_buffer.size() >= file_offset_of_buffer_end - position)
if (available() >= file_offset_of_buffer_end - position)
{
/// new read until position is inside working buffer
/// new read until position is after the current position in the working buffer
file_offset_of_buffer_end = position;
working_buffer.resize(working_buffer.size() - (file_offset_of_buffer_end - position));
}
else
{
/// new read until position is before working buffer begin
/// new read until position is before the current position in the working buffer
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to set read until position before already read data ({} > {}, info: {})",
Expand Down Expand Up @@ -186,6 +185,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
return false;

chassert(file_offset_of_buffer_end <= impl->getFileSize());
size_t old_file_offset_of_buffer_end = file_offset_of_buffer_end;

IAsynchronousReader::Result result;
if (prefetch_future.valid())
Expand Down Expand Up @@ -221,6 +221,9 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, result.size);
}

bytes_to_ignore = 0;
resetWorkingBuffer();

size_t bytes_read = result.size - result.offset;
if (bytes_read)
{
Expand All @@ -231,14 +234,26 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
}

file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
bytes_to_ignore = 0;

/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
chassert(file_offset_of_buffer_end <= impl->getFileSize());

return bytes_read;
if (read_until_position && (file_offset_of_buffer_end > *read_until_position))
{
size_t excessive_bytes_read = file_offset_of_buffer_end - *read_until_position;

if (excessive_bytes_read > working_buffer.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"File offset moved too far: old_file_offset = {}, new_file_offset = {}, read_until_position = {}, bytes_read = {}",
old_file_offset_of_buffer_end, file_offset_of_buffer_end, *read_until_position, bytes_read);

working_buffer.resize(working_buffer.size() - excessive_bytes_read);
file_offset_of_buffer_end = *read_until_position;
}

return !working_buffer.empty();
}


Expand Down
1 change: 0 additions & 1 deletion src/Disks/IO/AsynchronousBoundedReadBuffer.h
Expand Up @@ -95,7 +95,6 @@ class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase
IAsynchronousReader::Result readSync(char * data, size_t size);

void resetPrefetch(FilesystemPrefetchState state);

};

}
82 changes: 82 additions & 0 deletions src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp
@@ -0,0 +1,82 @@
#include <gtest/gtest.h>

#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <Poco/TemporaryFile.h>
#include <filesystem>


using namespace DB;
namespace fs = std::filesystem;

class AsynchronousBoundedReadBufferTest : public ::testing::TestWithParam<const char *>
{
public:
AsynchronousBoundedReadBufferTest() { fs::create_directories(temp_folder.path()); }

String makeTempFile(const String & contents)
{
String path = fmt::format("{}/{}", temp_folder.path(), counter);
++counter;

WriteBufferFromFile out{path};
out.write(contents.data(), contents.size());
out.finalize();

return path;
}

private:
Poco::TemporaryFile temp_folder;
size_t counter = 0;
};

/// Returns the 36-character test payload: the lowercase letters 'a'..'z' followed by the digits '0'..'9'.
String getAlphabetWithDigits()
{
    String payload;

    /// Appends every character of the inclusive range [first, last] in ascending order.
    auto append_range = [&payload](char first, char last)
    {
        for (char symbol = first; symbol <= last; ++symbol)
            payload.push_back(symbol);
    };

    append_range('a', 'z');
    append_range('0', '9');
    return payload;
}


/// Reads a 36-byte file through AsynchronousBoundedReadBuffer while moving the read bound
/// (20 -> 25 -> end of file) and checks that each read returns only the bytes up to the
/// current bound. The scenario is run twice: without and with a prefetch before every read.
TEST_F(AsynchronousBoundedReadBufferTest, setReadUntilPosition)
{
    String data_path = makeTempFile(getAlphabetWithDigits());
    ThreadPoolRemoteFSReader reader(4, 0);

    for (bool use_prefetch : {false, true})
    {
        AsynchronousBoundedReadBuffer buffer(createReadBufferFromFileBase(data_path, {}), reader, {});
        buffer.setReadUntilPosition(20);

        /// Requests up to `max_bytes` bytes and returns exactly what was read.
        auto read_up_to = [&](size_t max_bytes) -> String
        {
            if (use_prefetch)
                buffer.prefetch(Priority{0});

            String chunk;
            chunk.resize(max_bytes);
            const size_t bytes_read = buffer.read(chunk.data(), chunk.size());
            chunk.resize(bytes_read);
            return chunk;
        };

        /// Bound at 20: 15 bytes, then the remaining 5, then nothing.
        EXPECT_EQ(read_up_to(15), "abcdefghijklmno");
        EXPECT_EQ(read_up_to(15), "pqrst");
        EXPECT_EQ(read_up_to(15), "");

        /// Moving the bound forward makes the next 5 bytes readable.
        buffer.setReadUntilPosition(25);

        EXPECT_EQ(read_up_to(15), "uvwxy");
        EXPECT_EQ(read_up_to(15), "");

        /// Without a bound the rest of the file is returned.
        buffer.setReadUntilEnd();

        EXPECT_EQ(read_up_to(15), "z0123456789");
        EXPECT_EQ(read_up_to(15), "");
    }
}

0 comments on commit 5dcb148

Please sign in to comment.