Skip to content

Commit

Permalink
Merge pull request #210 from fasiondog/feature/factor
Browse files Browse the repository at this point in the history
并行优化调整
  • Loading branch information
fasiondog committed Mar 28, 2024
2 parents f2e60c6 + 4e61548 commit 82eb014
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 34 deletions.
27 changes: 27 additions & 0 deletions hikyuu_cpp/hikyuu/analysis/analysis_sys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,31 @@ vector<AnalysisSystemWithBlockOut> HKU_API analysisSystemList(const SystemList&
return result;
}

vector<AnalysisSystemWithBlockOut> HKU_API analysisSystemList(const StockList& stk_list,
const KQuery& query,
const SystemPtr& pro_sys) {
HKU_CHECK(pro_sys, "pro_sys is null!");

return parallel_for_range(0, stk_list.size(), [=](const range_t& range) {
vector<AnalysisSystemWithBlockOut> ret;
auto sys = pro_sys->clone();
Performance per;
AnalysisSystemWithBlockOut out;
for (size_t i = range.first; i < range.second; i++) {
try {
auto stk = stk_list[i];
sys->run(stk, query);
per.statistics(sys->getTM());
out.market_code = stk.market_code();
out.name = stk.name();
out.values = per.values();
ret.emplace_back(std::move(out));
} catch (const std::exception& e) {
HKU_ERROR(e.what());
}
}
return ret;
});
}

} // namespace hku
23 changes: 3 additions & 20 deletions hikyuu_cpp/hikyuu/analysis/analysis_sys.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,8 @@ vector<AnalysisSystemWithBlockOut> HKU_API analysisSystemList(const SystemList&
const StockList& stk_list,
const KQuery& query);

template <class Container>
inline vector<AnalysisSystemWithBlockOut> analysisSystemListWith(const Container& blk,
const KQuery& query,
const SystemPtr sys_proto) {
vector<AnalysisSystemWithBlockOut> result;
HKU_IF_RETURN(blk.size() == 0 || !sys_proto, result);

sys_proto->forceResetAll();
SystemList sys_list;
StockList stk_list;
for (const auto& stk : blk) {
if (!stk.isNull()) {
sys_list.emplace_back(std::move(sys_proto->clone()));
stk_list.emplace_back(stk);
}
}

result = analysisSystemList(sys_list, stk_list, query);
return result;
}
vector<AnalysisSystemWithBlockOut> HKU_API analysisSystemList(const StockList& stk_list,
const KQuery& query,
const SystemPtr& pro_sys);

} // namespace hku
16 changes: 5 additions & 11 deletions hikyuu_cpp/hikyuu/analysis/combinate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include "hikyuu/utilities/thread/thread.h"
#include "hikyuu/indicator/crt/EXIST.h"
#include "hikyuu/trade_sys/signal/crt/SG_Bool.h"
#include "hikyuu/global/GlobalTaskGroup.h"
#include "combinate.h"

namespace hku {
Expand Down Expand Up @@ -80,22 +79,18 @@ vector<CombinateAnalysisOutput> HKU_API combinateIndicatorAnalysisWithBlock(
MQStealThreadPool tg(work_num);
vector<std::future<vector<CombinateAnalysisOutput>>> tasks;

size_t per_num = total > work_num ? total / (work_num * 10) : 1;
size_t count = total % per_num == 0 ? total / per_num : total / per_num + 1;

vector<Stock> buf;
for (size_t i = 0; i < count; i++) {
auto ranges = parallelIndexRange(0, total);
for (const auto& range : ranges) {
buf.clear();
for (size_t j = i * per_num, end = (i + 1) * per_num; j < end; j++) {
if (j >= stocks.size()) {
break;
}
buf.emplace_back(stocks[j]);
for (size_t i = range.first; i < range.second; i++) {
buf.emplace_back(stocks[i]);
}
tasks.emplace_back(tg.submit([sgs, stks = std::move(buf), n_query = query,
n_tm = tm->clone(), n_sys = sys->clone()]() {
vector<CombinateAnalysisOutput> ret;
Performance per;
CombinateAnalysisOutput out;
for (size_t i = 0, len = stks.size(); i < len; i++) {
const Stock& n_stk = stks[i];
for (const auto& sg : sgs) {
Expand All @@ -105,7 +100,6 @@ vector<CombinateAnalysisOutput> HKU_API combinateIndicatorAnalysisWithBlock(
n_sys->setTM(n_tm);
n_sys->run(n_stk, n_query);
per.statistics(n_tm, Datetime::now());
CombinateAnalysisOutput out;
out.combinateName = n_sg->name();
out.market_code = n_stk.market_code();
out.name = n_stk.name();
Expand Down
29 changes: 26 additions & 3 deletions hikyuu_cpp/hikyuu/utilities/thread/algorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

namespace hku {

inline std::vector<std::pair<size_t, size_t>> parallelIndexRange(size_t start, size_t end) {
typedef std::pair<size_t, size_t> range_t;

inline std::vector<range_t> parallelIndexRange(size_t start, size_t end) {
std::vector<std::pair<size_t, size_t>> ret;
if (start >= end) {
return ret;
Expand Down Expand Up @@ -65,14 +67,15 @@ auto parallel_for_index(size_t start, size_t end, FunctionType f) {
std::vector<std::future<std::vector<typename std::result_of<FunctionType(size_t)>::type>>>
tasks;
for (size_t i = 0, total = ranges.size(); i < total; i++) {
tasks.emplace_back(tg.submit([&, range = ranges[i]]() {
tasks.emplace_back(tg.submit([func = f, range = ranges[i]]() {
std::vector<typename std::result_of<FunctionType(size_t)>::type> one_ret;
for (size_t ix = range.first; ix < range.second; ix++) {
one_ret.emplace_back(f(ix));
one_ret.emplace_back(func(ix));
}
return one_ret;
}));
}

std::vector<typename std::result_of<FunctionType(size_t)>::type> ret;
for (auto& task : tasks) {
auto one = task.get();
Expand All @@ -84,4 +87,24 @@ auto parallel_for_index(size_t start, size_t end, FunctionType f) {
return ret;
}

template <typename FunctionType, class TaskGroup = MQStealThreadPool>
auto parallel_for_range(size_t start, size_t end, FunctionType f) {
auto ranges = parallelIndexRange(start, end);
TaskGroup tg;
std::vector<std::future<typename std::result_of<FunctionType(range_t)>::type>> tasks;
for (size_t i = 0, total = ranges.size(); i < total; i++) {
tasks.emplace_back(tg.submit([func = f, range = ranges[i]]() { return func(range); }));
}

typename std::result_of<FunctionType(range_t)>::type ret;
for (auto& task : tasks) {
auto one = task.get();
for (auto&& value : one) {
ret.emplace_back(std::move(value));
}
}

return ret;
}

} // namespace hku
1 change: 1 addition & 0 deletions hikyuu_pywrap/_analysis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ static py::dict analysis_sys_list(const py::object& pystk_list, const KQuery& qu
OStreamToPython guard(false);
py::gil_scoped_release release;
records = analysisSystemList(sys_list, stk_list, query);
// records = analysisSystemList(stk_list, query, sys_proto);
}

Performance per;
Expand Down

0 comments on commit 82eb014

Please sign in to comment.