Skip to content

Commit

Permalink
Shmem: Build shmem names out of session id + user id
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx authored and dennisklein committed Oct 11, 2018
1 parent 1d45095 commit e090967
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 45 deletions.
15 changes: 15 additions & 0 deletions fairmq/shmem/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@
#define FAIR_MQ_SHMEM_COMMON_H_

#include <atomic>
#include <string>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/functional/hash.hpp>

#include <unistd.h>
#include <sys/types.h>

namespace fair
{
Expand Down Expand Up @@ -73,6 +78,16 @@ struct RegionBlock
size_t fHint;
};

// find id for unique shmem name:
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
{
boost::hash<std::string> stringHash;
std::string shmId(std::to_string(stringHash(std::string((std::to_string(geteuid()) + sessionId)))));
shmId.resize(8, '_');
return shmId;
}

} // namespace shmem
} // namespace mq
} // namespace fair
Expand Down
19 changes: 10 additions & 9 deletions fairmq/shmem/FairMQTransportFactorySHM.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fair::mq::Transport FairMQTransportFactorySHM::fTransportType = fair::mq::Transp
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config)
: FairMQTransportFactory(id)
, fDeviceId(id)
, fSessionName("default")
, fShmId()
, fContext(nullptr)
, fHeartbeatThread()
, fSendHeartbeats(true)
Expand All @@ -58,12 +58,13 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
}

int numIoThreads = 1;
string sessionName = "default";
size_t segmentSize = 2000000000;
bool autolaunchMonitor = false;
if (config)
{
numIoThreads = config->GetValue<int>("io-threads");
fSessionName = config->GetValue<string>("session");
sessionName = config->GetValue<string>("session");
segmentSize = config->GetValue<size_t>("shm-segment-size");
autolaunchMonitor = config->GetValue<bool>("shm-monitor");
}
Expand All @@ -72,11 +73,11 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
LOG(debug) << "FairMQProgOptions not available! Using defaults.";
}

fSessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS)
fShmId = buildShmIdFromSessionIdAndUserId(sessionName);

