Skip to content

Commit

Permalink
Fix race in plugin manager/services
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx authored and dennisklein committed Aug 8, 2018
1 parent a53ef79 commit ee8afd7
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 247 deletions.
1 change: 1 addition & 0 deletions fairmq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ target_link_libraries(FairMQ
INTERFACE # only consumers link against interface dependencies

PUBLIC # libFairMQ AND consumers of libFairMQ link aginst public dependencies
pthread
dl
Boost::boost
Boost::program_options
Expand Down
18 changes: 9 additions & 9 deletions fairmq/DeviceRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
using namespace fair::mq;

DeviceRunner::DeviceRunner(int argc, char* const argv[])
: fRawCmdLineArgs(tools::ToStrVector(argc, argv, false))
, fPluginManager(PluginManager::MakeFromCommandLineOptions(fRawCmdLineArgs))
: fDevice(nullptr)
, fRawCmdLineArgs(tools::ToStrVector(argc, argv, false))
, fConfig()
, fDevice(nullptr)
, fPluginManager(fRawCmdLineArgs)
, fEvents()
{}

Expand All @@ -27,16 +27,16 @@ auto DeviceRunner::Run() -> int
////////////////////////

// Load builtin plugins last
fPluginManager->LoadPlugin("s:control");
fPluginManager.LoadPlugin("s:control");

////// CALL HOOK ///////
fEvents.Emit<hooks::SetCustomCmdLineOptions>(*this);
////////////////////////

fPluginManager->ForEachPluginProgOptions([&](boost::program_options::options_description options){
fPluginManager.ForEachPluginProgOptions([&](boost::program_options::options_description options){
fConfig.AddToCmdLineOptions(options);
});
fConfig.AddToCmdLineOptions(fPluginManager->ProgramOptions());
fConfig.AddToCmdLineOptions(fPluginManager.ProgramOptions());

////// CALL HOOK ///////
fEvents.Emit<hooks::ModifyRawCmdLineArgs>(*this);
Expand Down Expand Up @@ -83,16 +83,16 @@ auto DeviceRunner::Run() -> int
fDevice->SetConfig(fConfig);

// Initialize plugin services
fPluginManager->EmplacePluginServices(&fConfig, fDevice);
fPluginManager.EmplacePluginServices(&fConfig, *fDevice);

// Instantiate and run plugins
fPluginManager->InstantiatePlugins();
fPluginManager.InstantiatePlugins();

// Run the device
fDevice->RunStateMachine();

// Wait for control plugin to release device control
fPluginManager->WaitForPluginsToReleaseDeviceControl();
fPluginManager.WaitForPluginsToReleaseDeviceControl();

return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions fairmq/DeviceRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ class DeviceRunner
template<typename H>
auto RemoveHook() -> void { fEvents.Unsubscribe<H>("runner"); }

std::unique_ptr<FairMQDevice> fDevice;
std::vector<std::string> fRawCmdLineArgs;
std::shared_ptr<PluginManager> fPluginManager;
FairMQProgOptions fConfig;
std::shared_ptr<FairMQDevice> fDevice;
PluginManager fPluginManager;

private:
EventManager fEvents;
Expand Down
172 changes: 46 additions & 126 deletions fairmq/FairMQStateMachine.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -119,42 +119,44 @@ static array<string, 12> eventNames =

static map<string, int> stateNumbers =
{
{ "OK", FairMQStateMachine::State::OK },
{ "Error", FairMQStateMachine::State::Error },
{ "IDLE", FairMQStateMachine::State::IDLE },
{ "OK", FairMQStateMachine::State::OK },
{ "Error", FairMQStateMachine::State::Error },
{ "IDLE", FairMQStateMachine::State::IDLE },
{ "INITIALIZING_DEVICE", FairMQStateMachine::State::INITIALIZING_DEVICE },
{ "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY },
{ "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK },
{ "READY", FairMQStateMachine::State::READY },
{ "RUNNING", FairMQStateMachine::State::RUNNING },
{ "PAUSED", FairMQStateMachine::State::PAUSED },
{ "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK },
{ "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE },
{ "EXITING", FairMQStateMachine::State::EXITING }
{ "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY },
{ "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK },
{ "READY", FairMQStateMachine::State::READY },
{ "RUNNING", FairMQStateMachine::State::RUNNING },
{ "PAUSED", FairMQStateMachine::State::PAUSED },
{ "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK },
{ "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE },
{ "EXITING", FairMQStateMachine::State::EXITING }
};

static map<string, int> eventNumbers =
{
{ "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE },
{ "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE },
{ "internal_DEVICE_READY", FairMQStateMachine::Event::internal_DEVICE_READY },
{ "INIT_TASK", FairMQStateMachine::Event::INIT_TASK },
{ "internal_READY", FairMQStateMachine::Event::internal_READY },
{ "RUN", FairMQStateMachine::Event::RUN },
{ "PAUSE", FairMQStateMachine::Event::PAUSE },
{ "STOP", FairMQStateMachine::Event::STOP },
{ "RESET_TASK", FairMQStateMachine::Event::RESET_TASK },
{ "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE },
{ "internal_IDLE", FairMQStateMachine::Event::internal_IDLE },
{ "END", FairMQStateMachine::Event::END },
{ "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND }
{ "INIT_TASK", FairMQStateMachine::Event::INIT_TASK },
{ "internal_READY", FairMQStateMachine::Event::internal_READY },
{ "RUN", FairMQStateMachine::Event::RUN },
{ "PAUSE", FairMQStateMachine::Event::PAUSE },
{ "STOP", FairMQStateMachine::Event::STOP },
{ "RESET_TASK", FairMQStateMachine::Event::RESET_TASK },
{ "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE },
{ "internal_IDLE", FairMQStateMachine::Event::internal_IDLE },
{ "END", FairMQStateMachine::Event::END },
{ "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND }
};

// defining the boost MSM state machine
struct Machine_ : public state_machine_def<Machine_>
{
public:
Machine_()
: fWork()
: fUnblockHandler()
, fStateHandlers()
, fWork()
, fWorkAvailableCondition()
, fWorkDoneCondition()
, fWorkMutex()
Expand Down Expand Up @@ -198,48 +200,10 @@ struct Machine_ : public state_machine_def<Machine_>
}
};

struct InitDeviceFct
struct DefaultFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();

unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fInitWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};

struct InitTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();

unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fInitTaskWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};

struct RunFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();

Expand All @@ -250,7 +214,7 @@ struct Machine_ : public state_machine_def<Machine_>
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fRunWrapperHandler;
fsm.fWork = fsm.fStateHandlers.at(e.Type());
fsm.fWorkAvailableCondition.notify_one();
}
};
Expand Down Expand Up @@ -303,48 +267,10 @@ struct Machine_ : public state_machine_def<Machine_>
}
};

struct ResetTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();

unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fResetTaskWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};

struct ResetDeviceFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
fsm.fState = ts.Type();

unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fWork = fsm.fResetWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};

struct ExitingFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts)
void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts)
{
LOG(state) << "Entering " << ts.Name() << " state";
fsm.fState = ts.Type();
Expand All @@ -357,7 +283,7 @@ struct Machine_ : public state_machine_def<Machine_>
fsm.fWorkAvailableCondition.notify_one();
}

fsm.fExitHandler();
fsm.fStateHandlers.at(e.Type())();
}
};

Expand All @@ -375,18 +301,18 @@ struct Machine_ : public state_machine_def<Machine_>
// Transition table for Machine_
struct transition_table : boost::mpl::vector<
// Start Event Next Action Guard
Row<IDLE_FSM_STATE, INIT_DEVICE_FSM_EVENT, INITIALIZING_DEVICE_FSM_STATE, InitDeviceFct, none>,
Row<IDLE_FSM_STATE, INIT_DEVICE_FSM_EVENT, INITIALIZING_DEVICE_FSM_STATE, DefaultFct, none>,
Row<IDLE_FSM_STATE, END_FSM_EVENT, EXITING_FSM_STATE, ExitingFct, none>,
Row<INITIALIZING_DEVICE_FSM_STATE, internal_DEVICE_READY_FSM_EVENT, DEVICE_READY_FSM_STATE, AutomaticFct, none>,
Row<DEVICE_READY_FSM_STATE, INIT_TASK_FSM_EVENT, INITIALIZING_TASK_FSM_STATE, InitTaskFct, none>,
Row<DEVICE_READY_FSM_STATE, RESET_DEVICE_FSM_EVENT, RESETTING_DEVICE_FSM_STATE, ResetDeviceFct, none>,
Row<DEVICE_READY_FSM_STATE, INIT_TASK_FSM_EVENT, INITIALIZING_TASK_FSM_STATE, DefaultFct, none>,
Row<DEVICE_READY_FSM_STATE, RESET_DEVICE_FSM_EVENT, RESETTING_DEVICE_FSM_STATE, DefaultFct, none>,
Row<INITIALIZING_TASK_FSM_STATE, internal_READY_FSM_EVENT, READY_FSM_STATE, AutomaticFct, none>,
Row<READY_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, RunFct, none>,
Row<READY_FSM_STATE, RESET_TASK_FSM_EVENT, RESETTING_TASK_FSM_STATE, ResetTaskFct, none>,
Row<RUNNING_FSM_STATE, PAUSE_FSM_EVENT, PAUSED_FSM_STATE, PauseFct, none>,
Row<READY_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, DefaultFct, none>,
Row<READY_FSM_STATE, RESET_TASK_FSM_EVENT, RESETTING_TASK_FSM_STATE, DefaultFct, none>,
Row<RUNNING_FSM_STATE, PAUSE_FSM_EVENT, PAUSED_FSM_STATE, DefaultFct, none>,
Row<RUNNING_FSM_STATE, STOP_FSM_EVENT, READY_FSM_STATE, StopFct, none>,
Row<RUNNING_FSM_STATE, internal_READY_FSM_EVENT, READY_FSM_STATE, InternalStopFct, none>,
Row<PAUSED_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, RunFct, none>,
Row<PAUSED_FSM_STATE, RUN_FSM_EVENT, RUNNING_FSM_STATE, DefaultFct, none>,
Row<RESETTING_TASK_FSM_STATE, internal_DEVICE_READY_FSM_EVENT, DEVICE_READY_FSM_STATE, AutomaticFct, none>,
Row<RESETTING_DEVICE_FSM_STATE, internal_IDLE_FSM_EVENT, IDLE_FSM_STATE, AutomaticFct, none>,
Row<OK_FSM_STATE, ERROR_FOUND_FSM_EVENT, ERROR_FSM_STATE, ErrorFoundFct, none>>
Expand Down Expand Up @@ -425,14 +351,8 @@ struct Machine_ : public state_machine_def<Machine_>
}
}

function<void(void)> fInitWrapperHandler;
function<void(void)> fInitTaskWrapperHandler;
function<void(void)> fRunWrapperHandler;
function<void(void)> fPauseWrapperHandler;
function<void(void)> fResetWrapperHandler;
function<void(void)> fResetTaskWrapperHandler;
function<void(void)> fExitHandler;
function<void(void)> fUnblockHandler;
unordered_map<FairMQStateMachine::Event, function<void(void)>> fStateHandlers;

// function to execute user states in a worker thread
function<void(void)> fWork;
Expand All @@ -457,7 +377,7 @@ struct Machine_ : public state_machine_def<Machine_>
// Wait for work to be done.
while (!fWorkAvailable && !fWorkerTerminated)
{
fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(300));
fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(100));
}

