Skip to content

Commit

Permalink
add mlock/zero options to unmanaged region
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed May 7, 2021
1 parent c85d6e0 commit 857aa84
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 40 deletions.
11 changes: 7 additions & 4 deletions examples/region/Sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,20 @@ void Sampler::InitTask()
<< ", flags: " << info.flags;
});

fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0,
10000000,
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
lock_guard<mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();

if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
}
},
"", // path, if a region is backed by a file
0, // flags that are passed for region creation
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
));
fRegion->SetLinger(fLinger);
}
Expand Down
8 changes: 4 additions & 4 deletions fairmq/FairMQTransportFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,17 @@ class FairMQTransportFactory
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
/// @brief Create new UnmanagedRegion
/// @param size size of the region
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
/// @param callback callback to be called when a message belonging to this region is no longer needed by the transport
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;

/// @brief Subscribe to region events (creation, destruction, ...)
/// @param callback the callback that is called when a region event occurs
Expand Down
13 changes: 13 additions & 0 deletions fairmq/FairMQUnmanagedRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event
namespace fair::mq
{

struct RegionConfig {
bool lock;
bool zero;

RegionConfig()
: lock(false), zero(false)
{}

RegionConfig(bool l, bool z)
: lock(l), zero(z)
{}
};

using RegionCallback = FairMQRegionCallback;
using RegionBulkCallback = FairMQRegionBulkCallback;
using RegionEventCallback = FairMQRegionEventCallback;
Expand Down
8 changes: 4 additions & 4 deletions fairmq/ofi/TransportFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,22 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
// return PollerPtr{new Poller(channelsMap, channelList)};
}

auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}

auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}

auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}

auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
Expand Down
18 changes: 16 additions & 2 deletions fairmq/shmem/Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,9 @@ class Manager
const int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
const std::string& path = "",
int flags = 0)
const std::string& path,
int flags,
fair::mq::RegionConfig cfg)
{
using namespace boost::interprocess;
try {
Expand Down Expand Up @@ -311,6 +312,19 @@ class Manager
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";

if (cfg.lock) {
LOG(debug) << "Locking region " << id << "...";
if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) {
LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked region " << id << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << id << "...";
memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size());
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}

fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));

r.first->second->InitializeQueues();
Expand Down
20 changes: 10 additions & 10 deletions fairmq/shmem/TransportFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,29 +141,29 @@ class TransportFactory final : public fair::mq::TransportFactory
return std::make_unique<Poller>(channelsMap, channelList);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg)
{
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this);
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg);
}

void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
Expand Down
9 changes: 5 additions & 4 deletions fairmq/shmem/UnmanagedRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
const int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
const std::string& path = "",
int flags = 0,
FairMQTransportFactory* factory = nullptr)
const std::string& path,
int flags,
FairMQTransportFactory* factory,
fair::mq::RegionConfig cfg)
: FairMQUnmanagedRegion(factory)
, fManager(manager)
, fRegion(nullptr)
, fRegionId(0)
{
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags);
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg);
fRegion = result.first;
fRegionId = result.second;
}
Expand Down
20 changes: 10 additions & 10 deletions fairmq/zeromq/TransportFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,29 @@ class TransportFactory final : public FairMQTransportFactory
return std::make_unique<Poller>(channelsMap, channelList);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
}

UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int)
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
{
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this);
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
return ptr;
Expand Down
20 changes: 18 additions & 2 deletions fairmq/zeromq/UnmanagedRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <cstddef> // size_t
#include <string>

#include <sys/mman.h> // mlock

namespace fair::mq::zmq
{

Expand All @@ -30,7 +32,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
FairMQTransportFactory* factory = nullptr)
FairMQTransportFactory* factory,
fair::mq::RegionConfig cfg)
: fair::mq::UnmanagedRegion(factory)
, fCtx(ctx)
, fId(fCtx.RegionCount())
Expand All @@ -39,7 +42,20 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
, fUserFlags(userFlags)
, fCallback(callback)
, fBulkCallback(bulkCallback)
{}
{
if (cfg.lock) {
LOG(debug) << "Locking region " << fId << "...";
if (mlock(fBuffer, fSize) == -1) {
LOG(error) << "Could not lock region " << fId << ". Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked region " << fId << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << fId << "...";
memset(fBuffer, 0x00, fSize);
LOG(debug) << "Successfully zeroed free memory of region " << fId << ".";
}
}

UnmanagedRegion(const UnmanagedRegion&) = delete;
UnmanagedRegion operator=(const UnmanagedRegion&) = delete;
Expand Down

0 comments on commit 857aa84

Please sign in to comment.