# This is an example of opening multiple .root file using uproot4, dask and displaying data into a single aggregated histogram

## 1. Importing all the necessary packages

In [1]:
import sys
sys.path.insert(0, '..')
# sys.path.insert(1, '../python/')
# import root_numpy as rnp
import uproot4
import python.file_manager as fm
import ROOT

Welcome to JupyROOT 6.26/11


## 2. Importing the second batch of packages

In [2]:
import numpy as np
import fsspec
import distributed
import awkward as ak
import dask_awkward as dak
print("Loaded")

Loaded


## 3. Importing histogram libraries

In [None]:
import hist

import dask_histogram as dh
import boost_histogram as bh
print("Loaded")

## 4. Opening a distributed client for Dask

In [None]:
client = distributed.Client()
client

## 5. Opening .root files using Dask and Uproot

### 5.1 Opening files using Dask

In [None]:
import uproot as up

In [None]:
dask_multiple_files = uproot4.dask({'root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_0.root': 'Events',
                   'root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_1.root': 'Events'
                   })

In [None]:
dask_single_file = uproot4.dask({'root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_0.root': 'Events'}, library='np')

In [None]:
dask_single_third = up.dask({'root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v102D/perfNano_9026841_0.root': 'Events'}, library='np')

In [None]:
dask_single_third['event']

### 5.2 Opening files using uproot.open

In [None]:
file =  uproot4.open('root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_0.root')
tree = file['Events']
arr = tree.arrays(['nTkEleL2'], library='ak')

### 5.2.1 Opening MULTIPLE files using uproot.open

In [None]:
file_first =  uproot4.open('root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_0.root')
tree_first = file_first['Events']

file_second =  uproot4.open('root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_1.root')
tree_second = file_second['Events']

arr_first = tree_first.arrays(['nTkEleL2'], library='ak')
arr_second = tree_second.arrays(['nTkEleL2'], library='ak')

arr_first_second_final = ak.concatenate([arr_first, arr_second])

In [None]:
import uproot as up

In [None]:
file_third =  up.open('root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_0.root')
tree_third = file_third['Events']

In [None]:
dask_single_file3 = up.dask({'root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_0.root': 'Events'}, library='np')

In [None]:
type(tree_third)

### 5.3 Checking the input of opened files

In [None]:
print(dask_multiple_files['nTkEleL2'])

In [None]:
print(dask_single_file['nTkEleL2'])

In [None]:
print(arr[0])

### 5.4 Setting up the charts

In [None]:
import matplotlib.pyplot as plt
import dask.array as da
import dask
import hist.dask as dah

In [None]:
def plot_chart(data):
    # add the axes, finalize with storage
    h = (
        dah.Hist.new.Reg(50, -5, 5, name="S", label="s [units]", flow=False)
        .Reg(50, -5, 5, name="W", label="w [units]", flow=False)
        .Double()
    )

    s_data = data
    w_data = data
        
    # delayed fill
    h.fill(W=w_data, S=s_data)
    
    # auto-plot
    fig, axs = plt.subplots(1, 2, figsize=(10, 4))
    
    h.project("W").plot(ax=axs[0])
    h.project("W", "S").plot(ax=axs[1])
    plt.show()
    
    return h

In [None]:
def finalize_dask_chart(histogram):
    # render in-memory histogram
    histogram = histogram.compute()

    import matplotlib.pyplot as plt

    # auto-plot
    fig, axs = plt.subplots(1, 2, figsize=(10, 4))

    histogram.project("W").plot(ax=axs[0])
    histogram.project("W", "S").plot(ax=axs[1])
    plt.show()

### 5.5 Dask charts: opening a single .ROOT file in MULTIPLE steps

In [None]:
dask_chart_single = plot_chart(dask_single_file['nTkEleL2'])

In [None]:
finalize_dask_chart(dask_chart_single)

### 5.6  uproot charts: opening a single .ROOT file in a SINGLE step

In [None]:
uproot_chart_single = plot_chart(tree['nTkEleL2'])

In [None]:
uproot_chart_single_third = plot_chart(tree_third['nTkEleL2'])

In [None]:
type(tree_third['nTkEleL2'])

### 5.7 Dask charts: opening multiple .ROOT files in SINGLE step

