In [71]:
import os
import pandas as pd
import numpy as np
from io import StringIO
import re
from datetime import datetime
from scipy.stats import pearsonr, spearmanr
import util

In [72]:
def parse_amg_time(job_dir):
    amg_file = os.path.join(job_dir, "output-AMG2023.log")
    amg_wall_time = None
    if os.path.isfile(amg_file):
        with open(amg_file, "r") as af:
            lines = af.readlines()
            for i, line in enumerate(lines):
                if "GMRES Solver:" in line:
                    if i + 1 < len(lines):
                        match = re.search(r"wall clock time\s*=\s*([\d.]+)\s*seconds", lines[i + 1])
                        if match:
                            amg_wall_time = float(match.group(1))
                            break
    return amg_wall_time

def parse_deepcam_time(job_dir, nsteps):
    """
    Parse the deepcam.out file in deepCAM:
    Sum up values from lines like "step XXXX: time YYYms" (milliseconds) and return total time in seconds.
    """
    filepath = os.path.join(job_dir, 'output-deepcam-with_performance_counters.log')
    if not os.path.isfile(filepath):
        return None

    total_time_ms = 0.0
    step_num = 1
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            match = re.search(f"step {step_num}:" + r"\s*time\s+([0-9\.]+)ms", line)
            if match:
                total_time_ms += float(match.group(1))
                step_num += 1
            
            # stop after reaching n steps
            if step_num == nsteps:
                break

    if step_num == nsteps:
        return total_time_ms / 1000.0
    return None

def parse_nanogpt_time(job_dir, niters):
    """
    Parse the nanoGPT.out file in nanoGPT:
    Sum up times from lines such as "iter xx: ... time ZZZZms ..." (milliseconds) and return total time in seconds.
    """
    filepath = os.path.join(job_dir, 'output-nanoGPT-with_performance_counters.log')
    if not os.path.isfile(filepath):
        return None

    total_time_ms = 0.0
    iter_num = 1
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            match = re.search(f"iter {iter_num}:.*" + r"time\s+([0-9\.]+)ms", line)
            if match:
                total_time_ms += float(match.group(1))
                iter_num += 1

            # stop after reaching n steps
            if iter_num == niters:
                break

    if iter_num == niters:
        return total_time_ms / 1000.0
    return None

def parse_milc_time(job_dir):
    return util.parse_app_time(job_dir, 'MILC', 64)

def parse_allreduce_time(job_dir, allreduce_size):
    allreduce_file = os.path.join(job_dir, 'output-allreduce.log')
    allreduce_time = None
    
    # Read only time for largest allreduce size
    if os.path.isfile(allreduce_file):
        with open(allreduce_file, 'r') as af:
            content = af.read()
            match = re.search(f"{allreduce_size}" + r'\s+(.*)\s+seconds', content)
            if match:
                allreduce_time = float(match.group(1))
    return allreduce_time

def parse_timestamp(job_dir, app):
    """
    Parse a run directory name, e.g., "2024-12-26_12-22-14-job34298015",
    into a datetime object, assuming the format: YYYY-MM-DD_hh-mm-ss-jobxxxx.
    """
    timestamp_files = {
        "AMG2023": "output-AMG2023.log",
        "MILC": "job-output.log",
        "deepcam": "job-output.log",
        "nanoGPT": "job-output.log",
        "allreduce_mpi": "output-allreduce.log",
        "allreduce_rccl": "output-allreduce.log",
    }

    if app == "AMG2023":
        pattern = re.compile(r"start AMG2023: (.*)\n")
    elif app == "allreduce":
        pattern = re.compile(r"start allreduce: (.*)\n")
    else:
        pattern = re.compile(r"start run: (.*)\n")
    
    timestamp_file = os.path.join(job_dir, timestamp_files[app])
    if not os.path.isfile(timestamp_file):
        return None
    
    with open(timestamp_file, 'r') as f:
        fcontent = f.read()
        match = pattern.search(fcontent)
        if match:
            date_time_str = match.group(1)  # Extract the matched date string
            try:
                # Parse the date string into a datetime object
                date_time_obj = datetime.strptime(date_time_str, "%a %d %b %Y %I:%M:%S %p %Z")
                return date_time_obj
            except ValueError as e:
                try:
                    date_time_obj = datetime.strptime(date_time_str, "%a %b %d %I:%M:%S %p %Z %Y")
                    return date_time_obj
                except ValueError as e:
                    return None
    return None

