Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize calls to EventSetup modules #1036

Merged
merged 12 commits into from Oct 9, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions FWCore/Framework/interface/DataProxy.h
Expand Up @@ -20,6 +20,7 @@
//

// system include files
#include <atomic>

// user include files

Expand All @@ -37,7 +38,7 @@ namespace edm {
virtual ~DataProxy();

// ---------- const member functions ---------------------
bool cacheIsValid() const { return cacheIsValid_; }
bool cacheIsValid() const { return cacheIsValid_.load(std::memory_order_acquire); }

void doGet(EventSetupRecord const& iRecord, DataKey const& iKey, bool iTransiently) const;
void const* get(EventSetupRecord const&, DataKey const& iKey, bool iTransiently) const;
Expand Down Expand Up @@ -85,12 +86,11 @@ namespace edm {
DataProxy(DataProxy const&); // stop default

DataProxy const& operator=(DataProxy const&); // stop default
void setCacheIsValidAndAccessType(bool iTransientAccessOnly) const;

// ---------- member data --------------------------------
mutable void const* cache_;
mutable bool cacheIsValid_;
mutable bool nonTransientAccessRequested_;
mutable void const* cache_; //protected by a global mutex
mutable std::atomic<bool> cacheIsValid_;
mutable std::atomic<bool> nonTransientAccessRequested_;
ComponentDescription const* description_;
};
}
Expand Down
6 changes: 5 additions & 1 deletion FWCore/Framework/interface/EDAnalyzer.h
Expand Up @@ -5,8 +5,10 @@
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"

#include <string>
#include <mutex>

// EDAnalyzer is the base class for all analyzer "modules".

Expand All @@ -25,7 +27,7 @@ namespace edm {
template <typename T> friend class WorkerT;
typedef EDAnalyzer ModuleType;

EDAnalyzer() : moduleDescription_() {}
EDAnalyzer();
virtual ~EDAnalyzer();

std::string workerType() const {return "WorkerT<EDAnalyzer>";}
Expand Down Expand Up @@ -75,6 +77,8 @@ namespace edm {
moduleDescription_ = md;
}
ModuleDescription moduleDescription_;
SharedResourcesAcquirer resourceAcquirer_;
std::mutex mutex_;

std::function<void(BranchDescription const&)> callWhenNewProductsRegistered_;
};
Expand Down
9 changes: 6 additions & 3 deletions FWCore/Framework/interface/EDFilter.h
Expand Up @@ -14,11 +14,14 @@ These products should be informational products about the filter decision.
#include "FWCore/Framework/interface/ProducerBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"

#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"

#include <string>
#include <vector>
#include <mutex>

namespace edm {
namespace maker {
Expand All @@ -34,9 +37,7 @@ namespace edm {
template <typename T> friend class WorkerT;
typedef EDFilter ModuleType;

EDFilter() : ProducerBase() , moduleDescription_(),
previousParentage_(), previousParentageId_() {
}
EDFilter();
virtual ~EDFilter();

static void fillDescriptions(ConfigurationDescriptions& descriptions);
Expand Down Expand Up @@ -90,6 +91,8 @@ namespace edm {
}
ModuleDescription moduleDescription_;
std::vector<BranchID> previousParentage_;
SharedResourcesAcquirer resourceAcquirer_;
std::mutex mutex_;
ParentageID previousParentageId_;
};
}
Expand Down
4 changes: 4 additions & 0 deletions FWCore/Framework/interface/EDProducer.h
Expand Up @@ -11,12 +11,14 @@ EDProducts into an Event.

#include "FWCore/Framework/interface/ProducerBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

#include <string>
#include <vector>
#include <mutex>

namespace edm {

Expand Down Expand Up @@ -85,6 +87,8 @@ namespace edm {
}
ModuleDescription moduleDescription_;
std::vector<BranchID> previousParentage_;
SharedResourcesAcquirer resourceAcquirer_;
std::mutex mutex_;
ParentageID previousParentageId_;
};
}
Expand Down
12 changes: 12 additions & 0 deletions FWCore/Framework/interface/EventProcessor.h
Expand Up @@ -34,6 +34,8 @@ configured in the user's main() function, and is set running.
#include <set>
#include <string>
#include <vector>
#include <mutex>
#include <exception>

