Skip to content

Commit

Permalink
Refactor initialization
Browse files Browse the repository at this point in the history
 - add device constructor that accepts FairMQProgOptions object.
 - Initialize config values in INIT state (to allow their update).
 - Simplify FairMQProgOptions handling in FairMQDevice.
 - Simplify SetTransport/SetConfig - refactor duplicated code.
 - Add FairMQDevice methods to add channels.
  • Loading branch information
rbx authored and dennisklein committed Aug 27, 2018
1 parent 1c78b8e commit 1bb558a
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 163 deletions.
197 changes: 66 additions & 131 deletions fairmq/FairMQDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

#include <FairMQDevice.h>

#include <fairmq/Tools.h>
#include <fairmq/Transports.h>

#include <boost/algorithm/string.hpp> // join/split

#include <boost/uuid/uuid.hpp>
Expand All @@ -31,53 +28,39 @@

using namespace std;


FairMQDevice::FairMQDevice()
: fTransportFactory(nullptr)
, fTransports()
, fChannels()
, fConfig(nullptr)
, fId()
, fNumIoThreads(1)
, fInitialValidationFinished(false)
, fInitialValidationCondition()
, fInitialValidationMutex()
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fNetworkInterface()
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
, fInitializationTimeoutInS(120)
, fDataCallbacks(false)
, fMsgInputs()
, fMultipartInputs()
, fMultitransportInputs()
, fChannelRegistry()
, fInputChannelKeys()
, fMultitransportMutex()
, fMultitransportProceed(false)
, fExternalConfig(false)
, fVersion({0, 0, 0})
, fRate(0.)
, fLastTime(0)
, fRawCmdLineArgs()
: FairMQDevice(nullptr, {0, 0, 0})
{
}

FairMQDevice::FairMQDevice(FairMQProgOptions& config)
: FairMQDevice(&config, {0, 0, 0})
{
}

FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
: FairMQDevice(nullptr, version)
{
}

FairMQDevice::FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version)
: FairMQDevice(&config, version)
{
}

FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version)
: fTransportFactory(nullptr)
, fTransports()
, fChannels()
, fConfig(nullptr)
, fInternalConfig(config ? nullptr : fair::mq::tools::make_unique<FairMQProgOptions>())
, fConfig(config ? config : fInternalConfig.get())
, fId()
, fNumIoThreads(1)
, fInitialValidationFinished(false)
, fInitialValidationCondition()
, fInitialValidationMutex()
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fNetworkInterface()
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
, fInitializationTimeoutInS(120)
, fDataCallbacks(false)
, fMsgInputs()
, fMultipartInputs()
Expand All @@ -86,7 +69,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
, fInputChannelKeys()
, fMultitransportMutex()
, fMultitransportProceed(false)
, fExternalConfig(false)
, fVersion(version)
, fRate(0.)
, fLastTime(0)
Expand All @@ -96,16 +78,43 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)

void FairMQDevice::InitWrapper()
{
if (!fTransportFactory)
fId = fConfig->GetValue<string>("id");
fRate = fConfig->GetValue<float>("rate");
fPortRangeMin = fConfig->GetValue<int>("port-range-min");
fPortRangeMax = fConfig->GetValue<int>("port-range-max");

try
{
LOG(error) << "Transport not initialized. Did you call SetTransport()?";
exit(EXIT_FAILURE);
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
}
catch (const exception& e)
{
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}

for (auto& c : fConfig->GetFairMQMap())
{
if (fChannels.find(c.first) == fChannels.end())
{
LOG(debug) << "Inserting new device channel from config: " << c.first;
fChannels.insert(c);
}
else
{
LOG(debug) << "Updating existing device channel from config: " << c.first;
fChannels[c.first] = c.second;
}
}

LOG(debug) << "Requesting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device";
fTransportFactory = AddTransport(fDefaultTransportType);

// Containers to store the uninitialized channels.
vector<FairMQChannel*> uninitializedBindingChannels;
vector<FairMQChannel*> uninitializedConnectingChannels;

string networkInterface = fConfig->GetValue<string>("network-interface");

// Fill the uninitialized channel containers
for (auto& mi : fChannels)
{
Expand All @@ -126,11 +135,11 @@ void FairMQDevice::InitWrapper()
if (vi->fAddress == "unspecified" || vi->fAddress == "")
{
// if the configured network interface is default, get its name from the default route
if (fNetworkInterface == "default")
if (networkInterface == "default")
{
fNetworkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
}
vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(fNetworkInterface) + ":1";
vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1";
}
// fill the uninitialized list
uninitializedBindingChannels.push_back(&(*vi));
Expand Down Expand Up @@ -175,10 +184,12 @@ void FairMQDevice::InitWrapper()
fInitialValidationCondition.notify_one();
}

int initializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");

// go over the list of channels until all are initialized (and removed from the uninitialized list)
int numAttempts = 1;
auto sleepTimeInMS = 50;
auto maxAttempts = fInitializationTimeoutInS * 1000 / sleepTimeInMS;
auto maxAttempts = initializationTimeoutInS * 1000 / sleepTimeInMS;
// first attempt
AttachChannels(uninitializedConnectingChannels);
// if not all channels could be connected, update their address values from config and retry
Expand All @@ -201,9 +212,9 @@ void FairMQDevice::InitWrapper()

if (numAttempts++ > maxAttempts)
{
LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts";
ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
}

AttachChannels(uninitializedConnectingChannels);
Expand Down Expand Up @@ -252,19 +263,15 @@ void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)

