Skip to content

Commit

Permalink
shm: remove UR queues on ResetContent
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed Feb 2, 2022
1 parent 4f9aeda commit 92a56c2
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions fairmq/shmem/Monitor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -553,15 +553,16 @@ std::pair<std::string, bool> Remove(const std::string& name, bool verbose)
}
}

std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, bool verbose /* = true */)
std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT, bool verbose /* = true */)
{
std::string shmId = shmIdT.shmId;
std::vector<std::pair<std::string, bool>> result;

if (verbose) {
LOG(info) << "Cleaning up for shared memory id '" << shmId.shmId << "'...";
LOG(info) << "Cleaning up for shared memory id '" << shmId << "'...";
}

string managementSegmentName("fmq_" + shmId.shmId + "_mng");
string managementSegmentName("fmq_" + shmId + "_mng");
try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());

Expand All @@ -579,22 +580,21 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << ".";
}
if (!path.empty()) {
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId.shmId + "_rg_" + to_string(id), verbose));
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose));
} else {
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId.shmId + "_rg_" + to_string(id), verbose));
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId + "_rg_" + to_string(id), verbose));
}
result.emplace_back(Remove<bipc::message_queue>("fmq_" + shmId.shmId + "_rgq_" + to_string(id), verbose));
result.emplace_back(Remove<bipc::message_queue>("fmq_" + shmId + "_rgq_" + to_string(id), verbose));
}
}

Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(bipc::unique_instance).first;

if (shmSegments) {
if (verbose) {
LOG(info) << "Found " << shmSegments->size() << " managed segments...";
}
for (const auto& segment : *shmSegments) {
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId.shmId + "_m_" + to_string(segment.first), verbose));
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId + "_m_" + to_string(segment.first), verbose));
}
} else {
if (verbose) {
Expand Down Expand Up @@ -637,41 +637,50 @@ std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const SessionId&
return CleanupFull(shmId, verbose);
}

void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */)
void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */)
{
std::string shmId = shmIdT.shmId;
if (verbose) {
cout << "Resetting segments content for shared memory id '" << shmId.shmId << "'..." << endl;
cout << "Resetting segments content for shared memory id '" << shmId << "'..." << endl;
}

string managementSegmentName("fmq_" + shmId.shmId + "_mng");
string managementSegmentName("fmq_" + shmId + "_mng");
try {
using namespace boost::interprocess;
managed_shared_memory managementSegment(open_only, managementSegmentName.c_str());

Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;

for (const auto& s : *segmentInfos) {
if (verbose) {
cout << "Resetting content of segment '" << "fmq_" << shmId.shmId << "_m_" << s.first << "'..." << endl;
cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl;
}
try {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str());
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
} else {
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str());
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
}
} catch (bie& e) {
if (verbose) {
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
}
}
}

Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
if (shmRegions) {
for (const auto& region : *shmRegions) {
uint16_t id = region.first;
Remove<bipc::message_queue>("fmq_" + shmId + "_rgq_" + to_string(id), verbose);
}
}

} catch (bie& e) {
if (verbose) {
cout << "Could not find '" << managementSegmentName << "' segment. Nothing to cleanup." << endl;
Expand All @@ -680,7 +689,7 @@ void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */)
}

if (verbose) {
cout << "Done resetting segment content for shared memory id '" << shmId.shmId << "'." << endl;
cout << "Done resetting segment content for shared memory id '" << shmId << "'." << endl;
}
}

Expand Down

0 comments on commit 92a56c2

Please sign in to comment.