In [1]:
import json
import os
import yaml
from pathlib import Path
from dask.distributed import Client

In [2]:
use_local=True

In [3]:
if not use_local:
    with open(f'~/.dlio_profiler/configuration.yaml', 'r') as file:
        dlp_yaml = yaml.safe_load(file)
        app_root = dlp_yaml["app"]
else:
    app_root = str(Path(os.getcwd()).parent.parent)

In [4]:
import sys
sys.path.insert(0, app_root)
sys.path

['/usr/WS2/haridev',
 '/collab/usr/gapps/python/build/spack-toss4.1/var/spack/environments/python/._view/75prb56irmif5ejtirjthpx6kq3gqo52/lib/python39.zip',
 '/collab/usr/gapps/python/build/spack-toss4.1/var/spack/environments/python/._view/75prb56irmif5ejtirjthpx6kq3gqo52/lib/python3.9',
 '/collab/usr/gapps/python/build/spack-toss4.1/var/spack/environments/python/._view/75prb56irmif5ejtirjthpx6kq3gqo52/lib/python3.9/lib-dynload',
 '',
 '/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages']

In [5]:
import dlp_analyzer
print(dlp_analyzer.__file__)
from dlp_analyzer.main import DLPAnalyzer,get_dlp_configuration,update_dlp_configuration,setup_logging,setup_dask_cluster, reset_dask_cluster, get_dlp_configuration


/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/__init__.py


In [6]:
if not use_local:
    dask_run_dir = os.path.join(app_root, "dlp_analyzer", "dask", "run_dir")
    with open (os.path.join(dask_run_dir, f"scheduler_{os.getenv('USER')}.json"), "r") as f:
        dask_scheduler = json.load(f)["address"]
else:
    dask_scheduler = None

In [7]:
app_name = "dlio" # dlio dlio_scr

In [8]:
def get_conditions(json_object):
    app_io_cond = "reader" in json_object["cat"] or "checkpoint" in json_object["cat"]
    compute_cond = "compute" in json_object["name"] # Cosmoflow
    io_cond = "POSIX" == json_object["cat"] # Cosmoflow
    return app_io_cond, compute_cond, io_cond

In [9]:
condition_fn = None #
if app_name == "dlio":
    filename = "/g/g92/haridev/projects/scr-dlio/logs/n2_p8_base/trace*.pfw.gz"
    condition_fn = get_conditions
elif app_name == "dlio_scr":
    filename = "/g/g92/haridev/projects/scr-dlio/logs/n2_p8_scr/trace*.pfw.gz"
    condition_fn = get_conditions

In [10]:
conf = update_dlp_configuration(dask_scheduler=dask_scheduler, verbose=True, workers=64,
                                log_file=f"./dlp_{os.getenv('USER')}.log", rebuild_index=False, time_approximate=True, 
                                host_pattern=r'lassen(\d+)', time_granularity=30e6, skip_hostname=True, conditions=condition_fn)

In [11]:
setup_logging()

In [12]:
setup_dask_cluster()

[INFO] [15:53:23] Initialized Client with 64 workers and link http://127.0.0.1:8787/status [/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/main.py:668]


In [13]:
analyzer = DLPAnalyzer(filename)

[INFO] [15:53:24] Created index for 16 files [/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/main.py:365]
[INFO] [15:53:24] Total size of all files are <dask.bag.core.Item object at 0x1554a7dc63a0> bytes [/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/main.py:367]
[INFO] [15:53:25] Loading 8994 batches out of 16 files and has 147302750 lines overall [/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/main.py:380]
[INFO] [15:56:13] Loaded events [/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/main.py:422]
[INFO] [15:56:13] Loaded plots with slope threshold: 45 [/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/main.py:428]


In [14]:
items = analyzer.summary()
items

[INFO] [15:56:13] Total number of events in the workload are 147302718 [/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/main.py:520]
[INFO] [15:56:15] Approximate True 985599722, 477236273.0, 19514168.0, 1531779091.0,                457722105.0, 0.0, 1512264923.0, 0.0 [/usr/workspace/haridev/scr-dlio/venv/lib/python3.9/site-packages/dlp_analyzer/main.py:474]


In [30]:
app_time = analyzer.events.query("name == 'PyTorchCheckpointing.checkpoint'").compute()
app_time_files = app_time.groupby(["trange","pid","tid"]).agg({"dur":sum}).\
                  groupby(["trange"]).agg({"dur":max}).sum()
app_time_files / 1e6

dur    103.972262
dtype: double[pyarrow]

In [35]:
checkpoint_events = analyzer.events.query("name == 'write'")
checkpoint_events = checkpoint_events.compute()
checkpoint_events

Unnamed: 0,name,cat,pid,tid,ts,te,dur,tinterval,trange,hostname,compute_time,io_time,app_io_time,total_time,filename,phase,size
12910,write,POSIX,0,2664209,250005384,250225091,219707,,8.0,corona171,,219707,,219707,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,30827
1943,write,POSIX,0,2664209,250852746,250865085,12339,,8.0,corona171,,12339,,12339,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,704
15516,write,POSIX,0,2664209,250865108,252595636,1730528,,8.0,corona171,,1730528,,1730528,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,1009254400
4568,write,POSIX,0,2664209,253668019,253668037,18,,8.0,corona171,,18,,18,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,64
4569,write,POSIX,0,2664209,253668074,254603165,935091,,8.0,corona171,,935091,,935091,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,1009254400
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
14879,write,POSIX,9,314836,929319731,929319758,27,,30.0,corona173,,27,,27,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,64
14880,write,POSIX,9,314836,929319806,930213492,893686,,30.0,corona173,,893686,,893686,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,865075200
14881,write,POSIX,9,314836,930797822,930799120,1298,,31.0,corona173,,1298,,1298,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,1587392
14882,write,POSIX,9,314836,930799172,932698114,1898942,,31.0,corona173,,1898942,,1898942,/p/lustre2/haridev/dlio/scr/checkpoints/scr_me...,2,1875123200


In [36]:
checkpoint_files = checkpoint_events.groupby(["filename","trange","pid","tid"]).agg({"dur":sum}).\
                  groupby(["filename","trange"]).agg({"dur":max}).\
                  groupby(["filename"]).agg({"dur":sum})

In [39]:
checkpoint_files.min(), checkpoint_files.max() / 1e6, checkpoint_files.sum() / 1e6, checkpoint_files.mean() / 1e6, checkpoint_files.count()

(dur    347
 dtype: uint64[pyarrow],
 dur    8.46394
 dtype: double[pyarrow],
 dur    694.993596
 dtype: double[pyarrow],
 dur    0.965269
 dtype: double[pyarrow],
 dur    720
 dtype: int64)

In [None]:
checkpoint_files.min(), checkpoint_files.max(), checkpoint_files.mean()