try
{
fShMutex = fair::mq::tools::make_unique<bipc::named_mutex>(bipc::open_or_create, string("fmq_" + fSessionName + "_mtx").c_str());
fShMutex = fair::mq::tools::make_unique<bipc::named_mutex>(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str());

if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
{
Expand All @@ -89,8 +90,8 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}

fManager = fair::mq::tools::make_unique<Manager>(fSessionName, segmentSize);
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fSessionName << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
fManager = fair::mq::tools::make_unique<Manager>(fShmId, segmentSize);
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";

{
bipc::scoped_lock<bipc::named_mutex> lock(*fShMutex);
Expand Down Expand Up @@ -158,7 +159,7 @@ void FairMQTransportFactorySHM::StartMonitor()

if (!p.empty())
{
boost::process::spawn(p, "-x", "-s", fSessionName, "-d", "-t", "2000", env);
boost::process::spawn(p, "-x", "--shmid", fShmId, "-d", "-t", "2000", env);
int numTries = 0;
do
{
Expand Down Expand Up @@ -188,7 +189,7 @@ void FairMQTransportFactorySHM::StartMonitor()

void FairMQTransportFactorySHM::SendHeartbeats()
{
string controlQueueName("fmq_" + fSessionName + "_cq");
string controlQueueName("fmq_" + fShmId + "_cq");
while (fSendHeartbeats)
{
try
Expand Down Expand Up @@ -310,7 +311,7 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM()

if (lastRemoved)
{
boost::interprocess::named_mutex::remove(string("fmq_" + fSessionName + "_mtx").c_str());
bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str());
}
}

Expand Down
2 changes: 1 addition & 1 deletion fairmq/shmem/FairMQTransportFactorySHM.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory

static fair::mq::Transport fTransportType;
std::string fDeviceId;
std::string fSessionName;
std::string fShmId;
void* fContext;
std::thread fHeartbeatThread;
std::atomic<bool> fSendHeartbeats;
Expand Down
2 changes: 1 addition & 1 deletion fairmq/shmem/Manager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void Manager::RemoveSegment()
{
if (bipc::shared_memory_object::remove(fSegmentName.c_str()))
{
LOG(debug) << "successfully removed " << fSegmentName << " segment after the device has stopped.";
LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped.";
}
else
{
Expand Down
45 changes: 28 additions & 17 deletions fairmq/shmem/Monitor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ void signalHandler(int signal)
gSignalStatus = signal;
}

Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit)
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit)
: fSelfDestruct(selfDestruct)
, fInteractive(interactive)
, fSeenOnce(false)
, fIsDaemon(runAsDaemon)
, fCleanOnExit(cleanOnExit)
, fTimeoutInMS(timeoutInMS)
, fSessionName(sessionName)
, fSegmentName("fmq_" + fSessionName + "_main")
, fManagementSegmentName("fmq_" + fSessionName + "_mng")
, fControlQueueName("fmq_" + fSessionName + "_cq")
, fShmId(shmId)
, fSegmentName("fmq_" + fShmId + "_main")
, fManagementSegmentName("fmq_" + fShmId + "_mng")
, fControlQueueName("fmq_" + fShmId + "_cq")
, fTerminating(false)
, fHeartbeatTriggered(false)
, fLastHeartbeat(chrono::high_resolution_clock::now())
Expand Down Expand Up @@ -201,7 +201,7 @@ void Monitor::Interactive()
break;
case 'x':
cout << "\n[x] --> closing shared memory:" << endl;
Cleanup(fSessionName);
Cleanup(fShmId);
break;
case 'h':
cout << "\n[h] --> help:" << endl << endl;
Expand Down Expand Up @@ -293,7 +293,7 @@ void Monitor::CheckSegment()
if (fHeartbeatTriggered && duration > fTimeoutInMS)
{
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(fSessionName);
Cleanup(fShmId);
fHeartbeatTriggered = false;
if (fSelfDestruct)
{
Expand Down Expand Up @@ -340,7 +340,7 @@ void Monitor::CheckSegment()

if (fIsDaemon && duration > fTimeoutInMS * 2)
{
Cleanup(fSessionName);
Cleanup(fShmId);
fHeartbeatTriggered = false;
if (fSelfDestruct)
{
Expand All @@ -360,9 +360,9 @@ void Monitor::CheckSegment()
}
}

void Monitor::Cleanup(const string& sessionName)
void Monitor::Cleanup(const string& shmId)
{
string managementSegmentName("fmq_" + sessionName + "_mng");
string managementSegmentName("fmq_" + shmId + "_mng");
try
{
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
Expand All @@ -373,8 +373,8 @@ void Monitor::Cleanup(const string& sessionName)
unsigned int regionCount = rc->fCount;
for (unsigned int i = 1; i <= regionCount; ++i)
{
RemoveObject("fmq_" + sessionName + "_rg_" + to_string(i));
RemoveQueue(string("fmq_" + sessionName + "_rgq_" + to_string(i)));
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i)));
}
}
else
Expand All @@ -389,9 +389,8 @@ void Monitor::Cleanup(const string& sessionName)
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
}

RemoveObject("fmq_" + sessionName + "_main");

boost::interprocess::named_mutex::remove(string("fmq_" + sessionName + "_mtx").c_str());
RemoveObject("fmq_" + shmId + "_main");
RemoveMutex("fmq_" + shmId + "_mtx");

cout << endl;
}
Expand Down Expand Up @@ -420,14 +419,26 @@ void Monitor::RemoveQueue(const string& name)
}
}

void Monitor::RemoveMutex(const string& name)
{
if (bipc::named_mutex::remove(name.c_str()))
{
cout << "Successfully removed \"" << name << "\"." << endl;
}
else
{
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
}
}

void Monitor::PrintQueues()
{
cout << '\n';

try
{
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
StringVector* queues = segment.find<StringVector>(string("fmq_" + fSessionName + "_qs").c_str()).first;
StringVector* queues = segment.find<StringVector>(string("fmq_" + fShmId + "_qs").c_str()).first;
if (queues)
{
cout << "found " << queues->size() << " queue(s):" << endl;
Expand Down Expand Up @@ -500,7 +511,7 @@ Monitor::~Monitor()
}
if (fCleanOnExit)
{
Cleanup(fSessionName);
Cleanup(fShmId);
}
}

Expand Down
5 changes: 3 additions & 2 deletions fairmq/shmem/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Monitor
static void Cleanup(const std::string& sessionName);
static void RemoveObject(const std::string&);
static void RemoveQueue(const std::string&);
static void RemoveMutex(const std::string&);

private:
void PrintHeader();
Expand All @@ -55,7 +56,7 @@ class Monitor
bool fIsDaemon;
bool fCleanOnExit;
unsigned int fTimeoutInMS;
std::string fSessionName;
std::string fShmId;
std::string fSegmentName;
std::string fManagementSegmentName;
std::string fControlQueueName;
Expand All @@ -71,4 +72,4 @@ class Monitor
} // namespace mq
} // namespace fair

