Skip to content

Commit

Permalink
Merge pull request #1036 from Dr15Jones/serializeEventSetupModules
Browse files Browse the repository at this point in the history
Multithreaded framework -- Serialize calls to EventSetup modules
  • Loading branch information
ktf committed Oct 9, 2013
2 parents b1be5a1 + 5609dae commit 8061806
Show file tree
Hide file tree
Showing 37 changed files with 710 additions and 106 deletions.
10 changes: 5 additions & 5 deletions FWCore/Framework/interface/DataProxy.h
Expand Up @@ -20,6 +20,7 @@
//

// system include files
#include <atomic>

// user include files

Expand All @@ -37,7 +38,7 @@ namespace edm {
virtual ~DataProxy();

// ---------- const member functions ---------------------
bool cacheIsValid() const { return cacheIsValid_; }
bool cacheIsValid() const { return cacheIsValid_.load(std::memory_order_acquire); }

void doGet(EventSetupRecord const& iRecord, DataKey const& iKey, bool iTransiently) const;
void const* get(EventSetupRecord const&, DataKey const& iKey, bool iTransiently) const;
Expand Down Expand Up @@ -85,12 +86,11 @@ namespace edm {
DataProxy(DataProxy const&); // stop default

DataProxy const& operator=(DataProxy const&); // stop default
void setCacheIsValidAndAccessType(bool iTransientAccessOnly) const;

// ---------- member data --------------------------------
mutable void const* cache_;
mutable bool cacheIsValid_;
mutable bool nonTransientAccessRequested_;
mutable void const* cache_; //protected by a global mutex
mutable std::atomic<bool> cacheIsValid_;
mutable std::atomic<bool> nonTransientAccessRequested_;
ComponentDescription const* description_;
};
}
Expand Down
6 changes: 5 additions & 1 deletion FWCore/Framework/interface/EDAnalyzer.h
Expand Up @@ -5,8 +5,10 @@
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"

#include <string>
#include <mutex>

// EDAnalyzer is the base class for all analyzer "modules".

Expand All @@ -25,7 +27,7 @@ namespace edm {
template <typename T> friend class WorkerT;
typedef EDAnalyzer ModuleType;

EDAnalyzer() : moduleDescription_() {}
EDAnalyzer();
virtual ~EDAnalyzer();

std::string workerType() const {return "WorkerT<EDAnalyzer>";}
Expand Down Expand Up @@ -75,6 +77,8 @@ namespace edm {
moduleDescription_ = md;
}
ModuleDescription moduleDescription_;
SharedResourcesAcquirer resourceAcquirer_;
std::mutex mutex_;

std::function<void(BranchDescription const&)> callWhenNewProductsRegistered_;
};
Expand Down
9 changes: 6 additions & 3 deletions FWCore/Framework/interface/EDFilter.h
Expand Up @@ -14,11 +14,14 @@ These products should be informational products about the filter decision.
#include "FWCore/Framework/interface/ProducerBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"

#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"

#include <string>
#include <vector>
#include <mutex>

namespace edm {
namespace maker {
Expand All @@ -34,9 +37,7 @@ namespace edm {
template <typename T> friend class WorkerT;
typedef EDFilter ModuleType;

EDFilter() : ProducerBase() , moduleDescription_(),
previousParentage_(), previousParentageId_() {
}
EDFilter();
virtual ~EDFilter();

static void fillDescriptions(ConfigurationDescriptions& descriptions);
Expand Down Expand Up @@ -90,6 +91,8 @@ namespace edm {
}
ModuleDescription moduleDescription_;
std::vector<BranchID> previousParentage_;
SharedResourcesAcquirer resourceAcquirer_;
std::mutex mutex_;
ParentageID previousParentageId_;
};
}
Expand Down
4 changes: 4 additions & 0 deletions FWCore/Framework/interface/EDProducer.h
Expand Up @@ -11,12 +11,14 @@ EDProducts into an Event.

#include "FWCore/Framework/interface/ProducerBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

#include <string>
#include <vector>
#include <mutex>

namespace edm {

Expand Down Expand Up @@ -85,6 +87,8 @@ namespace edm {
}
ModuleDescription moduleDescription_;
std::vector<BranchID> previousParentage_;
SharedResourcesAcquirer resourceAcquirer_;
std::mutex mutex_;
ParentageID previousParentageId_;
};
}
Expand Down
12 changes: 12 additions & 0 deletions FWCore/Framework/interface/EventProcessor.h
Expand Up @@ -34,6 +34,8 @@ configured in the user's main() function, and is set running.
#include <set>
#include <string>
#include <vector>
#include <mutex>
#include <exception>

