## Data Ingestion and Filtering - Pipeline for the Topology Classifier with Apache Spark
## Optimized Using Spark SQL and Higher Order Functions (HOF)

**1. Data Ingestion** is the first stage of the pipeline. Here we will read the ROOT file from HDFS into a Spark dataframe using [Spark-ROOT](https://github.com/diana-hep/spark-root) reader and then we will create the Low Level Features (LLF) and High Level Features datasets.

To run this notebook we used the following configuration:
* *Software stack*: Spark 2.4.3
* *Platform*: CentOS 7, Python 3.6
* *Spark cluster*: Analytix

In [1]:
# pip install pyspark or use your favorite way to set Spark Home, here we use findspark
import findspark
findspark.init('/home/luca/Spark/spark-2.4.3-bin-hadoop2.7') #set path to SPARK_HOME

In [2]:
# Configure according to your environment
pyspark_python = "<path to python>/bin/python"
spark_root_jar="https://github.com/diana-hep/spark-root/blob/master/jars/spark-root_2.11-0.1.17.jar?raw=true"

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("1-Data Ingestion Optimized with Spark SQL and HOF") \
        .master("yarn") \
        .config("spark.driver.memory","8g") \
        .config("spark.executor.memory","14g") \
        .config("spark.executor.cores","8") \
        .config("spark.executor.instances","50") \
        .config("spark.dynamicAllocation.enabled","false") \
        .config("spark.jars",spark_root_jar) \
        .config("spark.jars.packages","org.diana-hep:root4j:0.1.6") \
        .config("spark.pyspark.python",pyspark_python) \
        .config("spark.eventLog.enabled","false") \
        .getOrCreate()

In [3]:
spark

In [4]:
# Add a file containing functions that we will use later
spark.sparkContext.addPyFile("utilFunctions.py")

## Read the data from HDFS
<br>
As first step we will read the samples into a Spark dataframe using Spark-Root. We will select only a subset of columns present in the original files.

In [5]:
PATH = "hdfs://analytix/Training/Spark/TopologyClassifier/lepFilter_rawData/"

samples = ["qcd_lepFilter_13TeV", "ttbar_lepFilter_13TeV", "Wlnu_lepFilter_13TeV"]

requiredColumns = [
    "EFlowTrack",
    "EFlowNeutralHadron",
    "EFlowPhoton",
    "Electron",
    "MuonTight",
    "MuonTight_size",
    "Electron_size",
    "MissingET",
    "Jet"
]

In [6]:
from pyspark.sql.functions import lit

dfList = []

for label,sample in enumerate(samples):
    print("Loading {} sample...".format(sample))
    tmpDF = spark.read \
                .format("org.dianahep.sparkroot.experimental") \
                .load(PATH + sample + "/*.root") \
                .select(requiredColumns) \
                .withColumn("label", lit(label))
    dfList.append(tmpDF)

Loading Wlnu_lepFilter_13TeV sample...
Loading qcd_lepFilter_13TeV sample...
Loading ttbar_lepFilter_13TeV sample...


In [7]:
# Merge all samples into a single dataframe
df = dfList[0]
for tmpDF in dfList[1:]:
    df = df.union(tmpDF)

Let's have a look at how many events there are for each class. Keep in mind that the labels are mapped as follow
* $0=\text{QCD}$
* $1=\text{t}\bar{\text{t}}$
* $2=\text{W}+\text{jets}$

Let's print the schema of one of the required columns. This shows that the  
**schema is complex and nested** (the full schema is even more complex)

In [8]:
df.select("EFlowTrack").printSchema()

root
 |-- EFlowTrack: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- fUniqueID: integer (nullable = true)
 |    |    |-- fBits: integer (nullable = true)
 |    |    |-- PID: integer (nullable = true)
 |    |    |-- Charge: integer (nullable = true)
 |    |    |-- PT: float (nullable = true)
 |    |    |-- Eta: float (nullable = true)
 |    |    |-- Phi: float (nullable = true)
 |    |    |-- EtaOuter: float (nullable = true)
 |    |    |-- PhiOuter: float (nullable = true)
 |    |    |-- X: float (nullable = true)
 |    |    |-- Y: float (nullable = true)
 |    |    |-- Z: float (nullable = true)
 |    |    |-- T: float (nullable = true)
 |    |    |-- XOuter: float (nullable = true)
 |    |    |-- YOuter: float (nullable = true)
 |    |    |-- ZOuter: float (nullable = true)
 |    |    |-- TOuter: float (nullable = true)
 |    |    |-- Dxy: float (nullable = true)
 |    |    |-- SDxy: float (nullable = true)
 |    |    |-- Xd: float (nullable = 

# Filtering with Spark SQL and Higher Order Functions (HOF)

Spark SQL/Dataframe API can be used to write some of the filters needed for this work.  
This has the advantage of better performance, Spark SQL runs typically faster than UDF filters as it avoid data serialization back and forth to Python.
Additionally, Spark SQL Higher Order Functions, introduced in Spark from version 2.4.0, allow to espress operations on nested data (arrays), which are very important for our ue case.    
A few examples of how this work in our case:  

### Example: Filter muons and electrons

In [9]:
# Use a small subset of the data to illustrate how Spark SQL and HOF work
df_test = df.limit(100)
df_test.cache()
df_test.count()

100

In [10]:
filtered = df_test.filter((df['MuonTight_size']!=0) | (df['Electron_size']!=0))

In [11]:
filtered.count()

78

### Example: Filter tracks

In [12]:
tracks = df_test.select('EFlowTrack').take(1)

In [13]:
len(tracks[0].EFlowTrack)

306

In [14]:
tracks[0].EFlowTrack[0]

Row(fUniqueID=7544, fBits=50331664, PID=211, Charge=1, PT=0.3664204180240631, Eta=-3.9139456748962402, Phi=-2.277653455734253, EtaOuter=-3.894503355026245, PhiOuter=-2.4723126888275146, X=0.0, Y=0.0, Z=78.35020446777344, T=6.139511815606014e-11, XOuter=-95.81527709960938, YOuter=-75.79782104492188, ZOuter=-3000.0, TOuter=1.0339084255406306e-08, Dxy=0.0, SDxy=0.0, Xd=0.0, Yd=0.0, Zd=78.35020446777344, EFlowTrack_Particle=Row(TObject=Row(fUniqueID=0, fBits=65536)))

Original Python code with filters:

```Python
def ChPtMapp(DR, event):
    pTmap = []
    for h in event.EFlowTrack:
        if h.PT<= 0.5: continue
        pTmap.append([h.Eta, h.Phi, h.PT])
    return np.asarray(pTmap)
```

hence we can use HOF to filer all the particles in `EFlowTrack` with $p_T\leq0.5$

In [15]:
filtered.createOrReplaceTempView("test_events")

In [16]:
spark.sql("""
SELECT cardinality(EFlowTrack),
    cardinality(FILTER(EFlowTrack,
        tracks -> tracks.PT > 0.5
    )) EFlowTrack_Filtered
FROM test_events
""").show(5)

+----------------+-------------------+
|size(EFlowTrack)|EFlowTrack_Filtered|
+----------------+-------------------+
|             306|                238|
|             472|                372|
|             341|                250|
|             389|                295|
|             162|                108|
+----------------+-------------------+
only showing top 5 rows



We can do the same thing for the others, for example consider the photons

In [17]:
spark.sql("""
SELECT cardinality(EFlowPhoton),
    cardinality(FILTER(EFlowPhoton,
        photon -> photon.ET > 1
    )) EFlowPhoton_Filtered
FROM test_events
""").show(5)

+-----------------+--------------------+
|size(EFlowPhoton)|EFlowPhoton_Filtered|
+-----------------+--------------------+
|              377|                  75|
|              540|                  82|
|              429|                  54|
|              583|                  77|
|              228|                  29|
+-----------------+--------------------+
only showing top 5 rows



and also neutral hadrons

In [18]:
spark.sql("""
SELECT cardinality(EFlowNeutralHadron),
    cardinality(FILTER(EFlowNeutralHadron,
        hadron -> hadron.ET > 1
    )) EFlowNeutralHadron_Filtered
FROM test_events
""").show(5)

+------------------------+---------------------------+
|size(EFlowNeutralHadron)|EFlowNeutralHadron_Filtered|
+------------------------+---------------------------+
|                     311|                         92|
|                     426|                        136|
|                     335|                         95|
|                     422|                        138|
|                     214|                         52|
+------------------------+---------------------------+
only showing top 5 rows



During the simulation of the trigger we require $p_T > 23\,\text{GeV}$ for muons and electrons

In [19]:
spark.sql("""
SELECT 
    cardinality(Electron),
    cardinality(MuonTight),
    cardinality(FILTER(Electron,
        electron -> electron.PT > 23
    )) Electron_Filtered,
    cardinality(FILTER(MuonTight,
        muon -> muon.PT > 23
    )) Muon_Filtered
FROM test_events
WHERE MuonTight_size > 0 OR Electron_size > 0
""").show(5)

+--------------+---------------+-----------------+-------------+
|size(Electron)|size(MuonTight)|Electron_Filtered|Muon_Filtered|
+--------------+---------------+-----------------+-------------+
|             0|              1|                0|            1|
|             0|              1|                0|            1|
|             0|              1|                0|            1|
|             1|              0|                0|            0|
|             0|              1|                0|            1|
+--------------+---------------+-----------------+-------------+
only showing top 5 rows



# Data pre-filtering with Spark SQL and High Level Function optimizations 
From here we do the actual processing of filtering data for our use case

In [9]:
# Print the number of events per sample and the total (label=null)
df.rollup("label").count().orderBy("label", ascending=False).show()

+-----+--------+
|label|   count|
+-----+--------+
|    2|26335315|
|    1|13780026|
|    0|14354796|
| null|54470137|
+-----+--------+



In [9]:
# register the dataframe with events data as a temporary view
df.createOrReplaceTempView("events")

Filter leptons (electrons and muons in this case)
- require $p_T$ > 23 GEv for electron and muon
- take events with at leat one electron or muon

In [10]:
## Filter leptons
filteredLeptons = spark.sql("""
SELECT *
FROM (
    SELECT
        label,
        EFlowTrack,
        EFlowNeutralHadron,
        EFlowPhoton ,
        MissingET,
        Jet,
        FILTER(Electron, 
            electron -> electron.PT > 23
        ) Electron,
        FILTER(MuonTight,
            muon -> muon.PT > 23
        ) MuonTight
    FROM events
    WHERE MuonTight_size > 0 OR Electron_size > 0
) Filtered 
WHERE cardinality(Electron) > 0 
      OR cardinality(MuonTight) > 0
""")

In [11]:
filteredLeptons.createOrReplaceTempView("filteredLeptons")

In [12]:
## Decrease number of tracks
filteredDF = spark.sql("""
SELECT 
    label,
    FILTER(EFlowTrack,
        tracks -> tracks.PT > 0.5
    ) EFlowTrack, 
    
    FILTER(EFlowNeutralHadron,
        hadron -> hadron.ET > 1.0
    ) EFlowNeutralHadron,
    
    FILTER(EFlowPhoton,
        photon -> photon.ET > 1.0
    ) EFlowPhoton,
    
    FILTER(Jet,
        jet -> ((jet.PT>30.0) AND (ABS(jet.Eta)<2.6)) 
    ) Jets,
    
    MissingET,
    Electron,
    MuonTight
FROM filteredLeptons
""")

In [13]:
filteredDF.createOrReplaceTempView("filteredDF")

In [14]:
## Reduce number of features
reduced = spark.sql("""
SELECT
     label,
     TRANSFORM(EFlowTrack,
     track -> map_from_arrays(
        Array("PT", "Eta", "Phi", "PID", "X", "Y", "Z"),
        Array(track.PT, track.Eta, track.Phi, track.PID, track.X, track.Y, track.Z)
        )
     ) Tracks,
     
     TRANSFORM(EFlowPhoton,
     photon -> map_from_arrays(
        Array("ET", "Eta", "Phi"),
        Array(photon.ET, photon.Eta, photon.Phi)
        )
     ) Photons,
     
     TRANSFORM(EFlowNeutralHadron,
     hadron -> map_from_arrays(
        Array("ET", "Eta", "Phi"),
        Array(hadron.ET, hadron.Eta, hadron.Phi)
        )
     ) NeutralHadrons,
     
     TRANSFORM(MissingET,
     missingET -> map_from_arrays(
        Array("MET", "Phi"),
        Array(missingET.MET, missingET.Phi)
        )
     ) MissingET,
     
     TRANSFORM(Jets,
     jet -> map_from_arrays(
        Array("PT", "BTag"),
        Array(jet.PT, jet.BTag)
        )
     ) Jets,
     
     TRANSFORM(Electron,
     electron -> map_from_arrays(
        Array("PT", "Eta", "Phi", "Charge"),
        Array(electron.PT, electron.Eta, electron.Phi, electron.Charge)
        )
     ) Electron,
     
     TRANSFORM(MuonTight,
     muon -> map_from_arrays(
        Array("PT", "Eta", "Phi", "Charge"),
        Array(muon.PT, muon.Eta, muon.Phi,muon.Charge)
        )
     ) MuonTight
     
FROM filteredDF
""")

In [15]:
reduced.createOrReplaceTempView("reduced")

In [16]:
reduced.printSchema()

root
 |-- label: integer (nullable = false)
 |-- Tracks: array (nullable = true)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: string
 |    |    |-- value: float (valueContainsNull = true)
 |-- Photons: array (nullable = true)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: string
 |    |    |-- value: float (valueContainsNull = true)
 |-- NeutralHadrons: array (nullable = true)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: string
 |    |    |-- value: float (valueContainsNull = true)
 |-- MissingET: array (nullable = true)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: string
 |    |    |-- value: float (valueContainsNull = true)
 |-- Jets: array (nullable = true)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: string
 |    |    |-- value: float (valueContainsNull = true)
 |-- Electron: array (nullable = true)
 |    |-- element: map (containsNull = false)
 |    |    |-- key: string
 |    |  

In [17]:
print("Total number of events:", df.count())
print("Number of events after filtering with Spark SQL and HOF:", reduced.count())

Total number of events: 54470137
Number of events after filtering with Spark SQL and HOF: 37757705


In [18]:
# Print the number of events per sample and the total (label=null)
print("Original dataset")
spark.sql("select label, count(*) from events group by rollup(label) order by label nulls last").show()

print("After filtering with Spark SQL and HOF")
spark.sql("select label, count(*) from reduced group by rollup(label) order by label nulls last").show()

Original dataset
+-----+--------+
|label|count(1)|
+-----+--------+
|    0|14354796|
|    1|13780026|
|    2|26335315|
| null|54470137|
+-----+--------+

After filtering with Spark SQL and HOF
+-----+--------+
|label|count(1)|
+-----+--------+
|    0| 9983864|
|    1| 9743725|
|    2|18030116|
| null|37757705|
+-----+--------+



## Create derivate datasets

Now we will create the LLF and HLF datasets. This is done by the function `convert` below which takes as input an event (i.e. the list of particles present in that event) and do the following steps:
0. Start from the reduced dataframe computed above, with events with at least one isolated electron/muon 
1. Create the list of 801 particles and the 19 low level features for each of them
2. Compute the high level features 

In [19]:
# Import Lorentz Vector and other functions for pTmaps
from utilFunctions import *

def selection(Electron, MuonTight, TrkPtMap, NeuPtMap, PhotonPtMap):
    """
    This function simulates the trigger selection.
    Part of the selection is implemented in previous cells using Spark SQL
    """
    
    foundMuon = None 
    foundEle =  None 

    l = LorentzVector()
    for ele in Electron:

        l.SetPtEtaPhiM(ele["PT"], ele["Eta"], ele["Phi"], 0.)
        
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundEle == None and (pfisoCh+pfisoNeu+pfisoGamma)<0.45:
            foundEle = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                        0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                        0., 0., 0., 1., 0., float(ele["Charge"])]
    for muon in MuonTight:
        #
        # Has to replace the TLorentzVector functionality
        #
        l.SetPtEtaPhiM(muon["PT"], muon["Eta"], muon["Phi"], 0.)
        
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundMuon == None and (pfisoCh+pfisoNeu+pfisoGamma)<0.45:
            foundMuon = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                         0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                         0., 0., 0., 0., 1., float(muon["Charge"])]
    if foundEle != None and foundMuon != None:
        if foundEle[5] > foundMuon[5]:
            return True, foundEle, foundMuon
        else:
            return True, foundMuon, foundEle
    if foundEle != None: return True, foundEle, foundMuon
    if foundMuon != None: return True, foundMuon, foundEle
    return False, None, None

