Skip to content

Commit

Permalink
Add --shm-mlock-segment-on-creation option
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrohr authored and rbx committed Jul 16, 2021
1 parent 38f9870 commit a6193a3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
43 changes: 22 additions & 21 deletions fairmq/plugins/config/Config.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,28 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
namespace po = boost::program_options;
auto pluginOptions = po::options_description{"FairMQ device options"};
pluginOptions.add_options()
("id", po::value<string >()->default_value(""), "Device ID.")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string >()->default_value("zeromq"), "Transport ('zeromq'/'shmem').")
("network-interface", po::value<string >()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
("shm-allocation", po::value<string >()->default_value("rbtree_best_fit"), "Shared memory allocation algorithm: rbtree_best_fit/simple_seq_fit.")
("shm-segment-id", po::value<uint16_t >()->default_value(0), "EXPERIMENTAL: Shared memory segment id for message creation.")
("shm-mlock-segment", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.")
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("shm-no-cleanup", po::value<bool >()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string >()->default_value("default"), "Session name.")
("config-key", po::value<string >(), "Use provided value instead of device id for fetching the configuration from JSON file.")
("mq-config", po::value<string >(), "JSON input as file.")
("channel-config", po::value<vector<string>>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list");
("id", po::value<string >()->default_value(""), "Device ID.")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string >()->default_value("zeromq"), "Transport ('zeromq'/'shmem').")
("network-interface", po::value<string >()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
("shm-allocation", po::value<string >()->default_value("rbtree_best_fit"), "Shared memory allocation algorithm: rbtree_best_fit/simple_seq_fit.")
("shm-segment-id", po::value<uint16_t >()->default_value(0), "EXPERIMENTAL: Shared memory segment id for message creation.")
("shm-mlock-segment", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.")
("shm-mlock-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment only once when created.")
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("shm-no-cleanup", po::value<bool >()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string >()->default_value("default"), "Session name.")
("config-key", po::value<string >(), "Use provided value instead of device id for fetching the configuration from JSON file.")
("mq-config", po::value<string >(), "JSON input as file.")
("channel-config", po::value<vector<string>>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list");
return pluginOptions;
}

Expand Down
9 changes: 9 additions & 0 deletions fairmq/shmem/Manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ class Manager
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";

bool mlockSegment = false;
bool mlockSegmentOnCreation = false;
bool zeroSegment = false;
bool autolaunchMonitor = false;
std::string allocationAlgorithm("rbtree_best_fit");
if (config) {
mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
mlockSegmentOnCreation = config->GetProperty<bool>("shm-mlock-segment-on-creation", mlockSegmentOnCreation);
zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
allocationAlgorithm = config->GetProperty<std::string>("shm-allocation", allocationAlgorithm);
Expand Down Expand Up @@ -162,6 +164,13 @@ class Manager
}
ss << "Created ";
(fEventCounter->fCount)++;
if (mlockSegmentOnCreation) {
LOG(error) << "Locking the managed segment memory pages...";
if (mlock(boost::apply_visitor(SegmentAddress(), fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId))) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
}
LOG(error) << "Successfully locked the managed segment memory pages.";
}
} else {
op = "open";
// found segment with the given id, opening
Expand Down
1 change: 1 addition & 0 deletions test/transport/_options.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void ZeroingAndMlock(const string& transport)
config.SetProperty<size_t>("shm-segment-size", 16384);
config.SetProperty<bool>("shm-zero-segment", true);
config.SetProperty<bool>("shm-mlock-segment", true);
config.SetProperty<bool>("shm-mlock-segment-on-creation", true);

auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);

Expand Down

0 comments on commit a6193a3

Please sign in to comment.