From b7b50f3ec26af2bbed7b95fe7ed9f3db20c80c05 Mon Sep 17 00:00:00 2001 From: Chunel Date: Mon, 23 Mar 2026 00:36:49 +0800 Subject: [PATCH] [bugfix] fix schedule run and destory wait_for 1s problem --- src/GraphCtrl/GraphElement/GElementDefine.h | 8 ++--- .../GGroup/GCondition/GMultiCondition.inl | 4 +++ .../GraphElement/GGroup/GMutable/GMutable.cpp | 1 + .../GraphElement/GGroup/GRegion/GRegion.cpp | 6 ++-- .../GraphElement/GGroup/GSome/GSome.inl | 1 + .../GDynamicEngine/GDynamicEngine.cpp | 4 +-- src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h | 5 ++- src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h | 35 +++++++++++-------- .../ThreadPool/Thread/UThreadPrimary.h | 2 +- src/UtilsCtrl/ThreadPool/UThreadPool.cpp | 19 ++++++++-- src/UtilsCtrl/ThreadPool/UThreadPool.h | 2 +- src/UtilsCtrl/ThreadPool/UThreadPool.inl | 4 ++- src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h | 7 ++-- 13 files changed, 62 insertions(+), 36 deletions(-) diff --git a/src/GraphCtrl/GraphElement/GElementDefine.h b/src/GraphCtrl/GraphElement/GElementDefine.h index 4d214b07..841f3299 100644 --- a/src/GraphCtrl/GraphElement/GElementDefine.h +++ b/src/GraphCtrl/GraphElement/GElementDefine.h @@ -13,10 +13,10 @@ CGRAPH_NAMESPACE_BEGIN -const static CSize CGRAPH_DEFAULT_LOOP_TIMES = 1; // 默认循环次数信息 -const static CLevel CGRAPH_DEFAULT_ELEMENT_LEVEL = 0; // 默认的element级别,用于控制init函数 -const static CIndex CGRAPH_DEFAULT_BINDING_INDEX = -1; // 默认绑定线程id,-1表示不绑定 -const static CMSec CGRAPH_DEFAULT_ELEMENT_TIMEOUT = 0; // 默认element的超时时间 +const static CSize CGRAPH_DEFAULT_LOOP_TIMES = 1; // 默认循环次数信息 +const static CLevel CGRAPH_DEFAULT_ELEMENT_LEVEL = 0; // 默认的element级别,用于控制init函数 +const static CMSec CGRAPH_DEFAULT_ELEMENT_TIMEOUT = 0; // 默认element的超时时间 +const static CIndex CGRAPH_DEFAULT_BINDING_INDEX = CGRAPH_DEFAULT_TASK_STRATEGY; // 默认绑定线程id,-1表示不绑定 enum class GElementType { ELEMENT = 0x00000000, // 元素 diff --git a/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.inl b/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.inl index 80e4d776..3c32bbda 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.inl +++ b/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.inl @@ -16,6 +16,10 @@ CGRAPH_NAMESPACE_BEGIN template GMultiCondition::GMultiCondition() { element_type_ = GElementType::MULTI_CONDITION; + if (type == GMultiConditionType::PARALLEL) { + // 多并发的情况下,需要触发线程唤醒 + binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY; + } session_ = URandom<>::generateSession(CGRAPH_STR_MULTI_CONDITION); } diff --git a/src/GraphCtrl/GraphElement/GGroup/GMutable/GMutable.cpp b/src/GraphCtrl/GraphElement/GGroup/GMutable/GMutable.cpp index 35bd173d..f10f3853 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GMutable/GMutable.cpp +++ b/src/GraphCtrl/GraphElement/GGroup/GMutable/GMutable.cpp @@ -13,6 +13,7 @@ CGRAPH_NAMESPACE_BEGIN GMutable::GMutable() { element_type_ = GElementType::MUTABLE; session_ = URandom<>::generateSession(CGRAPH_STR_MUTABLE); + binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY; manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GElementManager) } diff --git a/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp b/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp index e0527535..32a4a306 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp +++ b/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp @@ -11,11 +11,11 @@ CGRAPH_NAMESPACE_BEGIN -GRegion::GRegion() : GGroup() { - is_init_ = false; - manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GElementManager) +GRegion::GRegion() { element_type_ = GElementType::REGION; + binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY; session_ = URandom<>::generateSession(CGRAPH_STR_REGION); + manager_ = CGRAPH_SAFE_MALLOC_COBJECT(GElementManager) } diff --git a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl index fbcd5fd4..d0fb5ad7 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl +++ b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl @@ -16,6 +16,7 @@ CGRAPH_NAMESPACE_BEGIN template GSome::GSome() { element_type_ = GElementType::SOME; + binding_index_ = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY; session_ = URandom<>::generateSession(CGRAPH_STR_SOME); } diff --git a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp index 908529f7..e0969312 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp +++ b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp @@ -56,7 +56,7 @@ CVoid GDynamicEngine::commonRunAll() { */ finished_end_size_ = 0; for (const auto& element : front_element_arr_) { - process(element, element == front_element_arr_.back()); + process(element, element == front_element_arr_.back() && element->isDefaultBinding()); } fatWait(); @@ -257,7 +257,7 @@ CVoid GDynamicEngine::parallelRunAll() { if (parallel_element_matrix_.size() < static_cast(thread_pool_->getConfig().default_thread_size_)) { // 确保所有的 pt 都可以被唤醒,从而快速执行 - thread_pool_->wakeupAllThread(); + (void)thread_pool_->wakeupAllThread(); } { diff --git a/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h index 530086ff..34cce6b2 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h @@ -81,7 +81,7 @@ class UAtomicQueue : public UQueueObject { * 阻塞式等待弹出 * @return */ - std::unique_ptr popWithTimeout(CMSec ms) { + std::unique_ptr popWithTimeout(const CMSec ms) { CGRAPH_UNIQUE_LOCK lk(mutex_); if (!cv_.wait_for(lk, std::chrono::milliseconds(ms), [this] { return (!queue_.empty()) || (!ready_flag_); })) { @@ -123,9 +123,8 @@ class UAtomicQueue : public UQueueObject { queue_.push(std::move(task)); mutex_.unlock(); break; - } else { - CGRAPH_YIELD(); } + CGRAPH_YIELD(); } cv_.notify_one(); } diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h index 3455c555..9c652e14 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h @@ -105,7 +105,12 @@ class UThreadBase : public UThreadObject { * 清空所有任务内容 */ CVoid reset() { - done_ = false; + { + // 这里必须要加 lock,避免退出的时候,cv_.wait_for() 的 pred竞争 + CGRAPH_UNIQUE_LOCK lk(mutex_); + done_ = false; + } + cv_.notify_one(); // 防止主线程 wait时间过长,导致的结束缓慢问题 if (thread_.joinable()) { thread_.join(); // 等待线程结束 @@ -176,9 +181,9 @@ class UThreadBase : public UThreadObject { policy = config_->secondary_thread_policy_; } - auto handle = thread_.native_handle(); - sched_param param = { calcPriority(priority) }; - int ret = pthread_setschedparam(handle, calcPolicy(policy), ¶m); + const auto handle = thread_.native_handle(); + const sched_param& param = { calcPriority(priority) }; + const int ret = pthread_setschedparam(handle, calcPolicy(policy), ¶m); if (0 != ret) { CGRAPH_ECHO("warning : set thread sched param failed, system error code is [%d]", ret); } @@ -214,7 +219,7 @@ class UThreadBase : public UThreadObject { * @param policy * @return */ - static CInt calcPolicy(int policy) { + static CInt calcPolicy(const int policy) { return (CGRAPH_THREAD_SCHED_OTHER == policy || CGRAPH_THREAD_SCHED_RR == policy || CGRAPH_THREAD_SCHED_FIFO == policy) @@ -228,7 +233,7 @@ class UThreadBase : public UThreadObject { * @param priority * @return */ - static CInt calcPriority(int priority) { + static CInt calcPriority(const int priority) { return (priority >= CGRAPH_THREAD_MIN_PRIORITY && priority <= CGRAPH_THREAD_MAX_PRIORITY) ? priority : CGRAPH_THREAD_MIN_PRIORITY; @@ -236,17 +241,17 @@ class UThreadBase : public UThreadObject { protected: - CBool done_; // 线程状态标记 - CBool is_init_; // 标记初始化状态 - CBool is_running_; // 是否正在执行 - CInt type_ = 0; // 用于区分线程类型(主线程、辅助线程) - CULong total_task_num_ = 0; // 处理的任务的数字 + CBool done_; // 线程状态标记 + CBool is_init_; // 标记初始化状态 + CBool is_running_; // 是否正在执行 + CInt type_ = 0; // 用于区分线程类型(主线程、辅助线程) + CULong total_task_num_ = 0; // 处理的任务的数字 - UAtomicQueue* pool_task_queue_; // 用于存放线程池中的普通任务 - UAtomicPriorityQueue* pool_priority_task_queue_; // 用于存放线程池中的包含优先级任务的队列,仅辅助线程可以执行 - UThreadPoolConfigPtr config_ = nullptr; // 配置参数信息 + UAtomicQueue* pool_task_queue_ { nullptr }; // 用于存放线程池中的普通任务 + UAtomicPriorityQueue* pool_priority_task_queue_ { nullptr }; // 用于存放线程池中的包含优先级任务的队列,仅辅助线程可以执行 + UThreadPoolConfigPtr config_ { nullptr }; // 配置参数信息 - std::thread thread_; // 线程类 + std::thread thread_; // 线程类 std::mutex mutex_; std::condition_variable cv_; }; diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h index abe5d995..2094badd 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h @@ -119,7 +119,7 @@ class UThreadPrimary : public UThreadBase { if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) { CGRAPH_UNIQUE_LOCK lk(mutex_); cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_), - [this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !done_; }); + [this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !pool_task_queue_->empty() || !done_; }); cur_empty_epoch_ = 0; } } diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp index a020a87a..f39acb6e 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp @@ -210,6 +210,12 @@ CIndex UThreadPool::dispatch(const CIndex origIndex) { CIndex realIndex = 0; if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) { realIndex = cur_index_++; + if (realIndex >= 0 && realIndex < config_.default_thread_size_ + && primary_threads_[realIndex]->is_running_) { + // 如果是默认调度,并且被放置到 正在running 的pt中,则切换为 trigger_one 的策略,防止阻塞 + realIndex = CGRAPH_TRIGGER_ALL_THREAD_STRATEGY; + } + if (cur_index_ >= config_.max_thread_size_ || cur_index_ < 0) { cur_index_ = 0; } @@ -269,14 +275,21 @@ CVoid UThreadPool::monitor() { } -CVoid UThreadPool::wakeupAllThread() const { +CSize UThreadPool::wakeupAllThread() const { + CSize size = 0; for (auto& pt : primary_threads_) { - pt->wakeup(); + if (pt->wakeup()) { + ++size; + } } for (auto& st : secondary_threads_) { - st->wakeup(); + if (st->wakeup()) { + ++size; + } } + + return size; } CGRAPH_NAMESPACE_END \ No newline at end of file diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.h b/src/UtilsCtrl/ThreadPool/UThreadPool.h index 4eb809e9..adca1708 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.h @@ -179,7 +179,7 @@ class UThreadPool : public UThreadObject { * 通知所有thread 开启 * @return */ - CVoid wakeupAllThread() const; + CSize wakeupAllThread() const; protected: /** diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.inl b/src/UtilsCtrl/ThreadPool/UThreadPool.inl index a3c09af2..6b4e9ad1 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.inl +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.inl @@ -12,7 +12,6 @@ #include "UThreadPool.h" CGRAPH_NAMESPACE_BEGIN - template auto UThreadPool::commit(const FunctionType& func, CIndex index) -> std::future()())> { @@ -62,6 +61,9 @@ CVoid UThreadPool::execute(FunctionType&& task, const CIndex index) { primary_threads_[realIndex]->pushTask(std::forward(task)); } else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) { priority_task_queue_.push(std::forward(task), CGRAPH_LONG_TIME_TASK_STRATEGY); + } else if (CGRAPH_TRIGGER_ALL_THREAD_STRATEGY == realIndex) { + task_queue_.push(std::forward(task)); + (void)wakeupAllThread(); } else { task_queue_.push(std::forward(task)); } diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h index bb5afd04..d090ba1a 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h @@ -38,9 +38,10 @@ static const CUInt CGRAPH_DEFAULT_RINGBUFFER_SIZE = 64; static const CIndex CGRAPH_MAIN_THREAD_ID = -1; // 启动线程id标识(非上述主线程) static const CIndex CGRAPH_SECONDARY_THREAD_COMMON_ID = -2; // 辅助线程统一id标识 -static const CInt CGRAPH_DEFAULT_TASK_STRATEGY = -1; // 默认线程调度策略 -static const CInt CGRAPH_POOL_TASK_STRATEGY = -2; // 固定用pool中的队列的调度策略 -static const CInt CGRAPH_LONG_TIME_TASK_STRATEGY = -101; // 长时间任务调度策略 +static const CIndex CGRAPH_DEFAULT_TASK_STRATEGY = -1; // 默认线程调度策略 +static const CIndex CGRAPH_POOL_TASK_STRATEGY = -2; // 固定用pool中的队列的调度策略 +static const CIndex CGRAPH_LONG_TIME_TASK_STRATEGY = -101; // 长时间任务调度策略 +static const CIndex CGRAPH_TRIGGER_ALL_THREAD_STRATEGY = -102; // 触发线程逻辑调度策略 /** * 以下为线程池配置信息