Commit

[Enhancement] Reduce cache misses when evaluating many exprs in aggregate (#15998)

1. Move some function-call exprs from the project node into the aggregate node.
2. Evaluate those function-call exprs right before the aggregate functions consume them.
stdpain committed Jan 3, 2023
1 parent 9ca669b commit 4c2e089
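The core idea: previously, `Aggregator::evaluate_exprs()` materialized the group-by columns and the input columns of every aggregate function up front, and only afterwards did the update loop consume them; with many aggregate functions, the earliest evaluated columns are already evicted from the CPU caches by the time their update runs. After this change, only the group-by exprs are evaluated up front, and each aggregate function's inputs are evaluated immediately before they are consumed. A minimal sketch of the access-pattern change (hypothetical names, not the actual StarRocks code):

    // Before: evaluate all agg inputs, then update all states.
    // With many agg functions, every evaluated column must stay
    // cache-resident until the second loop reaches it.
    for (int i = 0; i < num_agg_fns; ++i) evaluate_inputs(i);
    for (int i = 0; i < num_agg_fns; ++i) update_states(i); // reloads cold columns

    // After: interleave evaluation and consumption per function,
    // so each input column is still cache-hot when its update runs.
    for (int i = 0; i < num_agg_fns; ++i) {
        evaluate_inputs(i);
        update_states(i);
    }

For scale (illustrative numbers, not from the PR): a 4096-row chunk of 8-byte values is roughly 32 KB per column, so a few dozen aggregate input columns alone already exceed a typical 32 KB L1 data cache.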
Showing 45 changed files with 515 additions and 238 deletions.
10 changes: 5 additions & 5 deletions be/src/exec/aggregate/aggregate_blocking_node.cpp
@@ -73,7 +73,7 @@ Status AggregateBlockingNode::open(RuntimeState* state) {

        DCHECK_LE(chunk->num_rows(), runtime_state()->chunk_size());

-        RETURN_IF_ERROR(_aggregator->evaluate_exprs(chunk.get()));
+        RETURN_IF_ERROR(_aggregator->evaluate_groupby_exprs(chunk.get()));

        size_t chunk_size = chunk->num_rows();
        {
@@ -87,19 +87,19 @@ Status AggregateBlockingNode::open(RuntimeState* state) {
            TRY_CATCH_ALLOC_SCOPE_END()
        }
        if (_aggregator->is_none_group_by_exprs()) {
-            _aggregator->compute_single_agg_state(chunk_size);
+            RETURN_IF_ERROR(_aggregator->compute_single_agg_state(chunk.get(), chunk_size));
        } else {
            if (agg_group_by_with_limit) {
                // use `_aggregator->streaming_selection()` here to mark whether keys need to be
                // filtered when computing agg states; it is generated in `build_hash_map`
                size_t zero_count = SIMD::count_zero(_aggregator->streaming_selection().data(), chunk_size);
                if (zero_count == chunk_size) {
-                    _aggregator->compute_batch_agg_states(chunk_size);
+                    RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(chunk.get(), chunk_size));
                } else {
-                    _aggregator->compute_batch_agg_states_with_selection(chunk_size);
+                    RETURN_IF_ERROR(_aggregator->compute_batch_agg_states_with_selection(chunk.get(), chunk_size));
                }
            } else {
-                _aggregator->compute_batch_agg_states(chunk_size);
+                RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(chunk.get(), chunk_size));
            }
        }

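Taken together, the blocking path now reads roughly as follows (a condensed sketch of the control flow after this change, not a verbatim excerpt):

    // Per input chunk in AggregateBlockingNode::open():
    RETURN_IF_ERROR(_aggregator->evaluate_groupby_exprs(chunk.get())); // group-by columns only
    _aggregator->build_hash_map(chunk_size);                           // map keys to agg states
    if (_aggregator->is_none_group_by_exprs()) {
        // agg inputs are now evaluated inside, per function, right before each update
        RETURN_IF_ERROR(_aggregator->compute_single_agg_state(chunk.get(), chunk_size));
    } else {
        RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(chunk.get(), chunk_size));
    }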
22 changes: 12 additions & 10 deletions be/src/exec/aggregate/aggregate_streaming_node.cpp
@@ -72,22 +72,22 @@ Status AggregateStreamingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bo
        size_t input_chunk_size = input_chunk->num_rows();
        _aggregator->update_num_input_rows(input_chunk_size);
        COUNTER_SET(_aggregator->input_row_count(), _aggregator->num_input_rows());
-        RETURN_IF_ERROR(_aggregator->evaluate_exprs(input_chunk.get()));
+        RETURN_IF_ERROR(_aggregator->evaluate_groupby_exprs(input_chunk.get()));

        if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::FORCE_STREAMING) {
            // force streaming execution
            SCOPED_TIMER(_aggregator->streaming_timer());
-            _aggregator->output_chunk_by_streaming(chunk);
+            RETURN_IF_ERROR(_aggregator->output_chunk_by_streaming(input_chunk.get(), chunk));
            break;
        } else if (_aggregator->streaming_preaggregation_mode() ==
                   TStreamingPreaggregationMode::FORCE_PREAGGREGATION) {
            RETURN_IF_ERROR(state->check_mem_limit("AggrNode"));
            SCOPED_TIMER(_aggregator->agg_compute_timer());
            TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map(input_chunk_size));
            if (_aggregator->is_none_group_by_exprs()) {
-                _aggregator->compute_single_agg_state(input_chunk_size);
+                RETURN_IF_ERROR(_aggregator->compute_single_agg_state(input_chunk.get(), input_chunk_size));
            } else {
-                _aggregator->compute_batch_agg_states(input_chunk_size);
+                RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(input_chunk.get(), input_chunk_size));
            }

            _mem_tracker->set(_aggregator->hash_map_variant().reserved_memory_usage(_aggregator->mem_pool()));
@@ -120,9 +120,9 @@ Status AggregateStreamingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bo
            SCOPED_TIMER(_aggregator->agg_compute_timer());
            TRY_CATCH_BAD_ALLOC(_aggregator->build_hash_map(input_chunk_size));
            if (_aggregator->is_none_group_by_exprs()) {
-                _aggregator->compute_single_agg_state(input_chunk_size);
+                RETURN_IF_ERROR(_aggregator->compute_single_agg_state(input_chunk.get(), input_chunk_size));
            } else {
-                _aggregator->compute_batch_agg_states(input_chunk_size);
+                RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(input_chunk.get(), input_chunk_size));
            }

            _mem_tracker->set(_aggregator->hash_map_variant().reserved_memory_usage(_aggregator->mem_pool()));
@@ -139,18 +139,20 @@ Status AggregateStreamingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bo
            size_t zero_count = SIMD::count_zero(_aggregator->streaming_selection());
            if (zero_count == 0) {
                SCOPED_TIMER(_aggregator->streaming_timer());
-                _aggregator->output_chunk_by_streaming(chunk);
+                RETURN_IF_ERROR(_aggregator->output_chunk_by_streaming(input_chunk.get(), chunk));
            } else if (zero_count == _aggregator->streaming_selection().size()) {
                SCOPED_TIMER(_aggregator->agg_compute_timer());
-                _aggregator->compute_batch_agg_states(input_chunk_size);
+                RETURN_IF_ERROR(_aggregator->compute_batch_agg_states(input_chunk.get(), input_chunk_size));
            } else {
                {
                    SCOPED_TIMER(_aggregator->agg_compute_timer());
-                    _aggregator->compute_batch_agg_states_with_selection(input_chunk_size);
+                    RETURN_IF_ERROR(_aggregator->compute_batch_agg_states_with_selection(input_chunk.get(),
+                                                                                         input_chunk_size));
                }
                {
                    SCOPED_TIMER(_aggregator->streaming_timer());
-                    _aggregator->output_chunk_by_streaming_with_selection(chunk);
+                    RETURN_IF_ERROR(
+                            _aggregator->output_chunk_by_streaming_with_selection(input_chunk.get(), chunk));
                }
            }

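Note why `output_chunk_by_streaming()` and its `_with_selection` variant now receive the input chunk: agg inputs are no longer materialized up front, so any path that reads `_agg_input_columns` must trigger evaluation itself. A sketch of the resulting shape (condensed from the aggregator.cpp diff below, not a verbatim excerpt):

    // The streaming output path evaluates agg-input exprs on demand.
    Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk) {
        if (!_agg_fn_ctxs.empty()) {
            // previously done up front in evaluate_exprs() for every mode
            RETURN_IF_ERROR(evaluate_agg_fn_exprs(input_chunk));
        }
        // ... assemble the intermediate-typed result chunk as before ...
        return Status::OK();
    }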
2 changes: 1 addition & 1 deletion be/src/exec/aggregate/distinct_blocking_node.cpp
@@ -62,7 +62,7 @@ Status DistinctBlockingNode::open(RuntimeState* state) {
        }
        DCHECK_LE(chunk->num_rows(), runtime_state()->chunk_size());

-        RETURN_IF_ERROR(_aggregator->evaluate_exprs(chunk.get()));
+        RETURN_IF_ERROR(_aggregator->evaluate_groupby_exprs(chunk.get()));

        {
            SCOPED_TIMER(_aggregator->agg_compute_timer());
9 changes: 5 additions & 4 deletions be/src/exec/aggregate/distinct_streaming_node.cpp
@@ -67,12 +67,12 @@ Status DistinctStreamingNode::get_next(RuntimeState* state, ChunkPtr* chunk, boo
        size_t input_chunk_size = input_chunk->num_rows();
        _aggregator->update_num_input_rows(input_chunk_size);
        COUNTER_SET(_aggregator->input_row_count(), _aggregator->num_input_rows());
-        RETURN_IF_ERROR(_aggregator->evaluate_exprs(input_chunk.get()));
+        RETURN_IF_ERROR(_aggregator->evaluate_groupby_exprs(input_chunk.get()));

        if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::FORCE_STREAMING) {
            // force streaming execution
            SCOPED_TIMER(_aggregator->streaming_timer());
-            _aggregator->output_chunk_by_streaming(chunk);
+            RETURN_IF_ERROR(_aggregator->output_chunk_by_streaming(input_chunk.get(), chunk));
            break;
        } else if (_aggregator->streaming_preaggregation_mode() ==
                   TStreamingPreaggregationMode::FORCE_PREAGGREGATION) {
@@ -123,9 +123,10 @@ Status DistinctStreamingNode::get_next(RuntimeState* state, ChunkPtr* chunk, boo
            SCOPED_TIMER(_aggregator->streaming_timer());
            size_t zero_count = SIMD::count_zero(_aggregator->streaming_selection());
            if (zero_count == 0) {
-                _aggregator->output_chunk_by_streaming(chunk);
+                RETURN_IF_ERROR(_aggregator->output_chunk_by_streaming(input_chunk.get(), chunk));
            } else if (zero_count != _aggregator->streaming_selection().size()) {
-                _aggregator->output_chunk_by_streaming_with_selection(chunk);
+                RETURN_IF_ERROR(
+                        _aggregator->output_chunk_by_streaming_with_selection(input_chunk.get(), chunk));
            }
        }

103 changes: 66 additions & 37 deletions be/src/exec/aggregator.cpp
@@ -488,10 +488,40 @@ bool Aggregator::should_expand_preagg_hash_tables(size_t prev_row_returned, size
    return current_reduction > min_reduction;
}

-void Aggregator::compute_single_agg_state(size_t chunk_size) {
+Status Aggregator::evaluate_agg_input_column(Chunk* chunk, std::vector<ExprContext*>& agg_expr_ctxs, int i) {
+    {
+        SCOPED_TIMER(_agg_stat->expr_release_timer);
+        for (size_t j = 0; j < agg_expr_ctxs.size(); j++) {
+            _agg_input_columns[i][j] = nullptr;
+        }
+    }
+
+    SCOPED_TIMER(_agg_stat->expr_compute_timer);
+    for (size_t j = 0; j < agg_expr_ctxs.size(); j++) {
+        // For simplicity, and to avoid changing the overall processing flow,
+        // we handle const columns as normal data columns.
+        // TODO(kks): improve const-column aggregation later
+        if (j == 0) {
+            ASSIGN_OR_RETURN(auto&& col, agg_expr_ctxs[j]->evaluate(chunk));
+            _agg_input_columns[i][j] = ColumnHelper::unpack_and_duplicate_const_column(chunk->num_rows(), col);
+        } else {
+            ASSIGN_OR_RETURN(auto&& col, agg_expr_ctxs[j]->evaluate(chunk));
+            _agg_input_columns[i][j] = std::move(col);
+        }
+        _agg_input_raw_columns[i][j] = _agg_input_columns[i][j].get();
+    }
+    return Status::OK();
+}
+
+Status Aggregator::compute_single_agg_state(Chunk* chunk, size_t chunk_size) {
    SCOPED_TIMER(_agg_stat->agg_function_compute_timer);
    bool use_intermediate = _use_intermediate_as_input();
+    auto& agg_expr_ctxs = use_intermediate ? _intermediate_agg_expr_ctxs : _agg_expr_ctxs;

    for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
+        // evaluate the arguments of the i-th agg function
+        RETURN_IF_ERROR(evaluate_agg_input_column(chunk, agg_expr_ctxs[i], i));
        // batch-call update or merge for the single stage
        if (!_is_merge_funcs[i] && !use_intermediate) {
            _agg_functions[i]->update_batch_single_state(_agg_fn_ctxs[i], chunk_size, _agg_input_raw_columns[i].data(),
                                                         _single_agg_state + _agg_states_offsets[i]);
@@ -501,12 +531,18 @@ void Aggregator::compute_single_agg_state(size_t chunk_size) {
                                                        _agg_input_columns[i][0].get(), 0, chunk_size);
        }
    }
+    return Status::OK();
}

-void Aggregator::compute_batch_agg_states(size_t chunk_size) {
+Status Aggregator::compute_batch_agg_states(Chunk* chunk, size_t chunk_size) {
    SCOPED_TIMER(_agg_stat->agg_function_compute_timer);
    bool use_intermediate = _use_intermediate_as_input();
+    auto& agg_expr_ctxs = use_intermediate ? _intermediate_agg_expr_ctxs : _agg_expr_ctxs;

    for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
+        // evaluate the arguments of the i-th agg function
+        RETURN_IF_ERROR(evaluate_agg_input_column(chunk, agg_expr_ctxs[i], i));
        // batch-call update or merge
        if (!_is_merge_funcs[i] && !use_intermediate) {
            _agg_functions[i]->update_batch(_agg_fn_ctxs[i], chunk_size, _agg_states_offsets[i],
                                            _agg_input_raw_columns[i].data(), _tmp_agg_states.data());
@@ -516,12 +552,17 @@ void Aggregator::compute_batch_agg_states(size_t chunk_size) {
                                           _agg_input_columns[i][0].get(), _tmp_agg_states.data());
        }
    }
+    return Status::OK();
}

-void Aggregator::compute_batch_agg_states_with_selection(size_t chunk_size) {
+Status Aggregator::compute_batch_agg_states_with_selection(Chunk* chunk, size_t chunk_size) {
    SCOPED_TIMER(_agg_stat->agg_function_compute_timer);
    bool use_intermediate = _use_intermediate_as_input();
+    auto& agg_expr_ctxs = use_intermediate ? _intermediate_agg_expr_ctxs : _agg_expr_ctxs;

    for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
+        RETURN_IF_ERROR(evaluate_agg_input_column(chunk, agg_expr_ctxs[i], i));
+
        if (!_is_merge_funcs[i] && !use_intermediate) {
            _agg_functions[i]->update_batch_selectively(_agg_fn_ctxs[i], chunk_size, _agg_states_offsets[i],
                                                        _agg_input_raw_columns[i].data(), _tmp_agg_states.data(),
@@ -533,6 +574,7 @@ void Aggregator::compute_batch_agg_states_with_selection(size_t chunk_size) {
                                                       _tmp_agg_states.data(), _streaming_selection);
        }
    }
+    return Status::OK();
}

Status Aggregator::_evaluate_const_columns(int i) {
@@ -593,13 +635,13 @@ void Aggregator::process_limit(ChunkPtr* chunk) {
}
}

-Status Aggregator::evaluate_exprs(Chunk* chunk) {
+Status Aggregator::evaluate_groupby_exprs(Chunk* chunk) {
    _set_passthrough(chunk->owner_info().is_passthrough());
-    _reset_exprs();
-    return _evaluate_exprs(chunk);
+    _reset_groupby_exprs();
+    return _evaluate_group_by_exprs(chunk);
}

-void Aggregator::output_chunk_by_streaming(ChunkPtr* chunk) {
+Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk) {
    // The input chunk is already intermediate-typed, so there is no need to convert it again.
    // Only when the input chunk is input-typed should we convert it into an intermediate-typed chunk.
    // is_passthrough being on indicates that the chunk is input-typed.
@@ -614,11 +656,15 @@ void Aggregator::output_chunk_by_streaming(ChunkPtr* chunk) {

    if (!_agg_fn_ctxs.empty()) {
        DCHECK(!_group_by_columns.empty());
+
+        RETURN_IF_ERROR(evaluate_agg_fn_exprs(input_chunk));
+
        const auto num_rows = _group_by_columns[0]->size();
        Columns agg_result_column = _create_agg_result_columns(num_rows, use_intermediate_as_output);
        for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
            size_t id = _group_by_columns.size() + i;
            auto slot_id = slots[id]->id();
+            // TODO: evaluate agg input columns
            if (use_intermediate_as_input) {
                DCHECK(i < _agg_input_columns.size() && _agg_input_columns[i].size() >= 1);
                result_chunk->append_column(std::move(_agg_input_columns[i][0]), slot_id);
@@ -635,9 +681,10 @@ void Aggregator::output_chunk_by_streaming(ChunkPtr* chunk) {
    _num_rows_processed += result_chunk->num_rows();
    *chunk = std::move(result_chunk);
    COUNTER_SET(_agg_stat->pass_through_row_count, _num_pass_through_rows);
+    return Status::OK();
}

-void Aggregator::output_chunk_by_streaming_with_selection(ChunkPtr* chunk) {
+Status Aggregator::output_chunk_by_streaming_with_selection(Chunk* input_chunk, ChunkPtr* chunk) {
    // Streaming aggregation always has at least one group-by column
    size_t chunk_size = _group_by_columns[0]->size();
    for (auto& _group_by_column : _group_by_columns) {
@@ -666,7 +713,9 @@ void Aggregator::output_chunk_by_streaming_with_selection(ChunkPtr* chunk) {
            }
        }
    }
-    output_chunk_by_streaming(chunk);
+
+    RETURN_IF_ERROR(output_chunk_by_streaming(input_chunk, chunk));
+    return Status::OK();
}

void Aggregator::try_convert_to_two_level_map() {
@@ -767,22 +816,14 @@ ChunkPtr Aggregator::_build_output_chunk(const Columns& group_by_columns, const
    return result_chunk;
}

-void Aggregator::_reset_exprs() {
+void Aggregator::_reset_groupby_exprs() {
    SCOPED_TIMER(_agg_stat->expr_release_timer);
    for (auto& _group_by_column : _group_by_columns) {
        _group_by_column = nullptr;
    }

-    DCHECK(_agg_input_columns.size() == _agg_fn_ctxs.size());
-    for (size_t i = 0; i < _agg_input_columns.size(); i++) {
-        for (size_t j = 0; j < _agg_input_columns[i].size(); j++) {
-            _agg_input_columns[i][j] = nullptr;
-            _agg_input_raw_columns[i][j] = nullptr;
-        }
-    }
}

-Status Aggregator::_evaluate_exprs(Chunk* chunk) {
+Status Aggregator::_evaluate_group_by_exprs(Chunk* chunk) {
    SCOPED_TIMER(_agg_stat->expr_compute_timer);
    // Compute group-by columns
    for (size_t i = 0; i < _group_by_expr_ctxs.size(); i++) {
@@ -809,27 +850,15 @@ Status Aggregator::_evaluate_exprs(Chunk* chunk) {
        }
    }

-    // Compute agg function columns
-    auto use_intermediate = _use_intermediate_as_input();
+    return Status::OK();
+}
+
+Status Aggregator::evaluate_agg_fn_exprs(Chunk* chunk) {
+    bool use_intermediate = _use_intermediate_as_input();
    auto& agg_expr_ctxs = use_intermediate ? _intermediate_agg_expr_ctxs : _agg_expr_ctxs;
    DCHECK(agg_expr_ctxs.size() == _agg_input_columns.size());
+    DCHECK(agg_expr_ctxs.size() == _agg_fn_ctxs.size());
    for (size_t i = 0; i < agg_expr_ctxs.size(); i++) {
-        for (size_t j = 0; j < agg_expr_ctxs[i].size(); j++) {
-            // For simplicity and don't change the overall processing flow,
-            // We handle const column as normal data column
-            // TODO(kks): improve const column aggregate later
-            if (j == 0) {
-                ASSIGN_OR_RETURN(auto&& col, agg_expr_ctxs[i][j]->evaluate(chunk));
-                _agg_input_columns[i][j] = ColumnHelper::unpack_and_duplicate_const_column(chunk->num_rows(), col);
-            } else {
-                ASSIGN_OR_RETURN(auto&& col, agg_expr_ctxs[i][j]->evaluate(chunk));
-                _agg_input_columns[i][j] = std::move(col);
-            }
-            _agg_input_raw_columns[i][j] = _agg_input_columns[i][j].get();
-        }
+        RETURN_IF_ERROR(evaluate_agg_input_column(chunk, agg_expr_ctxs[i], i));
    }

    return Status::OK();
}


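One detail worth noting in the new `evaluate_agg_input_column()`: it first drops the previous chunk's column references (accounted under `expr_release_timer`) and then evaluates the new ones (under `expr_compute_timer`), keeping both the owning pointer in `_agg_input_columns` and a raw-pointer mirror in `_agg_input_raw_columns` that the batch update functions consume. A trimmed restatement of that flow (see the full body in the hunk above):

    // Per agg function i: release old inputs, then evaluate new ones.
    for (size_t j = 0; j < agg_expr_ctxs.size(); j++) {
        _agg_input_columns[i][j] = nullptr;                  // release previous chunk's column
    }
    for (size_t j = 0; j < agg_expr_ctxs.size(); j++) {
        ASSIGN_OR_RETURN(auto&& col, agg_expr_ctxs[j]->evaluate(chunk));
        _agg_input_columns[i][j] = (j == 0)                  // const column unpacked for arg 0
                ? ColumnHelper::unpack_and_duplicate_const_column(chunk->num_rows(), col)
                : std::move(col);
        _agg_input_raw_columns[i][j] = _agg_input_columns[i][j].get(); // raw view for update_batch
    }

Because this runs once per aggregate function, immediately before that function's update/merge call, each evaluated input column is consumed while still cache-hot, which is the cache-miss reduction the commit title refers to.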