Skip to content

Commit

Permalink
Backport #61257 to 23.12: Fix possible incorrect result of aggregate function `uniqExact`
Browse files Browse the repository at this point in the history
  • Loading branch information
robot-clickhouse committed Mar 15, 2024
1 parent b28e249 commit 6d185f9
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/AggregateFunctions/AggregateFunctionUniq.h
Expand Up @@ -483,6 +483,7 @@ class AggregateFunctionUniq final : public IAggregateFunctionDataHelper<Data, Ag
}

/// Tells the aggregator that merge() with a thread-pool argument may be used.
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
/// The equal-keys-ranges optimization (rewriting addBatch into addBatchSinglePlace)
/// must be disabled whenever parallel merge is enabled: combining the two could
/// produce an incorrect uniqExact result (this is the fix backported here, #61257).
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
{
Expand Down Expand Up @@ -576,6 +577,7 @@ class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper<
}

/// Tells the aggregator that merge() with a thread-pool argument may be used.
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
/// Same rule as in AggregateFunctionUniq: the equal-keys-ranges optimization is
/// not safe together with parallelized merge, so it is switched off in that case.
bool canOptimizeEqualKeysRanges() const override { return !is_able_to_parallelize_merge; }

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
{
Expand Down
Expand Up @@ -142,6 +142,7 @@ class AggregateFunctionArray final : public IAggregateFunctionHelper<AggregateFu
}

/// Pure combinator: both the parallel-merge capability and the decision whether
/// the equal-keys-ranges optimization is allowed are delegated to the nested function.
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
Expand Down
1 change: 1 addition & 0 deletions src/AggregateFunctions/Combinators/AggregateFunctionIf.h
Expand Up @@ -165,6 +165,7 @@ class AggregateFunctionIf final : public IAggregateFunctionHelper<AggregateFunct
}

/// Pure combinator: both the parallel-merge capability and the decision whether
/// the equal-keys-ranges optimization is allowed are delegated to the nested function.
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
Expand Down
Expand Up @@ -111,6 +111,7 @@ class AggregateFunctionMerge final : public IAggregateFunctionHelper<AggregateFu
}

/// Pure combinator: both the parallel-merge capability and the decision whether
/// the equal-keys-ranges optimization is allowed are delegated to the nested function.
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
Expand Down
1 change: 1 addition & 0 deletions src/AggregateFunctions/Combinators/AggregateFunctionNull.h
Expand Up @@ -149,6 +149,7 @@ class AggregateFunctionNullBase : public IAggregateFunctionHelper<Derived>
}

/// Pure combinator: both the parallel-merge capability and the decision whether
/// the equal-keys-ranges optimization is allowed are delegated to the nested function.
bool isAbleToParallelizeMerge() const override { return nested_function->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_function->canOptimizeEqualKeysRanges(); }

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
Expand Down
Expand Up @@ -92,6 +92,7 @@ class AggregateFunctionState final : public IAggregateFunctionHelper<AggregateFu
}

/// Pure combinator: both the parallel-merge capability and the decision whether
/// the equal-keys-ranges optimization is allowed are delegated to the nested function.
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
bool canOptimizeEqualKeysRanges() const override { return nested_func->canOptimizeEqualKeysRanges(); }

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
Expand Down
4 changes: 4 additions & 0 deletions src/AggregateFunctions/IAggregateFunction.h
Expand Up @@ -162,6 +162,10 @@ class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunctio
/// Tells if merge() with the thread-pool parameter could be used for this function.
virtual bool isAbleToParallelizeMerge() const { return false; }

/// Returns true if it is allowed to replace a call of `addBatch`
/// with `addBatchSinglePlace` for ranges of consecutive equal keys.
/// Implementations return false when that rewrite would change the result
/// (e.g. uniqExact with parallelized merge).
virtual bool canOptimizeEqualKeysRanges() const { return true; }

/// Should be used only if isAbleToParallelizeMerge() returned true.
virtual void
merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const
Expand Down
3 changes: 1 addition & 2 deletions src/Common/ColumnsHashingImpl.h
Expand Up @@ -62,7 +62,6 @@ struct LastElementCache
bool check(const Key & key) const { return value.first == key; }

bool hasOnlyOneValue() const { return found && misses == 1; }
UInt64 getMisses() const { return misses; }
};

