Skip to content

Commit

Permalink
Merge pull request #1213 from Dr15Jones/threadSafeSimpleMemoryCheck
Browse files Browse the repository at this point in the history
Multithreading fixes -- Made SimpleMemoryCheck thread-safe
  • Loading branch information
ktf committed Oct 29, 2013
2 parents a4dc017 + fdc325e commit fb4044c
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 47 deletions.
150 changes: 111 additions & 39 deletions FWCore/Services/src/Memory.cc
Expand Up @@ -32,11 +32,14 @@
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/MallocOpts.h"

#include <cstring>
#include <iostream>
#include <memory>
#ifdef __linux__
#include <malloc.h>
#endif
Expand Down Expand Up @@ -128,7 +131,8 @@ namespace edm {
, smapsLineBufferLen_(0)
, growthRateVsize_()
, growthRateRss_()
, moduleSummaryRequested_(iPS.getUntrackedParameter<bool>("moduleMemorySummary")) {
, moduleSummaryRequested_(iPS.getUntrackedParameter<bool>("moduleMemorySummary"))
, measurementUnderway_(false){
// changelog 2
// pg_size = (double)getpagesize();
std::ostringstream ost;
Expand All @@ -142,19 +146,18 @@ namespace edm {
iReg.watchPostSource(this, &SimpleMemoryCheck::postSource);
iReg.watchPostModuleConstruction(this, &SimpleMemoryCheck::postModuleConstruction);
iReg.watchPostModuleBeginJob(this, &SimpleMemoryCheck::postModuleBeginJob);
iReg.watchPostProcessEvent(this, &SimpleMemoryCheck::postEventProcessing);
iReg.watchPostModule(this, &SimpleMemoryCheck::postModule);
iReg.watchPostEvent(this, &SimpleMemoryCheck::postEvent);
iReg.watchPostModuleEvent(this, &SimpleMemoryCheck::postModule);
iReg.watchPostBeginJob(this, &SimpleMemoryCheck::postBeginJob);
iReg.watchPostEndJob(this, &SimpleMemoryCheck::postEndJob);
} else {
iReg.watchPostProcessEvent(this, &SimpleMemoryCheck::postEventProcessing);
iReg.watchPostEvent(this, &SimpleMemoryCheck::postEvent);
iReg.watchPostEndJob(this, &SimpleMemoryCheck::postEndJob);
}
if(moduleSummaryRequested_) { // changelog 2
iReg.watchPreProcessEvent(this, &SimpleMemoryCheck::preEventProcessing);
iReg.watchPreModule(this, &SimpleMemoryCheck::preModule);
iReg.watchPreModuleEvent(this, &SimpleMemoryCheck::preModule);
if(oncePerEventMode_) {
iReg.watchPostModule(this, &SimpleMemoryCheck::postModule);
iReg.watchPostModuleEvent(this, &SimpleMemoryCheck::postModule);
}
}

Expand Down Expand Up @@ -244,24 +247,54 @@ namespace edm {
}

// Calls updateAndPrint with the tag "pre-ctor" and the label/name taken from md.
// NOTE(review): this listing looks like a rendered diff with old and new lines
// interleaved; the unguarded updateAndPrint call directly below is presumably
// the pre-change line that the guarded call replaces -- confirm against the file.
void SimpleMemoryCheck::preSourceConstruction(ModuleDescription const& md) {
updateAndPrint("pre-ctor", md.moduleLabel(), md.moduleName());
bool expected = false;
// Atomically flip measurementUnderway_ from false to true. If it is already
// true the exchange fails and this measurement is skipped, not waited for.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// Scope guard: a shared_ptr built from nullptr plus a deleter still runs the
// deleter on destruction, so the flag is cleared when this scope is left.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("pre-ctor", md.moduleLabel(), md.moduleName());
}
}


// Calls updateAndPrint with the tag "ctor" and the label/name taken from md.
// NOTE(review): old and new diff lines appear interleaved here; the first,
// unguarded updateAndPrint call is presumably the removed line -- confirm.
void SimpleMemoryCheck::postSourceConstruction(ModuleDescription const& md) {
updateAndPrint("ctor", md.moduleLabel(), md.moduleName());
bool expected = false;
// Proceed only if this caller is the one that changes measurementUnderway_
// from false to true; otherwise drop this measurement.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// The deleter runs when guard is destroyed (even though the pointer is null)
// and resets measurementUnderway_ to false.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("ctor", md.moduleLabel(), md.moduleName());
}
}

// Calls updateAndPrint with the fixed tag "module" and label/name "source".
// NOTE(review): old and new diff lines appear interleaved here; the first,
// unguarded updateAndPrint call is presumably the removed line -- confirm.
void SimpleMemoryCheck::postSource() {
updateAndPrint("module", "source", "source");
bool expected = false;
// Claim the measurementUnderway_ flag; if it is already set, skip this
// measurement instead of blocking.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// Clears the flag when guard leaves scope; shared_ptr invokes a custom
// deleter even when the managed pointer is null.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("module", "source", "source");
}
}