In [73]:
def flatten_dataframe(df, append_col_name=False):
    """
    Flattens a Pandas DataFrame into a single row where column names are
    a combination of the original column name and the index.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: A flattened DataFrame with a single row.
    """
    flattened_data = {}
    for col in df.columns:
        for i in range(len(df)):
            col_name = f"{df.index[i]} - {col}" if append_col_name else df.index[i]
            flattened_data[col_name] = df.loc[df.index[i], col]
    return pd.DataFrame(flattened_data, index=[0])

def calculate_parbs_cong_rate(app_type, job_dir, counter_type):
    counters_files = {
        "AMG2023": 'output-AMG2023.log',
        "deepcam": 'output-deepcam-with_performance_counters.log',
        "nanoGPT": 'output-nanoGPT-with_performance_counters.log',
        "MILC": 'output-MILCs.log',
        "allreduce_rccl": 'output-allreduce.log',
        "allreduce_mpi": 'output-allreduce.log',
    }
    counters_file = os.path.join(job_dir, counters_files[app_type])

    if not os.path.exists(counters_file):
        return None

    # parse counter logs
    counter_logs = {}
    pattern = r"CXI_COUNTER_DATA\s+(frontier\d+)\s+(\d+)\s+(\w+)\s+(\d+)\s+(\d+)"
    with open(counters_file, 'r') as cf:
        for line in cf:
            match = re.match(pattern, line)
            if match:
                node_id = match.group(1)
                nic_number = match.group(2)
                counter_name = match.group(3)
                counter_val = match.group(4)
                counter_val_per_s = match.group(5)
                
                if (node_id, nic_number, counter_name) in counter_logs:
                    raise Warning("counter value already seen for node_id/nic_number", node_id, nic_number, counter_name)
                counter_logs[(node_id, nic_number, counter_name)] = float(counter_val)
    
    # define reduce functions based on counter_type
    reduce_funcs = {
        'Min': np.min,
        'Max': np.max,
        'Mean': np.mean,
    }
    non_posted_cong_rates = []
    posted_cong_rates = []
    for node_id, nic_number, counter_name in counter_logs:
        if counter_name == 'parbs_tarb_pi_non_posted_blocked_cnt':
            non_posted_cong_rates.append(
                counter_logs[(node_id, nic_number, counter_name)] / counter_logs[(node_id, nic_number, 'parbs_tarb_pi_non_posted_pkts')]
            )
        elif counter_name == 'parbs_tarb_pi_posted_blocked_cnt':
            posted_cong_rates.append(
                counter_logs[(node_id, nic_number, counter_name)] / counter_logs[(node_id, nic_number, 'parbs_tarb_pi_posted_pkts')]
            )
    
    return {
        'parbs_tarb_pi_non_posted_cong_rate': reduce_funcs[counter_type](non_posted_cong_rates),
        'parbs_tarb_pi_posted_cong_rate': reduce_funcs[counter_type](posted_cong_rates),
    }

def parse_app_network_counters(app_type, job_dir, counter_type):
    counters_files = {
        "AMG2023": 'output-AMG2023.log',
        "deepcam": 'output-deepcam-with_performance_counters.log',
        "nanoGPT": 'output-nanoGPT-with_performance_counters.log',
        "MILC": 'output-MILC.log',
        "allreduce_rccl": 'output-allreduce.log',
        "allreduce_mpi": 'output-allreduce.log',
    }
    counters_file = os.path.join(job_dir, counters_files[app_type])

    if not os.path.exists(counters_file):
        return None

    with open(counters_file, 'r') as cf:
        content = cf.read()
        if "srun: error:" in content:
            return None

    data = ""
    with open(counters_file, 'r') as cf:
        counter_line = False
        for line in cf:
            if "MPICH Slingshot CXI Counter Summary:" in line:
                counter_line = True
                continue
            elif counter_line and \
                not line.startswith("CXI_COUNTER_DATA") and \
                not re.search("end\s?\w*:", line):
                # found summary data line
                data += line

    if not data:
        return None

    # Read the fixed-width file from the string
    df = pd.read_fwf(StringIO(data))

    # Rename the columns for clarity (adjust as needed)
    df.columns = ["Counter", "Samples", "Min", "Min_per_s", "Mean", "Mean_per_s", "Max", "Max_per_s"]

    # drop columns
    df = df[['Counter', counter_type]]

    return flatten_dataframe(df.set_index('Counter'), append_col_name=False)

