diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index e88461423..5048a0bf5 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -9,8 +9,13 @@ #define FAIR_MQ_SHMEM_COMMON_H_ #include +#include #include +#include + +#include +#include namespace fair { @@ -73,6 +78,16 @@ struct RegionBlock size_t fHint; }; +// find id for unique shmem name: +// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)). +inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId) +{ + boost::hash stringHash; + std::string shmId(std::to_string(stringHash(std::string((std::to_string(geteuid()) + sessionId))))); + shmId.resize(8, '_'); + return shmId; +} + } // namespace shmem } // namespace mq } // namespace fair diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index 89697d076..72bf3b133 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -37,7 +37,7 @@ fair::mq::Transport FairMQTransportFactorySHM::fTransportType = fair::mq::Transp FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config) : FairMQTransportFactory(id) , fDeviceId(id) - , fSessionName("default") + , fShmId() , fContext(nullptr) , fHeartbeatThread() , fSendHeartbeats(true) @@ -58,12 +58,13 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai } int numIoThreads = 1; + string sessionName = "default"; size_t segmentSize = 2000000000; bool autolaunchMonitor = false; if (config) { numIoThreads = config->GetValue("io-threads"); - fSessionName = config->GetValue("session"); + sessionName = config->GetValue("session"); segmentSize = config->GetValue("shm-segment-size"); autolaunchMonitor = config->GetValue("shm-monitor"); } @@ -72,11 +73,11 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai LOG(debug) << "FairMQProgOptions not available! Using defaults."; } - fSessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS) + fShmId = buildShmIdFromSessionIdAndUserId(sessionName); try { - fShMutex = fair::mq::tools::make_unique(bipc::open_or_create, string("fmq_" + fSessionName + "_mtx").c_str()); + fShMutex = fair::mq::tools::make_unique(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str()); if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) { @@ -89,8 +90,8 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); } - fManager = fair::mq::tools::make_unique(fSessionName, segmentSize); - LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fSessionName << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes."; + fManager = fair::mq::tools::make_unique(fShmId, segmentSize); + LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes."; { bipc::scoped_lock lock(*fShMutex); @@ -158,7 +159,7 @@ void FairMQTransportFactorySHM::StartMonitor() if (!p.empty()) { - boost::process::spawn(p, "-x", "-s", fSessionName, "-d", "-t", "2000", env); + boost::process::spawn(p, "-x", "--shmid", fShmId, "-d", "-t", "2000", env); int numTries = 0; do { @@ -188,7 +189,7 @@ void FairMQTransportFactorySHM::StartMonitor() void FairMQTransportFactorySHM::SendHeartbeats() { - string controlQueueName("fmq_" + fSessionName + "_cq"); + string controlQueueName("fmq_" + fShmId + "_cq"); while (fSendHeartbeats) { try @@ -310,7 +311,7 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM() if (lastRemoved) { - boost::interprocess::named_mutex::remove(string("fmq_" + fSessionName + "_mtx").c_str()); + bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str()); } } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 2d092faf9..d98eeb7b4 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -60,7 +60,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory static fair::mq::Transport fTransportType; std::string fDeviceId; - std::string fSessionName; + std::string fShmId; void* fContext; std::thread fHeartbeatThread; std::atomic fSendHeartbeats; diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index 05f3cf825..8c7ad56a6 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -105,7 +105,7 @@ void Manager::RemoveSegment() { if (bipc::shared_memory_object::remove(fSegmentName.c_str())) { - LOG(debug) << "successfully removed " << fSegmentName << " segment after the device has stopped."; + LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped."; } else { diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 792a97d59..97bd7dbf9 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -51,17 +51,17 @@ void signalHandler(int signal) gSignalStatus = signal; } -Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit) +Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit) : fSelfDestruct(selfDestruct) , fInteractive(interactive) , fSeenOnce(false) , fIsDaemon(runAsDaemon) , fCleanOnExit(cleanOnExit) , fTimeoutInMS(timeoutInMS) - , fSessionName(sessionName) - , fSegmentName("fmq_" + fSessionName + "_main") - , fManagementSegmentName("fmq_" + fSessionName + "_mng") - , fControlQueueName("fmq_" + fSessionName + "_cq") + , fShmId(shmId) + , fSegmentName("fmq_" + fShmId + "_main") + , fManagementSegmentName("fmq_" + fShmId + "_mng") + , fControlQueueName("fmq_" + fShmId + "_cq") , fTerminating(false) , fHeartbeatTriggered(false) , fLastHeartbeat(chrono::high_resolution_clock::now()) @@ -201,7 +201,7 @@ void Monitor::Interactive() break; case 'x': cout << "\n[x] --> closing shared memory:" << endl; - Cleanup(fSessionName); + Cleanup(fShmId); break; case 'h': cout << "\n[h] --> help:" << endl << endl; @@ -293,7 +293,7 @@ void Monitor::CheckSegment() if (fHeartbeatTriggered && duration > fTimeoutInMS) { cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; - Cleanup(fSessionName); + Cleanup(fShmId); fHeartbeatTriggered = false; if (fSelfDestruct) { @@ -340,7 +340,7 @@ void Monitor::CheckSegment() if (fIsDaemon && duration > fTimeoutInMS * 2) { - Cleanup(fSessionName); + Cleanup(fShmId); fHeartbeatTriggered = false; if (fSelfDestruct) { @@ -360,9 +360,9 @@ void Monitor::CheckSegment() } } -void Monitor::Cleanup(const string& sessionName) +void Monitor::Cleanup(const string& shmId) { - string managementSegmentName("fmq_" + sessionName + "_mng"); + string managementSegmentName("fmq_" + shmId + "_mng"); try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); @@ -373,8 +373,8 @@ void Monitor::Cleanup(const string& sessionName) unsigned int regionCount = rc->fCount; for (unsigned int i = 1; i <= regionCount; ++i) { - RemoveObject("fmq_" + sessionName + "_rg_" + to_string(i)); - RemoveQueue(string("fmq_" + sessionName + "_rgq_" + to_string(i))); + RemoveObject("fmq_" + shmId + "_rg_" + to_string(i)); + RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i))); } } else @@ -389,9 +389,8 @@ void Monitor::Cleanup(const string& sessionName) cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl; } - RemoveObject("fmq_" + sessionName + "_main"); - - boost::interprocess::named_mutex::remove(string("fmq_" + sessionName + "_mtx").c_str()); + RemoveObject("fmq_" + shmId + "_main"); + RemoveMutex("fmq_" + shmId + "_mtx"); cout << endl; } @@ -420,6 +419,18 @@ void Monitor::RemoveQueue(const string& name) } } +void Monitor::RemoveMutex(const string& name) +{ + if (bipc::named_mutex::remove(name.c_str())) + { + cout << "Successfully removed \"" << name << "\"." << endl; + } + else + { + cout << "Did not remove \"" << name << "\". Already removed?" << endl; + } +} + void Monitor::PrintQueues() { cout << '\n'; @@ -427,7 +438,7 @@ void Monitor::PrintQueues() try { bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); - StringVector* queues = segment.find(string("fmq_" + fSessionName + "_qs").c_str()).first; + StringVector* queues = segment.find(string("fmq_" + fShmId + "_qs").c_str()).first; if (queues) { cout << "found " << queues->size() << " queue(s):" << endl; @@ -500,7 +511,7 @@ Monitor::~Monitor() } if (fCleanOnExit) { - Cleanup(fSessionName); + Cleanup(fShmId); } } diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index d0d0527bd..6a323f274 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -39,6 +39,7 @@ class Monitor static void Cleanup(const std::string& sessionName); static void RemoveObject(const std::string&); static void RemoveQueue(const std::string&); + static void RemoveMutex(const std::string&); private: void PrintHeader(); @@ -55,7 +56,7 @@ class Monitor bool fIsDaemon; bool fCleanOnExit; unsigned int fTimeoutInMS; - std::string fSessionName; + std::string fShmId; std::string fSegmentName; std::string fManagementSegmentName; std::string fControlQueueName; @@ -71,4 +72,4 @@ class Monitor } // namespace mq } // namespace fair -#endif /* FAIR_MQ_SHMEM_MONITOR_H_ */ \ No newline at end of file +#endif /* FAIR_MQ_SHMEM_MONITOR_H_ */ diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index c4a36a255..15e019df3 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -12,8 +12,9 @@ The shared memory monitor tool, supplied with the shared memory transport can be With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters: - `--session `: customize the name of the shared memory segment via the session name (default is "default"). + `--session `: for which session to run the monitor (default is "default"). The actual ressource names will be built out of session id, user id (hashed and truncated). `--cleanup`: start monitor, perform cleanup of the memory and quit. + `--shmid `: if provided, this shmem id will be used instead of the one generated from session id. Use this if you know the name of the shared memory ressource, but do not have the used session id. `--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)). `--interactive`: run interactively, with detailed segment details and user input for various shmem operations. `--timeout `: specifiy the timeout for the heartbeats from shmem transports in milliseconds (default 5000). @@ -27,9 +28,9 @@ The Monitor class can also be used independently from the supplied executable (b FairMQ Shared Memory currently uses following names to register shared memory on the system: -`fmq__main` - main segment name, used for user data (session name can be overridden via `--session`). -`fmq__mng` - management segment name, used for storing management data. -`fmq__cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments). -`fmq__mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments). -`fmq__rg_` - names of unmanaged regions. -`fmq__rgq_` - names of queues for the unmanaged regions. +`fmq__main` - main segment name, used for user data (the shmId is generated out of session id and user id). +`fmq__mng` - management segment name, used for storing management data. +`fmq__cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments). +`fmq__mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments). +`fmq__rg_` - names of unmanaged regions. +`fmq__rgq_` - names of queues for the unmanaged regions. diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index f06a070c1..d06fc9621 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -6,6 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ #include +#include #include @@ -20,6 +21,7 @@ using namespace std; using namespace boost::program_options; +using namespace fair::mq::shmem; static void daemonize() { @@ -68,6 +70,7 @@ int main(int argc, char** argv) try { string sessionName; + string shmId; bool cleanup = false; bool selfDestruct = false; bool interactive = false; @@ -77,7 +80,8 @@ int main(int argc, char** argv) options_description desc("Options"); desc.add_options() - ("session,s", value(&sessionName)->default_value("default"), "Name of the session which to monitor") + ("session,s", value(&sessionName)->default_value("default"), "session id which to monitor") + ("shmid", value(&shmId)->default_value(""), "Shmem Id to monitor (if not provided, it is generated out of session id and user id)") ("cleanup,c", value(&cleanup)->implicit_value(true), "Perform cleanup and quit") ("self-destruct,x", value(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory") ("interactive,i", value(&interactive)->implicit_value(true), "Interactive run") @@ -97,24 +101,27 @@ int main(int argc, char** argv) notify(vm); - sessionName.resize(8, '_'); // shorten the session name, to accommodate for name size limit on some systems (MacOS) - if (runAsDaemon) { daemonize(); } + if (shmId == "") + { + shmId = buildShmIdFromSessionIdAndUserId(sessionName); + } + if (cleanup) { - cout << "Cleaning up \"" << sessionName << "\"..." << endl; - fair::mq::shmem::Monitor::Cleanup(sessionName); - fair::mq::shmem::Monitor::RemoveQueue("fmq_" + sessionName + "_cq"); + cout << "Cleaning up \"" << shmId << "\"..." << endl; + Monitor::Cleanup(shmId); + Monitor::RemoveQueue("fmq_" + shmId + "_cq"); return 0; } - cout << "Starting shared memory monitor for session: \"" << sessionName << "\"..." << endl; + cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl; - fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit}; + Monitor monitor{shmId, selfDestruct, interactive, timeoutInMS, runAsDaemon, cleanOnExit}; monitor.CatchSignals(); monitor.Run();