diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 5817fdf98..0d57a6af6 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -223,6 +223,7 @@ target_link_libraries(FairMQ INTERFACE # only consumers link against interface dependencies PUBLIC # libFairMQ AND consumers of libFairMQ link aginst public dependencies + pthread dl Boost::boost Boost::program_options diff --git a/fairmq/DeviceRunner.cxx b/fairmq/DeviceRunner.cxx index 65553b011..3b06adf57 100644 --- a/fairmq/DeviceRunner.cxx +++ b/fairmq/DeviceRunner.cxx @@ -13,10 +13,10 @@ using namespace fair::mq; DeviceRunner::DeviceRunner(int argc, char* const argv[]) - : fRawCmdLineArgs(tools::ToStrVector(argc, argv, false)) - , fPluginManager(PluginManager::MakeFromCommandLineOptions(fRawCmdLineArgs)) + : fDevice(nullptr) + , fRawCmdLineArgs(tools::ToStrVector(argc, argv, false)) , fConfig() - , fDevice(nullptr) + , fPluginManager(fRawCmdLineArgs) , fEvents() {} @@ -27,16 +27,16 @@ auto DeviceRunner::Run() -> int //////////////////////// // Load builtin plugins last - fPluginManager->LoadPlugin("s:control"); + fPluginManager.LoadPlugin("s:control"); ////// CALL HOOK /////// fEvents.Emit(*this); //////////////////////// - fPluginManager->ForEachPluginProgOptions([&](boost::program_options::options_description options){ + fPluginManager.ForEachPluginProgOptions([&](boost::program_options::options_description options){ fConfig.AddToCmdLineOptions(options); }); - fConfig.AddToCmdLineOptions(fPluginManager->ProgramOptions()); + fConfig.AddToCmdLineOptions(fPluginManager.ProgramOptions()); ////// CALL HOOK /////// fEvents.Emit(*this); @@ -83,16 +83,16 @@ auto DeviceRunner::Run() -> int fDevice->SetConfig(fConfig); // Initialize plugin services - fPluginManager->EmplacePluginServices(&fConfig, fDevice); + fPluginManager.EmplacePluginServices(&fConfig, *fDevice); // Instantiate and run plugins - fPluginManager->InstantiatePlugins(); + fPluginManager.InstantiatePlugins(); // Run the device fDevice->RunStateMachine(); // Wait for control plugin to release device control - fPluginManager->WaitForPluginsToReleaseDeviceControl(); + fPluginManager.WaitForPluginsToReleaseDeviceControl(); return 0; } diff --git a/fairmq/DeviceRunner.h b/fairmq/DeviceRunner.h index 77794c0fc..07bcee505 100644 --- a/fairmq/DeviceRunner.h +++ b/fairmq/DeviceRunner.h @@ -61,10 +61,10 @@ class DeviceRunner template auto RemoveHook() -> void { fEvents.Unsubscribe("runner"); } + std::unique_ptr fDevice; std::vector fRawCmdLineArgs; - std::shared_ptr fPluginManager; FairMQProgOptions fConfig; - std::shared_ptr fDevice; + PluginManager fPluginManager; private: EventManager fEvents; diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 3fa96fd4a..72d41064f 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -119,34 +119,34 @@ static array eventNames = static map stateNumbers = { - { "OK", FairMQStateMachine::State::OK }, - { "Error", FairMQStateMachine::State::Error }, - { "IDLE", FairMQStateMachine::State::IDLE }, + { "OK", FairMQStateMachine::State::OK }, + { "Error", FairMQStateMachine::State::Error }, + { "IDLE", FairMQStateMachine::State::IDLE }, { "INITIALIZING_DEVICE", FairMQStateMachine::State::INITIALIZING_DEVICE }, - { "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY }, - { "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK }, - { "READY", FairMQStateMachine::State::READY }, - { "RUNNING", FairMQStateMachine::State::RUNNING }, - { "PAUSED", FairMQStateMachine::State::PAUSED }, - { "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK }, - { "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE }, - { "EXITING", FairMQStateMachine::State::EXITING } + { "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY }, + { "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK }, + { "READY", FairMQStateMachine::State::READY }, + { "RUNNING", FairMQStateMachine::State::RUNNING }, + { "PAUSED", FairMQStateMachine::State::PAUSED }, + { "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK }, + { "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE }, + { "EXITING", FairMQStateMachine::State::EXITING } }; static map eventNumbers = { - { "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE }, + { "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE }, { "internal_DEVICE_READY", FairMQStateMachine::Event::internal_DEVICE_READY }, - { "INIT_TASK", FairMQStateMachine::Event::INIT_TASK }, - { "internal_READY", FairMQStateMachine::Event::internal_READY }, - { "RUN", FairMQStateMachine::Event::RUN }, - { "PAUSE", FairMQStateMachine::Event::PAUSE }, - { "STOP", FairMQStateMachine::Event::STOP }, - { "RESET_TASK", FairMQStateMachine::Event::RESET_TASK }, - { "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE }, - { "internal_IDLE", FairMQStateMachine::Event::internal_IDLE }, - { "END", FairMQStateMachine::Event::END }, - { "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND } + { "INIT_TASK", FairMQStateMachine::Event::INIT_TASK }, + { "internal_READY", FairMQStateMachine::Event::internal_READY }, + { "RUN", FairMQStateMachine::Event::RUN }, + { "PAUSE", FairMQStateMachine::Event::PAUSE }, + { "STOP", FairMQStateMachine::Event::STOP }, + { "RESET_TASK", FairMQStateMachine::Event::RESET_TASK }, + { "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE }, + { "internal_IDLE", FairMQStateMachine::Event::internal_IDLE }, + { "END", FairMQStateMachine::Event::END }, + { "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND } }; // defining the boost MSM state machine @@ -154,7 +154,9 @@ struct Machine_ : public state_machine_def { public: Machine_() - : fWork() + : fUnblockHandler() + , fStateHandlers() + , fWork() , fWorkAvailableCondition() , fWorkDoneCondition() , fWorkMutex() @@ -198,48 +200,10 @@ struct Machine_ : public state_machine_def } }; - struct InitDeviceFct + struct DefaultFct { template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - - unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering " << ts.Name() << " state"; - fsm.fWork = fsm.fInitWrapperHandler; - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct InitTaskFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - - unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering " << ts.Name() << " state"; - fsm.fWork = fsm.fInitTaskWrapperHandler; - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct RunFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) + void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts) { fsm.fState = ts.Type(); @@ -250,7 +214,7 @@ struct Machine_ : public state_machine_def } fsm.fWorkAvailable = true; LOG(state) << "Entering " << ts.Name() << " state"; - fsm.fWork = fsm.fRunWrapperHandler; + fsm.fWork = fsm.fStateHandlers.at(e.Type()); fsm.fWorkAvailableCondition.notify_one(); } }; @@ -303,48 +267,10 @@ struct Machine_ : public state_machine_def } }; - struct ResetTaskFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - - unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering " << ts.Name() << " state"; - fsm.fWork = fsm.fResetTaskWrapperHandler; - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct ResetDeviceFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) - { - fsm.fState = ts.Type(); - - unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering " << ts.Name() << " state"; - fsm.fWork = fsm.fResetWrapperHandler; - fsm.fWorkAvailableCondition.notify_one(); - } - }; - struct ExitingFct { template - void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) + void operator()(EVT const& e, FSM& fsm, SourceState& /* ss */, TargetState& ts) { LOG(state) << "Entering " << ts.Name() << " state"; fsm.fState = ts.Type(); @@ -357,7 +283,7 @@ struct Machine_ : public state_machine_def fsm.fWorkAvailableCondition.notify_one(); } - fsm.fExitHandler(); + fsm.fStateHandlers.at(e.Type())(); } }; @@ -375,18 +301,18 @@ struct Machine_ : public state_machine_def // Transition table for Machine_ struct transition_table : boost::mpl::vector< // Start Event Next Action Guard - Row, + Row, Row, Row, - Row, - Row, + Row, + Row, Row, - Row, - Row, - Row, + Row, + Row, + Row, Row, Row, - Row, + Row, Row, Row, Row> @@ -425,14 +351,8 @@ struct Machine_ : public state_machine_def } } - function fInitWrapperHandler; - function fInitTaskWrapperHandler; - function fRunWrapperHandler; - function fPauseWrapperHandler; - function fResetWrapperHandler; - function fResetTaskWrapperHandler; - function fExitHandler; function fUnblockHandler; + unordered_map> fStateHandlers; // function to execute user states in a worker thread function fWork; @@ -457,7 +377,7 @@ struct Machine_ : public state_machine_def // Wait for work to be done. while (!fWorkAvailable && !fWorkerTerminated) { - fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(300)); + fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(100)); } if (fWorkerTerminated) @@ -493,13 +413,13 @@ FairMQStateMachine::FairMQStateMachine() : fChangeStateMutex() , fFsm(new FairMQFSM) { - static_pointer_cast(fFsm)->fInitWrapperHandler = bind(&FairMQStateMachine::InitWrapper, this); - static_pointer_cast(fFsm)->fInitTaskWrapperHandler = bind(&FairMQStateMachine::InitTaskWrapper, this); - static_pointer_cast(fFsm)->fRunWrapperHandler = bind(&FairMQStateMachine::RunWrapper, this); - static_pointer_cast(fFsm)->fPauseWrapperHandler = bind(&FairMQStateMachine::PauseWrapper, this); - static_pointer_cast(fFsm)->fResetWrapperHandler = bind(&FairMQStateMachine::ResetWrapper, this); - static_pointer_cast(fFsm)->fResetTaskWrapperHandler = bind(&FairMQStateMachine::ResetTaskWrapper, this); - static_pointer_cast(fFsm)->fExitHandler = bind(&FairMQStateMachine::Exit, this); + static_pointer_cast(fFsm)->fStateHandlers.emplace(INIT_DEVICE, bind(&FairMQStateMachine::InitWrapper, this)); + static_pointer_cast(fFsm)->fStateHandlers.emplace(INIT_TASK, bind(&FairMQStateMachine::InitTaskWrapper, this)); + static_pointer_cast(fFsm)->fStateHandlers.emplace(RUN, bind(&FairMQStateMachine::RunWrapper, this)); + static_pointer_cast(fFsm)->fStateHandlers.emplace(PAUSE, bind(&FairMQStateMachine::PauseWrapper, this)); + static_pointer_cast(fFsm)->fStateHandlers.emplace(RESET_TASK, bind(&FairMQStateMachine::ResetTaskWrapper, this)); + static_pointer_cast(fFsm)->fStateHandlers.emplace(RESET_DEVICE, bind(&FairMQStateMachine::ResetWrapper, this)); + static_pointer_cast(fFsm)->fStateHandlers.emplace(END, bind(&FairMQStateMachine::Exit, this)); static_pointer_cast(fFsm)->fUnblockHandler = bind(&FairMQStateMachine::Unblock, this); static_pointer_cast(fFsm)->start(); diff --git a/fairmq/PluginManager.cxx b/fairmq/PluginManager.cxx index e519bc1ee..542aa70d9 100644 --- a/fairmq/PluginManager.cxx +++ b/fairmq/PluginManager.cxx @@ -31,11 +31,53 @@ const std::string fair::mq::PluginManager::fgkLibPrefix = "FairMQPlugin_"; fair::mq::PluginManager::PluginManager() : fSearchPaths{{"."}} , fPluginFactories() + , fPluginServices() , fPlugins() , fPluginOrder() , fPluginProgOptions() +{ +} + +fair::mq::PluginManager::PluginManager(const vector args) + : fSearchPaths{{"."}} + , fPluginFactories() , fPluginServices() + , fPlugins() + , fPluginOrder() + , fPluginProgOptions() { + // Parse command line options + auto options = ProgramOptions(); + auto vm = po::variables_map{}; + try + { + auto parsed = po::command_line_parser(args).options(options).allow_unregistered().run(); + po::store(parsed, vm); + po::notify(vm); + } catch (const po::error& e) + { + throw ProgramOptionsParseError{ToString("Error occured while parsing the 'Plugin Manager' program options: ", e.what())}; + } + + // Process plugin search paths + auto append = vector{}; + auto prepend = vector{}; + auto searchPaths = vector{}; + if (vm.count("plugin-search-path")) + { + for (const auto& path : vm["plugin-search-path"].as>()) + { + if (path.substr(0, 1) == "<") { prepend.emplace_back(path.substr(1)); } + else if (path.substr(0, 1) == ">") { append.emplace_back(path.substr(1)); } + else { searchPaths.emplace_back(path); } + } + } + + // Set supplied options + SetSearchPaths(searchPaths); + for(const auto& path : prepend) { PrependSearchPath(path); } + for(const auto& path : append) { AppendSearchPath(path); } + if (vm.count("plugin")) { LoadPlugins(vm["plugin"].as>()); } } auto fair::mq::PluginManager::ValidateSearchPath(const fs::path& path) -> void @@ -81,46 +123,6 @@ auto fair::mq::PluginManager::ProgramOptions() -> po::options_description return plugin_options; } -auto fair::mq::PluginManager::MakeFromCommandLineOptions(const vector args) -> shared_ptr -{ - // Parse command line options - auto options = ProgramOptions(); - auto vm = po::variables_map{}; - try - { - auto parsed = po::command_line_parser(args).options(options).allow_unregistered().run(); - po::store(parsed, vm); - po::notify(vm); - } catch (const po::error& e) - { - throw ProgramOptionsParseError{ToString("Error occured while parsing the 'Plugin Manager' program options: ", e.what())}; - } - - // Process plugin search paths - auto append = vector{}; - auto prepend = vector{}; - auto searchPaths = vector{}; - if (vm.count("plugin-search-path")) - { - for (const auto& path : vm["plugin-search-path"].as>()) - { - if (path.substr(0, 1) == "<") { prepend.emplace_back(path.substr(1)); } - else if (path.substr(0, 1) == ">") { append.emplace_back(path.substr(1)); } - else { searchPaths.emplace_back(path); } - } - } - - // Create PluginManager with supplied options - auto mgr = make_shared(); - mgr->SetSearchPaths(searchPaths); - for(const auto& path : prepend) { mgr->PrependSearchPath(path); } - for(const auto& path : append) { mgr->AppendSearchPath(path); } - if (vm.count("plugin")) { mgr->LoadPlugins(vm["plugin"].as>()); } - - // Return the plugin manager and command line options, that have not been recognized. - return mgr; -} - auto fair::mq::PluginManager::LoadPlugin(const string& pluginName) -> void { if (pluginName.substr(0,2) == "p:") diff --git a/fairmq/PluginManager.h b/fairmq/PluginManager.h index 36a65ad19..4fcda8fdf 100644 --- a/fairmq/PluginManager.h +++ b/fairmq/PluginManager.h @@ -47,9 +47,10 @@ namespace mq class PluginManager { public: - using PluginFactory = std::shared_ptr(PluginServices&); + using PluginFactory = std::unique_ptr(PluginServices&); PluginManager(); + PluginManager(const std::vector args); ~PluginManager() { @@ -69,7 +70,7 @@ class PluginManager struct PluginInstantiationError : std::runtime_error { using std::runtime_error::runtime_error; }; static auto ProgramOptions() -> boost::program_options::options_description; - static auto MakeFromCommandLineOptions(const std::vector) -> std::shared_ptr; + static auto MakeFromCommandLineOptions(const std::vector) -> PluginManager; struct ProgramOptionsParseError : std::runtime_error { using std::runtime_error::runtime_error; }; static auto LibPrefix() -> const std::string& { return fgkLibPrefix; } @@ -116,10 +117,10 @@ class PluginManager static const std::string fgkLibPrefix; std::vector fSearchPaths; std::map> fPluginFactories; - std::map> fPlugins; + std::unique_ptr fPluginServices; + std::map> fPlugins; std::vector fPluginOrder; std::map fPluginProgOptions; - std::unique_ptr fPluginServices; }; /* class PluginManager */ } /* namespace mq */ diff --git a/fairmq/PluginServices.cxx b/fairmq/PluginServices.cxx index cd6737630..69f329509 100644 --- a/fairmq/PluginServices.cxx +++ b/fairmq/PluginServices.cxx @@ -98,7 +98,7 @@ auto PluginServices::ChangeDeviceState(const std::string& controller, const Devi if (fDeviceController == controller) { - fDevice->ChangeState(fkDeviceStateTransitionMap.at(next)); + fDevice.ChangeState(fkDeviceStateTransitionMap.at(next)); } else { diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index 3565b057d..3cb77c457 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -38,9 +38,9 @@ class PluginServices { public: PluginServices() = delete; - PluginServices(FairMQProgOptions* config, std::shared_ptr device) - : fConfig{config} - , fDevice{device} + PluginServices(FairMQProgOptions* config, FairMQDevice& device) + : fConfig(config) + , fDevice(device) , fDeviceController() , fDeviceControllerMutex() , fReleaseDeviceControlCondition() @@ -114,7 +114,7 @@ class PluginServices friend auto operator<<(std::ostream& os, const DeviceStateTransition& transition) -> std::ostream& { return os << ToStr(transition); } /// @return current device state - auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast(fDevice->GetCurrentState())); } + auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast(fDevice.GetCurrentState())); } /// @brief Become device controller /// @param controller id @@ -160,14 +160,14 @@ class PluginServices /// the state is running in. auto SubscribeToDeviceStateChange(const std::string& subscriber, std::function callback) -> void { - fDevice->SubscribeToStateChange(subscriber, [&,callback](FairMQDevice::State newState){ + fDevice.SubscribeToStateChange(subscriber, [&,callback](FairMQDevice::State newState){ callback(fkDeviceStateMap.at(newState)); }); } /// @brief Unsubscribe from device state changes /// @param subscriber id - auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice->UnsubscribeFromStateChange(subscriber); } + auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice.UnsubscribeFromStateChange(subscriber); } // Config API struct PropertyNotFoundError : std::runtime_error { using std::runtime_error::runtime_error; }; @@ -272,7 +272,7 @@ class PluginServices private: FairMQProgOptions* fConfig; // TODO make it a shared pointer, once old AliceO2 code is cleaned up - std::shared_ptr fDevice; + FairMQDevice& fDevice; boost::optional fDeviceController; mutable std::mutex fDeviceControllerMutex; std::condition_variable fReleaseDeviceControlCondition; diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index b61edebb7..e368a880c 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -17,6 +17,7 @@ #include #include +#include #include "FairMQDevice.h" @@ -38,7 +39,7 @@ class FairMQBenchmarkSampler : public FairMQDevice protected: bool fSameMessage; int fMsgSize; - int fMsgCounter; + std::atomic fMsgCounter; int fMsgRate; uint64_t fNumIterations; uint64_t fMaxIterations; diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 084b13d4a..8421cd776 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -17,12 +17,11 @@ using namespace std; namespace { - // ugly global state, but std::signal gives us no other choice - std::function gSignalHandlerClosure; + volatile sig_atomic_t gSignalStatus = 0; extern "C" auto signal_handler(int signal) -> void { - gSignalHandlerClosure(signal); + gSignalStatus = signal; } } @@ -37,10 +36,13 @@ Control::Control(const string name, const Plugin::Version version, const string : Plugin(name, version, maintainer, homepage, pluginServices) , fControllerThread() , fSignalHandlerThread() + , fShutdownThread() , fEvents() , fEventsMutex() + , fShutdownMutex() , fNewEvent() - , fDeviceTerminationRequested{false} + , fDeviceTerminationRequested(false) + , fHasShutdown(false) { try { @@ -73,7 +75,7 @@ Control::Control(const string name, const Plugin::Version version, const string LOG(debug) << "catch-signals: " << GetProperty("catch-signals"); if (GetProperty("catch-signals") > 0) { - gSignalHandlerClosure = bind(&Control::SignalHandler, this, placeholders::_1); + fSignalHandlerThread = thread(&Control::SignalHandler, this); signal(SIGINT, signal_handler); signal(SIGTERM, signal_handler); } @@ -263,69 +265,92 @@ auto Control::StaticMode() -> void } } -auto Control::SignalHandler(int signal) -> void +auto Control::SignalHandler() -> void { - if (!fDeviceTerminationRequested) + while (true) { - fDeviceTerminationRequested = true; - - StealDeviceControl(); - - LOG(info) << "Received device shutdown request (signal " << signal << ")."; - LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately."; - - UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already - SubscribeToDeviceStateChange([&](DeviceState newState) + if (gSignalStatus != 0 && !fHasShutdown) { + LOG(info) << "Received device shutdown request (signal " << gSignalStatus << ")."; + LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately."; + + if (!fDeviceTerminationRequested) { - lock_guard lock{fEventsMutex}; - fEvents.push(newState); + fDeviceTerminationRequested = true; + gSignalStatus = 0; + fShutdownThread = thread(&Control::HandleShutdownSignal, this); } - fNewEvent.notify_one(); - }); + else + { + LOG(warn) << "Received 2nd device shutdown request (signal " << gSignalStatus << ")."; + LOG(warn) << "Aborting immediately!"; + abort(); + } + } + else if (fHasShutdown) + { + break; + } - fSignalHandlerThread = thread(&Control::RunShutdownSequence, this); + this_thread::sleep_for(chrono::milliseconds(100)); } - else +} + +auto Control::HandleShutdownSignal() -> void +{ + StealDeviceControl(); + + UnsubscribeFromDeviceStateChange(); // In case, static or interactive mode have subscribed already + SubscribeToDeviceStateChange([&](DeviceState newState) { - LOG(warn) << "Received 2nd device shutdown request (signal " << signal << ")."; - LOG(warn) << "Aborting immediately !"; - abort(); - } + { + lock_guard lock{fEventsMutex}; + fEvents.push(newState); + } + fNewEvent.notify_one(); + }); + + RunShutdownSequence(); } auto Control::RunShutdownSequence() -> void { - auto nextState = GetCurrentDeviceState(); - EmptyEventQueue(); - while (nextState != DeviceState::Exiting) + lock_guard lock(fShutdownMutex); + if (!fHasShutdown) { - switch (nextState) + auto nextState = GetCurrentDeviceState(); + EmptyEventQueue(); + while (nextState != DeviceState::Exiting) { - case DeviceState::Idle: - ChangeDeviceState(DeviceStateTransition::End); - break; - case DeviceState::DeviceReady: - ChangeDeviceState(DeviceStateTransition::ResetDevice); - break; - case DeviceState::Ready: - ChangeDeviceState(DeviceStateTransition::ResetTask); - break; - case DeviceState::Running: - ChangeDeviceState(DeviceStateTransition::Stop); - break; - case DeviceState::Paused: - ChangeDeviceState(DeviceStateTransition::Resume); - break; - default: - break; + switch (nextState) + { + case DeviceState::Idle: + ChangeDeviceState(DeviceStateTransition::End); + break; + case DeviceState::DeviceReady: + ChangeDeviceState(DeviceStateTransition::ResetDevice); + break; + case DeviceState::Ready: + ChangeDeviceState(DeviceStateTransition::ResetTask); + break; + case DeviceState::Running: + ChangeDeviceState(DeviceStateTransition::Stop); + break; + case DeviceState::Paused: + ChangeDeviceState(DeviceStateTransition::Resume); + break; + default: + // ignore other states + break; + } + + nextState = WaitForNextState(); } - nextState = WaitForNextState(); + fHasShutdown = true; + UnsubscribeFromDeviceStateChange(); + ReleaseDeviceControl(); } - - UnsubscribeFromDeviceStateChange(); - ReleaseDeviceControl(); } auto Control::RunStartupSequence() -> void @@ -357,6 +382,7 @@ Control::~Control() { if (fControllerThread.joinable()) fControllerThread.join(); if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join(); + if (fShutdownThread.joinable()) fShutdownThread.join(); } } /* namespace plugins */ diff --git a/fairmq/plugins/Control.h b/fairmq/plugins/Control.h index 7b445c09b..15ede137c 100644 --- a/fairmq/plugins/Control.h +++ b/fairmq/plugins/Control.h @@ -37,17 +37,21 @@ class Control : public Plugin auto PrintInteractiveHelp() -> void; auto StaticMode() -> void; auto WaitForNextState() -> DeviceState; - auto SignalHandler(int signal) -> void; + auto SignalHandler() -> void; + auto HandleShutdownSignal() -> void; auto RunShutdownSequence() -> void; auto RunStartupSequence() -> void; auto EmptyEventQueue() -> void; std::thread fControllerThread; std::thread fSignalHandlerThread; + std::thread fShutdownThread; std::queue fEvents; std::mutex fEventsMutex; + std::mutex fShutdownMutex; std::condition_variable fNewEvent; std::atomic fDeviceTerminationRequested; + std::atomic fHasShutdown; }; /* class Control */ auto ControlPluginProgramOptions() -> Plugin::ProgOptions; diff --git a/fairmq/runFairMQDevice.h b/fairmq/runFairMQDevice.h index f222a0eac..152edc1bc 100644 --- a/fairmq/runFairMQDevice.h +++ b/fairmq/runFairMQDevice.h @@ -46,7 +46,7 @@ int main(int argc, char* argv[]) // }); runner.AddHook([](DeviceRunner& r){ - r.fDevice = std::shared_ptr{getDevice(r.fConfig)}; + r.fDevice = std::unique_ptr{getDevice(r.fConfig)}; }); return runner.Run(); diff --git a/test/plugin_services/Fixture.h b/test/plugin_services/Fixture.h index cd0688edf..2c7e39932 100644 --- a/test/plugin_services/Fixture.h +++ b/test/plugin_services/Fixture.h @@ -44,7 +44,6 @@ struct PluginServices : ::testing::Test { { fRunStateMachineThread = std::thread(&FairMQDevice::RunStateMachine, mDevice.get()); mDevice->SetTransport("zeromq"); - } ~PluginServices() diff --git a/test/plugins/_plugin_manager.cxx b/test/plugins/_plugin_manager.cxx index 893c3f83f..18afece2a 100644 --- a/test/plugins/_plugin_manager.cxx +++ b/test/plugins/_plugin_manager.cxx @@ -104,14 +104,14 @@ TEST(PluginManager, LoadPluginStatic) TEST(PluginManager, Factory) { const auto args = vector{"-l", "debug", "--help", "-S", ">/lib", "{path1, path2, path3, path4}; - ASSERT_TRUE(static_cast(mgr)); - ASSERT_TRUE(mgr->SearchPaths() == expected); + // ASSERT_TRUE(static_cast(mgr)); + ASSERT_TRUE(mgr.SearchPaths() == expected); } TEST(PluginManager, SearchPathValidation)