if (fWorkerTerminated)
Expand Down Expand Up @@ -493,13 +413,13 @@ FairMQStateMachine::FairMQStateMachine()
: fChangeStateMutex()
, fFsm(new FairMQFSM)
{
static_pointer_cast<FairMQFSM>(fFsm)->fInitWrapperHandler = bind(&FairMQStateMachine::InitWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fInitTaskWrapperHandler = bind(&FairMQStateMachine::InitTaskWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fRunWrapperHandler = bind(&FairMQStateMachine::RunWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fPauseWrapperHandler = bind(&FairMQStateMachine::PauseWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fResetWrapperHandler = bind(&FairMQStateMachine::ResetWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fResetTaskWrapperHandler = bind(&FairMQStateMachine::ResetTaskWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fExitHandler = bind(&FairMQStateMachine::Exit, this);
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(INIT_DEVICE, bind(&FairMQStateMachine::InitWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(INIT_TASK, bind(&FairMQStateMachine::InitTaskWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RUN, bind(&FairMQStateMachine::RunWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(PAUSE, bind(&FairMQStateMachine::PauseWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RESET_TASK, bind(&FairMQStateMachine::ResetTaskWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(RESET_DEVICE, bind(&FairMQStateMachine::ResetWrapper, this));
static_pointer_cast<FairMQFSM>(fFsm)->fStateHandlers.emplace(END, bind(&FairMQStateMachine::Exit, this));
static_pointer_cast<FairMQFSM>(fFsm)->fUnblockHandler = bind(&FairMQStateMachine::Unblock, this);

static_pointer_cast<FairMQFSM>(fFsm)->start();
Expand Down
Loading

0 comments on commit ee8afd7

Please sign in to comment.