Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MLPF: first integration of EDProducer and training code #32048

Merged
merged 6 commits into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions Configuration/ProcessModifiers/python/mlpf_cff.py
@@ -0,0 +1,4 @@
import FWCore.ParameterSet.Config as cms

# This modifier is for activating MLPF reconstruction in 2021
mlpf = cms.Modifier()
2 changes: 2 additions & 0 deletions Configuration/PyReleaseValidation/python/relval_2017.py
Expand Up @@ -35,6 +35,7 @@
# (Patatrack ECAL-only: TTbar - on CPU, on GPU, both, auto)
# (Patatrack HCAL-only: TTbar - on CPU, on GPU, both, auto)
# (TTbar 0T, TTbar PU 0T)
# (TTbar PU MLPF)
# (QCD 1.8TeV DeepCore)
# 2023 (TTbar, TTbar PU, TTbar PU premix)
# 2024 (TTbar, TTbar PU, TTbar PU premix)
Expand All @@ -60,6 +61,7 @@
11634.511,11634.512, # 11634.513,11634.514,
11634.521,11634.522, # 11634.523,11634.524
11634.24,11834.24,
11834.13,
11634.17, #FIXME to 11723.17,
12434.0,12634.0,12634.99,
12834.0,13034.0,13034.99]
Expand Down
Expand Up @@ -352,6 +352,30 @@ def condition(self, fragment, stepList, key, hasHarvest):
offset = 0.9,
)

# MLPF workflows
class UpgradeWorkflow_mlpf(UpgradeWorkflow):
def setup_(self, step, stepName, stepDict, k, properties):
if 'Reco' in step:
stepDict[stepName][k] = merge([self.step3, stepDict[step][k]])
def condition(self, fragment, stepList, key, hasHarvest):
return fragment=="TTbar_14TeV" and '2021' in key

upgradeWFs['mlpf'] = UpgradeWorkflow_mlpf(
steps = [
'Reco',
],
PU = [
'Reco',
],
suffix = '_mlpf',
offset = 0.13,
)
upgradeWFs['mlpf'].step3 = {
'--datatier': 'GEN-SIM-RECO,RECOSIM,MINIAODSIM,DQMIO',
'--eventcontent': 'FEVTDEBUGHLT,RECOSIM,MINIAODSIM,DQM',
'--procModifiers': 'mlpf'
}

# Patatrack workflows
class UpgradeWorkflowPatatrack(UpgradeWorkflow):
def condition(self, fragment, stepList, key, hasHarvest):
Expand Down
Expand Up @@ -87,3 +87,9 @@
outputCommands = RecoParticleFlowFEVT.outputCommands + ['keep recoPFRecHits_particleFlowClusterECAL__*',
'keep recoPFRecHits_particleFlowRecHitHGC__*',
'keep *_simPFProducer_*_*'])

from Configuration.ProcessModifiers.mlpf_cff import mlpf
from RecoParticleFlow.PFProducer.mlpf_EventContent_cff import MLPF_RECO

mlpf.toModify(RecoParticleFlowRECO,
outputCommands = RecoParticleFlowRECO.outputCommands + MLPF_RECO.outputCommands)
5 changes: 5 additions & 0 deletions RecoParticleFlow/Configuration/python/RecoParticleFlow_cff.py
Expand Up @@ -86,4 +86,9 @@
e.toModify(pfPileUp, enable = cms.bool(False))


from Configuration.ProcessModifiers.mlpf_cff import mlpf
from RecoParticleFlow.PFProducer.mlpfProducer_cfi import mlpfProducer

_mlpfTask = cms.Task(mlpfProducer, particleFlowRecoTask.copy())

mlpf.toReplaceWith(particleFlowRecoTask, _mlpfTask)
67 changes: 67 additions & 0 deletions RecoParticleFlow/PFProducer/interface/MLPFModel.h
@@ -0,0 +1,67 @@
#ifndef RecoParticleFlow_PFProducer_interface_MLPFModel
#define RecoParticleFlow_PFProducer_interface_MLPFModel

#include "FWCore/Framework/interface/Event.h"
#include "DataFormats/ParticleFlowReco/interface/PFBlockElement.h"
#include "DataFormats/ParticleFlowCandidate/interface/PFCandidate.h"

