Convert hashSets in parallel before merge #50748
Conversation
lastest upstream master
This is an automatic comment. The PR description does not match the template. Please edit it accordingly. The error is: Changelog entry required for category 'Performance Improvement'
This is an automated comment for commit 635e9d7 with a description of existing statuses. It's updated for the latest CI run.
Please take a look.
@@ -147,6 +148,10 @@ class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunctio
    /// Default values must be at the 0-th positions in columns.
    virtual void addManyDefaults(AggregateDataPtr __restrict place, const IColumn ** columns, size_t length, Arena * arena) const = 0;

    virtual bool isParallelizeMergePrepareNeeded() const { return false; }

    virtual void parallelizeMergePrepare(AggregateDataPtrs & /*places*/, ThreadPool & /*thread_pool*/) const {}
`throw Exception(ErrorCodes::NOT_IMPLEMENTED, ...);` here, and override only where we actually do something.
Done. Added the exception in the virtual function.
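The pattern agreed on above can be sketched as follows. This is a minimal illustration, not ClickHouse's actual classes: `AggregateFunctionBase`, `UniqExactLike`, `SumLike`, and `tryPrepare` are hypothetical names, and `std::logic_error` stands in for ClickHouse's `Exception(ErrorCodes::NOT_IMPLEMENTED, ...)`. The base class throws by default; only functions that support the prepare phase report it via the flag and override the method.

```cpp
#include <stdexcept>
#include <string>

struct AggregateFunctionBase
{
    virtual ~AggregateFunctionBase() = default;

    // Capability flag: false by default, overridden by supporting functions.
    virtual bool isParallelizeMergePrepareNeeded() const { return false; }

    // Default implementation throws, standing in for NOT_IMPLEMENTED.
    virtual void parallelizeMergePrepare() const
    {
        throw std::logic_error("parallelizeMergePrepare is not implemented for " + name());
    }

    virtual std::string name() const = 0;
};

struct UniqExactLike : AggregateFunctionBase
{
    bool isParallelizeMergePrepareNeeded() const override { return true; }
    void parallelizeMergePrepare() const override { /* convert hash sets here */ }
    std::string name() const override { return "uniqExact-like"; }
};

struct SumLike : AggregateFunctionBase
{
    // No override: calling parallelizeMergePrepare() on this type throws.
    std::string name() const override { return "sum-like"; }
};

// Callers check the flag first, so the exception only fires on misuse.
bool tryPrepare(const AggregateFunctionBase & f)
{
    if (!f.isParallelizeMergePrepareNeeded())
        return false;
    f.parallelizeMergePrepare();
    return true;
}
```

This keeps the guard condition (`isParallelizeMergePrepareNeeded`) and the guarded operation in the same interface, and the throwing default turns any missed guard into a loud failure instead of a silent no-op.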
src/Interpreters/Aggregator.cpp
Outdated
@@ -2601,6 +2601,21 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(

    AggregatedDataVariantsPtr & res = non_empty_data[0];

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        if (aggregate_functions[i]->isAbleToParallelizeMerge() &&
`isAbleToParallelizeMerge` looks irrelevant here; `isParallelizeMergePrepareNeeded` should be enough.
The flag `isAbleToParallelizeMerge` is for the parallel merge of UniqExact. Even if the merge stage is not parallel (`isAbleToParallelizeMerge = false`), we could still do the hashset conversion in parallel. The merge stage and the hashset conversion stage are independent, so I agree `isAbleToParallelizeMerge` can be removed here.
/// In merge, if one of lhs and rhs is a two-level set and the other is a single-level set, the single-level set will need to convertToTwoLevel().
/// That conversion is not parallel and costs a lot of extra time if the thread count is large.
/// This method converts all the single-level sets to two-level sets in parallel if the hashsets are neither all single-level nor all two-level.
static void parallelizeMergePrepare(const std::vector<UniqExactSet *> & data_vec, ThreadPool * thread_pool = nullptr)
Could we make sure that `ThreadPool` is always not null here and pass it as a reference?
From `void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool) const override` in AggregateFunctionUniq.h, the `ThreadPool` is always not null. The latest code uses a reference instead of a pointer.
    }
};
for (size_t i = 0; i < std::min<size_t>(thread_pool->getMaxThreads(), single_level_set_num); ++i)
    thread_pool->scheduleOrThrowOnError(thread_func);
If an exception is thrown here, we will not reach `wait()`. All this code should be wrapped into a try-catch that also does `wait` in case of an exception (refer to #50590).
Added a try-catch in the latest code.
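The exception-safety pattern the reviewer asks for can be sketched with a toy pool (ClickHouse's real `ThreadPool` and `scheduleOrThrowOnError` are not used here; `ToyPool` and `runJobs` are illustrative names). The point is that if scheduling throws partway through the loop, the already-scheduled jobs must still be waited on before the exception propagates.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <thread>
#include <vector>

// Toy stand-in for a thread pool whose schedule call can throw.
struct ToyPool
{
    std::vector<std::thread> threads;
    size_t capacity;
    explicit ToyPool(size_t cap) : capacity(cap) {}

    // Mimics scheduleOrThrowOnError: throws when the pool is full.
    void scheduleOrThrow(std::function<void()> job)
    {
        if (threads.size() >= capacity)
            throw std::runtime_error("pool is full");
        threads.emplace_back(std::move(job));
    }

    void wait()
    {
        for (auto & t : threads)
            if (t.joinable())
                t.join();
        threads.clear();
    }
};

// If scheduling throws mid-loop, the catch block still waits for the
// jobs that were already scheduled, then rethrows.
void runJobs(ToyPool & pool, size_t job_count, std::atomic<int> & done)
{
    try
    {
        for (size_t i = 0; i < job_count; ++i)
            pool.scheduleOrThrow([&done] { done.fetch_add(1); });
        pool.wait();
    }
    catch (...)
    {
        pool.wait();  // join already-running jobs so none outlive this scope
        throw;
    }
}
```

Without the catch-and-wait, the in-flight threads would still be referencing locals of the caller when the stack unwinds, which is exactly the hazard the review comment (and #50590) points at.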
It would be good to find perf-tests that show a speed-up in the addressed case.
void parallelizeMergePrepare(AggregateDataPtrs & places, ThreadPool & thread_pool) const override
{
    std::vector<DataSet *> data_vec;
data_vec.resize(places.size());
I have tried adding the line `data_vec.resize(places.size());` after the initialization of `std::vector<DataSet *> data_vec;`, but the ClickHouse server crashed. I will check that.
Added the `resize()` for `data_vec` in `parallelizeMergePrepare`.
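A plausible illustration of the crash discussed above (the actual cause is not confirmed in the thread): `resize()` pairs with indexed assignment, while `reserve()` pairs with `push_back`. Mixing `resize()` with `push_back` leaves null pointers at the front of the vector, which crash when dereferenced later. The helper names below are hypothetical.

```cpp
#include <cstddef>
#include <vector>

// Correct: reserve capacity, then append with push_back.
std::vector<int *> fillWithPushBack(std::vector<int> & storage)
{
    std::vector<int *> vec;
    vec.reserve(storage.size());
    for (auto & x : storage)
        vec.push_back(&x);
    return vec;
}

// Also correct: resize to the final length, then assign by index.
std::vector<int *> fillWithIndexing(std::vector<int> & storage)
{
    std::vector<int *> vec;
    vec.resize(storage.size());
    for (size_t i = 0; i < storage.size(); ++i)
        vec[i] = &storage[i];
    return vec;
}

// The buggy mix: resize() first creates storage.size() null pointers,
// then push_back appends after them, doubling the length and leaving
// nulls at the front.
std::vector<int *> fillBuggy(std::vector<int> & storage)
{
    std::vector<int *> vec;
    vec.resize(storage.size());
    for (auto & x : storage)
        vec.push_back(&x);
    return vec;
}
```

If the original loop filled `data_vec` with `push_back`, adding `resize()` on top of it would produce exactly this kind of half-null vector; switching either the fill loop to indexed assignment or `resize()` to `reserve()` resolves it.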
@nickitat @devcrafter Thanks for your kind review. I will think about that and let you know once I finish.
The latest code has included the header file.
I have used ClickBench for the performance test. Do you mean we should find the "distinct/uniq" operations in the tests and check their performance?
Force-pushed from 9a19224 to e42a62f
Hi @nickitat @devcrafter, thanks for your previous code review. I have updated the patch code and comments according to the review comments.
Update to the master
Before merge, if one of lhs and rhs is a singleLevelSet and the other is a twoLevelSet, the singleLevelSet will call convertToTwoLevel(). The conversion is not parallel and costs many cycles while it consumes all the singleLevelSets. The idea of the patch is to convert all the singleLevelSets to twoLevelSets in parallel if the hashsets are neither all single-level nor all two-level. I have tested the patch on an Intel 2 x 112 vCPU SPR server with ClickBench and the latest upstream ClickHouse. Q5 got a big 264% performance improvement, and 24 queries got at least a 5% performance gain. The overall geomean of 43 queries gained 7.4% over the base code. Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
@nickitat The failing CI check has succeeded in the latest attempt, but the status doesn't show that.
Update the
@nickitat @devcrafter @kitaisreal
SELECT COUNT(DISTINCT Title) FROM hits_v1 SETTINGS max_threads = 24
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
Renamed the data set of the performance test from hits_v1 to test.hits to fit the CI rule.
Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
@@ -66,6 +66,8 @@ class ThetaSketchData : private boost::noncopyable
        return 0;
    }

    static void parallelizeMergePrepare(const std::vector<ThetaSketchData *> & /*places*/, ThreadPool & /*thread_pool*/) {}
If I'm not missing something, every `parallelizeMergePrepare` could be one of:
- not defined/declared at all
- throwing an exception
- doing meaningful work (an empty implementation is not meaningful)
I have some prejudice towards empty methods, what do you think?
Agree.
- For these empty methods in the other data sets, we could add the throwing exception.
- Or we may just delete these empty methods in the other data sets, call `DataSet::parallelizeMergePrepare()` only when `is_parallelize_merge_prepare_needed` is true (`UniqExactSet`), and use `constexpr` to avoid the compile error.
I would suggest the 2nd way.
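The 2nd way suggested above can be sketched with `if constexpr`: only the set type that needs the prepare phase defines the member, and the discarded branch is never instantiated for the other types, so no empty stubs are required. All type and function names here (`UniqExactSetLike`, `ThetaSketchLike`, `prepareIfNeeded`) are hypothetical stand-ins, not ClickHouse's actual classes.

```cpp
#include <vector>

struct UniqExactSetLike
{
    static constexpr bool is_parallelize_merge_prepare_needed = true;
    bool prepared = false;

    // Stands in for the single-to-two-level hashset conversion.
    static void parallelizeMergePrepare(const std::vector<UniqExactSetLike *> & sets)
    {
        for (auto * s : sets)
            s->prepared = true;
    }
};

struct ThetaSketchLike
{
    static constexpr bool is_parallelize_merge_prepare_needed = false;
    // No parallelizeMergePrepare member at all -- no empty stub needed.
};

// The false branch is discarded at compile time for types without the
// member, because the call is dependent on the template parameter Set.
template <typename Set>
bool prepareIfNeeded(std::vector<Set *> & sets)
{
    if constexpr (Set::is_parallelize_merge_prepare_needed)
    {
        Set::parallelizeMergePrepare(sets);
        return true;
    }
    else
        return false;
}
```

This is why the compile error the comment mentions goes away: a plain `if` would still require `parallelizeMergePrepare` to exist on every instantiated type, while `if constexpr` only instantiates the taken branch.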
Co-authored-by: Nikita Taranov <nickita.taranov@gmail.com>
Force-pushed from 53fa19d to 635e9d7
…repare() Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
In PR ClickHouse#50748, a new phase `parallelizeMergePrepare` was added before merge for the case where the hashSets are neither all single-level nor all two-level; it converts all the singleLevelSets to twoLevelSets in parallel, which increases CPU utilization and QPS. But when all the hashtables are single-level, they can also benefit from the `parallelizeMergePrepare` optimization in most cases, as long as the hashtable sizes are not too small. By tuning the query `SELECT COUNT(DISTINCT SearchPhase) FROM hits_v1` at different thread counts, we have found the mild threshold 6,000.

Tested the patch with the query `SELECT COUNT(DISTINCT Title) FROM hits_v1` on a 2x80 vCPU server. With fewer than 48 threads, the hashSets are all two-level or a mix of single-level and two-level. With more than 56 threads, all the hashSets are single-level, and QPS gains at most 2.35x.

Threads  Opt/Base
8        100.0%
16       99.4%
24       110.3%
32       99.9%
40       99.3%
48       99.8%
56       183.0%
64       234.7%
72       233.1%
80       229.9%
88       224.5%
96       229.6%
104      235.1%
112      229.5%
120      229.1%
128      217.8%
136      222.9%
144      217.8%
152      204.3%
160      203.2%

Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
…52973) Optimize the merge if all hashSets are singleLevel (#52973); add the comment and explanation for PR#52973. Signed-off-by: Jiebin Sun <jiebin.sun@intel.com>
Before merge, if one of lhs and rhs is a singleLevelSet and the other is a twoLevelSet, the singleLevelSet will call convertToTwoLevel(). The conversion is serial, not parallel, and it costs many cycles before all the singleLevelSets are consumed.
The idea of the patch is to convert all the singleLevelSets to twoLevelSets in parallel before merge.
I have tested the patch on an Intel 2 x 112 vCPU SPR server with ClickBench and the latest upstream ClickHouse. Q5 got a big 264% performance improvement, and 24 queries got at least a 5% performance gain. The overall geomean of the 43 queries gained 7.4% over the base code.
Changelog category (leave one): Performance Improvement
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
This patch provides a method to convert all the hashsets in parallel before merge.
Details of the performance issue and how it is resolved:
First, we found a performance drop for Q5 of ClickBench as the core count increases.
Then we collected pipeline visualizations with the thread pool's max_threads set to 80 and 112.
max_threads = 80
max_threads = 112
When max_threads increases from 80 to 112, the merge stage does not shrink; the merge time is 3.2x longer.
When merging two twoLevelHash sets, there is already an optimization that starts a thread pool and merges in parallel. However, when merging a singleLevelHash with a twoLevelHash, the singleLevelHash has to be converted to a twoLevelHash first, and that conversion is serial.
If there is at least one singleLevelHash and one twoLevelHash, all the singleLevelHashes have to be converted to twoLevelHash before merging with the other twoLevelHashes. We could add a new stage before merge, called Prepare_Hash_before_Merge, where all the hashsets are processed before merging: all the singleLevelHashes are converted to twoLevelHash in parallel in this stage, instead of serially inside the merge stage.
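The prepare stage described above can be modeled as follows. This is a toy sketch, not ClickHouse's `UniqExactSet`: `ToySet`, its `two_level` flag, and `parallelizeMergePrepare` are illustrative, and one thread is spawned per set where the real code bounds the worker count by the pool size.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

struct ToySet
{
    bool two_level = false;
    void convertToTwoLevel() { two_level = true; }  // the expensive serial step
};

// If the inputs mix single-level and two-level sets, convert every
// single-level set to two-level in parallel before the (unshown) merge.
void parallelizeMergePrepare(std::vector<ToySet *> & sets)
{
    size_t two_level_count = 0;
    for (auto * s : sets)
        two_level_count += s->two_level ? 1 : 0;

    // All-single-level or all-two-level inputs need no conversion here.
    if (two_level_count == 0 || two_level_count == sets.size())
        return;

    // Mixed levels: convert in parallel instead of serially during merge.
    std::vector<std::thread> workers;
    for (auto * s : sets)
        if (!s->two_level)
            workers.emplace_back([s] { s->convertToTwoLevel(); });
    for (auto & w : workers)
        w.join();
}
```

The early return is the key behavioral choice of this PR: homogeneous inputs skip the stage entirely, so only the mixed case (where the merge stage would otherwise convert serially) pays for the extra threads. (A follow-up, #52973, later extends the conversion to the all-single-level case above a size threshold.)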
With this patch, Q5 got a 2.64x performance improvement on a 2 x 112 vCPU system (max_threads = 112).