Consecutive optimization for vertical merge algorithm #272

Merged · 3 commits · Dec 18, 2016
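In this PR, the vertical merge algorithm first merges only the sorting-key columns, recording for each output row which source part it came from (and whether it is skipped), and then gathers every remaining column according to that record. The optimization: instead of appending rows one at a time, ColumnGathererStream now detects runs of consecutive rows from the same source and copies each run with a single insertRangeFrom call; a run that covers a whole source block is produced by copying the column pointer with no data copy. A minimal sketch of the run-detection idea, using simplified stand-in types rather than the real ClickHouse interfaces:

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

int main()
{
    // Hypothetical stand-in for MergedRowSources: one entry per output row,
    // telling which source part the row comes from.
    std::vector<uint8_t> row_source = {0, 0, 0, 1, 1, 0, 2, 2, 2, 2};

    size_t pos = 0;
    while (pos < row_source.size())
    {
        // Find how long the current run of rows from the same source is;
        // the whole run can then be copied with one insertRangeFrom-style call.
        size_t len = 1;
        while (pos + len < row_source.size() && row_source[pos + len] == row_source[pos])
            ++len;

        std::printf("copy rows [%zu, %zu) from source %u in one call\n",
                    pos, pos + len, static_cast<unsigned>(row_source[pos]));
        pos += len;
    }
    return 0;
}
```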
8 changes: 5 additions & 3 deletions dbms/include/DB/DataStreams/ColumnGathererStream.h
@@ -58,19 +58,21 @@ class ColumnGathererStream : public IProfilingBlockInputStream
{
public:
ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & pos_to_source_idx_, size_t block_size_ = DEFAULT_BLOCK_SIZE);
const MergedRowSources & row_source_, size_t block_size_ = DEFAULT_MERGE_BLOCK_SIZE);

String getName() const override { return "ColumnGatherer"; }

String getID() const override;

Block readImpl() override;

void readSuffixImpl() override;

private:

String name;
ColumnWithTypeAndName column;
const MergedRowSources & pos_to_source_idx;
const MergedRowSources & row_source;

/// Cache required fields
struct Source
@@ -95,7 +97,7 @@ class ColumnGathererStream : public IProfilingBlockInputStream

std::vector<Source> sources;

size_t pos_global = 0;
size_t pos_global_start = 0;
size_t block_size;

Logger * log = &Logger::get("ColumnGathererStream");
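The gatherer is driven by MergedRowSources, whose elements expose getSourceNum(), getSkipFlag() and getData(). A plausible sketch of such a packed one-byte descriptor (the exact field layout here is an assumption, not taken from this diff):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Sketch of a one-byte row-source descriptor. The layout is an assumption:
// bit 7 = "skip this row", bits 0..6 = number of the source part.
struct RowSourcePart
{
    static constexpr uint8_t MASK_FLAG = 0x80;
    static constexpr uint8_t MASK_NUMBER = 0x7F;

    uint8_t data = 0;

    RowSourcePart(size_t source_num, bool skip = false)
        : data(static_cast<uint8_t>((source_num & MASK_NUMBER) | (skip ? MASK_FLAG : 0))) {}

    size_t getSourceNum() const { return data & MASK_NUMBER; }
    bool getSkipFlag() const { return data & MASK_FLAG; }

    // Comparing getData() of two rows checks the source number and the skip
    // flag at once, which is exactly what the run-detection loop in readImpl() needs.
    uint8_t getData() const { return data; }
};

using MergedRowSources = std::vector<RowSourcePart>;

int main()
{
    RowSourcePart a(5), b(5), c(5, /*skip=*/true);
    assert(a.getSourceNum() == 5 && !a.getSkipFlag());
    assert(a.getData() == b.getData());  // same run
    assert(a.getData() != c.getData());  // the skip flag breaks the run
}
```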
40 changes: 9 additions & 31 deletions dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h
@@ -52,44 +52,21 @@ inline void intrusive_ptr_release(detail::SharedBlock * ptr)
}


/** Merges several sorted streams into one.
/** Merges several sorted streams into one sorted stream.
*/
class MergingSortedBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - if non-zero, we can produce only the first limit rows in sorted order.
/// out_row_sources - if not nullptr, then at the end of execution it should contain the part number of each read row (and the skip flag)
MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr)
: description(description_), max_block_size(max_block_size_), limit(limit_),
source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
/** limit - if non-zero, we can produce only the first limit rows in sorted order.
* out_row_sources - if not nullptr, then at the end of execution it should contain the part number of each read row (and the skip flag)
* quiet - don't log profiling info
*/
MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr, bool quiet_ = false);

String getName() const override { return "MergingSorted"; }

String getID() const override
{
std::stringstream res;
res << "MergingSorted(";

Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();

/// The order doesn't matter.
std::sort(children_ids.begin(), children_ids.end());

for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];

for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();

res << ")";
return res.str();
}
String getID() const override;

protected:
struct RowRef
Expand Down Expand Up @@ -144,6 +121,7 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream

bool first = true;
bool has_collation = false;
bool quiet = false;

/// May be less than or equal to max_block_size. Used to do 'reserve' for columns.
size_t expected_block_size = 0;
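The new quiet flag added above lets a caller suppress this stream's per-stream profiling message; later in this PR the vertical merge constructs the stream with quiet = true and logs one overall summary itself. A trivial self-contained sketch of the behavior (illustrative types, not the real stream classes):

```cpp
#include <iostream>

// Illustrative stand-in for a profiling stream with the new `quiet` flag.
struct SketchStream
{
    bool quiet = false;

    void readSuffixImpl() const
    {
        if (quiet)
            return;  // skip per-stream profiling output

        std::cout << "Merge sorted N blocks, M rows in S sec.\n";
    }
};

int main()
{
    SketchStream loud;
    SketchStream silent;
    silent.quiet = true;  // what the vertical merge passes in this PR

    loud.readSuffixImpl();    // prints the profiling line
    silent.readSuffixImpl();  // prints nothing
}
```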
54 changes: 43 additions & 11 deletions dbms/src/DataStreams/ColumnGathererStream.cpp
@@ -1,4 +1,5 @@
#include <DB/DataStreams/ColumnGathererStream.h>
#include <iomanip>

namespace DB
{
@@ -13,8 +14,8 @@ namespace ErrorCodes
}

ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & pos_to_source_idx_, size_t block_size_)
: name(column_name_), pos_to_source_idx(pos_to_source_idx_), block_size(block_size_)
const MergedRowSources & row_source_, size_t block_size_)
: name(column_name_), row_source(row_source_), block_size(block_size_)
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
@@ -63,19 +64,20 @@ Block ColumnGathererStream::readImpl()
if (children.size() == 1)
return children[0]->read();

if (pos_global >= pos_to_source_idx.size())
if (pos_global_start >= row_source.size())
return Block();

Block block_res{column.cloneEmpty()};
IColumn & column_res = *block_res.unsafeGetByPosition(0).column;

size_t pos_finish = std::min(pos_global + block_size, pos_to_source_idx.size());
column_res.reserve(pos_finish - pos_global);
size_t pos_global_finish = std::min(pos_global_start + block_size, row_source.size());
size_t curr_block_size = pos_global_finish - pos_global_start;
column_res.reserve(curr_block_size);

for (size_t pos = pos_global; pos < pos_finish; ++pos)
for (size_t pos_global = pos_global_start; pos_global < pos_global_finish;)
{
auto source_id = pos_to_source_idx[pos].getSourceNum();
bool skip = pos_to_source_idx[pos].getSkipFlag();
auto source_id = row_source[pos_global].getSourceNum();
bool skip = row_source[pos_global].getSkipFlag();
Source & source = sources[source_id];

if (source.pos >= source.size) /// Fetch new block
@@ -98,14 +100,44 @@ Block ColumnGathererStream::readImpl()
}
}

/// Consecutive optimization. TODO: precompute run lengths
size_t len = 1;
size_t max_len = std::min(pos_global_finish - pos_global, source.size - source.pos); // the run must stay within the same source block
for (; len < max_len && row_source[pos_global].getData() == row_source[pos_global + len].getData(); ++len);

if (!skip)
column_res.insertFrom(*source.column, source.pos); //TODO: vectorize
++source.pos;
{
if (column_res.size() == 0 && source.pos == 0 && curr_block_size == len && source.size == len)
{
// The whole output block can be produced by copying the column pointer from the current source block
block_res.unsafeGetByPosition(0).column = source.block.getByName(name).column;
}
else
{
column_res.insertRangeFrom(*source.column, source.pos, len);
}
}

source.pos += len;
pos_global += len;
}

pos_global = pos_finish;
pos_global_start = pos_global_finish;

return block_res;
}


void ColumnGathererStream::readSuffixImpl()
{
const BlockStreamProfileInfo & profile_info = getProfileInfo();
double seconds = profile_info.total_stopwatch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Gathred column " << column.name << " " << column.type->getName()
<< " (" << static_cast<double>(profile_info.bytes) / profile_info.rows << " bytes/elem.)"
<< " in " << seconds << " sec., "
<< profile_info.rows / seconds << " rows/sec., "
<< profile_info.bytes / 1000000.0 / seconds << " MB/sec.");
}

}
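A note on the fast path inside readImpl() above: when the result column is still empty, the source block is positioned at its start, and the detected run spans both the whole source block and the whole output block, no rows are copied at all; the reference-counted column pointer of the source block is returned directly. This is also why the gatherer's block size is switched to DEFAULT_MERGE_BLOCK_SIZE below, so that it matches the block size of the per-part input streams. A self-contained sketch of the idea with illustrative types:

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Illustrative stand-ins: in ClickHouse, columns inside blocks are held by
// reference-counted pointers, which is what makes the fast path cheap.
using Column = std::vector<int>;
using ColumnPtr = std::shared_ptr<Column>;

// Gather one run of `len` rows starting at source_pos. If the run covers the
// whole source block and the whole (still empty) output block, reuse the
// source column by returning its pointer; otherwise copy the range.
ColumnPtr gatherRun(const ColumnPtr & source, size_t source_pos, size_t len,
                    const ColumnPtr & result, size_t block_size)
{
    if (result->empty() && source_pos == 0 && len == source->size() && len == block_size)
        return source;  // O(1): no row data is copied

    result->insert(result->end(),
                   source->begin() + source_pos,
                   source->begin() + source_pos + len);  // insertRangeFrom analogue
    return result;
}

int main()
{
    auto source = std::make_shared<Column>(Column{1, 2, 3, 4});
    auto result = std::make_shared<Column>();

    auto out = gatherRun(source, 0, 4, result, 4);
    assert(out == source);  // the pointer was shared, nothing was copied
}
```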
35 changes: 34 additions & 1 deletion dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
@@ -15,6 +15,36 @@ namespace ErrorCodes
}


MergingSortedBlockInputStream::MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_, MergedRowSources * out_row_sources_, bool quiet_)
: description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_),
source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}

String MergingSortedBlockInputStream::getID() const
{
std::stringstream res;
res << "MergingSorted(";

Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();

/// The order doesn't matter.
std::sort(children_ids.begin(), children_ids.end());

for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];

for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();

res << ")";
return res.str();
}

void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs & merged_columns)
{
/// Read the first blocks and initialize the queue.
@@ -312,13 +342,16 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,

void MergingSortedBlockInputStream::readSuffixImpl()
{
if (quiet)
return;

const BlockStreamProfileInfo & profile_info = getProfileInfo();
double seconds = profile_info.total_stopwatch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Merge sorted " << profile_info.blocks << " blocks, " << profile_info.rows << " rows"
<< " in " << seconds << " sec., "
<< profile_info.rows / seconds << " rows/sec., "
<< profile_info.bytes / 1000000.0 / seconds << " MiB/sec.");
<< profile_info.bytes / 1000000.0 / seconds << " MB/sec.");
}

}
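getID() itself is unchanged, only moved out of the header. As the comment notes, the identifier must not depend on the order of the input streams, so the child IDs are sorted before concatenation. A self-contained sketch of the same construction:

```cpp
#include <algorithm>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Sketch of the getID() scheme above: sort child IDs first, then concatenate
// them together with the sort-description IDs.
std::string makeMergingSortedID(std::vector<std::string> children_ids,
                                const std::vector<std::string> & description_ids)
{
    std::stringstream res;
    res << "MergingSorted(";

    std::sort(children_ids.begin(), children_ids.end());

    for (size_t i = 0; i < children_ids.size(); ++i)
        res << (i == 0 ? "" : ", ") << children_ids[i];

    for (const auto & id : description_ids)
        res << ", " << id;

    res << ")";
    return res.str();
}

int main()
{
    // The same inputs in a different order yield the same identifier.
    std::cout << makeMergingSortedID({"B", "A"}, {"col1 ASC"}) << '\n'
              << makeMergingSortedID({"A", "B"}, {"col1 ASC"}) << '\n';
}
```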
50 changes: 33 additions & 17 deletions dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
@@ -25,6 +25,7 @@

#include <cmath>
#include <numeric>
#include <iomanip>


namespace ProfileEvents
@@ -557,7 +558,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr);
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr, true);
break;

case MergeTreeData::MergingParams::Collapsing:
@@ -618,19 +619,25 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;

/// This update is inaccurate for the VERTICAL algorithm since it requires more accurate per-column updates
/// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements
if (disk_reservation && merge_alg == MergeAlgorithm::Horizontal)
if (disk_reservation)
{
Float64 relative_rows_written = std::min(1., 1. * rows_written / sum_input_rows_upper_bound);
disk_reservation->update(static_cast<size_t>((1. - relative_rows_written) * initial_reservation));
/// The same progress from merge_entry could be used for both algorithms (it should be more accurate)
/// But for now we use the inaccurate row-based estimation in the Horizontal case for backward compatibility
Float64 progress = (merge_alg == MergeAlgorithm::Horizontal)
? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
: std::min(1., merge_entry->progress);

disk_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
}
}
merged_stream->readSuffix();
merged_stream.reset();

if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

MergeTreeData::DataPart::Checksums checksums_ordinary_columns;
MergeTreeData::DataPart::Checksums checksums_gathered_columns;

/// Gather ordinary columns
if (merge_alg == MergeAlgorithm::Vertical)
@@ -649,18 +656,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
const String & column_name = it_name_and_type->name;
const DataTypePtr & column_type = it_name_and_type->type;
const String offset_column_name = DataTypeNested::extractNestedTableName(column_name);
Names column_name_(1, column_name);
NamesAndTypesList column_name_and_type_(1, *it_name_and_type);
Names column_name_{column_name};
NamesAndTypesList column_name_and_type_{*it_name_and_type};
Float64 progress_before = merge_entry->progress;
bool offset_written = offset_columns_written.count(offset_column_name);

LOG_TRACE(log, "Gathering column " << column_name << " " << column_type->getName());

for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
String part_path = data.getFullPath() + parts[part_num]->name + '/';

/// TODO: test performance with more accurate settings
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_num],
MarkRanges(1, MarkRange(0, parts[part_num]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE,
@@ -672,16 +676,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
column_part_streams[part_num] = std::move(column_part_stream);
}

ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_BLOCK_SIZE);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, true, compression_method, offset_written);
/// Block size should match the block size of column_part_stream to enable fast gathering via copying of the column pointer
ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_MERGE_BLOCK_SIZE);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);

column_to.writePrefix();
while ((block = column_gathered_stream.read()))
{
column_to.write(block);
}
/// NOTE: nested columns contain duplicate checksums (and files)
checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums());
column_gathered_stream.readSuffix();
checksums_gathered_columns.add(column_to.writeSuffixAndGetChecksums());

if (typeid_cast<const DataTypeArray *>(column_type.get()))
offset_columns_written.emplace(offset_column_name);
@@ -695,12 +700,23 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
}
}

merged_stream->readSuffix();
/// Print overall profiling info. NOTE: it may duplicate previous messages
{
double elapsed_seconds = merge_entry->watch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Merge sorted " << merge_entry->rows_read << " rows"
<< ", containing " << all_column_names.size() << " columns"
<< " (" << merging_column_names.size() << " merged, " << gathering_column_names.size() << " gathered)"
<< " in " << elapsed_seconds << " sec., "
<< merge_entry->rows_read / elapsed_seconds << " rows/sec., "
<< merge_entry->bytes_read_uncompressed / 1000000.0 / elapsed_seconds << " MB/sec.");
}

new_data_part->columns = all_columns;
if (merge_alg != MergeAlgorithm::Vertical)
new_data_part->checksums = to.writeSuffixAndGetChecksums();
else
new_data_part->checksums = to.writeSuffixAndGetChecksums(all_columns, &checksums_ordinary_columns);
new_data_part->checksums = to.writeSuffixAndGetChecksums(all_columns, &checksums_gathered_columns);
new_data_part->index.swap(to.getIndex());

/// For convenience, even CollapsingSortedBlockInputStream cannot return zero rows.
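For reference, the reservation update earlier in this file releases reserved disk space in proportion to merge progress: Horizontal merges keep the old row-based estimate for backward compatibility, while Vertical merges use merge_entry->progress. A minimal sketch of that arithmetic with hypothetical names and values:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

// Sketch of the reservation update above (names and values are illustrative):
// as progress goes from 0 to 1, the reserved disk space shrinks from the
// initial reservation toward zero.
size_t updatedReservation(double rows_written, double sum_input_rows_upper_bound,
                          double merge_entry_progress, bool horizontal,
                          size_t initial_reservation)
{
    // Horizontal: coarse row-based estimate, kept for backward compatibility.
    // Vertical: the more accurate progress tracked in the merge entry.
    double progress = horizontal
        ? std::min(1.0, rows_written / sum_input_rows_upper_bound)
        : std::min(1.0, merge_entry_progress);

    return static_cast<size_t>((1.0 - progress) * initial_reservation);
}

int main()
{
    // E.g. 250000 of at most 1000000 input rows written, 1 GiB reserved:
    std::printf("%zu bytes still reserved\n",
                updatedReservation(250000, 1000000, 0.25, /*horizontal=*/true,
                                   static_cast<size_t>(1) << 30));
}
```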