Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,15 @@ class UWorkStealingQueue : public UQueueObject {
return result; // 如果非空,表示盗取成功
}


/**
* 判断当前队列是否为空
* @return
*/
CBool isEmpty() const {
return deque_.empty();
}

UWorkStealingQueue() = default;

CGRAPH_NO_ALLOWED_COPY(UWorkStealingQueue)
Expand Down
19 changes: 12 additions & 7 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ class UThreadPrimary : public UThreadBase {
* 休眠一定时间后,然后恢复执行状态,避免出现异常情况导致无法唤醒
*/
CVoid fatWait() {
cur_empty_epoch_++;
++cur_empty_epoch_;
CGRAPH_YIELD();
if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_));
cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_),
[this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty(); });
cur_empty_epoch_ = 0;
}
}
Expand All @@ -130,10 +131,11 @@ class UThreadPrimary : public UThreadBase {
* @return
*/
CVoid pushTask(UTask&& task) {
while (!wsq_.tryPush(std::move(task))) {
CGRAPH_YIELD();
wsq_.push(std::move(task));
{
CGRAPH_LOCK_GUARD lk(mutex_);
cur_empty_epoch_ = 0;
}
cur_empty_epoch_ = 0;
cv_.notify_one();
}

Expand All @@ -148,7 +150,10 @@ class UThreadPrimary : public UThreadBase {
CVoid pushTask(UTask&& task, const CBool enable, const CBool lockable) {
wsq_.push(std::move(task), enable, lockable);
if (enable && !lockable) {
cur_empty_epoch_ = 0;
{
CGRAPH_LOCK_GUARD lk(mutex_);
cur_empty_epoch_ = 0;
}
cv_.notify_one();
}
}
Expand Down Expand Up @@ -250,7 +255,7 @@ class UThreadPrimary : public UThreadBase {

private:
CInt index_ {0}; // 线程index
CInt cur_empty_epoch_ {0}; // 当前空转的轮数信息
std::atomic<CInt> cur_empty_epoch_ {0 }; // 当前空转的轮数信息
UWorkStealingQueue<UTask> wsq_ {}; // 内部队列信息
std::vector<UThreadPrimary *>* pool_threads_ {}; // 用于存放线程池中的线程信息
std::vector<CInt> steal_targets_ {}; // 被偷的目标信息
Expand Down
2 changes: 1 addition & 1 deletion src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static const CBool CGRAPH_BATCH_TASK_ENABLE = false;
static const CInt CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; // 批量执行本地任务最大值
static const CInt CGRAPH_MAX_POOL_BATCH_SIZE = 2; // 批量执行通用任务最大值
static const CInt CGRAPH_MAX_STEAL_BATCH_SIZE = 2; // 批量盗取任务最大值
static const CInt CGRAPH_PRIMARY_THREAD_BUSY_EPOCH = 5; // 主线程进入wait状态的轮数,数值越大,理论性能越高,但空转可能性也越大
static const CInt CGRAPH_PRIMARY_THREAD_BUSY_EPOCH = 3; // 主线程进入wait状态的轮数,数值越大,理论性能越高,但空转可能性也越大
static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 1000; // 主线程进入休眠状态的默认时间
static const CSec CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s
static const CBool CGRAPH_MONITOR_ENABLE = false; // 是否开启监控程序
Expand Down
Loading