
Lower memory usage in vertical merges #59340

Merged
16 changes: 11 additions & 5 deletions src/Processors/Transforms/ColumnGathererTransform.cpp
@@ -17,9 +17,14 @@ namespace ErrorCodes
 }

 ColumnGathererStream::ColumnGathererStream(
-    size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_)
-    : sources(num_inputs), row_sources_buf(row_sources_buf_)
-    , block_preferred_size(block_preferred_size_)
+    size_t num_inputs,
+    ReadBuffer & row_sources_buf_,
+    size_t block_preferred_size_rows_,
+    size_t block_preferred_size_bytes_)
+    : sources(num_inputs)
+    , row_sources_buf(row_sources_buf_)
+    , block_preferred_size_rows(block_preferred_size_rows_)
+    , block_preferred_size_bytes(block_preferred_size_bytes_)
 {
     if (num_inputs == 0)
         throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no streams to gather");
@@ -124,10 +129,11 @@ ColumnGathererTransform::ColumnGathererTransform(
     const Block & header,
     size_t num_inputs,
     ReadBuffer & row_sources_buf_,
-    size_t block_preferred_size_)
+    size_t block_preferred_size_rows_,
+    size_t block_preferred_size_bytes_)
     : IMergingTransform<ColumnGathererStream>(
         num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
-        num_inputs, row_sources_buf_, block_preferred_size_)
+        num_inputs, row_sources_buf_, block_preferred_size_rows_, block_preferred_size_bytes_)
     , log(getLogger("ColumnGathererStream"))
 {
     if (header.columns() != 1)
29 changes: 17 additions & 12 deletions src/Processors/Transforms/ColumnGathererTransform.h
@@ -5,7 +5,6 @@
 #include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
 #include <Processors/Merges/IMergingTransform.h>

-
 namespace Poco { class Logger; }

@@ -57,7 +56,11 @@ using MergedRowSources = PODArray<RowSourcePart>;
 class ColumnGathererStream final : public IMergingAlgorithm
 {
 public:
-    ColumnGathererStream(size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+    ColumnGathererStream(
+        size_t num_inputs,
+        ReadBuffer & row_sources_buf_,
+        size_t block_preferred_size_rows_,
+        size_t block_preferred_size_bytes_);

     const char * getName() const override { return "ColumnGathererStream"; }
     void initialize(Inputs inputs) override;
@@ -92,13 +95,12 @@ class ColumnGathererStream final : public IMergingAlgorithm
     std::vector<Source> sources;
     ReadBuffer & row_sources_buf;

-    const size_t block_preferred_size;
+    const size_t block_preferred_size_rows;
+    const size_t block_preferred_size_bytes;

     Source * source_to_fully_copy = nullptr;

     ssize_t next_required_source = -1;
-    size_t cur_block_preferred_size = 0;
-
     UInt64 merged_rows = 0;
     UInt64 merged_bytes = 0;
 };
@@ -110,7 +112,8 @@ class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream>
     const Block & header,
     size_t num_inputs,
     ReadBuffer & row_sources_buf_,
-    size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+    size_t block_preferred_size_rows_,
+    size_t block_preferred_size_bytes_);

     String getName() const override { return "ColumnGathererTransform"; }

@@ -134,14 +137,17 @@ void ColumnGathererStream::gather(Column & column_res)
     if (next_required_source == -1)
     {
         /// Start new column.
-        cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
-        column_res.reserve(cur_block_preferred_size);
+        /// Actually reserve works only for fixed size columns.
+        /// So it's safe to ignore preferred size in bytes and call reserve for number of rows.
+        size_t size_to_reserve = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size_rows);
+        column_res.reserve(size_to_reserve);
     }

-    size_t cur_size = column_res.size();
     next_required_source = -1;

-    while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size)
+    while (row_source_pos < row_sources_end
+        && column_res.size() < block_preferred_size_rows
+        && column_res.allocatedBytes() < block_preferred_size_bytes)
Member: Really minor, but maybe it makes sense to use byteSize(), because I'd expect block_size_bytes to refer to the actual data size.

Member Author: I checked that allocatedBytes() is used in MergedData and did the same. But we can change it in both places.
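To make the distinction under discussion concrete, here is a toy sketch using a hypothetical vector-backed column, not the real IColumn hierarchy: byteSize() counts the bytes occupied by inserted rows, while allocatedBytes() also counts reserved-but-unused capacity, so a cap on allocatedBytes() accounts for the full memory footprint of the block.

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

/// Hypothetical stand-in for a fixed-width column; the names byteSize()
/// and allocatedBytes() mirror the IColumn methods discussed above.
struct ToyColumn
{
    std::vector<uint64_t> data;

    /// Bytes actually occupied by inserted rows.
    size_t byteSize() const { return data.size() * sizeof(uint64_t); }

    /// Bytes owned by the column, including reserved capacity.
    size_t allocatedBytes() const { return data.capacity() * sizeof(uint64_t); }
};

int main()
{
    ToyColumn col;
    col.data.reserve(8192); // allocation happens up front...
    col.data.push_back(42); // ...but only one row is occupied so far

    std::cout << "byteSize:       " << col.byteSize() << '\n';       // 8
    std::cout << "allocatedBytes: " << col.allocatedBytes() << '\n'; // >= 8192 * 8
    return 0;
}
```

Capping on allocatedBytes() (as merged) limits what the merge actually holds in memory; capping on byteSize() (as suggested) would limit the payload instead, and for columns that reserve generously the two can differ substantially.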

     {
         RowSourcePart row_source = *row_source_pos;
         size_t source_num = row_source.getSourceNum();
@@ -159,6 +165,7 @@
         /// Consecutive optimization. TODO: precompute lengths
         size_t len = 1;
         size_t max_len = std::min(static_cast<size_t>(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block
+
         while (len < max_len && row_source_pos->data == row_source.data)
         {
             ++len;
@@ -181,8 +188,6 @@
             column_res.insertFrom(*source.column, source.pos);
         else
             column_res.insertRangeFrom(*source.column, source.pos, len);
-
-        cur_size += len;
     }

     source.pos += len;
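A side note on the "reserve works only for fixed size columns" comment in the gather() hunk above: a minimal sketch of why only the row count can be reserved up front. The layouts are hypothetical, loosely mirroring ColumnVector and ColumnString, not the real API.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

/// For fixed-width data, reserving by row count pins the allocation exactly.
struct FixedWidthColumn
{
    std::vector<uint64_t> data;
    void reserveRows(size_t n) { data.reserve(n); }
};

/// For variable-width data, only per-row structures can be reserved; the
/// payload size is unknown until rows are actually gathered, so a
/// bytes-based reserve would have nothing reliable to act on.
struct VariableWidthColumn
{
    std::vector<uint64_t> offsets; // one entry per row
    std::vector<char> chars;       // concatenated payload, grows as data arrives
    void reserveRows(size_t n) { offsets.reserve(n); }
};

int main()
{
    FixedWidthColumn fixed;
    VariableWidthColumn var;
    fixed.reserveRows(8192); // final allocation fully determined: 8192 * 8 bytes
    var.reserveRows(8192);   // offsets only; chars stays unreserved
    return 0;
}
```

Hence the new code reserves min(remaining row sources, block_preferred_size_rows) rows and lets the byte cap act later, inside the gather loop.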
10 changes: 9 additions & 1 deletion src/Storages/MergeTree/MergeTask.cpp
@@ -588,7 +588,15 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
     auto pipe = Pipe::unitePipes(std::move(pipes));

     ctx->rows_sources_read_buf->seek(0, 0);
-    auto transform = std::make_unique<ColumnGathererTransform>(pipe.getHeader(), pipe.numOutputPorts(), *ctx->rows_sources_read_buf);
+
+    const auto data_settings = global_ctx->data->getSettings();
+    auto transform = std::make_unique<ColumnGathererTransform>(
+        pipe.getHeader(),
+        pipe.numOutputPorts(),
+        *ctx->rows_sources_read_buf,
+        data_settings->merge_max_block_size,
+        data_settings->merge_max_block_size_bytes);
+
     pipe.addTransform(std::move(transform));

     ctx->column_parts_pipeline = QueryPipeline(std::move(pipe));
1 change: 1 addition & 0 deletions tests/queries/0_stateless/02981_vertical_merges_memory_usage.reference
@@ -0,0 +1 @@
Vertical OK
35 changes: 35 additions & 0 deletions tests/queries/0_stateless/02981_vertical_merges_memory_usage.sql
@@ -0,0 +1,35 @@
-- Tags: long

DROP TABLE IF EXISTS t_vertical_merge_memory;

CREATE TABLE t_vertical_merge_memory (id UInt64, arr Array(String))
ENGINE = MergeTree ORDER BY id
SETTINGS
min_bytes_for_wide_part = 0,
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1,
index_granularity = 8192,
index_granularity_bytes = '10M',
merge_max_block_size = 8192,
merge_max_block_size_bytes = '10M';

INSERT INTO t_vertical_merge_memory SELECT number, arrayMap(x -> repeat('a', 50), range(1000)) FROM numbers(30000);
INSERT INTO t_vertical_merge_memory SELECT number, arrayMap(x -> repeat('a', 50), range(1000)) FROM numbers(30000);

OPTIMIZE TABLE t_vertical_merge_memory FINAL;

SYSTEM FLUSH LOGS;

SELECT
merge_algorithm,
peak_memory_usage < 500 * 1024 * 1024
Member Author: peak_memory_usage before: 4.57 GiB, after: 89.95 MiB.

? 'OK'
: format('FAIL: memory usage: {}', formatReadableSize(peak_memory_usage))
FROM system.part_log
WHERE
database = currentDatabase()
AND table = 't_vertical_merge_memory'
AND event_type = 'MergeParts'
AND length(merged_from) = 2;

DROP TABLE IF EXISTS t_vertical_merge_memory;
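Back-of-the-envelope arithmetic (mine, not from the PR; string overhead and the exact old default are approximated) for why the byte cap is what rescues this test: each row holds 1000 strings of 50 bytes, roughly 50 KB of payload, so a gather block capped only by row count can run to gigabytes.

```cpp
#include <cstddef>
#include <cstdio>

int main()
{
    // Rough model of the test table above; all overheads ignored.
    const size_t strings_per_row = 1000;  // range(1000)
    const size_t bytes_per_string = 50;   // repeat('a', 50)
    const size_t row_bytes = strings_per_row * bytes_per_string; // ~50 KB

    // Old behaviour: the gatherer's block was capped only by a row-count
    // default (~64Ki rows; the exact constant is an assumption here).
    const size_t old_default_rows = 65536;
    std::printf("row-cap-only block: ~%zu MiB\n",
                (old_default_rows * row_bytes) >> 20); // ~3125 MiB

    // New behaviour: merge_max_block_size_bytes = 10 MiB cuts far sooner.
    const size_t byte_cap = 10u << 20;
    std::printf("rows per block under byte cap: ~%zu\n",
                byte_cap / row_bytes); // ~209
    return 0;
}
```

A multi-GiB per-block term is the right order of magnitude for the 4.57 GiB peak the author measured above, and cutting blocks at ~10 MiB is consistent with the ~90 MiB peak after the change.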