Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made SimpleMemoryCheck thread-safe #1213

Merged
merged 1 commit into from Oct 29, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
150 changes: 111 additions & 39 deletions FWCore/Services/src/Memory.cc
Expand Up @@ -32,11 +32,14 @@
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/MallocOpts.h"

#include <cstring>
#include <iostream>
#include <memory>
#ifdef __linux__
#include <malloc.h>
#endif
Expand Down Expand Up @@ -128,7 +131,8 @@ namespace edm {
, smapsLineBufferLen_(0)
, growthRateVsize_()
, growthRateRss_()
, moduleSummaryRequested_(iPS.getUntrackedParameter<bool>("moduleMemorySummary")) {
, moduleSummaryRequested_(iPS.getUntrackedParameter<bool>("moduleMemorySummary"))
, measurementUnderway_(false){
// changelog 2
// pg_size = (double)getpagesize();
std::ostringstream ost;
Expand All @@ -142,19 +146,18 @@ namespace edm {
iReg.watchPostSource(this, &SimpleMemoryCheck::postSource);
iReg.watchPostModuleConstruction(this, &SimpleMemoryCheck::postModuleConstruction);
iReg.watchPostModuleBeginJob(this, &SimpleMemoryCheck::postModuleBeginJob);
iReg.watchPostProcessEvent(this, &SimpleMemoryCheck::postEventProcessing);
iReg.watchPostModule(this, &SimpleMemoryCheck::postModule);
iReg.watchPostEvent(this, &SimpleMemoryCheck::postEvent);
iReg.watchPostModuleEvent(this, &SimpleMemoryCheck::postModule);
iReg.watchPostBeginJob(this, &SimpleMemoryCheck::postBeginJob);
iReg.watchPostEndJob(this, &SimpleMemoryCheck::postEndJob);
} else {
iReg.watchPostProcessEvent(this, &SimpleMemoryCheck::postEventProcessing);
iReg.watchPostEvent(this, &SimpleMemoryCheck::postEvent);
iReg.watchPostEndJob(this, &SimpleMemoryCheck::postEndJob);
}
if(moduleSummaryRequested_) { // changelog 2
iReg.watchPreProcessEvent(this, &SimpleMemoryCheck::preEventProcessing);
iReg.watchPreModule(this, &SimpleMemoryCheck::preModule);
iReg.watchPreModuleEvent(this, &SimpleMemoryCheck::preModule);
if(oncePerEventMode_) {
iReg.watchPostModule(this, &SimpleMemoryCheck::postModule);
iReg.watchPostModuleEvent(this, &SimpleMemoryCheck::postModule);
}
}

Expand Down Expand Up @@ -244,24 +247,54 @@ namespace edm {
}

void SimpleMemoryCheck::preSourceConstruction(ModuleDescription const& md) {
updateAndPrint("pre-ctor", md.moduleLabel(), md.moduleName());
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("pre-ctor", md.moduleLabel(), md.moduleName());
}
}


void SimpleMemoryCheck::postSourceConstruction(ModuleDescription const& md) {
updateAndPrint("ctor", md.moduleLabel(), md.moduleName());
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("ctor", md.moduleLabel(), md.moduleName());
}
}

void SimpleMemoryCheck::postSource() {
updateAndPrint("module", "source", "source");
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("module", "source", "source");
}
}

void SimpleMemoryCheck::postModuleConstruction(ModuleDescription const& md) {
updateAndPrint("ctor", md.moduleLabel(), md.moduleName());
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("ctor", md.moduleLabel(), md.moduleName());
}
}

void SimpleMemoryCheck::postModuleBeginJob(ModuleDescription const& md) {
updateAndPrint("beginJob", md.moduleLabel(), md.moduleName());
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
updateAndPrint("beginJob", md.moduleLabel(), md.moduleName());
}
}

