Skip to content

Commit

Permalink
Fix AsynchronousReadIndirectBufferFromRemoteFS breaking on short seeks
Browse files Browse the repository at this point in the history
  • Loading branch information
al13n321 committed May 5, 2023
1 parent bbd6be8 commit d40b0ea
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
25 changes: 20 additions & 5 deletions src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp
Expand Up @@ -137,6 +137,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)

last_prefetch_info.submit_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
last_prefetch_info.priority = priority;
last_prefetch_info.bytes_ignored = bytes_to_ignore;

/// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.prefetch_buffer_size || prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
Expand All @@ -163,8 +164,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos

void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
{
read_until_position = impl->getFileSize();
impl->setReadUntilPosition(*read_until_position);
setReadUntilPosition(impl->getFileSize());
}


Expand Down Expand Up @@ -267,21 +267,23 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else if (whence == SEEK_CUR)
{
new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset;
new_pos = static_cast<size_t>(getPosition()) + offset;
}
else
{
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence");
}

/// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
if (new_pos == static_cast<size_t>(getPosition()))
return new_pos;

bool read_from_prefetch = false;
while (true)
{
if (file_offset_of_buffer_end - working_buffer.size() <= new_pos && new_pos <= file_offset_of_buffer_end)
/// The first condition implies bytes_to_ignore = bytes_ignored = 0.
if (!working_buffer.empty() && file_offset_of_buffer_end - working_buffer.size() <= new_pos &&
new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
Expand Down Expand Up @@ -318,6 +320,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)

/// First reset the buffer so the next read will fetch new data to the buffer.
resetWorkingBuffer();
bytes_to_ignore = 0;

if (read_until_position && new_pos > *read_until_position)
{
Expand Down Expand Up @@ -354,6 +357,15 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}


off_t AsynchronousReadIndirectBufferFromRemoteFS::getPosition()
{
off_t res = static_cast<off_t>(file_offset_of_buffer_end - available() + bytes_to_ignore);
if (prefetch_future.valid())
res += static_cast<off_t>(last_prefetch_info.bytes_ignored);
return res;
}


void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
{
resetPrefetch(FilesystemPrefetchState::UNNEEDED);
Expand All @@ -377,6 +389,9 @@ void AsynchronousReadIndirectBufferFromRemoteFS::resetPrefetch(FilesystemPrefetc
if (!prefetch_future.valid())
return;

if (last_prefetch_info.bytes_ignored != 0)
bytes_to_ignore = last_prefetch_info.bytes_ignored;

auto [size, offset, _] = prefetch_future.get();
prefetch_future = {};
last_prefetch_info = {};
Expand Down
5 changes: 4 additions & 1 deletion src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h
Expand Up @@ -40,7 +40,7 @@ class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase

off_t seek(off_t offset_, int whence) override;

off_t getPosition() override { return file_offset_of_buffer_end - available(); }
off_t getPosition() override;

String getFileName() const override;

Expand Down Expand Up @@ -89,6 +89,7 @@ class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase

std::string current_reader_id;

/// If nonzero then working_buffer is empty and prefetch_future is invalid.
size_t bytes_to_ignore = 0;

std::optional<size_t> read_until_position;
Expand All @@ -99,6 +100,8 @@ class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
UInt64 submit_time = 0;
size_t priority = 0;
/// If nonzero then working_buffer is empty and bytes_to_ignore = 0.
size_t bytes_ignored = 0;
};
LastPrefetchInfo last_prefetch_info;
};
Expand Down
1 change: 1 addition & 0 deletions tests/queries/0_stateless/02731_parquet_s3.reference
@@ -0,0 +1 @@
12639441726720293784
7 changes: 7 additions & 0 deletions tests/queries/0_stateless/02731_parquet_s3.sql
@@ -0,0 +1,7 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS

-- Reading from s3 a parquet file of size between ~1 MB and ~2 MB was broken at some point.
insert into function s3(s3_conn, filename='test_02731_parquet_s3.parquet') select cityHash64(number) from numbers(170000) settings s3_truncate_on_insert=1;

select sum(*) from s3(s3_conn, filename='test_02731_parquet_s3.parquet') settings remote_filesystem_read_method='threadpool', remote_filesystem_read_prefetch=1;

0 comments on commit d40b0ea

Please sign in to comment.