In [7]:
from glob import glob
import pandas as pd
print(f"pd {pd.__version__}")
import dask
import dask.dataframe as dd
print(f"dask {dask.__version__}")
import pyarrow as pa
print(f"pa {pa.__version__}")
import numpy as np
print(f"np {np.__version__}")
from itertools import chain

from dask.distributed import Client, LocalCluster, progress, wait
from dask.distributed import Future, get_client
from typing import Tuple, Union
import os
import intervals as I
import math

import re
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.tree import Tree
import subprocess
import json
import logging
import darshan
print(f"darshan {darshan.__version__}")

pd 2.1.1
dask 2023.6.0
pa 12.0.1
np 1.24.3
darshan 3.4.4.0


In [2]:
logging.basicConfig(filename='darshan_main.log', encoding='utf-8', level=logging.DEBUG)

In [3]:
initialized = False

In [4]:

def execute_script(cmd):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
    (out, err) = proc.communicate()
    output = out.decode('utf-8')
    logging.debug(f"Executing cmd {cmd} and returned {output}")
    return output
def create_index(filename):
    directory = os.path.dirname(filename)
    index_file = os.path.join(directory, "index", f"{filename}.zindex")
    if not os.path.exists(index_file):
        result = execute_script([f"{ZINDEX_BIN}/zindex {filename} --regex 'id:\b([0-9]+)' --numeric --unique --index-file file:{index_file}"])
        logging.debug(f"Creating Index for {filename} returned {result}")
    return filename

def get_linenumber(filename):
    directory = os.path.dirname(filename)
    index_file = os.path.join(directory, "index", f"{filename}.zindex")
    line_number = execute_script([f"/usr/bin/sqlite3 {index_file} 'select MAX(a. line) from LineOffsets a;'"])
    if line_number == "":
        line_number = 1
    else:
        line_number = int(line_number.split("\n")[0])
    logging.debug(f" The {filename} has {line_number} lines")
    return DELIMITER.join([filename, str(line_number)])

def get_size(filename):
    directory = os.path.dirname(filename)
    index_file = os.path.join(directory, "index", f"{filename}.zindex")
    size = execute_script([f"/usr/bin/sqlite3 {index_file} 'SELECT SUM(length)  FROM LineOffsets;'"])
    #print(size)
    if size == "":
        size = 1
    else:
        size = int(size.split("\n")[0])
    logging.debug(f" The {filename} has {size/1024**3} GB size")
    return int(size)


def generate_line_batches(args):
    filename, max_line = args.split(DELIMITER)
    max_line = int(max_line)
    #print(args)
    for start in range(0, max_line, BATCH):
        logging.debug(f"Created a batch for {filename} from [{start}, {start + BATCH}] lines")
        yield DELIMITER.join([filename, str(start)]) 

def load_indexed_gzip_files(args):
    for arg in args:
        filename, start = arg.split(DELIMITER)
        directory = os.path.dirname(filename)
        index_file = os.path.join(directory, "index", f"{filename}.zindex")
        start = int(start)
        result = execute_script(f"{ZINDEX_BIN}/zq --index-file {index_file} {filename} --raw 'select a.line from LineOffsets a where a.line >= {start} AND a.line < {start+BATCH};'")
        #print(value)
        if result == "":
            logging.debug(f"Found an empty line for {filename} range [{start}, {start + BATCH}] lines")
            return None
        else:
            json_lines = result.split("\n")
            for json_line in json_lines:
                logging.debug(f"Found an {json_line} line for {filename} range [{start}, {start + BATCH}] lines")
                yield json_line

def load_objects(line, fn, time_granularity):
    d = {} 
    if line is not None and line !="" and "[" != line[0] and line != "\n" :
        val = {}
        try:
            val = json.loads(line)
            if "name" in val:
                d["index"] = val["id"]
                d["name"] = val["name"]
                d["cat"] = val["cat"]
                d["pid"] = val["pid"]
                d["tid"] = val["tid"]
                d["dur"] = val["dur"]
                d["tinterval"] = I.to_string(I.closed(val["ts"] , val["ts"] + val["dur"]))
                d["trange"] = int(((val["ts"] + val["dur"])/2.0) / time_granularity)
                d.update(io_function(val, d))
                if fn:
                    d.update(fn(val, d))
                logging.debug(f"built an dictionary for line {d}")
        except ValueError as error:
            logging.error(f"Processing {line} failed with {error}")
    return d

