From 8aaface2c03ea94cecc3bc25badfee3e3c2b250a Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Tue, 21 Aug 2018 20:29:39 +0300 Subject: [PATCH 1/4] Obsolete tests disabled --- test/main.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/main.cpp b/test/main.cpp index c25e2977..b37c6f11 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -36,7 +36,7 @@ int main(int argc, char **argv) // disabling following test cases by default, but these tests can be still run // with explictily passed --gtest_filter="GryptonodeHandlersTest.*" - testing::GTEST_FLAG(filter) = "-CryptonodeHandlersTest.*:SupernodeTest.*:FullSupernodeListTest.*"; + testing::GTEST_FLAG(filter) = "-CryptonodeHandlersTest.*:SupernodeTest.*:FullSupernodeListTest.*:GraftServerCommonTest.test*"; testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } From 1c01ee06ec5ab746f76504261986cda6b08a2334 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Tue, 21 Aug 2018 22:04:42 +0300 Subject: [PATCH 2/4] The State Machine implemented --- include/graft_constants.h | 1 + include/state_machine.h | 98 ++++++++++++ include/task.h | 28 ++-- include/thread_pool.h | 33 +--- src/task.cpp | 324 +++++++++++++++++++++++++++----------- 5 files changed, 349 insertions(+), 135 deletions(-) create mode 100644 include/state_machine.h diff --git a/include/graft_constants.h b/include/graft_constants.h index 6078b67f..400c41fb 100644 --- a/include/graft_constants.h +++ b/include/graft_constants.h @@ -7,6 +7,7 @@ namespace graft EXP(None) \ EXP(Ok) \ EXP(Forward) \ + EXP(Again) \ EXP(Error) \ EXP(Drop) \ EXP(Busy) \ diff --git a/include/state_machine.h b/include/state_machine.h new file mode 100644 index 00000000..af1a6509 --- /dev/null +++ b/include/state_machine.h @@ -0,0 +1,98 @@ +#pragma once + +#include "task.h" + +namespace graft +{ + +class StateMachine final +{ +public: + enum State + { + EXECUTE, + PRE_ACTION, + CHK_PRE_ACTION, + WORKER_ACTION, + CHK_WORKER_ACTION, + WORKER_ACTION_DONE, + POST_ACTION, + CHK_POST_ACTION, + AGAIN, + EXIT, + }; + + using St = graft::Status; + using Statuses = std::initializer_list; + using Guard = std::function; + using Action = std::function; + + StateMachine(State initial_state = EXECUTE) + { + state(initial_state); + init_table(); + } + + void dispatch(BaseTaskPtr bt, State initial_state) + { +// state(State(initial_state)); + state(initial_state); + while(state() != EXIT) + { + process(bt); + } + } + +private: + void init_table(); + State state() const { return m_state; } + State state(State state) { return m_state = state; } + St status(BaseTaskPtr bt) const { return bt->getLastStatus(); } + + void process(BaseTaskPtr bt) + { + St cur_stat = status(bt); + + for(auto& r : m_table) + { + if(m_state != std::get<0>(r)) continue; + + Statuses& ss = std::get<1>(r); + if(ss.size()!=0) + { + bool res = false; + for(auto s : ss) + { + if(s == cur_stat) + { + res = true; + break; + } + } + if(!res) continue; + } + + Guard& g = std::get<3>(r); + if(g && !g(bt)) continue; + + Action& a = std::get<4>(r); + if(a) a(bt); + m_state = std::get<2>(r); + return; + } + throw std::runtime_error("State machine table is not complete"); + } +private: + + using H3 = Router::Handler3; + + const Guard has(Router::Handler H3::* act); + const Guard hasnt(Router::Handler H3::* act); + + State m_state; + using row = std::tuple; + std::vector m_table; +}; + +}//namespace graft + diff --git a/include/task.h b/include/task.h index 1e5695e1..4048e55f 100644 --- a/include/task.h +++ b/include/task.h @@ -202,16 +202,13 @@ class ClientTask : public BaseTask ConnectionManager* m_connectionManager; }; +class StateMachine; + class TaskManager { public: - TaskManager(const ConfigOpts& copts) - : m_copts(copts) - { - // TODO: validate options, throw exception if any mandatory options missing - initThreadPool(copts.workers_count, copts.worker_queue_len); - } - virtual ~TaskManager() { } + TaskManager(const ConfigOpts& copts); + virtual ~TaskManager(); void sendUpstream(BaseTaskPtr bt); void addPeriodicTask(const Router::Handler3& h3, std::chrono::milliseconds interval_ms); @@ -236,6 +233,8 @@ class TaskManager static void sendUpstreamBlocking(Output& output, Input& input, std::string& err); + void runWorkerActionFromTheThreadPool(BaseTaskPtr bt); + virtual void notifyJobReady() = 0; void cb_event(uint64_t cnt); @@ -247,13 +246,17 @@ class TaskManager ConfigOpts m_copts; private: - void ExecutePreAction(BaseTaskPtr bt); - void ExecutePostAction(BaseTaskPtr bt, GJ* gj = nullptr); //gj equals nullptr if threadPool was skipped for some reasons void Execute(BaseTaskPtr bt); - void processResult(BaseTaskPtr bt); - void respondAndDie(BaseTaskPtr bt, const std::string& s); + void processForward(BaseTaskPtr bt); + void processOk(BaseTaskPtr bt); + void respondAndDie(BaseTaskPtr bt, const std::string& s, bool die = true); void postponeTask(BaseTaskPtr bt); + void checkThreadPoolOverflow(BaseTaskPtr bt); + void runPreAction(BaseTaskPtr bt); + void runWorkerAction(BaseTaskPtr bt); + void runPostAction(BaseTaskPtr bt); + void initThreadPool(int threadCount = std::thread::hardware_concurrency(), int workersQueueSize = 32); bool tryProcessReadyJob(); @@ -281,6 +284,9 @@ class TaskManager std::unique_ptr m_promiseQueue; static thread_local bool io_thread; static TaskManager* g_upstreamManager; + + friend class StateMachine; + std::unique_ptr m_stateMachine; }; }//namespace graft diff --git a/include/thread_pool.h b/include/thread_pool.h index 8416290e..f5c7bc4a 100644 --- a/include/thread_pool.h +++ b/include/thread_pool.h @@ -39,37 +39,10 @@ class GraftJob //main payload virtual void operator () () { - { - decltype(auto) vars_cref = m_bt->getVars(); - decltype(auto) input_ref = m_bt->getInput(); - decltype(auto) output_ref = m_bt->getOutput(); - decltype(auto) h3_ref = m_bt->getHandler3(); - decltype(auto) ctx = m_bt->getCtx(); + // Please read the comment about exceptions and noexcept specifier + // near 'void terminate()' function in main.cpp + m_bt->getManager().runWorkerActionFromTheThreadPool(m_bt); - try - { - // Please read the comment about exceptions and noexcept specifier - // near 'void terminate()' function in main.cpp - Status status = h3_ref.worker_action(vars_cref, input_ref, ctx, output_ref); - Context::LocalFriend::setLastStatus(ctx.local, status); - if(Status::Ok == status && h3_ref.post_action || Status::Forward == status) - { - input_ref.assign(output_ref); - } - } - catch(const std::exception& e) - { - ctx.local.setError(e.what()); - input_ref.reset(); - throw; - } - catch(...) - { - ctx.local.setError("unknown exception"); - input_ref.reset(); - throw; - } - } Watcher* save_m_watcher = m_watcher; //save m_watcher before move itself into resulting queue m_rq->push(std::move(*this)); //similar to "delete this;" save_m_watcher->notifyJobReady(); diff --git a/src/task.cpp b/src/task.cpp index f01a7491..fe1a959d 100644 --- a/src/task.cpp +++ b/src/task.cpp @@ -1,6 +1,7 @@ #include "task.h" #include "connection.h" #include "router.h" +#include "state_machine.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "supernode.task" @@ -10,6 +11,131 @@ namespace graft { thread_local bool TaskManager::io_thread = false; TaskManager* TaskManager::g_upstreamManager{nullptr}; +const StateMachine::Guard StateMachine::has(Router::Handler H3::* act) +{ + return [act](BaseTaskPtr bt)->bool + { + return (bt->getHandler3().*act != nullptr); + }; +} + +const StateMachine::Guard StateMachine::hasnt(Router::Handler H3::* act) +{ + return [act](BaseTaskPtr bt)->bool + { + return (bt->getHandler3().*act == nullptr); + }; +} + +void StateMachine::init_table() +{ + + const Action run_forward = [](BaseTaskPtr bt) + { + bt->getManager().processForward(bt); + }; + + const Action run_response = [](BaseTaskPtr bt) + { + assert(St::Again == bt->getLastStatus()); + bt->getManager().respondAndDie(bt, bt->getOutput().data(), false); + }; + + const Action run_error_response = [](BaseTaskPtr bt) + { + assert(St::Error == bt->getLastStatus() || + St::InternalError == bt->getLastStatus() || + St::Stop == bt->getLastStatus()); + bt->getManager().respondAndDie(bt, bt->getOutput().data()); + }; + + const Action run_drop = [](BaseTaskPtr bt) + { + assert(St::Drop == bt->getLastStatus()); + bt->getManager().respondAndDie(bt, "Job done Drop."); //TODO: Expect HTTP Error Response + }; + + const Action run_ok_response = [](BaseTaskPtr bt) + { + assert(St::Ok == bt->getLastStatus()); + bt->getManager().processOk(bt); + }; + + const Action run_postpone = [](BaseTaskPtr bt) + { + assert(St::Postpone == bt->getLastStatus()); + bt->getManager().postponeTask(bt); + }; + + const Action check_overflow = [](BaseTaskPtr bt) + { + bt->getManager().checkThreadPoolOverflow(bt); + }; + + const Action run_preaction = [](BaseTaskPtr bt) + { + bt->getManager().runPreAction(bt); + }; + + const Action run_workeraction = [](BaseTaskPtr bt) + { + bt->getManager().runWorkerAction(bt); + }; + + const Action run_postaction = [](BaseTaskPtr bt) + { + bt->getManager().runPostAction(bt); + }; + +#define ANY { } + + m_table = std::vector( + { +// Start Status Target Guard Action + {EXECUTE, ANY, PRE_ACTION, nullptr, check_overflow }, + {PRE_ACTION, {St::Busy}, EXIT, nullptr, nullptr }, + {PRE_ACTION, {St::None, St::Ok, St::Forward, St::Postpone}, + CHK_PRE_ACTION, nullptr, run_preaction }, + {CHK_PRE_ACTION, {St::Again}, PRE_ACTION, nullptr, run_response }, + {CHK_PRE_ACTION, {St::Ok}, WORKER_ACTION, has(&H3::pre_action), nullptr }, + {CHK_PRE_ACTION, {St::Forward}, POST_ACTION, has(&H3::pre_action), nullptr }, + {CHK_PRE_ACTION, {St::Error, St::InternalError, St::Stop}, + EXIT, has(&H3::pre_action), run_error_response }, + {CHK_PRE_ACTION, {St::Drop}, EXIT, has(&H3::pre_action), run_drop }, + {CHK_PRE_ACTION, {St::None, St::Ok, St::Forward, St::Postpone}, + WORKER_ACTION, nullptr, nullptr }, + {WORKER_ACTION, ANY, CHK_WORKER_ACTION, nullptr, run_workeraction }, + {CHK_WORKER_ACTION, ANY, EXIT, has(&H3::worker_action), nullptr }, + {CHK_WORKER_ACTION, ANY, POST_ACTION, nullptr, nullptr }, + + {WORKER_ACTION_DONE, {St::Again}, WORKER_ACTION, nullptr, run_response }, + {WORKER_ACTION_DONE, ANY, POST_ACTION, nullptr, nullptr }, + {POST_ACTION, ANY, CHK_POST_ACTION, nullptr, run_postaction }, + {CHK_POST_ACTION, {St::Again}, POST_ACTION, nullptr, run_response }, + {CHK_POST_ACTION, {St::Forward}, EXIT, nullptr, run_forward }, + {CHK_POST_ACTION, {St::Ok}, EXIT, nullptr, run_ok_response }, + {CHK_POST_ACTION, {St::Error, St::InternalError, St::Stop}, + EXIT, nullptr, run_error_response }, + {CHK_POST_ACTION, {St::Drop}, EXIT, nullptr, run_drop }, + {CHK_POST_ACTION, {St::Postpone}, EXIT, nullptr, run_postpone }, + }); + +#undef ANY + +} + +TaskManager::TaskManager(const ConfigOpts& copts) : m_copts(copts) +{ + // TODO: validate options, throw exception if any mandatory options missing + initThreadPool(copts.workers_count, copts.worker_queue_len); + m_stateMachine = std::make_unique(); +} + +TaskManager::~TaskManager() +{ + +} + //pay attension, input is output and vice versa void TaskManager::sendUpstreamBlocking(Output& output, Input& input, std::string& err) { @@ -57,7 +183,7 @@ void TaskManager::onTimer(BaseTaskPtr bt) Execute(bt); } -void TaskManager::respondAndDie(BaseTaskPtr bt, const std::string& s) +void TaskManager::respondAndDie(BaseTaskPtr bt, const std::string& s, bool die) { ClientTask* ct = dynamic_cast(bt.get()); if(ct) @@ -74,7 +200,8 @@ void TaskManager::respondAndDie(BaseTaskPtr bt, const std::string& s) if (it != m_postponedTasks.end()) m_postponedTasks.erase(it); - bt->finalize(); + if(die) + bt->finalize(); } void TaskManager::schedule(PeriodicTask* pt) @@ -82,41 +209,6 @@ void TaskManager::schedule(PeriodicTask* pt) m_timerList.push(pt->getTimeout(), pt->getSelf()); } -void TaskManager::Execute(BaseTaskPtr bt) -{ - assert(m_cntJobDone <= m_cntJobSent); - if(m_cntJobSent - m_cntJobDone == m_threadPoolInputSize) - {//check overflow - bt->getCtx().local.setError("Service Unavailable", Status::Busy); - respondAndDie(bt,"Thread pool overflow"); - return; - } - assert(m_cntJobSent - m_cntJobDone < m_threadPoolInputSize); - - auto& params = bt->getParams(); - - ExecutePreAction(bt); - if(params.h3.pre_action && Status::Ok != bt->getLastStatus() && Status::Forward != bt->getLastStatus()) - { - processResult(bt); - return; - } - if(params.h3.worker_action) - { - ++m_cntJobSent; - m_threadPool->post( - GJPtr( bt, m_resQueue.get(), this ), - true - ); - } - else - { - //special case when worker_action is absent - ExecutePostAction(bt, nullptr); - processResult(bt); - } -} - bool TaskManager::canStop() { return (m_cntBaseTask == m_cntBaseTaskDone) @@ -131,15 +223,37 @@ bool TaskManager::tryProcessReadyJob() if(!res) return res; ++m_cntJobDone; BaseTaskPtr bt = gj->getTask(); - ExecutePostAction(bt, &*gj); - processResult(bt); + + LOG_PRINT_RQS_BT(2,bt,"worker_action completed with result " << bt->getStrStatus()); + m_stateMachine->dispatch(bt, StateMachine::State::WORKER_ACTION_DONE); + return true; } -void TaskManager::ExecutePreAction(BaseTaskPtr bt) +void TaskManager::Execute(BaseTaskPtr bt) +{ + m_stateMachine->dispatch(bt, StateMachine::State::EXECUTE); +} + +void TaskManager::checkThreadPoolOverflow(BaseTaskPtr bt) { auto& params = bt->getParams(); + + assert(m_cntJobDone <= m_cntJobSent); + if(params.h3.worker_action && m_cntJobSent - m_cntJobDone == m_threadPoolInputSize) + {//check overflow + bt->getCtx().local.setError("Service Unavailable", Status::Busy); + respondAndDie(bt,"Thread pool overflow"); + } + assert(m_cntJobSent - m_cntJobDone <= m_threadPoolInputSize); +} + +void TaskManager::runPreAction(BaseTaskPtr bt) +{ + auto& params = bt->getParams(); + if(!params.h3.pre_action) return; + auto& ctx = bt->getCtx(); auto& output = bt->getOutput(); @@ -170,17 +284,25 @@ void TaskManager::ExecutePreAction(BaseTaskPtr bt) LOG_PRINT_RQS_BT(3,bt,"pre_action completed with result " << bt->getStrStatus()); } -void TaskManager::ExecutePostAction(BaseTaskPtr bt, GJ* gj) +void TaskManager::runWorkerAction(BaseTaskPtr bt) { - if(gj) + auto& params = bt->getParams(); + + if(params.h3.worker_action) { - LOG_PRINT_RQS_BT(2,bt,"worker_action completed with result " << bt->getStrStatus()); + ++m_cntJobSent; + m_threadPool->post( + GJPtr( bt, m_resQueue.get(), this ), + true + ); } - //post_action if not empty, will be called in any case, even if worker_action results as some kind of error or exception. - //But, in case pre_action finishes as error both worker_action and post_action will be skipped. - //post_action has a chance to fix result of pre_action. In case of error was before it it should just return that error. +} + +//the function is called from the Thread Pool +//So pay attension this is another thread than others member functions +void TaskManager::runWorkerActionFromTheThreadPool(BaseTaskPtr bt) +{ auto& params = bt->getParams(); - if(!params.h3.post_action) return; auto& ctx = bt->getCtx(); auto& output = bt->getOutput(); @@ -188,14 +310,52 @@ void TaskManager::ExecutePostAction(BaseTaskPtr bt, GJ* gj) { // Please read the comment about exceptions and noexcept specifier // near 'void terminate()' function in main.cpp - Status status = params.h3.post_action(params.vars, params.input, ctx, output); + Status status = params.h3.worker_action(params.vars, params.input, ctx, output); bt->setLastStatus(status); - if(Status::Forward == status) + if(Status::Ok == status && params.h3.post_action || Status::Forward == status) { params.input.assign(output); } } catch(const std::exception& e) + { + ctx.local.setError(e.what()); + params.input.reset(); + throw; + } + catch(...) + { + ctx.local.setError("unknown exception"); + params.input.reset(); + throw; + } + +} + +void TaskManager::runPostAction(BaseTaskPtr bt) +{ + auto& params = bt->getParams(); + + if(!params.h3.post_action) return; + + auto& ctx = bt->getCtx(); + auto& output = bt->getOutput(); + + try + { + Status status = params.h3.post_action(params.vars, params.input, ctx, output); + //in case of pre_action or worker_action return Forward we call post_action in any case + //but we should ignore post_action result status and output + if(Status::Forward != bt->getLastStatus()) + { + bt->setLastStatus(status); + if(Status::Forward == status) + { + params.input.assign(output); + } + } + } + catch(const std::exception& e) { bt->setError(e.what()); params.input.reset(); @@ -255,46 +415,24 @@ void TaskManager::executePostponedTasks() } } -void TaskManager::processResult(BaseTaskPtr bt) +void TaskManager::processForward(BaseTaskPtr bt) { - switch(bt->getLastStatus()) - { - case Status::Forward: - { - LOG_PRINT_RQS_BT(3,bt,"Sending request to CryptoNode"); - sendUpstream(bt); - } break; - case Status::Ok: - { - Context::uuid_t nextUuid = bt->getCtx().getNextTaskId(); - if(!nextUuid.is_nil()) - { - auto it = m_postponedTasks.find(nextUuid); - assert(it != m_postponedTasks.end()); - m_readyToResume.push_back(it->second); - m_postponedTasks.erase(it); - } - respondAndDie(bt, bt->getOutput().data()); - } break; - case Status::InternalError: - case Status::Error: - case Status::Stop: - { - respondAndDie(bt, bt->getOutput().data()); - } break; - case Status::Drop: - { - respondAndDie(bt, "Job done Drop."); //TODO: Expect HTTP Error Response - } break; - case Status::Postpone: - { - postponeTask(bt); - } break; - default: + assert(Status::Forward == bt->getLastStatus()); + LOG_PRINT_RQS_BT(3,bt,"Sending request to CryptoNode"); + sendUpstream(bt); +} + +void TaskManager::processOk(BaseTaskPtr bt) +{ + Context::uuid_t nextUuid = bt->getCtx().getNextTaskId(); + if(!nextUuid.is_nil()) { - assert(false); - } break; + auto it = m_postponedTasks.find(nextUuid); + assert(it != m_postponedTasks.end()); + m_readyToResume.push_back(it->second); + m_postponedTasks.erase(it); } + respondAndDie(bt, bt->getOutput().data()); } void TaskManager::addPeriodicTask(const Router::Handler3& h3, std::chrono::milliseconds interval_ms) @@ -415,22 +553,20 @@ void TaskManager::onUpstreamDone(UpstreamSender& uss) { bt->setError(uss.getError().c_str(), uss.getStatus()); LOG_PRINT_RQS_BT(2,bt, "CryptoNode done with error: " << uss.getError().c_str()); - processResult(bt); + assert(Status::Error == bt->getLastStatus()); //Status::Error only possible value now + respondAndDie(bt, bt->getOutput().data()); + ++m_cntUpstreamSenderDone; return; } //here you can send a job to the thread pool or send response to client //uss will be destroyed on exit, save its result {//now always create a job and put it to the thread pool after CryptoNode - LOG_PRINT_RQS_BT(2,bt, "CryptoNode answered : '" << bt->getInput().body << "'"); - if(!bt->getSelf()) - {//it is possible that a client has closed connection already - ++m_cntUpstreamSenderDone; - return; - } + LOG_PRINT_RQS_BT(2,bt, "CryptoNode answered "); + if(!bt->getSelf()) return; //it is possible that a client has closed connection already Execute(bt); + ++m_cntUpstreamSenderDone; } - ++m_cntUpstreamSenderDone; //uss will be destroyed on exit } From 581c0b7061be2ea41d55bd4f4ab87a7064de2941 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Wed, 22 Aug 2018 17:08:27 +0300 Subject: [PATCH 3/4] Some lost code fixed --- src/task.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/task.cpp b/src/task.cpp index fe1a959d..2aad5156 100644 --- a/src/task.cpp +++ b/src/task.cpp @@ -563,10 +563,14 @@ void TaskManager::onUpstreamDone(UpstreamSender& uss) //uss will be destroyed on exit, save its result {//now always create a job and put it to the thread pool after CryptoNode LOG_PRINT_RQS_BT(2,bt, "CryptoNode answered "); - if(!bt->getSelf()) return; //it is possible that a client has closed connection already + if(!bt->getSelf()) + {//it is possible that a client has closed connection already + ++m_cntUpstreamSenderDone; + return; + } Execute(bt); - ++m_cntUpstreamSenderDone; } + ++m_cntUpstreamSenderDone; //uss will be destroyed on exit } From a5baf9531a5cb44ca95d1540d989909267476c65 Mon Sep 17 00:00:00 2001 From: Alexander Suprunenko Date: Tue, 28 Aug 2018 04:19:58 +0300 Subject: [PATCH 4/4] State machine columns added --- include/state_machine.h | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/include/state_machine.h b/include/state_machine.h index af1a6509..dc198f3f 100644 --- a/include/state_machine.h +++ b/include/state_machine.h @@ -55,9 +55,9 @@ class StateMachine final for(auto& r : m_table) { - if(m_state != std::get<0>(r)) continue; + if(m_state != std::get(r)) continue; - Statuses& ss = std::get<1>(r); + Statuses& ss = std::get(r); if(ss.size()!=0) { bool res = false; @@ -72,12 +72,12 @@ class StateMachine final if(!res) continue; } - Guard& g = std::get<3>(r); + Guard& g = std::get(r); if(g && !g(bt)) continue; - Action& a = std::get<4>(r); + Action& a = std::get(r); if(a) a(bt); - m_state = std::get<2>(r); + m_state = std::get(r); return; } throw std::runtime_error("State machine table is not complete"); @@ -90,7 +90,8 @@ class StateMachine final const Guard hasnt(Router::Handler H3::* act); State m_state; - using row = std::tuple; + enum columns { smStateStart, smStatuses, smStateEnd, smGuard, smAction}; + using row = std::tuple< State, Statuses, State, Guard, Action >; std::vector m_table; };