In [None]:
uproot_multiple_files = plot_chart(arr_first_second_final)

### 5.8 Compare Dask with single ROOT file VS uproot4 single ROOT file

#### 5.8.1 uproot

In [None]:
bin_contents_uproot = uproot_chart_single.values()

print("LENGTH OF THE BINS: ",  bin_contents_uproot.size)

for i, content in enumerate(bin_contents_uproot):
    print(f"Bin {i}: {content}")

#### 5.8.2 dask

In [None]:
bin_contents_dask_single = dask_chart_single.values()

print("LENGTH OF THE BINS: ",  bin_contents_dask_single.size)

for i, content in enumerate(bin_contents_dask_single):
    print(f"Bin {i}: {content}")

### 5.9 Dask charts: opening MULTIPLE ROOT files

In [None]:
dask_chart_multiple = plot_chart(dask_multiple_files['nTkEleL2'])

In [None]:
finalize_dask_chart(dask_chart_multiple)

### 6. uproot charts: opening MULTIPLE ROOT files

In [None]:
uproot_chart_multiple = plot_chart(arr_first_second_final)

### 6.1 Compare Dask with MULTIPLE ROOT file VS uproot4 MULTIPLE ROOT file

#### 6.1.1 uproot

In [None]:
bin_contents_uproot_multiple = uproot_chart_multiple.values()

print("LENGTH OF THE BINS: ",  bin_contents_uproot_multiple.size)

for i, content in enumerate(bin_contents_uproot_multiple):
    print(f"Bin {i}: {content}")

#### 6.1.2 dask

In [None]:
bin_contents_dask_multiple = dask_chart_multiple.values()

print("LENGTH OF THE BINS: ",  bin_contents_dask_multiple.size)

for i, content in enumerate(bin_contents_dask_multiple):
    print(f"Bin {i}: {content}")

In [5]:
import pandas as pd
import datetime
import resource
import gc
import awkward as ak
import awkward_pandas as akpd
import vector
vector.register_awkward()

class TreeReader(object):
    def __init__(self, entry_range, max_events):
        self.tree = None
        self._branches = []
        # this is the gloabl "entry" across files
        self.global_entry = -1
        # this is the "entry" local to the open file (reset to 0) every new file
        self.file_entry = -1
        self.max_events = max_events
        self.entry_range = entry_range

        self.n_tot_entries = 0

    def setTree(self, uptree):
        self.tree = uptree
        self._branches = []
        branch_blacklist = ['tc_wafer',
                            'tc_cell',
                            'tc_waferu',
                            'tc_waferv',
                            'tc_cellu',
                            'tc_cellv',
                            'gen_PUNumInt',
                            'gen_TrueNumInt',
                            # 'gen_daughters', 
                            'simpart_posx', 'simpart_posy', 'simpart_posz',
                            ]
        if len(self._branches) == 0:
            self._branches = [br for br in self.tree.keys() if br not in branch_blacklist]
        print(f'open new tree file with # entries: {self.tree.num_entries}')
        self.file_entry = -1

    def next(self, debug=0):

        if self.max_events != -1:
            if self.n_tot_entries == self.max_events:
                print('END loop for max_event!')
                # we processed the max # of events
                return False
        if self.entry_range[1] != -1:
            if self.global_entry == self.entry_range[1]:
                print('END loop for entry_range')
                return False
        if self.file_entry == self.tree.num_entries-1:
            print('END loop for end_of_file')
            return False

        if self.global_entry == -1:
            self.global_entry = self.entry_range[0]
            self.file_entry = self.entry_range[0]
        else:
            self.file_entry += 1
            self.global_entry += 1

        # entry is the cursor in the file: when we open a new one (not the first) needs to be set to 0 again
        if debug >= 2 or self.global_entry % 1000 == 0:
            self.printEntry()
        
        self.n_tot_entries += 1
        return True

    def printEntry(self):
        print("--- File entry: {}, global entry: {}, tot # events: {} @ {}, MaxRSS {:.2f} Mb".format(
            self.file_entry,
            self.global_entry,
            self.n_tot_entries,
            datetime.datetime.now(),
            resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1E6))
        # print(self.tree.keys())
        # print(self.tree.arrays(['run', 'lumi', 'event'], library='pd', entry_start=self.file_entry, entry_stop=self.file_entry+1))
        # self.dump_garbage()
    
    def dump_garbage(self):
        """
        show us what's the garbage about
        """
            
        # force collection
        print ("\nGARBAGE:")
        gc.collect()

        print ("\nGARBAGE OBJECTS:")
        for x in gc.garbage:
            s = str(x)
            if len(s) > 80: s = s[:80]
            print (type(x),"\n  ", s)


    def getDataFrame(self, prefix, entry_block, fallback=None):
        branches = [br for br in self._branches
                    if br.startswith(prefix+'_') and
                    not br == '{}_n'.format(prefix)]
        names = ['_'.join(br.split('_')[1:]) for br in branches]
        name_map = dict(zip(names, branches))
        if len(branches) == 0:
            if fallback is not None:
                return self.getDataFrame(prefix=fallback, entry_block=entry_block)
            prefs = set([br.split('_')[0] for br in self._branches])
            print(f'stored branch prefixes are: {prefs}')
            raise ValueError(f'[TreeReader::getDataFrame] No branches with prefix: {prefix}')
        
        akarray = self.tree.arrays(names, 
                                   library='ak', 
                                   aliases=name_map, 
                                   entry_start=self.file_entry, 
                                   entry_stop=self.file_entry+entry_block)
        
        print("TYPE AKARRAY")
        print(akarray)
        records = {}
        for field in akarray.fields:
            records[field] = akarray[field]
        
        if 'pt' in names and 'eta' in names and 'phi' in names:
            if not 'mass' in names and not 'energy' in names:
                records['mass'] = 0.*akarray['pt']
            return vector.zip(records)

        return ak.zip(records)

        # FIXME: we should probably do an ak.Record using sometjhing along the lines of:
        # ele_rec = ak.zip({'pt': tkele.pt, 'eta': tkele.eta, 'phi': tkele.phi}, with_name="pippo")
        # this would allow to handle the records and assign behaviours....

        # return akarray
        


