Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefetch data products from Event concurrently #15433

Merged
merged 22 commits into from Aug 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5529a76
Legacy and one modules must prefetch mayConsumes
Dr15Jones Mar 29, 2016
e249165
Always prefetch
Dr15Jones Mar 29, 2016
6a6247e
Removed unnecessary functions
Dr15Jones Apr 27, 2016
9bc1e5d
Make it clear which related test failed
Dr15Jones May 4, 2016
dbfa6e2
Add WaitingTaskList to InputProductResolver
Dr15Jones May 4, 2016
7314e11
Refactoring needed to allow prefetching data asynchronously
Dr15Jones May 4, 2016
2b4034a
Moved exceptionContext into Worker
Dr15Jones May 4, 2016
398a096
Removed return value argument from ProductResolver::resolveProduct call
Dr15Jones May 16, 2016
bf5a5c9
Cache decisions from NoProcessProductResolver
Dr15Jones May 17, 2016
a7d30d9
Begin transition to using SerialTaskQueues instead of mutexes
Dr15Jones Jul 14, 2016
903e648
First crude implementation of asynchronous prefetching
Dr15Jones Jul 14, 2016
542b3d8
Make UnscheduledProductResolver::prefetchAsync actually asynchronous
Dr15Jones Jul 15, 2016
1fffaad
Added BusyWaitIntProducer for testing
Dr15Jones Jul 15, 2016
4b85d2b
Updated unit test output for unscheduled changes
Dr15Jones Jul 25, 2016
741fda1
Set worker state in a thread safe manner
Dr15Jones Jul 26, 2016
834dd9c
Added Worker::shouldRethrowException function
Dr15Jones Jul 28, 2016
f60dc14
Separate prefetching and module running into different tasks
Dr15Jones Aug 2, 2016
0e996f1
Added prefetching signals to ActivityRegistry
Dr15Jones Aug 3, 2016
854196c
Enable Service system within TBB tasks
Dr15Jones Aug 5, 2016
3203550
Make ROOT aware of all TBB threads
Dr15Jones Aug 6, 2016
043b0e3
Fixed case of recursive call into NoProcessProductResolver
Dr15Jones Aug 8, 2016
516d54a
Move CMS_THREAD_GUARD to proper line
Dr15Jones Aug 10, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions FWCore/Framework/BuildFile.xml
@@ -1,6 +1,7 @@
<use name="DataFormats/Common"/>
<use name="DataFormats/Provenance"/>
<use name="FWCore/Common"/>
<use name="FWCore/Concurrency"/>
<use name="FWCore/MessageLogger"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/PluginManager"/>
Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/interface/EDConsumerBase.h
Expand Up @@ -78,7 +78,8 @@ namespace edm {

// ---------- member functions ---------------------------
void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&);
ProductResolverIndexHelper const&,
bool iPrefetchMayGet);