def get_job_data(app_type, job_dir, counter_type, allreduce_size=None):
    job_df = parse_app_network_counters(app_type, job_dir, counter_type)
    if job_df is None:
        return None

    parse_app_time = {
        'AMG2023': parse_amg_time,
        'deepcam': lambda job_dir: parse_deepcam_time(job_dir, 470),
        'MILC': parse_milc_time,
        'nanoGPT': lambda job_dir: parse_nanogpt_time(job_dir, 30),
        'allreduce_rccl': lambda job_dir: parse_allreduce_time(job_dir, allreduce_size),
        'allreduce_mpi': lambda job_dir: parse_allreduce_time(job_dir, allreduce_size),
    }
    app_time = parse_app_time[app_type](job_dir)
    if not app_time:
        print(f"[WARN] no job time found for {os.path.basename(job_dir)}. Skipping.")
        return None

    job_df['app_time'] = app_time
    job_df['timestamp'] = parse_timestamp(job_dir, app_type)
    job_df['job_id'] = os.path.basename(job_dir)

    # derived counters for congestion rate
    cong_rates = calculate_parbs_cong_rate(app_type, job_dir, counter_type)
    for derived_counter in cong_rates:
        job_df[derived_counter] = cong_rates[derived_counter]

    return job_df

def get_all_job_data(app_type, base_dir, counter_type, allreduce_size=None):
    job_dir_pattern = {
        'AMG2023': r"amg-\d{7}",
        'deepcam': r"deepcam-\d{7}",
        'nanoGPT': r"nanogpt-\d{7}",
        'MILC': r"milc_40.64-\d{7}",
        'allreduce_rccl': r"(deepcam-\d{7})|(nanogpt-\d{7})|(allreduce_rccl-\d{7})",
        'allreduce_mpi': r"(amg-\d{7})",
    }
    # handle edge case for allreduce job_dirs
    if app_type == "allreduce_mpi":
        job_dirs = [os.path.join(base_dir, 'AMG2023_logs', '64nodes', d) for d in os.listdir(os.path.join(base_dir, 'AMG2023_logs', '64nodes'))]
    elif app_type == "allreduce_rccl":
        job_dirs = [os.path.join(base_dir, 'deepcam_logs', '64nodes', d) for d in os.listdir(os.path.join(base_dir, 'deepcam_logs', '64nodes'))]
        job_dirs = job_dirs + \
                [os.path.join(base_dir, 'nanoGPT_logs', '64nodes', d) for d in os.listdir(os.path.join(base_dir, 'nanoGPT_logs', '64nodes'))]
        job_dirs = job_dirs + \
                [os.path.join(base_dir, 'allreduce_rccl_logs', '64nodes', d) for d in os.listdir(os.path.join(base_dir, 'allreduce_rccl_logs', '64nodes'))]
    else:
        job_dirs = [os.path.join(base_dir, d) for d in os.listdir(base_dir)]

    all_jobs = None
    for d in job_dirs:
        match = re.search(job_dir_pattern[app_type], os.path.basename(d))
        if match:
            job_df = get_job_data(app_type, d, counter_type, allreduce_size)
            if job_df is None:
                continue

            if all_jobs is None:
                all_jobs = job_df
            else:
                all_jobs = pd.concat([all_jobs, job_df])
    all_jobs = all_jobs.reset_index().drop('index', axis=1)
    all_jobs = all_jobs.fillna(0)
    
    # OLD: postprocessing for cong rate now done in calculate_parbs_cong_rate()
    # # post processing:
    # for col in all_jobs.columns:
    #     match = re.search(r'parbs_tarb_pi_posted_blocked_cnt(.*)', col)
    #     if match:
    #         all_jobs[f'parbs_tarb_pi_posted_cong_rate{match.group(1)}'] = \
    #             all_jobs[f'parbs_tarb_pi_posted_blocked_cnt{match.group(1)}'] / all_jobs[f'parbs_tarb_pi_posted_pkts{match.group(1)}']
    #         continue
    #     match = re.search(r'parbs_tarb_pi_non_posted_blocked_cnt(.*)', col)
    #     if match:
    #         all_jobs[f'parbs_tarb_pi_non_posted_cong_rate{match.group(1)}'] = \
    #             all_jobs[f'parbs_tarb_pi_non_posted_blocked_cnt{match.group(1)}'] / all_jobs[f'parbs_tarb_pi_non_posted_pkts{match.group(1)}']
    #         continue
    return all_jobs