template <typename Data>
Expand Down Expand Up @@ -232,7 +231,7 @@ class HashMethodBase
ALWAYS_INLINE UInt64 getCacheMissesSinceLastReset() const
{
if constexpr (consecutive_keys_optimization)
return cache.getMisses();
return cache.misses;
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions src/Common/ProfileEvents.cpp
Expand Up @@ -484,6 +484,7 @@ The server successfully detected this situation and will download merged part fr
\
M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \
M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \
M(AggregationOptimizedEqualRangesOfKeys, "For how many blocks optimization of equal ranges of keys was applied") \
\
M(KafkaRebalanceRevocations, "Number of partition revocations (the first stage of consumer group rebalance)") \
M(KafkaRebalanceAssignments, "Number of partition assignments (the final stage of consumer group rebalance)") \
Expand Down
14 changes: 12 additions & 2 deletions src/Interpreters/Aggregator.cpp
Expand Up @@ -53,6 +53,7 @@ namespace ProfileEvents
extern const Event OverflowThrow;
extern const Event OverflowBreak;
extern const Event OverflowAny;
extern const Event AggregationOptimizedEqualRangesOfKeys;
}

namespace CurrentMetrics
Expand Down Expand Up @@ -1325,6 +1326,7 @@ void NO_INLINE Aggregator::executeImplBatch(
if constexpr (use_compiled_functions)
{
std::vector<ColumnData> columns_data;
bool can_optimize_equal_keys_ranges = true;

for (size_t i = 0; i < aggregate_functions.size(); ++i)
{
Expand All @@ -1333,13 +1335,15 @@ void NO_INLINE Aggregator::executeImplBatch(

AggregateFunctionInstruction * inst = aggregate_instructions + i;
size_t arguments_size = inst->that->getArgumentTypes().size(); // NOLINT
can_optimize_equal_keys_ranges &= inst->can_optimize_equal_keys_ranges;

for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index)
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
}

if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset()))
if (all_keys_are_const || (can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset()))
{
ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys);
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), places[key_start]);
}
Expand All @@ -1362,10 +1366,15 @@ void NO_INLINE Aggregator::executeImplBatch(

AggregateFunctionInstruction * inst = aggregate_instructions + i;

if (all_keys_are_const || (!no_more_keys && state.hasOnlyOneValueSinceLastReset()))
if (all_keys_are_const || (inst->can_optimize_equal_keys_ranges && state.hasOnlyOneValueSinceLastReset()))
{
ProfileEvents::increment(ProfileEvents::AggregationOptimizedEqualRangesOfKeys);
addBatchSinglePlace(row_begin, row_end, inst, places[key_start] + inst->state_offset, aggregates_pool);
}
else
{
addBatch(row_begin, row_end, inst, places.get(), aggregates_pool);
}
}
}

Expand Down Expand Up @@ -1571,6 +1580,7 @@ void Aggregator::prepareAggregateInstructions(
}

aggregate_functions_instructions[i].has_sparse_arguments = has_sparse_arguments;
aggregate_functions_instructions[i].can_optimize_equal_keys_ranges = aggregate_functions[i]->canOptimizeEqualKeysRanges();
aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];

Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/Aggregator.h
Expand Up @@ -1186,6 +1186,7 @@ class Aggregator final
const IColumn ** batch_arguments{};
const UInt64 * offsets{};
bool has_sparse_arguments = false;
bool can_optimize_equal_keys_ranges = true;
};

