diff --git a/src/AggregateFunctions/UniqExactSet.h b/src/AggregateFunctions/UniqExactSet.h index 916dfe4a4246..90cfe7001796 100644 --- a/src/AggregateFunctions/UniqExactSet.h +++ b/src/AggregateFunctions/UniqExactSet.h @@ -1,10 +1,11 @@ #pragma once +#include #include #include #include -#include #include +#include namespace DB @@ -48,30 +49,38 @@ class UniqExactSet } else { - auto next_bucket_to_merge = std::make_shared(0); - - auto thread_func = [&lhs, &rhs, next_bucket_to_merge, thread_group = CurrentThread::getGroup()]() + try { - SCOPE_EXIT_SAFE( - if (thread_group) - CurrentThread::detachFromGroupIfNotDetached(); - ); - if (thread_group) - CurrentThread::attachToGroupIfDetached(thread_group); - setThreadName("UniqExactMerger"); + auto next_bucket_to_merge = std::make_shared(0); - while (true) + auto thread_func = [&lhs, &rhs, next_bucket_to_merge, thread_group = CurrentThread::getGroup()]() { - const auto bucket = next_bucket_to_merge->fetch_add(1); - if (bucket >= rhs.NUM_BUCKETS) - return; - lhs.impls[bucket].merge(rhs.impls[bucket]); - } - }; - - for (size_t i = 0; i < std::min(thread_pool->getMaxThreads(), rhs.NUM_BUCKETS); ++i) - thread_pool->scheduleOrThrowOnError(thread_func); - thread_pool->wait(); + SCOPE_EXIT_SAFE( + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); + if (thread_group) + CurrentThread::attachToGroupIfDetached(thread_group); + setThreadName("UniqExactMerger"); + + while (true) + { + const auto bucket = next_bucket_to_merge->fetch_add(1); + if (bucket >= rhs.NUM_BUCKETS) + return; + lhs.impls[bucket].merge(rhs.impls[bucket]); + } + }; + + for (size_t i = 0; i < std::min(thread_pool->getMaxThreads(), rhs.NUM_BUCKETS); ++i) + thread_pool->scheduleOrThrowOnError(thread_func); + thread_pool->wait(); + } + catch (...) + { + thread_pool->wait(); + throw; + } } } } diff --git a/tests/queries/0_stateless/02782_uniq_exact_parallel_merging_bug.reference b/tests/queries/0_stateless/02782_uniq_exact_parallel_merging_bug.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/02782_uniq_exact_parallel_merging_bug.sh b/tests/queries/0_stateless/02782_uniq_exact_parallel_merging_bug.sh new file mode 100755 index 000000000000..d84ffd21b87a --- /dev/null +++ b/tests/queries/0_stateless/02782_uniq_exact_parallel_merging_bug.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# Tags: long, no-random-settings, no-tsan, no-asan, no-ubsan, no-msan + +# shellcheck disable=SC2154 + +unset CLICKHOUSE_LOG_COMMENT + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + + +clickhouse-client -q " + CREATE TABLE ${CLICKHOUSE_DATABASE}.t(s String) + ENGINE = MergeTree + ORDER BY tuple(); +" + +clickhouse-client -q "insert into ${CLICKHOUSE_DATABASE}.t select number%10==0 ? toString(number) : '' from numbers_mt(1e7)" + +clickhouse-benchmark -q "select count(distinct s) from ${CLICKHOUSE_DATABASE}.t settings max_memory_usage = '50Mi'" --ignore-error -c 16 -i 1000 2>/dev/null