Skip to content

Commit

Permalink
Convert hashSets in parallel before merge (#50748)
Browse files Browse the repository at this point in the history
* Convert hashSets in parallel before merge

Before merge, if one of the lhs and rhs is singleLevelSet and the other is twoLevelSet,
then the SingleLevelSet will call convertToTwoLevel(). The convert process is not in parallel
and it will cost lots of cycle if it cosume all the singleLevelSet.

The idea of the patch is to convert all the singleLevelSets to twoLevelSets in parallel if
the hashsets are not all singleLevel or not all twoLevel.

I have tested the patch on Intel 2 x 112 vCPUs SPR server with clickbench and latest upstream
ClickHouse.
Q5 has got a big 264% performance improvement and 24 queries have got at least 5% performance
gain. The overall geomean of 43 queries has gained 7.4% more than the base code.

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

* add resize() for the data_vec in parallelizeMergePrepare()

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

* Add the performance test prepare_hash_before_merge.xml

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

* Fit the CI to rename the data set from hits_v1 to test.hits.

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

* remove the redundant branch in UniqExactSet

Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>

* Remove the empty methods and add throw exception in parallelizeMergePrepare()

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>

---------

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>
  • Loading branch information
jiebinn and nickitat committed Jul 27, 2023
1 parent 33300a9 commit 78f3a57
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 0 deletions.
39 changes: 39 additions & 0 deletions src/AggregateFunctions/AggregateFunctionUniq.h
Expand Up @@ -29,6 +29,10 @@
#include <AggregateFunctions/UniqVariadicHash.h>
#include <AggregateFunctions/UniquesHashSet.h>

namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}

namespace DB
{
Expand All @@ -42,6 +46,7 @@ struct AggregateFunctionUniqUniquesHashSetData
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniq"; }
Expand All @@ -55,6 +60,7 @@ struct AggregateFunctionUniqUniquesHashSetDataForVariadic
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
Expand All @@ -72,6 +78,7 @@ struct AggregateFunctionUniqHLL12Data
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqHLL12"; }
Expand All @@ -84,6 +91,7 @@ struct AggregateFunctionUniqHLL12Data<String, false>
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqHLL12"; }
Expand All @@ -96,6 +104,7 @@ struct AggregateFunctionUniqHLL12Data<UUID, false>
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqHLL12"; }
Expand All @@ -108,6 +117,7 @@ struct AggregateFunctionUniqHLL12Data<IPv6, false>
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqHLL12"; }
Expand All @@ -120,6 +130,7 @@ struct AggregateFunctionUniqHLL12DataForVariadic
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
Expand All @@ -143,6 +154,7 @@ struct AggregateFunctionUniqExactData
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;

static String getName() { return "uniqExact"; }
Expand All @@ -162,6 +174,7 @@ struct AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;

static String getName() { return "uniqExact"; }
Expand All @@ -181,6 +194,7 @@ struct AggregateFunctionUniqExactData<IPv6, is_able_to_parallelize_merge_>
Set set;

constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = false;

static String getName() { return "uniqExact"; }
Expand All @@ -190,6 +204,7 @@ template <bool is_exact_, bool argument_is_tuple_, bool is_able_to_parallelize_m
struct AggregateFunctionUniqExactDataForVariadic : AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
{
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_parallelize_merge_prepare_needed = true;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
Expand All @@ -204,6 +219,7 @@ struct AggregateFunctionUniqThetaData
Set set;

constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = false;

static String getName() { return "uniqTheta"; }
Expand All @@ -213,6 +229,7 @@ template <bool is_exact_, bool argument_is_tuple_>
struct AggregateFunctionUniqThetaDataForVariadic : AggregateFunctionUniqThetaData
{
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_parallelize_merge_prepare_needed = false;
constexpr static bool is_variadic = true;
constexpr static bool is_exact = is_exact_;
constexpr static bool argument_is_tuple = argument_is_tuple_;
Expand Down Expand Up @@ -384,8 +401,10 @@ template <typename T, typename Data>
class AggregateFunctionUniq final : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>
{
private:
using DataSet = typename Data::Set;
static constexpr size_t num_args = 1;
static constexpr bool is_able_to_parallelize_merge = Data::is_able_to_parallelize_merge;
static constexpr bool is_parallelize_merge_prepare_needed = Data::is_parallelize_merge_prepare_needed;

public:
explicit AggregateFunctionUniq(const DataTypes & argument_types_)
Expand Down Expand Up @@ -439,6 +458,26 @@ class AggregateFunctionUniq final : public IAggregateFunctionDataHelper<Data, Ag
detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map);
}

bool isParallelizeMergePrepareNeeded() const override { return is_parallelize_merge_prepare_needed;}

void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool) const override
{
if constexpr (is_parallelize_merge_prepare_needed)
{
std::vector<DataSet *> data_vec;
data_vec.resize(places.size());

for (unsigned long i = 0; i < data_vec.size(); i++)
data_vec[i] = &this->data(places[i]).set;

DataSet::parallelizeMergePrepare(data_vec, thread_pool);
}
else
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() is only implemented when is_parallelize_merge_prepare_needed is true for {} ", getName());
}
}

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).set.merge(this->data(rhs).set);
Expand Down
8 changes: 8 additions & 0 deletions src/AggregateFunctions/IAggregateFunction.h
Expand Up @@ -47,6 +47,7 @@ using DataTypePtr = std::shared_ptr<const IDataType>;
using DataTypes = std::vector<DataTypePtr>;

