Skip to content

Commit

Permalink
feat: Add new tunable --shm-metadata-msg-size
Browse files Browse the repository at this point in the history
The shm metadata msg will be right-padded to the given size. This
tunable may be used to saturate the kernel msg buffers more quickly with
the effect that the ZeroMQ message queue size - on which the FairMQ
shmem transport relies upon - behaves more accurately for very small
queue sizes.

This introduces a change for the meta msg format in the multipart case:
old: | MetaHeader 1 | ... | MetaHeader n |
new: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
where `n` is a `size_t` and contains the number of following meta headers.
Previously, this number was infered from the msg buffer size itself which is
no longer possible due to the potential padding.

Implements #432
  • Loading branch information
dennisklein committed Jun 13, 2023
1 parent 491a943 commit f278e7e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 31 deletions.
1 change: 1 addition & 0 deletions fairmq/devices/startMQBenchmark.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --multipart $multipart"
SAMPLER+=" --num-parts $numParts"
SAMPLER+=" --shm-throw-bad-alloc false"
# SAMPLER+=" --shm-metadata-msg-size 1024"
# SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555"
Expand Down
2 changes: 2 additions & 0 deletions fairmq/plugins/config/Config.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <fairmq/JSONParser.h>
#include <fairmq/SuboptParser.h>

#include <cstddef> // for std::size_t
#include <vector>

using namespace std;
Expand Down Expand Up @@ -72,6 +73,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).")
("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-metadata-msg-size", po::value<std::size_t >()->default_value(0), "Shared memory: size of the zmq metadata message (values smaller than minimum are clamped to the minimum).")
("bad-alloc-max-attempts", po::value<int >(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.")
("bad-alloc-attempt-interval", po::value<int >()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).")
("shm-monitor", po::value<bool >()->default_value(false), "Shared memory: run monitor daemon.")
Expand Down
7 changes: 6 additions & 1 deletion fairmq/shmem/Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <algorithm> // max
#include <chrono>
#include <condition_variable>
#include <cstddef> // max_align_t
#include <cstddef> // max_align_t, std::size_t
#include <cstdlib> // getenv
#include <cstring> // memcpy
#include <memory> // make_unique
Expand Down Expand Up @@ -151,6 +151,7 @@ class Manager
, fBadAllocMaxAttempts(1)
, fBadAllocAttemptIntervalInMs(config ? config->GetProperty<int>("bad-alloc-attempt-interval", 50) : 50)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
, fMetadataMsgSize(config ? config->GetProperty<std::size_t>("shm-metadata-msg-size", 0) : 0)
{
using namespace boost::interprocess;

Expand Down Expand Up @@ -828,6 +829,8 @@ class Manager
}
}

auto GetMetadataMsgSize() const noexcept { return fMetadataMsgSize; }

~Manager()
{
fRegionsGen += 1; // signal TL cache invalidation
Expand Down Expand Up @@ -884,6 +887,8 @@ class Manager
int fBadAllocMaxAttempts;
int fBadAllocAttemptIntervalInMs;
bool fNoCleanup;

std::size_t fMetadataMsgSize;
};

} // namespace fair::mq::shmem
Expand Down
64 changes: 34 additions & 30 deletions fairmq/shmem/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@

#include <zmq.h>

#include <algorithm> // for std::max
#include <atomic>
#include <memory> // make_unique
#include <cstddef> // for std::size_t
#include <cstring> // for std::memcpy
#include <exception> // for std::terminate
#include <memory> // for std::make_unique

namespace fair::mq {
class TransportFactory;
Expand All @@ -47,6 +51,7 @@ class Socket final : public fair::mq::Socket
, fMessagesRx(0)
, fTimeout(100)
, fConnectedPeersCount(0)
, fMetadataMsgSize(manager.GetMetadataMsgSize())
{
assert(context);

Expand Down Expand Up @@ -124,8 +129,8 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;

// make meta msg
zmq::ZMsg zmqMsg(sizeof(MetaHeader));
// meta msg format: | MetaHeader | padded to fMetadataMsgSize |
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(MetaHeader)));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));

while (true) {
Expand Down Expand Up @@ -165,11 +170,11 @@ class Socket final : public fair::mq::Socket
int nbytes = zmq_recv(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags);
if (nbytes > 0) {
// check for number of received messages. must be 1
if (nbytes != sizeof(MetaHeader)) {
if (static_cast<std::size_t>(nbytes) < sizeof(MetaHeader)) {
throw SocketError(
tools::ToString("Received message is not a valid FairMQ shared memory message. ",
"Possibly due to a misconfigured transport on the sender side. ",
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
"Expected minimum size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
}

size_t size = shmMsg->GetSize();
Expand Down Expand Up @@ -198,13 +203,14 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;

// put it into zmq message
const unsigned int vecSize = msgVec.size();
zmq::ZMsg zmqMsg(vecSize * sizeof(MetaHeader));

// prepare the message with shm metas
MetaHeader* metas = static_cast<MetaHeader*>(zmqMsg.Data());
// meta msg format: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize |
auto const n = msgVec.size();
zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(std::size_t) + n * sizeof(MetaHeader)));

auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
*meta_n = n;
++meta_n;
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
for (auto& msg : msgVec) {
auto msgPtr = msg.get();
if (!msgPtr) {
Expand All @@ -219,7 +225,7 @@ class Socket final : public fair::mq::Socket
int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing
assert(static_cast<unsigned int>(nbytes) >= sizeof(std::size_t) + (n * sizeof(MetaHeader)));

for (auto& msg : msgVec) {
Message* shmMsg = static_cast<Message*>(msg.get());
Expand Down Expand Up @@ -259,26 +265,23 @@ class Socket final : public fair::mq::Socket
zmq::ZMsg zmqMsg;

while (true) {
int64_t totalSize = 0;
std::size_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmqMsg.Data());
const auto hdrVecSize = zmqMsg.Size();

assert(hdrVecSize > 0);
if (hdrVecSize % sizeof(MetaHeader) != 0) {
throw SocketError(
tools::ToString("Received message is not a valid FairMQ shared memory message. ",
"Possibly due to a misconfigured transport on the sender side. ",
"Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes));
}

const auto numMessages = hdrVecSize / sizeof(MetaHeader);
msgVec.reserve(numMessages);

for (size_t m = 0; m < numMessages; m++) {
// create new message (part)
msgVec.emplace_back(std::make_unique<Message>(fManager, hdrVec[m], GetTransport()));
[[maybe_unused]] auto const size = zmqMsg.Size();
assert(size > sizeof(std::size_t));
auto meta_n = static_cast<std::size_t*>(zmqMsg.Data());
auto const n = *meta_n;
assert(size >= sizeof(std::size_t) + n * sizeof(MetaHeader));
++meta_n;
auto metas = static_cast<MetaHeader*>(static_cast<void*>(meta_n));
msgVec.reserve(msgVec.size() + n);
auto const transport = GetTransport();

for (std::size_t i = 0; i < n; ++i) {
msgVec.push_back(std::make_unique<Message>(fManager, *metas, transport));
++metas;
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast)
Message* shmMsg = static_cast<Message*>(msgVec.back().get());
totalSize += shmMsg->GetSize();
}
Expand Down Expand Up @@ -456,6 +459,7 @@ class Socket final : public fair::mq::Socket

int fTimeout;
mutable unsigned long fConnectedPeersCount;
std::size_t fMetadataMsgSize;
};

} // namespace fair::mq::shmem
Expand Down

0 comments on commit f278e7e

Please sign in to comment.