// Calls updateAndPrint with the tag "ctor" and the label/name taken from md.
// NOTE(review): old and new diff lines appear interleaved here; the first,
// unguarded updateAndPrint call is presumably the removed line -- confirm.
void SimpleMemoryCheck::postModuleConstruction(ModuleDescription const& md) {
updateAndPrint("ctor", md.moduleLabel(), md.moduleName());
bool expected = false;
// Only the caller that flips measurementUnderway_ from false to true goes on
// to measure; any other concurrent caller skips.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// Scope guard that stores false back into measurementUnderway_ on destruction.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("ctor", md.moduleLabel(), md.moduleName());
}
}

// Calls updateAndPrint with the tag "beginJob" and the label/name taken from md.
// NOTE(review): old and new diff lines appear interleaved here; the first,
// unguarded updateAndPrint call is presumably the removed line -- confirm.
void SimpleMemoryCheck::postModuleBeginJob(ModuleDescription const& md) {
updateAndPrint("beginJob", md.moduleLabel(), md.moduleName());
bool expected = false;
// Try to take the measurementUnderway_ flag; on failure (flag already true)
// nothing is measured for this call.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// Releases the flag when guard is destroyed at the end of this block.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("beginJob", md.moduleLabel(), md.moduleName());
}
}

void SimpleMemoryCheck::postEndJob() {
Expand Down Expand Up @@ -482,39 +515,78 @@ namespace edm {
#endif
} // postEndJob

// Stores the supplied event ID in currentEventID_; the Timestamp argument is unused.
// NOTE(review): this function looks like it is removed by the commit shown in
// this listing (the event ID is later passed as an argument instead) -- confirm.
void SimpleMemoryCheck::preEventProcessing(EventID const& iID, Timestamp const&) {
currentEventID_ = iID; // changelog 2
}

// Per-event handler: bumps count_, refreshes the memory measurement, updates the
// per-event statistics and, in once-per-event mode, updates the maximum and prints.
// NOTE(review): this listing interleaves old and new diff lines. The first
// signature (postEventProcessing) and the unguarded statements up to the first
// andPrint call are presumably the removed version -- confirm against the file.
void SimpleMemoryCheck::postEventProcessing(Event const& e, EventSetup const&) {
void SimpleMemoryCheck::postEvent(StreamContext const&iContext) {
// count_ is incremented before (and regardless of) the measurement guard below.
++count_;
update();
if (monitorPssAndPrivate_) {
currentSmaps_ = fetchSmaps();
}
updateEventStats(e.id());
if(oncePerEventMode_) {
// should probably use Run:Event or count_ for the label and name
updateMax();
andPrint("event", "", "");
bool expected = false;
// Only one caller at a time may measure: the exchange succeeds only when
// measurementUnderway_ was false; otherwise this event's measurement is skipped.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// Scope guard: the deleter runs on destruction (even for a null pointer)
// and resets measurementUnderway_ to false.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
update();
if (monitorPssAndPrivate_) {
currentSmaps_ = fetchSmaps();
}
// The event ID is read from the stream context passed to this call.
updateEventStats(iContext.eventID());
if(oncePerEventMode_) {
// should probably use Run:Event or count_ for the label and name
updateMax();
andPrint("event", "", "");
}
}
}

// Starts a per-module measurement: records the vsize on entry plus the stream ID
// and module ID being measured, so postModule can recognise the matching call.
// NOTE(review): this listing interleaves old and new diff lines. The first
// signature and the two unguarded statements after it are presumably the
// removed version -- confirm against the file.
void SimpleMemoryCheck::preModule(ModuleDescription const&) {
update(); // changelog 2
moduleEntryVsize_ = current_->vsize;
void SimpleMemoryCheck::preModule(StreamContext const& iStreamContext, ModuleCallingContext const& iModuleContext) {
bool expected = false;
// Outer guard: only one measurement of any kind at a time; skipped if busy.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// Releases measurementUnderway_ when this block is left.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
// This inner `expected` shadows the outer one of the same name.
bool expected = false;
// Claim the module-measurement flag. It is NOT released in this function;
// postModule stores false once it sees the matching stream/module IDs.
// NOTE(review): the visible constructor initializer list sets
// measurementUnderway_(false), but no initializer for
// moduleMeasurementUnderway_ is visible in this listing -- confirm it is
// initialized before the first call.
if(moduleMeasurementUnderway_.compare_exchange_strong(expected,true) ) {
update();
// changelog 2
moduleEntryVsize_ = current_->vsize;
// Remember which (stream, module) pair owns the in-flight measurement.
moduleStreamID_.store(iStreamContext.streamID().value(), std::memory_order_release);
moduleID_.store(iModuleContext.moduleDescription()->id(),std::memory_order_release);
}
}
}

// Finishes a per-module measurement. When not in once-per-event mode it calls
// updateAndPrint for the module; when a module summary is requested it records
// the vsize change since preModule for the module instance being measured.
// NOTE(review): this listing interleaves old and new diff lines. The first
// signature and the statements that use `md.` (rather than `md->` or
// iModuleContext) are presumably the removed version -- confirm against the file.
void SimpleMemoryCheck::postModule(ModuleDescription const& md) {
void SimpleMemoryCheck::postModule(StreamContext const& iStreamContext, ModuleCallingContext const& iModuleContext) {
if(!oncePerEventMode_) {
updateAndPrint("module", md.moduleLabel(), md.moduleName());
} else if(moduleSummaryRequested_) { // changelog 2
update();
bool expected = false;
// Take the single-measurement flag; skip printing if another caller holds it.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// Releases measurementUnderway_ at the end of this block.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
auto const md = iModuleContext.moduleDescription();
updateAndPrint("module", md->moduleLabel(), md->moduleName());
}
}
if(moduleSummaryRequested_) { // changelog 2
double dv = current_->vsize - moduleEntryVsize_;
std::string label = md.moduleLabel();
updateModuleMemoryStats (modules_[label], dv);

if(moduleSummaryRequested_) {
//is this the module instance we are measuring?
// All three must hold: a module measurement is in flight, and both the stream
// ID and the module ID equal the values preModule stored.
if(moduleMeasurementUnderway_.load(std::memory_order_acquire) and
(iStreamContext.streamID().value()==moduleStreamID_.load(std::memory_order_acquire) ) and
(iModuleContext.moduleDescription()->id() == moduleID_.load(std::memory_order_acquire))) {
//Need to release our module measurement lock
// This guard clears moduleMeasurementUnderway_ on scope exit whether or not
// the measurementUnderway_ exchange below succeeds.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
moduleMeasurementUnderway_.store(false,std::memory_order_release);
});
bool expected = false;
// If another measurement is in progress the stats update is skipped, while
// the module-measurement flag is still released by the guard above.
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
// Inner guard (shadows the outer `guard`): releases measurementUnderway_.
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
// In once-per-event mode no update happened earlier in this function, so
// refresh here before computing the delta.
if(oncePerEventMode_) {
update();
}
// changelog 2
// dv is the change in vsize since preModule recorded moduleEntryVsize_.
double dv = current_->vsize - moduleEntryVsize_;
std::string label = iModuleContext.moduleDescription()->moduleLabel();
updateModuleMemoryStats (modules_[label], dv, iStreamContext.eventID());
}
}
}
}

