Skip to content

Commit

Permalink
shm: check result of region acquisition
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed May 7, 2021
1 parent 2ca62d0 commit 8a2641d
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 16 deletions.
4 changes: 2 additions & 2 deletions examples/region/Sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ void Sampler::InitTask()
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");

fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(info) << "Region event: " << info.event
<< ", managed: " << info.managed
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size
Expand Down
4 changes: 2 additions & 2 deletions examples/region/Sink.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ void Sink::InitTask()
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(info) << "Region event: " << info.event
<< ", managed: " << info.managed
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size
Expand Down
4 changes: 2 additions & 2 deletions examples/region/fairmq-start-ex-region.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10"
SAMPLER+=" --transport shmem"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &

SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --severity debug"
SINK+=" --transport shmem"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &
11 changes: 7 additions & 4 deletions fairmq/shmem/Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ class Manager
fRegionEventsCV.notify_all();

return result;

} catch (interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?";
LOG(error) << e.what();
Expand Down Expand Up @@ -377,7 +376,7 @@ class Manager
LOG(error) << oor.what();
return nullptr;
} catch (boost::interprocess::interprocess_exception& e) {
LOG(warn) << "Could not get remote region for id '" << id << "'";
LOG(error) << "Could not get remote region for id '" << id << "': " << e.what();
return nullptr;
}
}
Expand Down Expand Up @@ -413,8 +412,12 @@ class Manager
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
if (region) {
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
Expand Down
22 changes: 16 additions & 6 deletions fairmq/shmem/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,23 @@ struct Region
LOG(debug) << "shmem: initialized file: " << fName;
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
} else {
if (fRemote) {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} else {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
try {
if (fRemote) {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} else {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
}
} catch(interprocess_exception& e) {
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
try {
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
} catch(interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
}

InitializeQueues();
Expand Down

0 comments on commit 8a2641d

Please sign in to comment.