diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 85e4f0bb8..27eef548f 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -327,7 +327,6 @@ class Manager fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); - r.first->second->InitializeQueues(); r.first->second->StartReceivingAcks(); result.first = &(r.first->second->fRegion); result.second = id; diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 77abf9e08..d4af19f09 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -309,8 +309,6 @@ class Message final : public fair::mq::Message } if (fRegionPtr) { - fRegionPtr->InitializeQueues(); - fRegionPtr->StartSendingAcks(); fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint}); } else { LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack"; diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 9a4ab7198..419453c20 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -104,6 +104,10 @@ struct Region } } + + InitializeQueues(); + StartSendingAcks(); + LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")"; } @@ -116,21 +120,17 @@ struct Region { using namespace boost::interprocess; - if (fQueue == nullptr) { - if (fRemote) { - fQueue = std::make_unique(open_only, fQueueName.c_str()); - } else { - fQueue = std::make_unique(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); - } - LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; + if (fRemote) { + fQueue = std::make_unique(open_only, fQueueName.c_str()); + } else { + fQueue = std::make_unique(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); } + LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; } void StartSendingAcks() { - if (!fAcksSender.joinable()) { - fAcksSender = std::thread(&Region::SendAcks, this); - } + fAcksSender = std::thread(&Region::SendAcks, this); } void SendAcks() {