Skip to content

Commit

Permalink
feat: add options to control allocation attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed Oct 8, 2021
1 parent 55a2cfc commit 1449166
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
4 changes: 3 additions & 1 deletion fairmq/plugins/config/Config.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("shm-mlock-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment only once when created.")
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).")
("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("bad-alloc-max-attempts", po::value<int >(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.")
("bad-alloc-attempt-interval", po::value<int >()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("shm-no-cleanup", po::value<bool >()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
Expand Down
27 changes: 20 additions & 7 deletions fairmq/shmem/Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,26 @@ class Manager
, fBeatTheHeart(true)
, fRegionEventsSubscriptionActive(false)
, fInterrupted(false)
, fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
, fBadAllocMaxAttempts(1)
, fBadAllocAttemptIntervalInMs(config ? config->GetProperty<int>("bad-alloc-attempt-interval", 50) : 50)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
{
using namespace boost::interprocess;

LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";

if (config) {
// if 'shm-throw-bad-alloc' is explicitly set to false (true is default), ignore other settings
if (config->Count("shm-throw-bad-alloc") && config->GetProperty<bool>("shm-throw-bad-alloc") == false) {
fBadAllocMaxAttempts = -1;
} else if (config->Count("bad-alloc-max-attempts")) {
// if 'bad-alloc-max-attempts' is provided, use it
// this can override the default 'shm-throw-bad-alloc'==true if the value of 'bad-alloc-max-attempts' is -1
fBadAllocMaxAttempts = config->GetProperty<int>("bad-alloc-max-attempts");
}
// otherwise leave fBadAllocMaxAttempts at 1 (the original default, set in the initializer list)
}

bool mlockSegment = false;
bool mlockSegmentOnCreation = false;
bool zeroSegment = false;
Expand Down Expand Up @@ -653,8 +666,6 @@ class Manager
}
}

bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }

void GetSegment(uint16_t id)
{
auto it = fSegments.find(id);
Expand Down Expand Up @@ -693,9 +704,10 @@ class Manager
alignment = std::max(alignment, alignof(std::max_align_t));

char* ptr = nullptr;
int numAttempts = 0;
size_t fullSize = ShmHeader::FullSize(size, alignment);

while (ptr == nullptr) {
while (!ptr) {
try {
size_t segmentSize = boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId));
if (fullSize > segmentSize) {
Expand All @@ -706,10 +718,10 @@ class Manager
ShmHeader::Construct(ptr, alignment);
} catch (boost::interprocess::bad_alloc& ba) {
// LOG(warn) << "Shared memory full...";
if (ThrowingOnBadAlloc()) {
if (fBadAllocMaxAttempts >= 0 && ++numAttempts >= fBadAllocMaxAttempts) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId))));
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::this_thread::sleep_for(std::chrono::milliseconds(fBadAllocAttemptIntervalInMs));
if (Interrupted()) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId))));
} else {
Expand Down Expand Up @@ -832,7 +844,8 @@ class Manager
bool fRegionEventsSubscriptionActive;
std::atomic<bool> fInterrupted;

bool fThrowOnBadAlloc;
int fBadAllocMaxAttempts;
int fBadAllocAttemptIntervalInMs;
bool fNoCleanup;
};

Expand Down

0 comments on commit 1449166

Please sign in to comment.