# desc-wfmon/perfstat.ipynb

Version 1.02  

Generate and verify the perfstat table used in DESC gen3_workflow.  

We assume [desc-wfmon](https://github.com/LSSTDESC/desc-wfmon) has been installed using the install notebook.


In [None]:
import os

# Report where the notebook runs; the relative paths used below resolve from here.
cwd = os.getcwd()
print(f"Working directory is {cwd}")

In [None]:
# Set up desc-wfmon, installing if needed in the local directory.
insfile = 'install/setup.py'
doinstall = not os.path.exists(insfile)
if doinstall or True:
    import os
    devdir = f"{os.getenv('HOME')}/desc/dev"
    pkgdir = f"{devdir}/desc-wfmon"
    bsfil = f"{pkgdir}/bootstrap.py"
    if not os.path.exists(bsfil):
        print(f"ERROR: Unable to find desc-wfmon bootstrap: {bsfil}")
    # Run boostrap that build and install in ./install.
    %run $bsfil
%run {insfile}

import sys
import collections
import pandas
import desc.wfmon
import desc.sysmon
import matplotlib.pyplot as plt

print(f"Python version is {sys.version}")
for pkg in [desc.wfmon, desc.sysmon]:
    print(f"{pkg} version is {pkg.__version__}")

## Configuration
List the system and process monitoring files whose schemas we want to display.

In [None]:
# Read the config.
%run monexp.py

# Defaults for configuration.
class monexp_def:
    dir = os.getcwd()
    name = os.path.basename(os.getcwd())
    stunit = 'minute'
    sbunit = 'gb'
    tmin = 0
    tmax = 20
    njobmax = 400
    
# Set defaults for any missing parameters.
if 'monexp' not in dir():
    monexp = monexp_def
else:
    for nam in monexp_def.__dict__:
        if nam[0:2] == '__': continue
        if not hasattr(monexp, nam):
            setattr(monexp, nam, getattr(monexp_def, nam))

# List of files to display
sysfils = ['sysmon.csv']
prcfils = ['runinfo/monitoring.db']

# Set the level for process tables.
lev = 2

# Set units for the memory.
bunit, sbuinit = 1, 'byte'
#bunit, sbunit = 2**20, 'MB'
#bunit, sbunit = 2**30, 'GB'

# Time plot limit
tmean_max = monexp.tmax

## Fetch system-level monitoring schema.

In [None]:
line = '----------------------------------------------------------------------------'
print(line)
# sym stays None when no system monitor file is found; later cells test for that.
sym = None
for sysfil in sysfils:
    if os.path.exists(sysfil):
        print(f"System monitor file: {sysfil}")
        sym = pandas.read_csv(sysfil)
        print(f"System monitor sample count: {len(sym)}")
        print(f"System monitor columns:")
        for cnam in sym.columns:
            print(f"  {cnam}")
        # The CPU count and total memory are expected to be constant over the run.
        assert len(sym.cpu_count.unique()) == 1
        ncpu = sym.cpu_count.iloc[0]
        print(f"CPU count is {ncpu:.0f}")
        assert len(sym.mem_total.unique()) == 1
        maxmem = sym.mem_total.iloc[0]
        print(f"Total memory is {maxmem:.1f} GB")
    else:
        print(f"File not found: {sysfil}")
    print(line)

## Fetch the process-level monitoring schema

The process monitoring data is read from the SQLite database (`runinfo/monitoring.db`) produced by parsl. Of particular interest is the task table, where metrics are sampled at regular intervals separately for each job.

In [None]:
print(line)
# Open each parsl monitoring DB (dbr keeps the last one found) and show its tables.
for prcfil in prcfils:
    if os.path.exists(prcfil):
        dbr = desc.wfmon.MonDbReader(prcfil, fix=False, run_id=monexp.run_id)
        dbr.tables(lev)
        print(dbr.table('task').query('task_id<10').task_stderr)
    else:
        # Report the process file that is missing, not the system file.
        print(f"File not found: {prcfil}")
    print(line)

## Fetch the perf data.

In [None]:
# Read the perf-stat logs referenced by the monitoring DB and compare the reader's
# own counters with the summary dict returned by read_mondb_logs.
pfm = desc.wfmon.PerfStatLogReader(0)
print(f"Reading perf stat data.")
ret = pfm.read_mondb_logs(dbr, 1)
for key in ('nskip', 'nkeep'):
    print(f"{key} = {getattr(pfm, key)} ?= {ret[key]}")
for label, attr in (('nfile', 'nfile'), (' nval', 'nval'), ('nmiss', 'nmissing')):
    print(f"{label} = {getattr(pfm, attr)}")
print(f"Perf state names: {pfm.dict.keys()}")
assert pfm.nkeep == ret["nkeep"]
# The equivalent nskip check is intentionally not enforced:
#assert(pfm.nskip == ret["nskip"])
pft = pfm.table()
print(pft)
print(f"Run IDs: {dbr.run_ids}")
print(f"Task names: {dbr.task_names}")
print(f"Task counts: {dbr.task_name_counts}")

In [None]:
# Fix the DB times, then compare the perf table row-by-row with the task and try
# tables; the printed unique differences are presumably expected to be all zero.
dbr.fix_times()
tsk = dbr.table('task')
ttr = dbr.table('try')
assert len(tsk) == len(pft)
for colname in ('run_idx', 'task_idx', 'task_id'):
    task_diffs = (pft[colname] - tsk[colname]).unique()
    print(f"Unique task diffs for {colname}: {task_diffs}")
    try_diffs = (pft[colname] - ttr[colname]).unique()
    print(f"Unique  try diffs for {colname}: {try_diffs}")


## Plot setup
Set plotting params and define functions to plot histograms and graphs.

In [None]:
# Notebook-wide plot file suffix and matplotlib styling shared by every figure below.
sfx = '.png'
print(f"Plot file suffix: {sfx}")
rc_settings = (
    ('font', dict(size=16)),
    ('savefig', dict(facecolor='white', bbox='tight')),
    ('axes', dict(titlesize='medium')),
)
for rc_group, rc_opts in rc_settings:
    plt.rc(rc_group, **rc_opts)

def clip(x, x1, x2):
    """Return x clipped to limits pulled 0.1% inside [x1, x2].

    Each limit is scaled by 1.001 or 0.999, whichever moves it toward the interior
    (a zero limit stays zero). Pulling the limits inward keeps out-of-range values
    inside a plot range of (x1, x2), so they are drawn at its edges instead of dropped.
    x must provide a .clip(lower, upper) method (e.g. a pandas Series).
    """
    lower = 0.999*x1 if x1 <= 0 else 1.001*x1
    upper = 0.999*x2 if x2 >= 0 else 1.001*x2
    return x.clip(lower, upper)

def hist(x, x1, x2, slab, slabx, slaby, sttl, fnam='', legpos='upper right'):
    """Draw a 100-bin histogram of x over [x1, x2] in a new figure.

    x is clipped (see clip) so out-of-range values are still counted in the edge bins.
    The legend label is "<slab> [<count>] <mean>", with the mean over the clipped values.
    The figure size comes from the notebook globals pdx, pdy. Bars are red when the
    title mentions '180GB' and blue otherwise.

    If fnam is non-empty, the figure is saved as "<cwd basename>-<fnam><sfx>". This uses
    the notebook-wide plot suffix sfx rather than a hard-coded one.
    """
    xc = clip(x, x1, x2)
    slab_long = f"{slab} [{len(xc)}] {xc.mean():0.2f}"
    plt.figure(figsize=(pdx, pdy))
    scol = 'tab:red' if '180GB' in sttl else 'tab:blue'
    plt.hist(bins=100, range=(x1, x2), x=xc, label=slab_long, color=scol)
    plt.legend(loc=legpos, fontsize=12)
    plt.xlabel(slabx)
    plt.ylabel(slaby)
    plt.xlim([x1, x2])
    plt.title(sttl)
    if fnam:
        pathout = f"{os.path.basename(os.getcwd())}-{fnam}{sfx}"
        print(f"Saving plot to {pathout}")
        plt.savefig(pathout)
        
def graph(x, y, x1, x2, y1, y2, slab, slabx, slaby, sttl, fnam='', legpos='upper right'):
    """Scatter-plot y vs x in a new figure with axis limits [x1, x2] x [y1, y2].

    y is clipped (see clip) so out-of-range points are drawn at the plot edges.
    The legend label is "<slab> [<count>] <mean>", with the mean over the clipped y.
    The figure size comes from the notebook globals pdx, pdy. Points are red when the
    title mentions '180GB' and blue otherwise.

    If fnam is non-empty, the figure is saved as "<cwd basename>-<fnam><sfx>". This uses
    the notebook-wide plot suffix sfx rather than a hard-coded one.
    """
    yc = clip(y, y1, y2)
    slab_long = f"{slab} [{len(y)}] {yc.mean():0.2f}"
    plt.figure(figsize=(pdx, pdy))
    scol = 'tab:red' if '180GB' in sttl else 'tab:blue'
    plt.plot(x, yc, '.', label=slab_long, color=scol)
    plt.legend(loc=legpos, fontsize=12)
    plt.xlabel(slabx)
    plt.ylabel(slaby)
    plt.xlim([x1, x2])
    plt.ylim([y1, y2])
    plt.title(sttl)
    plt.grid(True)
    if fnam:
        pathout = f"{os.path.basename(os.getcwd())}-{fnam}{sfx}"
        print(f"Saving plot to {pathout}")
        plt.savefig(pathout)
        
def multigraph(xs, ys, x1, x2, y1, y2, slabs, slabx, slaby, sttl, fnam='', legpos='upper right', nlegcol=1):
    """Overlay several scatter series in one figure with limits [x1, x2] x [y1, y2].

    xs, ys and slabs are parallel sequences: series i plots ys[i] vs xs[i]. Its legend
    label is "<slabs[i]> [<count>] <mean>", where ys[i] is clipped (see clip) and the
    mean is over the clipped values. Series use matplotlib's default color cycle, and
    the legend is laid out in nlegcol columns. The figure size comes from the globals
    pdx, pdy.

    If fnam is non-empty, the figure is saved as "<cwd basename>-<fnam><sfx>". This uses
    the notebook-wide plot suffix sfx rather than a hard-coded one.
    """
    nplt = len(xs)
    assert len(ys) == nplt
    ycs = [clip(y, y1, y2) for y in ys]
    slab_longs = [f"{slabs[iplt]} [{len(yc)}] {yc.mean():0.2f}" for iplt, yc in enumerate(ycs)]
    plt.figure(figsize=(pdx, pdy))
    for x, yc, slab_long in zip(xs, ycs, slab_longs):
        plt.plot(x, yc, '.', label=slab_long)
    plt.legend(loc=legpos, fontsize=12, ncol=nlegcol)
    plt.xlabel(slabx)
    plt.ylabel(slaby)
    plt.xlim([x1, x2])
    plt.ylim([y1, y2])
    plt.title(sttl)
    plt.grid(True)
    if fnam:
        pathout = f"{os.path.basename(os.getcwd())}-{fnam}{sfx}"
        print(f"Saving plot to {pathout}")
        plt.savefig(pathout)

In [None]:
# Plot title: "<cwd basename>: <first line of README.txt>".
with open('README.txt', 'r') as readme:
    sttl = os.path.basename(os.getcwd()) + ': ' + readme.readline().strip()
print(f"Plot title: {sttl}")
# Plot size
pdx = 20
pdy = 6

print(tsk.columns)
# Task times relative to the first task invocation.
t0 = min(tsk.task_time_invoked)
print(t0)
tsk['tfix'] = tsk.task_time_invoked - t0
# sym is None (or undefined) when no system monitor file was read.
if globals().get('sym') is not None:
    # NOTE(review): presumably dbr.fix_times() made task times relative to dbr.t0, so
    # this puts system samples on the same axis as tsk.tfix -- confirm.
    sym['tfix'] = sym['time'] - dbr.t0 - t0
ttr_start = ttr.task_try_time_running
# Mean task time, divided by 60 for the 'Time [min]' axis.
pft['tmean'] = 0.5*(pft.tstart + pft.tstop)/60
if tmean_max == 0:
    # Round the largest mean time up to the next multiple of 10, with a floor of 20.
    tmean_max = max((max(pft.tmean)//10 + 1)*10, 20)
print(f"tmean_max = {tmean_max} {max(pft.tmean)//10}")
pft['twall'] = pft.tstop - pft.tstart
pft['gins'] = 1.e-9*pft['instructions:u']
ttr_dt = ttr.task_try_time_returned - ttr.task_try_time_running
pft_dt = pft['time-elapsed']
pft_clk = pft['task-clock:u']*0.001    # msec --> sec (perf stat reports task-clock in msec)
pft_usy = pft['user'] + pft['sys']

slabx = 'Time [sec]'
slaby = 'Number of tasks'
x1 = 0
x2 = 20
hist(pft.tmean, x1, x2, 'Mean run time', 'Time [min]', slaby, sttl, 'tmean')
x2 = 200
hist(ttr_dt, x1, x2, 'try run time', slabx, slaby, sttl, 'runtime')
hist(pft_dt, x1, x2, 'elapsed run time', slabx, slaby, sttl)

x1 = -50
x2 = 50
hist( ttr_dt-pft_dt, x1, x2, 'try run time - elapsed', slabx, slaby, sttl)
hist(pft_dt-pft_clk, x1, x2, 'elapsed - clock', slabx, slaby, sttl)
x1 = -1
x2 = 1
hist(pft_usy-pft_clk, x1, x2, 'user + sys - clock', slabx, slaby, sttl)

In [None]:
# Instructions per task. Set x1 explicitly; otherwise it silently inherits -1
# from the previous cell.
x1 = 0
x2 = 800
hist(pft.gins, x1, x2, 'Instructions/task', 'Giga-instructions', slaby, sttl, 'insts')

In [None]:
slab = 'All tasks'

# Instructions/task
x1 = 0
x2 = 1000
hist(pft.gins, x1, x2, slab, 'Giga-instructions/task', slaby, sttl, 'instructions', 'upper right')

# CPU time efficiency: task-clock CPU seconds per second of task wall time.
pft['cputeff'] = pft_clk/pft['twall']
x1 = 0
x2 = 1.0
hist(pft.cputeff, x1, x2, slab, '(CPU time)/(wall time)', slaby, sttl, 'cputeff', 'upper left')

# CPU speed
speed = pft['cycles:u']/pft_clk*1.e-9    # cycles per CPU second --> GHz
x1 = 0
x2 = 4
slabx = 'CPU speed [GHz]'
hist(speed, x1, x2, slab, slabx, slaby, sttl, 'cpuspeed')

# Instructions per cycle: a plain ratio, not giga-instructions.
# NOTE(review): still saved as '...-ips.png' to keep existing file names.
pft['ipc'] = pft['instructions:u']/pft['cycles:u']
hist(pft.ipc, x1, x2, slab, 'Instructions/cycle', slaby, sttl, 'ips')

# Instruction clock speed.
x2 = 10
pft['ips'] = pft.gins/pft_clk
hist(pft.ips, x1, x2, slab, 'Giga-instructions/clock-sec', slaby, sttl, 'insclockspeed')

# Instruction wall speed.
x2 = 10
pft['ipw'] = pft.gins/pft.twall
hist(pft.ipw, x1, x2, slab, 'Giga-instructions/wall-sec', slaby, sttl, 'inswallspeed')




In [None]:
# Per-task-type series: group the perf rows by task name index and overlay each
# metric vs. mean task time, one series per task type.
spfac = 1.0/1.e6    # NOTE(review): scale applied to pft.speed for a GHz axis -- confirm its units
ntsk = max(pft.task_idx) + 1
groups = [pft.query(f"task_idx=={itsk}") for itsk in range(ntsk)]
slabs  = [dbr.task_names[itsk][0:20] for itsk in range(ntsk)]
tmeans = [grp.tmean for grp in groups]
cptefs = [grp.cputeff for grp in groups]
speeds = [grp.speed*spfac for grp in groups]
ipcs   = [grp.ipc for grp in groups]
ipws   = [grp.ipw for grp in groups]
tplt1 = 0
tplt2 = tmean_max
slabx = 'Mean task time [min]'
# (y values, y max, y label, file name, legend position) for each overlay plot.
plot_specs = (
    (cptefs,   1, '(CPU time)/(wall time)',     'tmean_cputeffs', 'lower right'),
    (speeds, 3.5, 'CPU speed [GHz]',            'tmean_speeds',   'lower right'),
    (ipcs,     4, 'Instructions/cycle',         'tmean_ipcs',     'upper right'),
    (ipws,    12, 'Giga-instructions/wall-sec', 'tmean_ipws',     'upper right'),
)
for ys, ymax, ylab, fnam, legpos in plot_specs:
    multigraph(tmeans, ys, tplt1, tplt2, 0, ymax, slabs, slabx, ylab, sttl, fnam, legpos, 2)

## Running tasks vs. time

In [None]:
# Count running tasks on a regular grid of tstep between the first start and the last
# stop. A task counts as running at time t when tstart <= t < tstop.
t1 = min(pft.tstart)
t2 = max(pft.tstop)
tstep = 5
nstep = int((t2 - t1)/tstep)
# Numeric Series (not object dtype) with a vectorized count per grid point, instead of
# formatting and parsing a query string at every step.
tval = pandas.Series([t1 + istep*tstep for istep in range(nstep)], dtype=float)
ntsks = pandas.Series(
    [int(((pft.tstart <= t) & (pft.tstop > t)).sum()) for t in tval],
    dtype=int,
)
graph(tval/60, ntsks, tplt1, tplt2, 0, 140, 'All tasks', 'Time [min]', 'Running task count', sttl, 'time_ntask')


In [None]:
# Overall run throughput; the bracketed numbers are ratios to the reference values.
rtsk_ref = 115.4    # tasks/minute
rgin_ref = 298      # giga-instructions/sec
ntc_ref = 45.4      # mean running task count
dt = t2 - t1
dtmin = dt/60
ntsk, ngin, ntc = len(pft), pft.gins.sum(), ntsks.mean()
rtsk, rgin = ntsk/dtmin, ngin/dt
print(f" Task throughput:{rtsk:6.1f} tasks/minute           [{rtsk/rtsk_ref:.2f}]")
print(f"Inst. throughput:{rgin:6.1f} giga-instructions/sec  [{rgin/rgin_ref:.2f}]")
print(f"Mean running task count: {ntc:5.1f}                 [{ntc/ntc_ref:.2f}]")

## System CPU speed vs time.

In [None]:
# System-average CPU frequency vs time, if a system monitor file was read and it
# records cpu_freq. sym may be None or undefined when no such file exists.
if globals().get('sym') is not None and 'cpu_freq' in sym.columns:
    graph(sym.tfix/60, sym.cpu_freq, tplt1, tplt2, 0, 4, 'System average', 'Time [min]', 'CPU frequency [GHz]', sttl, 'time_syscpuspeed')