Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert hashSets in parallel before merge #50748

Merged
merged 11 commits into from Jul 27, 2023
39 changes: 39 additions & 0 deletions src/AggregateFunctions/AggregateFunctionUniq.h
Expand Up @@ -29,6 +29,10 @@
#include <AggregateFunctions/UniqVariadicHash.h>
#include <AggregateFunctions/UniquesHashSet.h>

/// Forward declaration of the error code thrown by parallelizeMergePrepare() when it is not supported.
/// NOTE(review): this namespace is opened before `namespace DB`, so it declares
/// ::ErrorCodes::NOT_IMPLEMENTED at global scope. Presumably it was meant to be nested inside
/// namespace DB (i.e. DB::ErrorCodes::NOT_IMPLEMENTED) — confirm and move it if so.
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}

namespace DB
{
Expand All @@ -42,6 +46,7 @@ struct AggregateFunctionUniqUniquesHashSetData
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniq"; }
Expand All @@ -55,6 +60,7 @@ struct AggregateFunctionUniqUniquesHashSetDataForVariadic
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
Expand All @@ -72,6 +78,7 @@ struct AggregateFunctionUniqHLL12Data
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqHLL12"; }
Expand All @@ -84,6 +91,7 @@ struct AggregateFunctionUniqHLL12Data<String, false>
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqHLL12"; }
Expand All @@ -96,6 +104,7 @@ struct AggregateFunctionUniqHLL12Data<UUID, false>
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqHLL12"; }
Expand All @@ -108,6 +117,7 @@ struct AggregateFunctionUniqHLL12Data<IPv6, false>
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqHLL12"; }
Expand All @@ -120,6 +130,7 @@ struct AggregateFunctionUniqHLL12DataForVariadic
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
Expand All @@ -143,6 +154,7 @@ struct AggregateFunctionUniqExactData
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;

static String getName() { return "uniqExact"; }
Expand All @@ -162,6 +174,7 @@ struct AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;

static String getName() { return "uniqExact"; }
Expand All @@ -181,6 +194,7 @@ struct AggregateFunctionUniqExactData<IPv6, is_able_to_parallelize_merge_>
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;

static String getName() { return "uniqExact"; }
Expand All @@ -190,6 +204,7 @@ template <bool is_exact_, bool argument_is_tuple_, bool is_able_to_parallelize_m
struct AggregateFunctionUniqExactDataForVariadic : AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
{
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
Expand All @@ -204,6 +219,7 @@ struct AggregateFunctionUniqThetaData
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqTheta"; }
Expand All @@ -213,6 +229,7 @@ template <bool is_exact_, bool argument_is_tuple_>
struct AggregateFunctionUniqThetaDataForVariadic : AggregateFunctionUniqThetaData
{
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
Expand Down Expand Up @@ -384,8 +401,10 @@ template <typename T, typename Data>
class AggregateFunctionUniq final : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>
{
private:
using DataSet = typename Data::Set;
static constexpr size_t num_args = 1;
static constexpr bool is_able_to_parallelize_merge = Data::is_able_to_parallelize_merge;
static constexpr bool is_parallelize_merge_prepare_needed = Data::is_parallelize_merge_prepare_needed;

public:
explicit AggregateFunctionUniq(const DataTypes & argument_types_)
Expand Down Expand Up @@ -439,6 +458,26 @@ class AggregateFunctionUniq final : public IAggregateFunctionDataHelper<Data, Ag
detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map);
}

/// Tells the caller whether parallelizeMergePrepare() applies to this function's states.
/// The answer is fixed at compile time by Data::is_parallelize_merge_prepare_needed.
bool isParallelizeMergePrepareNeeded() const override
{
    return is_parallelize_merge_prepare_needed;
}

/// Collects a pointer to the `set` of every state in `places` and passes them, together with
/// `thread_pool`, to DataSet::parallelizeMergePrepare().
/// Only available when Data::is_parallelize_merge_prepare_needed is true; otherwise throws
/// an Exception with code NOT_IMPLEMENTED.
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool) const override
{
    if constexpr (is_parallelize_merge_prepare_needed)
    {
        std::vector<DataSet *> data_vec;
        data_vec.reserve(places.size());

        /// One entry per state, in the same order as `places`.
        for (auto * place : places)
            data_vec.push_back(&this->data(place).set);

        DataSet::parallelizeMergePrepare(data_vec, thread_pool);
    }
    else
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() is only implemented when is_parallelize_merge_prepare_needed is true for {} ", getName());
    }
}

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).set.merge(this->data(rhs).set);
Expand Down
8 changes: 8 additions & 0 deletions src/AggregateFunctions/IAggregateFunction.h
Expand Up @@ -47,6 +47,7 @@ using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;

