**Data Ingestion** is the first stage of the pipeline. Here we will read the ROOT file from HDFS into a Spark dataframe using [Spark-ROOT](https://github.com/diana-hep/spark-root) reader and then we will create the Low Level Features (LLF) and High Level Features datasets.

To run this notebook we used the following configuration:
* *Software stack*: LCG 94 (it has spark 2.3.1)
* *Platform*: centos7-gcc7
* *Spark cluster*: Hadalytic

In [1]:
# Check if Spark Session has been created correctly
spark

In [2]:
# Add a file containing functions that we will use later
spark.sparkContext.addPyFile("utilFunctions.py")

## Read the data from HDFS
<br>
As first step we will read the samples into a Spark dataframe using Spark-Root. We will select only a subset of columns present in the original files.

In [3]:
PATH = "hdfs://hadalytic/project/ML/data/data_root/"

samples = ["qcd", "ttbar", "wjets"]

requiredColumns = [
    "EFlowTrack",
    "EFlowNeutralHadron",
    "EFlowPhoton",
    "Electron",
    "MuonTight",
    "MuonTight_size",
    "Electron_size",
    "MissingET",
    "Jet"
]

In [4]:
from pyspark.sql.functions import lit

dfList = []

for label,sample in enumerate(samples):
    print("Loadind {} sample".format(sample))
    tmpDF = spark.read \
                .format("org.dianahep.sparkroot.experimental") \
                .load(PATH + sample + "*.root") \
                .select(requiredColumns) \
                .withColumn("label", lit(label))
    dfList.append(tmpDF)

Loadind qcd sample
Loadind ttbar sample
Loadind wjets sample


In [5]:
# Merge all samples into a single dataframe
df = dfList[0]
for tmpDF in dfList[1:]:
    df = df.union(tmpDF)

If the dataframe can fit in memory it is possible to cache it to ensure faster access to data. To do this just add the 
```Python
df = df.cache()
```
and in the next action Spark will cache it. Be aware that this might take some time.

Let's have a look at how many events there are for each class. Keep in mind that the labels are mapped as follow
* $0=\text{QCD}$
* $1=\text{t}\bar{\text{t}}$
* $2=\text{W}+\text{jets}$

In [6]:
# Print the number of events per sample
df.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|79996|
|    2|90000|
|    0|35101|
+-----+-----+



In [7]:
# Print the number of events
events = df.count()
events

205097

In [8]:
# Let's print the schema of one of the required columns
df.select("EFlowTrack").printSchema()

root
 |-- EFlowTrack: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- fUniqueID: integer (nullable = true)
 |    |    |-- fBits: integer (nullable = true)
 |    |    |-- PID: integer (nullable = true)
 |    |    |-- Charge: integer (nullable = true)
 |    |    |-- PT: float (nullable = true)
 |    |    |-- Eta: float (nullable = true)
 |    |    |-- Phi: float (nullable = true)
 |    |    |-- EtaOuter: float (nullable = true)
 |    |    |-- PhiOuter: float (nullable = true)
 |    |    |-- X: float (nullable = true)
 |    |    |-- Y: float (nullable = true)
 |    |    |-- Z: float (nullable = true)
 |    |    |-- T: float (nullable = true)
 |    |    |-- XOuter: float (nullable = true)
 |    |    |-- YOuter: float (nullable = true)
 |    |    |-- ZOuter: float (nullable = true)
 |    |    |-- TOuter: float (nullable = true)
 |    |    |-- Dxy: float (nullable = true)
 |    |    |-- SDxy: float (nullable = true)
 |    |    |-- Xd: float (nullable = 

As we can see we are dealing with highly nested data.

## Create derivate datasets

Now we will create the LLF and HLF datasets. This is done by the function `convert` below which takes as input an event (i.e. the list of particles present in that event) and do the following steps:
1. Select the events with at least one isolated electron/muon (implemented in `selection`)
2. Create the list of 801 particles and the 19 low level features for each of them
3. Compute the high level features 

In [9]:
# Import Lorentz Vector and other functions for pTmaps
from utilFunctions import *

def selection(event, TrkPtMap, NeuPtMap, PhotonPtMap, PTcut=23., ISOcut=0.45):
    """
    This function simulates the trigger selection. 
    Foreach event the presence of one isolated muon or electron with pT >23 GeV is required
    """
    if event.Electron_size == 0 and event.MuonTight_size == 0: 
        return False, False, False
    
    foundMuon = None 
    foundEle =  None 
    
    l = LorentzVector()
    
    for ele in event.Electron:
        if ele.PT <= PTcut: continue
        l.SetPtEtaPhiM(ele.PT, ele.Eta, ele.Phi, 0.)
        
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundEle == None and (pfisoCh+pfisoNeu+pfisoGamma)<ISOcut:
            foundEle = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                        0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                        0., 0., 0., 1., 0., float(ele.Charge)]
    
    for muon in event.MuonTight:
        if muon.PT <= PTcut: continue
        l.SetPtEtaPhiM(muon.PT, muon.Eta, muon.Phi, 0.)
        
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundMuon == None and (pfisoCh+pfisoNeu+pfisoGamma)<ISOcut:
            foundMuon = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                         0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                         0., 0., 0., 0., 1., float(muon.Charge)]
            
    if foundEle != None and foundMuon != None:
        if foundEle[5] > foundMuon[5]:
            return True, foundEle, foundMuon
        else:
            return True, foundMuon, foundEle
    if foundEle != None: return True, foundEle, foundMuon
    if foundMuon != None: return True, foundMuon, foundEle
    
    return False, None, None