namespace reco::mlpf {
//The model takes the following number of features for each input PFElement
static constexpr unsigned int NUM_ELEMENT_FEATURES = 15;

//these are defined at model creation time and set the random LSH codebook size
static constexpr int NUM_MAX_ELEMENTS_BATCH = 20000;
static constexpr int LSH_BIN_SIZE = 100;

//In CPU mode, we only want to evaluate each event separately
static constexpr int BATCH_SIZE = 1;

//The model has 12 outputs for each particle:
// out[0-7]: particle classification logits
// out[8]: regressed eta
// out[9]: regressed phi
// out[10]: regressed energy
// out[11]: regressed charge logit
static constexpr unsigned int NUM_OUTPUTS = 12;
static constexpr unsigned int NUM_CLASS = 7;
static constexpr unsigned int IDX_ETA = 8;
static constexpr unsigned int IDX_PHI = 9;
static constexpr unsigned int IDX_ENERGY = 10;
static constexpr unsigned int IDX_CHARGE = 11;

//index [0, N_pdgids) -> PDGID
//this maps the absolute values of the predicted PDGIDs to an array of ascending indices
static const std::vector<int> pdgid_encoding = {0, 1, 2, 11, 13, 22, 130, 211};

//PFElement::type -> index [0, N_types)
//this maps the type of the PFElement to an ascending index that is used by the model to distinguish between different elements
static const std::map<int, int> elem_type_encoding = {
{0, 0},
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 5},
{6, 6},
{7, 7},
{8, 8},
{9, 9},
{10, 10},
{11, 11},
};

std::array<float, NUM_ELEMENT_FEATURES> getElementProperties(const reco::PFBlockElement& orig);
float normalize(float in);

int argMax(std::vector<float> const& vec);

reco::PFCandidate makeCandidate(int pred_pid, int pred_charge, float pred_e, float pred_eta, float pred_phi);

const std::vector<const reco::PFBlockElement*> getPFElements(const reco::PFBlockCollection& blocks);

void setCandidateRefs(reco::PFCandidate& cand,
const std::vector<const reco::PFBlockElement*> elems,
size_t ielem_originator);
}; // namespace reco::mlpf

#endif
3 changes: 3 additions & 0 deletions RecoParticleFlow/PFProducer/plugins/BuildFile.xml
Expand Up @@ -79,5 +79,8 @@
<use name="RecoParticleFlow/PFProducer"/>
<use name="RecoParticleFlow/PFTracking"/>
<use name="RecoEcal/EgammaCoreTools"/>
<use name="PhysicsTools/TensorFlow"/>
<use name="HeterogeneousCore/SonicCore"/>
<use name="HeterogeneousCore/SonicTriton"/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given the complexity of the dependencies, it may be practical to have a separate package

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. How about RecoParticleFlow/MLPFProducer? A new cms-data repo will need to be created, I wonder if there are other things to keep in mind when creating a new package in CMSSW?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New packages should also be added to appropriate categories in https://github.com/cms-sw/cms-bot/. So far, we don't have a clear guideline about how to organize and isolate various dependencies...

<flags EDM_PLUGIN="1"/>
</library>
139 changes: 139 additions & 0 deletions RecoParticleFlow/PFProducer/plugins/MLPFProducer.cc
@@ -0,0 +1,139 @@
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/stream/EDProducer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/MakerMacros.h"

#include "DataFormats/ParticleFlowCandidate/interface/PFCandidate.h"
#include "PhysicsTools/TensorFlow/interface/TensorFlow.h"
#include "RecoParticleFlow/PFProducer/interface/MLPFModel.h"

struct MLPFCache {
const tensorflow::GraphDef* graph_def;
};

class MLPFProducer : public edm::stream::EDProducer<edm::GlobalCache<MLPFCache> > {
public:
explicit MLPFProducer(const edm::ParameterSet&, const MLPFCache*);
void produce(edm::Event& event, const edm::EventSetup& setup) override;
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

// static methods for handling the global cache
static std::unique_ptr<MLPFCache> initializeGlobalCache(const edm::ParameterSet&);
static void globalEndJob(MLPFCache*);

private:
const edm::EDPutTokenT<reco::PFCandidateCollection> pfCandidatesPutToken_;
const edm::EDGetTokenT<reco::PFBlockCollection> inputTagBlocks_;
const std::string model_path_;
tensorflow::Session* session_;
};

MLPFProducer::MLPFProducer(const edm::ParameterSet& cfg, const MLPFCache* cache)
: pfCandidatesPutToken_{produces<reco::PFCandidateCollection>()},
inputTagBlocks_(consumes<reco::PFBlockCollection>(cfg.getParameter<edm::InputTag>("src"))),
model_path_(cfg.getParameter<std::string>("model_path")) {
session_ = tensorflow::createSession(cache->graph_def);
}