#endif /* FAIR_MQ_SHMEM_MONITOR_H_ */
#endif /* FAIR_MQ_SHMEM_MONITOR_H_ */
15 changes: 8 additions & 7 deletions fairmq/shmem/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ The shared memory monitor tool, supplied with the shared memory transport can be

With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters:

`--session <arg>`: customize the name of the shared memory segment via the session name (default is "default").
`--session <arg>`: for which session to run the monitor (default is "default"). The actual ressource names will be built out of session id, user id (hashed and truncated).
`--cleanup`: start monitor, perform cleanup of the memory and quit.
`--shmid <arg>`: if provided, this shmem id will be used instead of the one generated from session id. Use this if you know the name of the shared memory ressource, but do not have the used session id.
`--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)).
`--interactive`: run interactively, with detailed segment details and user input for various shmem operations.
`--timeout <arg>`: specifiy the timeout for the heartbeats from shmem transports in milliseconds (default 5000).
Expand All @@ -27,9 +28,9 @@ The Monitor class can also be used independently from the supplied executable (b

FairMQ Shared Memory currently uses following names to register shared memory on the system:

`fmq_<sessionName>_main` - main segment name, used for user data (session name can be overridden via `--session`).
`fmq_<sessionName>_mng` - management segment name, used for storing management data.
`fmq_<sessionName>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_<sessionName>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_<sessionName>_rg_<index>` - names of unmanaged regions.
`fmq_<sessionName>_rgq_<index>` - names of queues for the unmanaged regions.
`fmq_<shmId>_main` - main segment name, used for user data (the shmId is generated out of session id and user id).
`fmq_<shmId>_mng` - management segment name, used for storing management data.
`fmq_<shmId>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_<shmId>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_<shmId>_rg_<index>` - names of unmanaged regions.
`fmq_<shmId>_rgq_<index>` - names of queues for the unmanaged regions.
23 changes: 15 additions & 8 deletions fairmq/shmem/runMonitor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Common.h>

#include <boost/program_options.hpp>

Expand All @@ -20,6 +21,7 @@

using namespace std;
using namespace boost::program_options;
using namespace fair::mq::shmem;

static void daemonize()
{
Expand Down Expand Up @@ -68,6 +70,7 @@ int main(int argc, char** argv)
try
{
string sessionName;
string shmId;
bool cleanup = false;
bool selfDestruct = false;
bool interactive = false;
Expand All @@ -77,7 +80,8 @@ int main(int argc, char** argv)

options_description desc("Options");
desc.add_options()
("session,s", value<string>(&sessionName)->default_value("default"), "Name of the session which to monitor")
("session,s", value<string>(&sessionName)->default_value("default"), "session id which to monitor")
("shmid", value<string>(&shmId)->default_value(""), "Shmem Id to monitor (if not provided, it is generated out of session id and user id)")
("cleanup,c", value<bool>(&cleanup)->implicit_value(true), "Perform cleanup and quit")
("self-destruct,x", value<bool>(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory")
("interactive,i", value<bool>(&interactive)->implicit_value(true), "Interactive run")
Expand All @@ -97,24 +101,27 @@ int main(int argc, char** argv)

notify(vm);

sessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS)

if (runAsDaemon)
{
daemonize();
}

if (shmId == "")
{
shmId = buildShmIdFromSessionIdAndUserId(sessionName);
}

if (cleanup)
{
cout << "Cleaning up \"" << sessionName << "\"..." << endl;
fair::mq::shmem::Monitor::Cleanup(sessionName);
fair::mq::shmem::Monitor::RemoveQueue("fmq_" + sessionName + "_cq");
cout << "Cleaning up \"" << shmId << "\"..." << endl;
Monitor::Cleanup(shmId);
Monitor::RemoveQueue("fmq_" + shmId + "_cq");
return 0;
}

cout << "Starting shared memory monitor for session: \"" << sessionName << "\"..." << endl;
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;

fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit};
Monitor monitor{shmId, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit};

monitor.CatchSignals();
monitor.Run();
Expand Down

0 comments on commit e090967

Please sign in to comment.