In [None]:
app_type = "AMG2023" # AMG2023/deepcam/nanoGPT/allreduce_rccl/allreduce_mpi
counter_type = "Mean" # Min/Mean/Max

if app_type.startswith("allreduce"):
    base_dir = os.path.join(f"/lustre/orion/csc547/scratch/keshprad/perfvar/")
    # mpi
    # allreduce_size = 1024
    # allreduce_size = 1048576
    # rccl
    # allreduce_size = 16777216
    allreduce_size = 2147483648
else:
    base_dir = os.path.join(f"/lustre/orion/csc547/scratch/keshprad/perfvar/{app_type}_logs/64nodes/")
    allreduce_size = None

all_jobs = get_all_job_data(app_type, base_dir, counter_type, allreduce_size)

TypeError: 'NoneType' object is not iterable

In [None]:
all_jobs

Unnamed: 0,atu_cache_evictions,atu_cache_hit_base_page_size_0,atu_cache_hit_derivative1_page_size_0,lpe_net_match_priority_0,lpe_net_match_overflow_0,lpe_net_match_request_0,lpe_rndzv_puts_0,lpe_rndzv_puts_offloaded_0,hni_rx_paused_0,hni_rx_paused_1,...,parbs_tarb_pi_posted_cong_rate,rh:sct_timeouts,rh:spt_timeouts,pct_trs_rsp_nack_drops,rh:pkts_cancelled_o,rh:nack_no_target_trs,rh:tct_timeouts,rh:nack_no_matching_conn,rh:sct_in_use,rh:connections_cancelled
0,689513,212161676,78130091,25449811,4558274,322,877328,877328,58263516,290260875,...,1.893478,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,681563,212454568,78448096,25498848,4491114,311,877291,877291,69352624,332175352,...,2.108670,1.0,6.0,4.0,1.0,0.0,0.0,0.0,0.0,0.0
2,680461,212562378,78045225,25486947,4497059,328,877686,877686,47800216,243561972,...,1.670247,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,688572,212415182,78640530,25373578,4611477,302,877551,877551,51868399,261387024,...,1.733014,1.0,5.0,9.0,1.0,0.0,0.0,0.0,0.0,0.0
4,691625,212386373,78095928,25416171,4574721,365,877150,877150,51825246,264396819,...,1.743789,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
109,706220,211908746,78529602,25450138,4532617,311,877869,877869,57468790,285521935,...,1.843233,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
110,692568,212309488,78140528,25441668,4575727,352,877995,877995,47063488,243452244,...,1.640894,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
111,691602,212340486,78283221,25490966,4499921,303,877887,877887,59790476,292793245,...,1.907754,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
112,687022,212467681,78771771,25405262,4606690,357,877505,877505,54135105,269463923,...,1.774125,0.0,4.0,4.0,0.0,0.0,0.0,0.0,0.0,0.0


In [None]:
counters = []
pear_corrs = []
pear_ps = []
spear_corrs = []
spear_ps = []

for c in all_jobs.columns:
    if c in {"timestamp", "job_id", "app_time"}:
        # skip
        continue
    
    counters.append(c)
    pear_corr, pear_p = pearsonr(all_jobs['app_time'], all_jobs[c])
    spear_corr, spear_p = spearmanr(all_jobs['app_time'], all_jobs[c])
    pear_corrs.append(pear_corr)
    pear_ps.append(pear_p)
    spear_corrs.append(spear_corr)
    spear_ps.append(spear_p)

corr_df = pd.DataFrame({
    'counter': counters,
    'pearson': pear_corrs,
    'pearson_p': pear_ps,
    'spearman': spear_corrs,
    'spearman_p': spear_ps,
})
corr_df

  pear_corr, pear_p = pearsonr(all_jobs['app_time'], all_jobs[c])
  spear_corr, spear_p = spearmanr(all_jobs['app_time'], all_jobs[c])


