Skip to content

Commit

Permalink
Allow plugins to create channels
Browse files Browse the repository at this point in the history
This also fixes a bug, that prevented the usage of custom types with the
plugin config API.
  • Loading branch information
dennisklein committed Oct 31, 2018
1 parent 3b5b2b5 commit 5e4876c
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 12 deletions.
6 changes: 5 additions & 1 deletion fairmq/FairMQDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,11 @@ class FairMQDevice : public FairMQStateMachine
void SetRawCmdLineArgs(const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
std::vector<std::string> GetRawCmdLineArgs() const { return fRawCmdLineArgs; }

void RunStateMachine() { ProcessWork(); };
void RunStateMachine()
{
CallStateChangeCallbacks(FairMQStateMachine::IDLE);
ProcessWork();
};

/// Wait for the supplied amount of time or for interruption.
/// If interrupted, returns false, otherwise true.
Expand Down
5 changes: 3 additions & 2 deletions fairmq/FairMQStateMachine.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,13 @@ struct Machine_ : public state_machine_def<Machine_>
using initial_state = boost::mpl::vector<IDLE_FSM_STATE, OK_FSM_STATE>;

template<typename Event, typename FSM>
void on_entry(Event const&, FSM& fsm)
void on_entry(Event const&, FSM& /*fsm*/)
{
LOG(state) << "Starting FairMQ state machine";
fState = FairMQStateMachine::IDLE;
LOG(state) << "Entering IDLE state";
fsm.CallStateChangeCallbacks(FairMQStateMachine::IDLE);
// fsm.CallStateChangeCallbacks(FairMQStateMachine::IDLE);
// we call this for now in FairMQDevice::RunStateMachine()
}

template<typename Event, typename FSM>
Expand Down
7 changes: 5 additions & 2 deletions fairmq/PluginServices.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,16 @@ class PluginServices
auto SetProperty(const std::string& key, T val) -> void
{
auto currentState = GetCurrentDeviceState();
if (currentState == DeviceState::InitializingDevice)
if ( (currentState == DeviceState::InitializingDevice)
|| ((currentState == DeviceState::Idle) && (key == "channel-config")))
{
fConfig.SetValue(key, val);
}
else
{
throw InvalidStateError{tools::ToString("PluginServices::SetProperty is not supported in device state ", currentState, ". Supported state is ", DeviceState::InitializingDevice, ".")};
throw InvalidStateError{
tools::ToString("PluginServices::SetProperty is not supported in device state ", currentState, ". ",
"Supported state is ", DeviceState::InitializingDevice, ".")};
}
}
struct InvalidStateError : std::runtime_error { using std::runtime_error::runtime_error; };
Expand Down
19 changes: 18 additions & 1 deletion fairmq/options/FairMQProgOptions.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
else if (fVarMap.count("channel-config"))
{
LOG(debug) << "channel-config: Parsing channel configuration";
UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), idForParser));
ParseChannelsFromCmdLine();
}
else
{
Expand All @@ -185,6 +185,23 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
return 0;
}

void FairMQProgOptions::ParseChannelsFromCmdLine()
{
string idForParser;

// check if config-key for config parser is provided
if (fVarMap.count("config-key"))
{
idForParser = fVarMap["config-key"].as<string>();
}
else if (fVarMap.count("id"))
{
idForParser = fVarMap["id"].as<string>();
}

UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), idForParser));
}

void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered)
{
fVarMap.clear();
Expand Down
19 changes: 13 additions & 6 deletions fairmq/options/FairMQProgOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ class FairMQProgOptions
// update variable map
UpdateVarMap<typename std::decay<T>::type>(key, val);

// update FairMQChannel map if the key is a channel key
if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)
if (key == "channel-config")
{
if (fChannelKeyMap.count(key))
{
UpdateChannelValue(fChannelKeyMap.at(key).channel, fChannelKeyMap.at(key).index, fChannelKeyMap.at(key).member, val);
}
ParseChannelsFromCmdLine();
}
else if (fChannelKeyMap.count(key))
{
UpdateChannelValue(fChannelKeyMap.at(key).channel, fChannelKeyMap.at(key).index, fChannelKeyMap.at(key).member, val);
}

lock.unlock();
Expand Down Expand Up @@ -210,6 +210,12 @@ class FairMQProgOptions
}

int UpdateChannelMap(const FairMQChannelMap& map);
template<typename T>
int UpdateChannelValue(const std::string&, int, const std::string&, T)
{
LOG(error) << "update of FairMQChannel map failed, because value type not supported";
return 1;
}
int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, const std::string& val);
int UpdateChannelValue(const std::string& channelName, int index, const std::string& member, int val);

Expand All @@ -223,6 +229,7 @@ class FairMQProgOptions
vm[key].value() = boost::any(val);
}

void ParseChannelsFromCmdLine();
};

#endif /* FAIRMQPROGOPTIONS_H */

0 comments on commit 5e4876c

Please sign in to comment.