Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Less seeks in compressed buffers #29766

Merged
merged 6 commits into from Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 20 additions & 13 deletions src/Compression/CachedCompressedReadBuffer.cpp
Expand Up @@ -30,7 +30,6 @@ void CachedCompressedReadBuffer::initInput()

bool CachedCompressedReadBuffer::nextImpl()
{

/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
UInt128 key = cache->hash(path, file_pos);

Expand Down Expand Up @@ -60,6 +59,13 @@ bool CachedCompressedReadBuffer::nextImpl()

working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes);

/// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
/// check that we are not seeking beyond working buffer.
if (nextimpl_working_buffer_offset > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

file_pos += owned_cell->compressed_size;

return true;
Expand All @@ -74,28 +80,29 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer(

void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
/// Nothing to do if we already at required position
if (file_pos == offset_in_compressed_file
&& (offset() == offset_in_decompressed_block ||
nextimpl_working_buffer_offset == offset_in_decompressed_block))
return;

if (owned_cell &&
offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
}
else
{
/// Remember position in compressed file (will be moved in nextImpl)
file_pos = offset_in_compressed_file;

/// We will discard our working_buffer, but have to account rest bytes
bytes += offset();
nextImpl();

if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
/// No data, everything discarded
pos = working_buffer.end();
/// Remember required offset in decompressed block which will be set in
/// the next ReadBuffer::next() call
nextimpl_working_buffer_offset = offset_in_decompressed_block;
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/Compression/CachedCompressedReadBuffer.h
Expand Up @@ -15,7 +15,7 @@ namespace DB
* The external cache is passed as an argument to the constructor.
* Allows you to increase performance in cases where the same blocks are often read.
* Disadvantages:
* - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and.
* - in case you need to read a lot of data in a row, but some of them only a part is cached, you have to do seek-and.
*/
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
{
Expand All @@ -25,6 +25,8 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB
std::unique_ptr<ReadBufferFromFileBase> file_in;

const std::string path;

/// Current position in file_in
size_t file_pos;

/// A piece of data from the cache, or a piece of read data that we put into the cache.
Expand All @@ -37,9 +39,15 @@ class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadB
ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type {};


/// Check comment in CompressedReadBuffer
/* size_t nextimpl_working_buffer_offset; */

public:
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);

/// Seek is lazy. It doesn't move the position anywhere, just remember them and perform actual
/// seek inside nextImpl.
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
Expand Down
50 changes: 32 additions & 18 deletions src/Compression/CompressedReadBufferFromFile.cpp
Expand Up @@ -33,6 +33,13 @@ bool CompressedReadBufferFromFile::nextImpl()

decompress(working_buffer, size_decompressed, size_compressed_without_checksum);

/// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
/// check that we are not seeking beyond working buffer.
if (nextimpl_working_buffer_offset > working_buffer.size())
throw Exception("Required to move position beyond the decompressed block"
" (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

return true;
}

Expand Down Expand Up @@ -67,33 +74,34 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(

void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
/// Nothing to do if we already at required position
if (!size_compressed && static_cast<size_t>(file_in.getPosition()) == offset_in_compressed_file && /// correct position in compressed file
(offset() == offset_in_decompressed_block /// correct position in buffer or
|| nextimpl_working_buffer_offset == offset_in_decompressed_block)) /// we will move our position to correct one
return;

/// Our seek is within working_buffer, so just move the position
if (size_compressed &&
offset_in_compressed_file == file_in.getPosition() - size_compressed &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
/// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right.
bytes -= offset();
}
else
else /// Our seek outside working buffer, so perform "lazy seek"
{
/// Actually seek compressed file
file_in.seek(offset_in_compressed_file, SEEK_SET);

/// We will discard our working_buffer, but have to account rest bytes
bytes += offset();
nextImpl();

if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
/// No data, everything discarded
pos = working_buffer.end();
size_compressed = 0;
/// Remember required offset in decompressed block which will be set in
/// the next ReadBuffer::next() call
nextimpl_working_buffer_offset = offset_in_decompressed_block;
}
}


size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
{
size_t bytes_read = 0;
Expand All @@ -115,9 +123,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)

auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();

/// If the decompressed block fits entirely where it needs to be copied.
if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
/// If the decompressed block fits entirely where it needs to be copied and we don't
/// need to skip some bytes in decompressed data (seek happened before readBig call).
if (nextimpl_working_buffer_offset == 0 && size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{

decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
Expand All @@ -134,7 +144,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);

decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
pos = working_buffer.begin();

/// Manually take nextimpl_working_buffer_offset into account, because we don't use
/// nextImpl in this method.
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
nextimpl_working_buffer_offset = 0;

bytes_read += read(to + bytes_read, n - bytes_read);
break;
Expand Down
16 changes: 16 additions & 0 deletions src/Compression/CompressedReadBufferFromFile.h
Expand Up @@ -28,6 +28,19 @@ class CompressedReadBufferFromFile : public CompressedReadBufferBase, public Buf
ReadBufferFromFileBase & file_in;
size_t size_compressed = 0;

/// This field inherited from ReadBuffer. It's used to perform "lazy" seek, so in seek() call we:
/// 1) actually seek only underlying compressed file_in to offset_in_compressed_file;
/// 2) reset current working_buffer;
/// 3) remember the position in decompressed block in nextimpl_working_buffer_offset.
/// After following ReadBuffer::next() -> nextImpl call we will read new data into working_buffer and
/// ReadBuffer::next() will move our position in the fresh working_buffer to nextimpl_working_buffer_offset and
/// reset it to zero.
///
/// NOTE: We have independent readBig implementation, so we have to take
/// nextimpl_working_buffer_offset into account there as well.
///
/* size_t nextimpl_working_buffer_offset; */

bool nextImpl() override;
void prefetch() override;

Expand All @@ -37,6 +50,9 @@ class CompressedReadBufferFromFile : public CompressedReadBufferBase, public Buf
CompressedReadBufferFromFile(
const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false);

/// Seek is lazy in some sense. We move position in compressed file_in to offset_in_compressed_file, but don't
/// read data into working_buffer and don't shit our position to offset_in_decompressed_block. Instead
/// we store this offset inside nextimpl_working_buffer_offset.
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

size_t readBig(char * to, size_t n) override;
Expand Down
41 changes: 29 additions & 12 deletions src/Storages/MergeTree/MergeTreeReaderWide.cpp
Expand Up @@ -75,6 +75,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
OffsetColumns offset_columns;
std::unordered_map<String, ISerialization::SubstreamsCache> caches;

std::unordered_set<std::string> prefetched_streams;
if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
{
/// Request reading of data in advance,
Expand All @@ -86,7 +87,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
try
{
auto & cache = caches[column_from_part.getNameInStorage()];
prefetch(column_from_part, from_mark, continue_reading, cache);
prefetch(column_from_part, from_mark, continue_reading, cache, prefetched_streams);
}
catch (Exception & e)
{
Expand All @@ -98,6 +99,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
}

auto name_and_type = columns.begin();

for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
auto column_from_part = getColumnFromPart(*name_and_type);
Expand All @@ -114,7 +116,9 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
size_t column_size_before_reading = column->size();
auto & cache = caches[column_from_part.getNameInStorage()];

readData(column_from_part, column, from_mark, continue_reading, max_rows_to_read, cache);
readData(
column_from_part, column, from_mark, continue_reading,
max_rows_to_read, cache, /* was_prefetched =*/ !prefetched_streams.empty());
alesapin marked this conversation as resolved.
Show resolved Hide resolved

/// For elements of Nested, column_size_before_reading may be greater than column size
/// if offsets are not empty and were already read, but elements are empty.
Expand Down Expand Up @@ -190,11 +194,11 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,


static ReadBuffer * getStream(
bool stream_for_prefix,
bool seek_to_start,
const ISerialization::SubstreamPath & substream_path,
MergeTreeReaderWide::FileStreams & streams,
const NameAndTypePair & name_and_type,
size_t from_mark, bool continue_reading,
size_t from_mark, bool seek_to_mark,
ISerialization::SubstreamsCache & cache)
{
/// If substream have already been read.
Expand All @@ -209,9 +213,9 @@ static ReadBuffer * getStream(

MergeTreeReaderStream & stream = *it->second;

if (stream_for_prefix)
if (seek_to_start)
stream.seekToStart();
else if (!continue_reading)
else if (seek_to_mark)
stream.seekToMark(from_mark);

return stream.data_buffer;
Expand All @@ -222,23 +226,32 @@ void MergeTreeReaderWide::prefetch(
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
ISerialization::SubstreamsCache & cache)
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams)
{
const auto & name = name_and_type.name;
auto & serialization = serializations[name];

serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache))
buf->prefetch();
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);

if (!prefetched_streams.count(stream_name))
{
bool seek_to_mark = !continue_reading;
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache))
buf->prefetch();

prefetched_streams.insert(stream_name);
}
});
}


void MergeTreeReaderWide::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
ISerialization::SubstreamsCache & cache)
ISerialization::SubstreamsCache & cache, bool was_prefetched)
{
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
Expand All @@ -251,14 +264,18 @@ void MergeTreeReaderWide::readData(
{
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(true, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, from_mark, /* seek_to_mark = */false, cache);
};
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
}

deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
bool seek_to_mark = !was_prefetched && !continue_reading;

return getStream(
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
seek_to_mark, cache);
};
deserialize_settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
Expand Down
7 changes: 4 additions & 3 deletions src/Storages/MergeTree/MergeTreeReaderWide.h
Expand Up @@ -46,14 +46,15 @@ class MergeTreeReaderWide : public IMergeTreeReader
void readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
ISerialization::SubstreamsCache & cache);
ISerialization::SubstreamsCache & cache, bool was_prefetched);

/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers.
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams).
void prefetch(
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
ISerialization::SubstreamsCache & cache);
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams); /// if stream was already prefetched do nothing
};

}