Unnamed: 0,counter,pearson,pearson_p,spearman,spearman_p
0,atu_cache_evictions,0.290008,0.001748863,0.254701,0.006242763
1,atu_cache_hit_base_page_size_0,0.258583,0.005471278,0.041899,0.6580406
2,atu_cache_hit_derivative1_page_size_0,0.157894,0.09338394,0.547227,2.978787e-10
3,lpe_net_match_priority_0,-0.617882,2.418586e-13,-0.624714,1.103363e-13
4,lpe_net_match_overflow_0,0.625006,1.066458e-13,0.630084,5.873074e-14
5,lpe_net_match_request_0,-0.152613,0.1050145,0.062288,0.5103063
6,lpe_rndzv_puts_0,0.07052,0.4559219,0.109076,0.2479982
7,lpe_rndzv_puts_offloaded_0,0.07052,0.4559219,0.109076,0.2479982
8,hni_rx_paused_0,-0.176923,0.05968844,-0.152783,0.1046223
9,hni_rx_paused_1,-0.206991,0.02712657,-0.195342,0.03726857


In [None]:
corr_df.sort_values(by='pearson', key=abs, ascending=False).head(50).reset_index().drop('index', axis=1)

Unnamed: 0,counter,pearson,pearson_p,spearman,spearman_p
0,lpe_net_match_overflow_0,0.625006,1.066458e-13,0.630084,5.873074e-14
1,parbs_tarb_pi_posted_pkts,0.621451,1.609025e-13,0.644527,1.012434e-14
2,lpe_net_match_priority_0,-0.617882,2.418586e-13,-0.624714,1.103363e-13
3,parbs_tarb_pi_non_posted_blocked_cnt,-0.349784,0.0001363206,-0.358701,8.895033e-05
4,parbs_tarb_pi_non_posted_pkts,0.348479,0.0001449586,0.385002,2.340782e-05
5,parbs_tarb_pi_non_posted_cong_rate,-0.340023,0.0002144501,-0.354456,0.0001091686
6,atu_cache_evictions,0.290008,0.001748863,0.254701,0.006242763
7,parbs_tarb_pi_posted_cong_rate,-0.270625,0.003589055,-0.260874,0.005056902
8,parbs_tarb_pi_posted_blocked_cnt,-0.266743,0.004120016,-0.250983,0.007070817
9,atu_cache_hit_base_page_size_0,0.258583,0.005471278,0.041899,0.6580406


In [None]:
corr_df.sort_values(by='spearman', key=abs, ascending=False).head(50).reset_index().drop('index', axis=1)

Unnamed: 0,counter,pearson,pearson_p,spearman,spearman_p
0,parbs_tarb_pi_posted_pkts,0.621451,1.609025e-13,0.644527,1.012434e-14
1,lpe_net_match_overflow_0,0.625006,1.066458e-13,0.630084,5.873074e-14
2,lpe_net_match_priority_0,-0.617882,2.418586e-13,-0.624714,1.103363e-13
3,atu_cache_hit_derivative1_page_size_0,0.157894,0.09338394,0.547227,2.978787e-10
4,parbs_tarb_pi_non_posted_pkts,0.348479,0.0001449586,0.385002,2.340782e-05
5,parbs_tarb_pi_non_posted_blocked_cnt,-0.349784,0.0001363206,-0.358701,8.895033e-05
6,parbs_tarb_pi_non_posted_cong_rate,-0.340023,0.0002144501,-0.354456,0.0001091686
7,parbs_tarb_pi_posted_cong_rate,-0.270625,0.003589055,-0.260874,0.005056902
8,atu_cache_evictions,0.290008,0.001748863,0.254701,0.006242763
9,parbs_tarb_pi_posted_blocked_cnt,-0.266743,0.004120016,-0.250983,0.007070817


In [None]:
out_dir = os.path.join("/ccs/home/keshprad/perf-variability/counter_corr")
size_name = {
    1024: "1KB",
    1048576: "1MB",
    16777216: "16MB",
    2147483648: "2GB",
}

if app_type.startswith("allreduce"):
    app_type = 'mpiallreduce' if app_type == "allreduce_mpi" else 'rcclallreduce'
    out_name = f"{app_type}_{size_name[allreduce_size]}_{counter_type.lower()}.csv"
else:
    out_name = f"{app_type}_{counter_type.lower()}.csv"
corr_df.to_csv(os.path.join(out_dir, out_name))