using AggregateDataPtr = char *;
using AggregateDataPtrs = std::vector<AggregateDataPtr>;
using ConstAggregateDataPtr = const char *;

class IAggregateFunction;
Expand Down Expand Up @@ -148,6 +149,13 @@ class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunctio
/// Default values must be at the 0-th positions in columns.
virtual void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const = 0;

/// Whether parallelizeMergePrepare() should be invoked on the states of this function.
/// The base implementation opts out.
virtual bool isParallelizeMergePrepareNeeded() const
{
    return false;
}

/// Hook for preparing a batch of states with the help of a thread pool.
/// The base implementation ignores both arguments and always throws NOT_IMPLEMENTED.
virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/) const
{
    throw Exception(
        ErrorCodes::NOT_IMPLEMENTED,
        "parallelizeMergePrepare() with thread pool parameter isn't implemented for {} ",
        getName());
}

/// Merges state (on which place points to) with other state of current aggregation function.
virtual void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

Expand Down
51 changes: 51 additions & 0 deletions src/AggregateFunctions/UniqExactSet.h
Expand Up @@ -28,6 +28,57 @@ class UniqExactSet
asTwoLevel().insert(std::forward<Arg>(arg));
}

/// During merge, if one of lhs and rhs is a two-level set and the other is a single-level set,
/// the single-level one has to go through convertToTwoLevel() first. That conversion is not
/// parallel and costs a lot of extra time when the number of threads is large.
/// This method converts all single-level sets in `data_vec` to two-level in parallel, but only
/// when the sets are a mix of both kinds (nothing is done if they are all single-level or all two-level).
/// `thread_pool` runs the conversions and is waited on before returning or rethrowing.
static void parallelizeMergePrepare(const std::vector<UniqExactSet *> & data_vec, ThreadPool & thread_pool)
{
    size_t single_level_set_num = 0;

    for (auto * ele : data_vec)
    {
        if (ele->isSingleLevel())
            ++single_level_set_num;
    }

    /// Only a mix of single-level and two-level sets needs conversion.
    if (single_level_set_num == 0 || single_level_set_num >= data_vec.size())
        return;

    try
    {
        auto data_vec_atomic_index = std::make_shared<std::atomic_uint32_t>(0);
        auto thread_func = [data_vec, data_vec_atomic_index, thread_group = CurrentThread::getGroup()]()
        {
            SCOPE_EXIT_SAFE(
                if (thread_group)
                    CurrentThread::detachFromGroupIfNotDetached();
            );
            if (thread_group)
                CurrentThread::attachToGroupIfDetached(thread_group);

            setThreadName("UniqExaConvert");

            /// Each job keeps claiming the next index from the shared counter until it runs past the end.
            while (true)
            {
                const auto i = data_vec_atomic_index->fetch_add(1);
                if (i >= data_vec.size())
                    return;
                if (data_vec[i]->isSingleLevel())
                    data_vec[i]->convertToTwoLevel();
            }
        };

        /// Never schedule more jobs than there are sets to convert; the bound is computed once.
        const size_t jobs_to_schedule = std::min<size_t>(thread_pool.getMaxThreads(), single_level_set_num);
        for (size_t i = 0; i < jobs_to_schedule; ++i)
            thread_pool.scheduleOrThrowOnError(thread_func);

        thread_pool.wait();
    }
    catch (...)
    {
        /// Let already scheduled jobs finish before propagating the error.
        thread_pool.wait();
        throw;
    }
}

auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr)
{
if (isSingleLevel() && other.isTwoLevel())
Expand Down
14 changes: 14 additions & 0 deletions src/Interpreters/Aggregator.cpp
Expand Up @@ -2601,6 +2601,20 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(

AggregatedDataVariantsPtr & res = non_empty_data[0];

for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (aggregate_functions[i]->isParallelizeMergePrepareNeeded())
{
size_t size = non_empty_data.size();
std::vector<AggregateDataPtr> data_vec;

for (size_t result_num = 0; result_num < size; ++result_num)
data_vec.emplace_back(non_empty_data[result_num]->without_key + offsets_of_aggregate_states[i]);

aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool);
}
}

/// We merge all aggregation results to the first.
for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
{
Expand Down
4 changes: 4 additions & 0 deletions tests/performance/prepare_hash_before_merge.xml
@@ -0,0 +1,4 @@
<test>
<query>SELECT COUNT(DISTINCT Title) FROM test.hits SETTINGS max_threads = 24</query>
<query>SELECT COUNT(DISTINCT Referer) FROM test.hits SETTINGS max_threads = 22</query>
</test>