From d3cdc8826eb5209562ae743c7a385a7f33bbc015 Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Mon, 29 Jan 2024 14:04:23 +0000
Subject: [PATCH] lower memory usage in vertical merges

---
 .../Transforms/ColumnGathererTransform.cpp    | 16 ++++++---
 .../Transforms/ColumnGathererTransform.h      | 29 ++++++++-------
 src/Storages/MergeTree/MergeTask.cpp          | 10 +++++-
 ...981_vertical_merges_memory_usage.reference |  1 +
 .../02981_vertical_merges_memory_usage.sql    | 35 +++++++++++++++++++
 5 files changed, 73 insertions(+), 18 deletions(-)
 create mode 100644 tests/queries/0_stateless/02981_vertical_merges_memory_usage.reference
 create mode 100644 tests/queries/0_stateless/02981_vertical_merges_memory_usage.sql

diff --git a/src/Processors/Transforms/ColumnGathererTransform.cpp b/src/Processors/Transforms/ColumnGathererTransform.cpp
index d7f52a538e1b..b2e8e9bc89e8 100644
--- a/src/Processors/Transforms/ColumnGathererTransform.cpp
+++ b/src/Processors/Transforms/ColumnGathererTransform.cpp
@@ -17,9 +17,14 @@ namespace ErrorCodes
 }
 
 ColumnGathererStream::ColumnGathererStream(
-    size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_)
-    : sources(num_inputs), row_sources_buf(row_sources_buf_)
-    , block_preferred_size(block_preferred_size_)
+    size_t num_inputs,
+    ReadBuffer & row_sources_buf_,
+    size_t block_preferred_size_rows_,
+    size_t block_preferred_size_bytes_)
+    : sources(num_inputs)
+    , row_sources_buf(row_sources_buf_)
+    , block_preferred_size_rows(block_preferred_size_rows_)
+    , block_preferred_size_bytes(block_preferred_size_bytes_)
 {
     if (num_inputs == 0)
         throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no streams to gather");
@@ -124,10 +129,11 @@ ColumnGathererTransform::ColumnGathererTransform(
     const Block & header,
     size_t num_inputs,
     ReadBuffer & row_sources_buf_,
-    size_t block_preferred_size_)
+    size_t block_preferred_size_rows_,
+    size_t block_preferred_size_bytes_)
     : IMergingTransform<ColumnGathererStream>(
         num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
-        num_inputs, row_sources_buf_, block_preferred_size_)
+        num_inputs, row_sources_buf_, block_preferred_size_rows_, block_preferred_size_bytes_)
     , log(getLogger("ColumnGathererStream"))
 {
     if (header.columns() != 1)
diff --git a/src/Processors/Transforms/ColumnGathererTransform.h b/src/Processors/Transforms/ColumnGathererTransform.h
index 885cb3f81ba3..821d04db0df1 100644
--- a/src/Processors/Transforms/ColumnGathererTransform.h
+++ b/src/Processors/Transforms/ColumnGathererTransform.h
@@ -5,7 +5,6 @@
 #include
 #include
-
 namespace Poco { class Logger; }
 
@@ -57,7 +56,11 @@ using MergedRowSources = PODArray<RowSourcePart>;
 class ColumnGathererStream final : public IMergingAlgorithm
 {
 public:
-    ColumnGathererStream(size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+    ColumnGathererStream(
+        size_t num_inputs,
+        ReadBuffer & row_sources_buf_,
+        size_t block_preferred_size_rows_,
+        size_t block_preferred_size_bytes_);
 
     const char * getName() const override { return "ColumnGathererStream"; }
 
     void initialize(Inputs inputs) override;
@@ -92,13 +95,12 @@ class ColumnGathererStream final : public IMergingAlgorithm
     std::vector<Source> sources;
     ReadBuffer & row_sources_buf;
 
-    const size_t block_preferred_size;
+    const size_t block_preferred_size_rows;
+    const size_t block_preferred_size_bytes;
 
     Source * source_to_fully_copy = nullptr;
     ssize_t next_required_source = -1;
 
-    size_t cur_block_preferred_size = 0;
-
     UInt64 merged_rows = 0;
     UInt64 merged_bytes = 0;
 };
@@ -110,7 +112,8 @@ class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream>
     ColumnGathererTransform(
         const Block & header,
         size_t num_inputs,
         ReadBuffer & row_sources_buf_,
-        size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+        size_t block_preferred_size_rows_,
+        size_t block_preferred_size_bytes_);
 
     String getName() const override { return "ColumnGathererTransform"; }
 
@@ ... @@ void ColumnGathererStream::gather(Column & column_res)
     if (next_required_source == -1)
     {
-        cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
-        column_res.reserve(cur_block_preferred_size);
+        /// Actually, reserve() works only for fixed-size columns,
+        /// so it is safe to ignore the preferred size in bytes and call reserve() for the number of rows.
+        size_t size_to_reserve = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size_rows);
+        column_res.reserve(size_to_reserve);
     }
 
-    size_t cur_size = column_res.size();
     next_required_source = -1;
 
-    while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size)
+    while (row_source_pos < row_sources_end
+        && column_res.size() < block_preferred_size_rows
+        && column_res.allocatedBytes() < block_preferred_size_bytes)
     {
         RowSourcePart row_source = *row_source_pos;
         size_t source_num = row_source.getSourceNum();
@@ -159,6 +165,7 @@ void ColumnGathererStream::gather(Column & column_res)
         /// Consecutive optimization. TODO: precompute lengths
         size_t len = 1;
         size_t max_len = std::min(static_cast<size_t>(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block
+
         while (len < max_len && row_source_pos->data == row_source.data)
         {
             ++len;
@@ -181,8 +188,6 @@ void ColumnGathererStream::gather(Column & column_res)
                 column_res.insertFrom(*source.column, source.pos);
             else
                 column_res.insertRangeFrom(*source.column, source.pos, len);
-
-            cur_size += len;
         }
 
         source.pos += len;
diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index 4b5b7ca8018a..59bdb7006b3a 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -588,7 +588,15 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
     auto pipe = Pipe::unitePipes(std::move(pipes));
 
     ctx->rows_sources_read_buf->seek(0, 0);
-    auto transform = std::make_unique<ColumnGathererTransform>(pipe.getHeader(), pipe.numOutputPorts(), *ctx->rows_sources_read_buf);
+
+    const auto data_settings = global_ctx->data->getSettings();
+    auto transform = std::make_unique<ColumnGathererTransform>(
+        pipe.getHeader(),
+        pipe.numOutputPorts(),
+        *ctx->rows_sources_read_buf,
+        data_settings->merge_max_block_size,
+        data_settings->merge_max_block_size_bytes);
+
     pipe.addTransform(std::move(transform));
 
     ctx->column_parts_pipeline = QueryPipeline(std::move(pipe));
diff --git a/tests/queries/0_stateless/02981_vertical_merges_memory_usage.reference b/tests/queries/0_stateless/02981_vertical_merges_memory_usage.reference
new file mode 100644
index 000000000000..60c254e152bc
--- /dev/null
+++ b/tests/queries/0_stateless/02981_vertical_merges_memory_usage.reference
@@ -0,0 +1 @@
+Vertical OK
diff --git a/tests/queries/0_stateless/02981_vertical_merges_memory_usage.sql b/tests/queries/0_stateless/02981_vertical_merges_memory_usage.sql
new file mode 100644
index 000000000000..1305f02c0444
--- /dev/null
+++ b/tests/queries/0_stateless/02981_vertical_merges_memory_usage.sql
@@ -0,0 +1,35 @@
+-- Tags: long
+
+DROP TABLE IF EXISTS t_vertical_merge_memory;
+
+CREATE TABLE t_vertical_merge_memory (id UInt64, arr Array(String))
+ENGINE = MergeTree ORDER BY id
+SETTINGS
+    min_bytes_for_wide_part = 0,
+    vertical_merge_algorithm_min_rows_to_activate = 1,
+    vertical_merge_algorithm_min_columns_to_activate = 1,
+    index_granularity = 8192,
+    index_granularity_bytes = '10M',
+    merge_max_block_size = 8192,
+    merge_max_block_size_bytes = '10M';
+
+INSERT INTO t_vertical_merge_memory SELECT number, arrayMap(x -> repeat('a', 50), range(1000)) FROM numbers(30000);
+INSERT INTO t_vertical_merge_memory SELECT number, arrayMap(x -> repeat('a', 50), range(1000)) FROM numbers(30000);
+
+OPTIMIZE TABLE t_vertical_merge_memory FINAL;
+
+SYSTEM FLUSH LOGS;
+
+SELECT
+    merge_algorithm,
+    peak_memory_usage < 500 * 1024 * 1024
+        ? 'OK'
+        : format('FAIL: memory usage: {}', formatReadableSize(peak_memory_usage))
+FROM system.part_log
+WHERE
+    database = currentDatabase()
+    AND table = 't_vertical_merge_memory'
+    AND event_type = 'MergeParts'
+    AND length(merged_from) = 2;
+
+DROP TABLE IF EXISTS t_vertical_merge_memory;
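
For reference, the two MergeTree settings wired into ColumnGathererTransform above (merge_max_block_size and merge_max_block_size_bytes) can also be adjusted on an existing table; a minimal sketch, not part of this patch, using a placeholder table name t and illustrative values:

-- Cap merge blocks both by rows and by bytes (values are only illustrative).
ALTER TABLE t MODIFY SETTING merge_max_block_size = 8192, merge_max_block_size_bytes = '10M';

-- Check peak memory of recent merges, same approach as the test above.
SELECT event_time, merge_algorithm, formatReadableSize(peak_memory_usage)
FROM system.part_log
WHERE database = currentDatabase()
  AND table = 't'
  AND event_type = 'MergeParts'
ORDER BY event_time DESC
LIMIT 5;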