In [1]:
import plotly.graph_objects as go
import networkx as nx
import os

# Run configuration: which recorded workflow to analyse and where its stat files live.

# Name of the sub-directory under example_stat/ that holds this run's statistics.
test_name = "vist"

# Directory searched for the stat files loaded by the cells below.
stat_path=f"example_stat/{test_name}"
# Image directory under stat_path (not referenced again by the cells shown in this notebook).
image_path=f"{stat_path}/images"



In [2]:
# My utility functions
import utils.stat_loader as sload
import utils.stat_print as sp
import utils.vfd_stat2graph as vfd2g
import utils.vfd_graph2sankey as vfd2sk
import utils.full_stat2graph as f2g

# Improve Functions
## TODO
- Add dataset nodes
- Add file address ordering nodes


In [3]:
# Range of workflow stages to analyse; both bounds are passed to
# sload.current_task_order_list below.
STAGE_START = 0
STAGE_END = 2


# Task ordering for this run; in the output below it prints as a dict of
# task name -> stage order.
TASK_ORDER_LIST = sload.load_task_order_list(stat_path)
# Adjust STAGE_END against the loaded ordering (for this run the helper reports
# that the requested value is not present and falls back to the maximum order).
STAGE_END = sload.correct_end_stage(TASK_ORDER_LIST, STAGE_END)

# Narrow the ordering to the requested stage range.
# NOTE(review): whether the bounds are inclusive is decided inside utils.stat_loader - confirm.
TASK_ORDER_LIST = sload.current_task_order_list(TASK_ORDER_LIST, STAGE_START, STAGE_END)

# Names of the selected tasks; later cells use this list to filter stat records.
TASK_LISTS = list(TASK_ORDER_LIST.keys())

print(f"TASK_ORDER_LIST = {TASK_ORDER_LIST}")
TASK_LISTS


STAGE_END is not in TASK_ORDER_LIST, set to max order: 1
TASK_ORDER_LIST = {'arldm_saveh5': 0, 'arldm_train': 1}


['arldm_saveh5', 'arldm_train']

In [4]:
# Locate the VFD-level stat files of this run (files under stat_path matching "vfd").
vfd_files = sload.find_files_with_pattern(stat_path, "vfd")
# Debug aid: uncomment to restrict the analysis to the first file only.
# vfd_files = vfd_files[0:1]
print(f"vfd_files: {vfd_files}")

# Parsed VFD statistics; later cells iterate it as a mapping of
# stat-file path -> list of records.
vfd_dict = sload.load_stat_json(vfd_files)
# print(vfd_dict)


# Same procedure for the VOL-level stat files.
vol_files = sload.find_files_with_pattern(stat_path, "vol")
# Debug aid: uncomment to restrict the analysis to the first file only.
# vol_files = vol_files[0:1]
print(f"vol_files: {vol_files}")

# Parsed VOL statistics (not used by the remaining cells shown in this notebook).
vol_dict = sload.load_stat_json(vol_files)

print("loading json done")

vfd_files: ['example_stat/vist/271485-vfd_data_stat.json', 'example_stat/vist/271491-vfd_data_stat.json']
loading example_stat/vist/271485-vfd_data_stat.json
loading example_stat/vist/271491-vfd_data_stat.json
vol_files: ['example_stat/vist/271485-vol_data_stat.json', 'example_stat/vist/271491-vol_data_stat.json']
loading example_stat/vist/271485-vol_data_stat.json
loading example_stat/vist/271491-vol_data_stat.json
loading json done


: 

In [5]:
# Report the VFD tracker's total overhead next to the total POSIX I/O time
# found in the loaded VFD stat files (both printed in ms, see output below).
sp.show_all_overhead("VFD", vfd_dict)

Total VFD overhead: 44.0 ms
Total POSIX IO time: 177.0 ms


In [6]:
# Fields copied from each file record into the `file_stat` payload carried by every edge.
_FILE_STAT_FIELDS = ("open_time", "close_time", "file_intent", "file_read_cnt",
                     "file_write_cnt", "access_type", "io_bytes", "file_size")


def _merge_dset_pages(file_page_map, file_name, dataset_page_map):
    """Fold one record's dataset -> pages map into the per-file accumulator."""
    if file_name not in file_page_map:
        file_page_map[file_name] = dataset_page_map
        return
    known_pages = file_page_map[file_name]
    for dset, pages in dataset_page_map.items():
        if dset in known_pages:
            known_pages[dset].extend(pages)
        else:
            known_pages[dset] = pages


def _record_edge(edge_stats, edge, attrs):
    """Store `attrs` for `edge`; same-named attributes from an earlier record are overwritten."""
    edge_stats.setdefault(edge, {}).update(attrs)