namespace statemachine {
class Machine;
Expand Down Expand Up @@ -230,6 +232,11 @@ namespace edm {

void possiblyContinueAfterForkChildFailure();

friend class StreamProcessingTask;
void processEventsForStreamAsync(unsigned int iStreamIndex,
std::atomic<bool>* finishedProcessingEvents);


//read the next event using Stream iStreamIndex
void readEvent(unsigned int iStreamIndex);

Expand Down Expand Up @@ -262,6 +269,11 @@ namespace edm {
std::unique_ptr<FileBlock> fb_;
boost::shared_ptr<EDLooperBase> looper_;

//The atomic protects concurrent access of deferredExceptionPtr_
std::atomic<bool> deferredExceptionPtrIsSet_;
std::exception_ptr deferredExceptionPtr_;

std::mutex nextTransitionMutex_;
PrincipalCache principalCache_;
bool beginJobCalled_;
bool shouldWeStop_;
Expand Down
9 changes: 8 additions & 1 deletion FWCore/Framework/interface/OutputModule.h
Expand Up @@ -21,12 +21,16 @@ output stream.
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/getAllTriggerNames.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"

#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

#include <array>
#include <string>
#include <vector>
#include <map>
#include <atomic>
#include <mutex>

namespace edm {

Expand Down Expand Up @@ -119,7 +123,7 @@ namespace edm {
private:

int maxEvents_;
int remainingEvents_;
std::atomic<int> remainingEvents_;

// TODO: Give OutputModule
// an interface (protected?) that supplies client code with the
Expand Down Expand Up @@ -161,6 +165,9 @@ namespace edm {

BranchChildren branchChildren_;

SharedResourcesAcquirer resourceAcquirer_;
std::mutex mutex_;

//------------------------------------------------------------------
// private member functions
//------------------------------------------------------------------
Expand Down
69 changes: 69 additions & 0 deletions FWCore/Framework/interface/SharedResourcesAcquirer.h
@@ -0,0 +1,69 @@
#ifndef Subsystem_Package_SharedResourcesAcquirer_h
#define Subsystem_Package_SharedResourcesAcquirer_h
// -*- C++ -*-
//
// Package: Subsystem/Package
// Class : SharedResourcesAcquirer
//
/**\class SharedResourcesAcquirer SharedResourcesAcquirer.h "SharedResourcesAcquirer.h"
Description: Handles acquiring and releasing a group of resources shared between modules
Usage:
<usage>
*/
//
// Original Author: Chris Jones
// Created: Sun, 06 Oct 2013 19:43:26 GMT
//

// system include files

// user include files
#include <vector>
#include <mutex>
#include <memory>

// forward declarations
class testSharedResourcesRegistry;
namespace edm {
class SharedResourcesAcquirer
{
public:
friend class ::testSharedResourcesRegistry;

SharedResourcesAcquirer() = default;
explicit SharedResourcesAcquirer(std::vector<std::recursive_mutex*>&& iResources):
m_resources(iResources){}

SharedResourcesAcquirer(SharedResourcesAcquirer&&) = default;
SharedResourcesAcquirer(const SharedResourcesAcquirer&) = default;
SharedResourcesAcquirer& operator=(const SharedResourcesAcquirer&) = default;

~SharedResourcesAcquirer() = default;

// ---------- member functions ---------------------------
void lock();
void unlock();

///Used by the framework to temporarily unlock a resource in the case where a module is temporarily suspended,
/// e.g. when a Event::getByLabel call launches a Producer via unscheduled processing
template<typename FUNC>
void temporaryUnlock(FUNC iFunc) {
std::shared_ptr<SharedResourcesAcquirer*> guard(this,[](SharedResourcesAcquirer* iThis) {iThis->lock();});
this->unlock();
iFunc();
}

///The number returned may be less than the number of resources requested if a resource is only used by one module and therefore is not being shared.
size_t numberOfResources() const { return m_resources.size();}
private:

// ---------- member data --------------------------------
std::vector<std::recursive_mutex*> m_resources;
};
}


#endif
7 changes: 7 additions & 0 deletions FWCore/Framework/interface/one/EDAnalyzerBase.h
Expand Up @@ -19,10 +19,12 @@
//

// system include files
#include <mutex>

// user include files
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

Expand Down Expand Up @@ -97,11 +99,16 @@ namespace edm {
virtual void doBeginLuminosityBlock_(LuminosityBlock const& lbp, EventSetup const& c);
virtual void doEndLuminosityBlock_(LuminosityBlock const& lbp, EventSetup const& c);

virtual SharedResourcesAcquirer createAcquirer();

void setModuleDescription(ModuleDescription const& md) {
moduleDescription_ = md;
}
ModuleDescription moduleDescription_;
std::function<void(BranchDescription const&)> callWhenNewProductsRegistered_;

SharedResourcesAcquirer resourcesAcquirer_;
std::mutex mutex_;
};
}
}
Expand Down
7 changes: 6 additions & 1 deletion FWCore/Framework/interface/one/EDFilterBase.h
Expand Up @@ -19,11 +19,13 @@
//

// system include files
#include <mutex>

// user include files
#include "FWCore/Framework/interface/ProducerBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

Expand Down Expand Up @@ -99,15 +101,18 @@ namespace edm {
virtual void doEndRunProduce_(Run& rp, EventSetup const& c);
virtual void doBeginLuminosityBlockProduce_(LuminosityBlock& lbp, EventSetup const& c);
virtual void doEndLuminosityBlockProduce_(LuminosityBlock& lbp, EventSetup const& c);


virtual SharedResourcesAcquirer createAcquirer();

void setModuleDescription(ModuleDescription const& md) {
moduleDescription_ = md;
}
ModuleDescription moduleDescription_;
std::vector<BranchID> previousParentage_;
ParentageID previousParentageId_;

SharedResourcesAcquirer resourcesAcquirer_;
std::mutex mutex_;
};

}
Expand Down
5 changes: 5 additions & 0 deletions FWCore/Framework/interface/one/EDProducerBase.h
Expand Up @@ -19,11 +19,13 @@
//

// system include files
#include <mutex>

// user include files
#include "FWCore/Framework/interface/ProducerBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

Expand Down Expand Up @@ -99,6 +101,7 @@ namespace edm {
virtual void doBeginLuminosityBlockProduce_(LuminosityBlock& lbp, EventSetup const& c);
virtual void doEndLuminosityBlockProduce_(LuminosityBlock& lbp, EventSetup const& c);

virtual SharedResourcesAcquirer createAcquirer();

void setModuleDescription(ModuleDescription const& md) {
moduleDescription_ = md;
Expand All @@ -107,6 +110,8 @@ namespace edm {
std::vector<BranchID> previousParentage_;
ParentageID previousParentageId_;

SharedResourcesAcquirer resourcesAcquirer_;
std::mutex mutex_;
};

}
Expand Down
11 changes: 9 additions & 2 deletions FWCore/Framework/interface/one/OutputModuleBase.h
Expand Up @@ -23,7 +23,8 @@
#include <string>
#include <vector>
#include <map>

#include <atomic>
#include <mutex>

// user include files
#include "DataFormats/Provenance/interface/BranchChildren.h"
Expand All @@ -39,6 +40,7 @@
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/getAllTriggerNames.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

// forward declarations
Expand Down Expand Up @@ -133,7 +135,7 @@ namespace edm {
private:

int maxEvents_;
int remainingEvents_;
std::atomic<int> remainingEvents_;

// TODO: Give OutputModule
// an interface (protected?) that supplies client code with the
Expand Down Expand Up @@ -175,9 +177,14 @@ namespace edm {

BranchChildren branchChildren_;

SharedResourcesAcquirer resourcesAcquirer_;
std::mutex mutex_;
//------------------------------------------------------------------
// private member functions
//------------------------------------------------------------------

virtual SharedResourcesAcquirer createAcquirer();

void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*);
void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*);
void doOpenFile(FileBlock const& fb);
Expand Down

0 comments on commit 8061806

Please sign in to comment.