void SimpleMemoryCheck::postEndJob() {
Expand Down Expand Up @@ -482,39 +515,78 @@ namespace edm {
#endif
} // postEndJob

void SimpleMemoryCheck::preEventProcessing(EventID const& iID, Timestamp const&) {
currentEventID_ = iID; // changelog 2
}

void SimpleMemoryCheck::postEventProcessing(Event const& e, EventSetup const&) {
void SimpleMemoryCheck::postEvent(StreamContext const&iContext) {
++count_;
update();
if (monitorPssAndPrivate_) {
currentSmaps_ = fetchSmaps();
}
updateEventStats(e.id());
if(oncePerEventMode_) {
// should probably use be Run:Event or count_ for the label and name
updateMax();
andPrint("event", "", "");
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
update();
if (monitorPssAndPrivate_) {
currentSmaps_ = fetchSmaps();
}
updateEventStats(iContext.eventID());
if(oncePerEventMode_) {
// should probably use be Run:Event or count_ for the label and name
updateMax();
andPrint("event", "", "");
}
}
}

void SimpleMemoryCheck::preModule(ModuleDescription const&) {
update(); // changelog 2
moduleEntryVsize_ = current_->vsize;
void SimpleMemoryCheck::preModule(StreamContext const& iStreamContext, ModuleCallingContext const& iModuleContext) {
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
bool expected = false;
if(moduleMeasurementUnderway_.compare_exchange_strong(expected,true) ) {
update();
// changelog 2
moduleEntryVsize_ = current_->vsize;
moduleStreamID_.store(iStreamContext.streamID().value(), std::memory_order_release);
moduleID_.store(iModuleContext.moduleDescription()->id(),std::memory_order_release);
}
}
}

void SimpleMemoryCheck::postModule(ModuleDescription const& md) {
void SimpleMemoryCheck::postModule(StreamContext const& iStreamContext, ModuleCallingContext const& iModuleContext) {
if(!oncePerEventMode_) {
updateAndPrint("module", md.moduleLabel(), md.moduleName());
} else if(moduleSummaryRequested_) { // changelog 2
update();
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
auto const md = iModuleContext.moduleDescription();
updateAndPrint("module", md->moduleLabel(), md->moduleName());
}
}
if(moduleSummaryRequested_) { // changelog 2
double dv = current_->vsize - moduleEntryVsize_;
std::string label = md.moduleLabel();
updateModuleMemoryStats (modules_[label], dv);

if(moduleSummaryRequested_) {
//is this the module instance we are measuring?
if(moduleMeasurementUnderway_.load(std::memory_order_acquire) and
(iStreamContext.streamID().value()==moduleStreamID_.load(std::memory_order_acquire) ) and
(iModuleContext.moduleDescription()->id() == moduleID_.load(std::memory_order_acquire))) {
//Need to release our module measurement lock
std::shared_ptr<void> guard(nullptr,[this](void const*) {
moduleMeasurementUnderway_.store(false,std::memory_order_release);
});
bool expected = false;
if(measurementUnderway_.compare_exchange_strong(expected,true,std::memory_order_acq_rel) ) {
std::shared_ptr<void> guard(nullptr,[this](void const*) {
measurementUnderway_.store(false,std::memory_order_release);
});
if(oncePerEventMode_) {
update();
}
// changelog 2
double dv = current_->vsize - moduleEntryVsize_;
std::string label = iModuleContext.moduleDescription()->moduleLabel();
updateModuleMemoryStats (modules_[label], dv, iStreamContext.eventID());
}
}
}
}

Expand Down Expand Up @@ -717,7 +789,7 @@ namespace edm {
// changelog 2
void
SimpleMemoryCheck::updateModuleMemoryStats(SignificantModule& m,
double dv) {
double dv, edm::EventID const& currentEventID) {
if(count_ < num_to_skip_) {
m.totalEarlyVsize += dv;
if(dv > m.maxEarlyVsize) m.maxEarlyVsize = dv;
Expand All @@ -726,7 +798,7 @@ namespace edm {
m.totalDeltaVsize += dv;
if(dv > m.maxDeltaVsize) {
m.maxDeltaVsize = dv;
m.eventMaxDeltaV = currentEventID_;
m.eventMaxDeltaV = currentEventID;
}
}
} //updateModuleMemoryStats
Expand Down
21 changes: 13 additions & 8 deletions FWCore/Services/src/Memory.h
Expand Up @@ -24,6 +24,7 @@
#include "FWCore/Services/src/ProcInfoFetcher.h"

#include <cstdio>
#include <atomic>

namespace edm {
class EventID;
Expand Down Expand Up @@ -65,14 +66,13 @@ namespace edm {

void postBeginJob();

void preEventProcessing(const edm::EventID&, const edm::Timestamp&);
void postEventProcessing(const Event&, const EventSetup&);
void postEvent(StreamContext const&);

void postModuleBeginJob(const ModuleDescription&);
void postModuleConstruction(const ModuleDescription&);

void preModule(const ModuleDescription&);
void postModule(const ModuleDescription&);
void preModule(StreamContext const&, ModuleCallingContext const&);
void postModule(StreamContext const&, ModuleCallingContext const&);

void postEndJob();

Expand Down Expand Up @@ -106,7 +106,7 @@ namespace edm {
bool oncePerEventMode_;
bool jobReportOutputOnly_;
bool monitorPssAndPrivate_;
int count_;
std::atomic<int> count_;

//smaps
FILE* smapsFile_;
Expand Down Expand Up @@ -225,9 +225,14 @@ namespace edm {
typedef std::map<std::string,SignificantModule> SignificantModulesMap;
SignificantModulesMap modules_;
double moduleEntryVsize_;
edm::EventID currentEventID_;
void updateModuleMemoryStats(SignificantModule & m, double dv);

void updateModuleMemoryStats(SignificantModule & m, double dv, edm::EventID const&);

//Used to guarantee we only do one measurement at a time
std::atomic<bool> measurementUnderway_;
std::atomic<bool> moduleMeasurementUnderway_;
std::atomic<unsigned int> moduleStreamID_;
std::atomic<unsigned int> moduleID_;

}; // SimpleMemoryCheck

std::ostream &
Expand Down