# Analysis for DFProfiler

This is a simple analysis notebook for dfprofiler.

## Imports

In [1]:
import logging
import json
import dask
import os
from pathlib import Path
from glob import glob
import math
import zindex_py as zindex
import numpy as np
import intervals as I

In [2]:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, progress, wait, get_client
from dask.distributed import Future, get_client

## Project Variables

In [3]:
app_root = str(Path(os.getcwd()).parent)

In [4]:
logging.basicConfig(
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(),
    ],
    format="%(asctime)s [%(levelname)s]: %(message)s in %(pathname)s:%(lineno)d",
)

## Setup Dask Local Cluster

In [5]:
workers=16
cluster = LocalCluster(n_workers=workers)  # Launches a scheduler and workers locally
client = Client(cluster)  # Connect to distributed cluster and override default
logging.info(f"Initialized Client with {workers} workers and link {client.dashboard_link}")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 36165 instead
2024-10-22 10:57:02,993 [INFO]: Initialized Client with 16 workers and link http://127.0.0.1:36165/status in /var/tmp/haridev/ipykernel_3184500/3142773904.py:4


## Start Analysis

In [86]:
is_trace = True
file=f"{app_root}/tests/output/ops-64_ts-64m/RAW-DIRECT.pfw.gz"
file=f"{app_root}/tests/output/ops-64_ts-64m/RAW-BUFFERED.pfw.gz"
file_pattern = glob(file)
file_pattern

['/usr/WS2/haridev/datacrumbs/tests/output/ops-64_ts-64m/RAW-BUFFERED.pfw.gz']

## Function to load trace data

In [87]:
def create_index(filename):
    index_file = f"{filename}.zindex"
    if not os.path.exists(index_file):
        status = zindex.create_index(filename, index_file=f"file:{index_file}",
                                     regex="id:\b([0-9]+)", numeric=True, unique=True, debug=False, verbose=False)
        logging.debug(f"Creating Index for {filename} returned {status}")
    return filename

def get_linenumber(filename):
    index_file = f"{filename}.zindex"
    line_number = zindex.get_max_line(filename, index_file=index_file, debug=False, verbose=False)
    logging.debug(f" The {filename} has {line_number} lines")
    return (filename, line_number)

def get_size(filename):
    if filename.endswith('.pfw'):
        size = os.stat(filename).st_size
    elif filename.endswith('.pfw.gz'):
        index_file = f"{filename}.zindex"
        line_number = zindex.get_max_line(filename, index_file=index_file,debug=False, verbose=False)
        size = line_number * 256
    logging.debug(f" The {filename} has {size/1024**3} GB size")
    return int(size)


def generate_line_batches(filename, max_line):
    batch_size = 16*1024
    for start in range(0, max_line, batch_size):
        end =  min((start + batch_size - 1) , (max_line - 1))
        logging.debug(f"Created a batch for {filename} from [{start}, {end}] lines")
        yield filename, start, end

def load_indexed_gzip_files(filename, start, end):
    index_file = f"{filename}.zindex"
    json_lines = zindex.zquery(filename, index_file=index_file,
                          raw=f"select a.line from LineOffsets a where a.line >= {start} AND a.line <= {end};", debug=False, verbose=False)
    logging.debug(f"Read {len(json_lines)} json lines for [{start}, {end}]")
    return json_lines

In [116]:
def load_profile(line):
    d = {}
    if line is not None and line !="" and len(line) > 0 and "[" != line[0] and line != "\n" :
        try:
            unicode_line = ''.join([i if ord(i) < 128 else '#' for i in line])
            val = json.loads(unicode_line)
            if "pid" in d:
                d["pid"] = val["pid"]
            if "tid" in d:
                d["tid"] = val["tid"]
            if "ts" in d:
                d["ts_us"] = int(val["ts"])
            d["filename"] = "NA"
            if "args" in val:
                if "time" in val["args"]:
                    d["dur_sec"] = float(val["args"]["time"])
                if "freq" in val["args"]:
                    d["freq"] = val["args"]["freq"]
                if "size_sum" in val["args"]:
                    d["size_bytes"] = val["args"]["size_sum"]
                if "fname" in val["args"] and val["args"]["fname"]:
                    d["filename"] = val["args"]["fname"]
            d["func_id"] = val["name"]
            d["cat"] = val["cat"]
        except Exception as error:
            logging.error(f"Processing {line} failed with {error}")
    return d