In [10]:
import math
import numpy as np
from pyspark.ml.linalg import Vectors

def convert(event):
    """
    This function takes as input an event, applies trigger selection 
    and create LLF and HLF datasets
    """
    q = LorentzVector()
    particles = []
    TrkPtMap = ChPtMapp(0.3, event)
    NeuPtMap = NeuPtMapp(0.3, event)
    PhotonPtMap = PhotonPtMapp(0.3, event)
    if TrkPtMap.shape[0] == 0: return Row()
    if NeuPtMap.shape[0] == 0: return Row()
    if PhotonPtMap.shape[0] == 0: return Row()
    
    #
    # Get leptons
    #
    selected, lep, otherlep = selection(event, TrkPtMap, NeuPtMap, PhotonPtMap)
    if not selected: return Row()
    particles.append(lep)
    lepMomentum = LorentzVector(lep[1], lep[2], lep[3], lep[0])
    
    #
    # Select Tracks
    #
    nTrk = 0
    for h in event.EFlowTrack:
        if nTrk>=450: continue
        if h.PT<=0.5: continue
        q.SetPtEtaPhiM(h.PT, h.Eta, h.Phi, 0.)
        if lepMomentum.DeltaR(q) > 0.0001:
            pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
            pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
            pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
            particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                              h.PT, h.Eta, h.Phi, h.X, h.Y, h.Z,
                              pfisoCh, pfisoGamma, pfisoNeu,
                              1., 0., 0., 0., 0., float(np.sign(h.PID))])
            nTrk += 1
    
    #
    # Select Photons
    #
    nPhoton = 0
    for h in event.EFlowPhoton:
        if nPhoton >= 150: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h.ET, h.Eta, h.Phi, 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 0., 1., 0., 0., 0.])
        nPhoton += 1
    
    #
    # Select Neutrals
    #
    nNeu = 0
    for h in event.EFlowNeutralHadron:
        if nNeu >= 200: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h.ET, h.Eta, h.Phi, 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 1., 0., 0., 0., 0.])
        nNeu += 1
        
    for iTrk in range(nTrk, 450):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0.,0.,
                          0.,0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iPhoton in range(nPhoton, 150):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iNeu in range(nNeu, 200):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
        
    #
    # High Level Features
    #
    myMET = event.MissingET[0]
    MET = myMET.MET
    phiMET = myMET.Phi
    MT = 2.*MET*lepMomentum.Pt()*(1-math.cos(lepMomentum.Phi()-phiMET))
    HT = 0.
    nJets = 0.
    nBjets = 0.
    for jet in event.Jet:
        if jet.PT > 30 and abs(jet.Eta)<2.6:
            nJets += 1
            HT += jet.PT
            if jet.BTag>0: 
                nBjets += 1
    LepPt = lep[4]
    LepEta = lep[5]
    LepPhi = lep[6]
    LepIsoCh = lep[10]
    LepIsoGamma = lep[11]
    LepIsoNeu = lep[12]
    LepCharge = lep[18]
    LepIsEle = lep[16]
    hlf = Vectors.dense([HT, MET, phiMET, MT, nJets, nBjets, LepPt, LepEta, LepPhi,
           LepIsoCh, LepIsoGamma, LepIsoNeu, LepCharge, LepIsEle])
    
        
    #
    # return the Row of low level features and high level features
    #
    
    return Row(lfeatures=particles, hfeatures=hlf, label=event.label)

Finally apply the function to all the events

In [11]:
features = df.rdd \
            .map(convert) \
            .filter(lambda row: len(row) > 0) \
            .toDF()

## Save the datasets as a parquet file

In [12]:
DATASETS_PATH = "hdfs://hadalytic/project/ML/data/swan/"
%time features.write.parquet(DATASETS_PATH+"datasets.parquet", mode="overwrite")

CPU times: user 242 ms, sys: 107 ms, total: 348 ms
Wall time: 13min