In [None]:
file_second =  uproot4.open('root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_1.root')
tree_second = file_second['Events']

In [None]:
type(tree_second)

In [None]:
va = uproot4.dask(tree_second)

In [None]:
type(va)

In [None]:
for field in file_second:
    print(field)

In [3]:
def getUpTree(uprobj, name):
    parts = name.split('/')
    # if len(parts) > 1:
    #     return getUpTree(uprobj, '/'.join(parts[1:]))
    return uprobj[name]

In [6]:
files_with_protocol = ['root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_1.root']
range_ev = (0, 1)
tree_reader = TreeReader(range_ev, 4)
print('events_per_job: {}'.format(1))
print('maxEvents: {}'.format(1))
print('range_ev: {}'.format(range_ev))

# tr = Tracer()

break_file_loop = False
for tree_file_name in files_with_protocol:
    if break_file_loop:
        break
    # tree_file = up.open(tree_file_name, num_workers=2)
    tree_file = uproot4.open(tree_file_name, num_workers=1)

    print(f'opening file: {tree_file_name}')
    print(f' . tree name: ')

    ttree = getUpTree(tree_file, 'Events')

events_per_job: 1
maxEvents: 1
range_ev: (0, 1)
opening file: root://eoscms.cern.ch//eos/cms/store/cmst3/group/l1tr/cerminar/l1teg/fp_ntuples/DoubleElectron_FlatPt-1To100_PU200/FP/v100D/perfNano_8830991_1.root
 . tree name: 


In [14]:
akarray_new = uproot4.dask(ttree, library='ak')

In [18]:
branches = [br for br in ttree.keys()
            if br.startswith('TkEleL2'+'_') and
            not br == '{}_n'.format('TkEleL2')]
names = ['_'.join(br.split('_')[1:]) for br in branches]

name_map = dict(zip(names, branches))

In [34]:
name_map_new = dict(zip(branches, names))

In [35]:
name_map_new

