From 92a56c26bcf0329ac187e9b9bb48cc02dece7ae4 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Sun, 30 Jan 2022 13:07:00 +0100 Subject: [PATCH] shm: remove UR queues on ResetContent --- fairmq/shmem/Monitor.cxx | 43 ++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 38ad76e38..d09e4a98c 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -553,15 +553,16 @@ std::pair Remove(const std::string& name, bool verbose) } } -std::vector> Monitor::Cleanup(const ShmId& shmId, bool verbose /* = true */) +std::vector> Monitor::Cleanup(const ShmId& shmIdT, bool verbose /* = true */) { + std::string shmId = shmIdT.shmId; std::vector> result; if (verbose) { - LOG(info) << "Cleaning up for shared memory id '" << shmId.shmId << "'..."; + LOG(info) << "Cleaning up for shared memory id '" << shmId << "'..."; } - string managementSegmentName("fmq_" + shmId.shmId + "_mng"); + string managementSegmentName("fmq_" + shmId + "_mng"); try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); @@ -579,22 +580,21 @@ std::vector> Monitor::Cleanup(const ShmId& shmId, b LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << "."; } if (!path.empty()) { - result.emplace_back(Remove(path + "fmq_" + shmId.shmId + "_rg_" + to_string(id), verbose)); + result.emplace_back(Remove(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose)); } else { - result.emplace_back(Remove("fmq_" + shmId.shmId + "_rg_" + to_string(id), verbose)); + result.emplace_back(Remove("fmq_" + shmId + "_rg_" + to_string(id), verbose)); } - result.emplace_back(Remove("fmq_" + shmId.shmId + "_rgq_" + to_string(id), verbose)); + result.emplace_back(Remove("fmq_" + shmId + "_rgq_" + to_string(id), verbose)); } } Uint16SegmentInfoHashMap* shmSegments = managementSegment.find(bipc::unique_instance).first; - if (shmSegments) { if (verbose) { LOG(info) << "Found " << shmSegments->size() << " managed segments..."; } for (const auto& segment : *shmSegments) { - result.emplace_back(Remove("fmq_" + shmId.shmId + "_m_" + to_string(segment.first), verbose)); + result.emplace_back(Remove("fmq_" + shmId + "_m_" + to_string(segment.first), verbose)); } } else { if (verbose) { @@ -637,41 +637,50 @@ std::vector> Monitor::CleanupFull(const SessionId& return CleanupFull(shmId, verbose); } -void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */) +void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */) { + std::string shmId = shmIdT.shmId; if (verbose) { - cout << "Resetting segments content for shared memory id '" << shmId.shmId << "'..." << endl; + cout << "Resetting segments content for shared memory id '" << shmId << "'..." << endl; } - string managementSegmentName("fmq_" + shmId.shmId + "_mng"); + string managementSegmentName("fmq_" + shmId + "_mng"); try { using namespace boost::interprocess; managed_shared_memory managementSegment(open_only, managementSegmentName.c_str()); Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; - for (const auto& s : *segmentInfos) { if (verbose) { - cout << "Resetting content of segment '" << "fmq_" << shmId.shmId << "_m_" << s.first << "'..." << endl; + cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl; } try { if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { - RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()); + RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str()); void* ptr = segment.get_segment_manager(); size_t size = segment.get_segment_manager()->get_size(); new(ptr) segment_manager>, null_index>(size); } else { - SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()); + SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str()); void* ptr = segment.get_segment_manager(); size_t size = segment.get_segment_manager()->get_size(); new(ptr) segment_manager>, null_index>(size); } } catch (bie& e) { if (verbose) { - cout << "Error resetting content of segment '" << std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl; + cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl; } } } + + Uint16RegionInfoHashMap* shmRegions = managementSegment.find(bipc::unique_instance).first; + if (shmRegions) { + for (const auto& region : *shmRegions) { + uint16_t id = region.first; + Remove("fmq_" + shmId + "_rgq_" + to_string(id), verbose); + } + } + } catch (bie& e) { if (verbose) { cout << "Could not find '" << managementSegmentName << "' segment. Nothing to cleanup." << endl; @@ -680,7 +689,7 @@ void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */) } if (verbose) { - cout << "Done resetting segment content for shared memory id '" << shmId.shmId << "'." << endl; + cout << "Done resetting segment content for shared memory id '" << shmId << "'." << endl; } }