In [20]:
from pyspark.sql import Row
from pyspark.sql.functions import pandas_udf, udf, col
from pyspark.sql.types import ArrayType, DoubleType
import numpy as np

def convert(event):
    """
    This function takes as input an event, applies trigger selection 
    and create LLF and HLF datasets
    """
    q = LorentzVector()
    particles = np.zeros((801,19))
    index = 0
    
    TrkPtMap = ChPtMapp2(event.Tracks)
    NeuPtMap = NeuPtMapp2(event.NeutralHadrons)
    PhotonPtMap = PhotonPtMapp2(event.Photons)
    
    ## Lepton Filter
    selected, lep, otherlep = selection(event.Electron, event.MuonTight,
                                        TrkPtMap, NeuPtMap, PhotonPtMap)
    if not selected: return Row()
    #particles.append(lep)
    particles[index] = lep
    index
    lepMomentum = LorentzVector(lep[1], lep[2], lep[3], lep[0])
    
    nTrk = 0
    for h in event.Tracks:
        if nTrk>=450: break
        q.SetPtEtaPhiM(h["PT"], h["Eta"], h["Phi"], 0.)
        if lepMomentum.DeltaR(q) > 0.0001:
            pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
            pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
            pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
            """particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                              h["PT"], h["Eta"], h["Phi"], h["X"], h["Y"], h["Z"],
                              pfisoCh, pfisoGamma, pfisoNeu,
                              1., 0., 0., 0., 0., np.sign(h["PID"])])"""
            particles[index] = [q.E(), q.Px(), q.Py(), q.Pz(),
                              h["PT"], h["Eta"], h["Phi"], h["X"], h["Y"], h["Z"],
                              pfisoCh, pfisoGamma, pfisoNeu,
                              1., 0., 0., 0., 0., np.sign(h["PID"])]
            nTrk += 1
            index += 1
            
    nPhoton = 0
    for h in event.Photons:
        if nPhoton >= 150: break
        q.SetPtEtaPhiM(h["ET"], h["Eta"], h["Phi"], 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        """particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h["ET"], h["Eta"], h["Phi"], 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 0., 1., 0., 0., 0.])"""
        particles[index] = [q.E(), q.Px(), q.Py(), q.Pz(),
                          h["ET"], h["Eta"], h["Phi"], 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 0., 1., 0., 0., 0.]
        nPhoton += 1
        index += 1
    
    nNeu = 0
    for h in event.NeutralHadrons:
        if nNeu >= 200: break
        q.SetPtEtaPhiM(h["ET"], h["Eta"], h["Phi"], 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        """particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h["ET"], h["Eta"], h["Phi"], 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 1., 0., 0., 0., 0.])"""
        particles[index] = [q.E(), q.Px(), q.Py(), q.Pz(),
                          h["ET"], h["Eta"], h["Phi"], 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 1., 0., 0., 0., 0.]
        nNeu += 1
        index += 1
    #    
    # Build high level features
    #
    myMET = event.MissingET[0]
    MET = myMET["MET"]
    phiMET = myMET["Phi"]
    MT = 2.*MET*lepMomentum.Pt()*(1-math.cos(lepMomentum.Phi()-phiMET))
    HT = 0.
    nJets = 0.
    nBjets = 0.
    for jet in event.Jets:
        nJets += 1
        HT += jet["PT"]
        if jet["BTag"]>0: 
            nBjets += 1
    LepPt = lep[4]
    LepEta = lep[5]
    LepPhi = lep[6]
    LepIsoCh = lep[10]
    LepIsoGamma = lep[11]
    LepIsoNeu = lep[12]
    LepCharge = lep[18]
    LepIsEle = lep[16]
    
    hlf = [HT, MET, phiMET, MT, nJets, nBjets, LepPt, LepEta, LepPhi,
           LepIsoCh, LepIsoGamma, LepIsoNeu, LepCharge, LepIsEle]
    #
    # return the Row of low level features and high level features
    #
    return Row(lfeatures=particles.tolist(), hfeatures=hlf, label=event.label)

Finally apply the function to all the events

In [21]:
features = reduced.rdd \
            .map(convert) \
            .filter(lambda row: len(row) > 0) \
            .toDF()

In [22]:
features.printSchema()

root
 |-- hfeatures: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- label: long (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)



## Save the datasets as Parquet files

In [23]:
dataset_path = "hdfs://analytix/Training/Spark/TopologyClassifier/dataIngestion_full_13TeV"
num_partitions = 3000 # used in DataFrame coalesce operation to limit number of output files

%time features.coalesce(num_partitions).write.partitionBy("label").parquet(dataset_path)

CPU times: user 812 ms, sys: 548 ms, total: 1.36 s
Wall time: 2h 11min 23s


In [24]:
print("Number of events written to Parquet:", spark.read.parquet(dataset_path).count())

Number of events written to Parquet: 25468476


In [25]:
spark.stop()