Skip to content

Commit

Permalink
Resume/Interrupt transports consistently
Browse files Browse the repository at this point in the history
 - Resume transports before state callbacks & handlers
 - Interrupt transports on new transitions
  • Loading branch information
rbx committed Feb 22, 2023
1 parent efb659f commit 9093ed8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
16 changes: 5 additions & 11 deletions fairmq/Device.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,12 @@ Device::Device(ProgOptions* config, tools::Version version)
{
SubscribeToNewTransition("device", [&](Transition transition) {
LOG(trace) << "device notified on new transition: " << transition;
InterruptTransports();
});

switch (transition) {
case Transition::Stop:
InterruptTransports();
break;
default:
break;
}
fStateMachine.PrepareState([&](State state) {
LOG(trace) << "Resuming transports for " << state << " state";
ResumeTransports();
});

fStateMachine.HandleStates([&](State state) {
Expand Down Expand Up @@ -462,9 +460,6 @@ void Device::RunWrapper()
if (rateLogging && rateLogger->joinable()) { rateLogger->join(); }
});

// notify transports to resume transfers
ResumeTransports();

// change to Error state in case of an exception, to release LogSocketRates
tools::CallOnDestruction cod([&](){
ChangeState(Transition::ErrorFound);
Expand Down Expand Up @@ -494,7 +489,6 @@ void Device::RunWrapper()

// if Run() exited and the state is still RUNNING, transition to READY.
if (!NewStatePending()) {
InterruptTransports();
ChangeState(Transition::Stop);
}

Expand Down
22 changes: 22 additions & 0 deletions fairmq/StateMachine.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ struct Machine_ : public state_machine_def<Machine_>
}
}

void CallStatePrep(const State state) const
{
if (!fStatePrepSignal.empty()) {
fStatePrepSignal(state);
}
}

void CallNewTransitionCallbacks(const Transition transition) const
{
if (!fNewTransitionSignal.empty()) {
Expand All @@ -175,6 +182,7 @@ struct Machine_ : public state_machine_def<Machine_>

boost::signals2::signal<void(const State)> fStateChangeSignal;
boost::signals2::signal<void(const State)> fStateHandleSignal;
boost::signals2::signal<void(const State)> fStatePrepSignal;
boost::signals2::signal<void(const Transition)> fNewTransitionSignal;
unordered_map<string, boost::signals2::connection> fStateChangeSignalsMap;
unordered_map<string, boost::signals2::connection> fNewTransitionSignalsMap;
Expand All @@ -198,6 +206,7 @@ struct Machine_ : public state_machine_def<Machine_>
}
}

CallStatePrep(fState);
CallStateChangeCallbacks(fState);
CallStateHandler(fState);
}
Expand Down Expand Up @@ -313,6 +322,16 @@ void StateMachine::UnsubscribeFromStateChange(const string& key)
}
}

void StateMachine::PrepareState(std::function<void(const State)> callback)
{
auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
if (fsm->fStatePrepSignal.empty()) {
fsm->fStatePrepSignal.connect(callback);
} else {
LOG(error) << "state preparation handler is already set";
}
}

void StateMachine::HandleStates(function<void(const State)> callback)
{
auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
Expand All @@ -326,6 +345,9 @@ void StateMachine::HandleStates(function<void(const State)> callback)
void StateMachine::StopHandlingStates()
{
auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
if (!fsm->fStatePrepSignal.empty()) {
fsm->fStatePrepSignal.disconnect_all_slots();
}
if (!fsm->fStateHandleSignal.empty()) {
fsm->fStateHandleSignal.disconnect_all_slots();
}
Expand Down
1 change: 1 addition & 0 deletions fairmq/StateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class StateMachine
void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback);
void UnsubscribeFromStateChange(const std::string& key);

void PrepareState(std::function<void(const State)> callback);
void HandleStates(std::function<void(const State)> callback);
void StopHandlingStates();

Expand Down

0 comments on commit 9093ed8

Please sign in to comment.