Skip to content

Commit

Permalink
shm: revert some changes from c85d6e0 that introduced a race
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed May 19, 2021
1 parent 021c1b1 commit 9bf908f
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 13 deletions.
1 change: 0 additions & 1 deletion fairmq/shmem/Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ class Manager

fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));

r.first->second->InitializeQueues();
r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion);
result.second = id;
Expand Down
2 changes: 0 additions & 2 deletions fairmq/shmem/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,6 @@ class Message final : public fair::mq::Message
}

if (fRegionPtr) {
fRegionPtr->InitializeQueues();
fRegionPtr->StartSendingAcks();
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
} else {
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
Expand Down
20 changes: 10 additions & 10 deletions fairmq/shmem/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ struct Region
}
}


InitializeQueues();
StartSendingAcks();

LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
}

Expand All @@ -116,21 +120,17 @@ struct Region
{
using namespace boost::interprocess;

if (fQueue == nullptr) {
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
}

void StartSendingAcks()
{
if (!fAcksSender.joinable()) {
fAcksSender = std::thread(&Region::SendAcks, this);
}
fAcksSender = std::thread(&Region::SendAcks, this);
}
void SendAcks()
{
Expand Down

0 comments on commit 9bf908f

Please sign in to comment.