def load_trace(line):
    d = {}
    if line is not None and line !="" and len(line) > 0 and "[" != line[0] and line != "\n" :
        try:
            unicode_line = ''.join([i if ord(i) < 128 else '#' for i in line])
            val = json.loads(unicode_line)
            d["name"] = val["name"]
            d["cat"] = val["cat"]
            if "pid" in val:
                d["pid"] = val["pid"]
            if "tid" in val:
                d["tid"] = val["tid"]
            d["ts"] = 0
            d["dur"] = 0
            if "ts" in val:
                d["ts"] = int(val["ts"])
                d["te"] = int(val["ts"])
            d["dur"] = 1
            if "dur" in val:
                d["dur"] = int(val["dur"])
            if "args" in val and "hhash" in val["args"]:                    
                d["hhash"] = val["args"]["hhash"]
            if "ts" in val:
                interval = I.closedopen(d["ts"], d["ts"] + 1)
                if d["dur"] > 0:
                    d["te"] = int(val["ts"]) + d["dur"]
                    interval = I.closedopen(d["ts"], d["ts"] + d["dur"])
                d["interval"] = I.to_string(interval)
            if val["ph"] != "M":
                d["type"] = 0    
                if "args" in val:                    
                    if "hhash" in val["args"]:
                        d["hhash"] = val["args"]["hhash"]
                    if "size_sum" in val["args"]:
                        d["size"] = val["args"]["size_sum"]
                    if "fhash" in val["args"]:
                        d["fhash"] = val["args"]["fhash"]
            else:
                if val["name"] == "FH":
                    d["type"] = 1
                    if "args" in val:
                        if "name" in val["args"]:
                            d["name"] = val["args"]["name"]
                        if "value" in val["args"]:
                            d["hash"] = val["args"]["value"]
                elif val["name"] == "HH":
                    d["type"] = 2
                    if "args" in val:
                        if "name" in val["args"]:
                            d["name"] = val["args"]["name"]
                        if "value" in val["args"]:
                            d["hash"] = val["args"]["value"]
            
        except Exception as error:
            logging.error(f"Processing {line} failed with {error}")
    return d

## Create Dask Dataframe

In [117]:
if len(file_pattern) > 0:
    dask.bag.from_sequence(file_pattern).map(create_index).compute()
    logging.info(f"Created index for {len(file_pattern)} files")
    total_size = dask.bag.from_sequence(file_pattern).map(get_size).sum()
    n_partition = math.ceil(total_size.compute() / (128 * 1024 ** 2))
    logging.info(f"Total size of all files are {total_size} bytes")
    max_line_numbers = dask.bag.from_sequence(file_pattern).map(get_linenumber).compute()
    logging.info(f"Max lines per file are {max_line_numbers}")
    json_line_delayed = []
    total_lines = 0
    for filename, max_line in max_line_numbers:
        total_lines += max_line
        for _, start, end in generate_line_batches(filename, max_line):
            json_line_delayed.append((filename, start, end))

    logging.info(f"Loading {len(json_line_delayed)} batches out of {len(file_pattern)} files and has {total_lines} lines overall")
    json_line_bags = []
    for filename, start, end in json_line_delayed:
        num_lines = end - start + 1
        json_line_bags.append(dask.delayed(load_indexed_gzip_files, nout=num_lines)(filename, start, end))
    json_lines = dask.bag.concat(json_line_bags)
    if is_trace:
        pfw_bag = json_lines.map(load_trace).filter(lambda x: "name" in x)
    else:
        pfw_bag = json_lines.map(load_profile).filter(lambda x: "func_id" in x)
    pfw_bag.take(1)

2024-10-22 11:46:53,445 [INFO]: Created index for 1 files in /var/tmp/haridev/ipykernel_3184500/864066620.py:3


