Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ServiceWeakToken with asynchronous tasks #33330

Merged
merged 5 commits into from Apr 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 17 additions & 12 deletions FWCore/Framework/interface/Callback.h
Expand Up @@ -83,30 +83,34 @@ namespace edm {
iRecord->activityRegistry()->preESModulePrefetchingSignal_.emit(iRecord->key(), callingContext_);
if UNLIKELY (producer_->hasMayConsumes()) {
//after prefetching need to do the mayGet
ServiceWeakToken weakToken = token;
auto mayGetTask = edm::make_waiting_task(
[this, iRecord, iEventSetupImpl, token, group](std::exception_ptr const* iExcept) {
[this, iRecord, iEventSetupImpl, weakToken, group](std::exception_ptr const* iExcept) {
if (iExcept) {
runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, token);
runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, weakToken.lock());
return;
}
if (handleMayGet(iRecord, iEventSetupImpl)) {
auto runTask = edm::make_waiting_task(
[this, group, iRecord, iEventSetupImpl, token](std::exception_ptr const* iExcept) {
runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, token);
[this, group, iRecord, iEventSetupImpl, weakToken](std::exception_ptr const* iExcept) {
runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, weakToken.lock());
});
prefetchNeededDataAsync(
WaitingTaskHolder(*group, runTask), iEventSetupImpl, &((*postMayGetProxies_).front()), token);
prefetchNeededDataAsync(WaitingTaskHolder(*group, runTask),
iEventSetupImpl,
&((*postMayGetProxies_).front()),
weakToken.lock());
} else {
runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, token);
runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, weakToken.lock());
}
});