bool FairMQDevice::AttachChannel(FairMQChannel& ch)
{
if (!ch.fTransportFactory)
if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType())
{
if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType())
{
LOG(debug) << ch.fName << ": using default transport";
ch.InitTransport(fTransportFactory);
}
else
{
LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType);
ch.InitTransport(AddTransport(ch.fTransportType));
}
ch.fTransportType = ch.fTransportFactory->GetType();
LOG(debug) << ch.fName << ": using default transport";
ch.InitTransport(fTransportFactory);
}
else
{
LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType);
ch.InitTransport(AddTransport(ch.fTransportType));
}

vector<string> endpoints;
Expand Down Expand Up @@ -798,78 +805,10 @@ shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const fair::mq::Tr
}
}

void FairMQDevice::CreateOwnConfig()
{
// TODO: make fConfig a shared_ptr when no old user code has FairMQProgOptions ptr*
fConfig = new FairMQProgOptions();

string id{boost::uuids::to_string(boost::uuids::random_generator()())};
LOG(warn) << "No FairMQProgOptions provided, creating one internally and setting device ID to " << id;

// dummy argc+argv
char arg0[] = "undefined"; // executable name
char arg1[] = "--id";
char* arg2 = const_cast<char*>(id.c_str()); // device ID
const char* argv[] = { &arg0[0], &arg1[0], arg2, nullptr };
int argc = static_cast<int>((sizeof(argv) / sizeof(argv[0])) - 1);

fConfig->ParseAll(argc, &argv[0]);

fId = fConfig->GetValue<string>("id");
fNetworkInterface = fConfig->GetValue<string>("network-interface");
fNumIoThreads = fConfig->GetValue<int>("io-threads");
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
fRate = fConfig->GetValue<float>("rate");
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) {
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
}

void FairMQDevice::SetTransport(const string& transport)
{
// This method is the first to be called, if FairMQProgOptions are not used (either SetTransport() or SetConfig() make sense, not both).
// Make sure here that at least internal config is available.
if (!fExternalConfig && !fConfig)
{
CreateOwnConfig();
}

if (fTransports.empty())
{
LOG(debug) << "Requesting '" << transport << "' as default transport for the device";
fTransportFactory = AddTransport(fair::mq::TransportTypes.at(transport));
}
else
{
LOG(error) << "Transports container is not empty when setting transport. Setting default twice?";
ChangeState(ERROR_FOUND);
}
}

void FairMQDevice::SetConfig(FairMQProgOptions& config)
{
fExternalConfig = true;
fInternalConfig.reset();
fConfig = &config;
for (auto& c : fConfig->GetFairMQMap())
{
if (!fChannels.insert(c).second)
{
LOG(warn) << "did not insert channel '" << c.first << "', it is already in the device.";
}
}
fId = fConfig->GetValue<string>("id");
fNetworkInterface = fConfig->GetValue<string>("network-interface");
fNumIoThreads = fConfig->GetValue<int>("io-threads");
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
fRate = fConfig->GetValue<float>("rate");
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) {
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
SetTransport(fConfig->GetValue<string>("transport"));
}

void FairMQDevice::LogSocketRates()
Expand Down Expand Up @@ -1020,7 +959,7 @@ void FairMQDevice::Reset()
for (auto& vi : mi.second)
{
// vi.fReset = true;
vi.fSocket.reset();
vi.fSocket.reset(); // destroy FairMQSocket
}
}
}
Expand All @@ -1032,10 +971,6 @@ const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const i

void FairMQDevice::Exit()
{
if (!fExternalConfig && fConfig)
{
delete fConfig;
}
}

FairMQDevice::~FairMQDevice()
Expand Down
Loading

0 comments on commit 1bb558a

Please sign in to comment.