2024-10-22 11:46:53,483 [INFO]: Total size of all files are <dask.bag.core.Item object at 0x1555211c3d90> bytes in /var/tmp/haridev/ipykernel_3184500/864066620.py:6
2024-10-22 11:46:53,516 [INFO]: Max lines per file are [('/usr/WS2/haridev/datacrumbs/tests/output/ops-64_ts-64m/RAW-BUFFERED.pfw.gz', 5832428)] in /var/tmp/haridev/ipykernel_3184500/864066620.py:8
2024-10-22 11:46:53,518 [INFO]: Loading 356 batches out of 1 files and has 5832428 lines overall in /var/tmp/haridev/ipykernel_3184500/864066620.py:16


In [118]:
if is_trace:
    columns = {'hhash': "string[pyarrow]", 'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]",
                'cat': "string[pyarrow]", 'name': "string[pyarrow]", 'type':  "uint8[pyarrow]",
            'ts': "uint64[pyarrow]", 'te': "uint64[pyarrow]", 'dur': "uint64[pyarrow]", 'interval': "string[pyarrow]", 
             'size': "string[pyarrow]", 'fhash': "string[pyarrow]", 'hash': "string[pyarrow]", 
           }
else:
    columns = {'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]",
            'ts_us': "uint64[pyarrow]", 'dur_sec': "float32[pyarrow]", 
            'freq': "uint64[pyarrow]", 'size_bytes': "uint64[pyarrow]", 'name': "string[pyarrow]", 
            'filename': "string[pyarrow]", 
            'cat': "string[pyarrow]"}

In [119]:
events = pfw_bag.to_dataframe(meta=columns)

In [120]:
events = events.repartition(npartitions=n_partition).persist()
_ = wait(events)

In [121]:
fhash = events.query("type == 1")[["name","hash"]]
hhash = events.query("type == 2")[["name","hash"]]
event = events.query("type == 0")
fhashes = fhash.query("name.str.contains('file_0')").compute()["hash"]
fhashes = fhashes.to_list()


In [122]:
fhashes

['2337428835aa42fa0d2764000f669460']

## Analysis

In [132]:

interesting_events = event.query("fhash.isin(@value) and name == 'write'", local_dict={"value": fhashes})
interesting_events.compute()

Unnamed: 0,hhash,pid,tid,cat,name,type,ts,te,dur,interval,size,fhash,hash
12950,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,sys,write,0,2408580,2408592,12,"[2408580,2408592)",67108864.0,2337428835aa42fa0d2764000f669460,
6811,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,sys,write,0,3119402,3119414,12,"[3119402,3119414)",67108864.0,2337428835aa42fa0d2764000f669460,
14098,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,sys,write,0,3718821,3718830,9,"[3718821,3718830)",67108864.0,2337428835aa42fa0d2764000f669460,
5043,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,sys,write,0,4309873,4309884,11,"[4309873,4309884)",67108864.0,2337428835aa42fa0d2764000f669460,
12333,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,sys,write,0,4922169,4922176,7,"[4922169,4922176)",67108864.0,2337428835aa42fa0d2764000f669460,


In [133]:

# interesting_events["interval"] = interesting_events.apply(lambda x: I.to_string(I.closed(x["ts"], x["ts"]+x["dur"])), axis=1)

In [134]:
def group_func(df):
    val = I.empty()
    for index, value in df.items():
        if str(value) != 'NA':
            pad_interval = I.from_string(str(value), int)
            val = val.union(pad_interval)
    logging.debug(f"Grouped Range into {val}")
    return I.to_string(val)
def union_portions():
    return dd.Aggregation(
        'union_portions',
        chunk=lambda s: s.apply(group_func),
        agg=lambda s: s.apply(group_func)
    )
relevant_intervals = interesting_events.reduction(chunk=lambda s: s.apply(group_func), aggregate=lambda s1: s1.apply(group_func))["interval"].compute()
relevant_intervals = I.from_string(relevant_intervals, int)
relevant_intervals_list = list(relevant_intervals)
relevant_intervals_list[:10], len(relevant_intervals_list)

