Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The State Machine implemented #92

Merged
merged 4 commits into from Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions include/graft_constants.h
Expand Up @@ -7,6 +7,7 @@ namespace graft
EXP(None) \
EXP(Ok) \
EXP(Forward) \
EXP(Again) \
EXP(Error) \
EXP(Drop) \
EXP(Busy) \
Expand Down
98 changes: 98 additions & 0 deletions include/state_machine.h
@@ -0,0 +1,98 @@
#pragma once

#include "task.h"

namespace graft
{

class StateMachine final
{
public:
enum State
{
EXECUTE,
PRE_ACTION,
CHK_PRE_ACTION,
WORKER_ACTION,
CHK_WORKER_ACTION,
WORKER_ACTION_DONE,
POST_ACTION,
CHK_POST_ACTION,
AGAIN,
EXIT,
};

using St = graft::Status;
using Statuses = std::initializer_list<graft::Status>;
using Guard = std::function<bool (BaseTaskPtr bt)>;
using Action = std::function<void (BaseTaskPtr bt)>;

StateMachine(State initial_state = EXECUTE)
{
state(initial_state);
init_table();
}

void dispatch(BaseTaskPtr bt, State initial_state)
{
// state(State(initial_state));
state(initial_state);
while(state() != EXIT)
{
process(bt);
}
}

private:
void init_table();
State state() const { return m_state; }
State state(State state) { return m_state = state; }
St status(BaseTaskPtr bt) const { return bt->getLastStatus(); }

void process(BaseTaskPtr bt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need some logging here??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. On one hand the messages would accompany others logging messages. On the other, they would be useful if we forgot to log such others, and they should be of special categorie, which we have no yet, to not overwhelm the log.

{
St cur_stat = status(bt);

for(auto& r : m_table)
{
if(m_state != std::get<0>(r)) continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we avoid these magic numbers? I mean all these get.
Enum wouldn't be more clear?


Statuses& ss = std::get<1>(r);
if(ss.size()!=0)
{
bool res = false;
for(auto s : ss)
{
if(s == cur_stat)
{
res = true;
break;
}
}
if(!res) continue;
}

Guard& g = std::get<3>(r);
if(g && !g(bt)) continue;

Action& a = std::get<4>(r);
if(a) a(bt);
m_state = std::get<2>(r);
return;
}
throw std::runtime_error("State machine table is not complete");
}
private:

using H3 = Router::Handler3;

const Guard has(Router::Handler H3::* act);
const Guard hasnt(Router::Handler H3::* act);

State m_state;
using row = std::tuple<State, Statuses, State, Guard, Action>;
std::vector<row> m_table;
};

}//namespace graft

28 changes: 17 additions & 11 deletions include/task.h
Expand Up @@ -202,16 +202,13 @@ class ClientTask : public BaseTask
ConnectionManager* m_connectionManager;
};

class StateMachine;

class TaskManager
{
public:
TaskManager(const ConfigOpts& copts)
: m_copts(copts)
{
// TODO: validate options, throw exception if any mandatory options missing
initThreadPool(copts.workers_count, copts.worker_queue_len);
}
virtual ~TaskManager() { }
TaskManager(const ConfigOpts& copts);
virtual ~TaskManager();

void sendUpstream(BaseTaskPtr bt);
void addPeriodicTask(const Router::Handler3& h3, std::chrono::milliseconds interval_ms);
Expand All @@ -236,6 +233,8 @@ class TaskManager

static void sendUpstreamBlocking(Output& output, Input& input, std::string& err);

void runWorkerActionFromTheThreadPool(BaseTaskPtr bt);

virtual void notifyJobReady() = 0;

void cb_event(uint64_t cnt);
Expand All @@ -247,13 +246,17 @@ class TaskManager

ConfigOpts m_copts;
private:
void ExecutePreAction(BaseTaskPtr bt);
void ExecutePostAction(BaseTaskPtr bt, GJ* gj = nullptr); //gj equals nullptr if threadPool was skipped for some reasons
void Execute(BaseTaskPtr bt);
void processResult(BaseTaskPtr bt);
void respondAndDie(BaseTaskPtr bt, const std::string& s);
void processForward(BaseTaskPtr bt);
void processOk(BaseTaskPtr bt);
void respondAndDie(BaseTaskPtr bt, const std::string& s, bool die = true);
void postponeTask(BaseTaskPtr bt);

void checkThreadPoolOverflow(BaseTaskPtr bt);
void runPreAction(BaseTaskPtr bt);
void runWorkerAction(BaseTaskPtr bt);
void runPostAction(BaseTaskPtr bt);

void initThreadPool(int threadCount = std::thread::hardware_concurrency(), int workersQueueSize = 32);
bool tryProcessReadyJob();

Expand Down Expand Up @@ -281,6 +284,9 @@ class TaskManager
std::unique_ptr<PromiseQueue> m_promiseQueue;
static thread_local bool io_thread;
static TaskManager* g_upstreamManager;

friend class StateMachine;
std::unique_ptr<StateMachine> m_stateMachine;
};

}//namespace graft
Expand Down
33 changes: 3 additions & 30 deletions include/thread_pool.h
Expand Up @@ -39,37 +39,10 @@ class GraftJob
//main payload
virtual void operator () ()
{
{
decltype(auto) vars_cref = m_bt->getVars();
decltype(auto) input_ref = m_bt->getInput();
decltype(auto) output_ref = m_bt->getOutput();
decltype(auto) h3_ref = m_bt->getHandler3();
decltype(auto) ctx = m_bt->getCtx();
// Please read the comment about exceptions and noexcept specifier
// near 'void terminate()' function in main.cpp
m_bt->getManager().runWorkerActionFromTheThreadPool(m_bt);

try
{
// Please read the comment about exceptions and noexcept specifier
// near 'void terminate()' function in main.cpp
Status status = h3_ref.worker_action(vars_cref, input_ref, ctx, output_ref);
Context::LocalFriend::setLastStatus(ctx.local, status);
if(Status::Ok == status && h3_ref.post_action || Status::Forward == status)
{
input_ref.assign(output_ref);
}
}
catch(const std::exception& e)
{
ctx.local.setError(e.what());
input_ref.reset();
throw;
}
catch(...)
{
ctx.local.setError("unknown exception");
input_ref.reset();
throw;
}
}
Watcher* save_m_watcher = m_watcher; //save m_watcher before move itself into resulting queue
m_rq->push(std::move(*this)); //similar to "delete this;"
save_m_watcher->notifyJobReady();
Expand Down