In [165]:
from sklearn import preprocessing
from sklearn.metrics import mean_absolute_percentage_error, r2_score, mean_absolute_percentage_error


# For 2D analysis
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from scipy.optimize import curve_fit
from sklearn.preprocessing import MinMaxScaler
# from utils import period2freq, freq2period

# For PCA
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from scipy.optimize import curve_fit

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
import csv
import time
import glob
import os

# Run-level switches for this notebook.
# MULTI_NODES gates the call to expand_df() further down in the notebook.
MULTI_NODES = False # True: uses tasksPerNodes; False: uses parallelism
# NOTE(review): NORMALIZE and INITIAL_STAGE are not referenced in the cells
# visible here -- confirm whether later cells still use them.
NORMALIZE = False
INITIAL_STAGE = False
# DEBUG enables the extra diagnostic prints guarded by `if DEBUG:`.
DEBUG = True

# Datasize in KB
# Maps a human-readable size label to its size in KB (e.g. '4mb' -> 4 * 1024).
data_size_kb = {'4mb': 4096, '16mb': 16384, '64mb': 65536,
            '256mb': 262144, '512mb': 524288, '1gb': 1048576,
            '5gb': 5242880, '50gb': 52428800, '100gb': 104857600,
            '300gb': 314572800,}

# Key Parameters
# Column names used to create every workflow DataFrame in this notebook.
WF_PARAMS = ['operation', 'randomOffset', 'transferSize', 
            'aggregateFilesizeMB', 'numTasks', 'parallelism', 'totalTime', 
            'numNodesList', 'numNodes', 'tasksPerNode', 'trMiB', 'storageType',
            'opCount','taskName','taskPID', 'fileName', 'stageOrder']

TARGET_PARAMS = [ "bestStorage" ]
# Decoding of the integer 'operation' column.
op_dict = {0: "write", 1: "read"}

# Per-workflow settings, keyed by workflow name:
#   SCRIPT_ORDER  - base name of the task-ordering JSON, loaded later from
#                   f"{exp_data_path}/{SCRIPT_ORDER}.json"
#   NUM_NODES     - node count written into the 'numNodes' column
#   exp_data_path - directory that contains the test folders
#   test_folders  - test folder names, joined onto exp_data_path
test_configs = {
    "1kg": {
        "SCRIPT_ORDER": "1kg_script_order",
        "NUM_NODES": 10,
        "exp_data_path": ".", 
        "test_folders": ['par_6000_10n_PFS_ps300'] 
    }
}

# Load experiment data
# Select the active workflow and unpack its settings into module-level names.
CURR_WF="1kg"

SCRIPT_ORDER = test_configs[CURR_WF]["SCRIPT_ORDER"]
NUM_NODES = test_configs[CURR_WF]["NUM_NODES"]
exp_data_path = test_configs[CURR_WF]["exp_data_path"]
test_folders = test_configs[CURR_WF]["test_folders"]


# Parameter Notes for Datalife:
Each entry in the table represents a single edge in the workflow. A directed edge connects a **fileName** and a **taskName**, representing a data access.

---
- **operation**: The type of I/O operation {0: "write", 1: "read"}. Value 1 represents a read (i.e. a directed edge from a **fileName** to a **taskName**); value 0 represents a write (i.e. a directed edge from a **taskName** to a **fileName**)
- **randomOffset**: The type of data access pattern { 0: "sequential file access", 1: "random file access"}
- **transferSize**: Average I/O size of the particular I/O operation to a file, calculated from aggregateFilesizeMB/opCount
- **aggregateFilesizeMB**: Total I/O size of a particular I/O operation to a file for a task
- **numTasks**: Number of parallel tasks for this particular task
- **totalTime**: The total I/O time of a particular I/O operation to a file for a task
- **numNodes**: Number of nodes used for this particular task
- **tasksPerNode**: numTasks/numNodes for a task
- **trMiB**: Transfer rate of a particular I/O operation to a file for a task, calculated from aggregateFilesizeMB/totalTime (stored in the `trMiB` column)
- **storageType**: The storage type used in this task. {0: "localssd", 1: "beegfs/pfs", 2: "lustre", 3: "unknown"}
- **opCount**: The number of I/O calls issued for a particular I/O operation to a file for a task
- **taskName**: the task name that is running for a particular workflow
- **taskPID**: the task PID
- **fileName**: the name of file that a I/O operation is for

In [166]:
def transform_store_code(storage_type):
    """Encode a storage type name as its integer code.

    "localssd" -> 0, "beegfs" or "pfs" -> 1, "lustre" -> 2; any other
    value maps to 3.
    """
    if storage_type == "localssd":
        return 0
    if storage_type in ("beegfs", "pfs"):
        return 1
    if storage_type == "lustre":
        return 2
    # Anything unrecognised shares the catch-all code.
    return 3

def decode_store_code(store_code):
    """Decode an integer storage code back into a storage type name.

    0 -> "localssd", 1 -> "beegfs", 2 -> "lustre"; any other value yields
    "unknown".
    """
    known_codes = ((0, "localssd"), (1, "beegfs"), (2, "lustre"))
    for code, label in known_codes:
        if store_code == code:
            return label
    return "unknown"

def bytes_to_mb(file_size):
    """
    Express a file size in megabytes (MB, 1024-based).

    Parameters:
    - file_size (str, int, or float): a plain number is taken to be a byte
      count; a string must be "<number> <unit>" (e.g., "1024 KiB") with unit
      one of "Bytes", "B", "KiB", "MiB", "GiB", "TiB".

    Returns:
    - float: The file size in MB.

    Raises:
    - ValueError: the string carries an unknown unit (or cannot be parsed).
    - TypeError: file_size is neither a string nor a number.
    """
    if isinstance(file_size, str):
        amount_text, unit = file_size.split()
        amount = float(amount_text)

        # One converter per supported unit; each returns the amount in MB.
        to_mb = {
            "Bytes": lambda v: v / (1024 ** 2),
            "B": lambda v: v / (1024 ** 2),
            "KiB": lambda v: v / 1024,
            "MiB": lambda v: v,
            "GiB": lambda v: v * 1024,
            "TiB": lambda v: v * (1024 ** 2),
        }
        convert = to_mb.get(unit)
        if convert is None:
            raise ValueError(f"Unknown size unit: {unit}")
        return convert(amount)

    if isinstance(file_size, (int, float)):
        # Bare numbers are byte counts.
        return file_size / (1024 ** 2)

    raise TypeError("file_size must be a string or a number")


In [167]:
def is_sequential(numbers):
    """Return True when the values, once sorted, increase by exactly 1 each step.

    An empty input is reported as not sequential; a single value is.
    """
    if not numbers:
        return False

    ordered = sorted(numbers)
    # Compare every value with its successor in sorted order.
    for current, following in zip(ordered, ordered[1:]):
        if current + 1 != following:
            return False
    return True