In [5]:
def io_function(json_object, current_dict):
    d = {}
    d["phase"] = 0
    if "compute" in json_object["name"]:
        d["compute_time"] = current_dict["tinterval"]
        d["phase"] = 1
    else:
        d["compute_time"] = I.to_string(I.empty())
    if "POSIX" in json_object["cat"]:
        d["io_time"] = current_dict["tinterval"]
        d["phase"] = 2
    else:
        d["io_time"] = I.to_string(I.empty())
    if "args" in json_object:
        if "filename" in json_object["args"]:
            d["filename"] = json_object["args"]["filename"]        
        if "hostname" in json_object["args"]:
            d["hostname"] = json_object["args"]["hostname"]
            
        if "POSIX" == json_object["cat"]:
            if "write" in json_object["name"]:
                d["size"] = int(json_object["args"]["ret"])
            elif "read" in json_object["name"] and "readdir" not in json_object["name"]:
                d["size"] = int(json_object["args"]["ret"])
        else:
            if "image_size" in json_object["args"]:
                d["size"] = json_object["args"]["image_size"]
    return d

def io_columns():
    return {
        'hostname': "string[pyarrow]",
        'compute_time': "string[pyarrow]",
        'io_time': "string[pyarrow]",
        'filename': "string[pyarrow]",
        'phase': "uint16[pyarrow]",
        'size': "uint64[pyarrow]"
    }

In [6]:
def group_func(df):
    val = I.empty()
    for index, value in df.items():
        val = val.union(I.from_string(str(value), int))
    logging.debug(f"Grouped Range into {val}")
    return I.to_string(val)

def union_portions():        
    return dd.Aggregation(
        'union_portions',
        chunk=lambda s: s.apply(group_func),     
        agg=lambda s: s.apply(group_func)
    )
def difference_portion(df, a, b):
    return I.from_string(str(df[a]), int) - I.from_string(str(df[b]), int)
def size_portion(df, col):
    val = 0.0
    ia = I.from_string(str(df[col]), int)
    for i in list(ia):
        if not i.is_empty():
            val += i.upper - i.lower
    logging.debug(f"Calculating size of Interval {val}")
    return val
def percentile(n):
    return dd.Aggregation(
    name='percentile_{:02.0f}'.format(n*100),
    # this computes the median on each partition
    chunk=lambda s: s.quantile(n),
    # this combines results across partitions; the input should just be a list of length 1
    agg=lambda s0: s0.quantile(n),
    )
median_fun = dd.Aggregation(
    name="median",
    # this computes the median on each partition
    chunk=lambda s: s.median(),
    # this combines results across partitions; the input should just be a list of length 1
    agg=lambda s0: s0.median(),
)
def human_format(num):
    if num:
        num = float('{:.3g}'.format(num))
        magnitude = 0
        while abs(num) >= 1024:
            magnitude += 1
            num /= 1024.0
        return '{}{}'.format('{:.0f}'.format(num).rstrip('.'), ['', 'KB', 'MB', 'GB', 'TB'][magnitude])
    else:
        return "NA"

In [5]:
def generate_darshan_records(log_file):
    def get_dict(row):
        d = {}
        d["size"] = row["length"]
        d["ts"] = int(row["start_time"] * 10e6)
        d["dur"] = int(row["end_time"] * 10e6) -  d["ts"]
        d["tinterval"] = I.to_string(I.closed(d["ts"] , d["ts"] + d["dur"]))
        d["trange"] = int(((d["ts"] + d["dur"])/2.0) / time_granularity)
        d["phase"] = 2
        d["compute_time"] = I.to_string(I.empty())
        d["io_time"] = d["tinterval"]
        return d
    report = darshan.DarshanReport(log_file, read_all=True)
    if "DXT_POSIX" in report.modules:
        time_granularity=10e3
        for val in report.records['DXT_POSIX'].to_df():
            d = {}
            fileid = val["id"]
            write_df = val["write_segments"]
            read_df = val["read_segments"]
            d["hostname"] = val["hostname"]
            d["pid"] = val["rank"]
            d["tid"] = 0
            d["cat"] = "POSIX"
            d["filename"] = report.data['name_records'][fileid]
            for index, row in write_df.iterrows():
                d["name"] = "write"
                d.update(get_dict(row))
                yield d
            for index, row in read_df.iterrows():
                d["name"] = "read"
                d.update(get_dict(row))
                yield d   

In [8]:
HOST_PATTERN=r'corona(\d+)'
ZINDEX_BIN="/usr/WS2/iopp/software/zindex/build/Release"
REBUILD_INDEX=False
BATCH=1024**2
DELIMITER=";"

