Fix SimpleAggregateFunction for String longer than MAX_SMALL_STRING_SIZE #5311
@@ -63,6 +64,9 @@ class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream
         */
        void insertSimpleAggregationResult(MutableColumns & merged_columns);

        /// Memory pool for SimpleAggregateFunction.
        Arena arena;
Although I'm not sure about this bit.
It will accumulate data without ever being cleared, which will result in a memory leak.
We should clear the arena after processing each aggregate function state.
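To make the concern concrete, here is a minimal sketch, assuming a toy per-group arena. `ToyArena`, `MaxStringState` and `mergeSorted` are hypothetical names, not ClickHouse APIs. Each group of equal keys gets a fresh arena, and the group's result is copied out before that arena is released, so arena memory does not pile up across groups.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy arena: hands out chunks and frees them all on destruction.
class ToyArena
{
public:
    char * alloc(std::size_t size)
    {
        chunks.push_back(std::make_unique<char[]>(size));
        bytes += size;
        return chunks.back().get();
    }
    std::size_t allocatedBytes() const { return bytes; }

private:
    std::vector<std::unique_ptr<char[]>> chunks;
    std::size_t bytes = 0;
};

// Hypothetical "max(String)" state whose current value lives in arena memory.
struct MaxStringState
{
    const char * data = nullptr;
    std::size_t size = 0;

    void add(const std::string & value, ToyArena & arena)
    {
        if (data == nullptr || value > std::string(data, size))
        {
            char * buf = arena.alloc(value.size());  // old value stays in the arena until it is dropped
            value.copy(buf, value.size());
            data = buf;
            size = value.size();
        }
    }
};

// Merges rows already sorted by key, using one arena per group of equal keys.
std::vector<std::string> mergeSorted(
    const std::vector<std::pair<int, std::string>> & rows, std::size_t & peak_arena_bytes)
{
    std::vector<std::string> result;
    peak_arena_bytes = 0;
    std::size_t i = 0;
    while (i < rows.size())
    {
        ToyArena arena;  // fresh arena for this group only
        MaxStringState state;
        const int key = rows[i].first;
        for (; i < rows.size() && rows[i].first == key; ++i)
            state.add(rows[i].second, arena);

        assert(state.data != nullptr);  // every group has at least one row
        // Copy the result out of the arena before the arena is destroyed.
        result.emplace_back(state.data, state.size);
        if (arena.allocatedBytes() > peak_arena_bytes)
            peak_arena_bytes = arena.allocatedBytes();
    }  // the group's arena is destroyed at the end of each iteration
    return result;
}
```

Recreating the arena per group costs a small allocation per key but keeps memory bounded by the largest single group rather than by the whole merge.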
My code was "inspired" by SummingSortedBlockInputStream.cpp:
    // Specialized case for unary functions
    if (desc.column_numbers.size() == 1)
    {
        auto & col = cursor->all_columns[desc.column_numbers[0]];
        desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr);
    }
    else
    {
        // Gather all source columns into a vector
        ColumnRawPtrs columns(desc.column_numbers.size());
        for (size_t i = 0; i < desc.column_numbers.size(); ++i)
            columns[i] = cursor->all_columns[desc.column_numbers[i]];
        desc.add_function(desc.function.get(), desc.state.data(), columns.data(), cursor->pos, nullptr);
    }
Should this be fixed too?
It looks like it must be fixed too (also check AggregatingMergeTree, just in case).
But first let's try to provide a failing test.
> It will accumulate data without ever being cleared, which will result in a memory leak.
> We should clear the arena after processing each aggregate function state.
I thought Arena would take care of this; it seems I was wrong. Okay, I will take a closer look and handle the two new code paths.
> But first let's try to provide a failing test.
There is a test for SimpleAggregateFunction, and it is pretty simple. That said, if AggregatingMergeTree had these problems, I guess someone would have triggered them already.
It looks like this is safe too, since nullptr is passed only for sumWithOverflow, which does not use the arena (but maybe I am missing something?)
That's Ok.
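A minimal sketch of why a null arena can be harmless, using hypothetical types (`Column`, `AddFunc`, `sumAdd`, `sumColumn`) modelled loosely on the `addFree(that, place, columns, row_num, arena)` frame in the backtrace; this is not ClickHouse's real `IAggregateFunction` interface. The arena is simply the last argument of the add entry point, so `nullptr` is only safe for functions that never dereference it, such as a plain sum.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the real column and arena types.
struct Arena;  // intentionally incomplete: this sketch only passes pointers to it
using Column = std::vector<std::int64_t>;

// Shape of the "add" entry point: the arena comes last and may be nullptr.
using AddFunc = void (*)(void * state, const Column ** columns, std::size_t row, Arena * arena);

// A sum-like function never touches the arena, so a null arena is harmless here.
void sumAdd(void * state, const Column ** columns, std::size_t row, Arena * /*arena*/)
{
    assert(columns[0] != nullptr);
    *static_cast<std::int64_t *>(state) += (*columns[0])[row];
}

std::int64_t sumColumn(const Column & col)
{
    std::int64_t state = 0;
    AddFunc add = &sumAdd;
    const Column * cols[] = {&col};  // unary case: a single source column
    for (std::size_t row = 0; row < col.size(); ++row)
        add(&state, cols, row, nullptr);  // same nullptr-arena pattern as the quoted code
    return state;
}
```

A function that stores variable-length data (like a String min/max/any) would instead call into the arena from its add function, which is where a `nullptr` turns into a crash.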
There is IAggregateFunction::allocatesMemoryInArena. We should check whether any of the aggregate functions requires an Arena (store the result in a bool flag) and create the Arena only in that case.
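One way the suggestion could look, as a hedged sketch: `AggFunction`, `Arena`, `SimpleAggregateDesc` and `MergingStream` are hypothetical stand-ins, and only the name `allocatesMemoryInArena()` mirrors the real `IAggregateFunction` method. The flag is computed once in the constructor, and a fresh arena is created per group only when at least one function needs it.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for IAggregateFunction: only the arena flag matters here.
struct AggFunction
{
    bool needs_arena = false;
    bool allocatesMemoryInArena() const { return needs_arena; }
};

struct Arena {};  // placeholder: a real arena would own memory chunks

struct SimpleAggregateDesc
{
    const AggFunction * function;
};

class MergingStream  // hypothetical merging stream
{
public:
    explicit MergingStream(std::vector<SimpleAggregateDesc> descs_) : descs(std::move(descs_))
    {
        // Computed once: does any of the functions need arena memory?
        for (const auto & desc : descs)
        {
            assert(desc.function != nullptr);
            if (desc.function->allocatesMemoryInArena())
                allocates_memory_in_arena = true;
        }
    }

    // Called when a new group starts; only then pay for a (fresh) arena.
    void startGroup()
    {
        if (allocates_memory_in_arena)
            arena = std::make_unique<Arena>();
    }

    // nullptr when no function needs an arena.
    Arena * arenaPtr() const { return arena.get(); }

private:
    std::vector<SimpleAggregateDesc> descs;
    bool allocates_memory_in_arena = false;
    std::unique_ptr<Arena> arena;
};
```

Creating the arena at the start of each group also addresses the earlier leak concern, since the previous group's arena is released when the pointer is reassigned.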
- 4252ffae8e Allocates arena for SimpleAggregateFunction only if IAggregateFunction requires
- 18621c1a48 Set allocatesMemoryInArena for SingleValueDataString to true
By the way, before merging, I would like to squash the first two patches.
OK, thank you! I'm happy that another issue in our codebase has been fixed.
> I would like to squash the first two patches
Do you want us to squash all commits before merging, or will you squash some commits in your branch? P.S. We take it easy: it's no problem if some commits in master contain wrong code.
> Do you want us to squash all commits before merging, or will you squash some commits in your branch?
Not all, just the first two of them, rebased.
> P.S. We take it easy: it's no problem if some commits in master contain wrong code.
Yeah, but I like to keep the history clean.
And thank you for your comments!
SimpleAggregateFunction does not pass the arena to add_function -> getAddressOfAddFunction(), hence the following crash happens:
    (gdb) bt
    #0  DB::Arena::alloc (size=64, this=0x0) at ../dbms/src/Common/Arena.h:124
    #1  DB::SingleValueDataString::changeImpl (this=0x7f97424a27d8, value=..., arena=0x0) at ../dbms/src/AggregateFunctions/AggregateFunctionMinMaxAny.h:274
    #2  0x0000000005ea5319 in DB::AggregateFunctionNullUnary<true>::add (arena=<optimized out>, row_num=<optimized out>, columns=<optimized out>, place=<optimized out>, this=<optimized out>) at ../dbms/src/AggregateFunctions/AggregateFunctionNull.h:43
    #3  DB::IAggregateFunctionHelper<DB::AggregateFunctionNullUnary<true> >::addFree (that=<optimized out>, place=<optimized out>, columns=<optimized out>, row_num=<optimized out>, arena=<optimized out>) at ../dbms/src/AggregateFunctions/IAggregateFunction.h:131
    #4  0x000000000679772f in DB::AggregatingSortedBlockInputStream::addRow (this=this@entry=0x7f982de19c00, cursor=...) at ../dbms/src/Common/AlignedBuffer.h:31
    #5  0x0000000006797faa in DB::AggregatingSortedBlockInputStream::merge (this=this@entry=0x7f982de19c00, merged_columns=..., queue=...) at ../dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp:140
    #6  0x0000000006798979 in DB::AggregatingSortedBlockInputStream::readImpl (this=0x7f982de19c00) at ../dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp:78
    #7  0x000000000622db55 in DB::IBlockInputStream::read (this=0x7f982de19c00) at ../dbms/src/DataStreams/IBlockInputStream.cpp:56
    #8  0x0000000006613bee in DB::MergeTreeDataMergerMutator::mergePartsToTemporaryPart (this=this@entry=0x7f97ec65e1a0, future_part=..., merge_entry=..., time_of_merge=<optimized out>, disk_reservation=<optimized out>, deduplicate=<optimized out>) at /usr/include/c++/8/bits/shared_ptr_base.h:1018
    #9  0x000000000658f7a4 in DB::StorageReplicatedMergeTree::tryExecuteMerge (this=0x7f97ec65b810, entry=...) at /usr/include/c++/8/bits/unique_ptr.h:342
    #10 0x00000000065940ab in DB::StorageReplicatedMergeTree::executeLogEntry (this=0x7f97ec65b810, entry=...) at ../dbms/src/Storages/StorageReplicatedMergeTree.cpp:910
    <snip>
    (gdb) f 1
    (gdb) p MAX_SMALL_STRING_SIZE
    $1 = 48
    (gdb) p capacity
    $2 = 64
    (gdb) p value
    $3 = {data = 0x7f97242fcbd0 "HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH", size = 61}
v2: avoid leaking memory allocated by the Arena in the intermediate step
Fixes: 8f8d2c0 ("Merge pull request #4629 from bgranvea/simple_aggregate_function")
This includes the following aggregate functions for the String type:
- *min*
- *max*
- *any*
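The crash can be reproduced in miniature. The sketch below assumes a 64-byte state with inline storage for short strings and an arena fallback for long ones, following the values printed in the gdb session (on a 64-bit build 64 - 4 - 4 - 8 = 48 inline bytes, and `capacity` = 64 for a 61-byte value). `StringValue`, `ToyArena` and `roundUpToPowerOfTwo` are hypothetical names, not the real `SingleValueDataString` code. Strings of up to `MAX_SMALL_STRING_SIZE` bytes never touch the arena, which would explain why a null arena went unnoticed until a longer string arrived.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

// Hypothetical toy arena (not ClickHouse's DB::Arena).
struct ToyArena
{
    std::vector<std::unique_ptr<char[]>> chunks;
    char * alloc(std::size_t size)
    {
        chunks.push_back(std::make_unique<char[]>(size));
        return chunks.back().get();
    }
};

// Rounds n up to the next power of two; returns 0 for n <= 0.
std::int32_t roundUpToPowerOfTwo(std::int32_t n)
{
    if (n <= 0)
        return 0;
    std::int32_t p = 1;
    while (p < n)
        p <<= 1;
    return p;
}

// Hypothetical string value: inline ("small") storage with an arena fallback.
struct StringValue
{
    std::int32_t size = -1;  // -1 means "no value yet"
    std::int32_t capacity = 0;
    char * large_data = nullptr;
    static constexpr std::int32_t AUTOMATIC_STORAGE_SIZE = 64;
    static constexpr std::int32_t MAX_SMALL_STRING_SIZE = static_cast<std::int32_t>(
        AUTOMATIC_STORAGE_SIZE - 2 * sizeof(std::int32_t) - sizeof(char *));
    char small_data[MAX_SMALL_STRING_SIZE];

    void change(const std::string & value, ToyArena * arena)
    {
        const auto value_size = static_cast<std::int32_t>(value.size());
        if (value_size <= MAX_SMALL_STRING_SIZE)
        {
            size = value_size;
            std::memcpy(small_data, value.data(), value.size());  // inline: no arena needed
        }
        else
        {
            // Long strings go to the arena; a null arena here is the reported crash.
            assert(arena != nullptr);
            if (capacity < value_size)
            {
                capacity = roundUpToPowerOfTwo(value_size);
                large_data = arena->alloc(capacity);
            }
            size = value_size;
            std::memcpy(large_data, value.data(), value.size());
        }
    }

    std::string get() const
    {
        if (size < 0)
            return {};
        return size <= MAX_SMALL_STRING_SIZE ? std::string(small_data, size) : std::string(large_data, size);
    }
};
```

With a 61-byte value, as in the gdb session, the long-string branch requests a 64-byte chunk, matching `DB::Arena::alloc (size=64, this=0x0)` in frame #0.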
4252ffa to db274bf