typedef ProductLabels Labels;
void labelsForToken(EDGetToken iToken, Labels& oLabels) const;
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/ModuleContextSentry.h
Expand Up @@ -13,7 +13,7 @@ namespace edm {
ModuleContextSentry(ModuleCallingContext* moduleCallingContext,
ParentContext const& parentContext) :
moduleCallingContext_(moduleCallingContext) {
moduleCallingContext_->setContext(ModuleCallingContext::State::kPrefetching, parentContext,
moduleCallingContext_->setContext(ModuleCallingContext::State::kRunning, parentContext,
CurrentModuleOnThread::getCurrentModuleOnThread());
CurrentModuleOnThread::setCurrentModuleOnThread(moduleCallingContext_);
}
Expand Down
6 changes: 0 additions & 6 deletions FWCore/Framework/interface/OccurrenceTraits.h
Expand Up @@ -56,12 +56,6 @@ namespace edm {
static void postPathSignal(ActivityRegistry *a, HLTPathStatus const& status, PathContext const* pathContext) {
a->postPathEventSignal_(*pathContext->streamContext(), *pathContext, status);
}
static void preModuleSignal(ActivityRegistry *a, StreamContext const* streamContext, ModuleCallingContext const* moduleCallingContext) {
a->preModuleEventSignal_(*streamContext, *moduleCallingContext);
}
static void postModuleSignal(ActivityRegistry *a, StreamContext const* streamContext, ModuleCallingContext const* moduleCallingContext) {
a->postModuleEventSignal_(*streamContext, *moduleCallingContext);
}
};

template <>
Expand Down
8 changes: 5 additions & 3 deletions FWCore/Framework/interface/Principal.h
Expand Up @@ -47,6 +47,7 @@ namespace edm {
class EDConsumerBase;
class SharedResourcesAcquirer;
class InputProductResolver;
class WaitingTask;

struct FilledProductPtr {
bool operator()(propagate_const<std::shared_ptr<ProductResolverBase>> const& iObj) { return bool(iObj);}
Expand Down Expand Up @@ -120,9 +121,10 @@ namespace edm {
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const;

void prefetch(ProductResolverIndex index,
bool skipCurrentProcess,
ModuleCallingContext const* mcc) const;
void prefetchAsync(WaitingTask* waitTask,
ProductResolverIndex index,
bool skipCurrentProcess,
ModuleCallingContext const* mcc) const;

void getManyByType(TypeID const& typeID,
BasicHandleVec& results,
Expand Down
59 changes: 46 additions & 13 deletions FWCore/Framework/interface/ProductResolverBase.h
Expand Up @@ -27,24 +27,48 @@ namespace edm {
class SharedResourcesAcquirer;
class Principal;
class UnscheduledConfigurator;
class WaitingTask;

class ProductResolverBase {
public:

enum ResolveStatus { ProductFound, ProductNotFound, Ambiguous };

class Resolution {
public:
static std::uintptr_t constexpr kAmbiguityValue = 0x1;
static std::uintptr_t constexpr kAmbiguityMask = std::numeric_limits<std::uintptr_t>::max() ^ kAmbiguityValue;
explicit Resolution( ProductData const* iData):
m_data(iData) {}

bool isAmbiguous() const { return reinterpret_cast<std::uintptr_t>(m_data) == kAmbiguityValue; }

ProductData const* data() const { return reinterpret_cast<ProductData const*>(kAmbiguityMask & reinterpret_cast<std::uintptr_t>(m_data)); }

static Resolution makeAmbiguous() { return Resolution(reinterpret_cast<ProductData const*>(kAmbiguityValue)); }
private:
ProductData const* m_data;
};

ProductResolverBase();
virtual ~ProductResolverBase();

ProductResolverBase(ProductResolverBase const&) = delete; // Disallow copying and moving
ProductResolverBase& operator=(ProductResolverBase const&) = delete; // Disallow copying and moving

ProductData const* resolveProduct(ResolveStatus& resolveStatus,
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return resolveProduct_(resolveStatus, principal, skipCurrentProcess, sra, mcc);
Resolution resolveProduct(Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return resolveProduct_( principal, skipCurrentProcess, sra, mcc);
}

/** oDataFetchedIsValid is allowed to be nullptr in which case no value will be assigned
*/
void prefetchAsync(WaitingTask* waitTask,
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {
return prefetchAsync_(waitTask, principal, skipCurrentProcess, sra, mcc);
}

void resetProductData() { resetProductData_(false); }
Expand All @@ -68,6 +92,8 @@ namespace edm {

// Product was deleted early in order to save memory
bool productWasDeleted() const {return productWasDeleted_();}

bool productWasFetchedAndIsValid(bool iSkipCurrentProcess) const { return productWasFetchedAndIsValid_(iSkipCurrentProcess); }

// Retrieves pointer to the per event(lumi)(run) provenance.
ProductProvenance const* productProvenancePtr() const { return productProvenancePtr_(); }
Expand Down Expand Up @@ -132,15 +158,22 @@ namespace edm {
virtual void setupUnscheduled(UnscheduledConfigurator const&);

private:
virtual ProductData const* resolveProduct_(ResolveStatus& resolveStatus,
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const = 0;
virtual Resolution resolveProduct_(Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const = 0;
virtual void prefetchAsync_(WaitingTask* waitTask,
Principal const& principal,
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const = 0;

virtual bool unscheduledWasNotRun_() const = 0;
virtual bool productUnavailable_() const = 0;
virtual bool productResolved_() const = 0;
virtual bool productWasDeleted_() const = 0;
virtual bool productWasFetchedAndIsValid_(bool iSkipCurrentProcess) const = 0;

virtual void putProduct_(std::unique_ptr<WrapperBase> edp) const = 0;
virtual void putOrMergeProduct_(std::unique_ptr<WrapperBase> edp) const = 0;
virtual BranchDescription const& branchDescription_() const = 0;
Expand Down
11 changes: 9 additions & 2 deletions FWCore/Framework/interface/SharedResourcesAcquirer.h
Expand Up @@ -27,15 +27,19 @@

// forward declarations
class testSharedResourcesRegistry;

namespace edm {
class SerialTaskQueue;

class SharedResourcesAcquirer
{
public:
friend class ::testSharedResourcesRegistry;

SharedResourcesAcquirer() = default;
explicit SharedResourcesAcquirer(std::vector<std::recursive_mutex*>&& iResources):
m_resources(iResources){}
explicit SharedResourcesAcquirer(std::vector<std::recursive_mutex*>&& iResources, std::shared_ptr<SerialTaskQueue> iQueue = std::shared_ptr<SerialTaskQueue>()):
m_resources(iResources),
m_queue(iQueue){}

SharedResourcesAcquirer(SharedResourcesAcquirer&&) = default;
SharedResourcesAcquirer(const SharedResourcesAcquirer&) = default;
Expand All @@ -59,10 +63,13 @@ namespace edm {

///The number returned may be less than the number of resources requested if a resource is only used by one module and therefore is not being shared.
size_t numberOfResources() const { return m_resources.size();}

SerialTaskQueue* serialQueue() const { return m_queue.get(); }
private:

// ---------- member data --------------------------------
std::vector<std::recursive_mutex*> m_resources;
std::shared_ptr<SerialTaskQueue> m_queue;
};
}

Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h
Expand Up @@ -88,7 +88,8 @@ namespace edm {
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const;

void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&);
ProductResolverIndexHelper const&,
bool iPrefetchMayGet);

const EDConsumerBase* consumer() const;

Expand Down
Expand Up @@ -80,7 +80,8 @@ namespace edm {
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFromEvent() const;

void updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const&);
ProductResolverIndexHelper const&,
bool iPrefetchMayGet);

void modulesDependentUpon(std::string const& iProcessName,
std::string const& iModuleLabel,
Expand Down
6 changes: 5 additions & 1 deletion FWCore/Framework/src/EDConsumerBase.cc
Expand Up @@ -135,7 +135,8 @@ EDConsumerBase::recordConsumes(BranchType iBranch, TypeToGet const& iType, edm::

void
EDConsumerBase::updateLookup(BranchType iBranchType,
ProductResolverIndexHelper const& iHelper)
ProductResolverIndexHelper const& iHelper,
bool iPrefetchMayGet)
{
frozen_ = true;
{
Expand Down Expand Up @@ -188,6 +189,9 @@ EDConsumerBase::updateLookup(BranchType iBranchType,

if(iBranchType == InEvent) {
itemsToGet(iBranchType, itemsToGetFromEvent_);
if(iPrefetchMayGet) {
itemsMayGet(iBranchType, itemsToGetFromEvent_);
}
}
}

Expand Down
7 changes: 3 additions & 4 deletions FWCore/Framework/src/EventPrincipal.cc
Expand Up @@ -280,9 +280,9 @@ namespace edm {
return whyFailed;
}));
}
ProductResolverBase::ResolveStatus status;
auto data = phb->resolveProduct(status,*this,false,nullptr,nullptr);
auto resolution = phb->resolveProduct(*this,false,nullptr,nullptr);

auto data = resolution.data();
if(data) {
return BasicHandle(data->wrapper(), &(data->provenance()));
}
Expand Down Expand Up @@ -433,8 +433,7 @@ namespace edm {
<< "EventPrincipal::getThinnedAssociation, ThinnedAssociation ProductResolver cannot be found\n"
<< "This should never happen. Contact a Framework developer";
}
ProductResolverBase::ResolveStatus status;
ProductData const* productData = phb->resolveProduct(status,*this,false,nullptr,nullptr);
ProductData const* productData = (phb->resolveProduct(*this,false,nullptr,nullptr)).data();
if (productData == nullptr) {
return nullptr;
}
Expand Down
34 changes: 15 additions & 19 deletions FWCore/Framework/src/Principal.cc
Expand Up @@ -492,26 +492,26 @@ namespace edm {
assert(index !=ProductResolverIndexInvalid);
auto& productResolver = productResolvers_[index];
assert(0!=productResolver.get());
ProductResolverBase::ResolveStatus resolveStatus;
ProductData const* productData = productResolver->resolveProduct(resolveStatus, *this, skipCurrentProcess, sra, mcc);
if(resolveStatus == ProductResolverBase::Ambiguous) {
auto resolution = productResolver->resolveProduct(*this, skipCurrentProcess, sra, mcc);
if(resolution.isAmbiguous()) {
ambiguous = true;
return BasicHandle();
}
if(productData == 0) {
auto productData = resolution.data();
if(productData == nullptr) {
return BasicHandle();
}
return BasicHandle(productData->wrapper(), &(productData->provenance()));
}

void
Principal::prefetch(ProductResolverIndex index,
Principal::prefetchAsync(WaitingTask * task,
ProductResolverIndex index,
bool skipCurrentProcess,
ModuleCallingContext const* mcc) const {
auto const& productResolver = productResolvers_.at(index);
assert(0!=productResolver.get());
ProductResolverBase::ResolveStatus resolveStatus;
productResolver->resolveProduct(resolveStatus, *this,skipCurrentProcess, nullptr, mcc);
productResolver->prefetchAsync(task,*this, skipCurrentProcess,nullptr,mcc);
}

void
Expand Down Expand Up @@ -605,8 +605,7 @@ namespace edm {
continue;
}

ProductResolverBase::ResolveStatus resolveStatus;
ProductData const* productData = productResolver->resolveProduct(resolveStatus, *this,false, sra, mcc);
ProductData const* productData = productResolver->resolveProduct(*this,false, sra, mcc).data();
if(productData) {
// Skip product if not available.
results.emplace_back(productData->wrapper(), &(productData->provenance()));
Expand Down Expand Up @@ -655,12 +654,11 @@ namespace edm {

auto const& productResolver = productResolvers_[index];

ProductResolverBase::ResolveStatus resolveStatus;
ProductData const* productData = productResolver->resolveProduct(resolveStatus, *this, skipCurrentProcess, sra, mcc);
if(resolveStatus == ProductResolverBase::Ambiguous) {
auto resolution = productResolver->resolveProduct(*this, skipCurrentProcess, sra, mcc);
if(resolution.isAmbiguous()) {
throwAmbiguousException("findProductByLabel", typeID, inputTag.label(), inputTag.instance(), inputTag.process());
}
return productData;
return resolution.data();
}

ProductData const*
Expand Down Expand Up @@ -691,12 +689,11 @@ namespace edm {

auto const& productResolver = productResolvers_[index];

ProductResolverBase::ResolveStatus resolveStatus;
ProductData const* productData = productResolver->resolveProduct(resolveStatus, *this, false, sra, mcc);
if(resolveStatus == ProductResolverBase::Ambiguous) {
auto resolution = productResolver->resolveProduct(*this, false, sra, mcc);
if(resolution.isAmbiguous()) {
throwAmbiguousException("findProductByLabel", typeID, label, instance, process);
}
return productData;
return resolution.data();
}

ProductData const*
Expand All @@ -720,8 +717,7 @@ namespace edm {
}

if(phb->unscheduledWasNotRun()) {
ProductResolverBase::ResolveStatus status;
if(not phb->resolveProduct(status,*this,false, nullptr, mcc) ) {
if(not phb->resolveProduct(*this,false, nullptr, mcc).data() ) {
throwProductNotFoundException("getProvenance(onDemand)", errors::ProductNotFound, bid);
}
}
Expand Down