In [9]:
class DarshanAnalyzer:
    
    def __init__(self, file_pattern, time_granularity=10e3):
        file_pattern = glob(file_pattern)
        for file in file_pattern:
            if not file.endswith('.darshan'):
                raise Exception(f"Only darshan files supported: {file}.")
        logging.debug(f"Processing files {file_pattern}")
        create_bag = dask.bag.from_delayed([dask.delayed(generate_darshan_records)(file) 
                                                for file in file_pattern])
        columns = {'name':"string[pyarrow]", 'cat': "string[pyarrow]",
                   'pid': "uint64[pyarrow]",'tid': "uint64[pyarrow]",
                   'dur': "uint64[pyarrow]", 'tinterval': "string[pyarrow]",
                   'trange': "uint64[pyarrow]", 'hostname': "string[pyarrow]",
                    'compute_time': "string[pyarrow]", 'io_time': "string[pyarrow]",
                    'filename': "string[pyarrow]", 'phase': "uint16[pyarrow]",
                    'size': "uint64[pyarrow]"}
        events = create_bag.to_dataframe(meta=columns)
        n_partition = 1
        logging.debug(f"Number of partitions used are {n_partition}")
        self.events = events.repartition(npartitions=n_partition).compute()
        logging.info(f"Loaded events")
        
    
    def _calculate_time(self):
        grouped_df = self.events.groupby("trange").agg({"io_time":union_portions(),"compute_time":union_portions()})
        grouped_df["only_io"] = grouped_df[["io_time", "compute_time"]].apply(difference_portion, a="io_time", b="compute_time", axis=1, meta=("string[pyarrow]"))
        grouped_df["only_compute"] = grouped_df[["io_time", "compute_time"]].apply(difference_portion, a="compute_time", b="io_time", axis=1, meta=("string[pyarrow]"))
        total_io_time, total_compute_time, only_io, only_compute = dask.compute(
            grouped_df[["io_time"]].apply(size_portion, col="io_time", axis=1).sum(),
            grouped_df[["compute_time"]].apply(size_portion, col="compute_time", axis=1).sum(),
            grouped_df[["only_io"]].apply(size_portion, col="only_io", axis=1).sum(),
            grouped_df[["only_compute"]].apply(size_portion, col="only_compute", axis=1).sum(),
        )
        logging.info(f"Calculated times are total_io_time {total_io_time}us, total_compute_time {total_compute_time}us, only_io {only_io}us, only_compute {only_compute}us")
        return total_io_time, total_compute_time, only_io, only_compute
    
    def _create_interval(self, list_items):
        logging.debug(f"Creating interval from {list_items}")
        prev = list_items[0]
        val = I.closed(prev,prev)
        for proc in list_items[1:]:
            val = val | I.closed(prev,proc)
            prev = proc
        logging.info(f"Created an interval of {val}")
        return val
    
    def _create_host_intervals(self, hosts_list):
        logging.debug(f"Creating regex for {hosts_list}")
        is_first = True
        value = I.empty()
        for host in hosts_list:    
            val = int(re.findall(HOST_PATTERN, host)[0])
            if is_first:
                prev = val
                is_first = False
                value = I.closed(prev,prev)
            else:
                value = value | I.closed(prev,val)
        val = re.findall(HOST_PATTERN, hosts_list[0])[0]
        regex = hosts_list[0].replace(val,str(value))
        logging.info(f"Created regex value {val}")
        return regex
    
    def _remove_numbers(self, string_items):
        logging.debug(f"Removing numbers from {string_items}")
        item_sets = set()
        for file in string_items:
            item_sets.add(re.sub(r'\d', 'X', str(file)))
        logging.info(f"List after removing numbers {list(item_sets)}")
        return list(item_sets)
    
    def summary(self):
        total_io_time, total_compute_time, only_io, only_compute = self._calculate_time()
        hosts_used, filenames_accessed, num_procs, compute_tid, posix_tid, io_by_operations = dask.compute(
            self.events["hostname"].unique(),
            self.events["filename"].unique(),
            self.events["pid"].unique(),
            self.events.query("phase == 1")["tid"].unique(),
            self.events.query("phase == 2")["tid"].unique(),
            self.events.query("phase == 2").groupby(["name"]).agg({"dur":[sum,"count"], "size":[sum,"mean",median_fun,min,max,percentile(.25),percentile(.75)]})
        )
        num_events = len(self.events)
        
        hosts_used = hosts_used.to_list()
        hosts_used_regex_str = self._create_host_intervals(hosts_used)
        
        filenames_accessed = filenames_accessed.to_list()
        filename_basename_regex_str = self._remove_numbers(filenames_accessed)
        
        num_procs = num_procs.to_list()
        proc_name_regex = self._create_interval(num_procs)
        
        io_by_ops_dict = io_by_operations.T.to_dict()

        # Create a new Table object from Rich library
        table = Table(box=None, show_header=False)

        # Add columns to the table for the key and value
        table.add_column(style="cyan")
        table.add_column()
        app_tree = Tree("Scheduler Allocation Details")
        app_tree.add(f"Nodes: {str(len(hosts_used))} {hosts_used_regex_str}")
        app_tree.add(f"Processes: {str(len(num_procs))} {str(proc_name_regex)}")
        thread_tree = Tree("Thread allocations across nodes (includes dynamically created threads)")
        thread_tree.add(f"Compute: {str(len(compute_tid))}")
        thread_tree.add(f"I/O: {str(len(posix_tid))}")
        app_tree.add(thread_tree)
        app_tree.add(f"Events Recorded: {str(num_events)}")
        table.add_row("Allocation",app_tree)

        data_tree = Tree("Description of Dataset Used")
        data_tree.add(f"Files: {str(len(filenames_accessed))} {filename_basename_regex_str}")
        table.add_row("Dataset",data_tree)

        io_tree = Tree("Behavior of Application")
        io_time = Tree("Split of Time in application")
        io_time.add(f"Compute: {total_compute_time/1e6:.3f} sec")
        io_time.add(f"Overall I/O: {total_io_time/1e6:.3f} sec")
        io_time.add(f"Unoverlapped I/O: {only_io/1e6:.3f} sec")
        io_time.add(f"Unoverlapped Compute: {only_compute/1e6:.3f} sec")
        io_tree.add(io_time)
        padding_size = 6
        key_padding_size = 15
        io_ts = Tree("Transfer size distribution by function")
        io_ts.add(f"{'Function':<{key_padding_size}}|{'min':<{padding_size}}|{'25':<{padding_size}}|{'mean':<{padding_size}}|{'median':<{padding_size}}|{'75':<{padding_size}}|{'max':<{padding_size}}|")
        for key, value in io_by_ops_dict.items():
            if "close" not in key or "open" not in key:
                io_ts.add(f"{key.split('.')[-1]:<{key_padding_size}}|{human_format(value[('size', 'min')]):<{padding_size}}|{human_format(value[('size', 'percentile_25')]):<{padding_size}}|{human_format(value[('size', 'mean')]):<{padding_size}}|{human_format(value[('size', 'median')]):<{padding_size}}|{human_format(value[('size', 'percentile_75')]):<{padding_size}}|{human_format(value[('size', 'max')]):<{padding_size}}|")
        io_tree.add(io_ts)
        io_ops = Tree("Event count by function")
        for key, value in io_by_ops_dict.items():
            io_ops.add(f"{key.split('.')[-1]} : {value[('dur', 'count')]}")
        io_tree.add(io_ops)               
        table.add_row("I/O Behavior",io_tree)
        console = Console()

        # Print the table with Rich formatting
        console.print(Panel(table, title='Summary'))
        

