Skip to content

Commit

Permalink
Merge pull request #15383 from Dr15Jones/fixRecursiveCall
Browse files Browse the repository at this point in the history
Fixed case of recursive call into NoProcessProductResolver
  • Loading branch information
davidlange6 committed Aug 8, 2016
2 parents 39927b3 + 9dde89c commit c7eb5ea
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
51 changes: 35 additions & 16 deletions FWCore/Framework/src/ProductResolvers.cc
Expand Up @@ -537,7 +537,8 @@ namespace edm {
ambiguous_(ambiguous),
lastCheckIndex_(ambiguous_.size() + kUnsetOffset),
lastSkipCurrentCheckIndex_(lastCheckIndex_.load()),
prefetchRequested_(false) {
prefetchRequested_(false),
skippingPrefetchRequested_(false) {
assert(ambiguous_.size() == matchingHolders_.size());
}

Expand Down Expand Up @@ -602,16 +603,36 @@ namespace edm {
bool skipCurrentProcess,
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const {

waitingTasks_.add(waitTask);

bool expected = false;
if( prefetchRequested_.compare_exchange_strong(expected,true)) {
if(not skipCurrentProcess) {
waitingTasks_.add(waitTask);

bool expected = false;
if( prefetchRequested_.compare_exchange_strong(expected,true)) {
//we are the first thread to request
tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
}
} else {
skippingWaitingTasks_.add(waitTask);
bool expected = false;
if( skippingPrefetchRequested_.compare_exchange_strong(expected,true)) {
//we are the first thread to request
tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
tryPrefetchResolverAsync(0, principal, skipCurrentProcess, sra, mcc, ServiceRegistry::instance().presentToken());
}
}
}

void NoProcessProductResolver::setCache(bool iSkipCurrentProcess,
ProductResolverIndex iIndex,
std::exception_ptr iExceptPtr) const {
if( not iSkipCurrentProcess) {
lastCheckIndex_ = iIndex;
waitingTasks_.doneWaiting(iExceptPtr);
} else {
lastSkipCurrentCheckIndex_ = iIndex;
skippingWaitingTasks_.doneWaiting(iExceptPtr);
}
}

namespace {
class TryNextResolverWaitingTask : public edm::WaitingTask {
public:
Expand Down Expand Up @@ -666,9 +687,8 @@ namespace edm {
std::exception_ptr iExceptPtr) const {
std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
auto k = lookupProcessOrder[iProcessingIndex];
std::atomic<unsigned int>& updateCacheIndex = iSkipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
updateCacheIndex = k;
waitingTasks_.doneWaiting(iExceptPtr);

setCache(iSkipCurrentProcess, k, iExceptPtr);
}


Expand All @@ -681,9 +701,8 @@ namespace edm {
ProductResolverBase const* productResolver = principal.getProductResolverByIndex(matchingHolders_[k]);

if(productResolver->productWasFetchedAndIsValid(iSkipCurrentProcess)) {
std::atomic<unsigned int>& updateCacheIndex = iSkipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;
updateCacheIndex = k;
waitingTasks_.doneWaiting(nullptr);

setCache(iSkipCurrentProcess, k, nullptr);
return true;
}
return false;
Expand All @@ -699,7 +718,6 @@ namespace edm {
ServiceToken token) const {
std::vector<unsigned int> const& lookupProcessOrder = principal.lookupProcessOrder();
auto index = iProcessingIndex;
std::atomic<unsigned int>& updateCacheIndex = skipCurrentProcess? lastSkipCurrentCheckIndex_ : lastCheckIndex_;

const unsigned int choiceSize = ambiguous_.size();
unsigned int newCacheIndex = choiceSize + kMissingOffset;
Expand Down Expand Up @@ -743,8 +761,7 @@ namespace edm {
++index;
}
//data product unavailable
updateCacheIndex = newCacheIndex;
waitingTasks_.doneWaiting(nullptr);
setCache(skipCurrentProcess, newCacheIndex, nullptr);
}

void NoProcessProductResolver::setProvenance_(ProductProvenanceRetriever const* , ProcessHistory const& , ProductID const& ) {
Expand All @@ -764,7 +781,9 @@ namespace edm {
lastCheckIndex_ = resetValue;
lastSkipCurrentCheckIndex_ = resetValue;
prefetchRequested_ = false;
skippingPrefetchRequested_ = false;
waitingTasks_.reset();
skippingWaitingTasks_.reset();
}

bool NoProcessProductResolver::singleProduct_() const {
Expand Down
4 changes: 4 additions & 0 deletions FWCore/Framework/src/ProductResolvers.h
Expand Up @@ -333,12 +333,16 @@ namespace edm {
SharedResourcesAcquirer* sra,
ModuleCallingContext const* mcc) const;

void setCache(bool skipCurrentProcess, ProductResolverIndex index, std::exception_ptr exceptionPtr) const;

std::vector<ProductResolverIndex> matchingHolders_;
std::vector<bool> ambiguous_;
mutable WaitingTaskList waitingTasks_;
mutable WaitingTaskList skippingWaitingTasks_;
mutable std::atomic<unsigned int> lastCheckIndex_;
mutable std::atomic<unsigned int> lastSkipCurrentCheckIndex_;
mutable std::atomic<bool> prefetchRequested_;
mutable std::atomic<bool> skippingPrefetchRequested_;
};

class SingleChoiceNoProcessProductResolver : public ProductResolverBase {
Expand Down

0 comments on commit c7eb5ea

Please sign in to comment.