Skip to content

Commit

Permalink
consolidate UnmanagedRegion options
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx authored and dennisklein committed Dec 16, 2021
1 parent acfb495 commit d630fbb
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 74 deletions.
12 changes: 5 additions & 7 deletions examples/region/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,20 @@ struct Sampler : fair::mq::Device
<< ", flags: " << info.flags;
});

fair::mq::RegionConfig regionCfg;
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
regionCfg.lock = true; // mlock region after creation
regionCfg.zero = true; // zero region content after creation
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();

if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
},
"", // path, if a region is backed by a file
0, // flags that are passed for region creation
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
));
fRegion->SetLinger(fLinger);
}, regionCfg));
}

bool ConditionalRun() override
Expand Down
23 changes: 21 additions & 2 deletions fairmq/TransportFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,15 @@ class TransportFactory
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
// [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]]
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
RegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0,
RegionConfig cfg = RegionConfig()) = 0;
// [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]]
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
RegionBulkCallback callback = nullptr,
RegionBulkCallback bulkCallback = nullptr,
const std::string& path = "",
int flags = 0,
RegionConfig cfg = RegionConfig()) = 0;
Expand All @@ -128,19 +130,36 @@ class TransportFactory
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
// [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]]
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
int64_t userFlags,
RegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0,
RegionConfig cfg = RegionConfig()) = 0;
// [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]]
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
int64_t userFlags,
RegionBulkCallback callback = nullptr,
RegionBulkCallback bulkCallback = nullptr,
const std::string& path = "",
int flags = 0,
RegionConfig cfg = RegionConfig()) = 0;


/// @brief Create new UnmanagedRegion
/// @param size size of the region
/// @param callback callback to be called when a message belonging to this region is no longer needed by the transport
/// @param cfg region configuration
/// @return pointer to UnmanagedRegion
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg) = 0;

/// @brief Create new UnmanagedRegion
/// @param size size of the region
/// @param bulkCallback callback to be called when message(s) belonging to this region is no longer needed by the transport
/// @param cfg region configuration
/// @return pointer to UnmanagedRegion
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback, RegionConfig cfg) = 0;

/// @brief Subscribe to region events (creation, destruction, ...)
/// @param callback the callback that is called when a region event occurs
virtual void SubscribeToRegionEvents(RegionEventCallback callback) = 0;
Expand Down
21 changes: 14 additions & 7 deletions fairmq/UnmanagedRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
#ifndef FAIR_MQ_UNMANAGEDREGION_H
#define FAIR_MQ_UNMANAGEDREGION_H

#include <fairmq/Transports.h>

#include <cstddef> // size_t
#include <cstdint> // uint32_t
#include <fairmq/Transports.h>

#include <functional> // std::function
#include <memory> // std::unique_ptr
#include <ostream> // std::ostream
#include <ostream>
#include <string>
#include <vector>

namespace fair::mq {
Expand Down Expand Up @@ -119,13 +122,17 @@ struct RegionConfig
{
RegionConfig() = default;

RegionConfig(bool l, bool z)
: lock(l)
, zero(z)
RegionConfig(bool _lock, bool _zero)
: lock(_lock)
, zero(_zero)
{}

bool lock = false;
bool zero = false;
bool lock = false; /// mlock region after creation
bool zero = false; /// zero region content after creation
int creationFlags = 0; /// flags passed to the underlying transport on region creation
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
std::string path = ""; /// file path, if the region is backed by a file
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events
};

} // namespace fair::mq
Expand Down
16 changes: 16 additions & 0 deletions fairmq/ofi/TransportFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,22 @@ struct TransportFactory final : mq::TransportFactory
throw std::runtime_error("Not yet implemented UMR.");
}

auto CreateUnmanagedRegion(std::size_t /*size*/,
RegionCallback /*callback*/,
RegionConfig /*cfg*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}

auto CreateUnmanagedRegion(std::size_t /*size*/,
RegionBulkCallback /*callback*/,
RegionConfig /*cfg*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}

auto SubscribeToRegionEvents(RegionEventCallback /*callback*/) -> void override
{
throw std::runtime_error("Not yet implemented.");
Expand Down
6 changes: 3 additions & 3 deletions fairmq/shmem/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ struct RegionInfo
{
RegionInfo(const VoidAlloc& alloc)
: fPath("", alloc)
, fFlags(0)
, fCreationFlags(0)
, fUserFlags(0)
, fDestroyed(false)
{}

RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc)
: fPath(path, alloc)
, fFlags(flags)
, fCreationFlags(flags)
, fUserFlags(userFlags)
, fDestroyed(false)
{}

Str fPath;
int fFlags;
int fCreationFlags;
uint64_t fUserFlags;
bool fDestroyed;
};
Expand Down
29 changes: 13 additions & 16 deletions fairmq/shmem/Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
#include "Common.h"
#include "Monitor.h"
#include "Region.h"

