## Initialize the Dask client

In [1]:
from dask.distributed import Client

# Connect to the Dask scheduler listening on this local port
sched_port = 37470
client = Client(address=f"tcp://127.0.0.1:{sched_port}")


+-------------+-----------+-----------+---------+
| Package     | client    | scheduler | workers |
+-------------+-----------+-----------+---------+
| dask        | 2021.09.1 | 2021.09.0 | None    |
| distributed | 2021.09.1 | 2021.09.0 | None    |
+-------------+-----------+-----------+---------+
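The table above is a version-mismatch report: the client runs dask and distributed 2021.09.1, while the scheduler runs 2021.09.0. A minimal sketch of how such a comparison could be scripted is shown here. The helper name `find_version_mismatches` is hypothetical, and the nested dictionary layout is an assumption about what `client.get_versions()` returns, so it should be checked against the installed `distributed` version. The input is written by hand from the table values.

```python
def find_version_mismatches(versions, packages=("dask", "distributed")):
    """Return {package: {role: version}} for packages whose versions differ.

    `versions` is ASSUMED to look like:
    {"client": {"packages": {...}},
     "scheduler": {"packages": {...}},
     "workers": {address: {"packages": {...}}, ...}}
    """
    mismatches = {}
    for package in packages:
        seen = {
            "client": versions["client"]["packages"].get(package),
            "scheduler": versions["scheduler"]["packages"].get(package),
        }
        for address, info in versions.get("workers", {}).items():
            seen[address] = info["packages"].get(package)
        # More than one distinct version string means a mismatch.
        if len(set(seen.values())) > 1:
            mismatches[package] = seen
    return mismatches


# Hand-written input reproducing the table above (no worker versions reported).
example = {
    "client": {"packages": {"dask": "2021.09.1", "distributed": "2021.09.1"}},
    "scheduler": {"packages": {"dask": "2021.09.0", "distributed": "2021.09.0"}},
    "workers": {},
}
mismatches = find_version_mismatches(example)
for package, seen in mismatches.items():
    print(package, seen)
```

Aligning the client environment with the scheduler's versions is the usual way to make this report disappear.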


## Inspect the Dask cluster

In [3]:
client

Connection method: Direct
Dashboard: http://127.0.0.1:32953/status

Comm: tcp://10.244.6.85:37470
Workers: 4
Dashboard: http://10.244.6.85:32953/status
Total threads: 4
Started: 12 minutes ago
Total memory: 11.16 GiB

Comm: tcp://10.244.11.231:40567
Total threads: 1
Dashboard: http://10.244.11.231:46209/status
Memory: 2.79 GiB
Nanny: tcp://10.244.11.231:35857
Local directory: /var/lib/condor/execute/dir_472/dask-worker-space/worker-m8vmx6u9
Tasks executing: 0
Tasks in memory: 0
Tasks ready: 0
Tasks in flight: 0
CPU usage: 58.2%
Last seen: Just now
Memory usage: 93.05 MiB
Spilled bytes: 0 B
Read bytes: 1.12 kiB
Write bytes: 1.82 kiB

Comm: tcp://10.244.15.246:36201
Total threads: 1
Dashboard: http://10.244.15.246:33315/status
Memory: 2.79 GiB
Nanny: tcp://10.244.15.246:33859
Local directory: /var/lib/condor/execute/dir_468/dask-worker-space/worker-vl0knynp
Tasks executing: 0
Tasks in memory: 0
Tasks ready: 0
Tasks in flight: 0
CPU usage: 6.0%
Last seen: Just now
Memory usage: 94.34 MiB
Spilled bytes: 0 B
Read bytes: 0.0 B
Write bytes: 0.0 B

Comm: tcp://10.244.10.32:37773
Total threads: 1
Dashboard: http://10.244.10.32:36979/status
Memory: 2.79 GiB
Nanny: tcp://10.244.10.32:45671
Local directory: /var/lib/condor/execute/dir_475/dask-worker-space/worker-61le395p
Tasks executing: 0
Tasks in memory: 0
Tasks ready: 0
Tasks in flight: 0
CPU usage: 4.0%
Last seen: Just now
Memory usage: 94.32 MiB
Spilled bytes: 0 B
Read bytes: 0.0 B
Write bytes: 0.0 B

Comm: tcp://10.244.6.85:33087
Total threads: 1
Dashboard: http://10.244.6.85:42237/status
Memory: 2.79 GiB
Nanny: tcp://10.244.6.85:34417
Local directory: /var/lib/condor/execute/dir_4635/dask-worker-space/worker-jfy6ovut
Tasks executing: 0
Tasks in memory: 0
Tasks ready: 0
Tasks in flight: 0
CPU usage: 4.0%
Last seen: Just now
Memory usage: 92.59 MiB
Spilled bytes: 0 B
Read bytes: 10.93 kiB
Write bytes: 10.88 kiB


## Declare your custom functions inside an initialization function
This function initializes each worker process. It is needed because ```ROOT.gInterpreter.Declare()``` affects only the local C++ interpreter, so the declarations must be repeated on every worker. Declare here all the custom functions that you plan to use in the analysis. In this example, ```CountGoodMuons(const RVec<float> &pt)``` is defined and later used in the ```Define``` of a new DataFrame column.

In [4]:
import ROOT

def my_initialization_function():
  ROOT.gInterpreter.Declare("""
  #include "ROOT/RVec.hxx"
  using namespace ROOT::VecOps;
  
  size_t CountGoodMuons(const RVec<float> &pt){
    size_t n = 0;
    for (size_t i = 0; i < pt.size(); i++) {
        if (pt[i] > 50) n++;
    }
    return n;
  }
  """)

#### You can also keep the functions in a myfunctions.h file and declare them like this: ####
# with open("myfunctions.h", "r") as header_file:
#     data = header_file.read()
# def my_initialization_function():
#     ROOT.gInterpreter.Declare(data)

ROOT.RDF.Experimental.Distributed.initialize(my_initialization_function)

Error in <TExMap::Add>: key 139885343440352 is not unique
Error in <TExMap::Add>: key 139885343956944 is not unique


Welcome to JupyROOT 6.25/01
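For readers less familiar with C++, the logic of ```CountGoodMuons``` declared above can be mirrored in plain Python. This is only an illustrative sketch: the name `count_good_muons`, the `threshold` parameter (the C++ version hard-codes 50), and the toy pt values are all hypothetical, and the analysis itself still uses the C++ function.

```python
def count_good_muons(pt, threshold=50.0):
    """Count the entries of `pt` strictly greater than `threshold`."""
    return sum(1 for value in pt if value > threshold)


# Made-up pt values for one event, in GeV.
toy_muon_pt = [12.5, 50.0, 63.1, 87.4]
n_good = count_good_muons(toy_muon_pt)
print(n_good)  # 2: the comparison is strict, so 50.0 itself does not count
```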


## Create DataFrame and book actions and transformations - no custom functions
Here is an example of how to use ```Define``` and ```Filter``` with simple algebraic expressions: create a new column, ```nLightLepton```, which contains the sum of ```nMuon``` and ```nElectron``` in the event, and keep only the events in which ```nLightLepton``` is greater than or equal to 5.

In [5]:
import ROOT 
 
# Create dataframe from NanoAOD files
df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame("Events", "https://ttedesch.web.cern.ch/ttedesch/cms_opendata_2012_nanoaod_skimmed/Run2012C_DoubleMuParked.root", npartitions=1, daskclient=client)
df_defined = df.Define("nLightLepton", "nMuon + nElectron")
df_filtered = df_defined.Filter("nLightLepton >= 5", "Filter events with at least 5 light leptons")
h = df_filtered.Histo1D(("nLightLepton","nLightLepton; nLightLepton ;N_{Events}",10,0,10),"nLightLepton")
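To make the three booked steps concrete, the sketch below reproduces the same Define, Filter and histogram logic in plain Python on a handful of made-up events. It does not use ROOT, the event values are hypothetical, and unlike a ROOT histogram it ignores underflow and overflow.

```python
# Hypothetical toy events; the real columns come from the NanoAOD file.
events = [
    {"nMuon": 2, "nElectron": 1},
    {"nMuon": 3, "nElectron": 2},
    {"nMuon": 4, "nElectron": 3},
    {"nMuon": 5, "nElectron": 0},
    {"nMuon": 0, "nElectron": 0},
]

# Define: add a new column computed from existing ones.
defined = [
    dict(event, nLightLepton=event["nMuon"] + event["nElectron"])
    for event in events
]

# Filter: keep only the events that pass the cut.
filtered = [event for event in defined if event["nLightLepton"] >= 5]

# Histogram: 10 equal-width bins covering [0, 10), as in the cell above.
n_bins, x_min, x_max = 10, 0.0, 10.0
bin_width = (x_max - x_min) / n_bins
counts = [0] * n_bins
for event in filtered:
    value = event["nLightLepton"]
    if x_min <= value < x_max:
        counts[int((value - x_min) / bin_width)] += 1

print(counts)  # [0, 0, 0, 0, 0, 2, 0, 1, 0, 0]
```

Because the filter requires at least 5 light leptons, the bins below 5 are necessarily empty, and the same holds for the histogram booked above.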

## Draw histo (the event loop is triggered here)
A simple histogram of the newly defined variable.

In [None]:
ROOT.gStyle.SetOptStat(0); ROOT.gStyle.SetTextFont(42)
c = ROOT.TCanvas("c", "", 800, 700)
h.SetTitle("")
h.GetXaxis().SetTitleSize(0.04)
h.GetYaxis().SetTitleSize(0.04)
h.Draw()
c.Draw()
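As the heading of this step says, the event loop runs only when the histogram is drawn; the earlier ```Define```, ```Filter``` and ```Histo1D``` calls merely book work. The toy class below illustrates that lazy pattern in plain Python. It is not ROOT's implementation, and `LazyResult` and `event_loop` are hypothetical names.

```python
class LazyResult:
    """Holds a computation and runs it only when the value is first requested."""

    def __init__(self, compute):
        self._compute = compute
        self._value = None
        self._done = False

    def get_value(self):
        if not self._done:
            self._value = self._compute()
            self._done = True
        return self._value


runs = []  # records each execution of the hypothetical "event loop"


def event_loop():
    runs.append("ran")
    return sum(x for x in range(10) if x >= 5)


booked = LazyResult(event_loop)  # booking: nothing has run yet
runs_after_booking = len(runs)
first = booked.get_value()       # first access triggers the computation
second = booked.get_value()      # cached: the loop is not executed again
print(runs_after_booking, first, second, len(runs))  # 0 35 35 1
```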

## Create DataFrame and book actions and transformations - use custom function defined in my_initialization_function()
Here the ```Define``` transformation uses the ```CountGoodMuons(Muon_pt)``` function (declared to the C++ interpreter in ```my_initialization_function()```), which loops over the pt values of all muons in the event and counts those with pt greater than 50 GeV. A simple ```Filter``` is then booked: only events with at least one muon with pt greater than 50 GeV are kept.

In [None]:
import ROOT 
 
# Create dataframe from NanoAOD files
df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame("Events", "https://ttedesch.web.cern.ch/ttedesch/cms_opendata_2012_nanoaod_skimmed/Run2012C_DoubleMuParked.root", npartitions=1, daskclient=client)
df_defined = df.Define("nMuons_over50", "CountGoodMuons(Muon_pt)")
df_filtered = df_defined.Filter("nMuons_over50 > 0", "Filter events with at least 1 muon with pt>50 GeV")
h = df_filtered.Histo1D(("nMuons_over50","nMuons_over50; nMuons_over50 ;N_{Events}",5,0,5),"nMuons_over50")
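The cell above passes ```npartitions=1```, so the dataset is handled as a single task, and presumably only one of the four workers listed earlier does the work. As a rough illustration of what a larger value would mean, the sketch below splits a hypothetical number of entries into contiguous ranges. It is not ROOT's actual partitioning code; `split_entries` and the entry count of 1000 are assumptions made for the example.

```python
def split_entries(n_entries, npartitions):
    """Split [0, n_entries) into `npartitions` contiguous, near-equal ranges."""
    if npartitions < 1:
        raise ValueError("npartitions must be at least 1")
    base, remainder = divmod(n_entries, npartitions)
    ranges = []
    start = 0
    for index in range(npartitions):
        # The first `remainder` ranges take one extra entry.
        stop = start + base + (1 if index < remainder else 0)
        ranges.append((start, stop))
        start = stop
    return ranges


one_task = split_entries(1000, 1)
four_tasks = split_entries(1000, 4)
print(one_task)    # [(0, 1000)]
print(four_tasks)  # [(0, 250), (250, 500), (500, 750), (750, 1000)]
```

With four single-threaded workers available, a value of at least 4 would let every worker receive a range.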

## Draw histo (the event loop is triggered here)
A simple histogram of the newly-defined variable.

In [None]:
ROOT.gStyle.SetOptStat(0); ROOT.gStyle.SetTextFont(42)
c1 = ROOT.TCanvas("c1", "", 800, 700)
h.SetTitle("")
h.GetXaxis().SetTitleSize(0.04)
h.GetYaxis().SetTitleSize(0.04)
h.Draw()
c1.Draw()