Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize group by constant keys #53549

Merged
merged 13 commits on Sep 18, 2023
1 change: 1 addition & 0 deletions src/Core/Settings.h
Expand Up @@ -663,6 +663,7 @@ class IColumn;
M(SetOperationMode, except_default_mode, SetOperationMode::ALL, "Set default mode in EXCEPT query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(Bool, optimize_group_by_constant_keys, true, "Optimize GROUP BY when all keys in block are constant", 0) \
M(Bool, legacy_column_name_of_tuple_literal, false, "List all names of element of large tuple literals in their column names instead of hash. This settings exists only for compatibility reasons. It makes sense to set to 'true', while doing rolling update of cluster from version lower than 21.7 to higher.", 0) \
\
M(Bool, query_plan_enable_optimizations, true, "Apply optimizations to query plan", 0) \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.h
Expand Up @@ -80,6 +80,7 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"}}},
{"23.8", {{"rewrite_count_distinct_if_with_count_distinct_implementation", false, true, "Rewrite countDistinctIf with count_distinct_implementation configuration"}}},
{"23.7", {{"function_sleep_max_microseconds_per_block", 0, 3000000, "In previous versions, the maximum sleep time of 3 seconds was applied only for `sleep`, but not for `sleepEachRow` function. In the new version, we introduce this setting. If you set compatibility with the previous versions, we will disable the limit altogether."}}},
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
Expand Down
120 changes: 91 additions & 29 deletions src/Interpreters/Aggregator.cpp
Expand Up @@ -1035,11 +1035,12 @@ void Aggregator::executeImpl(
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, row_begin, row_end, key_columns, aggregate_instructions, no_more_keys, overflow_row);
executeImpl(*result.NAME, result.aggregates_pool, row_begin, row_end, key_columns, aggregate_instructions, no_more_keys, all_keys_are_const, overflow_row);

if (false) {} // NOLINT
APPLY_FOR_AGGREGATED_VARIANTS(M)
Expand All @@ -1059,6 +1060,7 @@ void NO_INLINE Aggregator::executeImpl(
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
Expand All @@ -1074,25 +1076,25 @@ void NO_INLINE Aggregator::executeImpl(
{
if (prefetch)
executeImplBatch<false, true, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
else
executeImplBatch<false, true, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
}
else
#endif
{
if (prefetch)
executeImplBatch<false, false, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
else
executeImplBatch<false, false, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
}
}
else
{
executeImplBatch<true, false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
executeImplBatch<true, false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
}
}

Expand All @@ -1104,6 +1106,7 @@ void NO_INLINE Aggregator::executeImplBatch(
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
using KeyHolder = decltype(state.getKeyHolder(0, std::declval<Arena &>()));
Expand All @@ -1120,21 +1123,28 @@ void NO_INLINE Aggregator::executeImplBatch(

/// For all rows.
AggregateDataPtr place = aggregates_pool->alloc(0);
for (size_t i = row_begin; i < row_end; ++i)
if (all_keys_are_const)
{
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
state.emplaceKey(method.data, 0, *aggregates_pool).setMapped(place);
}
else
{
for (size_t i = row_begin; i < row_end; ++i)
{
if (i == row_begin + prefetching.iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();

if (i + prefetch_look_ahead < row_end)
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
{
auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool);
method.data.prefetch(std::move(key_holder));
if (i == row_begin + prefetching.iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();

if (i + prefetch_look_ahead < row_end)
{
auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool);
method.data.prefetch(std::move(key_holder));
}
}
}

state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place);
state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place);
}
}
return;
}
Expand All @@ -1153,7 +1163,7 @@ void NO_INLINE Aggregator::executeImplBatch(
}
}

if (!has_arrays && !hasSparseArguments(aggregate_instructions))
if (!has_arrays && !hasSparseArguments(aggregate_instructions) && !all_keys_are_const)
{
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
Expand All @@ -1179,10 +1189,23 @@ void NO_INLINE Aggregator::executeImplBatch(
/// - this affects only optimize_aggregation_in_order,
/// - this is just a pointer, so it should not be significant,
/// - and plus this will require other changes in the interface.
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[all_keys_are_const ? 1 : row_end]);

/// For all rows.
for (size_t i = row_begin; i < row_end; ++i)
size_t start, end;
/// If all keys are const, key columns contain only 1 row.
if (all_keys_are_const)
{
start = 0;
[GitHub review note: nickitat marked this conversation as resolved — not part of the source]
end = 1;
}
else
{
start = row_begin;
end = row_end;
}

for (size_t i = start; i < end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;

Expand Down Expand Up @@ -1253,9 +1276,13 @@ void NO_INLINE Aggregator::executeImplBatch(
/// Add only if the key already exists.
auto find_result = state.findKey(method.data, i, *aggregates_pool);
if (find_result.isFound())
{
aggregate_data = find_result.getMapped();
}
else
{
aggregate_data = overflow_row;
}
}

places[i] = aggregate_data;
Expand All @@ -1278,8 +1305,16 @@ void NO_INLINE Aggregator::executeImplBatch(
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
}

auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function;
add_into_aggregate_states_function(row_begin, row_end, columns_data.data(), places.get());
if (all_keys_are_const)
{
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), places[0]);
}
else
{
auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function;
add_into_aggregate_states_function(row_begin, row_end, columns_data.data(), places.get());
}
}
#endif