def get_stat_file_pids(all_files):
    """Collect the distinct task identifiers embedded in trace file names.

    For each path, the base name has every ".local" removed and is split on
    "."; when at least three pieces remain, the third piece from the end is
    taken as the task identifier. Returns the identifiers as a sorted list.
    """
    found = set()
    for path in all_files:
        pieces = os.path.basename(path).replace(".local", "").split('.')
        if len(pieces) < 3:
            # Too few components to carry an identifier.
            continue
        found.add(pieces[-3])
    return sorted(found)

import os
import glob
import json

import os
import json

def add_stat_to_df(trial_folder, monitor_timer_stat_io, 
                   operation, fname, task_pid, store_code):
    """Build the statistics record (a dict) for one traced file access.

    Parameters:
    - trial_folder: directory searched for the matching block-trace files.
    - monitor_timer_stat_io: indexable stats; index 0 is used as total time,
      index 1 as operation count and index 2 as the I/O size in bytes. A zero
      operation count is overwritten with 1 in place.
    - operation: 0 for write, anything else is treated as read.
    - fname: block-trace file path the record is derived from.
    - task_pid: task identifier used in the record and in file matching.
    - store_code: integer storage code stored as 'storageType'.

    Returns:
    - dict with the per-access statistics. 'randomOffset' (0 sequential,
      1 random) is only added when trial_folder exists.
    """
    # Traced file name: ".local" becomes ".", the last four dot-separated
    # pieces are dropped, and only the base name is kept.
    dotted = fname.replace(".local", ".")
    fileName = os.path.basename(".".join(dotted.split(".")[:-4]))

    # An access with I/O size but no counted operation is treated as one op.
    if monitor_timer_stat_io[1] == 0:
        monitor_timer_stat_io[1] = 1

    elapsed = monitor_timer_stat_io[0]
    op_count = monitor_timer_stat_io[1]
    nbytes = monitor_timer_stat_io[2]

    record = {}
    record['aggregateFilesizeMB'] = bytes_to_mb(nbytes)
    record['transferSize'] = nbytes / op_count
    record['operation'] = int(operation)
    record['totalTime'] = elapsed
    record['trMiB'] = bytes_to_mb(nbytes / elapsed)
    record['storageType'] = store_code
    record['opCount'] = op_count
    record['taskPID'] = task_pid
    record['fileName'] = fileName
    print(f"fileName = {fileName}")

    if record['totalTime'] > 100:
        print(f"Recorded large totalTime[{monitor_timer_stat_io}] from task_pid[{task_pid}] fileName[{fileName}]")

    if "6818-dc111" in task_pid:
        print(f"monitor_timer_stat_io: {monitor_timer_stat_io}")

    # Trace files carry "w" or "r" right after ".local." in their name.
    if operation == 0:
        op_letter = "w"
    else:
        op_letter = "r"

    if not os.path.exists(trial_folder):
        print(f"Trial folder does not exist: {trial_folder}")
        return record

    # Substring match of "<file>.<pid>.local.<op>" against the folder listing.
    needle = f"{fileName}.{task_pid}.local.{op_letter}"
    candidates = [
        os.path.join(trial_folder, entry)
        for entry in os.listdir(trial_folder)
        if needle in entry
    ]

    access_pattern = 0  # 0: seq, 1: rand
    for candidate in candidates:
        try:
            with open(candidate) as handle:
                trace = json.load(handle)
            blk_range = trace.get('io_blk_range', [])

            if len(blk_range) < 4:
                print(f"Warning: Invalid 'io_blk_range' in file: {candidate}")
                continue

            # A fourth entry of -2 marks the access as random.
            if blk_range[3] == -2:
                access_pattern = 1
                print(f"Detected random write pattern in file: {candidate}")
                break
        except Exception as e:
            print(f"Error processing file {candidate}: {e}")

    record['randomOffset'] = access_pattern
    return record



In [168]:
# NOTE(review): this list is handed to get_test_folder_dfs()/get_wf_result_df(),
# but get_wf_result_df() filters on the PIDs it discovers from the block-trace
# files rather than on these names -- confirm whether it is still needed.
target_tasks = ["python"] # omit srun from 1kgenome run

# Empty frame with the WF_PARAMS columns.
# NOTE(review): not referenced again in the cells visible here.
all_wf_df = pd.DataFrame(columns=WF_PARAMS)

# Find task PID's input and output to match script name
def get_wf_result_df(tests, WF_PARAMS, target_tasks, storageType="localssd"):
    """Collect one statistics row per block-trace file found under a test folder.

    Parameters:
    - tests: path of the test folder; its children ending in "t1", "t2" or
      "t3" are treated as trial folders.
    - WF_PARAMS: column names for the resulting DataFrame.
    - target_tasks: accepted for interface compatibility. It is NOT used for
      filtering: rows are kept for the PIDs discovered in each trial's
      block-trace file names.
    - storageType: storage name, encoded with transform_store_code() and
      stored in every row.

    Returns:
    - pandas.DataFrame with the WF_PARAMS columns and one row per
      (block-trace file, matching datalife monitor file) pair.

    Notes:
    - Zero entries in a task's read (write) monitor stats are replaced, in the
      loaded data, by the corresponding write (read) entries before the row is
      built.
    - NOTE(review): rows are added with the private DataFrame._append(); it is
      kept as-is so column dtypes stay exactly what downstream cells rely on.
    """
    wf_df = pd.DataFrame(columns=WF_PARAMS)

    # Identify trial folders
    wf_trial_folders = [
        folder for folder in glob.glob(f"{tests}/*")
        if folder.endswith(("t1", "t2", "t3"))
    ]
    print(f"Trial folders: {wf_trial_folders}")

    store_code = transform_store_code(storageType)

    for trial_folder in wf_trial_folders:
        blk_files = glob.glob(f"{trial_folder}/*_blk_trace.json")
        datalife_monitor = glob.glob(f"{trial_folder}/*.datalife.json")
        # Kept in a separate name so the `target_tasks` argument is not
        # silently overwritten; the set gives O(1) membership tests below.
        trial_pids = get_stat_file_pids(blk_files)
        trial_pid_set = set(trial_pids)
        print(f"blk_files count: {len(blk_files)}")
        print(f"datalife_monitor count: {len(datalife_monitor)}")
        print(f"target_tasks: {trial_pids}")

        for datalife_json in datalife_monitor:
            task_pid = os.path.basename(datalife_json).split(".")[1]
            if task_pid not in trial_pid_set:
                continue

            try:
                with open(datalife_json) as f:
                    datalife_data = json.load(f)
            except json.JSONDecodeError:
                print(f"Error loading file: {datalife_json}")
                continue

            if not datalife_data:
                # Nothing recorded for this task; skip instead of crashing
                # on the first-key lookup below.
                print(f"Error loading file: {datalife_json}")
                continue

            # The first top-level key is used as the task name.
            task_name = next(iter(datalife_data))
            monitor_timer_stat = datalife_data[task_name]['monitor']

            for fname in blk_files:
                if f".{task_pid}." not in fname:
                    continue

                op_type = "read" if ".r_blk_trace." in fname else "write"
                other_op = "write" if op_type == "read" else "read"
                monitor_stat = monitor_timer_stat[op_type]

                # Extra check if monitor_stat has zero value, use value from
                # the other op_type (looked up only when a zero is present).
                for i, value in enumerate(monitor_stat):
                    if value == 0:
                        monitor_stat[i] = monitor_timer_stat[other_op][i]

                tmp_stat = add_stat_to_df(
                    trial_folder, monitor_stat,
                    1 if op_type == "read" else 0,
                    fname, task_pid, store_code
                )
                wf_df = wf_df._append(tmp_stat, ignore_index=True)

    return wf_df

