Skip to content

Commit

Permalink
Merge pull request #39557 from Dr15Jones/transformAsync
Browse files Browse the repository at this point in the history
Added transformAsync ability to modules
  • Loading branch information
cmsbuild committed Oct 17, 2022
2 parents 25d2ec0 + 8924cf4 commit e803e43
Show file tree
Hide file tree
Showing 33 changed files with 659 additions and 192 deletions.
18 changes: 14 additions & 4 deletions FWCore/Framework/interface/TransformerBase.h
Expand Up @@ -16,6 +16,7 @@
#include <string>
#include <functional>
#include <memory>
#include <any>

namespace edm {
class ProducerBase;
Expand All @@ -25,6 +26,8 @@ namespace edm {
class BranchDescription;
class ProductResolverIndexHelper;
class ModuleDescription;
class WaitingTaskWithArenaHolder;
class WaitingTaskHolder;

class TransformerBase {
public:
Expand All @@ -34,20 +37,27 @@ namespace edm {
protected:
//The function takes the WrapperBase corresponding to the data product from the EDPutToken
// and returns the WrapperBase associated to the id and instanceName
using TransformFunction = std::function<std::unique_ptr<edm::WrapperBase>(edm::WrapperBase const&)>;
using TransformFunction = std::function<std::unique_ptr<edm::WrapperBase>(std::any)>;
using PreTransformFunction = std::function<std::any(edm::WrapperBase const&, edm::WaitingTaskWithArenaHolder)>;

void registerTransformImp(ProducerBase&, EDPutToken, const TypeID& id, std::string instanceName, TransformFunction);
void registerTransformAsyncImp(
ProducerBase&, EDPutToken, const TypeID& id, std::string instanceName, PreTransformFunction, TransformFunction);

std::size_t findMatchingIndex(ProducerBase const& iBase, edm::BranchDescription const&) const;
ProductResolverIndex prefetchImp(std::size_t iIndex) const { return transformInfo_.get<0>(iIndex); }
void transformImp(std::size_t iIndex, ProducerBase const& iBase, edm::EventForTransformer&) const;
ProductResolverIndex prefetchImp(std::size_t iIndex) const { return transformInfo_.get<kResolverIndex>(iIndex); }
void transformImpAsync(WaitingTaskHolder iTask,
std::size_t iIndex,
ProducerBase const& iBase,
edm::EventForTransformer&) const;

void extendUpdateLookup(ProducerBase const&,
ModuleDescription const& iModuleDesc,
ProductResolverIndexHelper const& iHelper);

private:
SoATuple<ProductResolverIndex, TypeID, EDPutToken, TransformFunction> transformInfo_;
enum InfoColumns { kResolverIndex, kType, kToken, kPreTransform, kTransform };
SoATuple<ProductResolverIndex, TypeID, EDPutToken, PreTransformFunction, TransformFunction> transformInfo_;
};
} // namespace edm

Expand Down
16 changes: 11 additions & 5 deletions FWCore/Framework/interface/global/EDFilterBase.h
Expand Up @@ -39,6 +39,7 @@ namespace edm {
class ThinnedAssociationsHelper;
class WaitingTaskWithArenaHolder;
class EventForTransformer;
class ServiceWeakToken;

namespace maker {
template <typename T>
Expand Down Expand Up @@ -78,10 +79,12 @@ namespace edm {
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doTransform(size_t iTransformIndex,
EventPrincipal const& iEvent,
ActivityRegistry*,
ModuleCallingContext const*);
void doTransformAsync(WaitingTaskHolder iTask,
size_t iTransformIndex,
EventPrincipal const& iEvent,
ActivityRegistry*,
ModuleCallingContext const*,
ServiceWeakToken const&);
//For now this is a placeholder
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
ModuleCallingContext const& iModuleCallingContext,
Expand Down Expand Up @@ -156,7 +159,10 @@ namespace edm {

virtual size_t transformIndex_(edm::BranchDescription const& iBranch) const;
virtual ProductResolverIndex transformPrefetch_(std::size_t iIndex) const;
virtual void transform_(std::size_t iIndex, edm::EventForTransformer& iEvent) const;
virtual void transformAsync_(WaitingTaskHolder iTask,
std::size_t iIndex,
edm::EventForTransformer& iEvent,
ServiceWeakToken const& iToken) const;

virtual void clearInputProcessBlockCaches();
virtual bool hasAcquire() const { return false; }
Expand Down
16 changes: 11 additions & 5 deletions FWCore/Framework/interface/global/EDProducerBase.h
Expand Up @@ -40,6 +40,7 @@ namespace edm {
class ThinnedAssociationsHelper;
class WaitingTaskWithArenaHolder;
class EventForTransformer;
class ServiceWeakToken;

namespace maker {
template <typename T>
Expand Down Expand Up @@ -81,10 +82,12 @@ namespace edm {
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
void doTransform(size_t iTransformIndex,
EventPrincipal const& iEvent,
ActivityRegistry*,
ModuleCallingContext const*);
void doTransformAsync(WaitingTaskHolder iTask,
size_t iTransformIndex,
EventPrincipal const& iEvent,
ActivityRegistry*,
ModuleCallingContext const*,
ServiceWeakToken const&);
void doPreallocate(PreallocationConfiguration const&);
void doBeginJob();
void doEndJob();
Expand Down Expand Up @@ -159,7 +162,10 @@ namespace edm {

virtual size_t transformIndex_(edm::BranchDescription const& iBranch) const;
virtual ProductResolverIndex transformPrefetch_(std::size_t iIndex) const;
virtual void transform_(std::size_t iIndex, edm::EventForTransformer& iEvent) const;
virtual void transformAsync_(WaitingTaskHolder iTask,
std::size_t iIndex,
edm::EventForTransformer& iEvent,
ServiceWeakToken const& iToken) const;

virtual void clearInputProcessBlockCaches();
virtual bool hasAccumulator() const { return false; }
Expand Down
51 changes: 40 additions & 11 deletions FWCore/Framework/interface/global/implementors.h
Expand Up @@ -29,6 +29,8 @@

// user include files
#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
#include "FWCore/Framework/interface/CacheHandle.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/InputProcessBlockCacheImpl.h"
Expand All @@ -47,6 +49,7 @@
namespace edm {

class WaitingTaskWithArenaHolder;
class ServiceWeakToken;

namespace global {
namespace impl {
Expand Down Expand Up @@ -472,15 +475,38 @@ namespace edm {
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](edm::WrapperBase const& iGotProduct) {
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{},
f(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product()));
});
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

template <typename G, typename P, typename F>
void registerTransformAsync(edm::EDPutTokenT<G> iToken,
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{},
f(std::any_cast<CacheTypeT>(iCache)));
});
}

private:
Expand All @@ -490,8 +516,11 @@ namespace edm {
ProductResolverIndex transformPrefetch_(std::size_t iIndex) const final {
return TransformerBase::prefetchImp(iIndex);
}
void transform_(std::size_t iIndex, edm::EventForTransformer& iEvent) const final {
return TransformerBase::transformImp(iIndex, *this, iEvent);
void transformAsync_(WaitingTaskHolder iTask,
std::size_t iIndex,
edm::EventForTransformer& iEvent,
ServiceWeakToken const& iToken) const final {
return TransformerBase::transformImpAsync(std::move(iTask), iIndex, *this, iEvent);
}
void extendUpdateLookup(BranchType iBranchType, ProductResolverIndexHelper const& iHelper) override {
if (iBranchType == InEvent) {
Expand Down
16 changes: 11 additions & 5 deletions FWCore/Framework/interface/limited/EDFilterBase.h
Expand Up @@ -39,6 +39,7 @@ namespace edm {
class ActivityRegistry;
class ThinnedAssociationsHelper;
class EventForTransformer;
class ServiceWeakToken;

namespace maker {
template <typename T>
Expand Down Expand Up @@ -78,10 +79,12 @@ namespace edm {

private:
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doTransform(size_t iTransformIndex,
EventPrincipal const& iEvent,
ActivityRegistry*,
ModuleCallingContext const*);
void doTransformAsync(WaitingTaskHolder iTask,
size_t iTransformIndex,
EventPrincipal const& iEvent,
ActivityRegistry*,
ModuleCallingContext const*,
ServiceWeakToken const&);

//For now this is a placeholder
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder,
Expand Down Expand Up @@ -157,7 +160,10 @@ namespace edm {

virtual size_t transformIndex_(edm::BranchDescription const& iBranch) const;
virtual ProductResolverIndex transformPrefetch_(std::size_t iIndex) const;
virtual void transform_(std::size_t iIndex, edm::EventForTransformer& iEvent) const;
virtual void transformAsync_(WaitingTaskHolder iTask,
std::size_t iIndex,
edm::EventForTransformer& iEvent,
ServiceWeakToken const& iToken) const;

virtual void clearInputProcessBlockCaches();

Expand Down
16 changes: 11 additions & 5 deletions FWCore/Framework/interface/limited/EDProducerBase.h
Expand Up @@ -40,6 +40,7 @@ namespace edm {
class ActivityRegistry;
class ThinnedAssociationsHelper;
class EventForTransformer;
class ServiceWeakToken;

namespace maker {
template <typename T>
Expand Down Expand Up @@ -81,10 +82,12 @@ namespace edm {

private:
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doTransform(size_t iTransformIndex,
EventPrincipal const& iEvent,
ActivityRegistry*,
ModuleCallingContext const*);
void doTransformAsync(WaitingTaskHolder iTask,
size_t iTransformIndex,
EventPrincipal const& iEvent,
ActivityRegistry*,
ModuleCallingContext const*,
ServiceWeakToken const&);
void doPreallocate(PreallocationConfiguration const&);
void doBeginJob();
void doEndJob();
Expand Down Expand Up @@ -160,7 +163,10 @@ namespace edm {

virtual size_t transformIndex_(edm::BranchDescription const& iBranch) const;
virtual ProductResolverIndex transformPrefetch_(std::size_t iIndex) const;
virtual void transform_(std::size_t iIndex, edm::EventForTransformer& iEvent) const;
virtual void transformAsync_(WaitingTaskHolder iTask,
std::size_t iIndex,
edm::EventForTransformer& iEvent,
ServiceWeakToken const& iToken) const;

virtual void clearInputProcessBlockCaches();
virtual bool hasAccumulator() const { return false; }
Expand Down
51 changes: 40 additions & 11 deletions FWCore/Framework/interface/limited/implementors.h
Expand Up @@ -28,6 +28,8 @@

// user include files
#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
#include "FWCore/Framework/interface/CacheHandle.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/InputProcessBlockCacheImpl.h"
Expand All @@ -44,6 +46,7 @@

// forward declarations
namespace edm {
class ServiceWeakToken;

namespace limited {
namespace impl {
Expand Down Expand Up @@ -460,15 +463,38 @@ namespace edm {
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](edm::WrapperBase const& iGotProduct) {
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{},
f(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product()));
});
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

template <typename G, typename P, typename F>
void registerTransformAsync(edm::EDPutTokenT<G> iToken,
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
auto cache = std::any_cast<CacheTypeT>(iCache);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
});
}

private:
Expand All @@ -478,8 +504,11 @@ namespace edm {
ProductResolverIndex transformPrefetch_(std::size_t iIndex) const final {
return TransformerBase::prefetchImp(iIndex);
}
void transform_(std::size_t iIndex, edm::EventForTransformer& iEvent) const final {
return TransformerBase::transformImp(iIndex, *this, iEvent);
void transformAsync_(WaitingTaskHolder iTask,
std::size_t iIndex,
edm::EventForTransformer& iEvent,
ServiceWeakToken const& iToken) const final {
return TransformerBase::transformImpAsync(std::move(iTask), iIndex, *this, iEvent);
}
void extendUpdateLookup(BranchType iBranchType, ProductResolverIndexHelper const& iHelper) override {
if (iBranchType == InEvent) {
Expand Down
6 changes: 5 additions & 1 deletion FWCore/Framework/interface/maker/Worker.h
Expand Up @@ -269,7 +269,11 @@ namespace edm {
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&) = 0;

virtual void implDoTransform(size_t iTransformIndex, EventPrincipal const&, ParentContext const&) = 0;
virtual void implDoTransformAsync(WaitingTaskHolder,
size_t iTransformIndex,
EventPrincipal const&,
ParentContext const&,
ServiceWeakToken const&) = 0;
virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const = 0;

virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
Expand Down
6 changes: 5 additions & 1 deletion FWCore/Framework/interface/maker/WorkerT.h
Expand Up @@ -93,7 +93,11 @@ namespace edm {
void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskWithArenaHolder&) final;

size_t transformIndex(edm::BranchDescription const&) const final;
void implDoTransform(size_t iTransformIndex, EventPrincipal const&, ParentContext const&) final;
void implDoTransformAsync(WaitingTaskHolder,
size_t iTransformIndex,
EventPrincipal const&,
ParentContext const&,
ServiceWeakToken const&) final;
ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const final;

bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) override;
Expand Down

0 comments on commit e803e43

Please sign in to comment.