diff --git a/fairmq/Device.cxx b/fairmq/Device.cxx index 19c8cb172..d9961ed91 100644 --- a/fairmq/Device.cxx +++ b/fairmq/Device.cxx @@ -435,7 +435,7 @@ void Device::InitTaskWrapper() void Device::RunWrapper() { - LOG(info) << "DEVICE: Running..."; + LOG(info) << "fair::mq::Device running..."; // start the rate logger thread future rateLogger = async(launch::async, &Device::LogSocketRates, this); @@ -445,46 +445,43 @@ void Device::RunWrapper() t.second->Resume(); } - try { - PreRun(); - - // process either data callbacks or ConditionalRun/Run - if (fDataCallbacks) { - // if only one input channel, do lightweight handling without additional polling. - if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) { - HandleSingleChannelInput(); - } else {// otherwise do full handling with polling - HandleMultipleChannelInput(); - } - } else { - tools::RateLimiter rateLimiter(fRate); + // change to Error state in case of an exception, to release LogSocketRates + tools::CallOnDestruction cod([&](){ + ChangeState(Transition::ErrorFound); + }); - while (!NewStatePending() && ConditionalRun()) { - if (fRate > 0.001) { - rateLimiter.maybe_sleep(); - } - } + PreRun(); - Run(); + // process either data callbacks or ConditionalRun/Run + if (fDataCallbacks) { + // if only one input channel, do lightweight handling without additional polling. + if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) { + HandleSingleChannelInput(); + } else {// otherwise do full handling with polling + HandleMultipleChannelInput(); } + } else { + tools::RateLimiter rateLimiter(fRate); - // if Run() exited and the state is still RUNNING, transition to READY. - if (!NewStatePending()) { - UnblockTransports(); - ChangeState(Transition::Stop); + while (!NewStatePending() && ConditionalRun()) { + if (fRate > 0.001) { + rateLimiter.maybe_sleep(); + } } - PostRun(); - } catch (const out_of_range& oor) { - LOG(error) << "out of range: " << oor.what(); - LOG(error) << "incorrect/incomplete channel configuration?"; - ChangeState(Transition::ErrorFound); - throw; - } catch (...) { - ChangeState(Transition::ErrorFound); - throw; + Run(); } + // if Run() exited and the state is still RUNNING, transition to READY. + if (!NewStatePending()) { + UnblockTransports(); + ChangeState(Transition::Stop); + } + + PostRun(); + + cod.disable(); + rateLogger.get(); } diff --git a/fairmq/Device.h b/fairmq/Device.h index bcfa3275b..77a563935 100644 --- a/fairmq/Device.h +++ b/fairmq/Device.h @@ -320,10 +320,7 @@ class Device try { return fChannels.at(channelName).at(index); } catch (const std::out_of_range& oor) { - LOG(error) - << "requested channel has not been configured? check channel names/configuration."; - LOG(error) << "channel: " << channelName << ", index: " << index; - LOG(error) << "out of range: " << oor.what(); + LOG(error) << "GetChannel(): '" << channelName << "[" << index << "]' does not exist."; throw; } diff --git a/fairmq/StateMachine.cxx b/fairmq/StateMachine.cxx index e1de38547..83baec907 100644 --- a/fairmq/StateMachine.cxx +++ b/fairmq/StateMachine.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include +#include #include @@ -204,6 +205,7 @@ struct Machine_ : public state_machine_def } if (fState == State::Error) { + LOG(trace) << "Device transitioned to error state"; throw StateMachine::ErrorStateException("Device transitioned to error state"); } } @@ -366,20 +368,18 @@ void StateMachine::ProcessWork() { auto fsm = static_pointer_cast(fFsm); - try { - fsm->CallStateChangeCallbacks(State::Idle); - fsm->ProcessWork(); - } catch(ErrorStateException& ese) { - LOG(trace) << "ErrorStateException caught in ProcessWork(), rethrowing"; - throw; - } catch(...) { - LOG(debug) << "Exception caught in ProcessWork(), going to Error state and rethrowing"; + fair::mq::tools::CallOnDestruction cod([&](){ + LOG(debug) << "Exception caught in ProcessWork(), going to Error state"; { lock_guard lock(fsm->fStateMtx); fsm->fState = State::Error; fsm->CallStateChangeCallbacks(State::Error); } ChangeState(Transition::ErrorFound); - throw; - } + }); + + fsm->CallStateChangeCallbacks(State::Idle); + fsm->ProcessWork(); + + cod.disable(); } diff --git a/fairmq/Tools.h b/fairmq/Tools.h index 3ec4aff97..781918697 100644 --- a/fairmq/Tools.h +++ b/fairmq/Tools.h @@ -11,6 +11,7 @@ // IWYU pragma: begin_exports #include +#include #include #include #include diff --git a/fairmq/tools/Exceptions.h b/fairmq/tools/Exceptions.h new file mode 100644 index 000000000..ce3ad6a7b --- /dev/null +++ b/fairmq/tools/Exceptions.h @@ -0,0 +1,54 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_TOOLS_EXCEPTIONS_H +#define FAIR_MQ_TOOLS_EXCEPTIONS_H + +#include + +namespace fair::mq::tools +{ + +/** + * Executes the given callback in the destructor. + * Can be used to execute something in case of an exception when catch is undesirable, e.g.: + * + * { + * // callback will be executed only if f throws an exception + * CallOnDestruction cod([](){ cout << "exception was thrown"; }, true); + * f(); + * cod.disable(); + * } + */ + +class CallOnDestruction +{ + public: + CallOnDestruction(std::function c, bool enable = true) + : callback(c) + , enabled(enable) + {} + + ~CallOnDestruction() + { + if (enabled) { + callback(); + } + } + + void enable() { enabled = true; } + void disable() { enabled = false; } + + private: + std::function callback; + bool enabled; +}; + +} // namespace fair::mq::tools + +#endif /* FAIR_MQ_TOOLS_EXCEPTIONS_H */