/// Used for optimize_aggregation_in_order:
Expand Down
16 changes: 16 additions & 0 deletions tests/queries/0_stateless/03008_optimize_equal_ranges.reference
@@ -0,0 +1,16 @@
0 30000
1 30000
2 30000
0 30000
1 30000
2 30000
0 449985000
1 449985000
2 449985000
0 449985000
1 449985000
2 449985000
sum 1 1
sum 16 1
uniqExact 1 1
uniqExact 16 0
29 changes: 29 additions & 0 deletions tests/queries/0_stateless/03008_optimize_equal_ranges.sql
@@ -0,0 +1,29 @@
-- Regression test for the equal-keys-ranges optimization in the Aggregator:
-- uniqExact could return a wrong result when the optimization was combined
-- with parallelized merge (#61257). Comments here deliberately avoid the
-- keyword sequence matched by the query_log filter at the bottom.
DROP TABLE IF EXISTS t_optimize_equal_ranges;

CREATE TABLE t_optimize_equal_ranges (a UInt64, b String, c UInt64) ENGINE = MergeTree ORDER BY a;

-- Small blocks, purely in-memory aggregation, no in-order optimization and no
-- projections, so the equal-ranges code path in the Aggregator is exercised.
SET max_block_size = 1024;
SET max_bytes_before_external_group_by = 0;
SET optimize_aggregation_in_order = 0;
SET optimize_use_projections = 0;

-- Three key values with 30000 rows each; the table is ordered by `a`, which
-- produces long runs of consecutive equal keys.
INSERT INTO t_optimize_equal_ranges SELECT 0, toString(number), number FROM numbers(30000);
INSERT INTO t_optimize_equal_ranges SELECT 1, toString(number), number FROM numbers(30000);
INSERT INTO t_optimize_equal_ranges SELECT 2, toString(number), number FROM numbers(30000);

-- Each aggregate is checked both with many threads (parallel merge possible,
-- the optimization must be suppressed for uniqExact) and with one thread.
SELECT a, uniqExact(b) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 16;
SELECT a, uniqExact(b) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 1;
SELECT a, sum(c) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 16;
SELECT a, sum(c) FROM t_optimize_equal_ranges GROUP BY a ORDER BY a SETTINGS max_threads = 1;

-- Make the finished queries above visible in system.query_log.
SYSTEM FLUSH LOGS;

-- Verify via profile events when the optimization fired: expected for sum in
-- both thread modes, but for uniqExact only in the single-threaded run.
SELECT
    used_aggregate_functions[1] AS func,
    Settings['max_threads'] AS threads,
    ProfileEvents['AggregationOptimizedEqualRangesOfKeys'] > 0
FROM system.query_log
WHERE type = 'QueryFinish' AND current_database = currentDatabase() AND query LIKE '%SELECT%FROM%t_optimize_equal_ranges%'
ORDER BY func, threads;

DROP TABLE t_optimize_equal_ranges;
Empty file.
36 changes: 36 additions & 0 deletions tests/queries/0_stateless/03008_uniq_exact_equal_ranges.sql
@@ -0,0 +1,36 @@
-- Stress-style check that uniqExact (and sum as a control) gives the same
-- answer whether or not the consecutive-keys optimization is in effect:
-- each EXCEPT below must produce an empty result.
DROP TABLE IF EXISTS t_uniq_exact;

CREATE TABLE t_uniq_exact (a UInt64, b String, c UInt64) ENGINE = MergeTree ORDER BY a;

-- Force two-level hash tables immediately and use several threads so the
-- parallelized merge of uniqExact states is taken.
SET group_by_two_level_threshold_bytes = 1;
SET group_by_two_level_threshold = 1;
SET max_threads = 4;
SET max_bytes_before_external_group_by = 0;
SET optimize_aggregation_in_order = 0;

-- Ten keys with 300000 random rows each; ordering by `a` yields long runs of
-- equal keys, the precondition for the equal-ranges optimization.
INSERT INTO t_uniq_exact SELECT 0, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 1, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 2, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 3, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 4, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 5, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 6, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 7, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 8, randomPrintableASCII(5), rand() FROM numbers(300000);
INSERT INTO t_uniq_exact SELECT 9, randomPrintableASCII(5), rand() FROM numbers(300000);

-- Merge all parts so every thread scans long sorted runs of each key.
OPTIMIZE TABLE t_uniq_exact FINAL;

-- hit-rate threshold 1.0 disables the consecutive-keys optimization,
-- 0.5 allows it; both runs must agree, so EXCEPT returns no rows.
SELECT a, uniqExact(b) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 1.0
EXCEPT
SELECT a, uniqExact(b) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 0.5;

SELECT a, sum(c) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 1.0
EXCEPT
SELECT a, sum(c) FROM t_uniq_exact GROUP BY a ORDER BY a
SETTINGS min_hit_rate_to_use_consecutive_keys_optimization = 0.5;

DROP TABLE t_uniq_exact;

0 comments on commit 6d185f9

Please sign in to comment.