def add_task_dset_file_nodes(G, stat_dict, task_list):
    """Populate ``G`` with task, dataset and file nodes (plus edges) from VFD statistics.

    Every selected file record contributes a three-column chain, laid out left to right:

    * ``read_only``:                 file -> ``<dset>-R`` -> task
    * ``write_only`` / ``read_write``: task -> ``<dset>-W`` -> file

    Records with any other access type add nothing to the graph.

    Args:
        G: networkx-style graph; it is modified in place.
        stat_dict: mapping of stat-file path -> list of single-key dicts. Only entries
            whose key contains ``'file'`` are used; the integer after the first ``'-'``
            in that key is taken as the file's order. The value must provide
            ``task_name``, ``file_name``, ``access_type`` and the fields listed in
            ``_FILE_STAT_FIELDS``.
        task_list: task base names (the part of ``task_name`` before the first ``'-'``)
            to include; records of other tasks are skipped.

    Returns:
        The same ``G``. Nodes carry ``pos``, ``rpos``, ``order`` and ``type`` (dataset
        nodes additionally ``stat``); edges carry ``label``, ``dset_stat``,
        ``access_type``, ``file_stat`` and ``edge_type``.

    Raises:
        KeyError, IndexError, ValueError: if a file record does not have the layout
            described above.
    """
    task_order_list = {}  # task base name -> 0-based index of its latest selected record
    # NOTE(review): file_page_map is filled but never read or returned here. It is kept
    # only so the sequence of f2g calls and list updates matches the previous version;
    # drop it once f2g.dset_page_map is confirmed to be free of side effects.
    file_page_map = {}
    edge_stats = {}  # (source, target) -> edge attributes, in insertion order
    layer = 0  # x offset at which the next chain starts
    for stat_file, stat_list in stat_dict.items():
        print(f"stat_file: {stat_file}")
        for entry in stat_list:
            key = list(entry.keys())[0]
            if 'file' not in key:  # only file entries are of interest
                continue
            stat = entry[key]
            file_order = int(key.split("-")[1])

            task_name = stat['task_name']
            # Task name without PID: e.g. arldm_saveh5-1119693 to arldm_saveh5
            task_name_base = task_name.split('-')[0]
            if task_name_base not in task_list:
                continue
            task_order = task_order_list.get(task_name_base, -1) + 1
            task_order_list[task_name_base] = task_order

            access_type = stat['access_type']
            file_name = os.path.basename(stat['file_name'])  # FIXME: use basename for now
            file_stat = {field: stat[field] for field in _FILE_STAT_FIELDS}

            # Statistics of the datasets used in this file
            cur_dset_stats = f2g.get_all_dset_stat(stat)
            _merge_dset_pages(file_page_map, file_name, f2g.dset_page_map(cur_dset_stats))

            # TODO: currently treat 'read_write' as 'write_only'
            reading = access_type == 'read_only'
            if reading or access_type in ('write_only', 'read_write'):
                # Each endpoint is (node name, x offset, y position, node type); the
                # first one is the source of the chain, the second one its sink.
                if reading:  # initial input files: file (-> file address) -> datasets -> task
                    suffix, dset_x = "R", 2
                    endpoints = ((file_name, 0, file_order, 'file'),
                                 (task_name, 3, task_order, 'task'))
                    edge_types = ('file-dset', 'dset-task')
                else:  # intermediate files: task -> datasets (-> file address) -> file
                    suffix, dset_x = "W", 1
                    endpoints = ((task_name, 0, task_order, 'task'),
                                 (file_name, 3, file_order, 'file'))
                    edge_types = ('task-dset', 'dset-file')
                source, sink = endpoints[0][0], endpoints[1][0]

                # A source that is already placed anchors the chain at its x position.
                if G.has_node(source):
                    layer = G.nodes[source]['pos'][0]
                for name, x_offset, y_pos, node_type in endpoints:
                    if not G.has_node(name):
                        # TODO: change to use VFD stats for task nodes here
                        G.add_node(name, pos=(x_offset + layer, y_pos),
                                   rpos=0, order=file_order, type=node_type)

                for dset, dset_stat in cur_dset_stats.items():
                    dset_node = f"{dset}-{suffix}"
                    if not G.has_node(dset_node):
                        dset_order = dset_stat['order']
                        # The pseudo dataset named "file" is typed as a file node.
                        G.add_node(dset_node, pos=(dset_x + layer, dset_order),
                                   rpos=1, order=dset_order,
                                   type='file' if dset == "file" else 'dataset',
                                   stat=dset_stat)
                    chain = ((source, dset_node), (dset_node, sink))
                    for edge, edge_type in zip(chain, edge_types):
                        _record_edge(edge_stats, edge,
                                     {'label': task_name, 'dset_stat': dset_stat,
                                      'access_type': access_type, 'file_stat': file_stat,
                                      'edge_type': edge_type})

            layer += 2
    G.add_edges_from(edge_stats.keys())
    nx.set_edge_attributes(G, edge_stats)
    return G


