In [2]:
DATAPATH = "/nfs/public/mschneid/moredqmiodata/_SingleMuon_Run2018A-12Nov2019_UL2018-v2_DQMIO"

In [3]:
# in this notebook, we try to apply some operations (namely, computing the
# covariance matrix for PCA) to a large amount of MEs at once. This requires
# a lot more control over IO, to make sure we get efficient reads and also
# don't run out of memory on the way.

import os
# it is cool that numpy uses multiple threads here, but for the small matrices it is not very efficient...
os.environ["OMP_NUM_THREADS"] = "1" 

import ROOT
import root_numpy
from collections import namedtuple, defaultdict
IndexEntry = namedtuple('IndexEntry', ['run', 'lumi', 'type', 'file', 'firstidx', 'lastidx'])
MonitorElement = namedtuple('MonitorElement', ['run', 'lumi', 'name', 'type', 'data'])
NTHREADS=128

def extractdatafromROOT(x):
    if isinstance(x, ROOT.string):
        return unicode(x.data())
    if isinstance(x, int):
        return x
    if isinstance(x, float):
        return x
    else:
        return root_numpy.hist2array(x)
    
class DQMIOFile:
    """
    Open the passed in file and read index data.
    The file names go directly to ROOT, remote files like root://cms-xrd-global.cern.ch//store/... should work.
    """
    def __init__(self, rootfile):
        # for non-local files (/store/...), prefix them with "root://cms-xrd-global.cern.ch/"
        self.rootfile = ROOT.TFile.Open(rootfile)
        self.readindex()
        
    """
    Internal: read index tables.
    """
    def readindex(self):
        self.index = defaultdict(list)
        idxtree = getattr(self.rootfile, "Indices")
        # release GIL in long operations. Disable if it causes trouble.
        #idxtree.GetEntry._threaded = True

        for i in range(idxtree.GetEntries()):
            idxtree.GetEntry(i)
            run, lumi, metype = idxtree.Run, idxtree.Lumi, idxtree.Type
            if lumi == 0:
                # read only per-lumi MEs for now.
                continue
            # inclusive range -- for 0 entries, row is left out
            firstidx, lastidx = idxtree.FirstIndex, idxtree.LastIndex
            e = IndexEntry(run, lumi, metype, self.rootfile.GetName(), firstidx, lastidx)
            self.index[(run, lumi)].append(e)
    
"""
Read MEs matching the given wildcard patterns from a single lumi.
For rootobjects = True, return actual root objects, by default the histogram data is extracted into numpy arrays.
Returns a list of MonitorElement named tuples.
"""
def getMEsForLumi(entries, nameset):
    treenames = { 
      0: "Ints",
      1: "Floats",
      2: "Strings",
      3: "TH1Fs",
      4: "TH1Ss",
      5: "TH1Ds",
      6: "TH2Fs",
      7: "TH2Ss",
      8: "TH2Ds",
      9: "TH3Fs",
      10: "TProfiles",
      11: "TProfile2Ds",
    }
    rootfile = ROOT.TFile.Open(entries[0].file)
    result = []
    for e in entries:
        metree = getattr(rootfile, treenames[e.type])
        metree.GetEntry(0)
        metree.SetBranchStatus("*",0)
        metree.SetBranchStatus("FullName",1)
        # release GIL in long operations. Disable if it causes trouble.
        metree.GetEntry._threaded = True
        for x in range(e.firstidx, e.lastidx+1):
            metree.GetEntry(x)
            mename = str(metree.FullName)
            if not nameset or mename in nameset:
                metree.GetEntry(x, 1)
                value = metree.Value
                value = extractdatafromROOT(value)
                me = MonitorElement(e.run, e.lumi, mename, e.type, value)
                result.append(me)
    return result
    

Welcome to JupyROOT 6.18/04


In [4]:
# open all the files, and read their indices. We don't keep the reader objects;
# the actual IO is a free function that allows us more control over parallelism.

from glob import glob
files = glob(DATAPATH + "/*.root")[:50]

def entries(f):
    iofile = DQMIOFile(f)
    return list(iofile.index.values())

es = sum(map(entries, files), [])

In [5]:
# try reading a lumisection. "None" means read all MEs.
mes = getMEsForLumi(es[0], None)

