Skip to content

Commit

Permalink
region example: add options for testing with externally-created regions
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed Sep 9, 2022
1 parent d105960 commit f5c46ce
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 13 deletions.
7 changes: 7 additions & 0 deletions examples/region/fairmq-start-ex-region.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10"
SAMPLER+=" --transport $transport"
# SAMPLER+=" --external-region true"
# SAMPLER+=" --shm-no-cleaup true"
# SAMPLER+=" --shm-monitor false"
# SAMPLER+=" --shmid 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &

SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --severity debug"
SINK+=" --transport $transport"
# SINK+=" --shm-no-cleaup true"
# SINK+=" --shm-monitor false"
# SINK+=" --shmid 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &
40 changes: 27 additions & 13 deletions examples/region/sampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct Sampler : fair::mq::Device
{
void InitTask() override
{
fExternalRegion = fConfig->GetProperty<bool>("external-region");
fMsgSize = fConfig->GetProperty<int>("msg-size");
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
Expand All @@ -34,18 +35,29 @@ struct Sampler : fair::mq::Device

fair::mq::RegionConfig regionCfg;
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
regionCfg.lock = true; // mlock region after creation
regionCfg.zero = true; // zero region content after creation
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();
if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
}, regionCfg));
// options for testing with an externally-created -region
if (fExternalRegion) {
regionCfg.id = 1;
regionCfg.removeOnDestruction = false;
regionCfg.lock = false; // mlock region after creation
regionCfg.lock = false; // mlock region after creation
} else {
regionCfg.lock = true; // mlock region after creation
regionCfg.zero = true; // zero region content after creation
}
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
"data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();
if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
},
regionCfg
));
}

bool ConditionalRun() override
Expand Down Expand Up @@ -91,6 +103,7 @@ struct Sampler : fair::mq::Device
}

private:
int fExternalRegion = false;
int fMsgSize = 10000;
uint32_t fLinger = 100;
uint64_t fMaxIterations = 0;
Expand All @@ -105,7 +118,8 @@ void addCustomOptions(bpo::options_description& options)
options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
}

std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
Expand Down

0 comments on commit f5c46ce

Please sign in to comment.