# Build the directed task/dataset/file graph from the VFD statistics,
# restricted to the tasks selected in TASK_LISTS.
G_VFD = nx.DiGraph()
G_VFD = add_task_dset_file_nodes(G_VFD, vfd_dict, TASK_LISTS)

stat_file: example_stat/vist/271485-vfd_data_stat.json
stat_file: example_stat/vist/271491-vfd_data_stat.json


In [7]:
def _page_edge_attrs(edge_stat, access_type, page_stat, edge_type):
    """Attributes of one edge touching a page node, inherited from the edge it replaces."""
    return {'label': edge_stat['label'],
            'access_type': access_type,
            'page_stat': page_stat,
            'edge_type': edge_type,
            'file_stat': edge_stat['file_stat'],
            'dset_stat': edge_stat['dset_stat']}


def add_file_page(G, file_page_nodes_attr, dset_page_edges):
    """Plan the insertion of file-page ("addr") nodes between file and dataset nodes.

    Every ``'file-dset'`` (read) and ``'dset-file'`` (write) edge of ``G`` is scheduled
    for removal. For each entry of ``dset_page_edges`` that names the edge's dataset,
    a page node and two replacement edges are planned:

    * read:  file -> ``<page>-R`` -> dataset  (edge types ``'file-page'``, ``'page-dset'``)
    * write: dataset -> ``<page>-W`` -> file  (edge types ``'dset-page'``, ``'page-file'``)

    ``G`` itself is not modified; the caller applies the returned plan.

    Args:
        G: graph whose edges carry ``edge_type`` and, for the two edge types handled
            here, ``access_type``, ``label``, ``file_stat`` and ``dset_stat``. Nodes
            carry ``pos``.
        file_page_nodes_attr: mapping of page name -> attributes with ``pos``,
            ``size`` and ``range``.
        dset_page_edges: mapping of ``(page, dset)`` (read) or ``(dset, page)`` (write)
            -> attributes with ``access_cnt``. Dataset names are given without the
            ``-R`` / ``-W`` suffix used for graph nodes.

    Returns:
        Tuple ``(add_edge_stat, edges_to_remove, nodes_to_add)``:
        ``add_edge_stat`` maps each new edge to its attributes, ``edges_to_remove``
        lists the replaced edges, and ``nodes_to_add`` maps each new page node name
        to a ``{name: attributes}`` dict.
    """
    add_edge_stat = {}
    edges_to_remove = []
    nodes_to_add = {}

    for edge in G.edges():
        edge_stat = G.edges[edge]
        edge_type = edge_stat['edge_type']
        reading = edge_type == 'file-dset'
        if not reading and edge_type != 'dset-file':
            continue

        access_type = edge_stat['access_type']
        edges_to_remove.append(edge)
        if reading:
            file_name, dset_name = edge[0], edge[1]
            suffix, page_at, dset_at = 'R', 0, 1
            anchor = file_name  # page column sits right after the file nodes
        else:
            dset_name, file_name = edge[0], edge[1]
            suffix, page_at, dset_at = 'W', 1, 0
            anchor = dset_name  # page column sits right after the dataset nodes

        for page_dset_edge in dset_page_edges:
            page = page_dset_edge[page_at]
            page_name = f"{page}-{suffix}"
            new_dset_name = f"{page_dset_edge[dset_at]}-{suffix}"
            # Only pages of this edge's dataset that are not in the graph yet.
            if new_dset_name != dset_name or G.has_node(page_name):
                continue

            node_x = G.nodes[anchor]['pos'][0] + 1
            page_attr = file_page_nodes_attr[page]
            node_order = page_attr['pos'][1]
            nodes_to_add[page_name] = {page_name: {'pos': (node_x, node_order), 'rpos': 1,
                                                   'order': node_order, 'type': 'addr',
                                                   'size': page_attr['size'],
                                                   'range': page_attr['range']}}
            page_stat = {'size': page_attr['size'], 'range': page_attr['range'],
                         'access_cnt': dset_page_edges[page_dset_edge]['access_cnt']}

            if reading:
                new_edges = (((page_name, new_dset_name), 'page-dset'),
                             ((file_name, page_name), 'file-page'))
            else:
                new_edges = (((new_dset_name, page_name), 'dset-page'),
                             ((page_name, file_name), 'page-file'))
            for new_edge, new_type in new_edges:
                add_edge_stat[new_edge] = _page_edge_attrs(edge_stat, access_type,
                                                           page_stat, new_type)

    return add_edge_stat, edges_to_remove, nodes_to_add



