diff --git a/fairmq/plugins/config/Config.cxx b/fairmq/plugins/config/Config.cxx index d36b625bb..a8a75acbc 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -71,7 +71,9 @@ Plugin::ProgOptions ConfigPluginProgramOptions() ("shm-mlock-segment-on-creation", po::value()->default_value(false), "Shared memory: mlock the shared memory segment only once when created.") ("shm-zero-segment", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).") ("shm-zero-segment-on-creation", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.") - ("shm-throw-bad-alloc", po::value()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") + ("shm-throw-bad-alloc", po::value()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") + ("bad-alloc-max-attempts", po::value(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has the same effect as 1.") + ("bad-alloc-attempt-interval", po::value()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).") ("shm-monitor", po::value()->default_value(true), "Shared memory: run monitor daemon.") ("shm-no-cleanup", po::value()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.") ("ofi-size-hint", po::value()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index deff90b71..826096f39 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -150,13 +150,26 @@ class Manager , fBeatTheHeart(true) , fRegionEventsSubscriptionActive(false) , fInterrupted(false) - , fThrowOnBadAlloc(config ? 
config->GetProperty("shm-throw-bad-alloc", true) : true) + , fBadAllocMaxAttempts(1) + , fBadAllocAttemptIntervalInMs(config ? config->GetProperty("bad-alloc-attempt-interval", 50) : 50) , fNoCleanup(config ? config->GetProperty("shm-no-cleanup", false) : false) { using namespace boost::interprocess; LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'."; + if (config) { + // if 'shm-throw-bad-alloc' is explicitly set to false (true is default), ignore other settings + if (config->Count("shm-throw-bad-alloc") && config->GetProperty("shm-throw-bad-alloc") == false) { + fBadAllocMaxAttempts = -1; + } else if (config->Count("bad-alloc-max-attempts")) { + // if 'bad-alloc-max-attempts' is provided, use it + // this can override the default 'shm-throw-bad-alloc'==true if the value of 'bad-alloc-max-attempts' is -1 + fBadAllocMaxAttempts = config->GetProperty("bad-alloc-max-attempts"); + } + // otherwise leave fBadAllocMaxAttempts at 1 (the original default, set in the initializer list) + } + bool mlockSegment = false; bool mlockSegmentOnCreation = false; bool zeroSegment = false; @@ -653,8 +666,6 @@ class Manager } } - bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } - void GetSegment(uint16_t id) { auto it = fSegments.find(id); @@ -693,9 +704,10 @@ class Manager alignment = std::max(alignment, alignof(std::max_align_t)); char* ptr = nullptr; + int numAttempts = 0; size_t fullSize = ShmHeader::FullSize(size, alignment); - while (ptr == nullptr) { + while (!ptr) { try { size_t segmentSize = boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)); if (fullSize > segmentSize) { @@ -706,10 +718,10 @@ class Manager ShmHeader::Construct(ptr, alignment); } catch (boost::interprocess::bad_alloc& ba) { // LOG(warn) << "Shared memory full..."; - if (ThrowingOnBadAlloc()) { + if (fBadAllocMaxAttempts >= 0 && ++numAttempts >= fBadAllocMaxAttempts) { throw MessageBadAlloc(tools::ToString("shmem: could not create a message of 
size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)))); } - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::this_thread::sleep_for(std::chrono::milliseconds(fBadAllocAttemptIntervalInMs)); if (Interrupted()) { throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)))); } else { @@ -832,7 +844,8 @@ class Manager bool fRegionEventsSubscriptionActive; std::atomic fInterrupted; - bool fThrowOnBadAlloc; + int fBadAllocMaxAttempts; + int fBadAllocAttemptIntervalInMs; bool fNoCleanup; };