Expand All @@ -1294,12 +1329,24 @@ void NO_INLINE Aggregator::executeImplBatch(

AggregateFunctionInstruction * inst = aggregate_instructions + i;

if (inst->offsets)
inst->batch_that->addBatchArray(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparse(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
if (all_keys_are_const)
{
if (inst->offsets)
inst->batch_that->addBatchSinglePlace(inst->offsets[static_cast<ssize_t>(row_begin) - 1], inst->offsets[row_end - 1], places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparseSinglePlace(row_begin, row_end, places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
else
inst->batch_that->addBatchSinglePlace(row_begin, row_end, places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
}
else
inst->batch_that->addBatch(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
{
if (inst->offsets)
inst->batch_that->addBatchArray(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparse(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
else
inst->batch_that->addBatch(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
}

Expand Down Expand Up @@ -1539,12 +1586,27 @@ bool Aggregator::executeOnBlock(Columns columns,
* To make them work anyway, we materialize them.
*/
Columns materialized_columns;
bool all_keys_are_const = false;
if (params.optimize_group_by_constant_keys)
{
all_keys_are_const = true;
for (size_t i = 0; i < params.keys_size; ++i)
all_keys_are_const &= isColumnConst(*columns.at(keys_positions[i]));
}

/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
{
materialized_columns.push_back(recursiveRemoveSparse(columns.at(keys_positions[i]))->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();
if (all_keys_are_const)
{
key_columns[i] = assert_cast<const ColumnConst &>(*columns.at(keys_positions[i])).getDataColumnPtr().get();
}
else
{
materialized_columns.push_back(recursiveRemoveSparse(columns.at(keys_positions[i]))->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();
}


if (!result.isLowCardinality())
{
Expand Down Expand Up @@ -1589,7 +1651,7 @@ bool Aggregator::executeOnBlock(Columns columns,
{
/// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;
executeImpl(result, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);
executeImpl(result, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), no_more_keys, all_keys_are_const, overflow_row_ptr);
}

size_t result_size = result.sizeWithoutOverflowRow();
Expand Down
7 changes: 7 additions & 0 deletions src/Interpreters/Aggregator.h
Expand Up @@ -1023,6 +1023,8 @@ class Aggregator final

bool enable_prefetch;

bool optimize_group_by_constant_keys;

struct StatsCollectingParams
{
StatsCollectingParams();
Expand Down Expand Up @@ -1060,6 +1062,7 @@ class Aggregator final
size_t max_block_size_,
bool enable_prefetch_,
bool only_merge_, // true for projections
bool optimize_group_by_constant_keys_,
const StatsCollectingParams & stats_collecting_params_ = {})
: keys(keys_)
, aggregates(aggregates_)
Expand All @@ -1080,6 +1083,7 @@ class Aggregator final
, max_block_size(max_block_size_)
, only_merge(only_merge_)
, enable_prefetch(enable_prefetch_)
, optimize_group_by_constant_keys(optimize_group_by_constant_keys_)
, stats_collecting_params(stats_collecting_params_)
{
}
Expand Down Expand Up @@ -1280,6 +1284,7 @@ class Aggregator final
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys = false,
bool all_keys_are_const = false,
AggregateDataPtr overflow_row = nullptr) const;

/// Process one data block, aggregate the data into a hash table.
Expand All @@ -1292,6 +1297,7 @@ class Aggregator final
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const;

/// Specialization for a particular value no_more_keys.
Expand All @@ -1303,6 +1309,7 @@ class Aggregator final
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const;

/// For case when there are no keys (all aggregate into one row).
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/InterpreterSelectQuery.cpp
Expand Up @@ -2574,6 +2574,7 @@ static Aggregator::Params getAggregatorParams(
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
/* only_merge */ false,
settings.optimize_group_by_constant_keys,
stats_collecting_params
};
}
Expand Down
1 change: 1 addition & 0 deletions src/Planner/Planner.cpp
Expand Up @@ -290,6 +290,7 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
/* only_merge */ false,
settings.optimize_group_by_constant_keys,
stats_collecting_params);

return aggregator_params;
Expand Down
1 change: 1 addition & 0 deletions src/Processors/QueryPlan/AggregatingStep.cpp
Expand Up @@ -230,6 +230,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.max_block_size,
transform_params->params.enable_prefetch,
/* only_merge */ false,
transform_params->params.optimize_group_by_constant_keys,
transform_params->params.stats_collecting_params};
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final);

Expand Down
3 changes: 2 additions & 1 deletion src/Processors/TTL/TTLAggregationAlgorithm.cpp
Expand Up @@ -41,7 +41,8 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
settings.min_count_to_compile_aggregate_expression,
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
false /* only_merge */);
false /* only_merge */,
settings.optimize_group_by_constant_keys);

aggregator = std::make_unique<Aggregator>(header, params);

Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Expand Up @@ -329,7 +329,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.min_count_to_compile_aggregate_expression,
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
only_merge);
only_merge,
settings.optimize_group_by_constant_keys);

return std::make_pair(params, only_merge);
};
Expand Down
40 changes: 40 additions & 0 deletions tests/queries/0_stateless/02845_group_by_constant_keys.reference
@@ -0,0 +1,40 @@
10000000 1 2 3
10000000 1 2 3
10000000 1 2 3
10000000 1 2 3
10
10
10
10
10
10
10
10
10
10
10
10
10 data.1.JSON
10 data.2.JSON
10 data.JSON
10 data.1.JSON
10 data.2.JSON
10 data.JSON
10 data.1.JSON
10 data.2.JSON
10 data.JSON
10 data.1.JSON
10 data.2.JSON
10 data.JSON
10
10
10
10
10
10
10
10
10
10
10
10