#include <fairmq/Message.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/tools/Strings.h>
#include <fairmq/Transports.h>
#include <FairMQLogger.h>
#include <FairMQMessage.h>

#include <fairlogger/Logger.h>

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem.hpp>
Expand Down Expand Up @@ -369,19 +369,15 @@ class Manager
bool Interrupted() { return fInterrupted.load(); }

std::pair<boost::interprocess::mapped_region*, uint16_t> CreateRegion(const size_t size,
const int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
const std::string& path,
int flags,
fair::mq::RegionConfig cfg)
RegionConfig cfg)
{
using namespace boost::interprocess;
try {
std::pair<mapped_region*, uint16_t> result;

{
uint16_t id = 0;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);

RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
Expand All @@ -396,16 +392,16 @@ class Manager
LOG(debug) << "initialized region counter with: " << rc->fCount;
}

id = rc->fCount;
uint16_t id = rc->fCount;

auto it = fRegions.find(id);
if (it != fRegions.end()) {
LOG(error) << "Trying to create a region that already exists";
return {nullptr, id};
}

auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, cfg));
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";

if (cfg.lock) {
LOG(debug) << "Locking region " << id << "...";
Expand All @@ -421,7 +417,7 @@ class Manager
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}

fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
fShmRegions->emplace(id, RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, fShmVoidAlloc));

r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion);
Expand Down Expand Up @@ -476,11 +472,12 @@ class Manager
try {
// get region info
RegionInfo regionInfo = fShmRegions->at(id);
std::string path = regionInfo.fPath.c_str();
int flags = regionInfo.fFlags;
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
RegionConfig cfg;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str();
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";

auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, 0, true, nullptr, nullptr, path, flags));
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, 0, true, nullptr, nullptr, std::move(cfg)));
return r.first->second.get();
} catch (std::out_of_range& oor) {
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
Expand Down
7 changes: 4 additions & 3 deletions fairmq/shmem/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
#include "Manager.h"
#include "Region.h"
#include "UnmanagedRegion.h"
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <FairMQUnmanagedRegion.h>
#include <fairmq/Message.h>
#include <fairmq/UnmanagedRegion.h>

#include <fairlogger/Logger.h>

#include <boost/interprocess/mapped_region.hpp>

Expand Down
2 changes: 1 addition & 1 deletion fairmq/shmem/Monitor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
if (m != nullptr) {
RegionInfo ri = m->at(i);
string path = ri.fPath.c_str();
int flags = ri.fFlags;
int flags = ri.fCreationFlags;
if (verbose) {
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << ".";
}
Expand Down
23 changes: 11 additions & 12 deletions fairmq/shmem/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
#define FAIR_MQ_SHMEM_REGION_H_

#include "Common.h"

#include <FairMQLogger.h>
#include <FairMQUnmanagedRegion.h>
#include <fairmq/UnmanagedRegion.h>
#include <fairmq/tools/Strings.h>

#include <fairlogger/Logger.h>

#include <boost/filesystem.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
Expand All @@ -38,9 +38,9 @@ namespace fair::mq::shmem

struct Region
{
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, RegionConfig cfg)
: fRemote(remote)
, fLinger(100)
, fLinger(cfg.linger)
, fStopAcks(false)
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
Expand All @@ -53,8 +53,8 @@ struct Region
{
using namespace boost::interprocess;

if (!path.empty()) {
fName = std::string(path + fName);
if (!cfg.path.empty()) {
fName = std::string(cfg.path + fName);

if (!fRemote) {
// create a file
Expand All @@ -75,7 +75,7 @@ struct Region
}
fFileMapping = file_mapping(fName.c_str(), read_write);
LOG(debug) << "shmem: initialized file: " << fName;
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
} else {
try {
if (fRemote) {
Expand All @@ -84,19 +84,18 @@ struct Region
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
}
} catch(interprocess_exception& e) {
} catch (interprocess_exception& e) {
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
try {
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
} catch(interprocess_exception& e) {
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags);
} catch (interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
}


InitializeQueues();
StartSendingAcks();

Expand Down
8 changes: 4 additions & 4 deletions fairmq/shmem/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
#include "Common.h"
#include "Manager.h"
#include "Message.h"

#include <FairMQSocket.h>
#include <FairMQMessage.h>
#include <FairMQLogger.h>
#include <fairmq/Socket.h>
#include <fairmq/Message.h>
#include <fairmq/tools/Strings.h>
#include <fairmq/zeromq/Common.h>

#include <fairlogger/Logger.h>

#include <zmq.h>

#include <atomic>
Expand Down
Loading

0 comments on commit d630fbb

Please sign in to comment.