Skip to content

Commit

Permalink
Merge pull request #13709 from ClickHouse/fix-array-combinator
Browse files Browse the repository at this point in the history
Fix error with batch aggregation and -Array combinator
  • Loading branch information
alexey-milovidov committed Aug 14, 2020
2 parents 9a640ff + 8c85ab3 commit 9e33f41
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
5 changes: 3 additions & 2 deletions src/AggregateFunctions/IAggregateFunction.h
Expand Up @@ -388,13 +388,14 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper<Derived>
{
for (size_t j = 0; j < UNROLL_COUNT; ++j)
{
if (has_data[j * 256 + k])
size_t idx = j * 256 + k;
if (has_data[idx])
{
AggregateDataPtr & place = map[k];
if (unlikely(!place))
init(place);

func.merge(place + place_offset, reinterpret_cast<const char *>(&places[256 * j + k]), arena);
func.merge(place + place_offset, reinterpret_cast<const char *>(&places[idx]), nullptr);
}
}
}
Expand Down
43 changes: 28 additions & 15 deletions src/Interpreters/Aggregator.cpp
Expand Up @@ -449,7 +449,6 @@ void NO_INLINE Aggregator::executeImpl(
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);

if (!no_more_keys)
//executeImplCase<false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions);
else
executeImplCase<true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
Expand Down Expand Up @@ -534,22 +533,36 @@ void NO_INLINE Aggregator::executeImplBatch(
/// Optimization for special case when aggregating by 8bit key.
if constexpr (std::is_same_v<Method, typename decltype(AggregatedDataVariants::key8)::element_type>)
{
/// We use another method if there are aggregate functions with -Array combinator.
bool has_arrays = false;
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
inst->batch_that->addBatchLookupTable8(
rows,
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
inst->state_offset,
[&](AggregateDataPtr & aggregate_data)
{
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
},
state.getKeyData(),
inst->batch_arguments,
aggregates_pool);
if (inst->offsets)
{
has_arrays = true;
break;
}
}

if (!has_arrays)
{
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
inst->batch_that->addBatchLookupTable8(
rows,
reinterpret_cast<AggregateDataPtr *>(method.data.data()),
inst->state_offset,
[&](AggregateDataPtr & aggregate_data)
{
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
},
state.getKeyData(),
inst->batch_arguments,
aggregates_pool);
}
return;
}
return;
}

/// Generic case.
Expand Down Expand Up @@ -629,7 +642,7 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(


void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder)
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
Expand Down
10 changes: 10 additions & 0 deletions tests/queries/0_stateless/01441_array_combinator.reference
@@ -0,0 +1,10 @@
0 0
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
1 change: 1 addition & 0 deletions tests/queries/0_stateless/01441_array_combinator.sql
@@ -0,0 +1 @@
SELECT number % 100 AS k, sumArray(emptyArrayUInt8()) AS v FROM numbers(10) GROUP BY k;

0 comments on commit 9e33f41

Please sign in to comment.