Avoid MEMORY_LIMIT_EXCEEDED during INSERT into Buffer with AggregateFunction

If a Buffer table has columns of AggregateFunction type, the aggregate
states for such columns are allocated from the query context, but those
states can be destroyed from the server context (in case of a background
flush). Because aggregate states can be shared, memory is then leaked
from the query's point of view, and this eventually leads to a
MEMORY_LIMIT_EXCEEDED error.

To avoid this, prohibit sharing the aggregate states.

Note that this problem is only about memory accounting, not about memory
usage itself.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
azat committed Mar 9, 2022
1 parent 4118b06 commit e2960e1
Showing 3 changed files with 44 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/Storages/StorageBuffer.cpp
@@ -474,6 +474,14 @@ static void appendBlock(const Block & from, Block & to)
const IColumn & col_from = *from.getByPosition(column_no).column.get();
last_col = IColumn::mutate(std::move(to.getByPosition(column_no).column));

/// In case of ColumnAggregateFunction, aggregate states will be allocated
/// from the query context but can be destroyed from the server context
/// (in case of a background flush), and thus memory will be leaked from
/// the query (though only tracked memory, not memory itself).
///
/// To avoid this, prohibit sharing the aggregate states.
last_col->ensureOwnership();
last_col->insertRangeFrom(col_from, 0, rows);

to.getByPosition(column_no).column = std::move(last_col);
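For context, below is a minimal sketch of the appendBlock loop that this hunk modifies. It is a condensed paraphrase rather than the verbatim source: the function name appendBlockSketch is made up for illustration, and the header paths and DB namespace are assumed from ClickHouse's usual layout. It shows where the new ensureOwnership() call sits relative to insertRangeFrom().

#include <Core/Block.h>
#include <Columns/IColumn.h>

namespace DB
{

/// Condensed illustration only; the real appendBlock in StorageBuffer.cpp
/// contains additional logic not shown here.
static void appendBlockSketch(const Block & from, Block & to)
{
    size_t rows = from.rows();

    for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
    {
        const IColumn & col_from = *from.getByPosition(column_no).column.get();
        MutableColumnPtr last_col = IColumn::mutate(std::move(to.getByPosition(column_no).column));

        /// For ColumnAggregateFunction this makes the destination column own its
        /// aggregate states instead of sharing them with the source column, so a
        /// later background flush (running under the server's memory tracker)
        /// does not free memory that was accounted to the query.
        last_col->ensureOwnership();
        last_col->insertRangeFrom(col_from, 0, rows);

        to.getByPosition(column_no).column = std::move(last_col);
    }
}

}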
Empty file: the new test's .reference file (the test produces no output).
36 changes: 36 additions & 0 deletions tests/queries/0_stateless/02231_buffer_aggregate_states_leak.sql
@@ -0,0 +1,36 @@
-- Tags: long

drop table if exists buffer_02231;
drop table if exists out_02231;
drop table if exists in_02231;
drop table if exists mv_02231;

-- To reproduce the leak in memory tracking of aggregate states, a background
-- flush is required: the flush destroys the states under the server context,
-- while they were allocated (and accounted) under the query context.
create table buffer_02231
(
    key Int,
    v1 AggregateFunction(groupArray, String)
) engine=Buffer(currentDatabase(), 'out_02231',
    /* layers= */ 1,
    /* min/max time */ 86400, 86400,
    /* min/max rows */ 1e9, 1e9,
    /* min/max bytes */ 1e12, 1e12,
    /* flush time */ 1
);
create table out_02231 as buffer_02231 engine=Null();
create table in_02231 (number Int) engine=Null();

-- Create lots of INSERT blocks with MV
create materialized view mv_02231 to buffer_02231 as select
    number as key,
    groupArrayState(toString(number)) as v1
from in_02231
group by key;

insert into in_02231 select * from numbers(10e6) settings max_memory_usage='300Mi';

drop table buffer_02231;
drop table out_02231;
drop table in_02231;
drop table mv_02231;