void MLPFProducer::produce(edm::Event& event, const edm::EventSetup& setup) {
using namespace reco::mlpf;

const auto& blocks = event.get(inputTagBlocks_);
const auto& all_elements = getPFElements(blocks);

const long long int num_elements_total = all_elements.size();

//tensor size must be a multiple of the bin size and larger than the number of elements
const auto tensor_size = LSH_BIN_SIZE * (num_elements_total / LSH_BIN_SIZE + 1);
assert(tensor_size <= NUM_MAX_ELEMENTS_BATCH);

//Create the input tensor
tensorflow::TensorShape shape({BATCH_SIZE, tensor_size, NUM_ELEMENT_FEATURES});
tensorflow::Tensor input(tensorflow::DT_FLOAT, shape);
input.flat<float>().setZero();

//Fill the input tensor
unsigned int ielem = 0;
for (const auto* pelem : all_elements) {
const auto& elem = *pelem;

//prepare the input array from the PFElement
const auto& props = getElementProperties(elem);

//copy features to the input array
for (unsigned int iprop = 0; iprop < NUM_ELEMENT_FEATURES; iprop++) {
input.tensor<float, 3>()(0, ielem, iprop) = normalize(props[iprop]);
}
ielem += 1;
}

//TF model input and output tensor names
const tensorflow::NamedTensorList input_list = {{"x:0", input}};
const std::vector<std::string> output_names = {"Identity:0"};

//Prepare the output tensor
std::vector<tensorflow::Tensor> outputs;

//run the GNN inference, given the inputs and the output.
//Note that the GNN enables information transfer between the input PFElements,
//such that the output ML-PFCandidates are in general combinations of the input PFElements, in the form of
//y_out = Adj.x_in, where x_in is input matrix (num_elem, NUM_ELEMENT_FEATURES), y_out is the output matrix (num_elem, NUM_OUTPUT_FEATURES)
//and Adj is an adjacency matrix between the elements that is constructed on the fly during model inference.
tensorflow::run(session_, input_list, output_names, &outputs);

//process the output tensor to ML-PFCandidates.
//The output can contain up to num_elem particles, with predicted PDGID=0 corresponding to no particles predicted.
const auto out_arr = outputs[0].tensor<float, 3>();

std::vector<reco::PFCandidate> pOutputCandidateCollection;
for (unsigned int ielem = 0; ielem < all_elements.size(); ielem++) {
//get the coefficients in the output corresponding to the class probabilities (raw logits)
std::vector<float> pred_id_logits;
for (unsigned int idx_id = 0; idx_id <= NUM_CLASS; idx_id++) {
pred_id_logits.push_back(out_arr(0, ielem, idx_id));
}

//get the most probable class PDGID
int pred_pid = pdgid_encoding[argMax(pred_id_logits)];

//get the predicted momentum components
float pred_eta = out_arr(0, ielem, IDX_ETA);
float pred_phi = out_arr(0, ielem, IDX_PHI);
float pred_charge = out_arr(0, ielem, IDX_CHARGE);
float pred_e = out_arr(0, ielem, IDX_ENERGY);

//a particle was predicted for this PFElement, otherwise it was a spectator
if (pred_pid != 0) {
auto cand = makeCandidate(pred_pid, pred_charge, pred_e, pred_eta, pred_phi);
setCandidateRefs(cand, all_elements, ielem);
pOutputCandidateCollection.push_back(cand);
}
} //loop over PFElements

event.emplace(pfCandidatesPutToken_, pOutputCandidateCollection);
}

std::unique_ptr<MLPFCache> MLPFProducer::initializeGlobalCache(const edm::ParameterSet& params) {
// this method is supposed to create, initialize and return a MLPFCache instance
std::unique_ptr<MLPFCache> cache = std::make_unique<MLPFCache>();

//load the frozen TF graph of the GNN model
std::string path = params.getParameter<std::string>("model_path");
auto fullPath = edm::FileInPath(path).fullPath();
LogDebug("MLPFProducer") << "Initializing MLPF model from " << fullPath;

cache->graph_def = tensorflow::loadGraphDef(fullPath);

return cache;
}

void MLPFProducer::globalEndJob(MLPFCache* cache) { delete cache->graph_def; }

void MLPFProducer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;
desc.add<edm::InputTag>("src", edm::InputTag("particleFlowBlock"));
desc.add<std::string>("model_path", "RecoParticleFlow/PFProducer/data/mlpf/mlpf_2020_11_04.pb");
descriptions.addWithDefaultLabel(desc);
}

DEFINE_FWK_MODULE(MLPFProducer);
8 changes: 8 additions & 0 deletions RecoParticleFlow/PFProducer/python/mlpf_EventContent_cff.py
@@ -0,0 +1,8 @@
import FWCore.ParameterSet.Config as cms

MLPF_RECO = cms.PSet(
outputCommands = cms.untracked.vstring(
'keep recoPFCandidates_mlpfProducer_*_*',
)
)