Skip to content

Commit

Permalink
Merge pull request #29766 from ClickHouse/remove_redundant_seeks
Browse files Browse the repository at this point in the history
Less seeks in compressed buffers
  • Loading branch information
kssenii committed Oct 7, 2021
2 parents 25e2eba + 621c988 commit 185d58a
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 47 deletions.
33 changes: 20 additions & 13 deletions src/Compression/CachedCompressedReadBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ void CachedCompressedReadBuffer::initInput()

bool CachedCompressedReadBuffer::nextImpl()
{

/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
UInt128 key = cache->hash(path, file_pos);

Expand Down Expand Up @@ -60,6 +59,13 @@ bool CachedCompressedReadBuffer::nextImpl()

working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes);

/// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
/// check that we are not seeking beyond working buffer.
if (nextimpl_working_buffer_offset > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

file_pos += owned_cell->compressed_size;

return true;
Expand All @@ -74,28 +80,29 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer(

void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
/// Nothing to do if we already at required position
if (file_pos == offset_in_compressed_file
&& (offset() == offset_in_decompressed_block ||
nextimpl_working_buffer_offset == offset_in_decompressed_block))
return;

if (owned_cell &&
offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
}
else
{
/// Remember position in compressed file (will be moved in nextImpl)
file_pos = offset_in_compressed_file;

/// We will discard our working_buffer, but have to account rest bytes
bytes += offset();
nextImpl();

if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
/// No data, everything discarded
pos = working_buffer.end();
/// Remember required offset in decompressed block which will be set in
/// the next ReadBuffer::next() call
nextimpl_working_buffer_offset = offset_in_decompressed_block;
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/Compression/CachedCompressedReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace DB
* The external cache is passed as an argument to the constructor.
* Allows you to increase performance in cases where the same blocks are often read.
* Disadvantages:
* - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and.
* - in case you need to read a lot of data in a row, but some of them only a part is cached, you have to do seek-and.
*/
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
{
Expand All @@ -25,6 +25,8 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB
std::unique_ptr<ReadBufferFromFileBase> file_in;

const std::string path;

/// Current position in file_in
size_t file_pos;

/// A piece of data from the cache, or a piece of read data that we put into the cache.
Expand All @@ -37,9 +39,15 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB
ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type {};


/// Check comment in CompressedReadBuffer
/* size_t nextimpl_working_buffer_offset; */

public:
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);

/// Seek is lazy. It doesn't move the position anywhere, just remember them and perform actual
/// seek inside nextImpl.
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
Expand Down
50 changes: 32 additions & 18 deletions src/Compression/CompressedReadBufferFromFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ bool CompressedReadBufferFromFile::nextImpl()

decompress(working_buffer, size_decompressed, size_compressed_without_checksum);

/// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
/// check that we are not seeking beyond working buffer.
if (nextimpl_working_buffer_offset > working_buffer.size())
throw Exception("Required to move position beyond the decompressed block"
" (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

return true;
}

Expand Down Expand Up @@ -67,33 +74,34 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(

void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
/// Nothing to do if we already at required position
if (!size_compressed && static_cast<size_t>(file_in.getPosition()) == offset_in_compressed_file && /// correct position in compressed file
(offset() == offset_in_decompressed_block /// correct position in buffer or
|| nextimpl_working_buffer_offset == offset_in_decompressed_block)) /// we will move our position to correct one
return;

/// Our seek is within working_buffer, so just move the position
if (size_compressed &&
offset_in_compressed_file == file_in.getPosition() - size_compressed &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
/// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right.
bytes -= offset();
}
else
else /// Our seek outside working buffer, so perform "lazy seek"
{
/// Actually seek compressed file
file_in.seek(offset_in_compressed_file, SEEK_SET);

/// We will discard our working_buffer, but have to account rest bytes
bytes += offset();
nextImpl();

if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
/// No data, everything discarded
pos = working_buffer.end();
size_compressed = 0;
/// Remember required offset in decompressed block which will be set in
/// the next ReadBuffer::next() call
nextimpl_working_buffer_offset = offset_in_decompressed_block;
}
}


size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
{
size_t bytes_read = 0;
Expand All @@ -115,9 +123,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)

auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();

/// If the decompressed block fits entirely where it needs to be copied.
if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
/// If the decompressed block fits entirely where it needs to be copied and we don't
/// need to skip some bytes in decompressed data (seek happened before readBig call).
if (nextimpl_working_buffer_offset == 0 && size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{

decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
Expand All @@ -134,7 +144,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);

decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
pos = working_buffer.begin();

/// Manually take nextimpl_working_buffer_offset into account, because we don't use
/// nextImpl in this method.
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
nextimpl_working_buffer_offset = 0;

bytes_read += read(to + bytes_read, n - bytes_read);
break;
Expand Down
16 changes: 16 additions & 0 deletions src/Compression/CompressedReadBufferFromFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ class CompressedReadBufferFromFile : public CompressedReadBufferBase, public Buf
ReadBufferFromFileBase & file_in;
size_t size_compressed = 0;

/// This field inherited from ReadBuffer. It's used to perform "lazy" seek, so in seek() call we:
/// 1) actually seek only underlying compressed file_in to offset_in_compressed_file;
/// 2) reset current working_buffer;
/// 3) remember the position in decompressed block in nextimpl_working_buffer_offset.
/// After following ReadBuffer::next() -> nextImpl call we will read new data into working_buffer and
/// ReadBuffer::next() will move our position in the fresh working_buffer to nextimpl_working_buffer_offset and
/// reset it to zero.
///
/// NOTE: We have independent readBig implementation, so we have to take
/// nextimpl_working_buffer_offset into account there as well.
///
/* size_t nextimpl_working_buffer_offset; */

bool nextImpl() override;
void prefetch() override;

Expand All @@ -37,6 +50,9 @@ class CompressedReadBufferFromFile : public CompressedReadBufferBase, public Buf
CompressedReadBufferFromFile(
const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false);

/// Seek is lazy in some sense. We move position in compressed file_in to offset_in_compressed_file, but don't
/// read data into working_buffer and don't shit our position to offset_in_decompressed_block. Instead
/// we store this offset inside nextimpl_working_buffer_offset.
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

size_t readBig(char * to, size_t n) override;
Expand Down
41 changes: 29 additions & 12 deletions src/Storages/MergeTree/MergeTreeReaderWide.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
OffsetColumns offset_columns;
std::unordered_map<String, ISerialization::SubstreamsCache> caches;

std::unordered_set<std::string> prefetched_streams;
if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
{
/// Request reading of data in advance,
Expand All @@ -86,7 +87,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
try
{
auto & cache = caches[column_from_part.getNameInStorage()];
prefetch(column_from_part, from_mark, continue_reading, cache);
prefetch(column_from_part, from_mark, continue_reading, cache, prefetched_streams);
}
catch (Exception & e)
{
Expand All @@ -98,6 +99,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
}

auto name_and_type = columns.begin();

for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
auto column_from_part = getColumnFromPart(*name_and_type);
Expand All @@ -114,7 +116,9 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
size_t column_size_before_reading = column->size();
auto & cache = caches[column_from_part.getNameInStorage()];

readData(column_from_part, column, from_mark, continue_reading, max_rows_to_read, cache);
readData(
column_from_part, column, from_mark, continue_reading,
max_rows_to_read, cache, /* was_prefetched =*/ !prefetched_streams.empty());

/// For elements of Nested, column_size_before_reading may be greater than column size
/// if offsets are not empty and were already read, but elements are empty.
Expand Down Expand Up @@ -190,11 +194,11 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,


static ReadBuffer * getStream(
bool stream_for_prefix,
bool seek_to_start,
const ISerialization::SubstreamPath & substream_path,
MergeTreeReaderWide::FileStreams & streams,
const NameAndTypePair & name_and_type,
size_t from_mark, bool continue_reading,
size_t from_mark, bool seek_to_mark,
ISerialization::SubstreamsCache & cache)
{
/// If substream have already been read.
Expand All @@ -209,9 +213,9 @@ static ReadBuffer * getStream(

MergeTreeReaderStream & stream = *it->second;

if (stream_for_prefix)
if (seek_to_start)
stream.seekToStart();
else if (!continue_reading)
else if (seek_to_mark)
stream.seekToMark(from_mark);

return stream.data_buffer;
Expand All @@ -222,23 +226,32 @@ void MergeTreeReaderWide::prefetch(
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
ISerialization::SubstreamsCache & cache)
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams)
{
const auto & name = name_and_type.name;
auto & serialization = serializations[name];

serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache))
buf->prefetch();
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);

if (!prefetched_streams.count(stream_name))
{
bool seek_to_mark = !continue_reading;
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache))
buf->prefetch();

prefetched_streams.insert(stream_name);
}
});
}


void MergeTreeReaderWide::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
ISerialization::SubstreamsCache & cache)
ISerialization::SubstreamsCache & cache, bool was_prefetched)
{
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
Expand All @@ -251,14 +264,18 @@ void MergeTreeReaderWide::readData(
{
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(true, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, from_mark, /* seek_to_mark = */false, cache);
};
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
}

deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
bool seek_to_mark = !was_prefetched && !continue_reading;

return getStream(
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
seek_to_mark, cache);
};
deserialize_settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
Expand Down
7 changes: 4 additions & 3 deletions src/Storages/MergeTree/MergeTreeReaderWide.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ class MergeTreeReaderWide : public IMergeTreeReader
void readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
ISerialization::SubstreamsCache & cache);
ISerialization::SubstreamsCache & cache, bool was_prefetched);

/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers.
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams).
void prefetch(
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
ISerialization::SubstreamsCache & cache);
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams); /// if stream was already prefetched do nothing
};

}

0 comments on commit 185d58a

Please sign in to comment.