([[2408580,2408592),
  [3119402,3119414),
  [3718821,3718830),
  [4309873,4309884),
  [4922169,4922176)],
 5)

In [135]:
min_ts = relevant_intervals_list[0].lower
max_te = relevant_intervals_list[-1].upper
min_ts, max_te

(2408580, 4922176)

In [136]:
filtered_events = event.query(f"ts >= {min_ts - 1e5} and te <= {max_te + 1e5} and dur > 0")
filtered_events.compute()

Unnamed: 0,hhash,pid,tid,cat,name,type,ts,te,dur,interval,size,fhash,hash
1049,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_fc_track_inode,0,2308580,2308581,1,"[2308580,2308581)",,,
1050,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_inode_csum,0,2308582,2308583,1,"[2308582,2308583)",,,
1051,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_inode_csum_set,0,2308582,2308583,1,"[2308582,2308583)",,,
1052,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_fill_raw_inode,0,2308582,2308583,1,"[2308582,2308583)",,,
1053,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,__ext4_handle_dirty_metadata,0,2308583,2308584,1,"[2308583,2308584)",,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
14942,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_mark_iloc_dirty,0,5022146,5022147,1,"[5022146,5022147)",,,
14943,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,__ext4_mark_inode_dirty,0,5022146,5022147,1,"[5022146,5022147)",,,
14944,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,__ext4_journal_stop,0,5022147,5022148,1,"[5022147,5022148)",,,
14945,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_dirty_inode,0,5022147,5022148,1,"[5022147,5022148)",,,


In [137]:
len(event)

5832303

In [138]:
def contains(x):
    x["valid"] = "0"
    if x["ts"] is not np.nan and x["dur"] is not np.nan:
        val = relevant_intervals.overlaps(I.from_string(str(x["interval"]), int))           
        x["valid"] = "1" if val else "0"  
    return x
filtered_events["valid"] = "0"
valid_events = filtered_events.apply(contains, axis=1, meta=filtered_events).query("valid == '1'").persist()
_ = wait(valid_events)



In [139]:
valid_events = valid_events.sort_values("ts")
valid_events.compute()

Unnamed: 0,hhash,pid,tid,cat,name,type,ts,te,dur,interval,size,fhash,hash,valid
12945,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,balance_dirty_pages_ratelimited_flags,0,2408580,2408581,1,"[2408580,2408581)",,,,1
12946,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,balance_dirty_pages_ratelimited,0,2408580,2408581,1,"[2408580,2408581)",,,,1
12947,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_buffered_write_iter,0,2408580,2408581,1,"[2408580,2408581)",,,,1
12948,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_file_write_iter,0,2408580,2408582,2,"[2408580,2408582)",,,,1
12949,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,vfs,vfs_write,0,2408580,2408590,10,"[2408580,2408590)",,,,1
12950,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,sys,write,0,2408580,2408592,12,"[2408580,2408592)",67108864.0,2337428835aa42fa0d2764000f669460,,1
6806,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,balance_dirty_pages_ratelimited_flags,0,3119402,3119403,1,"[3119402,3119403)",,,,1
6807,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,balance_dirty_pages_ratelimited,0,3119402,3119403,1,"[3119402,3119403)",,,,1
6808,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_buffered_write_iter,0,3119402,3119403,1,"[3119402,3119403)",,,,1
6809,ecd9cccc050c9e893ab33b1a228fe76d,17132,17132,kernel,ext4_file_write_iter,0,3119402,3119404,2,"[3119402,3119404)",,,,1


In [140]:
valid_events.groupby(["name", "cat"])["dur"].sum().compute() / 1e6

name                                   cat   
balance_dirty_pages_ratelimited        kernel    0.000005
balance_dirty_pages_ratelimited_flags  kernel    0.000005
ext4_buffered_write_iter               kernel    0.000005
write                                  sys       0.000051
ext4_file_write_iter                   kernel     0.00001
vfs_write                              vfs        0.00004
Name: dur, dtype: double[pyarrow]