def get_test_folder_dfs(test_folder, WF_PARAMS, target_tasks,
                        storageType="localssd"):
    """Concatenate the workflow statistics of every listed test folder.

    Each name in `test_folder` is resolved against the module-level
    `exp_data_path`, processed with get_wf_result_df(), and appended to a
    single DataFrame that has the WF_PARAMS columns. A preview and the shape
    of every per-test frame are printed along the way.
    """
    combined = pd.DataFrame(columns=WF_PARAMS)

    for test_name in test_folder:
        # Generate workflow data for this test folder.
        per_test_df = get_wf_result_df(
            f"{exp_data_path}/{test_name}", WF_PARAMS, target_tasks,
            storageType=storageType,
        )
        print(per_test_df.head(5))
        print(f"df shape: {per_test_df.shape}")

        combined = combined._append(per_test_df, ignore_index=True)

    return combined

# Start from an empty frame with the WF_PARAMS columns ...
wf_pfs_df = pd.DataFrame(columns=WF_PARAMS)



# ... and append the statistics gathered from every configured test folder,
# tagging all rows with the "pfs" storage type.
wf_pfs_df = wf_pfs_df._append(get_test_folder_dfs(test_folders, 
                                        WF_PARAMS, target_tasks,
                                        storageType="pfs"), ignore_index=True)