Expand Down Expand Up @@ -717,7 +789,7 @@ namespace edm {
// changelog 2
void
SimpleMemoryCheck::updateModuleMemoryStats(SignificantModule& m,
double dv) {
double dv, edm::EventID const& currentEventID) {
if(count_ < num_to_skip_) {
m.totalEarlyVsize += dv;
if(dv > m.maxEarlyVsize) m.maxEarlyVsize = dv;
Expand All @@ -726,7 +798,7 @@ namespace edm {
m.totalDeltaVsize += dv;
if(dv > m.maxDeltaVsize) {
m.maxDeltaVsize = dv;
m.eventMaxDeltaV = currentEventID_;
m.eventMaxDeltaV = currentEventID;
}
}
} //updateModuleMemoryStats
Expand Down
21 changes: 13 additions & 8 deletions FWCore/Services/src/Memory.h
Expand Up @@ -24,6 +24,7 @@
#include "FWCore/Services/src/ProcInfoFetcher.h"

#include <cstdio>
#include <atomic>

namespace edm {
class EventID;
Expand Down Expand Up @@ -65,14 +66,13 @@ namespace edm {

void postBeginJob();

void preEventProcessing(const edm::EventID&, const edm::Timestamp&);
void postEventProcessing(const Event&, const EventSetup&);
void postEvent(StreamContext const&);

void postModuleBeginJob(const ModuleDescription&);
void postModuleConstruction(const ModuleDescription&);

void preModule(const ModuleDescription&);
void postModule(const ModuleDescription&);
void preModule(StreamContext const&, ModuleCallingContext const&);
void postModule(StreamContext const&, ModuleCallingContext const&);

void postEndJob();

Expand Down Expand Up @@ -106,7 +106,7 @@ namespace edm {
bool oncePerEventMode_;
bool jobReportOutputOnly_;
bool monitorPssAndPrivate_;
int count_;
std::atomic<int> count_;

//smaps
FILE* smapsFile_;
Expand Down Expand Up @@ -225,9 +225,14 @@ namespace edm {
typedef std::map<std::string,SignificantModule> SignificantModulesMap;
SignificantModulesMap modules_;
double moduleEntryVsize_;
edm::EventID currentEventID_;
void updateModuleMemoryStats(SignificantModule & m, double dv);

void updateModuleMemoryStats(SignificantModule & m, double dv, edm::EventID const&);

//Used to guarantee we only do one measurement at a time
std::atomic<bool> measurementUnderway_;
std::atomic<bool> moduleMeasurementUnderway_;
std::atomic<unsigned int> moduleStreamID_;
std::atomic<unsigned int> moduleID_;

}; // SimpleMemoryCheck

std::ostream &
Expand Down

0 comments on commit fb4044c

Please sign in to comment.