Optimize group by constant keys #53549

Merged
merged 13 commits on Sep 18, 2023
39 changes: 39 additions & 0 deletions src/Common/BoolArgsToTemplateArgsDispatcher.h
@@ -0,0 +1,39 @@
#pragma once
#include <utility>

namespace DB
{

/// Special struct that helps to convert bool variables to bool template arguments of a function.
/// It can be used to avoid multiple nested if/else on bool arguments. How to use it:
/// Imagine you have a template function
/// template <bool b1, bool b2, ..., bool bn> return_type foo(...);
/// and bool variables b1, b2, ..., bn. To pass these variables as template arguments to foo, you can do the following:
///
/// auto call_foo = []<bool b1, bool b2, ..., bool bn>()
/// {
///     return foo<b1, b2, ..., bn>(...);
/// };
///
/// BoolArgsToTemplateArgsDispatcher<decltype(call_foo)>::call(call_foo, b1, b2, ..., bn);

template <class Functor, bool... Args>
struct BoolArgsToTemplateArgsDispatcher
{
template <typename... Args1>
static auto call(Functor f, Args1&&... args)
{
return f.template operator()<Args...>(std::forward<Args1>(args)...);
}

template <class... Args1>
static auto call(Functor f, bool b, Args1&&... ar1)
{
if (b)
return BoolArgsToTemplateArgsDispatcher<Functor, Args..., true>::call(f, std::forward<Args1>(ar1)...);
else
return BoolArgsToTemplateArgsDispatcher<Functor, Args..., false>::call(f, std::forward<Args1>(ar1)...);
}
};

}
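Usage in isolation, for reference: the sketch below is a self-contained, compilable C++20 toy. The struct is copied here in simplified form, and Dispatcher, process, and call_process are illustrative names, not part of the PR.

#include <cstdio>
#include <utility>

/// Simplified copy of the dispatcher above, so this sketch compiles on its own.
template <class Functor, bool... Args>
struct Dispatcher
{
    template <typename... Args1>
    static auto call(Functor f, Args1 &&... args)
    {
        return f.template operator()<Args...>(std::forward<Args1>(args)...);
    }

    template <class... Args1>
    static auto call(Functor f, bool b, Args1 &&... rest)
    {
        if (b)
            return Dispatcher<Functor, Args..., true>::call(f, std::forward<Args1>(rest)...);
        else
            return Dispatcher<Functor, Args..., false>::call(f, std::forward<Args1>(rest)...);
    }
};

/// Hypothetical target function: wants the bools as compile-time template arguments.
template <bool prefetch, bool keys_are_const>
void process(size_t rows)
{
    std::printf("prefetch=%d keys_are_const=%d rows=%zu\n", prefetch, keys_are_const, rows);
}

int main()
{
    bool prefetch = true;
    bool keys_are_const = false;
    size_t rows = 100;

    auto call_process = [&]<bool b1, bool b2>()
    {
        process<b1, b2>(rows);
    };

    /// Two runtime bools become two template arguments; the dispatcher
    /// instantiates all 4 combinations and picks the right one at runtime.
    Dispatcher<decltype(call_process)>::call(call_process, prefetch, keys_are_const);
}

The recursion peels one runtime bool per step and appends its value to the template parameter pack; once no bools remain, the first overload fires and invokes the lambda with the accumulated compile-time pack.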
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -650,6 +650,7 @@ class IColumn;
M(SetOperationMode, except_default_mode, SetOperationMode::ALL, "Set default mode in EXCEPT query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(Bool, optimize_group_by_constant_keys, true, "Optimize GROUP BY when all keys in block are constant", 0) \
M(Bool, legacy_column_name_of_tuple_literal, false, "List all names of element of large tuple literals in their column names instead of hash. This settings exists only for compatibility reasons. It makes sense to set to 'true', while doing rolling update of cluster from version lower than 21.7 to higher.", 0) \
\
M(Bool, query_plan_enable_optimizations, true, "Apply optimizations to query plan", 0) \
3 changes: 2 additions & 1 deletion src/Core/SettingsChangesHistory.h
@@ -80,7 +80,8 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.8", {{"rewrite_count_distinct_if_with_count_distinct_implementation", false, true, "Rewrite countDistinctIf with count_distinct_implementation configuration"}}},
{"23.8", {{"rewrite_count_distinct_if_with_count_distinct_implementation", false, true, "Rewrite countDistinctIf with count_distinct_implementation configuration"},
{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"}}},
{"23.7", {{"function_sleep_max_microseconds_per_block", 0, 3000000, "In previous versions, the maximum sleep time of 3 seconds was applied only for `sleep`, but not for `sleepEachRow` function. In the new version, we introduce this setting. If you set compatibility with the previous versions, we will disable the limit altogether."}}},
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
{"http_receive_timeout", 180, 30, "See http_send_timeout."}}},
152 changes: 104 additions & 48 deletions src/Interpreters/Aggregator.cpp
@@ -39,6 +39,7 @@
#include <Common/JSONBuilder.h>
#include <Common/filesystemHelpers.h>
#include <Common/scope_guard_safe.h>
#include <Common/BoolArgsToTemplateArgsDispatcher.h>

#include <Parsers/ASTSelectQuery.h>

@@ -1036,11 +1037,12 @@ void Aggregator::executeImpl(
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
        executeImpl(*result.NAME, result.aggregates_pool, row_begin, row_end, key_columns, aggregate_instructions, no_more_keys, all_keys_are_const, overflow_row);

if (false) {} // NOLINT
APPLY_FOR_AGGREGATED_VARIANTS(M)
@@ -1060,44 +1062,35 @@ void NO_INLINE Aggregator::executeImpl(
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);

auto call_execute_impl_batch = [&]<bool b1, bool b2, bool b3, bool b4>()
{
executeImplBatch<b1, b2, b3, b4>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
};

bool use_compiled_functions = false;
#if USE_EMBEDDED_COMPILER
use_compiled_functions = compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions);
#endif

if (!no_more_keys)
{
/// Prefetching doesn't make sense for small hash tables, because they fit in caches entirely.
const bool prefetch = Method::State::has_cheap_key_calculation && params.enable_prefetch
&& (method.data.getBufferSizeInBytes() > min_bytes_for_prefetch);

        BoolArgsToTemplateArgsDispatcher<decltype(call_execute_impl_batch)>::call(call_execute_impl_batch, no_more_keys, use_compiled_functions, prefetch, all_keys_are_const);
[Review comment - Member]
Why is it better? I didn't understand why we need this class.

[Reply - Avogar (Member, Author), Aug 22, 2023]
Otherwise we would need to write ~16 if/else branches to check all 4 bool arguments (maybe some of them won't actually appear, but it is still a lot),
like https://pastila.nl/?01f3324d/a170ed3f747f063f81f5821188dd8aa3#B7nUAYOQzbzMvGUQ6wClTw==
IMHO doing it in a few lines with a dispatcher class is better, but you can disagree and I can bring back this batch of if/else.
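For illustration only, a compilable sketch of the alternative being discussed (the stub name and the choice to expand just two of the four bools are mine, not from the thread):

#include <cstdio>

/// Stand-in for the real executeImplBatch (names and arguments are placeholders).
template <bool no_more_keys, bool use_compiled_functions, bool prefetch, bool all_keys_are_const>
void executeImplBatchStub()
{
    std::printf("%d %d %d %d\n", no_more_keys, use_compiled_functions, prefetch, all_keys_are_const);
}

/// Hand-written dispatch of just two of the four bools: already 4 leaves.
/// Each additional runtime bool doubles the count, hence 2^4 = 16 in total.
void dispatchByHand(bool no_more_keys, bool prefetch)
{
    if (no_more_keys)
    {
        if (prefetch)
            executeImplBatchStub<true, false, true, false>();
        else
            executeImplBatchStub<true, false, false, false>();
    }
    else
    {
        if (prefetch)
            executeImplBatchStub<false, false, true, false>();
        else
            executeImplBatchStub<false, false, false, false>();
    }
}

int main() { dispatchByHand(true, false); }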

[Review comment - Member]
I do not like dispatchers as well. I think the issue is with executeImplBatch and we should refactor it somehow.

}
else
{
        BoolArgsToTemplateArgsDispatcher<decltype(call_execute_impl_batch)>::call(call_execute_impl_batch, no_more_keys, false, false, all_keys_are_const);
}
}

template <bool no_more_keys, bool use_compiled_functions, bool prefetch, bool all_keys_are_const, typename Method>
void NO_INLINE Aggregator::executeImplBatch(
Method & method,
typename Method::State & state,
@@ -1121,27 +1114,34 @@ void NO_INLINE Aggregator::executeImplBatch(

/// For all rows.
AggregateDataPtr place = aggregates_pool->alloc(0);
        if constexpr (all_keys_are_const)
        {
            state.emplaceKey(method.data, 0, *aggregates_pool).setMapped(place);
        }
        else
        {
            for (size_t i = row_begin; i < row_end; ++i)
            {
                if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
                {
                    if (i == row_begin + prefetching.iterationsToMeasure())
                        prefetch_look_ahead = prefetching.calcPrefetchLookAhead();

                    if (i + prefetch_look_ahead < row_end)
                    {
                        auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool);
                        method.data.prefetch(std::move(key_holder));
                    }
                }

                state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place);
            }
        }
        return;
    }

/// Optimization for special case when aggregating by 8bit key.
    if constexpr (!no_more_keys && !all_keys_are_const && std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>)
{
/// We use another method if there are aggregate functions with -Array combinator.
bool has_arrays = false;
@@ -1180,16 +1180,29 @@ void NO_INLINE Aggregator::executeImplBatch(
/// - this affects only optimize_aggregation_in_order,
/// - this is just a pointer, so it should not be significant,
/// - and plus this will require other changes in the interface.
    std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[all_keys_are_const ? 1 : row_end]);

    size_t start, end;
    /// If all keys are const, key columns contain only 1 row.
    if constexpr (all_keys_are_const)
    {
        start = 0;
        end = 1;
    }
    else
    {
        start = row_begin;
        end = row_end;
    }

for (size_t i = start; i < end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;

if constexpr (!no_more_keys)
{
            if constexpr (prefetch && !all_keys_are_const && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
{
if (i == row_begin + prefetching.iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();
@@ -1254,9 +1267,17 @@ void NO_INLINE Aggregator::executeImplBatch(
/// Add only if the key already exists.
auto find_result = state.findKey(method.data, i, *aggregates_pool);
if (find_result.isFound())
{
aggregate_data = find_result.getMapped();
}
else
{
                /// If all keys are constant and this is a new key,
                /// we don't need to do anything and can just skip the whole block.
if constexpr (all_keys_are_const)
return;
aggregate_data = overflow_row;
}
}

places[i] = aggregate_data;
Expand All @@ -1279,8 +1300,16 @@ void NO_INLINE Aggregator::executeImplBatch(
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
}

if constexpr (all_keys_are_const)
{
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), places[0]);
}
else
{
auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function;
add_into_aggregate_states_function(row_begin, row_end, columns_data.data(), places.get());
}
}
#endif

@@ -1295,12 +1324,24 @@

AggregateFunctionInstruction * inst = aggregate_instructions + i;
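        /// Note: inst->offsets is set for -Array combinators; offsets are cumulative ends of each
        /// row's array, so [offsets[row_begin - 1], offsets[row_end - 1]) below is the flattened
        /// element range covered by rows [row_begin, row_end).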

        if constexpr (all_keys_are_const)
        {
            if (inst->offsets)
                inst->batch_that->addBatchSinglePlace(inst->offsets[static_cast<ssize_t>(row_begin) - 1], inst->offsets[row_end - 1], places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
            else if (inst->has_sparse_arguments)
                inst->batch_that->addBatchSparseSinglePlace(row_begin, row_end, places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
            else
                inst->batch_that->addBatchSinglePlace(row_begin, row_end, places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
        }
        else
        {
            if (inst->offsets)
                inst->batch_that->addBatchArray(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
            else if (inst->has_sparse_arguments)
                inst->batch_that->addBatchSparse(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
            else
                inst->batch_that->addBatch(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
        }
}
}

@@ -1540,12 +1581,27 @@ bool Aggregator::executeOnBlock(Columns columns,
* To make them work anyway, we materialize them.
*/
Columns materialized_columns;
bool all_keys_are_const = false;
if (params.optimize_group_by_constant_keys)
{
all_keys_are_const = true;
for (size_t i = 0; i < params.keys_size; ++i)
all_keys_are_const &= isColumnConst(*columns.at(keys_positions[i]));
}
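    /// When the setting is disabled, all_keys_are_const stays false and the
    /// branch below falls back to materializing const key columns as before.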

/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
{
        if (all_keys_are_const)
        {
            key_columns[i] = assert_cast<const ColumnConst &>(*columns.at(keys_positions[i])).getDataColumnPtr().get();
        }
        else
        {
            materialized_columns.push_back(recursiveRemoveSparse(columns.at(keys_positions[i]))->convertToFullColumnIfConst());
            key_columns[i] = materialized_columns.back().get();
        }


if (!result.isLowCardinality())
{
@@ -1590,7 +1646,7 @@
{
/// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;
        executeImpl(result, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), no_more_keys, all_keys_are_const, overflow_row_ptr);
}

size_t result_size = result.sizeWithoutOverflowRow();
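Aside on the key_columns change above: a ClickHouse ColumnConst wraps a single-row data column plus a logical size, which is why taking getDataColumnPtr() yields a 1-row key column and the per-row key insertion can collapse to a single emplace. A minimal sketch, assuming ClickHouse's column headers (illustrative, not part of the PR):

#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>

void constKeySketch()
{
    auto inner = DB::ColumnUInt64::create();
    inner->insert(42ULL); /// the single underlying value

    /// Logically 1000 identical rows, physically one stored value.
    auto key = DB::ColumnConst::create(std::move(inner), 1000);

    /// What the PR does instead of convertToFullColumnIfConst():
    const auto * data = DB::assert_cast<const DB::ColumnConst &>(*key).getDataColumnPtr().get();
    /// data->size() == 1, so the aggregator inserts the key once per block.
    (void)data;
}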
8 changes: 7 additions & 1 deletion src/Interpreters/Aggregator.h
@@ -1020,6 +1020,8 @@ class Aggregator final

bool enable_prefetch;

bool optimize_group_by_constant_keys;

struct StatsCollectingParams
{
StatsCollectingParams();
@@ -1057,6 +1059,7 @@ class Aggregator final
size_t max_block_size_,
bool enable_prefetch_,
bool only_merge_, // true for projections
bool optimize_group_by_constant_keys_,
const StatsCollectingParams & stats_collecting_params_ = {})
: keys(keys_)
, aggregates(aggregates_)
@@ -1077,6 +1080,7 @@
, max_block_size(max_block_size_)
, only_merge(only_merge_)
, enable_prefetch(enable_prefetch_)
, optimize_group_by_constant_keys(optimize_group_by_constant_keys_)
, stats_collecting_params(stats_collecting_params_)
{
}
@@ -1276,6 +1280,7 @@ class Aggregator final
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys = false,
bool all_keys_are_const = false,
AggregateDataPtr overflow_row = nullptr) const;

/// Process one data block, aggregate the data into a hash table.
@@ -1288,10 +1293,11 @@
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const;

    /// Specialization for particular values of the bool template parameters.
    template <bool no_more_keys, bool use_compiled_functions, bool prefetch, bool all_keys_are_const, typename Method>
void executeImplBatch(
Method & method,
typename Method::State & state,
1 change: 1 addition & 0 deletions src/Interpreters/InterpreterSelectQuery.cpp
@@ -2579,6 +2579,7 @@ static Aggregator::Params getAggregatorParams(
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
/* only_merge */ false,
settings.optimize_group_by_constant_keys,
stats_collecting_params
};
}
1 change: 1 addition & 0 deletions src/Planner/Planner.cpp
@@ -290,6 +290,7 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
/* only_merge */ false,
settings.optimize_group_by_constant_keys,
stats_collecting_params);

return aggregator_params;
1 change: 1 addition & 0 deletions src/Processors/QueryPlan/AggregatingStep.cpp
@@ -230,6 +230,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.max_block_size,
transform_params->params.enable_prefetch,
/* only_merge */ false,
transform_params->params.optimize_group_by_constant_keys,
transform_params->params.stats_collecting_params};
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final);