In [10]:
if not initialized:
    workers = 16
    cluster = LocalCluster(n_workers=workers)  # Launches a scheduler and workers locally
    client = Client(cluster)  # Connect to distributed cluster and override default
    initialized = True
    logging.info(f"Initialized Client with {workers} workers")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34401 instead


In [3]:
filename = "/usr/workspace/haridev/dlio_paper_results/darshan_overhead_corona_1_40/*.darshan"
from glob import glob
file_pattern = glob(filename)
len(file_pattern)

84

In [8]:
file_pattern = glob(filename)
all_records = []
for file in file_pattern:
    for record in generate_darshan_records(file):
        all_records.append(all_records)

In [None]:
pd.DataFrame.from_dict(all_records, orient='columns')

In [9]:
df = pd.DataFrame(all_records)

In [12]:
analyzer = DarshanAnalyzer(filename)

In [25]:
analyzer._calculate_time()

TypeError: 'Aggregation' object is not callable

In [None]:
analyzer.events["filename"].unique().compute()

In [None]:
analyzer.events.head()

In [None]:
%%timeit -n 1 -r 10

In [None]:

analyzer.summary()

In [None]:
cat df_analyzer_main.log

In [None]:
cat /usr/workspace/haridev/dftracer-results/cosmoflow-profile-compress-meta-scale-new-16/dlio.log | grep Ending