Skip to content

Commit

Permalink
Fix AsynchronousReadIndirectBufferFromRemoteFS breaking on short seeks
Browse files Browse the repository at this point in the history
  • Loading branch information
al13n321 committed May 7, 2023
1 parent 920b42f commit 1066e83
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
28 changes: 16 additions & 12 deletions src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp
Expand Up @@ -116,12 +116,7 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = base_priority + priority;

if (bytes_to_ignore)
{
request.ignore = bytes_to_ignore;
bytes_to_ignore = 0;
}
request.ignore = bytes_to_ignore;
return reader.submit(request);
}

Expand Down Expand Up @@ -163,8 +158,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos

void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
{
read_until_position = impl->getFileSize();
impl->setReadUntilPosition(*read_until_position);
setReadUntilPosition(impl->getFileSize());
}


Expand Down Expand Up @@ -226,12 +220,13 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()

chassert(memory.size() == read_settings.prefetch_buffer_size || memory.size() == read_settings.remote_fs_buffer_size);
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
bytes_to_ignore = 0;

ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, size);
}

bytes_to_ignore = 0;

chassert(size >= offset);

size_t bytes_read = size - offset;
Expand Down Expand Up @@ -267,21 +262,23 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else if (whence == SEEK_CUR)
{
new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset;
new_pos = static_cast<size_t>(getPosition()) + offset;
}
else
{
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence");
}

/// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
if (new_pos == static_cast<size_t>(getPosition()))
return new_pos;

bool read_from_prefetch = false;
while (true)
{
if (file_offset_of_buffer_end - working_buffer.size() <= new_pos && new_pos <= file_offset_of_buffer_end)
/// The first condition implies bytes_to_ignore = 0.
if (!working_buffer.empty() && file_offset_of_buffer_end - working_buffer.size() <= new_pos &&
new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
Expand Down Expand Up @@ -318,6 +315,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)

/// First reset the buffer so the next read will fetch new data to the buffer.
resetWorkingBuffer();
bytes_to_ignore = 0;

if (read_until_position && new_pos > *read_until_position)
{
Expand Down Expand Up @@ -354,6 +352,12 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}


off_t AsynchronousReadIndirectBufferFromRemoteFS::getPosition()
{
return file_offset_of_buffer_end - available() + bytes_to_ignore;
}


void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
{
resetPrefetch(FilesystemPrefetchState::UNNEEDED);
Expand Down
4 changes: 3 additions & 1 deletion src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h
Expand Up @@ -40,7 +40,7 @@ class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase

off_t seek(off_t offset_, int whence) override;

off_t getPosition() override { return file_offset_of_buffer_end - available(); }
off_t getPosition() override;

String getFileName() const override;

Expand Down Expand Up @@ -89,6 +89,8 @@ class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase

std::string current_reader_id;

/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;

std::optional<size_t> read_until_position;
Expand Down
1 change: 1 addition & 0 deletions tests/queries/0_stateless/02731_parquet_s3.reference
@@ -0,0 +1 @@
12639441726720293784
7 changes: 7 additions & 0 deletions tests/queries/0_stateless/02731_parquet_s3.sql
@@ -0,0 +1,7 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS

-- Reading from s3 a parquet file of size between ~1 MB and ~2 MB was broken at some point.
insert into function s3(s3_conn, filename='test_02731_parquet_s3.parquet') select cityHash64(number) from numbers(170000) settings s3_truncate_on_insert=1;

select sum(*) from s3(s3_conn, filename='test_02731_parquet_s3.parquet') settings remote_filesystem_read_method='threadpool', remote_filesystem_read_prefetch=1;

0 comments on commit 1066e83

Please sign in to comment.