{'TkEleL2_eta': 'eta',
 'TkEleL2_pfIso': 'pfIso',
 'TkEleL2_pfIsoPV': 'pfIsoPV',
 'TkEleL2_phi': 'phi',
 'TkEleL2_pt': 'pt',
 'TkEleL2_puppiIso': 'puppiIso',
 'TkEleL2_puppiIsoPV': 'puppiIsoPV',
 'TkEleL2_tkEta': 'tkEta',
 'TkEleL2_tkIso': 'tkIso',
 'TkEleL2_tkIsoPV': 'tkIsoPV',
 'TkEleL2_tkPhi': 'tkPhi',
 'TkEleL2_tkPt': 'tkPt',
 'TkEleL2_vz': 'vz',
 'TkEleL2_charge': 'charge',
 'TkEleL2_hwQual': 'hwQual'}

In [111]:
akarray = ttree.arrays(names, 
                           library='ak', 
                           aliases=name_map, 
                           entry_start=0, 
                           entry_stop=1001)

In [38]:
akarray_form_mapping = uproot4.dask(ttree, library='ak', form_mapping=name_map_new)

In [43]:
akarray_form_mapping22= uproot4.dask(ttree, library='ak', form_mapping={'test': 'luminosityBlock'})

In [112]:
type(akarray)

awkward.highlevel.Array

In [26]:
akarray.fields

['eta',
 'pfIso',
 'pfIsoPV',
 'phi',
 'pt',
 'puppiIso',
 'puppiIsoPV',
 'tkEta',
 'tkIso',
 'tkIsoPV',
 'tkPhi',
 'tkPt',
 'vz',
 'charge',
 'hwQual']

In [39]:
akarray_form_mapping.fields

['run',
 'luminosityBlock',
 'event',
 'bunchCrossing',
 'nTkEleL2',
 'TkEleL2_eta',
 'TkEleL2_pfIso',
 'TkEleL2_pfIsoPV',
 'TkEleL2_phi',
 'TkEleL2_pt',
 'TkEleL2_puppiIso',
 'TkEleL2_puppiIsoPV',
 'TkEleL2_tkEta',
 'TkEleL2_tkIso',
 'TkEleL2_tkIsoPV',
 'TkEleL2_tkPhi',
 'TkEleL2_tkPt',
 'TkEleL2_vz',
 'TkEleL2_charge',
 'TkEleL2_hwQual',
 'nTkEmL2',
 'TkEmL2_eta',
 'TkEmL2_pfIso',
 'TkEmL2_pfIsoPV',
 'TkEmL2_phi',
 'TkEmL2_pt',
 'TkEmL2_puppiIso',
 'TkEmL2_puppiIsoPV',
 'TkEmL2_tkIso',
 'TkEmL2_tkIsoPV',
 'TkEmL2_charge',
 'TkEmL2_hwQual',
 'nGenEl',
 'GenEl_pt',
 'GenEl_eta',
 'GenEl_phi',
 'GenEl_vz',
 'GenEl_caloeta',
 'GenEl_calophi',
 'GenEl_charge',
 'GenEl_prompt',
 'GenEl_pdgid',
 'nGenPh',
 'GenPh_pt',
 'GenPh_eta',
 'GenPh_phi',
 'GenPh_vz',
 'GenPh_caloeta',
 'GenPh_calophi',
 'GenPh_charge',
 'GenPh_prompt',
 'GenPh_pdgid',
 'nHGCal3DCl',
 'HGCal3DCl_emaxe',
 'HGCal3DCl_eta',
 'HGCal3DCl_hoe',
 'HGCal3DCl_layer10',
 'HGCal3DCl_layer50',
 'HGCal3DCl_layer90',
 'HGCal3DCl_m

In [50]:
import dask.dataframe as dd

In [105]:
df = dak.to_dataframe(akarray_form_mapping22)
df = df.rename(columns=name_map_new)

In [108]:
dask_array_simple = df.to_dask_array()

In [115]:
dask_array_simple

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan, 153)","(nan, 153)"
Dask graph,4 chunks in 4 graph layers,4 chunks in 4 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes unknown unknown Shape (nan, 153) (nan, 153) Dask graph 4 chunks in 4 graph layers Data type float64 numpy.ndarray",,

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan, 153)","(nan, 153)"
Dask graph,4 chunks in 4 graph layers,4 chunks in 4 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [110]:
dak.from_dask_array(dask_array_simple)

ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (2,) + inhomogeneous part.

In [119]:
renamed_ttree = ttree.rename(name_map_new)

AttributeError: 'Model_TTree_v20' object has no attribute 'rename'

In [118]:
type(ttree)

uproot.models.TTree.Model_TTree_v20