Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid MEMORY_LIMIT_EXCEEDED during INSERT into Buffer with AggregateFunction #35072

Merged
merged 3 commits into from Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Columns/ColumnAggregateFunction.cpp
Expand Up @@ -204,6 +204,8 @@ MutableColumnPtr ColumnAggregateFunction::predictValues(const ColumnsWithTypeAnd

void ColumnAggregateFunction::ensureOwnership()
{
force_data_ownership = true;

if (src)
{
/// We must copy all data from src and take ownership.
Expand Down Expand Up @@ -269,7 +271,7 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start
+ ").",
ErrorCodes::PARAMETER_OUT_OF_BOUND);

if (!empty() && src.get() != &from_concrete)
if (force_data_ownership || (!empty() && src.get() != &from_concrete))
{
/// Must create new states of aggregate function and take ownership of it,
/// because ownership of states of aggregate function cannot be shared for individual rows,
Expand Down
13 changes: 8 additions & 5 deletions src/Columns/ColumnAggregateFunction.h
Expand Up @@ -75,6 +75,9 @@ class ColumnAggregateFunction final : public COWHelper<IColumn, ColumnAggregateF
/// Source column. Used (holds source from destruction),
/// if this column has been constructed from another and uses all or part of its values.
ColumnPtr src;
/// Do not share the source column (`src`) after further modifications (i.e. insertRangeFrom()).
/// This may be useful for proper memory tracking, since source column may contain aggregate states.
bool force_data_ownership = false;
azat marked this conversation as resolved.
Show resolved Hide resolved

/// Array of pointers to aggregation states, that are placed in arenas.
Container data;
Expand All @@ -89,11 +92,6 @@ class ColumnAggregateFunction final : public COWHelper<IColumn, ColumnAggregateF
/// Create a new column that has another column as a source.
MutablePtr createView() const;

/// If we have another column as a source (owner of data), copy all data to ourself and reset source.
/// This is needed before inserting new elements, because we must own these elements (to destroy them in destructor),
/// but ownership of different elements cannot be mixed by different columns.
void ensureOwnership();

ColumnAggregateFunction(const AggregateFunctionPtr & func_, std::optional<size_t> version_ = std::nullopt);

ColumnAggregateFunction(const AggregateFunctionPtr & func_, const ConstArenas & arenas_);
Expand All @@ -108,6 +106,11 @@ class ColumnAggregateFunction final : public COWHelper<IColumn, ColumnAggregateF
AggregateFunctionPtr getAggregateFunction() { return func; }
AggregateFunctionPtr getAggregateFunction() const { return func; }

/// If we have another column as a source (owner of data), copy all data to ourself and reset source.
/// This is needed before inserting new elements, because we must own these elements (to destroy them in destructor),
/// but ownership of different elements cannot be mixed by different columns.
void ensureOwnership() override;

/// Take shared ownership of Arena, that holds memory for states of aggregate functions.
void addArena(ConstArenaPtr arena_);

Expand Down
4 changes: 4 additions & 0 deletions src/Columns/ColumnArray.cpp
Expand Up @@ -411,6 +411,10 @@ void ColumnArray::reserve(size_t n)
getData().reserve(n); /// The average size of arrays is not taken into account here. Or it is considered to be no more than 1.
}

void ColumnArray::ensureOwnership()
{
getData().ensureOwnership();
}

size_t ColumnArray::byteSize() const
{
Expand Down
1 change: 1 addition & 0 deletions src/Columns/ColumnArray.h
Expand Up @@ -89,6 +89,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray>
void getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
void updatePermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int nan_direction_hint, Permutation & res, EqualRanges& equal_range) const override;
void reserve(size_t n) override;
void ensureOwnership() override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
Expand Down
5 changes: 5 additions & 0 deletions src/Columns/ColumnMap.cpp
Expand Up @@ -227,6 +227,11 @@ void ColumnMap::reserve(size_t n)
nested->reserve(n);
}

void ColumnMap::ensureOwnership()
{
nested->ensureOwnership();
}

size_t ColumnMap::byteSize() const
{
return nested->byteSize();
Expand Down
1 change: 1 addition & 0 deletions src/Columns/ColumnMap.h
Expand Up @@ -80,6 +80,7 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap>
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
void updatePermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res, EqualRanges & equal_range) const override;
void reserve(size_t n) override;
void ensureOwnership() override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
Expand Down
5 changes: 5 additions & 0 deletions src/Columns/ColumnNullable.cpp
Expand Up @@ -512,6 +512,11 @@ void ColumnNullable::reserve(size_t n)
getNullMapData().reserve(n);
}

