Skip to content

Commit

Permalink
Revert unnecessary change of the size of the internal buffer in Async…
Browse files Browse the repository at this point in the history
…hronousBoundedReadBuffer.
  • Loading branch information
vitlibar committed Feb 9, 2024
1 parent 1ebede7 commit 124f1ee
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 37 deletions.
61 changes: 29 additions & 32 deletions src/Disks/IO/AsynchronousBoundedReadBuffer.cpp
Expand Up @@ -69,7 +69,12 @@ bool AsynchronousBoundedReadBuffer::hasPendingDataToRead()
return false;

if (file_offset_of_buffer_end > *read_until_position)
throwReadBeyondLastOffset();
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
}
}

return true;
Expand Down Expand Up @@ -98,19 +103,6 @@ IAsynchronousReader::Result AsynchronousBoundedReadBuffer::readSync(char * data,
return reader.execute(request);
}

size_t AsynchronousBoundedReadBuffer::getBufferSizeForReading() const
{
size_t read_size = chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize());
if (read_until_position)
{
if (file_offset_of_buffer_end > *read_until_position)
throwReadBeyondLastOffset();

read_size = std::min(read_size, *read_until_position - file_offset_of_buffer_end);
}
return read_size;
}

void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
{
if (prefetch_future.valid())
Expand All @@ -122,7 +114,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
last_prefetch_info.submit_time = std::chrono::system_clock::now();
last_prefetch_info.priority = priority;

prefetch_buffer.resize(getBufferSizeForReading());
prefetch_buffer.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
prefetch_future = readAsync(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
Expand Down Expand Up @@ -164,14 +156,6 @@ void AsynchronousBoundedReadBuffer::setReadUntilPosition(size_t position)
}
}

void AsynchronousBoundedReadBuffer::throwReadBeyondLastOffset() const
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Read beyond last offset ({} > {}, info: {})",
file_offset_of_buffer_end, *read_until_position, impl->getInfoForLog());
}

void AsynchronousBoundedReadBuffer::appendToPrefetchLog(
FilesystemPrefetchState state,
int64_t size,
Expand Down Expand Up @@ -203,6 +187,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
return false;

chassert(file_offset_of_buffer_end <= impl->getFileSize());
chassert(!read_until_position || (file_offset_of_buffer_end <= *read_until_position));

IAsynchronousReader::Result result;
if (prefetch_future.valid())
Expand All @@ -227,7 +212,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
}
else
{
memory.resize(getBufferSizeForReading());
memory.resize(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));

{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
Expand All @@ -247,18 +232,30 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
pos = working_buffer.begin();
}

file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
bytes_to_ignore = 0;
chassert(file_offset_of_buffer_end + available() == impl->getFileOffsetOfBufferEnd());

/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
chassert(file_offset_of_buffer_end <= impl->getFileSize());
if (read_until_position)
{
size_t remaining_size_to_read = *read_until_position - file_offset_of_buffer_end;
if (working_buffer.size() > remaining_size_to_read)
{
/// file: [______________________________]
/// working buffer: [_______________]
/// ^
/// read_until_position
/// ^
/// file_offset_of_buffer_end
working_buffer.resize(remaining_size_to_read);
}
}

if (read_until_position && (file_offset_of_buffer_end > *read_until_position))
throwReadBeyondLastOffset();
file_offset_of_buffer_end += available();

chassert(file_offset_of_buffer_end <= impl->getFileSize());
chassert(!read_until_position || (file_offset_of_buffer_end <= *read_until_position));

return bytes_read;
return available();
}


Expand Down
4 changes: 0 additions & 4 deletions src/Disks/IO/AsynchronousBoundedReadBuffer.h
Expand Up @@ -94,11 +94,7 @@ class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase

IAsynchronousReader::Result readSync(char * data, size_t size);

size_t getBufferSizeForReading() const;

void resetPrefetch(FilesystemPrefetchState state);

[[noreturn]] void throwReadBeyondLastOffset() const;
};

}
2 changes: 1 addition & 1 deletion src/Disks/tests/gtest_asynchronous_bounded_read_buffer.cpp
Expand Up @@ -35,7 +35,7 @@ class AsynchronousBoundedReadBufferTest : public ::testing::TestWithParam<const

String getAlphabetWithDigits()
{
String contents = "";
String contents;
for (char c = 'a'; c <= 'z'; ++c)
contents += c;
for (char c = '0'; c <= '9'; ++c)
Expand Down

0 comments on commit 124f1ee

Please sign in to comment.