Skip to content

Commit

Permalink
[perf] optimize parallel tasks with batch.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Apr 14, 2024
1 parent d7b3cb8 commit ec5c929
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ option(CGRAPH_BUILD_PERFORMANCE_TESTS "Enables builds of performance tests" OFF)
# 不建议作为验证自行逻辑的唯一标准,不建议使用者打开
# add_definitions(-D_CGRAPH_SHOW_THREAD_METRICS_)

# 此宏可以在纯并发的微小任务下,用于提升整体。主要用于在性能测试的情况下使用,一般情况不推荐打开
# add_definitions(-D_CGRAPH_PARALLEL_MICRO_BATCH_ENABLE_)

# add CGraph environment info
include(cmake/CGraph-env-include.cmake)

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ int main() {
* [taskflow](https://github.com/taskflow/taskflow) : A General-purpose Parallel and Heterogeneous Task Programming System
* [awesome-cpp](https://github.com/fffaraz/awesome-cpp) : A curated list of awesome C++ (or C) frameworks, libraries, resources, and shiny things. Inspired by awesome-... stuff.
* [awesome-workflow-engines](https://github.com/meirwah/awesome-workflow-engines) : A curated list of awesome open source workflow engines
* [nndeploy](https://github.com/DeployAI/nndeploy) : nndeploy是一款模型端到端部署框架。以多端推理以及基于有向无环图模型部署为内核,致力为用户提供跨平台、简单易用、高性能的模型部署体验。
* [KuiperInfer](https://github.com/zjhellofss/KuiperInfer) : 带你从零实现一个高性能的深度学习推理库,支持大模型 llama2 、Unet、Yolov5、Resnet等模型的推理。Implement a high-performance deep learning inference library step by step

------------
<details>
Expand Down
12 changes: 4 additions & 8 deletions README_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,14 @@ int main() {
GElementPtr a, b, c, d = nullptr;
/* register node with dependency info */
CStatus status = pipeline->registerGElement<MyNode1>(&a, {}, "nodeA"); // register nodeA with no dependency
status += pipeline->registerGElement<MyNode2>(&b, {a}, "nodeB"); // b depends a
status += pipeline->registerGElement<MyNode1>(&c, {a}, "nodeC");
status += pipeline->registerGElement<MyNode2>(&d, {b, c}, "nodeD"); // d depends b and c
if (!status.isOK()) {
return;
}
pipeline->registerGElement<MyNode1>(&a, {}, "nodeA"); // register nodeA with no dependency
pipeline->registerGElement<MyNode2>(&b, {a}, "nodeB"); // b depends a
pipeline->registerGElement<MyNode1>(&c, {a}, "nodeC");
pipeline->registerGElement<MyNode2>(&d, {b, c}, "nodeD"); // d depends b and c
/* run dag pipeline */
status = pipeline->process();
GPipelineFactory::remove(pipeline);
return 0;
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,40 @@ CVoid GDynamicEngine::fatWait() {
}


#ifdef _CGRAPH_PARALLEL_MICRO_BATCH_ENABLE_
CVoid GDynamicEngine::parallelRunAll() {
// 微任务模式,主要用于性能测试的场景下
const UThreadPoolConfig& config = thread_pool_->getConfig();
CSize thdNum = config.default_thread_size_ + config.secondary_thread_size_;
CGRAPH_THROW_EXCEPTION_BY_CONDITION(thdNum <= 0,
"default thread size cannot smaller than 1");

std::vector<std::future<CStatus>> futures;
CSize taskNumPerThd = total_end_size_ / thdNum + (CSize)(0 != total_end_size_ % thdNum);
for (int i = 0; i < thdNum; i++) {
GElementPtrArr elements;
for (int j = 0; j < taskNumPerThd; j++) {
auto cur = i * taskNumPerThd + j;
if (cur < total_end_size_) {
elements.emplace_back(front_element_arr_[cur]);
}
}
auto curFut = thread_pool_->commit([elements] {
CGRAPH_FUNCTION_BEGIN
for (auto* element : elements) {
status += element->fatProcessor(CFunctionType::RUN);
CGRAPH_FUNCTION_CHECK_STATUS
}
CGRAPH_FUNCTION_END;
});
futures.emplace_back(std::move(curFut));
}

for (auto& fut : futures) {
cur_status_ += fut.get();
}
}
#else
CVoid GDynamicEngine::parallelRunAll() {
/**
* 主要适用于dag是纯并发逻辑的情况
Expand All @@ -220,7 +254,7 @@ CVoid GDynamicEngine::parallelRunAll() {
cur_status_ += fut.get();
}
}

#endif

CVoid GDynamicEngine::serialRunAll() {
/**
Expand Down
5 changes: 5 additions & 0 deletions src/UtilsCtrl/ThreadPool/UThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ CStatus UThreadPool::setConfig(const UThreadPoolConfig &config) {
}


UThreadPoolConfig UThreadPool::getConfig() const {
return config_;
}


CStatus UThreadPool::init() {
CGRAPH_FUNCTION_BEGIN
if (is_init_) {
Expand Down
6 changes: 6 additions & 0 deletions src/UtilsCtrl/ThreadPool/UThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class UThreadPool : public UThreadObject {
*/
CStatus setConfig(const UThreadPoolConfig &config);

/**
* 获取线程池配置信息
* @return
*/
UThreadPoolConfig getConfig() const;

/**
* 开启所有的线程信息
* @return
Expand Down

0 comments on commit ec5c929

Please sign in to comment.