void ColumnNullable::ensureOwnership()
{
getNestedColumn().ensureOwnership();
}

size_t ColumnNullable::byteSize() const
{
return getNestedColumn().byteSize() + getNullMapColumn().byteSize();
Expand Down
1 change: 1 addition & 0 deletions src/Columns/ColumnNullable.h
Expand Up @@ -104,6 +104,7 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>
void updatePermutationWithCollation(
const Collator & collator, bool reverse, size_t limit, int null_direction_hint, Permutation & res, EqualRanges& equal_range) const override;
void reserve(size_t n) override;
void ensureOwnership() override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
Expand Down
7 changes: 7 additions & 0 deletions src/Columns/ColumnTuple.cpp
Expand Up @@ -448,6 +448,13 @@ void ColumnTuple::reserve(size_t n)
getColumn(i).reserve(n);
}

void ColumnTuple::ensureOwnership()
{
const size_t tuple_size = columns.size();
for (size_t i = 0; i < tuple_size; ++i)
getColumn(i).ensureOwnership();
}

size_t ColumnTuple::byteSize() const
{
size_t res = 0;
Expand Down
1 change: 1 addition & 0 deletions src/Columns/ColumnTuple.h
Expand Up @@ -86,6 +86,7 @@ class ColumnTuple final : public COWHelper<IColumn, ColumnTuple>
void getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
void updatePermutationWithCollation(const Collator & collator, bool reverse, size_t limit, int nan_direction_hint, Permutation & res, EqualRanges& equal_ranges) const override;
void reserve(size_t n) override;
void ensureOwnership() override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
Expand Down
3 changes: 3 additions & 0 deletions src/Columns/IColumn.h
Expand Up @@ -363,6 +363,9 @@ class IColumn : public COW<IColumn>
/// It affects performance only (not correctness).
virtual void reserve(size_t /*n*/) {}

/// If we have another column as a source (owner of data), copy all data to ourself and reset source.
virtual void ensureOwnership() {}

/// Size of column data in memory (may be approximate) - for profiling. Zero, if could not be determined.
virtual size_t byteSize() const = 0;

Expand Down
8 changes: 8 additions & 0 deletions src/Storages/StorageBuffer.cpp
Expand Up @@ -474,6 +474,14 @@ static void appendBlock(const Block & from, Block & to)
const IColumn & col_from = *from.getByPosition(column_no).column.get();
last_col = IColumn::mutate(std::move(to.getByPosition(column_no).column));

/// In case of ColumnAggregateFunction aggregate states will
/// be allocated from the query context but can be destroyed from the
/// server context (in case of background flush), and thus memory
/// will be leaked from the query, but only tracked memory, not
/// memory itself.
///
/// To avoid this, prohibit sharing the aggregate states.
last_col->ensureOwnership();
last_col->insertRangeFrom(col_from, 0, rows);

to.getByPosition(column_no).column = std::move(last_col);
Expand Down
Empty file.
36 changes: 36 additions & 0 deletions tests/queries/0_stateless/02231_buffer_aggregate_states_leak.sql
@@ -0,0 +1,36 @@
-- Tags: long

drop table if exists buffer_02231;
drop table if exists out_02231;
drop table if exists in_02231;
drop table if exists mv_02231;

-- To reproduce leak of memory tracking of aggregate states,
-- background flush is required.
create table buffer_02231
(
key Int,
v1 AggregateFunction(groupArray, String)
) engine=Buffer(currentDatabase(), 'out_02231',
/* layers= */1,
/* min/max time */ 86400, 86400,
/* min/max rows */ 1e9, 1e9,
/* min/max bytes */ 1e12, 1e12,
/* flush time */ 1
);
create table out_02231 as buffer_02231 engine=Null();
create table in_02231 (number Int) engine=Null();

-- Create lots of INSERT blocks with MV
create materialized view mv_02231 to buffer_02231 as select
number as key,
groupArrayState(toString(number)) as v1
from in_02231
group by key;

insert into in_02231 select * from numbers(10e6) settings max_memory_usage='300Mi';

drop table buffer_02231;
drop table out_02231;
drop table in_02231;
drop table mv_02231;