# Page-level maps derived from the VFD stats: page node attributes and the
# page<->dataset access edges (both are printed in the output below).
file_page_nodes_attr, dset_page_edges = f2g.get_file_dset_maps(vfd_dict, TASK_LISTS)
# Plan the replacement of direct file<->dataset edges by file<->page<->dataset chains.
add_edge_stat,edges_to_remove,nodes_to_add = add_file_page(G_VFD, file_page_nodes_attr, dset_page_edges)
# for k,v in add_edge_stat.items():
#     print(f"add_edge_stat: {k} -> {v}")
print(f"edges_to_add: {add_edge_stat.keys()}")
print(f"edges_to_remove: {edges_to_remove}")
# Apply the plan to the graph.
# NOTE(review): presumably adds the nodes/edges and drops edges_to_remove - confirm in utils.full_stat2graph.
G_VFD = f2g.update_nodes_edges(G_VFD,add_edge_stat, edges_to_remove, nodes_to_add)

stat_file: example_stat/vist/271485-vfd_data_stat.json
stat_file: example_stat/vist/271491-vfd_data_stat.json
file_page_nodes_attr: {'[0-567)': {'pos': (0, 0), 'rpos': 0, 'range': (0, 567), 'size': 37158912}, '[567-1134)': {'pos': (0, 1), 'rpos': 0, 'range': (567, 1134), 'size': 37158912}, '[1134-1701)': {'pos': (0, 2), 'rpos': 0, 'range': (1134, 1701), 'size': 37158912}, '[1701-2268)': {'pos': (0, 3), 'rpos': 0, 'range': (1701, 2268), 'size': 37158912}}
dset_page_edges: {('[0-567)', 'dii'): {'access_cnt': 2, 'access_type': 'read'}, ('[1701-2268)', 'dii'): {'access_cnt': 2, 'access_type': 'read'}, ('dii', '[0-567)'): {'access_cnt': 2, 'access_type': 'write'}, ('[0-567)', 'file'): {'access_cnt': 42, 'access_type': 'read'}, ('[1701-2268)', 'file'): {'access_cnt': 2, 'access_type': 'read'}, ('file', '[1701-2268)'): {'access_cnt': 1, 'access_type': 'write'}, ('[0-567)', 'image0'): {'access_cnt': 10, 'access_type': 'read'}, ('[1701-2268)', 'image0'): {'access_cnt': 2, 'access_type': 'read'}

In [8]:
# Per-task file usage for the selected tasks; in the output below every entry
# carries 'order', 'io_cnt', 'input' and 'output'.
task_file_map = sload.load_task_file_map(stat_path, test_name, TASK_LISTS)

# Print one task per line for easier inspection.
for task, stat in task_file_map.items():
    print(f"{task} : {stat}")



task_file_map = {'arldm_saveh5-271485': {'order': 0, 'io_cnt': 118, 'input': [], 'output': ['/home/mtang11/experiments/ARLDM/output_data/vistsis_out.h5']}, 'arldm_train-271491': {'order': 1, 'io_cnt': 17, 'input': ['/mnt/common/mtang11/experiments/ARLDM/output_data/vistsis_out.h5'], 'output': []}, 'arldm_train-271566': {'order': 1, 'io_cnt': 11, 'input': ['/mnt/common/mtang11/experiments/ARLDM/output_data/vistsis_out.h5'], 'output': []}}
arldm_saveh5-271485 : {'order': 0, 'io_cnt': 118, 'input': [], 'output': ['/home/mtang11/experiments/ARLDM/output_data/vistsis_out.h5']}
arldm_train-271491 : {'order': 1, 'io_cnt': 17, 'input': ['/mnt/common/mtang11/experiments/ARLDM/output_data/vistsis_out.h5'], 'output': []}
arldm_train-271566 : {'order': 1, 'io_cnt': 11, 'input': ['/mnt/common/mtang11/experiments/ARLDM/output_data/vistsis_out.h5'], 'output': []}


In [9]:
# Disabled steps, kept for reference:
# G_VFD = vfd2g.set_task_position_full(G_VFD, task_file_map, STAGE_START)
# sp.display_all_nodes_attr(G_VFD)

# Sanity check: list the distinct edge types left in the graph after the
# page nodes have been inserted.
all_edge_types = nx.get_edge_attributes(G_VFD, 'edge_type')
edge_types = list(all_edge_types.values())

print(f"edge_types: {set(edge_types)}")

edge_types: {'dset-task', 'task-dset', 'file-page', 'page-dset', 'page-file', 'dset-page'}


In [10]:
# Disabled steps, kept for reference:
# sp.display_all_nodes_attr(G_VFD)
# G_VFD = vfd2g.set_file_position(G_VFD, task_file_map)


# Draw the graph; the prefix encodes the number of stages covered (STAGE_END is 0-based).
# NOTE(review): with save=True the helper presumably writes the figure under stat_path - confirm in utils.stat_print.
sp.draw_graph(G_VFD, test_name, stat_path, graph_type="vfd", prefix=f'{(STAGE_END+1)}s', save=True)

In [None]:
# sp.display_all_edges_attr(G_VFD)

# Add networkx to Sankey diagram

## Statistics for Sankey
The following edge attributes must be available before the Sankey diagram is generated:
- access_cnt : The total file/dataset access count
- access_size : The total read and write access size
- operation : The operation type: read, write, or read_write
- bandwidth : Computed from the size and time of each access; averaged when there are multiple accesses. (TODO: currently only one access size and time is recorded)




In [None]:
# Prepare the edge statistics needed by the Sankey diagram on G_VFD.
# NOTE(review): the return value is discarded, so the helper presumably updates G_VFD in place - confirm.
f2g.prepare_sankey_stat_full(G_VFD)

In [None]:
# NOTE(review): judging by its name, this derives the x position of file nodes from their
# timing; the return value is discarded, so it presumably updates G_VFD in place - confirm.
vfd2sk.time_to_file_x_pos(G_VFD)

In [None]:
# Figure size in pixels; the margins below are derived from it.
width = 2000
height = 800

# Translate the graph into the node and link specifications of a Sankey trace.
vfd_nodes, vfd_nodes_dict = vfd2sk.get_nodes_for_sankey(G_VFD, label_on=True)
vfd_links = vfd2sk.get_links_for_sankey(G_VFD, vfd_nodes_dict, val_sqrt=False)

# print(vfd_nodes)

fig = go.Figure(go.Sankey(node=vfd_nodes, link=vfd_links, orientation='h'))
fig.update_layout(
    autosize=False,
    width=width,
    height=height,
    # Generous top margin, thin margins elsewhere.
    margin={
        "l": width/100,
        "r": width/50,
        "b": height/100,
        "t": height/5,
        "pad": 2,
    },
    font={"size": 18},
)

# Show the diagram inline, then keep a standalone HTML copy next to the stat files.
fig.show()
save_html_path = f"{stat_path}/vfd-{(STAGE_END+1)}s-{test_name}-sankey-labeled-s4.html"
fig.write_html(save_html_path)
print(f"Sankey saved to {save_html_path}")

In [None]:
# Textual summary of the VFD statistics held on the graph, printed as plain text.
stat_str = sp.show_vfd_stats(G_VFD)
print(stat_str)



In [None]:
# Create a manual legend using annotations.

def add_legend_to_graph(fig,save_img=False):
    """Overlay a colour-coded text legend on ``fig`` and display the figure.

    One annotation per legend row is stacked in the upper-right corner, using
    paper coordinates. The figure's annotations are replaced by these rows.

    Args:
        fig: figure to annotate; it is modified in place and shown.
        save_img: when true, the annotated figure is also written as HTML under
            ``stat_path``.
    """
    # Legend rows from top to bottom. Each row is rendered as "<name> - <color>";
    # the 'text' entries describe the row but are not rendered.
    legend_spec = {
        "Tasks":{"color":"red", "text":"Tasks"},
        "Files":{"color":"blue", "text":"Files"},
        "Edges":{"color":"lightblue", "text":"File bandwidth, darker the color, higher the bandwidth"},
    }

    annotations = []
    for row, (label, spec) in enumerate(legend_spec.items()):
        row_color = spec['color']
        annotations.append(
            go.layout.Annotation(
                x=0.9,  # right-hand side of the plotting area
                y=0.85 - row * 0.03,  # each row sits slightly below the previous one
                xref='paper',
                yref='paper',
                text=f"{label} - {row_color}",
                showarrow=False,
                font=dict(size=14, color=row_color),
                bgcolor='rgba(255, 255, 255, 0.7)',  # semi-transparent white backdrop
            )
        )

    fig.update_layout(annotations=annotations)
    fig.show()
    if not save_img:
        return
    fig.write_html(f"{stat_path}/vfd-{(STAGE_END+1)}s-{test_name}-sankey-annotated.html")