//Get everything we can before knowing about the mayGets
prefetchNeededDataAsync(WaitingTaskHolder(*group, mayGetTask), iEventSetupImpl, getTokenIndices(), token);
} else {
ServiceWeakToken weakToken = token;
auto task = edm::make_waiting_task(
[this, group, iRecord, iEventSetupImpl, token](std::exception_ptr const* iExcept) {
runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, token);
[this, group, iRecord, iEventSetupImpl, weakToken](std::exception_ptr const* iExcept) {
runProducerAsync(group, iExcept, iRecord, iEventSetupImpl, weakToken.lock());
});
prefetchNeededDataAsync(WaitingTaskHolder(*group, task), iEventSetupImpl, getTokenIndices(), token);
}
Expand Down Expand Up @@ -176,19 +180,20 @@ namespace edm {
return;
}
iRecord->activityRegistry()->postESModulePrefetchingSignal_.emit(iRecord->key(), callingContext_);
producer_->queue().push(*iGroup, [this, iRecord, iEventSetupImpl, token]() {
ServiceWeakToken weakToken = token;
producer_->queue().push(*iGroup, [this, iRecord, iEventSetupImpl, weakToken]() {
callingContext_.setState(ESModuleCallingContext::State::kRunning);
std::exception_ptr exceptPtr;
try {
convertException::wrap([this, iRecord, iEventSetupImpl, token] {
convertException::wrap([this, iRecord, iEventSetupImpl, weakToken] {
auto proxies = getTokenIndices();
if (postMayGetProxies_) {
proxies = &((*postMayGetProxies_).front());
}
TRecord rec;
edm::ESParentContext pc{&callingContext_};
rec.setImpl(iRecord, transitionID(), proxies, iEventSetupImpl, &pc, true);
ServiceRegistry::Operate operate(token);
ServiceRegistry::Operate operate(weakToken.lock());
iRecord->activityRegistry()->preESModuleSignal_.emit(iRecord->key(), callingContext_);
struct EndGuard {
EndGuard(EventSetupRecordImpl const* iRecord, ESModuleCallingContext const& iContext)
Expand Down
9 changes: 5 additions & 4 deletions FWCore/Framework/src/GlobalSchedule.h
Expand Up @@ -175,8 +175,9 @@ namespace edm {
T::preScheduleSignal(actReg_.get(), globalContext.get());
}

ServiceWeakToken weakToken = token;
auto doneTask = make_waiting_task(
[this, iHolder, cleaningUpAfterException, globalContext, token](std::exception_ptr const* iPtr) mutable {
[this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
Expand All @@ -189,19 +190,19 @@ namespace edm {
if (ex.context().empty()) {
ost << "Processing " << T::transitionName() << " ";
}
ServiceRegistry::Operate op(token);
ServiceRegistry::Operate op(weakToken.lock());
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}
if (actReg_) {
ServiceRegistry::Operate op(token);
ServiceRegistry::Operate op(weakToken.lock());
actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
}
}
if (actReg_) {
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(token);
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), globalContext.get());
} catch (...) {
if (not excpt) {
Expand Down
5 changes: 3 additions & 2 deletions FWCore/Framework/src/Path.cc
Expand Up @@ -358,9 +358,10 @@ namespace edm {
++lastModuleIndex;
}
for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, token = iToken, &iGroup](
ServiceWeakToken weakToken = iToken;
auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
std::exception_ptr const* iException) {
this->workerFinished(iException, lastModuleIndex, info, token, iID, iContext, iGroup);
this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
});
workers_[lastModuleIndex].runWorkerAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
Expand Down
9 changes: 5 additions & 4 deletions FWCore/Framework/src/ProductResolvers.cc
Expand Up @@ -263,9 +263,10 @@ namespace edm {
m_waitingTasks.add(waitTask);

if (prefetchRequested) {
auto workToDo = [this, mcc, &principal, token]() {
ServiceWeakToken weakToken = token;
auto workToDo = [this, mcc, &principal, weakToken]() {
//need to make sure Service system is activated on the reading thread
ServiceRegistry::Operate operate(token);
ServiceRegistry::Operate operate(weakToken.lock());
// Caught exception is propagated via WaitingTaskList
CMS_SA_ALLOW try {
resolveProductImpl<true>([this, &principal, mcc]() {
Expand Down Expand Up @@ -959,7 +960,7 @@ namespace edm {
} else {
if (not resolver_->dataValidFromResolver(index_, *principal_, skipCurrentProcess_)) {
resolver_->tryPrefetchResolverAsync(
index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_, group_);
index_ + 1, *principal_, skipCurrentProcess_, sra_, mcc_, serviceToken_.lock(), group_);
}
}
}
Expand All @@ -970,7 +971,7 @@ namespace edm {
SharedResourcesAcquirer* sra_;
ModuleCallingContext const* mcc_;
tbb::task_group* group_;
ServiceToken serviceToken_;
ServiceWeakToken serviceToken_;
unsigned int index_;
bool skipCurrentProcess_;
};
Expand Down
9 changes: 5 additions & 4 deletions FWCore/Framework/src/StreamSchedule.cc
Expand Up @@ -608,9 +608,10 @@ namespace edm {
//use to give priorities on an error to ones from Paths
auto pathErrorHolder = std::make_unique<std::atomic<std::exception_ptr*>>(nullptr);
auto pathErrorPtr = pathErrorHolder.get();
ServiceWeakToken weakToken = serviceToken;
auto allPathsDone = make_waiting_task(
[iTask, this, serviceToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
ServiceRegistry::Operate operate(serviceToken);
[iTask, this, weakToken, pathError = std::move(pathErrorHolder)](std::exception_ptr const* iPtr) mutable {
ServiceRegistry::Operate operate(weakToken.lock());

std::exception_ptr ptr;
if (pathError->load()) {
Expand All @@ -627,9 +628,9 @@ namespace edm {
// run under that condition.
WaitingTaskHolder allPathsHolder(*iTask.group(), allPathsDone);

auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, serviceToken](
auto pathsDone = make_waiting_task([allPathsHolder, pathErrorPtr, transitionInfo = info, this, weakToken](
std::exception_ptr const* iPtr) mutable {
ServiceRegistry::Operate operate(serviceToken);
ServiceRegistry::Operate operate(weakToken.lock());

if (iPtr) {
//this is used to prioritize this error over one
Expand Down
14 changes: 8 additions & 6 deletions FWCore/Framework/src/StreamSchedule.h
Expand Up @@ -386,8 +386,9 @@ namespace edm {
T::setStreamContext(streamContext_, principal);

auto id = principal.id();
auto doneTask =
make_waiting_task([this, iHolder, id, cleaningUpAfterException, token](std::exception_ptr const* iPtr) mutable {
ServiceWeakToken weakToken = token;
auto doneTask = make_waiting_task(
[this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
Expand All @@ -400,17 +401,17 @@ namespace edm {
if (ex.context().empty()) {
ost << "Processing " << T::transitionName() << " " << id;
}
ServiceRegistry::Operate op(token);
ServiceRegistry::Operate op(weakToken.lock());
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}

ServiceRegistry::Operate op(token);
ServiceRegistry::Operate op(weakToken.lock());
actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
}
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(token);
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), &streamContext_);
} catch (...) {
if (not excpt) {
Expand All @@ -421,7 +422,8 @@ namespace edm {
});

auto task = make_functor_task(
[this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, token]() mutable {
[this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
auto token = weakToken.lock();
ServiceRegistry::Operate op(token);
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
Expand Down
13 changes: 8 additions & 5 deletions FWCore/Framework/src/Worker.cc
Expand Up @@ -208,9 +208,10 @@ namespace edm {
EventPrincipal const* iPrincipal) {
successTask->increment_ref_count();

ServiceWeakToken weakToken = token;
auto choiceTask =
edm::make_waiting_task([id, successTask, iPrincipal, this, token, &group](std::exception_ptr const*) {
ServiceRegistry::Operate guard(token);
edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
ServiceRegistry::Operate guard(weakToken.lock());
// There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
CMS_SA_ALLOW try {
if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
Expand Down Expand Up @@ -271,18 +272,20 @@ namespace edm {

if UNLIKELY (tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) == 1) {
auto taskGroup = iTask.group();
taskGroup->run([this, task = std::move(iTask), iTrans, &iImpl, iToken]() {
ServiceWeakToken weakToken = iToken;
taskGroup->run([this, task = std::move(iTask), iTrans, &iImpl, weakToken]() {
std::exception_ptr exceptPtr{};
iImpl.taskArena()->execute([this, iTrans, &iImpl, iToken, &exceptPtr]() {
iImpl.taskArena()->execute([this, iTrans, &iImpl, weakToken, &exceptPtr]() {
exceptPtr = syncWait([&](WaitingTaskHolder&& iHolder) {
auto const& recs = esRecordsToGetFrom(iTrans);
auto const& items = esItemsToGetFrom(iTrans);
auto hWaitTask = std::move(iHolder);
auto token = weakToken.lock();
for (size_t i = 0; i != items.size(); ++i) {
if (recs[i] != ESRecordIndex{}) {
auto rec = iImpl.findImpl(recs[i]);
if (rec) {
rec->prefetchAsync(hWaitTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
rec->prefetchAsync(hWaitTask, items[i], &iImpl, token, ESParentContext(&moduleCallingContext_));
}
}
}
Expand Down
32 changes: 19 additions & 13 deletions FWCore/Framework/src/Worker.h
Expand Up @@ -399,7 +399,7 @@ namespace edm {

void execute() final {
//Need to make the services available early so other services can see them
ServiceRegistry::Operate guard(m_serviceToken);
ServiceRegistry::Operate guard(m_serviceToken.lock());

//incase the emit causes an exception, we need a memory location
// to hold the exception_ptr
Expand Down Expand Up @@ -429,7 +429,7 @@ namespace edm {
sContext = m_context,
serviceToken = m_serviceToken]() {
//Need to make the services available
ServiceRegistry::Operate operateRunModule(serviceToken);
ServiceRegistry::Operate operateRunModule(serviceToken.lock());

//If needed, we pause the queue in begin transition and resume it
// at the end transition. This can guarantee that the module
Expand Down Expand Up @@ -461,7 +461,7 @@ namespace edm {
StreamID m_streamID;
ParentContext const m_parentContext;
typename T::Context const* m_context;
ServiceToken m_serviceToken;
ServiceWeakToken m_serviceToken;
tbb::task_group* m_group;
};

Expand Down Expand Up @@ -496,7 +496,7 @@ namespace edm {

void execute() final {
//Need to make the services available early so other services can see them
ServiceRegistry::Operate guard(m_serviceToken);
ServiceRegistry::Operate guard(m_serviceToken.lock());

//incase the emit causes an exception, we need a memory location
// to hold the exception_ptr
Expand All @@ -522,7 +522,7 @@ namespace edm {
serviceToken = m_serviceToken,
holder = m_holder]() {
//Need to make the services available
ServiceRegistry::Operate operateRunAcquire(serviceToken);
ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());

std::exception_ptr* ptr = nullptr;
worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
Expand All @@ -539,7 +539,7 @@ namespace edm {
EventTransitionInfo m_eventTransitionInfo;
ParentContext const m_parentContext;
WaitingTaskWithArenaHolder m_holder;
ServiceToken m_serviceToken;
ServiceWeakToken m_serviceToken;
};

// This class does nothing unless there is an exception originating
Expand Down Expand Up @@ -972,11 +972,16 @@ namespace edm {
};
auto* group = task.group();
auto ownRunTask = std::make_shared<DestroyTask>(runTask);
auto selectionTask = make_waiting_task(
[ownRunTask, parentContext, info = transitionInfo, token, group, this](std::exception_ptr const*) mutable {
ServiceRegistry::Operate guard(token);
prefetchAsync<T>(
WaitingTaskHolder(*group, ownRunTask->release()), token, parentContext, info, T::transition_);
ServiceWeakToken weakToken = token;
auto selectionTask =
make_waiting_task([ownRunTask, parentContext, info = transitionInfo, weakToken, group, this](
std::exception_ptr const*) mutable {
ServiceRegistry::Operate guard(weakToken.lock());
prefetchAsync<T>(WaitingTaskHolder(*group, ownRunTask->release()),
weakToken.lock(),
parentContext,
info,
T::transition_);
});
prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
} else {
Expand Down Expand Up @@ -1038,12 +1043,13 @@ namespace edm {

waitingTasks_.add(task);
if (workStarted) {
auto toDo = [this, info = transitionInfo, streamID, parentContext, context, serviceToken]() {
ServiceWeakToken weakToken = serviceToken;
auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
std::exception_ptr exceptionPtr;
// Caught exception is propagated via WaitingTaskList
CMS_SA_ALLOW try {
//Need to make the services available
ServiceRegistry::Operate guard(serviceToken);
ServiceRegistry::Operate guard(weakToken.lock());

this->runModule<T>(info, streamID, parentContext, context);
} catch (...) {
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/test/callback_t.cppunit.cc
Expand Up @@ -111,8 +111,8 @@ namespace {
&ar);
edm::FinalWaitingTask task;
tbb::task_group group;
iCallback.prefetchAsync(
edm::WaitingTaskHolder(group, &task), &rec, nullptr, edm::ServiceToken{}, edm::ESParentContext{});
edm::ServiceToken token;
iCallback.prefetchAsync(edm::WaitingTaskHolder(group, &task), &rec, nullptr, token, edm::ESParentContext{});
do {
group.wait();
} while (not task.done());
Expand Down
6 changes: 4 additions & 2 deletions FWCore/Framework/test/dependentrecord_t.cppunit.cc
Expand Up @@ -759,8 +759,9 @@ namespace {
if (rec) {
edm::FinalWaitingTask waitTask;
tbb::task_group group;
edm::ServiceToken token;
rec->prefetchAsync(
edm::WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, edm::ServiceToken{}, edm::ESParentContext{});
edm::WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, token, edm::ESParentContext{});
do {
group.wait();
} while (not waitTask.done());
Expand All @@ -787,8 +788,9 @@ namespace {
if (rec) {
edm::FinalWaitingTask waitTask;
tbb::task_group group;
edm::ServiceToken token;
rec->prefetchAsync(
edm::WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, edm::ServiceToken{}, edm::ESParentContext{});
edm::WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, token, edm::ESParentContext{});
do {
group.wait();
} while (not waitTask.done());
Expand Down