Skip to content

Commit

Permalink
feat: Deprecate Device::fChannels in preparation for #427
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisklein committed Mar 1, 2023
1 parent f699208 commit 5ef17fd
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 25 deletions.
44 changes: 26 additions & 18 deletions fairmq/Device.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand Down Expand Up @@ -77,6 +77,9 @@ Device::Device(ProgOptions& config, tools::Version version)
: Device(&config, version)
{}

/// TODO: Remove this once Device::fChannels is no longer public
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
Device::Device(ProgOptions* config, tools::Version version)
: fTransportFactory(nullptr)
, fInternalConfig(config ? nullptr : make_unique<ProgOptions>())
Expand Down Expand Up @@ -138,6 +141,7 @@ Device::Device(ProgOptions* config, tools::Version version)

fStateMachine.Start();
}
#pragma GCC diagnostic pop

void Device::TransitionTo(State s)
{
Expand Down Expand Up @@ -229,7 +233,7 @@ void Device::InitWrapper()
unordered_map<string, int> infos = fConfig->GetChannelInfo();
for (const auto& info : infos) {
for (int i = 0; i < info.second; ++i) {
fChannels[info.first].emplace_back(info.first, i, fConfig->GetPropertiesStartingWith(tools::ToString("chans.", info.first, ".", i, ".")));
GetChannels()[info.first].emplace_back(info.first, i, fConfig->GetPropertiesStartingWith(tools::ToString("chans.", info.first, ".", i, ".")));
}
}

Expand All @@ -239,7 +243,7 @@ void Device::InitWrapper()
string networkInterface = fConfig->GetProperty<string>("network-interface", DefaultNetworkInterface);

// Fill the uninitialized channel containers
for (auto& channel : fChannels) {
for (auto& channel : GetChannels()) {
int subChannelIndex = 0;
for (auto& subChannel : channel.second) {
// set channel transport
Expand Down Expand Up @@ -330,7 +334,7 @@ void Device::ConnectWrapper()
AttachChannels(fUninitializedConnectingChannels);
}

if (fChannels.empty()) {
if (GetChannels().empty()) {
LOG(warn) << "No channels created after finishing initialization";
}

Expand Down Expand Up @@ -449,7 +453,7 @@ void Device::RunWrapper()

unique_ptr<thread> rateLogger;
// Check if rate logging thread is needed
const bool rateLogging = any_of(fChannels.cbegin(), fChannels.cend(), [](auto ch) {
const bool rateLogging = any_of(GetChannels().cbegin(), GetChannels().cend(), [](auto ch) {
return any_of(ch.second.cbegin(), ch.second.cend(), [](auto sub) { return sub.fRateLogging > 0; });
});

Expand All @@ -470,7 +474,7 @@ void Device::RunWrapper()
// process either data callbacks or ConditionalRun/Run
if (fDataCallbacks) {
// if only one input channel, do lightweight handling without additional polling.
if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) {
if (fInputChannelKeys.size() == 1 && GetChannels().at(fInputChannelKeys.at(0)).size() == 1) {
HandleSingleChannelInput();
} else {// otherwise do full handling with polling
HandleMultipleChannelInput();
Expand Down Expand Up @@ -517,7 +521,7 @@ void Device::HandleMultipleChannelInput()
// check if more than one transport is used
fMultitransportInputs.clear();
for (const auto& k : fInputChannelKeys) {
mq::Transport t = fChannels.at(k).at(0).fTransportType;
mq::Transport t = GetChannel(k, 0).fTransportType;
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
fMultitransportInputs.insert(pair<mq::Transport, vector<string>>(t, vector<string>()));
fMultitransportInputs.at(t).push_back(k);
Expand All @@ -527,13 +531,13 @@ void Device::HandleMultipleChannelInput()
}

for (const auto& mi : fMsgInputs) {
for (auto& i : fChannels.at(mi.first)) {
for (auto& i : GetChannels().at(mi.first)) {
i.fMultipart = false;
}
}

for (const auto& mi : fMultipartInputs) {
for (auto& i : fChannels.at(mi.first)) {
for (auto& i : GetChannels().at(mi.first)) {
i.fMultipart = true;
}
}
Expand All @@ -544,16 +548,16 @@ void Device::HandleMultipleChannelInput()
} else { // otherwise poll directly
bool proceed = true;

PollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
PollerPtr poller(GetChannel(fInputChannelKeys.at(0), 0).fTransportFactory->CreatePoller(GetChannels(), fInputChannelKeys));

while (!NewStatePending() && proceed) {
poller->Poll(200);

// check which inputs are ready and call their data handlers if they are.
for (const auto& ch : fInputChannelKeys) {
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
for (unsigned int i = 0; i < GetChannels().at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) {
if (fChannels.at(ch).at(i).fMultipart) {
if (GetChannel(ch, i).fMultipart) {
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
} else {
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
Expand Down Expand Up @@ -590,21 +594,21 @@ void Device::HandleMultipleTransportInput()
void Device::PollForTransport(const TransportFactory* factory, const vector<string>& channelKeys)
{
try {
PollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
PollerPtr poller(factory->CreatePoller(GetChannels(), channelKeys));

while (!NewStatePending() && fMultitransportProceed) {
poller->Poll(500);

for (const auto& ch : channelKeys) {
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
for (unsigned int i = 0; i < GetChannels().at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) {
lock_guard<mutex> lock(fMultitransportMutex);

if (!fMultitransportProceed) {
break;
}

if (fChannels.at(ch).at(i).fMultipart) {
if (GetChannel(ch, i).fMultipart) {
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
} else {
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
Expand All @@ -628,7 +632,7 @@ void Device::PollForTransport(const TransportFactory* factory, const vector<stri

bool Device::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i)
{
unique_ptr<Message> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());
unique_ptr<Message> input(GetChannel(chName, i).fTransportFactory->CreateMessage());

if (Receive(input, chName, i) >= 0) {
return callback(input, i);
Expand Down Expand Up @@ -685,7 +689,7 @@ void Device::LogSocketRates()
size_t chanNameLen = 0;

// iterate over the channels map
for (auto& channel : fChannels) {
for (auto& channel : GetChannels()) {
// iterate over the channels vector
for (auto& subChannel : channel.second) {
if (subChannel.fRateLogging > 0) {
Expand Down Expand Up @@ -806,18 +810,22 @@ void Device::ResetWrapper()

Reset();

fChannels.clear();
GetChannels().clear();
fTransportFactory.reset();
if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
}

/// TODO: Remove this once Device::fChannels is no longer public
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
Device::~Device()
{
UnsubscribeFromNewTransition("device");
fStateMachine.StopHandlingStates();
LOG(debug) << "Shutting down device " << fId;
}
#pragma GCC diagnostic pop

} // namespace fair::mq
26 changes: 21 additions & 5 deletions fairmq/Device.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2021-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2021-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand Down Expand Up @@ -233,7 +233,7 @@ class Device
}
}

return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans);
return GetChannel(chans.at(0), 0).Transport()->CreatePoller(GetChannels(), chans);
}

PollerPtr NewPoller(const std::vector<Channel*>& channels)
Expand Down Expand Up @@ -321,15 +321,15 @@ class Device

Channel& GetChannel(const std::string& channelName, const int index = 0)
try {
return fChannels.at(channelName).at(index);
return GetChannels().at(channelName).at(index);
} catch (const std::out_of_range& oor) {
LOG(error) << "GetChannel(): '" << channelName << "[" << index << "]' does not exist.";
throw;
}

size_t GetNumSubChannels(const std::string& channelName)
try {
return fChannels.at(channelName).size();
return GetChannels().at(channelName).size();
} catch (const std::out_of_range& oor) {
LOG(error) << "GetNumSubChannels(): '" << channelName << "' does not exist.";
throw;
Expand All @@ -340,7 +340,7 @@ class Device
/// @param index sub-channel
unsigned long GetNumberOfConnectedPeers(const std::string& channelName, int index = 0)
{
return fChannels.at(channelName).at(index).GetNumberOfConnectedPeers();
return GetChannel(channelName, index).GetNumberOfConnectedPeers();
}

virtual void RegisterChannelEndpoints() {}
Expand Down Expand Up @@ -438,7 +438,23 @@ class Device
fTransports; ///< Container for transports

public:
[[deprecated("Use GetChannels() instead.")]]
std::unordered_map<std::string, std::vector<Channel>> fChannels; ///< Device channels
std::unordered_map<std::string, std::vector<Channel>>& GetChannels()
{
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
return fChannels;
#pragma GCC diagnostic pop
}
std::unordered_map<std::string, std::vector<Channel>> const& GetChannels() const
{
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
return fChannels;
#pragma GCC diagnostic pop
}

std::unique_ptr<ProgOptions> fInternalConfig; ///< Internal program options configuration
ProgOptions* fConfig; ///< Pointer to config (internal or external)

Expand Down
4 changes: 2 additions & 2 deletions fairmq/devices/Merger.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand Down Expand Up @@ -47,7 +47,7 @@ class Merger : public Device

std::vector<Channel*> chans;

for (auto& chan : fChannels.at(fInChannelName)) {
for (auto& chan : GetChannels().at(fInChannelName)) {
chans.push_back(&chan);
}

Expand Down

0 comments on commit 5ef17fd

Please sign in to comment.