namespace statemachine {
class Machine;
Expand Down Expand Up @@ -230,6 +232,11 @@ namespace edm {

void possiblyContinueAfterForkChildFailure();

friend class StreamProcessingTask;
void processEventsForStreamAsync(unsigned int iStreamIndex,
std::atomic<bool>* finishedProcessingEvents);


//read the next event using Stream iStreamIndex
void readEvent(unsigned int iStreamIndex);

Expand Down Expand Up @@ -262,6 +269,11 @@ namespace edm {
std::unique_ptr<FileBlock> fb_;
boost::shared_ptr<EDLooperBase> looper_;

//The atomic protects concurrent access of deferredExceptionPtr_
std::atomic<bool> deferredExceptionPtrIsSet_;
std::exception_ptr deferredExceptionPtr_;

std::mutex nextTransitionMutex_;
PrincipalCache principalCache_;
bool beginJobCalled_;
bool shouldWeStop_;
Expand Down
9 changes: 8 additions & 1 deletion FWCore/Framework/interface/OutputModule.h
Expand Up @@ -21,12 +21,16 @@ output stream.
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/getAllTriggerNames.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"

#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

#include <array>
#include <string>
#include <vector>
#include <map>
#include <atomic>
#include <mutex>

namespace edm {

Expand Down Expand Up @@ -119,7 +123,7 @@ namespace edm {
private:

int maxEvents_;
int remainingEvents_;
std::atomic<int> remainingEvents_;

// TODO: Give OutputModule
// an interface (protected?) that supplies client code with the
Expand Down Expand Up @@ -161,6 +165,9 @@ namespace edm {

BranchChildren branchChildren_;

SharedResourcesAcquirer resourceAcquirer_;
std::mutex mutex_;

//------------------------------------------------------------------
// private member functions
//------------------------------------------------------------------
Expand Down
69 changes: 69 additions & 0 deletions FWCore/Framework/interface/SharedResourcesAcquirer.h
@@ -0,0 +1,69 @@
#ifndef Subsystem_Package_SharedResourcesAcquirer_h
#define Subsystem_Package_SharedResourcesAcquirer_h
// -*- C++ -*-
//
// Package: Subsystem/Package
// Class : SharedResourcesAcquirer
//
/**\class SharedResourcesAcquirer SharedResourcesAcquirer.h "SharedResourcesAcquirer.h"

Description: Handles acquiring and releasing a group of resources shared between modules

Usage:
<usage>

*/
//
// Original Author: Chris Jones
// Created: Sun, 06 Oct 2013 19:43:26 GMT
//

// system include files

// user include files
#include <vector>
#include <mutex>
#include <memory>

// forward declarations
class testSharedResourcesRegistry;
namespace edm {
class SharedResourcesAcquirer
{
public:
friend class ::testSharedResourcesRegistry;

SharedResourcesAcquirer() = default;
explicit SharedResourcesAcquirer(std::vector<std::recursive_mutex*>&& iResources):
m_resources(iResources){}

SharedResourcesAcquirer(SharedResourcesAcquirer&&) = default;
SharedResourcesAcquirer(const SharedResourcesAcquirer&) = default;
SharedResourcesAcquirer& operator=(const SharedResourcesAcquirer&) = default;

~SharedResourcesAcquirer() = default;

// ---------- member functions ---------------------------
void lock();
void unlock();

///Used by the framework to temporarily unlock a resource in the case where a module is temporarily suspended,
/// e.g. when a Event::getByLabel call launches a Producer via unscheduled processing
template<typename FUNC>
void temporaryUnlock(FUNC iFunc) {
std::shared_ptr<SharedResourcesAcquirer*> guard(this,[](SharedResourcesAcquirer* iThis) {iThis->lock();});
this->unlock();
iFunc();
}

///The number returned may be less than the number of resources requested if a resource is only used by one module and therefore is not being shared.
size_t numberOfResources() const { return m_resources.size();}
private:

// ---------- member data --------------------------------
std::vector<std::recursive_mutex*> m_resources;
};
}


#endif
7 changes: 7 additions & 0 deletions FWCore/Framework/interface/one/EDAnalyzerBase.h
Expand Up @@ -19,10 +19,12 @@
//

// system include files
#include <mutex>

// user include files
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

Expand Down Expand Up @@ -97,11 +99,16 @@ namespace edm {
virtual void doBeginLuminosityBlock_(LuminosityBlock const& lbp, EventSetup const& c);
virtual void doEndLuminosityBlock_(LuminosityBlock const& lbp, EventSetup const& c);

virtual SharedResourcesAcquirer createAcquirer();

void setModuleDescription(ModuleDescription const& md) {
moduleDescription_ = md;
}
ModuleDescription moduleDescription_;
std::function<void(BranchDescription const&)> callWhenNewProductsRegistered_;

SharedResourcesAcquirer resourcesAcquirer_;
std::mutex mutex_;
};
}
}
Expand Down
7 changes: 6 additions & 1 deletion FWCore/Framework/interface/one/EDFilterBase.h
Expand Up @@ -19,11 +19,13 @@
//

// system include files
#include <mutex>

// user include files
#include "FWCore/Framework/interface/ProducerBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

Expand Down Expand Up @@ -99,15 +101,18 @@ namespace edm {
virtual void doEndRunProduce_(Run& rp, EventSetup const& c);
virtual void doBeginLuminosityBlockProduce_(LuminosityBlock& lbp, EventSetup const& c);
virtual void doEndLuminosityBlockProduce_(LuminosityBlock& lbp, EventSetup const& c);


virtual SharedResourcesAcquirer createAcquirer();

void setModuleDescription(ModuleDescription const& md) {
moduleDescription_ = md;
}
ModuleDescription moduleDescription_;
std::vector<BranchID> previousParentage_;
ParentageID previousParentageId_;

SharedResourcesAcquirer resourcesAcquirer_;
std::mutex mutex_;
};

}
Expand Down
5 changes: 5 additions & 0 deletions FWCore/Framework/interface/one/EDProducerBase.h
Expand Up @@ -19,11 +19,13 @@
//

// system include files
#include <mutex>

// user include files
#include "FWCore/Framework/interface/ProducerBase.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

Expand Down Expand Up @@ -99,6 +101,7 @@ namespace edm {
virtual void doBeginLuminosityBlockProduce_(LuminosityBlock& lbp, EventSetup const& c);
virtual void doEndLuminosityBlockProduce_(LuminosityBlock& lbp, EventSetup const& c);

virtual SharedResourcesAcquirer createAcquirer();

void setModuleDescription(ModuleDescription const& md) {
moduleDescription_ = md;
Expand All @@ -107,6 +110,8 @@ namespace edm {
std::vector<BranchID> previousParentage_;
ParentageID previousParentageId_;

SharedResourcesAcquirer resourcesAcquirer_;
std::mutex mutex_;
};

}
Expand Down
11 changes: 9 additions & 2 deletions FWCore/Framework/interface/one/OutputModuleBase.h
Expand Up @@ -23,7 +23,8 @@
#include <string>
#include <vector>
#include <map>

#include <atomic>
#include <mutex>

// user include files
#include "DataFormats/Provenance/interface/BranchChildren.h"
Expand All @@ -39,6 +40,7 @@
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/getAllTriggerNames.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"

// forward declarations
Expand Down Expand Up @@ -133,7 +135,7 @@ namespace edm {
private:

int maxEvents_;
int remainingEvents_;
std::atomic<int> remainingEvents_;

// TODO: Give OutputModule
// an interface (protected?) that supplies client code with the
Expand Down Expand Up @@ -175,9 +177,14 @@ namespace edm {

BranchChildren branchChildren_;

SharedResourcesAcquirer resourcesAcquirer_;
std::mutex mutex_;
//------------------------------------------------------------------
// private member functions
//------------------------------------------------------------------

virtual SharedResourcesAcquirer createAcquirer();

void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*);
void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*);
void doOpenFile(FileBlock const& fb);
Expand Down