Skip to content

Commit

Permalink
Merge pull request #34390 from ClickHouse/backport/21.12/34327
Browse files Browse the repository at this point in the history
Backport #34327 to 21.12: Try to fix rare bug in reading of empty arrays
  • Loading branch information
CurtizJ committed Feb 22, 2022
2 parents 412a3c8 + 8da20fb commit f60f99d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 3 deletions.
1 change: 0 additions & 1 deletion src/Compression/CompressedReadBufferFromFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
/// need to skip some bytes in decompressed data (seek happened before readBig call).
if (nextimpl_working_buffer_offset == 0 && size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{

decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
Expand Down
10 changes: 8 additions & 2 deletions src/IO/ReadBufferFromFileDescriptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ bool ReadBufferFromFileDescriptor::nextImpl()
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
working_buffer = internal_buffer;
working_buffer.resize(bytes_read);
buffer_is_dirty = false;
}
else
return false;
Expand Down Expand Up @@ -148,10 +149,10 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
}

/// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
if (!buffer_is_dirty && (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end))
return new_pos;

if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
if (!buffer_is_dirty && file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
Expand All @@ -175,6 +176,10 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
/// First put position at the end of the buffer so the next read will fetch new data to the buffer.
pos = working_buffer.end();

/// Mark buffer as dirty to disallow further seek optimizations, because fetching data to the buffer
/// is delayed to the next call of 'nextImpl', but it may be not called before next seek.
buffer_is_dirty = true;

/// In case of using 'pread' we just update the info about the next position in file.
/// In case of using 'read' we call 'lseek'.

Expand Down Expand Up @@ -225,6 +230,7 @@ void ReadBufferFromFileDescriptor::rewind()
working_buffer.resize(0);
pos = working_buffer.begin();
file_offset_of_buffer_end = 0;
buffer_is_dirty = true;
}


Expand Down
3 changes: 3 additions & 0 deletions src/IO/ReadBufferFromFileDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class ReadBufferFromFileDescriptor : public ReadBufferFromFileBase
private:
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool poll(size_t timeout_microseconds);

/// If it's true then we cannot assume on content of buffer to optimize seek calls.
bool buffer_is_dirty = true;
};


Expand Down
36 changes: 36 additions & 0 deletions src/IO/tests/gtest_seek_backwards.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include <gtest/gtest.h>
#include <Common/filesystemHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>

using namespace DB;

TEST(ReadBufferFromFile, seekBackwards)
{
static constexpr size_t N = 256;
static constexpr size_t BUF_SIZE = 64;

auto tmp_file = createTemporaryFile("/tmp/");

{
WriteBufferFromFile out(tmp_file->path());
for (size_t i = 0; i < N; ++i)
writeIntBinary(i, out);
}

ReadBufferFromFile in(tmp_file->path(), BUF_SIZE);
size_t x;

/// Read something to initialize the buffer.
in.seek(BUF_SIZE * 10, SEEK_SET);
readIntBinary(x, in);

/// Check 2 consecutive seek calls without reading.
in.seek(BUF_SIZE * 2, SEEK_SET);
in.seek(BUF_SIZE, SEEK_SET);

readIntBinary(x, in);
ASSERT_EQ(x, 8);
}

0 comments on commit f60f99d

Please sign in to comment.