Skip to content

Commit

Permalink
shm: when refCount segment size is zero, fall back to old behaviour
Browse files Browse the repository at this point in the history
, which is to store reference counts inside the main data segment
  • Loading branch information
rbx committed Nov 29, 2023
1 parent 05a2ae6 commit 2df3d90
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 25 deletions.
1 change: 1 addition & 0 deletions examples/region/fairmq-start-ex-region-advanced.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
SAMPLER+=" --rc-segment-size 0"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
Expand Down
3 changes: 2 additions & 1 deletion examples/region/keep-alive.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ struct ShmManager
uint64_t size = stoull(conf.at(1));
fair::mq::RegionConfig cfg;
cfg.id = id;
cfg.rcSegmentSize = 0;
cfg.size = size;
regionCfgs.push_back(cfg);

auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, cfg));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
<< ", starting at " << region.GetData() << ". Locking...";
Expand Down
60 changes: 43 additions & 17 deletions fairmq/shmem/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
}
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
if (fRegionPtr->fRcSegmentSize > 0) {
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
} else {
fManager.GetSegment(fSegmentId);
return ShmHeader::RefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
}
}

void Copy(const fair::mq::Message& other) override
Expand All @@ -277,19 +282,29 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId));
}
if (otherMsg.fShared < 0) {
// UR msg not yet shared, create the reference counting object with count 2
try {
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
} catch (boost::interprocess::bad_alloc& ba) {
throw RefCountBadAlloc(tools::ToString(
"Insufficient space in the reference count segment ",
otherMsg.fRegionId,
", original exception: bad_alloc: ",
ba.what()));
if (fRegionPtr->fRcSegmentSize > 0) {
if (otherMsg.fShared < 0) {
// UR msg not yet shared, create the reference counting object with count 2
try {
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
} catch (boost::interprocess::bad_alloc& ba) {
throw RefCountBadAlloc(tools::ToString("Insufficient space in the reference count segment ", otherMsg.fRegionId, ", original exception: bad_alloc: ", ba.what()));
}
} else {
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
}
} else { // if RefCount segment size is 0, store the ref count in the managed segment
if (otherMsg.fShared < 0) { // if UR msg is not yet shared
char* ptr = fManager.Allocate(2, 0);
// point the fShared in the unmanaged region message to the refCount holder
otherMsg.fShared = fManager.GetHandleFromAddress(ptr, fSegmentId);
// the message needs to be able to locate in which segment the refCount is stored
otherMsg.fSegmentId = fSegmentId;
ShmHeader::IncrementRefCount(ptr);
} else { // if the UR msg is already shared
fManager.GetSegment(otherMsg.fSegmentId);
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fShared, otherMsg.fSegmentId));
}
} else {
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
}
}

Expand Down Expand Up @@ -357,10 +372,21 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
}
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
if (refCount == 1) {
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
ReleaseUnmanagedRegionBlock();
if (fRegionPtr->fRcSegmentSize > 0) {
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
if (refCount == 1) {
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
ReleaseUnmanagedRegionBlock();
}
} else { // if RefCount segment size is 0, get the ref count from the managed segment
// make sure segment is initialized in this transport
fManager.GetSegment(fSegmentId);
// release unmanaged region block if ref count is one
uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
if (refCount == 1) {
fManager.Deallocate(fShared, fSegmentId);
ReleaseUnmanagedRegionBlock();
}
}
} else {
ReleaseUnmanagedRegionBlock();
Expand Down
8 changes: 5 additions & 3 deletions fairmq/shmem/UnmanagedRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct UnmanagedRegion
, fShmemObject()
, fFile(nullptr)
, fFileMapping()
, fRcSegmentSize(cfg.rcSegmentSize)
, fQueue(nullptr)
, fCallback(nullptr)
, fBulkCallback(nullptr)
Expand Down Expand Up @@ -146,13 +147,13 @@ struct UnmanagedRegion
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}

InitializeRefCountSegment(cfg.rcSegmentSize);
InitializeRefCountSegment(fRcSegmentSize);

if (fControlling && created) {
Register(shmId, cfg);
}

LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << "), refCount segment size: " << fRcSegmentSize;
}

UnmanagedRegion() = delete;
Expand Down Expand Up @@ -266,6 +267,7 @@ struct UnmanagedRegion
std::condition_variable fBlockSendCV;
std::vector<RegionBlock> fBlocksToFree;
const std::size_t fAckBunchSize = 256;
uint64_t fRcSegmentSize;
std::unique_ptr<boost::interprocess::message_queue> fQueue;
std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment;
std::unique_ptr<RefCountPool> fRefCountPool;
Expand Down Expand Up @@ -321,7 +323,7 @@ struct UnmanagedRegion
void InitializeRefCountSegment(uint64_t size)
{
using namespace boost::interprocess;
if (!fRefCountSegment) {
if (!fRefCountSegment && size > 0) {
fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size);
LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName;
fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager());
Expand Down
22 changes: 18 additions & 4 deletions test/message/_message.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,10 @@ auto ZeroCopy(bool expandedShmMetadata = false) -> void

// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata, uint64_t rcSegmentSize) -> void
{
fair::Logger::SetConsoleSeverity(fair::Severity::debug);

ProgOptions config1;
ProgOptions config2;
string session(tools::Uuid());
Expand All @@ -311,11 +313,13 @@ auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = fal

const size_t msgSize{100};
const size_t regionSize{1000000};
RegionConfig cfg;
cfg.rcSegmentSize = rcSegmentSize;
tools::Semaphore blocker;

auto region = factory1->CreateUnmanagedRegion(regionSize, [&blocker](void*, size_t, void*) {
blocker.Signal();
});
}, cfg);

{
Channel push("Push", "push", factory1);
Expand Down Expand Up @@ -461,12 +465,22 @@ TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT

TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged");
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", false, 10000000);
}

TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true);
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded", true, 10000000);
}

// Zero-copy from an unmanaged region with the reference count segment disabled
// (rcSegmentSize == 0) and without expanded shm metadata.
TEST(ZeroCopyFromUnmanaged, shmem_no_rc_segment) // NOLINT
{
    const bool expandedMetadata = false;
    const uint64_t noRcSegment = 0; // zero size: no dedicated ref count segment is requested

    ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_no_rc_segment", expandedMetadata, noRcSegment);
}

// Zero-copy from an unmanaged region with the reference count segment disabled
// (rcSegmentSize == 0) and with expanded shm metadata enabled.
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata_no_rc_segment) // NOLINT
{
    const bool expandedMetadata = true;
    const uint64_t noRcSegment = 0; // zero size: no dedicated ref count segment is requested

    ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded_no_rc_segment", expandedMetadata, noRcSegment);
}

} // namespace

0 comments on commit 2df3d90

Please sign in to comment.