using AggregateDataPtr = char *;
using AggregateDataPtrs = std::vector<AggregateDataPtr>;
using ConstAggregateDataPtr = const char *;

class IAggregateFunction;
Expand Down Expand Up @@ -148,6 +149,13 @@ class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunctio
/// Default values must be a the 0-th positions in columns.
virtual void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const = 0;

virtual bool isParallelizeMergePrepareNeeded() const { return false; }

virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "parallelizeMergePrepare() with thread pool parameter isn't implemented for {} ", getName());
}

/// Merges state (on which place points to) with other state of current aggregation function.
virtual void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

Expand Down
51 changes: 51 additions & 0 deletions src/AggregateFunctions/UniqExactSet.h
Expand Up @@ -28,6 +28,57 @@ class UniqExactSet
asTwoLevel().insert(std::forward<Arg>(arg));
}

/// In merge, if one of the lhs and rhs is twolevelset and the other is singlelevelset, then the singlelevelset will need to convertToTwoLevel().
/// It's not in parallel and will cost extra large time if the thread_num is large.
/// This method will convert all the SingleLevelSet to TwoLevelSet in parallel if the hashsets are not all singlelevel or not all twolevel.
static void parallelizeMergePrepare(const std::vector<UniqExactSet *> & data_vec, ThreadPool & thread_pool)
{
unsigned long single_level_set_num = 0;

for (auto ele : data_vec)
{
if (ele->isSingleLevel())
single_level_set_num ++;
}

if (single_level_set_num > 0 && single_level_set_num < data_vec.size())
{
try
{
auto data_vec_atomic_index = std::make_shared<std::atomic_uint32_t>(0);
auto thread_func = [data_vec, data_vec_atomic_index, thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);

setThreadName("UniqExaConvert");

while (true)
{
const auto i = data_vec_atomic_index->fetch_add(1);
if (i >= data_vec.size())
return;
if (data_vec[i]->isSingleLevel())
data_vec[i]->convertToTwoLevel();
}
};
for (size_t i = 0; i < std::min<size_t>(thread_pool.getMaxThreads(), single_level_set_num); ++i)
thread_pool.scheduleOrThrowOnError(thread_func);

thread_pool.wait();
}
catch (...)
{
thread_pool.wait();
throw;
}
}
}

auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr)
{
if (isSingleLevel() && other.isTwoLevel())
Expand Down
14 changes: 14 additions & 0 deletions src/Interpreters/Aggregator.cpp
Expand Up @@ -2603,6 +2603,20 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(

AggregatedDataVariantsPtr & res = non_empty_data[0];

for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (aggregate_functions[i]->isParallelizeMergePrepareNeeded())
{
size_t size = non_empty_data.size();
std::vector<AggregateDataPtr> data_vec;

for (size_t result_num = 0; result_num < size; ++result_num)
data_vec.emplace_back(non_empty_data[result_num]->without_key + offsets_of_aggregate_states[i]);

aggregate_functions[i]->parallelizeMergePrepare(data_vec, thread_pool);
}
}

/// We merge all aggregation results to the first.
for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
{
Expand Down
4 changes: 4 additions & 0 deletions tests/performance/prepare_hash_before_merge.xml
@@ -0,0 +1,4 @@
<test>
<query>SELECT COUNT(DISTINCT Title) FROM test.hits SETTINGS max_threads = 24</query>
<query>SELECT COUNT(DISTINCT Referer) FROM test.hits SETTINGS max_threads = 22</query>
</test>

0 comments on commit 78f3a57

Please sign in to comment.