In [6]:
# now we can slim down the list a bit to reduce the IO volume. E.g. we can't apply
# PCA to big 2D histograms anyways.
import numpy
interesting = set([x.name for x in mes if numpy.prod(x.data.shape) < 1000 
     and not x.name.startswith("Pixel") 
     #and not x.name.startswith("RPC")
     and not x.name.startswith("JetMet")
     and not x.name.startswith("HLT")
     and not x.name.startswith("L1TEMU")
     #and not x.name.startswith("Egamma")
     and not x.name.startswith("AlCaReco")])
# the result is a set, which allows fast lookups. 
len(interesting)

27668

In [7]:
# try reading the reduced set, it should be fast.
%time mes = getMEsForLumi(es[100], interesting)

CPU times: user 3 s, sys: 173 ms, total: 3.18 s
Wall time: 3.79 s


In [8]:
# ... and make sure we actually got something.
len(mes)

27668

In [None]:
# now do the actual work. We will use a mulitprocessing process Pool to read many
# files in parallel (the process isolation helps to keep ROOT from blowing up, and
# also helps with the operations that may be CPU-bound on Python side), as well as 
# a thread pool for the actual covariance computation (here, the threadpool reduces
# IO overhead, since the actual amount of work to do is rather small.)
# This requires a bit of juggling with Tasks and Promises, since we can't just read
# everything first (we might run out of memory.)
# To make the covariance computation fast, it is crucial to sort the data by ME first.

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool 

def extractfile(es_names):
    return getMEsForLumi(*es_names)

covmat = dict()
covqueue = defaultdict(list)

def addtocovmat(mes):
    # the bottleneck here are actually the stack calls, which do not release the GIL.
    # the .dot() which does is pretty fast. So using multiple threads does not gain much...
    assert(len(set(me.name for me in mes)) == 1)
    if len(mes[0].data.shape) > 1:
        # the reshape call is rather slow, avoid it if we can
        size = numpy.prod(mes[0].data.shape)
        mat = numpy.stack([me.data.reshape(size) for me in mes])
    else:
        mat = numpy.stack([me.data for me in mes])
    cmat = mat.transpose().dot(mat)
    if not mes[0].name in covmat:
        covmat[mes[0].name] = cmat
    else:
        covmat[mes[0].name] += cmat
            
def addtocovqueue(mes):
    for me in mes:
        covqueue[me.name].append(me)
        
def processcovqueue(tp, minbatchsize = 0):
    res = []
    keys = covqueue.keys()
    for mename in keys:
        if len(covqueue[mename]) > minbatchsize:
            res.append(tp.map_async(addtocovmat, [covqueue[mename]]))
            covqueue[mename] = []
    return res
    

p = Pool(64)
tp = ThreadPool(4)

res = []
for e in es:
    res.append(p.map_async(extractfile, [(e, interesting),]))

from IPython import display
import time

covres = []

while len(res) > 0 or len(covres) > 0:
    done = []
    covdone = []
    for r in res:
        if r.ready():
            map(addtocovqueue, r.get())
            done.append(r)
    for r in covres:
        if r.ready():
            covdone.append(r)
    if len(covres) == 0 or len(res) == 0:
        # batch up until we run out of work...
        covres += processcovqueue(tp, 0)
    res = [r for r in res if r not in done]
    covres = [r for r in covres if r not in covdone]
    display.clear_output(wait=True)
    display.display("%d reading tasks, %d covariance tasks remaining" % (len(res), len(covres)))
    time.sleep(1)
    
# wait for async stuff to complete
p.close()
tp.close()
p.join()
tp.join()

'392 reading tasks, 24477 covariance tasks remaining'

In [None]:
# a random covariance matrix
covmat[covmat.keys()[4567]].shape

In [None]:
# make sure we have all...
covmat.keys()

In [None]:
# now, we can play around with a much smaller dataset...
eigval, eigvec = numpy.linalg.eig(covmat[covmat.keys()[4567]])

In [None]:
%matplotlib inline
import matplotlib
matplotlib.rcParams['figure.figsize'] = [15, 8]
import matplotlib.pyplot as plt

plt.plot(numpy.log(eigval), 'o-')

In [None]:
import random
for mename in random.sample(covmat.keys(), 5):
    eigval, eigvec = numpy.linalg.eig(covmat[mename])
    plt.plot(numpy.log(eigval), 'o-', label=mename)
plt.legend()