![ROOT Logo](http://root.cern.ch/img/logos/ROOT_Logo/website-banner/website-banner-%28not%20root%20picture%29.jpg)
![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)
# **GSoC 2018 at CERN-HSF**
## **Distributed Big Data Analysis with TDataFrame**
<hr style="border-top-width: 4px; border-top-color: #34609b;">

Create Spark context.

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf()
# Your options here, if needed
sc = SparkContext(conf=conf)

Import ROOT and DistROOT.

In [None]:
import ROOT
from DistROOT import DistTree

Define the mapper (`fillHist`) and reducer (`mergeHist`) functions for the distributed analysis with TDataFrame. The code processes a dimuon physics dataset with the following columns:

| Column Name   | Type     | Quantity                                                |
| :-----------: |:--------:| :------------------------------------------------------:|
| *E1*          | double   | Energy of the first muon (GeV)                          |
| *eta1*        | double   | Pseudorapidity of the first muon                        |
| *phi1*        | double   | Azimuthal angle φ of the first muon (rad)               |
| *px1*         | double   | X component of the momentum of the first muon (GeV)     |
| *py1*         | double   | Y component of the momentum of the first muon (GeV)     |
| *pz1*         | double   | Z component of the momentum of the first muon (GeV)     |
| *pt1*         | double   | Transverse momentum of the first muon (GeV)             |
| *C1*          | Long64_t | Electric charge of the first muon (units of e)          |
| *E2*          | double   | Energy of the second muon (GeV)                         |
| *eta2*        | double   | Pseudorapidity of the second muon                       |
| *phi2*        | double   | Azimuthal angle φ of the second muon (rad)              |
| *px2*         | double   | X component of the momentum of the second muon (GeV)    |
| *py2*         | double   | Y component of the momentum of the second muon (GeV)    |
| *pz2*         | double   | Z component of the momentum of the second muon (GeV)    |
| *pt2*         | double   | Transverse momentum of the second muon (GeV)            |
| *C2*          | Long64_t | Electric charge of the second muon (units of e)         |
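
For reference, the kinematic columns are assumed to follow the usual collider conventions, with θ the polar angle measured from the beam (z) axis; this is a summary of standard definitions, not taken from the dataset documentation. The last line is the dimuon invariant mass that `fillHist` defines as the `invMass` column:

$$
p_T = \sqrt{p_x^2 + p_y^2}, \qquad \phi = \operatorname{atan2}(p_y,\, p_x), \qquad \eta = -\ln\tan\frac{\theta}{2} = \operatorname{arsinh}\frac{p_z}{p_T}
$$

$$
m_{\mu\mu} = \sqrt{(E_1 + E_2)^2 - \left[(p_{x1} + p_{x2})^2 + (p_{y1} + p_{y2})^2 + (p_{z1} + p_{z2})^2\right]}
$$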


In [None]:
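def fillHist(tdf):
    # Mapper: receives a TDataFrame and returns the list of histograms to be merged.
    # ROOT is imported inside the function so that it is available where the function runs (the Spark workers).
    import ROOT

    # Define the analysis cuts
    chargeCutStr = "C1 != C2"                              # opposite-charge muon pair
    etaCutStr = "fabs(eta1) < 2.3 && fabs(eta2) < 2.3"     # both muons central
    ptCutStr = "pt1 > 2 && pt2 > 2"                        # both muons with pt above 2 GeV
    tdf_f = tdf.Filter(chargeCutStr, "Opposite Charge") \
               .Filter(etaCutStr, "Central Muons") \
               .Filter(ptCutStr, "Sane Pt")

    # Create the invariant mass column
    invMassFormulaStr = "sqrt(pow(E1+E2, 2) - (pow(px1+px2, 2) + pow(py1+py2, 2) + pow(pz1+pz2, 2)))"
    tdf_fd = tdf_f.Define("invMass", invMassFormulaStr)

    # Create the histograms; each 1D model tuple is (name, title, nbins, xmin, xmax).
    # Note: the pt histograms are filled from the unfiltered tdf, i.e. before the cuts.
    pt1_h = tdf.Histo1D(("","",128,1,1200), "pt1")
    pt2_h = tdf.Histo1D(("","",128,1,1200), "pt2")
    invMass_h = tdf_fd.Histo1D(("invMass","CMS Opendata;#mu#mu mass [GeV];Events",512,5,110), "invMass")
    # 2D model tuple: (name, title, nbinsx, xmin, xmax, nbinsy, ymin, ymax)
    pi = ROOT.TMath.Pi()
    phis_h = tdf_fd.Histo2D(("", "", 64, -pi, pi, 64, -pi, pi), "phi1", "phi2")

    # Return the histograms
    return [ pt1_h, pt2_h, invMass_h, phis_h ]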
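
To make the selection concrete, the sketch below restates the three `Filter` strings and the `invMass` definition in plain Python for a single event. It assumes an event is a dict keyed by the column names in the table; `passes_cuts` and `inv_mass` are hypothetical names, not part of ROOT or DistROOT. The test event is two massless muons back to back along x, so the pair mass is simply the summed energy.

```python
import math

# Hypothetical plain-Python restatement of the cuts and the invMass column in fillHist.
# An event is assumed to be a dict keyed by the column names of the dataset.

def passes_cuts(ev):
    opposite_charge = ev["C1"] != ev["C2"]                     # "Opposite Charge"
    central = abs(ev["eta1"]) < 2.3 and abs(ev["eta2"]) < 2.3  # "Central Muons"
    sane_pt = ev["pt1"] > 2 and ev["pt2"] > 2                  # "Sane Pt"
    return opposite_charge and central and sane_pt

def inv_mass(ev):
    # Sum the two four-momenta, then m = sqrt(E^2 - |p|^2), as in invMassFormulaStr
    e = ev["E1"] + ev["E2"]
    px = ev["px1"] + ev["px2"]
    py = ev["py1"] + ev["py2"]
    pz = ev["pz1"] + ev["pz2"]
    m2 = e * e - (px * px + py * py + pz * pz)
    # Unlike the C++ expression (which yields NaN), clamp negative values (e.g. rounding residues) to 0
    return math.sqrt(max(m2, 0.0))

# Two massless muons back to back along x: the pair mass equals the total energy
event = {
    "E1": 10.0, "px1": 10.0, "py1": 0.0, "pz1": 0.0, "pt1": 10.0, "eta1": 0.0, "C1": 1,
    "E2": 10.0, "px2": -10.0, "py2": 0.0, "pz2": 0.0, "pt2": 10.0, "eta2": 0.0, "C2": -1,
}
print(passes_cuts(event), inv_mass(event))  # True 20.0
```

Flipping `C2` to `1` makes the pair same-sign, and `passes_cuts` then returns `False`, just as the "Opposite Charge" filter would drop the event.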

In [None]:
def mergeHist(hList1, hList2):
    # Reducer: add each histogram of hList2 bin by bin to the corresponding one in hList1
    for h1, h2 in zip(hList1, hList2):
        h1.Add(h2)
    return hList1
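
Because Spark may combine partial results in any order, the reducer has to be associative and commutative; bin-wise addition, as `TH1::Add` performs in `mergeHist`, satisfies both. A plain-Python sketch of the same idea, assuming a histogram is a list of counts with ROOT's under/overflow layout (bin 0 is underflow, bin `nbins + 1` is overflow); `make_hist`, `fill` and `merge_lists` are hypothetical names:

```python
from functools import reduce

# Hypothetical stand-in for a ROOT TH1 with fixed-width bins:
# counts[0] is underflow, counts[1..nbins] are the regular bins, counts[nbins + 1] is overflow.
def make_hist(nbins):
    return [0] * (nbins + 2)

def fill(counts, x, xmin, xmax):
    nbins = len(counts) - 2
    if x < xmin:
        counts[0] += 1
    elif x >= xmax:
        counts[nbins + 1] += 1
    else:
        # min() protects against floating-point rounding right below xmax
        counts[1 + min(int((x - xmin) * nbins / (xmax - xmin)), nbins - 1)] += 1

def merge_lists(h_list1, h_list2):
    # Same shape as mergeHist: add each histogram of h_list2 bin by bin into h_list1
    for h1, h2 in zip(h_list1, h_list2):
        for i, c in enumerate(h2):
            h1[i] += c
    return h_list1

# Two "partitions", each filling a one-histogram list over [0, 4) with 4 bins
partitions = [[0.5, 1.5, 2.5], [1.2, 3.9, 7.0]]
partials = []
for values in partitions:
    h = make_hist(4)
    for v in values:
        fill(h, v, 0.0, 4.0)
    partials.append([h])

# Sequential stand-in for the Spark reduce; the first partial list is updated in place
merged = reduce(merge_lists, partials)
print(merged[0])  # [0, 1, 2, 1, 1, 1]  (7.0 lands in the overflow bin)
```

Like `mergeHist`, the sketch reuses the first list as the accumulator instead of allocating a new one, which keeps memory use flat as partial results are folded together.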

Build the DistTree and trigger the parallel processing. The result is the list of merged histograms, in the same order as returned by `fillHist`.

In [None]:
dTree = DistTree(filelist = ['https://root.cern/files/teaching/CMS_Open_Dataset.root',],
                 treename = "data",
                 npartitions = 2)

hList = dTree.ProcessAndMerge(fillHist, mergeHist)
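
Conceptually, `npartitions` controls how the entries of the tree are split into chunks that are processed in parallel, each chunk producing one list of histograms that is then reduced. A minimal local sketch of that idea, assuming contiguous, nearly equal entry ranges; `make_ranges` and `process_and_merge` are hypothetical helpers and do not reproduce DistROOT's actual partitioning or scheduling:

```python
from functools import reduce

# Hypothetical helper: split nentries into npartitions contiguous [start, end) ranges
# whose sizes differ by at most one entry.
def make_ranges(nentries, npartitions):
    if npartitions < 1:
        raise ValueError("npartitions must be >= 1")
    base, extra = divmod(nentries, npartitions)
    ranges = []
    start = 0
    for i in range(npartitions):
        end = start + base + (1 if i < extra else 0)
        ranges.append((start, end))
        start = end
    return ranges

# Hypothetical local, sequential stand-in for a distributed "process and merge":
# run the mapper on every range, then fold the partial results with the reducer.
def process_and_merge(nentries, npartitions, mapper, reducer):
    partials = (mapper(start, end) for start, end in make_ranges(nentries, npartitions))
    return reduce(reducer, partials)

print(make_ranges(10, 3))  # [(0, 4), (4, 7), (7, 10)]
# Toy mapper counting entries per range; the reducer sums the counts
print(process_and_merge(10, 3, lambda s, e: e - s, lambda a, b: a + b))  # 10
```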

Draw the histograms that were filled using Spark and ROOT.

In [None]:
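muonsPts = ROOT.TCanvas()
# Overlay the pt distributions of both muons, taking line and marker colours from the palette (PLC, PMC)
hList[0].Draw("PL PLC PMC")
hList[1].Draw("Same PL PLC PMC")
muonsPts.SetLogy()
muonsPts.SetLogx()
muonsPts.Draw()

# phi1 versus phi2 as a colour map
phis = ROOT.TCanvas()
hList[3].Draw("col")
phis.Draw()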

In [None]:
invMass = ROOT.TCanvas()
hList[2].Draw()
invMass.SetLogy()
invMass.SetLogx()
invMass.SetGrid()
invMass.Draw()
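
Both the pt and invariant mass canvases use a logarithmic x axis while the histograms have equal-width bins, so the bins look increasingly narrow toward high values. One option, not used in this notebook, is logarithmically spaced bin edges, which in ROOT could be passed to the `TH1D` constructor that takes an array of bin edges. The sketch below only computes such edges; `log_bin_edges` is a hypothetical helper:

```python
import math

# Hypothetical helper: nbins + 1 logarithmically spaced bin edges between xmin and xmax (both > 0)
def log_bin_edges(nbins, xmin, xmax):
    if xmin <= 0 or xmax <= xmin or nbins < 1:
        raise ValueError("need 0 < xmin < xmax and nbins >= 1")
    lo, hi = math.log10(xmin), math.log10(xmax)
    step = (hi - lo) / nbins
    # Equal steps in log10(x) give a constant ratio between consecutive edges
    return [10 ** (lo + i * step) for i in range(nbins + 1)]

# Same range and bin count as the invariant mass histogram (512 bins from 5 to 110 GeV)
edges = log_bin_edges(512, 5.0, 110.0)
print(len(edges))  # 513
```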