Trial folders: ['./par_6000_10n_PFS_ps300/300_p_10n_PFS_t1']
blk_files count: 1810
datalife_monitor count: 1070
target_tasks: ['13757-dc026', '13765-dc026', '13895-dc026', '13912-dc026', '13926-dc026', '13927-dc026', '13937-dc026', '13973-dc026', '13976-dc026', '14253-dc026', '14383-dc026', '14542-dc026', '14648-dc026', '14738-dc026', '14972-dc026', '14977-dc026', '14980-dc026', '14988-dc026', '14998-dc026', '15007-dc026', '15023-dc026', '15025-dc026', '15028-dc026', '15029-dc026', '15031-dc026', '15033-dc026', '15035-dc026', '15037-dc026', '15039-dc026', '15041-dc026', '15816-dc177', '15818-dc177', '15820-dc177', '15873-dc177', '15876-dc177', '15918-dc026', '15919-dc026', '15921-dc177', '15948-dc177', '15950-dc026', '15980-dc177', '15985-dc177', '15987-dc177', '15989-dc177', '15991-dc177', '15993-dc177', '15995-dc177', '15997-dc177', '15999-dc177', '16001-dc177', '16003-dc177', '16005-dc177', '16008-dc177', '16009-dc177', '16011-dc177', '16013-dc177', '16015-dc177', '16017-dc177', '16

  wf_df = wf_df._append(tmp_stat, ignore_index=True)


fileName = SIFT.chr10.vcf
Detected random write pattern in file: ./par_6000_10n_PFS_ps300/300_p_10n_PFS_t1/SIFT.chr10.vcf.195109-dc273.local.r_blk_trace.json
fileName = chr3n.tar.gz
Detected random write pattern in file: ./par_6000_10n_PFS_ps300/300_p_10n_PFS_t1/chr3n.tar.gz.26700-dc135.local.r_blk_trace.json
fileName = columns.txt
fileName = sifted.SIFT.chr3.txt
Detected random write pattern in file: ./par_6000_10n_PFS_ps300/300_p_10n_PFS_t1/sifted.SIFT.chr3.txt.26700-dc135.local.r_blk_trace.json
fileName = chr3-GBR.tar.gz
fileName = chr8n.tar.gz
Detected random write pattern in file: ./par_6000_10n_PFS_ps300/300_p_10n_PFS_t1/chr8n.tar.gz.30109-dc257.local.r_blk_trace.json
fileName = sifted.SIFT.chr8.txt
Detected random write pattern in file: ./par_6000_10n_PFS_ps300/300_p_10n_PFS_t1/sifted.SIFT.chr8.txt.30109-dc257.local.r_blk_trace.json
fileName = chr8-AFR-freq.tar.gz
fileName = columns.txt
fileName = chr6n-1401-1601.tar.gz
fileName = columns.txt
Detected random write pattern in fil

  folder_dfs = folder_dfs._append(wf_df, ignore_index=True)
  wf_pfs_df = wf_pfs_df._append(get_test_folder_dfs(test_folders,


In [169]:
wf_pfs_df.shape, wf_pfs_df.columns

((1810, 17),
 Index(['operation', 'randomOffset', 'transferSize', 'aggregateFilesizeMB',
        'numTasks', 'parallelism', 'totalTime', 'numNodesList', 'numNodes',
        'tasksPerNode', 'trMiB', 'storageType', 'opCount', 'taskName',
        'taskPID', 'fileName', 'stageOrder'],
       dtype='object'))

In [170]:
def match_script_name(tests):
    """Map every task PID to the files it read and wrote.

    Trial folders are the children of `tests` whose names end in "t1", "t2"
    or "t3". For each PID found in a trial's block-trace files, the names of
    its "*.w_blk_trace.json" files become outputs (duplicates kept) and the
    names of its "*.r_blk_trace.json" files become inputs (duplicates
    dropped).

    Returns:
    - dict: pid -> {"input": [...], "output": [...], "prevTask": "",
      "taskName": ""}.
    """
    def _traced_names(pattern):
        # Traced file name for each trace file matching `pattern`: ".local"
        # is removed, the last three dot-separated pieces are dropped and the
        # base name is kept.
        names = []
        for trace_path in glob.glob(pattern):
            stem = '.'.join(trace_path.replace(".local", "").split(".")[:-3])
            names.append(os.path.basename(stem))
        return names

    trial_suffixes = ("t1", "t2", "t3")
    wf_trial_folders = [
        entry for entry in glob.glob(f"{tests}/*")
        if entry.endswith(trial_suffixes)
    ]
    print(f"Trial folders: {wf_trial_folders}")

    pid_input_output_dict = {}

    for trial_folder in wf_trial_folders:
        blk_files = glob.glob(f"{trial_folder}/*_blk_trace.json")
        print(f"len(blk_files) = {len(blk_files)}")

        for pid in get_stat_file_pids(blk_files):
            entry = pid_input_output_dict.setdefault(pid, {
                "input": [],
                "output": [],
                "prevTask": "",
                "taskName": ""
            })

            # Written files are recorded as-is.
            entry['output'].extend(
                _traced_names(f"{trial_folder}/*.{pid}.local.w_blk_trace.json")
            )

            # Read files are recorded once each.
            for read_name in _traced_names(f"{trial_folder}/*.{pid}.local.r_blk_trace.json"):
                if read_name not in entry['input']:
                    entry['input'].append(read_name)

    return pid_input_output_dict

def get_wf_pid_script_dict(test_folder):
    """Merge the PID -> input/output mappings of all listed test folders.

    Each name in `test_folder` is resolved against the module-level
    `exp_data_path` and processed with match_script_name(); when the same PID
    appears in several folders, the entry from the later folder wins.
    """
    merged = {}
    for tests in test_folder:
        merged.update(match_script_name(f"{exp_data_path}/{tests}"))
    return merged


# PID -> {"input", "output", "prevTask", "taskName"} for every configured test folder.
all_wf_dict = get_wf_pid_script_dict(test_folders)

# Dump the full mapping for inspection (can be very long).
print(all_wf_dict)



Trial folders: ['./par_6000_10n_PFS_ps300/300_p_10n_PFS_t1']
len(blk_files) = 1810
{'13757-dc026': {'input': ['columns.txt', 'ALL.chr1.250000.vcf'], 'output': ['chr1n-1801-2001.tar.gz'], 'prevTask': '', 'taskName': ''}, '13765-dc026': {'input': ['ALL.chr1.250000.vcf', 'columns.txt'], 'output': ['chr1n-5401-5601.tar.gz'], 'prevTask': '', 'taskName': ''}, '13895-dc026': {'input': ['ALL.chr1.250000.vcf', 'columns.txt'], 'output': ['chr1n-1201-1401.tar.gz'], 'prevTask': '', 'taskName': ''}, '13912-dc026': {'input': ['columns.txt', 'ALL.chr1.250000.vcf'], 'output': ['chr1n-2601-2801.tar.gz'], 'prevTask': '', 'taskName': ''}, '13926-dc026': {'input': ['ALL.chr1.250000.vcf', 'columns.txt'], 'output': ['chr1n-2001-2201.tar.gz'], 'prevTask': '', 'taskName': ''}, '13927-dc026': {'input': ['columns.txt', 'ALL.chr1.250000.vcf'], 'output': ['chr1n-1601-1801.tar.gz'], 'prevTask': '', 'taskName': ''}, '13937-dc026': {'input': ['columns.txt', 'ALL.chr1.250000.vcf'], 'output': ['chr1n-4001-4201.tar.gz'

In [171]:
# Add prevTask column
wf_pfs_df['prevTask'] = ""
wf_pfs_df['taskName'] = "unknown"
print(wf_pfs_df.head(5))
print(wf_pfs_df.shape)

  operation randomOffset   transferSize  aggregateFilesizeMB numTasks  \
0         0            0   81791.000000             0.078002      NaN   
1         1            1    8191.921474          2421.758036      NaN   
2         1            1    8191.921474          2421.758036      NaN   
3         1            1    8191.912230          2421.794365      NaN   
4         0            0  101056.000000             0.096375      NaN   

  parallelism  totalTime numNodesList numNodes tasksPerNode        trMiB  \
0         NaN   0.000049          NaN      NaN          NaN  1593.503085   
1         NaN  19.632952          NaN      NaN          NaN   123.351699   
2         NaN  19.632952          NaN      NaN          NaN   123.351699   
3         NaN  19.658824          NaN      NaN          NaN   123.191215   
4         NaN   0.000050          NaN      NaN          NaN  1917.557288   

  storageType opCount taskName      taskPID                fileName  \
0           1       1  unknown   

In [172]:
# save to initial df
# Create the output directory first so to_csv() does not fail on a fresh checkout.
os.makedirs('./analyzed_data', exist_ok=True)
wf_pfs_df.to_csv('./analyzed_data/first_df.csv', index=False)

In [173]:
import re
        
def matches_pattern(file_path, patterns):
    """Return True if the base name of `file_path` fully matches any regex in `patterns`.

    A pattern that is not a valid regular expression is reported on stdout
    and skipped.
    """
    base_name = os.path.basename(file_path)
    for pattern in patterns:
        try:
            hit = re.fullmatch(pattern, base_name)
        except re.error as e:
            print(f"Invalid regex: {pattern}, Error: {e}")
            continue
        if hit:
            return True
    return False

def assign_task_names(tasks, task_order_dict):
    """Assign task names and predecessors to tasks based on patterns.

    Parameters:
    - tasks: dict pid -> details with optional 'input', 'output' and
      'taskName' entries; updated in place.
    - task_order_dict: dict task name -> definition with 'outputs' (regex
      list), 'stage_order' and 'predecessors' (name -> {'inputs': regexes}).

    Returns:
    - the same `tasks` dict, with 'taskName', 'stage_order' and (for input
      matches) 'prevTask' filled in where a definition matched.

    Matching order, per task: definitions are scanned in dict order. An
    output match assigns the definition and stops the scan. An input match
    assigns the definition and its first matching predecessor, but the scan
    continues, so a later definition can still override it.
    NOTE(review): confirm that "last input match wins" is intended.
    """
    for task_pid, details in tasks.items():
        input_paths = details.get('input', [])
        output_paths = details.get('output', [])
        task_name = details.get('taskName', 'unknown')  # Use existing or default to 'unknown'

        for task, definition in task_order_dict.items():
            # An output match is decisive for this task.
            if any(matches_pattern(op, definition['outputs']) for op in output_paths):
                task_name = task
                details['taskName'] = task_name
                details['stage_order'] = definition['stage_order']
                break

            # If no output matches, check for input matches
            for prevTask, predecessor_def in definition['predecessors'].items():
                if any(matches_pattern(ip, predecessor_def.get('inputs', [])) for ip in input_paths):
                    task_name = task
                    details['taskName'] = task_name
                    details['stage_order'] = definition['stage_order']
                    details['prevTask'] = prevTask
                    break

        # Unmatched tasks arrive with an empty taskName, so treat both the
        # empty string and the 'unknown' default as "not assigned".
        if not task_name or task_name == 'unknown':
            print(f"Warning: Task PID {task_pid} could not be assigned a valid taskName.")

    return tasks
        
# Load task ordering json file
# The file maps each task name to its definition; the keys read from it in
# this notebook are 'parallelism', 'num_tasks', 'stage_order', 'outputs'
# and 'predecessors'.
task_order_dict = {}
with open(f"{exp_data_path}/{SCRIPT_ORDER}.json") as f:
    task_order_dict = json.load(f)

print(f"task_order_dict : {task_order_dict}")
# Create a mapping from taskName to parallelism
task_name_to_parallelism = {task: info['parallelism'] for task, info in task_order_dict.items()}
# Same lookup for the number of tasks per task name.
task_name_to_num_tasks = {task: info['num_tasks'] for task, info in task_order_dict.items()}
print(task_name_to_parallelism)
print(task_name_to_num_tasks)

# Fill in task names
# Mutates all_wf_dict in place (the returned dict is the same object).
assign_task_names(all_wf_dict, task_order_dict)



# Unique list of taskNames
taskNames = set([v['taskName'] for v in all_wf_dict.values()])
print(f"Unique taskNames: {taskNames}")
print(f"all_wf_dict:")
for k,v in all_wf_dict.items():
    print(f"{k}:{v}")
print(f"all_wf_dict-----")

# At this point wf_pfs_df['taskName'] has not yet been refreshed from
# all_wf_dict; this only shows the values currently in the frame.
if DEBUG:
    print(wf_pfs_df['fileName'].unique())
    print(wf_pfs_df['taskName'].unique())

task_order_dict : {'individuals': {'stage_order': 1, 'parallelism': 300, 'num_tasks': 300, 'predecessors': {'initial_data': {'inputs': ['ALL\\.chr.*\\.250000\\.vcf', 'columns\\.txt']}}, 'outputs': ['chr.*n-.*-.*\\.tar\\.gz']}, 'individuals_merge': {'stage_order': 2, 'parallelism': 10, 'num_tasks': 10, 'predecessors': {'individuals': {'inputs': ['chr.*n-.*-.*\\.tar\\.gz']}}, 'outputs': ['chr.*n\\.tar\\.gz']}, 'sifting': {'stage_order': 2, 'parallelism': 10, 'num_tasks': 10, 'predecessors': {'initial_data': {'inputs': ['ALL\\.chr([1-9]|10)\\.phase3_shapeit2_mvncall_integrated_v5\\.20130502\\.sites\\.annotation\\.vcf']}}, 'outputs': ['sifted.*\\.txt']}, 'mutation_overlap': {'stage_order': 3, 'parallelism': 10, 'num_tasks': 10, 'predecessors': {'initial_data': {'inputs': ['SAS', 'EAS', 'GBR', 'AMR', 'AFR', 'EUR', 'ALL', 'columns\\.txt']}, 'sifting': {'inputs': ['sifted.*\\.txt']}, 'individuals_merge': {'inputs': ['chr.*n\\.tar\\.gz']}}, 'outputs': ['chr\\d+-[A-Z]{3}\\.tar\\.gz$', 'chr\\d+-

In [174]:
# Label every row of wf_pfs_df with the task name, predecessor task and stage
# order of its PID, drop the SIFT.chr<N>.vcf rows, and resolve the predecessor
# of each read from the task-order definitions.

# Create a mapping from taskPID to taskName
task_pid_to_name = {pid: info['taskName'] for pid, info in all_wf_dict.items()}
# Update the DataFrame with the taskName
wf_pfs_df['taskName'] = wf_pfs_df['taskPID'].map(task_pid_to_name).fillna('unknown')

# Create a mapping from taskPID to prevTask
task_pid_to_prod_task = {pid: info['prevTask'] for pid, info in all_wf_dict.items()}
# add prevTask column to the DataFrame
wf_pfs_df['prevTask'] = wf_pfs_df['taskPID'].map(task_pid_to_prod_task).fillna('unknown')

# Print the updated DataFrame
print(wf_pfs_df.shape)
df_unknown = wf_pfs_df[wf_pfs_df['taskName'] == 'unknown']
print(f"df unknown ({df_unknown.shape}):\n{df_unknown.head(5)}")

for pid, info in all_wf_dict.items():
    if 'stage_order' not in info:
        print(f"Missing 'stage_order' for taskPID: {pid}, info: {info}")
        if CURR_WF == "seismology":
            info['stage_order'] = 1
            info['taskName'] = "sG1IterDecon"

# Create a mapping from taskPID to stage_order
# PIDs that never matched a task definition have no 'stage_order'; they get
# the numeric sentinel -1 instead of raising KeyError.
task_pid_to_stage_order = {pid: info.get('stage_order', -1) for pid, info in all_wf_dict.items()}
# add stageOrder column to the DataFrame; the fill value is numeric so later
# arithmetic on stageOrder (e.g. stage + 1) keeps working.
wf_pfs_df['stageOrder'] = wf_pfs_df['taskPID'].map(task_pid_to_stage_order).fillna(-1)


# remove rows with filename containing string "SIFT.chr*.vcf" (chr0..chr10),
# in one pass and with the dots matched literally; rows without a file name
# are kept.
sift_vcf_rows = wf_pfs_df['fileName'].str.contains(r"SIFT\.chr(?:10|[0-9])\.vcf", na=False)
wf_pfs_df = wf_pfs_df[~sift_vcf_rows].copy()

# Adjust dataframe prevTask
for index, row in wf_pfs_df.iterrows():
    if row['operation'] == 0:
        if row['taskName'] == '':
            wf_pfs_df.at[index, 'taskName'] = 'none'
        continue

    # Adjust read task predecessors
    taskName = row['taskName']
    fileName = row['fileName']
    if CURR_WF == "seismology" and taskName == '':
        # Unnamed read tasks of this workflow belong to "sG1IterDecon"
        taskName = 'sG1IterDecon'
        wf_pfs_df.at[index, 'taskName'] = 'sG1IterDecon'

    task_definition = task_order_dict.get(taskName)
    if task_definition is None:
        # Unmatched task (e.g. 'unknown' or ''): no definition to consult,
        # leave prevTask as mapped above.
        continue

    # The last predecessor whose input patterns match the file wins.
    for task, inputs in task_definition['predecessors'].items():
        if matches_pattern(fileName, inputs['inputs']):
            wf_pfs_df.at[index, 'prevTask'] = task

# Set dataframe all write operation's prevTask to empty
wf_pfs_df.loc[wf_pfs_df["operation"] == 0, "prevTask"] = ''

(1810, 18)
df unknown ((0, 18)):
Empty DataFrame
Columns: [operation, randomOffset, transferSize, aggregateFilesizeMB, numTasks, parallelism, totalTime, numNodesList, numNodes, tasksPerNode, trMiB, storageType, opCount, taskName, taskPID, fileName, stageOrder, prevTask]
Index: []


In [175]:
# print(f"df found:\n{wf_pfs_df[wf_pfs_df['taskName'] == 'trackstats']}")


# # Below values can only be updated once task name and per task parallelism is known
import math

# Assuming 'wf_pfs_df' is the DataFrame with a 'taskName' column
# For every row whose task name appears in the task-order file, fill in the
# node and parallelism columns; rows with other task names are left untouched.
for index, row in wf_pfs_df.iterrows():
    task_name = row['taskName']
    if task_name in task_name_to_parallelism:
        task_parallelism = task_name_to_parallelism[task_name]
        task_num_tasks = task_name_to_num_tasks[task_name]

        
        # row['numNodesList'] = NUM_NODES
        # row['numTasks'] = task_num_tasks
        # row['tasksPerNode'] = math.ceil(task_parallelism / NUM_NODES)

        # Update the DataFrame
        # numNodesList stores a one-element list per cell (expand_df()
        # iterates over it); numNodes stores the plain node count.
        wf_pfs_df.at[index, 'numNodesList'] = [NUM_NODES] #row['numNodesList']
        wf_pfs_df.at[index, 'numNodes'] = NUM_NODES #row['numNodesList']
        wf_pfs_df.at[index, 'numTasks'] = task_num_tasks
        # Tasks are spread over the nodes, rounding up.
        wf_pfs_df.at[index, 'tasksPerNode'] = math.ceil(task_parallelism / NUM_NODES)
        wf_pfs_df.at[index, 'parallelism'] = task_parallelism


if DEBUG:
    print(wf_pfs_df.shape)
    print(wf_pfs_df.head(10))

# # Remove row with nan data on any of the columns
# wf_pfs_df = wf_pfs_df.dropna()
print(wf_pfs_df.shape)

(1800, 18)
  operation randomOffset   transferSize  aggregateFilesizeMB numTasks  \
0         0            0   81791.000000             0.078002      300   
1         1            1    8191.921474          2421.758036      300   
2         1            1    8191.921474          2421.758036      300   
3         1            1    8191.912230          2421.794365      300   
4         0            0  101056.000000             0.096375      300   
5         1            1    8191.912230          2421.794365      300   
6         1            1    8191.922788          2421.789674      300   
7         1            1    8191.922788          2421.789674      300   
8         0            0   87880.000000             0.083809      300   
9         1            1    8191.912230          2421.794365      300   

  parallelism  totalTime numNodesList numNodes tasksPerNode        trMiB  \
0         300   0.000049         [10]       10           30  1593.503085   
1         300  19.632952         

In [176]:
def mark_final_data(df):
    """Tag writes whose file is not read in the following stage as final data.

    For every write row (operation == 0) whose prevTask is blank, the file
    name is looked up among the files read (operation == 1) at
    stageOrder + 1. If it is not read there, the row's prevTask is set to
    "final_data". The frame is modified in place; nothing is returned.
    """
    # stageOrder -> set of file names read in that stage.
    reads = df[df["operation"] == 1]
    files_read_in_stage = reads.groupby("stageOrder")["fileName"].apply(set).to_dict()

    is_write = df["operation"] == 0
    has_no_prev = df["prevTask"].astype(str).str.strip() == ''
    candidates = df[is_write & has_no_prev]

    for idx, written_file, written_stage in zip(
        candidates.index, candidates["fileName"], candidates["stageOrder"]
    ):
        next_stage_reads = files_read_in_stage.get(written_stage + 1, set())
        if written_file not in next_stage_reads:
            df.at[idx, "prevTask"] = "final_data"

mark_final_data(wf_pfs_df)
# print(wf_pfs_df.head(20))

In [177]:
# Expand the dataframe for multi-nodes configuration calculation
def expand_df(wf_pfs_df):
    """Return a frame with one row per (input row, candidate node count).

    Each input row is duplicated once for every entry of its ``numNodesList``.
    In each duplicate, ``numNodes`` is set to that entry and ``tasksPerNode``
    to ``ceil(parallelism / numNodes)``. The input frame is left untouched and
    the result gets a fresh 0..n-1 index.
    """
    expanded_rows = []

    for _, source_row in wf_pfs_df.iterrows():
        node_counts = source_row['numNodesList']
        print(f"NUM_NODES: {node_counts}")
        for node_count in node_counts:
            clone = source_row.copy()
            clone['tasksPerNode'] = math.ceil(source_row['parallelism'] / node_count)
            clone['numNodes'] = node_count
            expanded_rows.append(clone)

    # Rows keep their original labels until the index is rebuilt here.
    return pd.DataFrame(expanded_rows).reset_index(drop=True)


if MULTI_NODES:
    # Replace the frame with its per-node-count expansion (see expand_df).
    wf_pfs_df = expand_df(wf_pfs_df)
    # Print the updated DataFrame for verification
    if DEBUG:
        print(wf_pfs_df.shape)
        print(wf_pfs_df.head())

# Rows with parallelism == 1 get numNodes = 1. A single masked assignment
# replaces the row-by-row iterrows()/.at loop; it selects rows by position of
# the boolean mask, so it stays correct even if index labels repeat.
wf_pfs_df.loc[wf_pfs_df['parallelism'] == 1, 'numNodes'] = 1

In [178]:
# Print list of unique taskNames
wf_pfs_df['taskName'].unique()

array(['individuals', 'frequency', 'mutation_overlap', 'sifting',
       'individuals_merge'], dtype=object)

In [179]:
# Save the updated DataFrame to a CSV file named after the first test folder.
# NOTE: to_csv does not create missing directories, so ./analyzed_data must
# already exist. index=False keeps the row index out of the file.
wf_pfs_df.to_csv(f'./analyzed_data/{test_folders[0]}.csv', index=False)

# Display the taskName -> parallelism mapping (defined in an earlier cell).
task_name_to_parallelism

{'individuals': 300,
 'individuals_merge': 10,
 'sifting': 10,
 'mutation_overlap': 10,
 'frequency': 10}

In [180]:
# Sum the measured I/O time per taskName, overall and split by direction
# (operation 0 = write, 1 = read).
is_write_op = wf_pfs_df['operation'] == 0
is_read_op = wf_pfs_df['operation'] == 1
write_sub_df = wf_pfs_df[is_write_op]
read_sub_df = wf_pfs_df[is_read_op]

task_io_time_total = wf_pfs_df.groupby('taskName')['totalTime'].sum()
task_io_time_write = write_sub_df.groupby('taskName')['totalTime'].sum()
task_io_time_read = read_sub_df.groupby('taskName')['totalTime'].sum()

# Running totals of the scaled times, per direction and for the whole workflow.
task_io_time_adjust = {"read": 0, "write": 0}
total_wf_io_time = total_wf_io_time_write = total_wf_io_time_read = 0
print("Total I/O time per taskName:")

# Each per-task sum is divided by (task parallelism * NUM_NODES); NUM_NODES is
# a fixed number here.
# NOTE(review): confirm that dividing by both factors is the intended scaling.
for task, write_time in task_io_time_write.items():
    scale = task_name_to_parallelism[task] * NUM_NODES
    write_time_adjusted = write_time / scale
    task_io_time_adjust["write"] += write_time_adjusted
    total_wf_io_time_write += write_time_adjusted
    total_wf_io_time += write_time_adjusted
    print(f" {task} (write): {write_time_adjusted} (sec)")

for task, read_time in task_io_time_read.items():
    scale = task_name_to_parallelism[task] * NUM_NODES
    read_time_adjusted = read_time / scale
    task_io_time_adjust["read"] += read_time_adjusted
    total_wf_io_time_read += read_time_adjusted
    total_wf_io_time += read_time_adjusted
    print(f" {task} (read): {read_time_adjusted} (sec)")

print(f"Total I/O time per workflow: {total_wf_io_time}")



Total I/O time per taskName:
 frequency (write): 0.01182337785 (sec)
 individuals (write): 4.496766333333333e-06 (sec)
 individuals_merge (write): 5.513027e-05 (sec)
 mutation_overlap (write): 0.00083436507 (sec)
 sifting (write): 2.3969059999999998e-05 (sec)
 frequency (read): 0.02121502092 (sec)
 individuals (read): 3.960484083507333 (sec)
 individuals_merge (read): 0.040972154999999996 (sec)
 mutation_overlap (read): 0.00233166633 (sec)
 sifting (read): 0.98425757022 (sec)
Total I/O time per workflow: 5.022001834993667


In [181]:
wf_pfs_df.head(5)

Unnamed: 0,operation,randomOffset,transferSize,aggregateFilesizeMB,numTasks,parallelism,totalTime,numNodesList,numNodes,tasksPerNode,trMiB,storageType,opCount,taskName,taskPID,fileName,stageOrder,prevTask
0,0,0,81791.0,0.078002,300,300,4.9e-05,[10],10,30,1593.503085,1,1,individuals,6818-dc111,chr2n-2201-2401.tar.gz,1,
1,1,1,8191.921474,2421.758036,300,300,19.632952,[10],10,30,123.351699,1,309988,individuals,6818-dc111,ALL.chr2.250000.vcf,1,initial_data
2,1,1,8191.921474,2421.758036,300,300,19.632952,[10],10,30,123.351699,1,309988,individuals,6818-dc111,columns.txt,1,initial_data
3,1,1,8191.91223,2421.794365,300,300,19.658824,[10],10,30,123.191215,1,309993,individuals,25051-dc135,ALL.chr3.250000.vcf,1,initial_data
4,0,0,101056.0,0.096375,300,300,5e-05,[10],10,30,1917.557288,1,1,individuals,25051-dc135,chr3n-1201-1401.tar.gz,1,


# Build Workflow Graph
Get max stage order for iteration
## First layers Nodes:
- stageOrder == 0
    - Add row as node attributes
    - Need node "order" 

## Middle layers
- Find previous layer nodes  (stageOrder -1)
 - Check whether a node has the same fileName, and make sure operation is 0 (write) in stageOrder-1 and 1 (read) in stageOrder
 - link edge, calculate IOI and SPM

## Final Layer
- Stop when stageOrder reaches its maximum

In [182]:
import networkx as nx

# Directed graph holding one node per (taskName, taskPID, fileName) record.
WFG = nx.DiGraph()

# Distinct stage numbers in ascending order.
stage_order_list = sorted(map(int, wf_pfs_df['stageOrder'].unique()))
print(stage_order_list)

# stageOrder -> taskName -> list of node names
stage_task_node_dict = {}


def _is_missing(value):
    """Return True when ``value`` is ``None`` or a float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


# Create (or merge into) one node per row of the edge table.
for _, record in wf_pfs_df.iterrows():
    attrs = record.to_dict()
    stage = record['stageOrder']
    task = record['taskName']
    node_id = f"{task}:{record['taskPID']}:{record['fileName']}"

    if not WFG.has_node(node_id):
        WFG.add_node(node_id, **attrs)
    else:
        # Merge into the live attribute dict: add unseen keys, and fill
        # None/NaN slots with a valid incoming value. Existing valid values
        # are never overwritten.
        current = WFG.nodes[node_id]
        for key, value in attrs.items():
            if key not in current:
                current[key] = value
            elif _is_missing(current[key]) and not _is_missing(value):
                current[key] = value

    stage_task_node_dict.setdefault(stage, {}).setdefault(task, []).append(node_id)

# Summarize what was built.
print(f"Number of nodes: {WFG.number_of_nodes()}")
print(f"First five nodes:")
for node_entry in list(WFG.nodes(data=True))[:5]:
    print(node_entry)

print(f"Number of edges: {WFG.number_of_edges()}")



[1, 2, 3]
Number of nodes: 1800
First five nodes:
('individuals:6818-dc111:chr2n-2201-2401.tar.gz', {'operation': 0, 'randomOffset': 0, 'transferSize': 81791.0, 'aggregateFilesizeMB': 0.0780019760131836, 'numTasks': 300, 'parallelism': 300, 'totalTime': 4.895e-05, 'numNodesList': [10], 'numNodes': 10, 'tasksPerNode': 30, 'trMiB': 1593.503085049716, 'storageType': 1, 'opCount': 1, 'taskName': 'individuals', 'taskPID': '6818-dc111', 'fileName': 'chr2n-2201-2401.tar.gz', 'stageOrder': 1, 'prevTask': ''})
('individuals:6818-dc111:ALL.chr2.250000.vcf', {'operation': 1, 'randomOffset': 1, 'transferSize': 8191.921474379654, 'aggregateFilesizeMB': 2421.75803565979, 'numTasks': 300, 'parallelism': 300, 'totalTime': 19.632952451, 'numNodesList': [10], 'numNodes': 10, 'tasksPerNode': 30, 'trMiB': 123.35169871694148, 'storageType': 1, 'opCount': 309988, 'taskName': 'individuals', 'taskPID': '6818-dc111', 'fileName': 'ALL.chr2.250000.vcf', 'stageOrder': 1, 'prevTask': 'initial_data'})
('individuals

In [183]:
def _valid_throughput_estimates(node_attrs, storage):
    """Collect ``(parallelism, estimated_trMiB)`` pairs of one node for ``storage``.

    Only attributes named ``estimated_trMiB_<storage>_<N>p`` are considered.
    Keys whose last ``_``-separated token (minus its final character) is not
    an integer are skipped, as are values that are ``None`` or NaN.
    """
    prefix = f'estimated_trMiB_{storage}_'
    estimates = []
    for key, value in node_attrs.items():
        if not key.startswith(prefix):
            continue
        try:
            level = int(key.split('_')[-1][:-1])
        except ValueError:
            continue
        if value is None or math.isnan(value):
            continue
        estimates.append((level, value))
    return estimates


def _ts_slope(node_attrs, storage, level, node_name):
    """Return the first ``estimated_ts_slope_<storage>_<level>p*`` value of a node.

    Raises:
        IndexError: if the node has no such attribute (same exception type the
            previous ``[...][0]`` lookup produced, now with a useful message).
    """
    prefix = f'estimated_ts_slope_{storage}_{level}p'
    for key, value in node_attrs.items():
        if key.startswith(prefix):
            return value
    raise IndexError(
        f"node {node_name!r} has no attribute starting with {prefix!r}"
    )


def _estimated_io_time(op_count, aggregate_mb, estimated_trMiB, ts_slope):
    """Return the I/O-time estimate, weighted by operation count.

    A positive transfer-size slope multiplies by ``op_count`` (penalize many
    operations); otherwise the estimate is multiplied by ``1 / op_count``.
    """
    if ts_slope > 0:
        return op_count * aggregate_mb / estimated_trMiB
    return (1 / op_count) * aggregate_mb / estimated_trMiB


def _storage_edge_attributes(prod_name, prod_attrs, cons_name, cons_attrs, storage):
    """Build the edge attributes of one producer/consumer pair for one storage type."""
    prod_estimates = _valid_throughput_estimates(prod_attrs, storage)
    cons_estimates = _valid_throughput_estimates(cons_attrs, storage)
    if not prod_estimates or not cons_estimates:
        # No usable (producer, consumer) combination: contribute nothing.
        return {}

    prod_aggregateFilesizeMB = prod_attrs['aggregateFilesizeMB']
    cons_aggregateFilesizeMB = cons_attrs['aggregateFilesizeMB']
    prod_opCount = prod_attrs['opCount']
    cons_opCount = cons_attrs['opCount']

    # Each side's estimate depends only on its own node, so compute it once
    # per parallelism level instead of once per combination.
    prod_times = [
        (n_prod, _estimated_io_time(
            prod_opCount, prod_aggregateFilesizeMB, trMiB,
            _ts_slope(prod_attrs, storage, n_prod, prod_name)))
        for n_prod, trMiB in prod_estimates
    ]
    # The consumer slope is read from the CONSUMER node. Previously it was
    # looked up on the producer node, which picked the wrong branch or raised
    # KeyError when the producer lacked that key.
    cons_times = [
        (n_cons, _estimated_io_time(
            cons_opCount, cons_aggregateFilesizeMB, trMiB,
            _ts_slope(cons_attrs, storage, n_cons, cons_name)))
        for n_cons, trMiB in cons_estimates
    ]

    attributes = {}
    for n_prod, estT_prod in prod_times:
        for n_cons, estT_cons in cons_times:
            SPM = estT_prod / estT_cons if estT_cons > 0 else float('inf')
            attributes.update({
                f'estT_prod_{storage}_{n_prod}p': estT_prod,
                f'estT_cons_{storage}_{n_cons}p': estT_cons,
                f'SPM_{storage}_{n_prod}_{n_cons}p': SPM,
                'prod_aggregateFilesizeMB': prod_aggregateFilesizeMB,
                'cons_aggregateFilesizeMB': cons_aggregateFilesizeMB,
                'prod_max_parallelism': prod_attrs['parallelism'],
                'cons_max_parallelism': cons_attrs['parallelism'],
            })
    return attributes


def add_producer_consumer_edge(WFG, prod_nodes, cons_nodes):
    """
    Add edges between producer and consumer nodes, calculating edge attributes for each storage type.

    A producer node is linked to a consumer node when the consumer's
    ``prevTask`` equals the producer's task name and both nodes carry the same
    (stripped) ``fileName``. For each storage type in ``['ssd', 'beegfs']`` and
    each pair of valid ``estimated_trMiB_<storage>_<N>p`` values, the edge gets
    ``estT_prod_*``, ``estT_cons_*`` and ``SPM_*`` (= estT_prod / estT_cons,
    or ``inf`` when estT_cons is not positive), plus the file sizes and
    parallelism of both ends. A matching pair with no valid estimates still
    gets an edge, with no attributes.

    Args:
        WFG: graph exposing ``nodes[name]`` attribute dicts, ``has_edge``,
            ``add_edge`` and ``edges[u, v]`` (a networkx ``DiGraph`` here).
        prod_nodes: mapping of task name -> producer node names.
        cons_nodes: mapping of task name -> consumer node names.

    Raises:
        IndexError: if a node with a valid throughput estimate has no matching
            ``estimated_ts_slope_<storage>_<N>p`` attribute.
    """
    for taskName, prevNodeNames in prod_nodes.items():
        for prod_node_name in prevNodeNames:
            prod_attrs = WFG.nodes[prod_node_name]
            prod_fileName = prod_attrs['fileName'].strip()

            for currNodeNames in cons_nodes.values():
                for cons_node_name in currNodeNames:
                    cons_attrs = WFG.nodes[cons_node_name]
                    if cons_attrs.get('prevTask') != taskName:
                        continue
                    if cons_attrs['fileName'].strip() != prod_fileName:
                        continue

                    edge_attributes = {}
                    for storage in ['ssd', 'beegfs']:
                        edge_attributes.update(_storage_edge_attributes(
                            prod_node_name, prod_attrs,
                            cons_node_name, cons_attrs, storage))

                    # Add or update the edge in the graph
                    if WFG.has_edge(prod_node_name, cons_node_name):
                        WFG.edges[prod_node_name, cons_node_name].update(edge_attributes)
                    else:
                        WFG.add_edge(prod_node_name, cons_node_name, **edge_attributes)



def handle_initial_stage(WFG, cons_nodes):
    """
    Annotate consumer nodes of a stage that has no producer.

    For every consumer node and each storage type in ``('ssd', 'beegfs')``,
    each valid ``estimated_trMiB_<storage>_<N>p`` attribute yields
    ``estT_cons_<storage>_<N>p = opCount * aggregateFilesizeMB / estimated_trMiB``.
    The producer-side fields are written as zero placeholders. Results are
    stored as node attributes; nothing is returned.
    """
    for node_names in cons_nodes.values():
        for node_name in node_names:
            attrs = WFG.nodes[node_name]
            computed = {}

            for storage in ('ssd', 'beegfs'):
                prefix = f'estimated_trMiB_{storage}_'
                matching_keys = [k for k in attrs.keys() if k.startswith(prefix)]

                for key in matching_keys:
                    # Trailing token looks like "4p"; drop the last character
                    # to recover the parallelism level.
                    try:
                        level = int(key.split('_')[-1][:-1])
                    except ValueError:
                        continue

                    throughput = attrs.get(key)
                    if throughput is None or math.isnan(throughput):
                        continue

                    size_mb = attrs['aggregateFilesizeMB']
                    op_count = attrs['opCount']

                    computed[f'estT_prod_{storage}_0p'] = 0  # placeholder
                    computed[f'estT_cons_{storage}_{level}p'] = (op_count * size_mb) / throughput
                    computed['prod_aggregateFilesizeMB'] = 0  # placeholder
                    computed['cons_aggregateFilesizeMB'] = size_mb
                    computed['prod_max_parallelism'] = 0  # placeholder
                    computed['cons_max_parallelism'] = attrs['parallelism']

            # Written back only after all keys were scanned, so the scan never
            # sees attributes produced by this function.
            WFG.nodes[node_name].update(computed)

# Walk the stages in ascending order. Stage 0 has no producer stage and is
# only annotated when INITIAL_STAGE is set; every other stage is linked to
# the nodes of the stage numbered one below it.
for currOrder in stage_order_list:
    cons_nodes = stage_task_node_dict.get(currOrder, {})
    if currOrder != 0:
        prevOrder = currOrder - 1
        # A stage number with no recorded nodes yields an empty mapping, in
        # which case no edges are added.
        prod_nodes = stage_task_node_dict.get(prevOrder, {})
        add_producer_consumer_edge(WFG, prod_nodes, cons_nodes)
    elif INITIAL_STAGE:
        handle_initial_stage(WFG, cons_nodes)

