# Workflow Analysis - DDMD

This notebook provides a simplified interface to the workflow analysis using modular Python scripts.

## Overview
The analysis process includes:
1. Loading workflow data from datalife statistics
2. Estimating transfer rates using 4D interpolation from IOR benchmark data
3. Calculating Storage Performance Metrics (SPM) for different storage configurations
4. Generating visualizations and recommendations

In [None]:
# Import required libraries and modules
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
import os

# Import our workflow analysis modules
from modules.workflow_config import DEFAULT_WF, TEST_CONFIGS, STORAGE_LIST
from modules.workflow_data_utils import (
    load_workflow_data, calculate_io_time_breakdown
)
from modules.workflow_interpolation import (
    estimate_transfer_rates_for_workflow, calculate_aggregate_filesize_per_node
)
from modules.workflow_spm_calculator import (
    calculate_spm_for_workflow, filter_storage_options,
    display_top_sorted_averaged_rank, select_best_storage_and_parallelism
)
from modules.workflow_visualization import plot_all_visualizations

from modules.workflow_data_staging import insert_data_staging_rows

print("Modules imported successfully!")

Modules imported successfully!


## Configuration

Set the workflow to analyze and the path to the IOR benchmark data.

In [2]:
# Configuration
WORKFLOW_NAME = "1kg"  # Change this to analyze different workflows
IOR_DATA_PATH = "../perf_profiles/updated_master_ior_df.csv"

print(f"Analyzing workflow: {WORKFLOW_NAME}")
print(f"Available workflows: {list(TEST_CONFIGS.keys())}")
print(f"IOR data path: {IOR_DATA_PATH}")

ALLOWED_PARALLELISM = TEST_CONFIGS[WORKFLOW_NAME]["ALLOWED_PARALLELISM"]

Analyzing workflow: 1kg
Available workflows: ['ddmd_2n_s', 'ddmd_4n_l', '1kg', '1kg_2', 'pyflex_240f', 'pyflex_s9_48f', 'ptychonn', 'montage', 'seismology', 'llm_wf', 'template_workflow']
IOR data path: ../perf_profiles/updated_master_ior_df.csv
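This notebook reads at least two keys from each `TEST_CONFIGS` entry: `ALLOWED_PARALLELISM` (above) and `NUM_NODES_LIST` (Step 2). The sketch below fails early with a readable message when the chosen workflow is missing or incomplete. It assumes both keys hold lists of positive integers. The helper `validate_workflow_config` and the sample dictionary are hypothetical and are not part of `modules.workflow_config`.

```python
def validate_workflow_config(test_configs, workflow_name):
    """Return the config for `workflow_name`, failing early with a clear message.

    Hypothetical helper: assumes each entry carries integer lists under
    "ALLOWED_PARALLELISM" and "NUM_NODES_LIST" (the two keys this notebook reads).
    """
    if workflow_name not in test_configs:
        raise KeyError(
            f"Unknown workflow {workflow_name!r}; choose one of {sorted(test_configs)}"
        )
    config = test_configs[workflow_name]
    for key in ("ALLOWED_PARALLELISM", "NUM_NODES_LIST"):
        values = config.get(key)
        if not values or not all(isinstance(v, int) and v > 0 for v in values):
            raise ValueError(f"{workflow_name}: {key} must be a non-empty list of positive ints")
    return config


# Illustrative values only; the real entries live in modules/workflow_config.py.
example_configs = {"1kg": {"ALLOWED_PARALLELISM": [1, 2, 5, 6, 12, 30, 60, 150],
                           "NUM_NODES_LIST": [2, 5, 10]}}
cfg = validate_workflow_config(example_configs, "1kg")
print(cfg["NUM_NODES_LIST"])  # [2, 5, 10]
```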


## Step 1: Load Workflow Data

Load and process the workflow data from datalife statistics.

In [None]:
# Load workflow data
print("Loading workflow data...")
wf_df, task_order_dict, all_wf_dict = load_workflow_data(WORKFLOW_NAME, debug=False)

print(f"\nWorkflow data loaded:")
print(f"- Total records: {len(wf_df)}")
print(f"- Task definitions: {len(task_order_dict)}")
print(f"- Unique tasks: {list(wf_df['taskName'].unique())}")
print(f"- Stages: {sorted(wf_df['stageOrder'].unique())}")



Loading workflow data...
Trial folders: ['./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1']
blk_files count: 1810
datalife_monitor count: 1070
target_tasks: ['13757-dc026', '13765-dc026', '13895-dc026', '13912-dc026', '13926-dc026', '13927-dc026', '13937-dc026', '13973-dc026', '13976-dc026', '14253-dc026', '14383-dc026', '14542-dc026', '14648-dc026', '14738-dc026', '14972-dc026', '14977-dc026', '14980-dc026', '14988-dc026', '14998-dc026', '15007-dc026', '15023-dc026', '15025-dc026', '15028-dc026', '15029-dc026', '15031-dc026', '15033-dc026', '15035-dc026', '15037-dc026', '15039-dc026', '15041-dc026', '15816-dc177', '15818-dc177', '15820-dc177', '15873-dc177', '15876-dc177', '15918-dc026', '15919-dc026', '15921-dc177', '15948-dc177', '15950-dc026', '15980-dc177', '15985-dc177', '15987-dc177', '15989-dc177', '15991-dc177', '15993-dc177', '15995-dc177', '15997-dc177', '15999-dc177', '16001-dc177', '16003-dc177', '16005-dc177', '16008-dc177', '16009-dc177', '16011-dc177', '16013-dc177',

Detected random write pattern in file: ./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1/ALL.chr5.250000.vcf.65447-dc148.local.r_blk_trace.json
Detected random write pattern in file: ./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1/columns.txt.65447-dc148.local.r_blk_trace.json
Detected random write pattern in file: ./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1/columns.txt.65532-dc148.local.r_blk_trace.json
Detected random write pattern in file: ./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1/ALL.chr5.250000.vcf.65532-dc148.local.r_blk_trace.json
Detected random write pattern in file: ./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1/chr10n.tar.gz.196118-dc273.local.r_blk_trace.json
Detected random write pattern in file: ./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1/sifted.SIFT.chr10.txt.196118-dc273.local.r_blk_trace.json
Detected random write pattern in file: ./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1/ALL.chr3.250000.vcf.25149-dc135.local.r_blk_trace.json
Det
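The loader log above lists block-trace files. Each name combines three parts: the data file, a `<pid>-<host>` task identifier (the same form as the `target_tasks` entries), and a trace suffix. The sketch below splits such names. It assumes the `<file>.<pid>-<host>.local.<r|w>_blk_trace.json` layout seen in the log. `parse_trace_name` and the `w` variant are hypothetical and are not taken from `load_workflow_data`.

```python
import os
import re

# Hypothetical pattern inferred from trace names printed above, e.g.
# "ALL.chr5.250000.vcf.65447-dc148.local.r_blk_trace.json".
# Assumption: "<file>.<pid>-<host>.local.<r|w>_blk_trace.json".
TRACE_RE = re.compile(
    r"^(?P<file>.+)\.(?P<pid>\d+)-(?P<host>[^.]+)\.local\.(?P<mode>[rw])_blk_trace\.json$"
)


def parse_trace_name(path):
    """Split a block-trace path into file name, task id ("pid-host") and mode, or None."""
    match = TRACE_RE.match(os.path.basename(path))
    if match is None:
        return None
    return {
        "fileName": match["file"],
        "taskId": f"{match['pid']}-{match['host']}",
        "mode": match["mode"],
    }


print(parse_trace_name(
    "./1kgenome/par_6000_10n_nfs_ps300/300_p_10n_PFS_t1/"
    "ALL.chr5.250000.vcf.65447-dc148.local.r_blk_trace.json"
))
# {'fileName': 'ALL.chr5.250000.vcf', 'taskId': '65447-dc148', 'mode': 'r'}
```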

In [4]:
# Display first few rows
print("\nFirst few rows of workflow data:")
print(wf_df.head())
print(wf_df.columns)


First few rows of workflow data:
   operation  randomOffset  transferSize  aggregateFilesizeMB  numTasks  \
0          0             0  81791.000000             0.078002       300   
1          0             0  81791.000000             0.078002       300   
2          0             0  81791.000000             0.078002       300   
3          1             1   8191.921474          2421.758036       300   
4          1             1   8191.921474          2421.758036       300   

   parallelism  totalTime numNodesList  numNodes  tasksPerNode        trMiB  \
0          300   0.000049   [2, 5, 10]         2           150  1593.503085   
1          300   0.000049   [2, 5, 10]         5            60  1593.503085   
2          300   0.000049   [2, 5, 10]        10            30  1593.503085   
3          300  19.632952   [2, 5, 10]         2           150   123.351699   
4          300  19.632952   [2, 5, 10]         5            60   123.351699   

   storageType  opCount     taskName    

## Step 2: Calculate I/O Time Breakdown

Calculate the I/O time breakdown for each task in the workflow.

In [5]:
# Get configuration for the workflow
config = TEST_CONFIGS[WORKFLOW_NAME]
num_nodes_list = config["NUM_NODES_LIST"]

# Create task name to parallelism mapping
task_name_to_parallelism = {task: info['parallelism'] for task, info in task_order_dict.items()}

# Calculate I/O time breakdown
print("Calculating I/O time breakdown...")
io_breakdown = calculate_io_time_breakdown(wf_df, task_name_to_parallelism, num_nodes_list)

print(f"\nI/O breakdown results:")
for key, value in io_breakdown.items():
    if isinstance(value, dict):
        print(f"{key}:")
        for sub_key, sub_value in value.items():
            print(f"  {sub_key}: {sub_value:.2f} seconds")
    else:
        print(f"{key}: {value:.2f} seconds")

Calculating I/O time breakdown...
Total I/O time per taskName:
 frequency (write): 0.1182337785 (sec)
 individuals (write): 4.4967663333333335e-05 (sec)
 individuals_merge (write): 0.0005513027000000001 (sec)
 mutation_overlap (write): 0.0083436507 (sec)
 sifting (write): 0.0002396906 (sec)
 frequency (read): 0.2121502092 (sec)
 individuals (read): 39.60484083507334 (sec)
 individuals_merge (read): 0.40972155000000005 (sec)
 mutation_overlap (read): 0.0233166633 (sec)
 sifting (read): 19.5169614252 (sec)
Total I/O time per workflow: 59.89440407293667

I/O breakdown results:
total_io_time: 59.89 seconds
total_write_time: 0.13 seconds
total_read_time: 59.77 seconds
task_io_time_adjust:
  read: 59.77 seconds
  write: 0.13 seconds
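The breakdown above sums `totalTime` per task and per operation. The pandas sketch below shows that kind of aggregation under two assumptions. First, operation codes are 0 = write and 1 = read, which is the mapping used for the IOR data in Step 3. Second, each record is repeated once per entry of `numNodesList`, as the rows printed in Step 1 show, so the sketch keeps only one node count. `io_time_by_task` is hypothetical. `calculate_io_time_breakdown` may aggregate differently, for example by adjusting for parallelism.

```python
import pandas as pd


def io_time_by_task(df, num_nodes):
    """Sum totalTime per (taskName, operation) for a single node count.

    Hypothetical sketch. Rows are repeated once per entry of numNodesList, so
    only rows with numNodes == num_nodes are kept to avoid double counting.
    Operation codes follow the notebook's IOR mapping (0 = write, 1 = read).
    """
    subset = df[df["numNodes"] == num_nodes]
    per_task = subset.groupby(["taskName", "operation"])["totalTime"].sum()
    per_op = per_task.groupby(level="operation").sum()
    totals = {"write": float(per_op.get(0, 0.0)), "read": float(per_op.get(1, 0.0))}
    totals["total"] = totals["write"] + totals["read"]
    return per_task, totals


# Toy rows for illustration, not the 1kg data.
toy = pd.DataFrame({
    "taskName": ["sifting", "sifting", "sifting", "frequency"],
    "operation": [1, 1, 0, 0],
    "numNodes": [2, 5, 2, 2],
    "totalTime": [19.5, 19.5, 0.0002, 0.12],
})
per_task, totals = io_time_by_task(toy, num_nodes=2)
print(per_task)
print(totals)
```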


## Step 2.1: Calculate Aggregate File Size per Node

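Compute the aggregate file size per node, so that later transfer-rate lookups are scaled to the data volume each node handles. After this step the DataFrame carries both `aggregateFilesizeMBtask` and `aggregateFilesizeMB`.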

In [6]:
# Calculate aggregate file size per node
print("Calculating aggregate file size per node...")
wf_df = calculate_aggregate_filesize_per_node(wf_df)

print("\nAggregate file size calculation completed.")
print(f"Updated columns: {[col for col in wf_df.columns if 'aggregateFilesizeMB' in col]}")

Calculating aggregate file size per node...

Aggregate file size calculation completed.
Updated columns: ['aggregateFilesizeMBtask', 'aggregateFilesizeMB']


In [7]:
# # Display rows whose taskName starts with "stage_"
# staged = insert_data_staging_rows(wf_df)
# print(staged[staged['taskName'].str.startswith('stage_')][['taskName', 'stageOrder', 'operation']])

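## Step 2.2: Insert data movement steps into the workflow

Add data-staging rows that represent moving each task's data to and from its storage. Examples include `stage_in-individuals`, which is modeled as a `cp` operation.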

In [8]:
# Step 2.2: Insert data staging (I/O) rows into the workflow DataFrame

# Set debug=True to see detailed output, or False for silent operation
wf_df = insert_data_staging_rows(wf_df, debug=False)

print("Data staging rows inserted. New DataFrame shape:", wf_df.shape)
display(wf_df.head(10))

Data staging rows inserted. New DataFrame shape: (6450, 19)


| | operation | randomOffset | transferSize | aggregateFilesizeMBtask | numTasks | parallelism | totalTime | numNodesList | numNodes | tasksPerNode | trMiB | storageType | opCount | taskName | taskPID | fileName | stageOrder | prevTask | aggregateFilesizeMB |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 2 | 30 | | beegfs-ssd | 60 | stage_in-individuals | | chr2n-2201-2401.tar.gz,ALL.chr2.250000.vcf,col... | 0.5 | | 96875.299334 |
| 1 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 5 | 12 | | beegfs-ssd | 60 | stage_in-individuals | | chr2n-2201-2401.tar.gz,ALL.chr2.250000.vcf,col... | 0.5 | | 96875.299334 |
| 2 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 10 | 6 | | beegfs-ssd | 60 | stage_in-individuals | | chr2n-2201-2401.tar.gz,ALL.chr2.250000.vcf,col... | 0.5 | | 96875.299334 |
| 3 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 2 | 30 | | beegfs-ssd | 60 | stage_in-individuals | | chr7n-2401-2601.tar.gz,columns.txt,ALL.chr7.25... | 0.5 | | 96873.45429 |
| 4 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 5 | 12 | | beegfs-ssd | 60 | stage_in-individuals | | chr7n-2401-2601.tar.gz,columns.txt,ALL.chr7.25... | 0.5 | | 96873.45429 |
| 5 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 10 | 6 | | beegfs-ssd | 60 | stage_in-individuals | | chr7n-2401-2601.tar.gz,columns.txt,ALL.chr7.25... | 0.5 | | 96873.45429 |
| 6 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 2 | 30 | | beegfs-ssd | 60 | stage_in-individuals | | ALL.chr8.250000.vcf,chr8n-3201-3401.tar.gz,col... | 0.5 | | 96875.235543 |
| 7 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 5 | 12 | | beegfs-ssd | 60 | stage_in-individuals | | ALL.chr8.250000.vcf,chr8n-3201-3401.tar.gz,col... | 0.5 | | 96875.235543 |
| 8 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 10 | 6 | | beegfs-ssd | 60 | stage_in-individuals | | ALL.chr8.250000.vcf,chr8n-3201-3401.tar.gz,col... | 0.5 | | 96875.235543 |
| 9 | cp | 0 | 4096.0 | | 60 | 60 | | [2, 5, 10] | 2 | 30 | | beegfs-ssd | 60 | stage_in-individuals | | columns.txt,chr7n-1801-2001.tar.gz,ALL.chr7.25... | 0.5 | | 96874.61659 |
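In the table above, each `stage_in-individuals` row is a `cp` operation with a `stageOrder` of 0.5. The sketch below inserts rows of that kind. It assumes a stage-in copy duplicates the consumer's rows and is placed half a stage before the consumer. `add_stage_in_rows` is hypothetical and does not reproduce `insert_data_staging_rows`, which also produces `stage_out-*` rows (see the SPM pairs in Step 5).

```python
import pandas as pd


def add_stage_in_rows(df, task_name):
    """Return a copy of df with hypothetical 'stage_in-<task>' rows before task_name.

    Assumption: one staging row per consumer row, recorded as a 'cp' operation
    and ordered half a stage before the consumer.
    """
    consumer = df[df["taskName"] == task_name]
    staged = consumer.assign(
        taskName=f"stage_in-{task_name}",
        operation="cp",
        stageOrder=consumer["stageOrder"] - 0.5,
        prevTask=None,
    )
    combined = pd.concat([staged, df], ignore_index=True)
    # mergesort is stable, so rows within a stage keep their original order.
    return combined.sort_values("stageOrder", kind="mergesort").reset_index(drop=True)


# Toy workflow for illustration only.
toy = pd.DataFrame({
    "taskName": ["individuals", "individuals_merge"],
    "operation": [1, 1],
    "stageOrder": [1.0, 2.0],
    "prevTask": [None, "individuals"],
})
print(add_stage_in_rows(toy, "individuals")[["taskName", "operation", "stageOrder"]])
```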


In [9]:
# Check whether stage_out rows were inserted for these tasks (names from the DDMD workflow)
for checktask in ['inference', 'aggregate']:
    new_rows = wf_df[wf_df['taskName'] == f'stage_out-{checktask}']
    if not new_rows.empty:
        print(f"\nSample of stage_out-{checktask} rows:")
        print(new_rows[['taskName', 'fileName', 'stageOrder', 'operation']].head(3))
        print(f"Total stage_out-{checktask} rows: {len(new_rows)}")
    else:
        print(f"\nNo stage_out-{checktask} tasks found in the DataFrame")


No stage_out-inference tasks found in the DataFrame

No stage_out-aggregate tasks found in the DataFrame


In [10]:
# Display first few rows
print("\nFirst few rows of workflow data:")
print(wf_df.head())
print(wf_df.columns)
print(wf_df[wf_df['operation'] == 0]['taskName'].unique())


First few rows of workflow data:
  operation  randomOffset  transferSize  aggregateFilesizeMBtask  numTasks  \
0        cp             0        4096.0                      NaN        60   
1        cp             0        4096.0                      NaN        60   
2        cp             0        4096.0                      NaN        60   
3        cp             0        4096.0                      NaN        60   
4        cp             0        4096.0                      NaN        60   

   parallelism totalTime numNodesList  numNodes  tasksPerNode trMiB  \
0           60             [2, 5, 10]         2            30         
1           60             [2, 5, 10]         5            12         
2           60             [2, 5, 10]        10             6         
3           60             [2, 5, 10]         2            30         
4           60             [2, 5, 10]         5            12         

  storageType  opCount              taskName taskPID  \
0  beegfs-ssd 

## Step 3: Load IOR Benchmark Data and Estimate Transfer Rates

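Load the IOR benchmark data and estimate transfer rates for every storage type in `STORAGE_LIST`. Rates are estimated at each allowed parallelism level, including the parallelism of the inserted `cp`/`scp` staging rows.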
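The overview describes a 4D interpolation over IOR benchmark data, but the method used by `estimate_transfer_rates_for_workflow` is not shown here. The sketch below substitutes a different technique, inverse-distance weighting (IDW) over scattered benchmark points, to illustrate a 4-dimensional rate lookup. The four axes named in the docstring are an assumption. `idw_interpolate_4d` and the sample points are hypothetical.

```python
import numpy as np


def idw_interpolate_4d(samples, rates, query, power=2.0, eps=1e-12):
    """Inverse-distance-weighted estimate of a transfer rate at `query`.

    samples: (n, 4) benchmark coordinates. The axes are an assumption, e.g.
             (log2 transferSize, log2 aggregate file size, numNodes, tasksPerNode).
    rates:   (n,) measured transfer rates in MiB/s.
    query:   (4,) point at which to estimate the rate.
    """
    samples = np.asarray(samples, dtype=float)
    rates = np.asarray(rates, dtype=float)
    query = np.asarray(query, dtype=float)
    # Scale each axis by its range so no single dimension dominates the distance.
    lo, hi = samples.min(axis=0), samples.max(axis=0)
    span = np.where(hi > lo, hi - lo, 1.0)
    dist = np.linalg.norm((samples - query) / span, axis=1)
    # If the query coincides with a benchmark point, return that measurement.
    nearest = int(np.argmin(dist))
    if dist[nearest] < eps:
        return float(rates[nearest])
    weights = 1.0 / dist**power
    return float(np.sum(weights * rates) / np.sum(weights))


# Hypothetical benchmark points and rates for illustration.
bench_points = [[12, 0, 2, 30], [12, 10, 2, 30], [20, 10, 10, 6]]
bench_rates = [900.0, 1500.0, 300.0]
print(idw_interpolate_4d(bench_points, bench_rates, [12, 0, 2, 30]))  # 900.0 (exact hit)
print(idw_interpolate_4d(bench_points, bench_rates, [12, 5, 2, 30]))
```

IDW is chosen here only because it works on scattered points with NumPy alone. When the benchmarks form a regular grid, a multilinear scheme such as SciPy's `RegularGridInterpolator` is a common alternative.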

In [11]:
# Load IOR benchmark data
print("Loading IOR benchmark data...")
if os.path.exists(IOR_DATA_PATH):
    df_ior = pd.read_csv(IOR_DATA_PATH)
    print(f"Loaded {len(df_ior)} IOR benchmark records")

    # Convert operation string to integer
    df_ior['operation'] = df_ior['operation'].map({'write': 0, 'read': 1, 'cp': 2, 'scp': 3})
    # print(df_ior.head(10))

    # Estimate transfer rates
    print("\nEstimating transfer rates...")
    cp_scp_parallelism = set(wf_df.loc[wf_df['operation'].isin(['cp', 'scp']), 'parallelism'].unique())
    ALLOWED_PARALLELISM = sorted(set(ALLOWED_PARALLELISM).union(cp_scp_parallelism))

    # Estimate transfer rates per storage type and parallelism (adds estimated_trMiB_* columns)
    wf_df = estimate_transfer_rates_for_workflow(
        wf_df, df_ior, STORAGE_LIST, ALLOWED_PARALLELISM, multi_nodes=True, debug=False
    )
    print("Transfer rate estimation completed")
    
    # Show estimated transfer rate columns
    estimated_cols = [col for col in wf_df.columns if col.startswith('estimated_trMiB_')]
    print(f"\nEstimated transfer rate columns: {len(estimated_cols)}")
    print(f"Sample columns: {estimated_cols}")
else:
    print(f"Warning: IOR data file not found at {IOR_DATA_PATH}")
    print("Skipping transfer rate estimation...")
    df_ior = pd.DataFrame()

# Save the estimated transfer rates to a new CSV file
os.makedirs("./analysis_data", exist_ok=True)
wf_df.to_csv(f"./analysis_data/{WORKFLOW_NAME}_estimated_transfer_rates.csv", index=True)
print(f"Saved estimated transfer rates to: ./analysis_data/{WORKFLOW_NAME}_estimated_transfer_rates.csv")

Loading IOR benchmark data...
Loaded 21916 IOR benchmark records

Estimating transfer rates...


Transfer rate estimation completed

Estimated transfer rate columns: 66
Sample columns: ['estimated_trMiB_beegfs-ssd_30p', 'estimated_trMiB_beegfs-ssd_12p', 'estimated_trMiB_beegfs-ssd_6p', 'estimated_trMiB_beegfs-tmpfs_30p', 'estimated_trMiB_beegfs-tmpfs_12p', 'estimated_trMiB_beegfs-tmpfs_6p', 'estimated_trMiB_ssd-ssd_30p', 'estimated_trMiB_ssd-ssd_12p', 'estimated_trMiB_ssd-ssd_6p', 'estimated_trMiB_tmpfs-tmpfs_30p', 'estimated_trMiB_tmpfs-tmpfs_12p', 'estimated_trMiB_tmpfs-tmpfs_6p', 'estimated_trMiB_localssd_150p', 'estimated_trMiB_beegfs_150p', 'estimated_trMiB_tmpfs_150p', 'estimated_trMiB_localssd_60p', 'estimated_trMiB_beegfs_60p', 'estimated_trMiB_tmpfs_60p', 'estimated_trMiB_localssd_30p', 'estimated_trMiB_beegfs_30p', 'estimated_trMiB_tmpfs_30p', 'estimated_trMiB_beegfs-ssd_5p', 'estimated_trMiB_beegfs-ssd_2p', 'estimated_trMiB_beegfs-ssd_1p', 'estimated_trMiB_beegfs-tmpfs_5p', 'estimated_trMiB_beegfs-tmpfs_2p', 'estimated_trMiB_beegfs-tmpfs_1p', 'estimated_trMiB_ssd-ssd_5p

In [12]:
# Show all rows
pd.set_option('display.max_rows', None)
# Show all columns
pd.set_option('display.max_columns', None)
# Show full width
pd.set_option('display.width', None)
# Don't truncate column content
pd.set_option('display.max_colwidth', None)

# print(wf_df.head(10))

# Check whether stage_out rows were inserted for these tasks (names from the DDMD workflow)
for checktask in ['inference', 'aggregate']:
    new_rows = wf_df[wf_df['taskName'] == f'stage_out-{checktask}']
    if not new_rows.empty:
        print(f"\nSample of stage_out-{checktask} rows:")
        print(new_rows.head(3))
        print(f"Total stage_out-{checktask} rows: {len(new_rows)}")
    else:
        print(f"\nNo stage_out-{checktask} tasks found in the DataFrame")


No stage_out-inference tasks found in the DataFrame

No stage_out-aggregate tasks found in the DataFrame


## Step 5: Calculate SPM Values

Calculate Storage Performance Metrics (SPM) for each producer-consumer task pair, including the inserted stage-in and stage-out tasks.

In [13]:
# Calculate SPM values
print("Calculating SPM values...")
# Set debug=True for verbose output, debug=False for minimal output
spm_results = calculate_spm_for_workflow(wf_df, debug=False)

print(f"\nSPM calculation completed:")
print(f"- Producer-consumer pairs: {len(spm_results)}")
for pair in spm_results.keys():
    print(f"  - {pair}")

Calculating SPM values...

SPM calculation completed:
- Producer-consumer pairs: 15
  - stage_in-individuals:individuals
  - individuals:stage_out-individuals
  - individuals:individuals_merge
  - stage_in-individuals_merge:individuals_merge
  - stage_in-sifting:sifting
  - individuals_merge:stage_out-individuals_merge
  - individuals_merge:frequency
  - individuals_merge:mutation_overlap
  - sifting:stage_out-sifting
  - sifting:frequency
  - sifting:mutation_overlap
  - stage_in-frequency:frequency
  - stage_in-mutation_overlap:mutation_overlap
  - frequency:stage_out-frequency
  - mutation_overlap:stage_out-mutation_overlap
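The Step 9 debug output shows that the per-pair results carry estimates such as `estT_prod`, `estT_cons`, `dsize_prod` and `dsize_cons`. The sketch below estimates producer and consumer I/O times from a data size and a transfer rate for each storage option. It assumes sizes in MiB and rates in MiB/s. The summed `total` is only an illustrative score, not the SPM formula used by `calculate_spm_for_workflow`. `estimate_pair_times` and the rates are hypothetical.

```python
def estimate_pair_times(dsize_prod_mib, dsize_cons_mib, rates_mib_s):
    """For each storage option, estimate producer and consumer I/O time.

    Hypothetical sketch: time = data size / transfer rate (seconds when sizes
    are MiB and rates are MiB/s). `rates_mib_s` maps a storage name to a
    (producer rate, consumer rate) tuple. The combined total is illustrative,
    not the module's SPM.
    """
    estimates = {}
    for storage, (prod_rate, cons_rate) in rates_mib_s.items():
        t_prod = dsize_prod_mib / prod_rate
        t_cons = dsize_cons_mib / cons_rate
        estimates[storage] = {"estT_prod": t_prod, "estT_cons": t_cons, "total": t_prod + t_cons}
    return estimates


est = estimate_pair_times(
    dsize_prod_mib=1024.0,
    dsize_cons_mib=512.0,
    rates_mib_s={"tmpfs": (2048.0, 4096.0), "beegfs-ssd": (256.0, 128.0)},  # illustrative rates
)
best = min(est, key=lambda s: est[s]["total"])
print(best, est[best])  # tmpfs {'estT_prod': 0.5, 'estT_cons': 0.125, 'total': 0.625}
```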


In [14]:
# # Print all SPM results for each producer:consumer pair

# # Print weighted SPM values for debugging
# for pair, data in spm_results.items():
#     print(f"\nProducer-Consumer Pair: {pair}")
#     print("SPM:")
#     for storage_n, spm_values in data['SPM'].items():
#         print(f"  {storage_n}: {spm_values[0:10]} ...")
#     print("estT_prod:")
#     for storage_n, estT_prod_values in data['estT_prod'].items():
#         print(f"  {storage_n}: {estT_prod_values[0:10]} ...")
#     print("estT_cons:")
#     for storage_n, estT_cons_values in data['estT_cons'].items():
#         print(f"  {storage_n}: {estT_cons_values[0:10]} ...")
#     print("dsize_prod:")
#     for storage_n, dsize_prod_values in data['dsize_prod'].items():
#         print(f"  {storage_n}: {dsize_prod_values[0:10]} ...")
#     print("dsize_cons:")
#     for storage_n, dsize_cons_values in data['dsize_cons'].items():
#         print(f"  {storage_n}: {dsize_cons_values[0:10]} ...")

## Step 6: Filter Storage Options and Select Best Configuration

Filter storage options and select the best storage configuration for each producer-consumer pair.

In [15]:
# Filter storage options
print("Filtering storage options...")
filtered_spm_results = filter_storage_options(spm_results, WORKFLOW_NAME)

# Select best storage and parallelism from the filtered results
print("\nSelecting best storage and parallelism...")
best_results = select_best_storage_and_parallelism(filtered_spm_results, baseline=0)

print("\nBest storage configurations:")
for pair, best in best_results.items():
    print(f"{pair}:")
    print(f"  Best storage: {best['best_storage_type']}")
    print(f"  Best parallelism: {best['best_parallelism']}")
    print(f"  Best rank: {best['best_rank']:.2f}")

Filtering storage options...

Selecting best storage and parallelism...
Selecting best storage configurations for 15 pairs...
Selected best storage configurations for 15 pairs.

Best storage configurations:
stage_in-individuals:individuals:
  Best storage: tmpfs
  Best parallelism: tmpfs_30_60p
  Best rank: 541752.79
individuals:stage_out-individuals:
  Best storage: beegfs-ssd
  Best parallelism: beegfs-ssd_150_30p
  Best rank: 19012.23
individuals:individuals_merge:
  Best storage: tmpfs
  Best parallelism: tmpfs_60_1p
  Best rank: 26.50
stage_in-individuals_merge:individuals_merge:
  Best storage: tmpfs
  Best parallelism: tmpfs_30_1p
  Best rank: 24.50
stage_in-sifting:sifting:
  Best storage: tmpfs
  Best parallelism: tmpfs_20_1p
  Best rank: 17114.59
individuals_merge:stage_out-individuals_merge:
  Best storage: beegfs-ssd
  Best parallelism: beegfs-ssd_5_5p
  Best rank: 0.05
individuals_merge:frequency:
  Best storage: localssd
  Best parallelism: localssd_1_1p
  Best rank: 51.5
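The output above pairs each best storage type with a label such as `tmpfs_30_60p`. The sketch below splits such a label and chooses the lowest-rank configuration. It makes two assumptions: labels follow the `<storage>_<n>_<n>p` layout seen above, and a lower averaged rank is better, as the Step 7 ordering suggests. `parse_config_label` and `pick_best` are hypothetical and are not part of `modules.workflow_spm_calculator`.

```python
def parse_config_label(label):
    """Split a label such as 'beegfs-ssd_150_30p' into (storage, n1, n2).

    Assumption: labels follow '<storage>_<int>_<int>p', where the two integers
    appear to be producer and consumer parallelism. Storage names may contain
    '-' but not '_'.
    """
    storage, first, second = label.rsplit("_", 2)
    return storage, int(first), int(second.rstrip("p"))


def pick_best(averaged_ranks):
    """Return (label, storage, n1, n2, rank) for the lowest averaged rank."""
    label = min(averaged_ranks, key=averaged_ranks.get)
    return (label, *parse_config_label(label), averaged_ranks[label])


print(parse_config_label("beegfs-ssd_150_30p"))  # ('beegfs-ssd', 150, 30)
print(pick_best({"tmpfs_30_60p": 541752.79, "tmpfs_30_30p": 656099.43}))
# ('tmpfs_30_60p', 'tmpfs', 30, 60, 541752.79)
```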

## Step 7: Display Top Results

Display the top configurations for each producer-consumer pair, ordered by averaged rank (closest to the baseline of 0 first).

In [16]:
# Display top results
print("Displaying top results...")
display_top_sorted_averaged_rank(spm_results, top_n=20)

Displaying top results...
Top 20 Averaged SPM Values Closest to Baseline = 0:

Producer: stage_in-individuals, Consumer: individuals
- Rank 1: tmpfs_30_60p with Averaged rank = 541752.7889960634
- Rank 2: tmpfs_12_60p with Averaged rank = 541752.7889960634
- Rank 3: tmpfs_6_60p with Averaged rank = 541752.7889960634
- Rank 4: tmpfs_30_30p with Averaged rank = 656099.4329467526
- Rank 5: tmpfs_12_30p with Averaged rank = 656099.4329467526
- Rank 6: tmpfs_6_30p with Averaged rank = 656099.4329467526
- Rank 7: tmpfs_30_150p with Averaged rank = 826115.7846037552
- Rank 8: tmpfs_12_150p with Averaged rank = 826115.7846037552
- Rank 9: tmpfs_6_150p with Averaged rank = 826115.7846037552

Producer: individuals, Consumer: stage_out-individuals
- Rank 1: beegfs-tmpfs_150_12p with Averaged rank = 15870.89332779289
- Rank 2: beegfs-tmpfs_60_12p with Averaged rank = 15870.89332779289
- Rank 3: beegfs-tmpfs_30_12p with Averaged rank = 15870.89332779289
- Rank 4: beegfs-tmpfs_150_6p with Averaged r
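The listing above averages rank values per configuration and orders them by distance from the baseline. The sketch below reproduces that ordering. It assumes `rank` maps each configuration label to a list of numbers; the key appears in the Step 9 debug output, but its value layout is not shown. `top_by_averaged_rank` and the demo values are hypothetical.

```python
def top_by_averaged_rank(pair_results, baseline=0.0, top_n=20):
    """Average each configuration's rank values and keep the `top_n` closest to `baseline`.

    Assumption: pair_results[pair]["rank"] maps a configuration label
    (e.g. "tmpfs_30_60p") to a list of numeric rank values.
    """
    ranked = {}
    for pair, data in pair_results.items():
        averaged = {
            cfg: sum(values) / len(values)
            for cfg, values in data["rank"].items()
            if values  # skip empty lists to avoid dividing by zero
        }
        ordered = sorted(averaged.items(), key=lambda item: abs(item[1] - baseline))
        ranked[pair] = ordered[:top_n]
    return ranked


demo = {"stage_in-individuals:individuals": {"rank": {
    "tmpfs_30_60p": [1.0, 3.0], "tmpfs_30_30p": [0.0, 1.0], "tmpfs_30_150p": [5.0]}}}
for pair, rows in top_by_averaged_rank(demo, top_n=2).items():
    for position, (cfg, value) in enumerate(rows, start=1):
        print(f"{pair}: Rank {position}: {cfg} = {value}")
```

Python's `sorted` is stable, so tied configurations keep their input order. Ties of this kind appear above for `tmpfs_30_60p`, `tmpfs_12_60p` and `tmpfs_6_60p`.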

## Step 8: Generate Visualizations

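Generate visualizations of the analysis results with `plot_all_visualizations`. The cell is commented out in this run, so no plots are produced; uncomment it to generate them.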

In [17]:
# # Generate visualizations
# print("Generating visualizations...")
# plot_all_visualizations(wf_df, best_results, io_breakdown['task_io_time_adjust'])
# print("Visualizations completed!")

## Step 9: Save Results

Save the analysis results to files for future reference.

In [18]:
#FIXME Step 9: Debug and save filtered SPM results
import os
import pandas as pd
import numpy as np
from modules.workflow_results_exporter import extract_producer_consumer_results, print_storage_analysis

# Debug: Check what's in filtered_spm_results
print("=== Debugging filtered_spm_results ===")
print(f"Type: {type(filtered_spm_results)}")
print(f"Length: {len(filtered_spm_results) if filtered_spm_results else 'None/Empty'}")

if filtered_spm_results:
    print(f"Keys: {list(filtered_spm_results.keys())[:5]}...")  # First 5 keys
    
    # Examine first item structure
    first_key = list(filtered_spm_results.keys())[0]
    first_value = filtered_spm_results[first_key]
    print(f"\nFirst item - Key: '{first_key}'")
    print(f"  Type: {type(first_value)}")
    if isinstance(first_value, dict):
        print(f"  Keys: {list(first_value.keys())}")
        for k, v in first_value.items():
            print(f"    {k}: {type(v)} = {v}")

# Try to extract results
print("\n=== Attempting to Extract Results ===")
try:
    results_df = extract_producer_consumer_results(filtered_spm_results, wf_df)
    print(f"Extracted {len(results_df)} rows")
    
    if not results_df.empty:
        print("Sample data:")
        print(results_df.head())
        
        # Save to CSV
        output_dir = "workflow_spm_results"
        os.makedirs(output_dir, exist_ok=True)
        workflow_name = WORKFLOW_NAME  # name of the workflow analyzed above
        csv_filename = f"{workflow_name}_filtered_spm_results.csv"
        csv_path = os.path.join(output_dir, csv_filename)
        
        results_df.to_csv(csv_path, index=False)
        print(f"✓ Saved to: {csv_path}")
        
        # Print storage analysis
        print_storage_analysis(results_df)
        
    else:
        print("Error: Extracted DataFrame is empty - trying alternative method...")
        
        # Alternative extraction method
        results_data = []
        for pair, data in filtered_spm_results.items():
            if isinstance(data, dict):
                # Try to find storage and SPM information
                storage_info = None
                spm_value = None
                
                # Look for storage-related keys
                for key in data.keys():
                    if 'storage' in key.lower() or 'type' in key.lower():
                        storage_info = data[key]
                    if 'spm' in key.lower() or 'rank' in key.lower():
                        spm_value = data[key]
                
                if storage_info:
                    producer, consumer = pair.split(':') if ':' in pair else ('unknown', 'unknown')
                    results_data.append({
                        'producer': producer,
                        'producerStage': -1,
                        'consumer': consumer,
                        'consumerStage': -1,
                        'prodParallelism': np.nan,
                        'consParallelism': np.nan,
                        'p-c-Storage': storage_info,
                        'p-c-SPM': spm_value if spm_value else np.nan
                    })
        
        if results_data:
            alt_df = pd.DataFrame(results_data)
            
            # Fill stage information (first stageOrder seen for each task; -1 if unknown)
            task_stage_mapping = (
                wf_df.drop_duplicates('taskName').set_index('taskName')['stageOrder'].to_dict()
            )
            alt_df['producerStage'] = alt_df['producer'].map(task_stage_mapping).fillna(-1)
            alt_df['consumerStage'] = alt_df['consumer'].map(task_stage_mapping).fillna(-1)
            
            # Save alternative results
            output_dir = "workflow_spm_results"
            os.makedirs(output_dir, exist_ok=True)
            workflow_name = WORKFLOW_NAME
            csv_filename = f"{workflow_name}_filtered_spm_results_alt.csv"
            csv_path = os.path.join(output_dir, csv_filename)
            
            alt_df.to_csv(csv_path, index=False)
            print(f"✓ Saved alternative results to: {csv_path}")
            print(f"Alternative DataFrame shape: {alt_df.shape}")
            print(alt_df.head())
        else:
            print("Error: No data could be extracted with alternative method")
            
except Exception as e:
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()

=== Debugging filtered_spm_results ===
Type: <class 'dict'>
Length: 15
Keys: ['stage_in-individuals:individuals', 'individuals:stage_out-individuals', 'individuals:individuals_merge', 'stage_in-individuals_merge:individuals_merge', 'stage_in-sifting:sifting']...

First item - Key: 'stage_in-individuals:individuals'
  Type: <class 'dict'>
  Keys: ['SPM', 'estT_prod', 'estT_cons', 'rank', 'par_prod', 'par_cons', 'dsize_prod', 'dsize_cons']
    SPM: <class 'dict'> = {'tmpfs_30_150p': [633.0774095084666], 'tmpfs_30_60p': [1581.2511084869695], 'tmpfs_30_30p': [958.850520396719], 'tmpfs_12_150p': [614.7773446933302], 'tmpfs_12_60p': [1535.5426416549562], 'tmpfs_12_30p': [931.1334886280622], 'tmpfs_6_150p': [613.8894221402886], 'tmpfs_6_60p': [1533.3248583314153], 'tmpfs_6_30p': [929.7886530845929]}
    estT_prod: <class 'dict'> = {'beegfs-tmpfs_30p': [46512.55708453747, 46511.60313628819, 46512.52410278834, 46512.20408349901, 46511.84024443488, 46511.76013065692, 46512.34915347363, 46512.097
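The debug output shows that each pair maps to a dict whose `SPM` entry maps a configuration label to a list of values. The sketch below flattens that structure into one row per value; the result could then be written with `DataFrame.to_csv`. Unlike the fallback in the cell above, it reads the `SPM` key directly. `spm_results_to_frame` is hypothetical, and the sample values are copied from the output above.

```python
import pandas as pd


def spm_results_to_frame(results):
    """Flatten {'prod:cons': {'SPM': {config: [values]}}} into one row per value.

    Hypothetical sketch based on the structure printed above; other keys
    (estT_prod, rank, ...) could be flattened the same way.
    """
    rows = []
    for pair, data in results.items():
        # Split on the first ':' only; task names here contain '-' and '_' but not ':'.
        producer, _, consumer = pair.partition(":")
        for config, values in data.get("SPM", {}).items():
            for value in values:
                rows.append({"producer": producer, "consumer": consumer,
                             "config": config, "SPM": value})
    return pd.DataFrame(rows, columns=["producer", "consumer", "config", "SPM"])


example = {"stage_in-individuals:individuals": {
    "SPM": {"tmpfs_30_60p": [1581.2511084869695], "tmpfs_30_30p": [958.850520396719]}}}
frame = spm_results_to_frame(example)
print(frame)
```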

## Summary

The workflow analysis has been completed successfully. The results include:

1. **Workflow Data**: Processed datalife statistics organized in a DataFrame
2. **I/O Breakdown**: Time analysis for each task in the workflow
3. **Transfer Rate Estimates**: Estimated transfer rates for different storage configurations
4. **SPM Values**: Storage Performance Metrics for producer-consumer pairs
5. **Best Configurations**: Recommended storage and parallelism settings
6. **Visualizations**: Plots from `plot_all_visualizations` (the Step 8 cell is commented out in this run)
7. **Saved Results**: CSV files in `./analysis_data/` and `workflow_spm_results/` for future reference

### Key Findings

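For the `1kg` run shown above:
- Reads dominate I/O time: 59.77 s of the 59.89 s total, mostly in `individuals` (39.6 s) and `sifting` (19.5 s)
- tmpfs is selected for the stage-in pairs listed and for `individuals:individuals_merge`; beegfs-ssd for the stage-out pairs listed; localssd for `individuals_merge:frequency`
- The selected parallelism for each pair appears in its configuration label (for example `tmpfs_30_60p`)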

### Next Steps

You can:
- Analyze different workflows by changing the `WORKFLOW_NAME` variable
- Modify the analysis parameters in the configuration
- Use the saved results for further analysis or comparison
- Run the analysis programmatically using the `workflow_analysis_main.py` script