Skip to content

Commit

Permalink
Merge pull request #15433 from Dr15Jones/prefetchConcurrently
Browse files Browse the repository at this point in the history
Prefetch data products from Event concurrently
  • Loading branch information
davidlange6 committed Aug 11, 2016
2 parents 7e21ba2 + 516d54a commit 4740b57
Show file tree
Hide file tree
Showing 34 changed files with 3,200 additions and 414 deletions.
1 change: 1 addition & 0 deletions FWCore/Framework/BuildFile.xml
@@ -1,6 +1,7 @@
<use name="DataFormats/Common"/>
<use name="DataFormats/Provenance"/>
<use name="FWCore/Common"/>
<use name="FWCore/Concurrency"/>
<use name="FWCore/MessageLogger"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/PluginManager"/>
Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/interface/EDConsumerBase.h
Expand Up @@ -78,7 +78,8 @@ namespace edm {

// ---------- member functions ---------------------------
void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&);
ProductResolverIndexHelper const&,
bool iPrefetchMayGet);

typedef ProductLabels Labels;
void labelsForToken(EDGetToken iToken, Labels& oLabels) const;
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/ModuleContextSentry.h
Expand Up @@ -13,7 +13,7 @@ namespace edm {
ModuleContextSentry(ModuleCallingContext* moduleCallingContext,
ParentContext const& parentContext) :
moduleCallingContext_(moduleCallingContext) {
moduleCallingContext_->setContext(ModuleCallingContext::State::kPrefetching, parentContext,
moduleCallingContext_->setContext(ModuleCallingContext::State::kRunning, parentContext,
CurrentModuleOnThread::getCurrentModuleOnThread());
CurrentModuleOnThread::setCurrentModuleOnThread(moduleCallingContext_);
}
Expand Down
6 changes: 0 additions & 6 deletions FWCore/Framework/interface/OccurrenceTraits.h
Expand Up @@ -56,12 +56,6 @@ namespace edm {
static void postPathSignal(ActivityRegistry *a, HLTPathStatus const& status, PathContext const* pathContext) {
a->postPathEventSignal_(*pathContext->streamContext(), *pathContext, status);
}
static void preModuleSignal(ActivityRegistry *a, StreamContext const* streamContext, ModuleCallingContext const* moduleCallingContext) {
a->preModuleEventSignal_(*streamContext, *moduleCallingContext);
}
static void postModuleSignal(ActivityRegistry *a, StreamContext const* streamContext, ModuleCallingContext const* moduleCallingContext) {
a->postModuleEventSignal_(*streamContext, *moduleCallingContext);
}
};

template <>
Expand Down
8 changes: 5 additions & 3 deletions FWCore/Framework/interface/Principal.h
Expand Up @@ -47,6 +47,7 @@ namespace edm {
class EDConsumerBase;
class SharedResourcesAcquirer;
class InputProductResolver;
class WaitingTask;

struct FilledProductPtr {
bool operator()(propagate_const<std::shared_ptr<ProductResolverBase>> const& iObj) { return bool(iObj);}
Expand Down Expand Up @@ -120,9 +121,10 @@ namespace edm {
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const;

void prefetch(ProductResolverIndex index,
bool skipCurrentProcess,
ModuleCallingContext const* mcc) const;
void prefetchAsync(WaitingTask* waitTask,
ProductResolverIndex index,
bool skipCurrentProcess,
ModuleCallingContext const* mcc) const;

void getManyByType(TypeID const& typeID,
BasicHandleVec& results,
Expand Down
59 changes: 46 additions & 13 deletions FWCore/Framework/interface/ProductResolverBase.h
Expand Up @@ -27,24 +27,48 @@ namespace edm {
class SharedResourcesAcquirer;
class Principal;
class UnscheduledConfigurator;
class WaitingTask;

class ProductResolverBase {
public:

enum ResolveStatus { ProductFound, ProductNotFound, Ambiguous };

class Resolution {
public:
static std::uintptr_t constexpr kAmbiguityValue = 0x1;
static std::uintptr_t constexpr kAmbiguityMask = std::numeric_limits<std::uintptr_t>::max() ^ kAmbiguityValue;
explicit Resolution( ProductData const* iData):
m_data(iData) {}

bool isAmbiguous() const { return reinterpret_cast<std::uintptr_t>(m_data) == kAmbiguityValue; }

ProductData const* data() const { return reinterpret_cast<ProductData const*>(kAmbiguityMask & reinterpret_cast<std::uintptr_t>(m_data)); }

static Resolution makeAmbiguous() { return Resolution(reinterpret_cast<ProductData const*>(kAmbiguityValue)); }
private:
ProductData const* m_data;
};

ProductResolverBase();
virtual ~ProductResolverBase();

ProductResolverBase(ProductResolverBase const&) = delete; // Disallow copying and moving
ProductResolverBase& operator=(ProductResolverBase const&) = delete; // Disallow copying and moving

ProductData const* resolveProduct(ResolveStatus& resolveStatus,
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return resolveProduct_(resolveStatus, principal, skipCurrentProcess, sra, mcc);
Resolution resolveProduct(Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return resolveProduct_( principal, skipCurrentProcess, sra, mcc);
}

/** oDataFetchedIsValid is allowed to be nullptr in which case no value will be assigned
*/
void prefetchAsync(WaitingTask* waitTask,
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return prefetchAsync_(waitTask, principal, skipCurrentProcess, sra, mcc);
}

void resetProductData() { resetProductData_(false); }
Expand All @@ -68,6 +92,8 @@ namespace edm {

// Product was deleted early in order to save memory
bool productWasDeleted() const {return productWasDeleted_();}

bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const { return productWasFetchedAndIsValid_(iSkipCurrentProcess); }

// Retrieves pointer to the per event(lumi)(run) provenance.
ProductProvenance const* productProvenancePtr() const { return productProvenancePtr_(); }
Expand Down Expand Up @@ -132,15 +158,22 @@ namespace edm {
virtual void setupUnscheduled(UnscheduledConfigurator const&);

private:
virtual ProductData const* resolveProduct_(ResolveStatus& resolveStatus,
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const = 0;
virtual Resolution resolveProduct_(Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const = 0;
virtual void prefetchAsync_(WaitingTask* waitTask,
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const = 0;

virtual bool unscheduledWasNotRun_() const = 0;
virtual bool productUnavailable_() const = 0;
virtual bool productResolved_() const = 0;
virtual bool productWasDeleted_() const = 0;
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const = 0;

virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const = 0;
virtual void putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const = 0;
virtual BranchDescription const& branchDescription_() const = 0;
Expand Down
11 changes: 9 additions & 2 deletions FWCore/Framework/interface/SharedResourcesAcquirer.h
Expand Up @@ -27,15 +27,19 @@

// forward declarations
class testSharedResourcesRegistry;

namespace edm {
class SerialTaskQueue;

class SharedResourcesAcquirer
{
public:
friend class ::testSharedResourcesRegistry;

SharedResourcesAcquirer() = default;
explicit SharedResourcesAcquirer(std::vector<std::recursive_mutex*>&& iResources):
m_resources(iResources){}
explicit SharedResourcesAcquirer(std::vector<std::recursive_mutex*>&& iResources, std::shared_ptr<SerialTaskQueue> iQueue = std::shared_ptr<SerialTaskQueue>()):
m_resources(iResources),
m_queue(iQueue){}

SharedResourcesAcquirer(SharedResourcesAcquirer&&) = default;
SharedResourcesAcquirer(const SharedResourcesAcquirer&) = default;
Expand All @@ -59,10 +63,13 @@ namespace edm {

///The number returned may be less than the number of resources requested if a resource is only used by one module and therefore is not being shared.
size_t numberOfResources() const { return m_resources.size();}

SerialTaskQueue* serialQueue() const { return m_queue.get(); }
private:

// ---------- member data --------------------------------
std::vector<std::recursive_mutex*> m_resources;
std::shared_ptr<SerialTaskQueue> m_queue;
};
}

Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h
Expand Up @@ -88,7 +88,8 @@ namespace edm {
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const;

void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&);
ProductResolverIndexHelper const&,
bool iPrefetchMayGet);

const EDConsumerBase* consumer() const;

Expand Down
Expand Up @@ -80,7 +80,8 @@ namespace edm {
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const;

void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&);
ProductResolverIndexHelper const&,
bool iPrefetchMayGet);

void modulesDependentUpon(std::string const& iProcessName,
std::string const& iModuleLabel,
Expand Down
6 changes: 5 additions & 1 deletion FWCore/Framework/src/EDConsumerBase.cc
Expand Up @@ -135,7 +135,8 @@ EDConsumerBase::recordConsumes(BranchType iBranch, TypeToGet const& iType, edm::

void
EDConsumerBase::updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const& iHelper)
ProductResolverIndexHelper const& iHelper,
bool iPrefetchMayGet)
{
frozen_ = true;
{
Expand Down Expand Up @@ -188,6 +189,9 @@ EDConsumerBase::updateLookup(BranchType iBranchType,

if(iBranchType == InEvent) {
itemsToGet(iBranchType, itemsToGetFromEvent_);
if(iPrefetchMayGet) {
itemsMayGet(iBranchType, itemsToGetFromEvent_);
}
}
}

Expand Down
7 changes: 3 additions & 4 deletions FWCore/Framework/src/EventPrincipal.cc
Expand Up @@ -280,9 +280,9 @@ namespace edm {
return whyFailed;
}));
}
ProductResolverBase::ResolveStatus status;
auto data = phb->resolveProduct(status,*this,false,nullptr,nullptr);
auto resolution = phb->resolveProduct(*this,false,nullptr,nullptr);

auto data = resolution.data();
if(data) {
return BasicHandle(data->wrapper(), &(data->provenance()));
}
Expand Down Expand Up @@ -433,8 +433,7 @@ namespace edm {
<< "EventPrincipal::getThinnedAssociation, ThinnedAssociation ProductResolver cannot be found\n"
<< "This should never happen. Contact a Framework developer";
}
ProductResolverBase::ResolveStatus status;
ProductData const* productData = phb->resolveProduct(status,*this,false,nullptr,nullptr);
ProductData const* productData = (phb->resolveProduct(*this,false,nullptr,nullptr)).data();
if (productData == nullptr) {
return nullptr;
}
Expand Down
34 changes: 15 additions & 19 deletions FWCore/Framework/src/Principal.cc
Expand Up @@ -492,26 +492,26 @@ namespace edm {
assert(index !=ProductResolverIndexInvalid);
auto& productResolver = productResolvers_[index];
assert(0!=productResolver.get());
ProductResolverBase::ResolveStatus resolveStatus;
ProductData const* productData = productResolver->resolveProduct(resolveStatus, *this, skipCurrentProcess, sra, mcc);
if(resolveStatus == ProductResolverBase::Ambiguous) {
auto resolution = productResolver->resolveProduct(*this, skipCurrentProcess, sra, mcc);
if(resolution.isAmbiguous()) {
ambiguous = true;
return BasicHandle();
}
if(productData == 0) {
auto productData = resolution.data();
if(productData == nullptr) {
return BasicHandle();
}
return BasicHandle(productData->wrapper(), &(productData->provenance()));
}

void
Principal::prefetch(ProductResolverIndex index,
Principal::prefetchAsync(WaitingTask * task,
ProductResolverIndex index,
bool skipCurrentProcess,
ModuleCallingContext const* mcc) const {
auto const& productResolver = productResolvers_.at(index);
assert(0!=productResolver.get());
ProductResolverBase::ResolveStatus resolveStatus;
productResolver->resolveProduct(resolveStatus, *this,skipCurrentProcess, nullptr, mcc);
productResolver->prefetchAsync(task,*this, skipCurrentProcess,nullptr,mcc);
}

void
Expand Down Expand Up @@ -605,8 +605,7 @@ namespace edm {
continue;
}

ProductResolverBase::ResolveStatus resolveStatus;
ProductData const* productData = productResolver->resolveProduct(resolveStatus, *this,false, sra, mcc);
ProductData const* productData = productResolver->resolveProduct(*this,false, sra, mcc).data();
if(productData) {
// Skip product if not available.
results.emplace_back(productData->wrapper(), &(productData->provenance()));
Expand Down Expand Up @@ -655,12 +654,11 @@ namespace edm {

auto const& productResolver = productResolvers_[index];

ProductResolverBase::ResolveStatus resolveStatus;
ProductData const* productData = productResolver->resolveProduct(resolveStatus, *this, skipCurrentProcess, sra, mcc);
if(resolveStatus == ProductResolverBase::Ambiguous) {
auto resolution = productResolver->resolveProduct(*this, skipCurrentProcess, sra, mcc);
if(resolution.isAmbiguous()) {
throwAmbiguousException("findProductByLabel", typeID, inputTag.label(), inputTag.instance(), inputTag.process());
}
return productData;
return resolution.data();
}

ProductData const*
Expand Down Expand Up @@ -691,12 +689,11 @@ namespace edm {

auto const& productResolver = productResolvers_[index];

ProductResolverBase::ResolveStatus resolveStatus;
ProductData const* productData = productResolver->resolveProduct(resolveStatus, *this, false, sra, mcc);
if(resolveStatus == ProductResolverBase::Ambiguous) {
auto resolution = productResolver->resolveProduct(*this, false, sra, mcc);
if(resolution.isAmbiguous()) {
throwAmbiguousException("findProductByLabel", typeID, label, instance, process);
}
return productData;
return resolution.data();
}

ProductData const*
Expand All @@ -720,8 +717,7 @@ namespace edm {
}

if(phb->unscheduledWasNotRun()) {
ProductResolverBase::ResolveStatus status;
if(not phb->resolveProduct(status,*this,false, nullptr, mcc) ) {
if(not phb->resolveProduct(*this,false, nullptr, mcc).data() ) {
throwProductNotFoundException("getProvenance(onDemand)", errors::ProductNotFound, bid);
}
}
Expand Down

0 comments on commit 4740b57

Please sign in to comment.