In [3]:
from __future__ import division
import numpy as np
from matplotlib import pyplot as plt
import glob, operator, time, shutil, scipy, sys, os, gzip, subprocess
import shlex
import pandas as pd
from multiprocessing import Pool

# Notebook-wide display settings: let pandas show up to 100 columns and use
# matplotlib's 'classic' style sheet for figures.
pd.set_option('display.max_columns', 100)
plt.style.use('classic')

In [56]:
def parse_by_month(year_month, max_files=5):
    """Decompress one month of gzipped Darshan logs into the project data tree.

    Logs are searched in
    ``/gpfs/mira-fs0/logs/darshan/mira/<year>/<month>/<day>/*.gz`` and written,
    with the trailing ``.gz`` removed from the file name, to the matching
    ``<year>/<month>/<day>`` directory under
    ``/projects/AMASE/zliu/facility-data/ds``.

    Args:
        year_month: ``(year, month)`` pair of integers.
        max_files: Upper bound on the number of logs decompressed. The default
            of 5 keeps the cap that was hard-coded before; pass ``None`` to
            process every log that was found.

    Raises:
        OSError: If a destination directory cannot be created or a file cannot
            be read or written. Directories that already exist are fine.
    """
    year, month = year_month
    dst_root = '/projects/AMASE/zliu/facility-data/ds'

    # exist_ok=True tolerates reruns without hiding real failures (permissions,
    # missing mount, ...) the way a bare "except: pass" around os.mkdir did.
    os.makedirs('%s/%d/%d' % (dst_root, year, month), exist_ok=True)

    fns = []
    for ddir in glob.glob('/gpfs/mira-fs0/logs/darshan/mira/%d/%d/*' % (year, month)):
        day = ddir.split('/')[-1]
        os.makedirs('%s/%d/%d/%s' % (dst_root, year, month, day), exist_ok=True)
        fns += glob.glob('%s/*.gz' % ddir)

    for fn in fns[:max_files]:
        parts = fn.split('/')
        # Mirror the last three directory levels (<year>/<month>/<day>) of the
        # source path and drop the 3-character '.gz' suffix from the file name.
        ofn = dst_root + '/' + '/'.join(parts[-4:-1]) + '/' + parts[-1][:-3]
        with gzip.open(fn, 'rb') as f_in, open(ofn, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
            

In [8]:
# (year, month) pairs to process: April-December 2018 followed by
# January-March 2019.
yms = [(2018, month) for month in range(4, 13)]
yms.extend((2019, month) for month in range(1, 4))

# p = Pool(len(yms))
# _ = p.map(parse_by_month, yms)

In [78]:
year, month = 2018, 8

# Every gzipped log found in the per-day directories of the chosen month.
fns = []
for ddir in glob.glob('/gpfs/mira-fs0/logs/darshan/mira/%d/%d/*' % (year, month)):
    fns.extend(glob.glob('%s/*.gz' % ddir))

# The job id is taken from the third-from-last '_'-separated field of the file
# name, with that field's first two characters dropped.
ds_jids = {fn.split('/')[-1].split('_')[-3][2:] for fn in fns}

In [74]:
# Job ids listed one per line in 'ap-single-task-jids.txt'. Only the line
# terminator is stripped, so a final line without a trailing newline keeps its
# last character, and the file handle is closed as soon as it has been read.
with open('ap-single-task-jids.txt') as jid_file:
    ap_st_jids = set(line.rstrip('\n') for line in jid_file)

In [83]:
# Sizes of both job-id sets, the size of their intersection, and the number of
# log files collected in `fns`.
len(ap_st_jids), len(ds_jids), len(ds_jids.intersection(ap_st_jids)), len(fns)

(2252, 2507, 2216, 4872)

In [86]:
# !cp /projects/AMASE/zliu/facility-data/ds/*.txt ./

In [None]:
def parse_by_fn(sfn, option):
    """Run ``darshan-parser --<option>`` on one log file and print its output.

    Args:
        sfn: Path of the Darshan log to parse.
        option: Name of the darshan-parser option, without the leading ``--``
            (for example ``'perf'`` or ``'total'``).
    """
    # Quote the path so that spaces or shell metacharacters in a log file name
    # cannot break, or inject into, the shell command that os.popen runs.
    cmd = '/home/zcliu/usr/bin/darshan-parser --%s %s' % (option, shlex.quote(sfn))
    # The context manager closes the pipe even when reading fails.
    with os.popen(cmd) as pipe:
        output = pipe.read()
    print(output)
    
year, month = 2018, 8

# Decompressed logs of the chosen month, gathered day directory by day directory.
fns = []
for ddir in glob.glob('/projects/AMASE/zliu/facility-data/ds/%d/%d/*' % (year, month)):
    fns.extend(glob.glob('%s/*.darshan' % ddir))

parser_options = ('all', 'base', 'file', 'file-list', 'file-list-detailed', 'perf', 'total')

# Only the first log is parsed, and only with the second-to-last option ('perf').
for fn in fns[:1]:
    print (fn)
    for opt in parser_options[-2:-1]:
        parse_by_fn(fn, opt)

In [2]:
def extract_total_from_raw(sfn):
    """Read six cumulative I/O time counters from ``darshan-parser --total``.

    Args:
        sfn: Path of the Darshan log to parse.

    Returns:
        ``(posix_wr_time, posix_rd_time, posix_meta_time, mpiio_wr_time,
        mpiio_rd_time, mpiio_meta_time)`` as floats, with ``None`` for every
        counter that does not appear in the output; or ``None`` when running
        or reading the parser raised an exception.

    Raises:
        ValueError: If the last field of a matching line is not a number.
    """
    # Output-line prefix -> position of that counter in the returned tuple.
    slots = {
        'total_CP_F_POSIX_WRITE_TIME:': 0,
        'total_CP_F_POSIX_READ_TIME:': 1,
        'total_CP_F_POSIX_META_TIME:': 2,
        'total_CP_F_MPI_WRITE_TIME:': 3,
        'total_CP_F_MPI_READ_TIME:': 4,
        'total_CP_F_MPI_META_TIME:': 5,
    }
    try:
        # Quote the path so unusual characters in a log name stay one argument.
        cmd = '/home/zcliu/usr/bin/darshan-parser --total ' + shlex.quote(sfn)
        with os.popen(cmd) as pipe:
            perf = pipe.read()
    except Exception:
        # "except Exception" (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate, e.g. when a worker pool is being shut down.
        print("error when run darshan-parser for %s" % sfn)
        return None

    times = [None] * len(slots)
    for line in perf.split('\n'):
        for prefix, pos in slots.items():
            if line.startswith(prefix):
                times[pos] = float(line.split()[-1])
                break
    return tuple(times)

    
def extract_file_from_raw(sfn):
    """Read per-category byte and file counts from ``darshan-parser --file``.

    The categories are ``read_only``, ``write_only``, ``read_write``,
    ``unique`` and ``shared``. For a line starting with ``# <category>:`` the
    second-to-last whitespace-separated field is taken as the byte count and
    the third-to-last as the number of files.

    Args:
        sfn: Path of the Darshan log to parse.

    Returns:
        ``(read_bytes, write_bytes, wr_bytes, unique_bytes, shared_bytes,
        read_nf, write_nf, wr_nf, unique_nf, shared_nf)`` as ints, with
        ``None`` for categories missing from the output; or ``None`` when
        running or reading the parser raised an exception.

    Raises:
        ValueError, IndexError: If a matching line does not carry integer
            fields in the expected positions.
    """
    categories = ('read_only', 'write_only', 'read_write', 'unique', 'shared')
    try:
        # Quote the path so unusual characters in a log name stay one argument.
        cmd = '/home/zcliu/usr/bin/darshan-parser --file ' + shlex.quote(sfn)
        with os.popen(cmd) as pipe:
            perf = pipe.read()
    except Exception:
        # Narrower than a bare except: KeyboardInterrupt/SystemExit propagate.
        print("error when run darshan-parser for %s" % sfn)
        return None

    n_bytes = dict.fromkeys(categories)
    n_files = dict.fromkeys(categories)
    for line in perf.split('\n'):
        for cat in categories:
            if line.startswith('# %s:' % cat):
                lsp = line.split()
                n_bytes[cat] = int(lsp[-2])
                n_files[cat] = int(lsp[-3])
                break
    return tuple(n_bytes[c] for c in categories) + tuple(n_files[c] for c in categories)
    
def extract_perf_from_raw(sfn):
    """Read the total byte count and aggregate rate from ``darshan-parser --perf``.

    The previous body ended in ``print(run_time)``, a name that is never
    defined, so every successful parse raised ``NameError`` and nothing was
    returned. The parsed values are now returned instead.

    Args:
        sfn: Path of the Darshan log to parse.

    Returns:
        ``(total_bytes, agg_perf)`` where ``total_bytes`` is the int from the
        ``# total_bytes:`` line and ``agg_perf`` the float from the
        ``# agg_perf_by_slowest:`` line (``None`` for a missing line); or
        ``None`` when running or reading the parser raised an exception.

    Raises:
        ValueError: If the last field of a matching line is not numeric.
    """
    try:
        # Quote the path so unusual characters in a log name stay one argument.
        cmd = '/home/zcliu/usr/bin/darshan-parser --perf ' + shlex.quote(sfn)
        with os.popen(cmd) as pipe:
            perf = pipe.read()
    except Exception:
        # Narrower than a bare except: KeyboardInterrupt/SystemExit propagate.
        print("error when run darshan-parser for %s" % sfn)
        return None

    total_bytes, agg_perf = None, None
    for line in perf.split('\n'):
        if line.startswith('# total_bytes:'):
            total_bytes = int(line.split()[-1])
            continue

        if line.startswith('# agg_perf_by_slowest:'):
            # Parsed as float: int() rejects a value with a decimal point.
            # NOTE(review): assumes the number is the last field on the line -
            # confirm against the darshan-parser version in use.
            agg_perf = float(line.split()[-1])
            continue

    return (total_bytes, agg_perf)
    
def extract_filelist_from_raw(sfn):
    """Summarise job header fields and per-file I/O time from ``--file-list``.

    The header lines ``# jobid:``, ``# start_time:``, ``# end_time:`` and
    ``# nprocs:`` are read as ints. Every row after the line that starts with
    ``# <hash>\\t<suffix>`` (except the final split element, which is the text
    after the last newline) contributes ``int(row[-3]) * float(row[-1])``
    seconds to the I/O type named in ``row[-4]`` ('POSIX' or 'MPI').
    NOTE(review): ``row[-3]`` and ``row[-1]`` are presumably the process count
    and the average per-process time - confirm against the parser's column
    header.

    Rows that are blank, too short, carry an unknown I/O type or have
    non-numeric fields are reported and skipped. Previously such a row either
    raised or re-added the time of the preceding row.

    Args:
        sfn: Path of the Darshan log to parse.

    Returns:
        ``(jid, start_time, end_time, nprocs, cul_time, posix_time,
        mpiio_time, posix_cnt, mpiio_cnt)``; header fields are ``None`` when
        absent. ``None`` when running or reading the parser raised an exception.
    """
    try:
        # Quote the path so unusual characters in a log name stay one argument.
        cmd = '/home/zcliu/usr/bin/darshan-parser --file-list ' + shlex.quote(sfn)
        with os.popen(cmd) as pipe:
            flist = pipe.read()
    except Exception:
        # Narrower than a bare except: KeyboardInterrupt/SystemExit propagate.
        print("error when run darshan-parser for %s" % sfn)
        return None

    lsp = flist.split('\n')
    header_prefixes = (('# jobid:', 'jid'), ('# start_time:', 'start_time'),
                       ('# end_time:', 'end_time'), ('# nprocs:', 'nprocs'))
    header = {'jid': None, 'start_time': None, 'end_time': None, 'nprocs': None}

    # Index of the first table row; stays past the end when the table header is
    # missing, so the row loop below is skipped in that case.
    table_start = len(lsp)
    for idx, line in enumerate(lsp):
        if line.startswith('# <hash>\t<suffix>'):
            table_start = idx + 1
            break
        for prefix, key in header_prefixes:
            if line.startswith(prefix):
                try:
                    header[key] = int(line.split()[-1])
                except ValueError:
                    print ("error when parse: %s" % line)
                break

    cul_time = 0
    iotype2cnt  = {'POSIX':0, 'MPI':0}
    iotype2time = {'POSIX':0, 'MPI':0}
    for line in lsp[table_start:-1]:
        fields = line.split()
        if not fields:
            continue  # blank separator line, nothing to report
        if len(fields) < 4 or fields[-4] not in iotype2time:
            print ("error when parse: %s" % line)
            continue
        try:
            iotime = int(fields[-3]) * float(fields[-1])
        except ValueError:
            print ("error when parse: %s" % line)
            continue
        cul_time += iotime
        iotype2cnt[fields[-4]]  += 1
        iotype2time[fields[-4]] += iotime

    return (header['jid'], header['start_time'], header['end_time'], header['nprocs'],
            cul_time, iotype2time['POSIX'], iotype2time['MPI'],
            iotype2cnt['POSIX'], iotype2cnt['MPI'])
    
def extrac_iotime(year_month):
    """Parse every log of one month sequentially and write ``iotime-YYYYMM.csv``.

    For each ``*.darshan`` file below
    ``/projects/AMASE/zliu/facility-data/ds/<year>/<month>/<day>/`` the three
    extractors are called in order (file list, file, total). A file is dropped
    as soon as one extractor returns ``None``; otherwise the three result
    tuples are concatenated into one CSV row.

    Args:
        year_month: ``(year, month)`` pair of integers.
    """
    year, month = year_month
    fns = []
    for ddir in glob.glob('/projects/AMASE/zliu/facility-data/ds/%d/%d/*' % (year, month)):
        fns.extend(glob.glob('%s/*.darshan' % ddir))

    column_names = ('jid', 'start_time', 'end_time', 'nprocs',
                    'cul_iotime', 'posix_time', 'mpiio_time',
                    'posix_cnt', 'mpiio_cnt', 'read_bytes',
                    'write_bytes', 'wr_bytes', 'unique_bytes',
                    'shared_bytes', 'read_nf', 'write_nf',
                    'wr_nf', 'unique_nf', 'shared_nf',
                    'posix_wr_time', 'posix_rd_time', 'posix_meta_time',
                    'mpiio_wr_time', 'mpiio_rd_time', 'mpiio_meta_time')

    records = []
    for fn in fns:
        filelist_part = extract_filelist_from_raw(fn)
        if filelist_part is None:
            continue
        file_part = extract_file_from_raw(fn)
        if file_part is None:
            continue
        total_part = extract_total_from_raw(fn)
        if total_part is None:
            continue
        records.append(filelist_part + file_part + total_part)

    csv_name = 'iotime-%04d%02d.csv' % (year, month)
    pd.DataFrame(records, columns=column_names).to_csv(csv_name, index=False)

# yms = [(2018, m) for m in range(2, 13)] + [(2019, 1), (2019, 2), (2019, 3)]
# p = Pool(len(yms))
# _ = p.map(extrac_iotime, yms)
# p.terminate()
# p.close()

In [3]:
def extract_all_in_one(fn):
    """Run the file-list, file and total extractors on one log.

    The extractors are called in that order and the work stops at the first
    one that returns ``None``.

    Args:
        fn: Path of the Darshan log to parse.

    Returns:
        The three result tuples concatenated in call order, or ``None`` when
        any extractor returned ``None``.
    """
    combined = ()
    for extractor in (extract_filelist_from_raw, extract_file_from_raw,
                      extract_total_from_raw):
        part = extractor(fn)
        if part is None:
            return None
        combined = combined + part
    return combined

def extrac_iotime_parallel(year_month, n_workers=256):
    """Parse every log of one month in a process pool and write ``iotime-YYYYMM.csv``.

    Each ``*.darshan`` file below
    ``/projects/AMASE/zliu/facility-data/ds/<year>/<month>/<day>/`` is handed
    to ``extract_all_in_one``; files for which it returns ``None`` are left out
    of the CSV.

    Args:
        year_month: ``(year, month)`` pair of integers.
        n_workers: Maximum number of worker processes. The default of 256 is
            the pool size that was hard-coded before. No more workers than
            files are started, and no pool at all when no file is found.
    """
    year, month = year_month
    fns = []
    for ddir in glob.glob('/projects/AMASE/zliu/facility-data/ds/%d/%d/*' % (year, month)):
        fns += glob.glob('%s/*.darshan' % ddir)

    if fns:
        with Pool(min(n_workers, len(fns))) as p:
            jid2cul_iotime_val = p.map(extract_all_in_one, fns)
    else:
        # Nothing to parse: still write a header-only CSV, but without forking
        # worker processes that would have no work to do.
        jid2cul_iotime_val = []
    jid2cul_iotime_val = [v for v in jid2cul_iotime_val if v is not None]
    jid2cul_iotime = pd.DataFrame(jid2cul_iotime_val, \
                                  columns=('jid', 'start_time', 'end_time', 'nprocs',\
                                           'cul_iotime', 'posix_time', 'mpiio_time', \
                                           'posix_cnt', 'mpiio_cnt', 'read_bytes', \
                                           'write_bytes', 'wr_bytes', 'unique_bytes', \
                                           'shared_bytes', 'read_nf', 'write_nf', \
                                           'wr_nf', 'unique_nf', 'shared_nf', \
                                           'posix_wr_time', 'posix_rd_time', 'posix_meta_time', \
                                           'mpiio_wr_time', 'mpiio_rd_time', 'mpiio_meta_time'))

    jid2cul_iotime.to_csv('iotime-%04d%02d.csv' % (year, month), index=False)
    
# All months from February 2018 through March 2019.
yms = [(2018, m) for m in range(2, 13)]
yms.extend((2019, m) for m in range(1, 4))

# The first two entries (2018-02 and 2018-03) are skipped by the slice below.
for ym in yms[2:]:
    ym_label = "%d-%02d" % ym
    print("start processing %s @%d" % (ym_label, time.time()))
    extrac_iotime_parallel(ym)
    print("%s is done @%d" % (ym_label, time.time()))

start processing 2018-04 @1554911241
error when run darshan-parser for /projects/AMASE/zliu/facility-data/ds/2018/4/18/yghadar_lmp_mira_chem_io_id1513985_4-18-80494-5977469701034858038_1.darshan
error when run darshan-parser for /projects/AMASE/zliu/facility-data/ds/2018/4/18/yghadar_lmp_mira_chem_io_id1513988_4-18-80831-3690045021201141699_1.darshan
2018-04 is done @1554913342
start processing 2018-05 @1554913342
2018-05 is done @1554913735
start processing 2018-06 @1554913735
2018-06 is done @1554914086
start processing 2018-07 @1554914086
2018-07 is done @1554914423
start processing 2018-08 @1554914423
2018-08 is done @1554914614
start processing 2018-09 @1554914614
error when run darshan-parser for /projects/AMASE/zliu/facility-data/ds/2018/9/13/33146_nek5000_id1636657_9-13-70112-1348400304312054786_1.darshan
2018-09 is done @1554914904
start processing 2018-10 @1554914904
2018-10 is done @1554915926
start processing 2018-11 @1554915926
2018-11 is done @1554919794
start processing 

In [10]:
def list_fns(yms):
    """Collect the paths of all decompressed logs for the given months.

    Args:
        yms: Iterable of ``(year, month)`` integer pairs.

    Returns:
        List of ``*.darshan`` paths found in the sub-directories of
        ``/projects/AMASE/zliu/facility-data/ds/<year>/<month>/``, in the order
        the months were given.
    """
    found = []
    for year, month in yms:
        day_dirs = glob.glob('/projects/AMASE/zliu/facility-data/ds/%d/%d/*' % (year, month))
        for ddir in day_dirs:
            found.extend(glob.glob('%s/*.darshan' % ddir))
    return found

# Collect the decompressed logs of every (year, month) pair currently held in
# `yms` and show how many were found.
fns_all = list_fns(yms)
len(fns_all)

387400

In [7]:
# Show the first collected path (directory layout and file naming pattern).
fns_all[0]

'/projects/AMASE/zliu/facility-data/ds/2018/2/1/bing_IOR_id1399527_2-1-23239-6230366815674728381_1.darshan'

In [None]:
def extract_all_from_raw(sfn):
    """Build one record per file from the output of ``darshan-parser --all``.

    The output is consumed in three stages:

    1. Header: the job id is read from the ``# jobid:`` line; scanning stops at
       the line starting with ``#<rank>`` (the counter table header).
    2. Counter table: lines are consumed in groups of 162, one group per file,
       until the line following a group starts with ``total_CP_``. From the
       first line of a group the second field (used as the record key), the
       first field and the mount point (second-to-last field, mapped to an
       integer code) are kept; from every line the fourth field is kept.
    3. File list: after the line starting with ``# <hash>\\t<suffix>``, fields
       2-5 of each row are appended to the record whose key equals the row's
       first field, until as many rows as records have been read.

    Args:
        sfn: Path of the Darshan log to parse.

    Returns:
        A list with one tuple per file:
        ``(key, jid, first_field, mount_code, <162 values>, <file-list fields>)``.
        ``None`` when running or reading the parser raised an exception, or
        when the output contains no ``#<rank>`` table header (for example empty
        output); previously the latter case raised ``IndexError``.

    Raises:
        KeyError: For a mount point missing from the table below, or a
            file-list row whose key has no counter record.
        IndexError: If the output ends in the middle of a group of counters.
    """
    mount_point = {"/projects":0, "/bgsys":1, "/":2, "/gpfs/mira-fs1":3, \
                   "/gpfs/mira-fs0":4, "/gpfs/mira-home":5}
    # Number of consecutive counter lines treated as one file record.
    # NOTE(review): presumably the per-file counter count of the Darshan
    # version that wrote these logs - confirm before reusing on other logs.
    n_counters = 162
    try:
        # Quote the path so unusual characters in a log name stay one argument.
        cmd = '/home/zcliu/usr/bin/darshan-parser --all ' + shlex.quote(sfn)
        with os.popen(cmd) as pipe:
            parsed = pipe.read()
    except Exception:
        # Narrower than a bare except: KeyboardInterrupt/SystemExit propagate.
        print("error when run darshan-parser for %s" % sfn)
        return None
    ret_frcds = {}
    lsp = parsed.split('\n')
    jid = None
    table_found = False
    for idx in range(len(lsp)):
        if lsp[idx].startswith('# jobid:'):
            jid = int(lsp[idx].split()[-1])
            continue

        if lsp[idx].startswith('#<rank>'):
            table_found = True
            break

    # Without the counter table there is nothing to index below.
    if not table_found:
        return None

    global_idx = idx + 1
    while True:
        f_counter = []
        for idx in range(global_idx, global_idx + n_counters):
            _lsp = lsp[idx].split()
            if len(f_counter)==0:
                # First line of the group: record key, first field, mount code.
                f_counter = [_lsp[1], _lsp[0], mount_point[_lsp[-2]]]
            f_counter.append(_lsp[3])

        ret_frcds[f_counter[0]] = tuple(f_counter[1:])
        global_idx = idx + 1
        if lsp[global_idx].startswith('total_CP_'): break

    # Skip ahead to the header of the per-file summary table.
    for idx in range(global_idx, len(lsp)):
        if lsp[idx].startswith('# <hash>\t<suffix>'): break

    global_idx = idx + 1
    f_cnt = 0
    for idx in range(global_idx, len(lsp)):
        _lsp    = lsp[idx].split()
        ret_frcds[_lsp[0]] += tuple(_lsp[2:6])
        f_cnt += 1
        # Stop after one summary row per counter record.
        if f_cnt == len(ret_frcds): break

    return [(f, jid)+ret_frcds[f] for f in ret_frcds.keys()]

# val_tbl = []
# for fn in list_fns(((2018, 3), )):
#     _ = extract_all_from_raw(fn)
#     if _ is not None: val_tbl.append(_)

# Column names for the per-file records: the `counters` entries of
# 'f_counter_name.txt' whose `opt_in` flag equals 1. The file is read once,
# before any month is parsed, so a missing or malformed file fails immediately
# instead of after the first month has been processed.
cols_opt = pd.read_csv('f_counter_name.txt')
f_counter_mask = cols_opt.opt_in.values==1
f_counters = cols_opt.counters.values[f_counter_mask]

for ym in [(2018, m) for m in range(2, 13)] + [(2019, 1), (2019, 2), (2019, 3)]:
    fns = list_fns((ym, ))
    with Pool(128) as p:
        val_tbl_bytask = p.map(extract_all_from_raw, fns)

    # Flatten the per-log record lists, dropping logs that could not be parsed.
    val_tbl = []
    for vtbl in val_tbl_bytask:
        if vtbl is not None:
            val_tbl += vtbl
    val_tbl = np.array(val_tbl)

    pd.DataFrame(val_tbl, columns=f_counters).to_csv('file-records-%d%02d.csv' % (ym[0], ym[1]), \
                                                                        index=False)

In [28]:
# print(os.popen('/home/zcliu/usr/bin/darshan-parser --all ' + fns[0]).read())

In [53]:
# Show the last two newline-separated pieces of the counter-name file; an empty
# final piece means the file ends with a newline.
open('f_counter_name.txt').read().split('\n')[-2:]

['CP_F_VARIANCE_RANK_BYTES', '']

In [238]:
# NOTE(review): in the cells visible here `jid2cul_iotime` is only assigned
# inside functions, so this line presumably relied on a variable left in the
# kernel by an earlier edit - confirm before re-running on a fresh kernel.
jid2cul_iotime.to_csv('iotime-201808.csv', index=False)

In [None]:
# Logs from the first day directory of May 2018 only.
day_dirs = glob.glob('/projects/AMASE/zliu/facility-data/ds/%d/%d/*' % (2018, 5))[:1]
fns = []
for ddir in day_dirs:
    fns.extend(glob.glob('%s/*.darshan' % ddir))

# Run the three extractors on the first log, stopping at the first one that
# returns None; the results stay in the _rcd_* variables for inspection.
for fn in fns[:1]:
    _rcd_filelist = extract_filelist_from_raw(fn)
    if _rcd_filelist is not None:
        _rcd_file = extract_file_from_raw(fn)
        if _rcd_file is not None:
            _rcd_total = extract_total_from_raw(fn)

In [3]:
# Paths of all decompressed logs for May 2018.
fns = list_fns(((2018, 5), ))

In [48]:
def extract_filesize_info(sfn):
    """Collect the positive per-file read and write byte counts of one log.

    The output of ``darshan-parser --all`` is streamed line by line: the header
    supplies the job id, start time, end time and process count; the counter
    lines between the ``#<rank>`` header and the first ``total_CP_`` line
    supply the values of the ``CP_BYTES_READ`` and ``CP_BYTES_WRITTEN``
    counters (third field = counter name, fourth field = value).

    Values are compared as integers rather than as strings, and only the
    leading fields of a line are used, so a file-name suffix containing
    whitespace no longer breaks parsing.

    Args:
        sfn: Path of the Darshan log to parse.

    Returns:
        A one-entry dict ``{(jid, start_time, end_time, nprocs):
        (read_sizes, write_sizes)}`` where both lists hold the counter values
        greater than zero in output order; header fields that are absent are
        ``None``. ``None`` when starting the parser raised an exception.

    Raises:
        ValueError: If a header field or a byte counter value is not an integer.
    """
    try:
        # Quote the path so unusual characters in a log name stay one argument.
        cmd = '/home/zcliu/usr/bin/darshan-parser --all ' + shlex.quote(sfn)
        parsed_fd = os.popen(cmd)
    except Exception:
        # Narrower than a bare except: KeyboardInterrupt/SystemExit propagate.
        print("error when run darshan-parser for %s" % sfn)
        return None

    header_prefixes = (('# jobid:', 'jid'), ('# start_time:', 'start_time'),
                       ('# end_time:', 'end_time'), ('# nprocs:', 'nprocs'))
    header = {'jid': None, 'start_time': None, 'end_time': None, 'nprocs': None}
    sizes = {'CP_BYTES_READ': [], 'CP_BYTES_WRITTEN': []}

    # Reading stops before the end of the output, so close the pipe explicitly
    # instead of leaving it to garbage collection.
    with parsed_fd:
        for line in parsed_fd:
            if line.startswith('#<rank>'):
                break
            for prefix, key in header_prefixes:
                if line.startswith(prefix):
                    header[key] = int(line.split()[-1])
                    break

        # Same iterator: continues right after the table header line.
        for line in parsed_fd:
            if line.startswith('total_CP_'):
                break
            fields = line.split()
            if len(fields) < 4 or fields[2] not in sizes:
                continue
            nbytes = int(fields[3])
            if nbytes > 0:
                sizes[fields[2]].append(nbytes)

    task_key = (header['jid'], header['start_time'], header['end_time'], header['nprocs'])
    return {task_key: (sizes['CP_BYTES_READ'], sizes['CP_BYTES_WRITTEN'])}
    
# Try the extractor on the first log in `fns` and display the resulting dict.
extract_filesize_info(fns[0])

{(1524752, 1525133257, 1525133263, 1): ([], [982515712, 110])}

In [None]:
for ym in [(2018, m) for m in range(2, 13)] + [(2019, 1), (2019, 2), (2019, 3)]:
    fns = list_fns((ym, ))
    with Pool(128) as p:
        val_tbl_bytask = p.map(extract_filesize_info, fns)

    # Merge the one-entry dicts returned per log. The first entry seen for a
    # task key is kept, exactly as with the former {**vtbl, **cmb_res} merge,
    # but without copying the whole accumulated dict for every log.
    # NOTE(review): logs sharing (jid, start_time, end_time, nprocs) collapse
    # into a single entry - confirm that dropping the later ones is intended.
    cmb_res = {}
    for vtbl in val_tbl_bytask:
        if vtbl is None:
            continue
        for task_key, size_lists in vtbl.items():
            cmb_res.setdefault(task_key, size_lists)

    # np.save pickles the dict into a 0-d object array ('.npy' is appended).
    np.save("fsize-records-%d-%02d" % (ym[0], ym[1]), cmb_res)
    # Only the first month is processed; remove the break to run all months.
    break

In [63]:
# The file holds a pickled dict inside a 0-d object array. Current NumPy
# releases refuse to unpickle unless allow_pickle=True is passed; .item()
# unwraps the array into the dict.
# Unpickling can execute arbitrary code, so only load files written by this
# notebook.
rcd = np.load("fsize-records-2018-02.npy", allow_pickle=True).item()

In [66]:
# Keys of the loaded dict: (jid, start_time, end_time, nprocs) tuples.
rcd.keys()

dict_keys([(1398441, 1517445391, 1517445412, 4096), (1396772, 1517493356, 1517500553, 512), (1399527, 1517466439, 1517466526, 800), (1400043, 1517468671, 1517468688, 800), (1399816, 1517468671, 1517468678, 2048), (1399527, 1517467218, 1517467232, 3200), (1396772, 1517493428, 1517500610, 512), (1396772, 1517493393, 1517500557, 512), (1402385, 1517504910, 1517504924, 1024), (1399527, 1517467383, 1517467396, 3200)])