# Imports

In [1]:
import csv
import datetime
import os
import re
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import seaborn as sns
import thicket as tt
from scipy.ndimage import median_filter
from scipy.signal import savgol_filter, medfilt
from tqdm import tqdm



In [2]:
# Record the wall-clock start of the notebook so its total runtime can be measured.
NOTEBOOK_START_TIME = time.time()
print("Notebook started at {} ({})".format(
    NOTEBOOK_START_TIME,
    datetime.datetime.fromtimestamp(NOTEBOOK_START_TIME),
))

Notebook started at 1767993847.1379 (2026-01-09 16:24:07.137900)


In [3]:
# Hardware counters kept when loading metric CSVs (read_metrics_dataframe drops
# every other metric). Built programmatically; the order is: each ROCm SMI
# counter for GPU devices 0, 2, 4, 6, then the Cray PM node counters.
METRICS_TO_TRACK = (
    [
        f"A2rocm_smi:::{counter}:device={dev}{suffix}"
        for counter, suffix in (
            ("energy_count", ""),
            ("gpu_clk_freq_System", ":current"),
            ("temp_current", ":sensor=1"),
            ("power_average", ":sensor=0"),
            ("memory_busy_percent", ""),
            ("busy_percent", ""),
        )
        for dev in (0, 2, 4, 6)
    ]
    + [
        f"A2coretemp:::craypm:{counter}"
        for counter in (
            "power",
            "energy",
            "freshness",
            "cpu_energy",
            "cpu_power",
            "memory_energy",
            "memory_power",
        )
    ]
    + [
        f"A2coretemp:::craypm:accel{acc}_{counter}"
        for acc in range(4)
        for counter in ("energy", "energy_timestamp", "power")
    ]
)

In [4]:
def metrics_to_csv(group: str, thread_metrics: Dict[str, List[Tuple[float, float]]], filename: str):
    """Write metric samples to ``filename`` as CSV (overwriting it).

    Columns are ``Group, Metric Name, Time, Value``; one row per sample.

    Parameters
    ----------
    group : str
        Value written to the ``Group`` column of every row.
    thread_metrics : dict
        Metric name -> list of ``(time, value)`` samples.
    filename : str
        Destination path.

    Rows go through ``csv.writer`` so names containing commas or quotes are
    quoted correctly and can be read back with ``pd.read_csv``.
    """
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerow(['Group', 'Metric Name', 'Time', 'Value'])
        for metric_name, samples in thread_metrics.items():
            # `t` (not `time`) so the module-level `time` import is not shadowed.
            writer.writerows([group, metric_name, t, value] for t, value in samples)

def read_metrics_dataframe(file: str, metrics_to_track: Optional[List[str]] = None) -> pd.DataFrame:
    """Load a metrics CSV, keep only tracked metrics and rescale ROCm counters.

    Parameters
    ----------
    file : str
        CSV with columns ``Group, Metric Name, Time, Value``.
    metrics_to_track : list of str, optional
        Metric names to keep; defaults to the notebook-level ``METRICS_TO_TRACK``.

    Returns
    -------
    pd.DataFrame
        The filtered rows. If any ROCm SMI energy/power metric is present, the
        whole ``Value`` column is converted to float with NaN -> 0.0, and those
        metrics are divided by 1e6. Presumably this converts uJ -> J and
        uW -> W; confirm the units against the ROCm SMI docs.
    """
    if metrics_to_track is None:
        metrics_to_track = METRICS_TO_TRACK
    dtype_map = {
        'Group': 'category',
        'Metric Name': 'category',
        'Time': 'float64',
        # numpy int64 (not the nullable 'Int64'): a missing Value fails to parse.
        'Value': 'int64',
    }
    df = pd.read_csv(file, dtype=dtype_map)
    # .copy(): the column assignments below must act on an independent frame,
    # not on a view of the unfiltered one (avoids SettingWithCopyWarning).
    df = df[df['Metric Name'].isin(metrics_to_track)].copy()

    scaled_markers = (
        'rocm_smi:::energy_count:device',
        'rocm_smi:::power_average:device',
        'rocm_smi:::current_socket_power:device',
    )
    scaled = [
        name for name in df['Metric Name'].unique()
        if any(marker in name for marker in scaled_markers)
    ]
    if scaled:
        df['Value'] = df['Value'].astype(float).fillna(0.0)
        df.loc[df['Metric Name'].isin(scaled), 'Value'] /= 1_000_000
    return df

def read_call_graph_dataframe(file: str) -> pd.DataFrame:
    """Load a rank's call-graph CSV with explicit, compact column dtypes.

    String columns (Thread, Group, Name) become categoricals, Depth is read
    as an unsigned 32-bit integer and the three time columns as float64.
    """
    column_dtypes = dict([
        ('Thread', 'category'),
        ('Group', 'category'),
        ('Depth', 'uint32'),
        ('Name', 'category'),
        ('Start Time', 'float64'),
        ('End Time', 'float64'),
        ('Duration', 'float64'),
    ])
    frame = pd.read_csv(file, dtype=column_dtypes)
    return frame


In [5]:
def sampled_to_continuous(df, time_col='Time', value_col='Value'):
    """Turn discrete samples into a piecewise-linear function of time.

    Parameters
    ----------
    df : pd.DataFrame
        Samples; ``time_col`` and ``value_col`` are cast to float.
    time_col, value_col : str
        Column names for sample times and values.

    Returns
    -------
    callable
        ``f(t)`` evaluating ``np.interp`` on the samples. Beyond the sampled
        range it holds the first/last value. With no samples it returns zeros
        shaped like ``t`` (a scalar 0.0 for scalar ``t``).
    """
    t_samples = df[time_col].astype(float).to_numpy()
    y = df[value_col].astype(float).to_numpy()
    if y.size == 0:
        print("Warning: Empty data for sampled_to_continuous. Returning zero function.")
        # Match the input's shape so callers that index per-row results keep working.
        return lambda t: np.zeros(np.shape(t))[()]
    # np.interp requires increasing sample times and does not check this;
    # a stable sort leaves already-ordered data unchanged.
    order = np.argsort(t_samples, kind='stable')
    t_samples = t_samples[order]
    y = y[order]
    return lambda t: np.interp(t, t_samples, y)

def numerical_derivative(func, h=1e-5):
    """Return a central-difference approximation of the first derivative of ``func``.

    The returned callable evaluates ``(func(t + h) - func(t - h)) / (2 * h)``.
    """
    def derivative(t):
        return (func(t + h) - func(t - h)) / (2 * h)
    return derivative

def numerical_integral(f, a=0.0, dx=1e-1):
    """Return ``F(x) ≈ ∫_a^x f(t) dt`` using the trapezoidal rule on a fixed grid.

    Parameters
    ----------
    f : callable
        Integrand. If it accepts NumPy arrays it is used directly; otherwise
        it is wrapped with ``np.vectorize``.
    a : float
        Lower limit (``F(a) == 0``).
    dx : float
        Grid spacing. Every call re-evaluates ``f`` on a grid spanning ``a``
        and all query points, so cost grows with their distance from ``a``.

    Returns
    -------
    callable
        ``F(x)``: a float for scalar ``x``, otherwise an array. For ``x < a``
        the result is ``-∫_x^a f(t) dt``.
    """
    # Determine whether f supports NumPy arrays; fall back to np.vectorize otherwise.
    try:
        _ = f(np.array([a, a + dx]))
        f_np = f
    except Exception:
        f_np = np.vectorize(f)

    def F(x):
        x_arr = np.atleast_1d(x).astype(float)
        # The grid starts at the smaller of `a` and the query points, so points
        # below `a` are integrated over their true interval [x, a].
        lo = min(a, x_arr.min())
        hi = max(a, x_arr.max())
        t = np.arange(lo, hi + dx, dx)
        # broadcast_to lets scalar-returning (constant) integrands work too.
        u = np.broadcast_to(np.asarray(f_np(t), dtype=float), t.shape)

        trap_heights = (u[:-1] + u[1:]) / 2
        cum = np.concatenate(([0], np.cumsum(trap_heights * dx)))

        def integral_from_lo(p):
            """Trapezoidal integral of f from ``lo`` to each point of ``p``."""
            idx = ((p - lo) // dx).astype(int)
            idx = np.clip(idx, 0, u.size - 1)
            rem = p - (lo + idx * dx)  # partial step past the last full grid cell
            f_end = f_np(p)
            f_start = u[idx]
            return cum[idx] + (f_start + f_end) * rem / 2

        result = integral_from_lo(x_arr) - integral_from_lo(np.array([a], dtype=float))[0]
        return result.item() if np.isscalar(x) else result

    return F
        
def continuous_savgol_filter(f, window_length=11, polyorder=2, sample_rate=0.001):
    """Wrap ``f`` in a Savitzky-Golay smoother.

    The returned callable resamples ``f`` every ``sample_rate`` over
    ``[min(t), max(t)]``, and smooths with ``savgol_filter`` (edge mode
    'nearest'). It then linearly interpolates the smoothed curve back at ``t``.
    """
    def f_smooth(t):
        query = np.asarray(t)
        grid = np.arange(query.min(), query.max() + sample_rate, sample_rate)
        smoothed = savgol_filter(
            f(grid),
            window_length=window_length,
            polyorder=polyorder,
            mode='nearest',
        )
        return np.interp(query, grid, smoothed)
    return f_smooth

def continuous_savgol_derivative(f, window_length=15, polyorder=3, sample_rate=0.001):
    """Return a smoothed first derivative of ``f`` via a Savitzky-Golay filter.

    The returned callable samples ``f`` every ``sample_rate`` over the span of
    its query points. If that span is shorter than one filter window, the span
    is widened by half a window on each side. It then differentiates with
    ``savgol_filter(deriv=1)`` and interpolates the result at the query points.
    """
    def f_prime(t):
        query = np.asarray(t)
        lo = query.min()
        hi = query.max()

        window_span = window_length * sample_rate
        if (hi - lo) < window_span:
            half = window_span / 2
            lo = lo - half
            hi = hi + half

        grid = np.arange(lo, hi + sample_rate, sample_rate)
        slope = savgol_filter(
            f(grid),
            window_length=window_length,
            polyorder=polyorder,
            deriv=1,
            delta=sample_rate,
            mode='nearest',
        )
        return np.interp(query, grid, slope)

    return f_prime
        
def continous_median_filter(f, kernel_size=11, sample_rate=0.001):
    """Wrap ``f`` in a running-median smoother.

    The returned callable resamples ``f`` every ``sample_rate`` over
    ``[min(t), max(t)]``, applies a median filter of ``kernel_size`` samples
    and linearly interpolates the result back at ``t``.

    Edges are padded by repeating the nearest sample (``mode='nearest'``).
    ``scipy.signal.medfilt`` pads with zeros instead, which pulls edge values
    toward 0. It also makes a single-point query of a positive signal return 0.
    """
    def f_smooth(t):
        t = np.asarray(t)
        t_min = t.min()
        t_max = t.max()
        t_samples = np.arange(t_min, t_max + sample_rate, sample_rate)
        y_samples = np.asarray(f(t_samples), dtype=float)
        y_smooth = median_filter(y_samples, size=kernel_size, mode='nearest')
        return np.interp(t, t_samples, y_smooth)
    return f_smooth

def unwrap_or_zero(f):
    """Return ``f``, or 0.0 when it is falsy (None, 0, empty) or a non-finite number.

    This backs the scalar fallback paths that compute per-invocation deltas.
    Treating NaN/inf as 0.0 keeps those paths consistent with the vectorized
    paths, which apply ``np.nan_to_num(..., nan=0.0, posinf=0.0, neginf=0.0)``.
    Like ``bool()``, it raises ValueError for arrays with more than one element.
    """
    if not f:
        return 0.0
    try:
        if not np.all(np.isfinite(f)):
            return 0.0
    except TypeError:
        # np.isfinite cannot evaluate non-numeric values; return them unchanged.
        pass
    return f


### Thicket + Hatchet Integration
Convert the per-rank CSV datasets (call graphs and metric samples) into Thicket/Hatchet objects for analysis.

In [6]:
def create_hatchet_dag(call_graph: pd.DataFrame, metric_functions: Dict[str, callable]):
    """Build a Hatchet literal graph (list of root node dicts) from a flat call-graph table.

    Parameters
    ----------
    call_graph : pd.DataFrame
        One row per region invocation, with 'Name', 'Depth', 'Start Time',
        'End Time' and 'Duration' columns.
    metric_functions : dict
        Metric name -> callable ``f(t)``. Each node stores
        ``f(End Time) - f(Start Time)`` for every metric.

    Returns
    -------
    list of dict
        Roots in the literal format consumed by ``Thicket.from_literal``:
        ``{'frame': {'name': ...}, 'metrics': {...}, 'children': [...]}``.
        'time (inc)' is the row's Duration. 'time' is Duration minus the
        'time (inc)' of the node's kept children.

    Invocations shorter than 1 ms are dropped, as are regions whose names start
    with 'hip'/'mpi' (case-insensitive). Their kept descendants are attached to
    the nearest kept region that is still open (by depth and end time).
    """
    df = call_graph.copy()
    # Process invocations in start-time order. Ties are broken by Depth so a
    # parent that starts at the same timestamp as its first child comes first.
    df = df.sort_values(by=['Start Time', 'Depth'])

    df = df[df['Duration'] >= 0.001]

    pref3 = df['Name'].str[:3].str.lower()
    df = df[~pref3.isin(['hip', 'mpi'])]

    starts = df['Start Time'].to_numpy()
    ends   = df['End Time'].to_numpy()
    names  = df['Name'].to_numpy()
    depths = df['Depth'].to_numpy()
    durs   = df['Duration'].to_numpy()

    # Per-row metric deltas (f(end) - f(start)), vectorized when f accepts arrays.
    metrics_cols = {}
    for metric, f in tqdm(metric_functions.items()):
        try:
            vals = np.asarray(f(ends) - f(starts), dtype=float)
            # A scalar-returning f (e.g. a constant) gives a 0-d result;
            # expand it so every row has its own value.
            vals = np.broadcast_to(vals, starts.shape)
            vals = np.nan_to_num(vals, nan=0.0, posinf=0.0, neginf=0.0)
        except Exception:
            # Fallback for callables that do not vectorize.
            vals = np.fromiter(
                (unwrap_or_zero(f(e) - f(s)) for s, e in zip(starts, ends)),
                dtype=float, count=len(starts)
            )
        metrics_cols[metric] = vals

    # Build the tree with a stack of currently open invocations. Before placing
    # a row, pop entries that cannot enclose it: those at the same or a deeper
    # depth, and those that ended at or before this row starts. The remaining
    # top entry (if any) is the nearest enclosing kept region, i.e. the parent.
    dag_roots: list = []
    open_stack: list = []  # entries: (depth, end_time, node)

    for i, (name, depth, dur) in enumerate(zip(names, depths, durs)):
        node_metrics = {'time (inc)': float(dur), 'time': 0.0}
        for m, arr in metrics_cols.items():
            node_metrics[m] = float(arr[i])

        node = {'frame': {'name': name}, 'metrics': node_metrics, 'children': []}

        start = starts[i]
        while open_stack and (open_stack[-1][0] >= depth or open_stack[-1][1] <= start):
            open_stack.pop()

        if open_stack:
            open_stack[-1][2]['children'].append(node)
        else:
            dag_roots.append(node)

        open_stack.append((depth, ends[i], node))

    # Second pass (iterative post-order DFS): exclusive 'time' =
    # 'time (inc)' minus the sum of the children's 'time (inc)'.
    stack = [(root, False) for root in dag_roots]
    while stack:
        node, visited = stack.pop()
        if not visited:
            stack.append((node, True))
            for ch in node['children']:
                stack.append((ch, False))
        else:
            child_incs = 0.0
            for ch in node['children']:
                child_incs += ch['metrics']['time (inc)']
            node['metrics']['time'] = node['metrics']['time (inc)'] - child_incs

    return dag_roots

# Hierarchical DataFrames

Organize the data into a hierarchy of Python classes (each level wrapping pandas DataFrames) for easier analysis:
- `Ensemble`: all the runs of a given configuration
- `Run`: a single run of a configuration
- `Node`: a single node in a run
- `Rank`: a single MPI rank in a node

In [7]:
class Rank:
    """A single MPI rank: its name, the node it runs on, and its flat call-graph table."""

    def __init__(self, node: str, name: str, call_graph: pd.DataFrame):
        self.node = node
        self.name = name
        self.call_graph = call_graph
        assert self.call_graph.columns.tolist() == ['Thread', 'Group', 'Depth', 'Name', 'Start Time', 'End Time', 'Duration'], f"Call graph DataFrame has incorrect columns, found {call_graph.columns.tolist()}"

    def get_all_code_regions(self) -> List[str]:
        """Return the distinct region names that appear in the call graph."""
        return self.call_graph['Name'].unique().tolist()

    def compute_attribution(
        self,
        metrics_accumulated: Dict[str, callable],
        duration_threshold: float = 0.001,
        weights_by_metric: Optional[Dict[str, np.ndarray]] = None):
        """Sum per-invocation metric deltas by code region for this rank alone.

        Parameters
        ----------
        metrics_accumulated : dict
            Metric name -> callable ``f(t)``. Each invocation contributes
            ``f(End Time) - f(Start Time)``, with NaN/inf mapped to 0.0.
        duration_threshold : float
            Invocations shorter than this are ignored.
        weights_by_metric : dict, optional
            Metric name -> a scalar, or one weight per kept invocation, that
            multiplies the deltas. Weights of any other shape are ignored
            with a warning.

        Returns
        -------
        pd.DataFrame
            One row per 'Code Region' and one column per metric. Regions whose
            names start with 'hip'/'mpi' (case-insensitive) are excluded.
        """
        df = self.call_graph
        df = df[df['Duration'] >= duration_threshold]
        pref3 = df['Name'].str[:3].str.lower()
        df = df[~pref3.isin(['hip', 'mpi'])]

        if df.empty:
            return pd.DataFrame(columns=['Code Region'] + list(metrics_accumulated.keys()))

        starts = df['Start Time'].to_numpy()
        ends   = df['End Time'].to_numpy()
        names  = df['Name'].to_numpy()

        cols = {}
        for metric, f in metrics_accumulated.items():
            try:
                deltas = np.asarray(f(ends) - f(starts), dtype=float)
                # A scalar-returning f yields a 0-d result; expand to one value per row.
                deltas = np.broadcast_to(deltas, starts.shape)
                deltas = np.nan_to_num(deltas, nan=0.0, posinf=0.0, neginf=0.0)
            except Exception:
                # Fallback for callables that do not vectorize.
                deltas = np.fromiter(
                    (unwrap_or_zero(f(e) - f(s)) for s, e in zip(starts, ends)),
                    dtype=float, count=len(starts)
                )

            if weights_by_metric and metric in weights_by_metric:
                w = np.asarray(weights_by_metric[metric], dtype=float)
                # Check scalars first: a 0-d array has no shape[0].
                if w.size == 1:
                    deltas = deltas * w.item()
                elif w.shape == deltas.shape:
                    deltas = deltas * w
                else:
                    print(f"Warning: weights for metric {metric} have shape {w.shape}, "
                          f"expected a scalar or {deltas.shape}; ignoring them.")

            cols[metric] = deltas

        out = pd.DataFrame(cols)
        out['Code Region'] = names
        out = out.groupby('Code Region', as_index=False).sum()
        return out

    def refresh(self):
        """Return a new Rank built from the same node, name and call graph."""
        return Rank(self.node, self.name, self.call_graph)

    def __str__(self):
        return f"Rank({self.name})"

def build_inclusive_masks_for_rank(breaks, s, e, names, depths):
    """Mark, per time segment, which regions cover that segment entirely.

    Segment ``i`` is ``[breaks[i], breaks[i+1])``. The result maps region name
    to a boolean array with one entry per segment. For each depth from 0 up to
    the deepest covering depth, the first covering event at that depth (in
    input order) sets its region's flag.

    Assumptions:
      - ``breaks`` contains every start/end time, so no boundary falls strictly
        inside a segment.
      - ``depths`` is 0-based and increases with nesting.
    """
    n_segments = breaks.size - 1
    masks = defaultdict(lambda: np.zeros(n_segments, dtype=bool))

    for seg, (left, right) in enumerate(zip(breaks[:-1], breaks[1:])):
        # Events active across the whole segment.
        covering = np.where((s <= left) & (e >= right))[0]
        if covering.size == 0:
            continue

        covering_depths = depths[covering]
        # With proper nesting there is at most one covering event per depth;
        # if instrumentation duplicated one, the first is used.
        for level in range(covering_depths.max() + 1):
            at_level = covering[covering_depths == level]
            if at_level.size:
                masks[names[at_level[0]]][seg] = True

    return masks

class Node:
    """One compute node: its sampled metrics table plus the call graphs of its ranks.

    The node has a single metrics table (columns Group, Metric Name, Time,
    Value) for all of its ranks. Attribution therefore splits each metric
    increment among the ranks that are active at that time.
    """

    def __init__(self, name: str, metrics_df: pd.DataFrame, ranks: List["Rank"]):
        self.name = name
        self.ranks = ranks
        self.metrics = metrics_df
        assert self.metrics.columns.tolist() == ['Group', 'Metric Name', 'Time', 'Value'], f"Metrics DataFrame has incorrect columns, found {metrics_df.columns.tolist()}"

    def refresh(self):
        """Return a new Node with the same data and each rank refreshed."""
        return Node(self.name, self.metrics, [rank.refresh() for rank in self.ranks])

    def get_metric_names(self) -> List[str]:
        """Return the distinct metric names present in this node's samples."""
        return self.metrics['Metric Name'].unique().tolist()

    def get_metric_samples(self, metric_name: str) -> pd.DataFrame:
        """Return all sample rows for ``metric_name``."""
        return self.metrics[self.metrics['Metric Name'] == metric_name]

    def _filtered_callgraph(self, rank: "Rank", duration_threshold: float):
        """Return the rows of ``rank``'s call graph usable for attribution.

        Keeps invocations with Duration >= duration_threshold, finite start/end
        times and End Time > Start Time. Unlike Rank.compute_attribution and
        create_hatchet_dag, this helper does NOT drop regions whose names start
        with 'hip'/'mpi'.
        """
        df = rank.call_graph
        df = df[df['Duration'] >= duration_threshold]
        finite = np.isfinite(df['Start Time']) & np.isfinite(df['End Time'])
        return df[finite & (df['End Time'] > df['Start Time'])]

    def _compute_attribution_core(
        self,
        ranks_to_metrics: Dict[str, List[str]],
        instantaneous_metrics = [],
        duration_threshold: float = 0.0,
        inclusive: bool = True,
    ):
        """Attribute metric increments to code regions, per rank.

        For every metric, the time axis is cut at each start/end time of the
        ranks that track it. Each segment's increment (clamped to >= 0) is
        split evenly among the ranks active in that segment. Each rank's share
        is then credited to:
          * inclusive=True: every region of that rank active in the segment.
            Each region is counted at most once per segment, even when it is
            nested in itself.
          * inclusive=False: only the deepest active region. Among events at
            the same depth, the later one in the filtered call-graph order wins.

        Parameters
        ----------
        ranks_to_metrics : dict
            Rank name -> list of metric names to attribute for that rank.
        instantaneous_metrics : list
            Metrics integrated over time (numerical_integral) before
            differencing; all other metrics are differenced directly.
        duration_threshold : float
            Invocations shorter than this are ignored.
        inclusive : bool
            Inclusive vs. exclusive attribution (see above).

        Returns
        -------
        pd.DataFrame
            A 'Code Region' column plus one '<metric>_<rank>' column per
            requested (rank, metric) pair; missing values are 0.0.
        """
        used_metrics = sorted({m for ml in ranks_to_metrics.values() for m in ml})
        if not used_metrics:
            return pd.DataFrame(columns=['Code Region'])

        metrics_continuous = {m: sampled_to_continuous(self.get_metric_samples(m)) for m in used_metrics}
        metrics_accumulated = {
            m: (numerical_integral(metrics_continuous[m]) if m in instantaneous_metrics else metrics_continuous[m])
            for m in used_metrics
        }

        rank_df = {r.name: self._filtered_callgraph(r, duration_threshold) for r in self.ranks}
        rank_events = {
            rname: (
                df['Start Time'].to_numpy(),
                df['End Time'].to_numpy(),
                df['Name'].to_numpy(),
                df['Depth'].astype(int).to_numpy()
            )
            for rname, df in rank_df.items()
        }

        metric_to_ranks: Dict[str, List[str]] = {}
        for rname, mlist in ranks_to_metrics.items():
            for m in mlist:
                metric_to_ranks.setdefault(m, []).append(rname)

        rank_tables: Dict[str, Dict[str, Dict[str, float]]] = {r.name: {} for r in self.ranks}

        for metric in used_metrics:
            f = metrics_accumulated[metric]
            rnames = [rn for rn in metric_to_ranks.get(metric, []) if rn in rank_events]
            if not rnames:
                continue

            all_s, all_e = [], []
            for rn in rnames:
                s, e, _, _ = rank_events[rn]
                if s.size:
                    all_s.append(s)
                    all_e.append(e)
            if not all_s:
                continue

            S_concat = np.concatenate(all_s)
            E_concat = np.concatenate(all_e)
            breaks = np.unique(np.concatenate([S_concat, E_concat]))
            breaks = breaks[np.isfinite(breaks)]
            if breaks.size < 2:
                continue
            seg_count = breaks.size - 1

            # Metric increment per segment.
            try:
                E_vals = f(breaks)
                E_vals = np.asarray(E_vals, dtype=float)
                if E_vals.shape != breaks.shape:
                    raise ValueError
            except Exception:
                print(f"Warning: Non-vectorized metric function for metric {metric}, falling back to loop.")
                E_vals = np.array([float(f(float(t))) for t in breaks], dtype=float)

            dE = E_vals[1:] - E_vals[:-1]
            dE = np.nan_to_num(dE, nan=0.0, posinf=0.0, neginf=0.0)
            dE = np.maximum(dE, 0.0)

            # Which ranks are active in each segment? Also keep, per rank, the
            # segment span [L, R] of each event together with its name and depth.
            active_by_rank: Dict[str, np.ndarray] = {}
            spans_by_rank: Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] = {}

            zeros_seg = np.zeros(seg_count, dtype=bool)
            for rn in rnames:
                s, e, names, depths = rank_events[rn]
                if s.size == 0:
                    active_by_rank[rn] = zeros_seg
                    spans_by_rank[rn] = (np.empty(0, dtype=int), np.empty(0, dtype=int), names[:0], depths[:0])
                    continue

                L = np.searchsorted(breaks, s, side='right') - 1
                R = np.searchsorted(breaks, e, side='left') - 1
                valid = (R >= L) & (R >= 0) & (L < seg_count)
                L = np.clip(L[valid], 0, seg_count - 1)
                R = np.clip(R[valid], 0, seg_count - 1)
                # Filter names/depths with the same mask so index j refers to
                # the same event in L, R, names and depths.
                spans_by_rank[rn] = (L, R, names[valid], depths[valid])

                # Coverage count via a difference array.
                diff = np.zeros(seg_count + 1, dtype=int)
                np.add.at(diff, L, 1)
                np.add.at(diff, R + 1, -1)
                active_by_rank[rn] = np.cumsum(diff[:-1]) > 0

            # k: number of active ranks per segment.
            k = np.zeros(seg_count, dtype=int)
            for rn in rnames:
                k += active_by_rank[rn].astype(int)

            # Per-rank share of each segment: dE / k (left unattributed when k == 0).
            share_per_seg = np.zeros_like(dE, dtype=float)
            mask_k = (k > 0)
            share_per_seg[mask_k] = dE[mask_k] / k[mask_k]

            # --- Attribution per rank ---
            for rn in rnames:
                L, R, ev_names, ev_depths = spans_by_rank[rn]
                if L.size == 0:
                    continue

                if inclusive:
                    masks: Dict[str, np.ndarray] = defaultdict(lambda: np.zeros(seg_count, dtype=bool))
                    for l, r, region in zip(L, R, ev_names):
                        if r < l:
                            continue
                        masks[region][l:r + 1] = True

                    for region_name, seg_mask in masks.items():
                        if not seg_mask.any():
                            continue
                        contrib = float(share_per_seg[seg_mask].sum())
                        if contrib == 0.0:
                            continue
                        tbl = rank_tables[rn].setdefault(region_name, {})
                        tbl[metric] = tbl.get(metric, 0.0) + contrib
                else:
                    top_depth = np.full(seg_count, -1, dtype=int)
                    top_idx   = np.full(seg_count, -1, dtype=int)

                    # For each event, claim its segments where it is at least as
                    # deep as the current owner (later events win ties).
                    for j, (l, r, d) in enumerate(zip(L, R, ev_depths)):
                        if r < l:
                            continue
                        seg_slice = slice(l, r + 1)
                        deeper = d >= top_depth[seg_slice]
                        if not np.any(deeper):
                            continue
                        top_depth[seg_slice] = np.where(deeper, d, top_depth[seg_slice])
                        top_idx[seg_slice] = np.where(deeper, j, top_idx[seg_slice])

                    # Attribute each segment's share only to its top-of-stack region.
                    for seg_i in np.flatnonzero(top_idx >= 0):
                        contrib = float(share_per_seg[seg_i])
                        if contrib == 0.0:
                            continue
                        region_name = ev_names[top_idx[seg_i]]
                        tbl = rank_tables[rn].setdefault(region_name, {})
                        tbl[metric] = tbl.get(metric, 0.0) + contrib

        # --- Convert to per-rank DataFrames and do a wide merge ---
        attribution = None
        for rank in self.ranks:
            rn = rank.name
            # Only the metrics requested for this rank.
            wanted = ranks_to_metrics.get(rn, [])
            if not wanted:
                continue

            rows = []
            for region, mdict in rank_tables.get(rn, {}).items():
                row = {'Code Region': region}
                # Ensure a value for each requested metric.
                for m in wanted:
                    row[m] = mdict.get(m, 0.0)
                rows.append(row)

            df_rank = pd.DataFrame(rows) if rows else pd.DataFrame({'Code Region': []})
            # Ensure columns exist even when no region received anything.
            for m in wanted:
                if m not in df_rank.columns:
                    df_rank[m] = 0.0

            # Suffix metric columns with the rank name to avoid collisions.
            rename_map = {m: f"{m}_{rn}" for m in wanted}
            df_rank = df_rank.rename(columns=rename_map)

            # Outer merge on 'Code Region'.
            attribution = df_rank if attribution is None else pd.merge(
                attribution, df_rank, on='Code Region', how='outer'
            )

        # Nothing to merge: return just the expected header.
        if attribution is None:
            # ranks_to_metrics is keyed by rank *name* (a str).
            all_cols = ['Code Region'] + [f"{m}_{rn}" for rn, ml in ranks_to_metrics.items() for m in ml]
            return pd.DataFrame(columns=all_cols)

        return attribution.fillna(0.0)

    def compute_attribution(
        self,
        ranks_to_metrics: Dict[str, List[str]] = {},
        instantaneous_metrics = [],
        duration_threshold: float = 0.001,
    ):
        """Inclusive attribution: every active region of a rank receives that
        rank's share of each segment's metric increment."""
        return self._compute_attribution_core(
            ranks_to_metrics=ranks_to_metrics,
            instantaneous_metrics=instantaneous_metrics,
            duration_threshold=duration_threshold,
            inclusive=True,
        )

    def compute_attribution_exclusive(
        self,
        ranks_to_metrics: Dict[str, List[str]] = {},
        instantaneous_metrics = [],
        duration_threshold: float = 0.001,
    ):
        """
        Exclusive attribution: for each segment and rank, only the *deepest*
        active region on the stack receives that segment's share of energy.
        """
        return self._compute_attribution_core(
            ranks_to_metrics=ranks_to_metrics,
            instantaneous_metrics=instantaneous_metrics,
            duration_threshold=duration_threshold,
            inclusive=False,
        )

    def __str__(self):
        return f"Node(name={self.name}, ranks={[rank.name for rank in self.ranks]})"

class Run:
    """One traced run: its trace directory and the Node objects loaded from it."""

    def __init__(self, path: str, nodes: Optional[List["Node"]] = None):
        self.path = path
        # Fresh list per instance: a shared `[]` default would be mutated by
        # every Run that appends to its nodes.
        self.nodes = nodes if nodes is not None else []

    def refresh(self):
        """Return a new Run with the same path and every node refreshed."""
        return Run(self.path, [node.refresh() for node in self.nodes])

    @staticmethod
    def from_trace_path(trace_path: str, node_ranks: Dict[str, List[str]]):
        """Load a run from the CSV files in ``trace_path``.

        ``node_ranks`` maps node name -> rank names. A node's metrics are read
        from ``<first rank>_metrics.csv``. Each rank's call graph is read from
        ``<rank>_Master_thread_callgraph.csv``.

        Raises FileNotFoundError if the directory or any expected file is missing.
        """
        if not os.path.exists(trace_path):
            raise FileNotFoundError(f"Trace file {trace_path} does not exist.")
        result = {}
        for node, ranks in node_ranks.items():
            metrics_path = os.path.join(trace_path, f"{ranks[0]}_metrics.csv")
            if not os.path.exists(metrics_path):
                raise FileNotFoundError(f"Metrics file for rank {ranks[0]} does not exist in trace path {trace_path}.")
            metrics_df = read_metrics_dataframe(metrics_path)
            assert metrics_df.columns.tolist() == ['Group', 'Metric Name', 'Time', 'Value'], f"Metrics DataFrame has incorrect columns, found {metrics_df.columns.tolist()}"
            rank_callgraphs = []
            for rank in ranks:
                callgraph_path = os.path.join(trace_path, f"{rank}_Master_thread_callgraph.csv")
                if not os.path.exists(callgraph_path):
                    raise FileNotFoundError(f"Call graph file for rank {rank} does not exist in trace path {trace_path}.")
                call_graph = read_call_graph_dataframe(callgraph_path)
                assert call_graph.columns.tolist() == ['Thread', 'Group', 'Depth', 'Name', 'Start Time', 'End Time', 'Duration'], f"Call graph DataFrame has incorrect columns, found {call_graph.columns.tolist()}"
                rank_callgraphs.append(Rank(node, rank, call_graph))
            result[node] = Node(node, metrics_df, rank_callgraphs)
        return Run(trace_path, list(result.values()))

    def compute_attribution(self, ranks_to_metrics: Dict[str, List[str]]={}, instantaneous_metrics=[]):
        """Inclusive attribution for every node: {node name: DataFrame}."""
        attributions = {}
        for node in self.nodes:
            attributions[node.name] = node.compute_attribution(ranks_to_metrics, instantaneous_metrics)
        return attributions

    def compute_attribution_exclusive(self, ranks_to_metrics: Dict[str, List[str]] = {}, instantaneous_metrics = []):
        """Exclusive attribution for every node: {node name: DataFrame}."""
        attributions = {}
        for node in self.nodes:
            attributions[node.name] = node.compute_attribution_exclusive(
                ranks_to_metrics,
                instantaneous_metrics,
            )
        return attributions

    def get_metric_samples(self, node_name: str, metric_name: str) -> pd.DataFrame:
        """Return the samples of ``metric_name`` on node ``node_name``.

        Raises ValueError if the node is not part of this run.
        """
        for node in self.nodes:
            if node.name == node_name:
                return node.get_metric_samples(metric_name)
        raise ValueError(f"Node {node_name} not found in run.")

    def to_thicket(self, **metadata) -> "tt.Thicket":
        """Build one Thicket per (node, rank) and concatenate them.

        Each rank's call graph is converted with create_hatchet_dag. The metric
        functions are every metric of the rank's node, interpolated with
        sampled_to_continuous. Keyword ``metadata`` is added to each thicket's
        metadata alongside 'rank' and 'node'.

        Raises ValueError if no thicket was created (the run has no ranks).
        """
        thickets = []

        for node in self.nodes:
            metric_functions = {
                metric_name: sampled_to_continuous(node.get_metric_samples(metric_name))
                for metric_name in node.get_metric_names()
            }

            for rank in node.ranks:
                print(f"Creating Thicket for node {node.name}, rank {rank.name}...")
                dag = create_hatchet_dag(rank.call_graph, metric_functions)
                tf = tt.Thicket.from_literal(dag)
                print("Thicket created from DAG.")
                tf.metadata = pd.DataFrame.from_dict(tf.profile_mapping, orient="index")
                tf.metadata['rank'] = rank.name
                tf.metadata['node'] = node.name
                tf.metadata = tf.metadata.assign(**metadata)
                tf.metadata.index.name = tf.dataframe.index.names[1]

                thickets.append(tf)

        print(f"Total thickets created: {len(thickets)}")
        if thickets:
            print("Combining thickets...")
            return tt.Thicket.concat_thickets(thickets, disable_tqdm=True)
        raise ValueError("No thickets were created from the run.")

    def __str__(self):
        return f"Run(path={self.path}, nodes={[str(node) for node in self.nodes]})"

class Ensemble:
    """A set of Runs analysed together, with per-ensemble caching of attribution results.

    Runs are keyed by a name derived from their trace path (see ``_run_name``).
    The first successful ``compute_attribution`` / ``compute_attribution_exclusive`` call
    stores its result. Later calls return that cached result regardless of the arguments
    they are given. ``refresh()`` returns a new Ensemble whose caches are empty.
    """

    # Column order of the long-form frames built by the *_across_runs methods.
    ATTRIBUTION_COLUMNS = ('Run', 'Node', 'Code Region', 'Metric', 'Value')

    def __init__(self, runs: List["Run"]):
        self.runs = runs
        # run name -> result of Run.compute_attribution (iterated as node name -> DataFrame).
        self.attributions = {}
        # run name -> result of Run.compute_attribution_exclusive (same shape).
        self.attributions_exclusive = {}

    @staticmethod
    def from_trace_paths(trace_paths: List[str], node_ranks: Dict[str, List[str]]):
        """Build an Ensemble from CSV-converted trace directories.

        Delegates to ``ensemble_from_trace_path_helper``.
        """
        return ensemble_from_trace_path_helper(trace_paths, node_ranks)

    @staticmethod
    def _run_name(path: str) -> str:
        """Derive a run's display name from its trace path.

        Accepts the run directory itself ('out/run-a' or 'out/run-a/') or a file inside it
        ('out/run-a/traces.otf2'); all of these yield 'run-a'. The previous expression
        ``basename(dirname(path))`` returned '.' for './run-a', so every run passed that
        way collided under the same key.
        """
        if path.endswith(('/', os.sep)) or os.path.isdir(path):
            # ``path`` is the run directory: name the run after the directory itself.
            return os.path.basename(os.path.normpath(path))
        # ``path`` names a file inside the run directory: use its parent's name.
        return os.path.basename(os.path.dirname(path))

    def refresh(self):
        """Return a new Ensemble of refreshed runs (its attribution caches start empty)."""
        return Ensemble([run.refresh() for run in self.runs])

    def compute_attribution(
        self,
        ranks_to_metrics: Optional[Dict[str, List[str]]] = None,
        instantaneous_metrics: Optional[list] = None,
    ):
        """Compute (or return the cached) per-run attribution.

        Args:
            ranks_to_metrics: Forwarded to each ``Run.compute_attribution``; None means {}.
            instantaneous_metrics: Forwarded to each ``Run.compute_attribution``; None means [].

        Returns:
            ``self.attributions``: run name -> that run's attribution result.
        """
        if self.attributions != {}:
            print("Attributions already computed, returning cached results.")
            return self.attributions

        ranks_to_metrics = {} if ranks_to_metrics is None else ranks_to_metrics
        instantaneous_metrics = [] if instantaneous_metrics is None else instantaneous_metrics

        # Build locally and publish only after every run succeeded, so an exception part-way
        # through cannot leave a partial dict that later calls would return as "cached".
        computed = {}
        for run in self.runs:
            computed[self._run_name(run.path)] = run.compute_attribution(ranks_to_metrics, instantaneous_metrics)
        self.attributions.update(computed)
        return self.attributions

    @classmethod
    def _attributions_to_long_frame(cls, attributions) -> pd.DataFrame:
        """Flatten run -> node -> DataFrame attributions into one long-form frame.

        Each input frame is expected to have a 'Code Region' column. Every other column is
        treated as a metric and contributes one row per code region. None/empty frames are
        skipped. The result always carries ``ATTRIBUTION_COLUMNS``, even when it has no rows.
        """
        records = []
        for run_name, node_attribs in attributions.items():
            for node_name, df in node_attribs.items():
                if df is None or df.empty:
                    continue
                code = df['Code Region'].to_numpy()
                for metric in df.columns:
                    if metric == 'Code Region':
                        continue
                    vals = df[metric].to_numpy()
                    # Extend in bulk (no per-iteration DataFrame concatenation).
                    records.extend(
                        {'Run': run_name, 'Node': node_name, 'Code Region': c, 'Metric': metric, 'Value': v}
                        for c, v in zip(code, vals)
                    )
        return pd.DataFrame(records, columns=list(cls.ATTRIBUTION_COLUMNS))

    def compute_attribution_across_runs(
        self,
        ranks_to_metrics: Optional[Dict[str, List[str]]] = None,
        instantaneous_metrics: Optional[list] = None,
    ):
        """Long-form (Run, Node, Code Region, Metric, Value) view of ``compute_attribution``."""
        attributions = self.compute_attribution(ranks_to_metrics, instantaneous_metrics)
        return self._attributions_to_long_frame(attributions)

    def compute_attribution_exclusive(
        self,
        ranks_to_metrics: Optional[Dict[str, List[str]]] = None,
        instantaneous_metrics: Optional[list] = None,
    ):
        """Compute (or return the cached) per-run exclusive attribution.

        Same contract as ``compute_attribution`` but calls ``Run.compute_attribution_exclusive``
        and caches into ``self.attributions_exclusive``.
        """
        if self.attributions_exclusive != {}:
            print("Exclusive attributions already computed, returning cached results.")
            return self.attributions_exclusive

        ranks_to_metrics = {} if ranks_to_metrics is None else ranks_to_metrics
        instantaneous_metrics = [] if instantaneous_metrics is None else instantaneous_metrics

        # Publish only on full success (see compute_attribution).
        computed = {}
        for run in self.runs:
            run_name = self._run_name(run.path)
            print(f"Computing EXCLUSIVE attribution for run {run_name}...")
            computed[run_name] = run.compute_attribution_exclusive(
                ranks_to_metrics,
                instantaneous_metrics,
            )
        self.attributions_exclusive.update(computed)
        return self.attributions_exclusive

    def compute_attribution_exclusive_across_runs(
        self,
        ranks_to_metrics: Optional[Dict[str, List[str]]] = None,
        instantaneous_metrics: Optional[list] = None,
    ):
        """Long-form (Run, Node, Code Region, Metric, Value) view of ``compute_attribution_exclusive``."""
        attributions = self.compute_attribution_exclusive(ranks_to_metrics, instantaneous_metrics)
        return self._attributions_to_long_frame(attributions)

    def to_thicket(self) -> "tt.Thicket":
        """Concatenate every run's Thicket, tagging each with 'trace' and 'run' metadata.

        Raises:
            ValueError: if the ensemble has no runs. (``tt.Thicket()`` cannot be built
                without a graph and a dataframe, so there is no meaningful empty result.)
        """
        thickets = []
        for run in self.runs:
            name = self._run_name(run.path)
            print(f"Creating Thicket for run {run.path} ({name})...")
            thickets.append(run.to_thicket(trace=run.path, run=name))
        if not thickets:
            raise ValueError("No thickets were created from the ensemble.")
        return tt.Thicket.concat_thickets(thickets, disable_tqdm=True)


# Lazily created, module-wide executor shared by every loader in this notebook.
_global_executor = None

def get_threadpool(max_workers=None):
    """Return the shared executor, creating it on first use.

    Despite the name, this is a ``ProcessPoolExecutor`` (worker processes, not threads).
    ``max_workers`` only takes effect on the call that creates the pool; later calls
    return the existing pool unchanged, whatever size they request.

    Args:
        max_workers: Pool size for the creating call. When falsy, defaults to the CPU
            count capped at 256, or 1 if the CPU count cannot be determined.
    """
    global _global_executor
    if _global_executor is None:
        # os.cpu_count() may return None, which would make min() raise TypeError.
        max_workers = max_workers or min(os.cpu_count() or 1, 256)
        _global_executor = ProcessPoolExecutor(max_workers=max_workers)
    return _global_executor

def run_from_trace_path_helper(args) -> Run:
    """Build a Run from a single ``(path, node_ranks)`` tuple.

    Takes one packed argument, presumably so it can be handed to executor
    ``map``/``submit`` style APIs.
    """
    trace_path, ranks_by_node = args
    return Run.from_trace_path(trace_path, ranks_by_node)

def _load_metrics(args):
    """Pool worker: load and validate one node's metrics CSV.

    Args:
        args: ``(path, node, mpath)``: the run directory, the node name and the metrics
            CSV path. ``path`` and ``node`` are echoed back so results can be matched up
            after ``as_completed``.

    Returns:
        ``(path, node, df)`` where ``df`` is the frame from ``read_metrics_dataframe``.

    Raises:
        FileNotFoundError: if ``mpath`` does not exist.
        ValueError: if the parsed frame does not have the expected columns.
    """
    path, node, mpath = args
    if not os.path.exists(mpath):
        raise FileNotFoundError(f"Metrics file {mpath} missing.")
    df = read_metrics_dataframe(mpath)
    expected_columns = ['Group', 'Metric Name', 'Time', 'Value']
    # Explicit check instead of `assert`, which is stripped when Python runs with -O.
    if df.columns.tolist() != expected_columns:
        raise ValueError(f"Bad metrics columns in {mpath}: {df.columns.tolist()}")
    return path, node, df

def _load_callgraph(args):
    """Pool worker: load and validate one rank's call-graph CSV.

    Args:
        args: ``(path, node, rank, cpath)``: the run directory, the node name, the rank
            name and the call-graph CSV path. The first three are echoed back so results
            can be matched up after ``as_completed``.

    Returns:
        ``(path, node, rank, df)`` where ``df`` is the frame from ``read_call_graph_dataframe``.

    Raises:
        FileNotFoundError: if ``cpath`` does not exist.
        ValueError: if the parsed frame does not have the expected columns.
    """
    path, node, rank, cpath = args
    if not os.path.exists(cpath):
        raise FileNotFoundError(f"Call graph file {cpath} missing.")
    df = read_call_graph_dataframe(cpath)
    expected_columns = ['Thread', 'Group', 'Depth', 'Name', 'Start Time', 'End Time', 'Duration']
    # Explicit check instead of `assert`, which is stripped when Python runs with -O.
    if df.columns.tolist() != expected_columns:
        raise ValueError(f"Bad callgraph columns in {cpath}: {df.columns.tolist()}")
    return path, node, rank, df

def ensemble_from_trace_path_helper(trace_paths: List[str], node_ranks: Dict[str, List[str]]) -> "Ensemble":
    """Build an Ensemble from trace directories that already contain converted CSVs.

    For every run directory in ``trace_paths`` and every node in ``node_ranks``, this loads
    ``<first rank>_metrics.csv`` (one metrics file per node) and
    ``<rank>_Master_thread_callgraph.csv`` for each rank. Loading runs in parallel on the
    shared pool from ``get_threadpool``. The results are assembled into
    Rank -> Node -> Run objects in input order.

    Args:
        trace_paths: Run directories produced by the trace-to-CSV converter.
        node_ranks: Node name -> rank names. The first rank's metrics file is used for
            the whole node.

    Raises:
        FileNotFoundError: if a trace directory or one of its CSV files is missing.
        ValueError: if a node lists no ranks, or a loaded CSV is malformed or missing.
    """
    for path in trace_paths:
        if not os.path.exists(path):
            raise FileNotFoundError(f"Trace path {path} does not exist.")
    for node, ranks in node_ranks.items():
        if not ranks:
            # The per-node metrics file is named after the node's first rank.
            raise ValueError(f"Node {node} has no ranks; cannot locate its metrics file.")

    # One metrics task per (run, node) and one call-graph task per (run, node, rank).
    metrics_tasks = []
    callgraph_tasks = []
    for path in trace_paths:
        for node, ranks in node_ranks.items():
            metrics_path = os.path.join(path, f"{ranks[0]}_metrics.csv")
            print(f"Preparing trace data for run {path}, node {node}...")
            print(f'Metrics will be loaded from: {metrics_path}')
            metrics_tasks.append((path, node, metrics_path))
            for rank in ranks:
                callgraph_tasks.append((path, node, rank, os.path.join(path, f"{rank}_Master_thread_callgraph.csv")))

    # Shared pool; the requested size of 128 only applies if the pool does not exist yet.
    executor = get_threadpool(128)
    fut_metrics = [executor.submit(_load_metrics, task) for task in metrics_tasks]
    fut_callgraphs = [executor.submit(_load_callgraph, task) for task in callgraph_tasks]

    # .result() re-raises any exception raised inside the worker.
    metrics_by_run_node = {}
    for fut in tqdm(as_completed(fut_metrics), total=len(fut_metrics), desc="Loading metrics", leave=False):
        path, node, dfm = fut.result()
        metrics_by_run_node[(path, node)] = dfm

    callgraph_by_run_node_rank = {}
    for fut in tqdm(as_completed(fut_callgraphs), total=len(fut_callgraphs), desc="Loading callgraphs", leave=False):
        path, node, rank, dfc = fut.result()
        callgraph_by_run_node_rank[(path, node, rank)] = dfc

    # Assemble Run objects in the order given by the inputs.
    runs = []
    for path in trace_paths:
        node_objs = []
        for node, ranks in node_ranks.items():
            mdf = metrics_by_run_node.get((path, node))
            if mdf is None:
                raise ValueError(f"Missing metrics for run {path}, node {node}")
            rank_objs = []
            for rank in ranks:
                cdf = callgraph_by_run_node_rank.get((path, node, rank))
                if cdf is None:
                    raise ValueError(f"Missing callgraph for run {path}, node {node}, rank {rank}")
                rank_objs.append(Rank(node, rank, cdf))
            node_objs.append(Node(node, mdf, rank_objs))
        runs.append(Run(path, node_objs))

    return Ensemble(runs)

def replace_prefix_and_suffix(name: str) -> str:
    """Strip template arguments and parameter lists from a (demangled) function name.

    Everything from the first '<' or '(' onward is dropped and the remainder is
    whitespace-trimmed, e.g. ``"foo<int>(double) "`` -> ``"foo"``.
    """
    head = name.partition('<')[0]
    head = head.partition('(')[0]
    return head.strip()


def take_alphanumeric(s):
    """Return the leading name-like prefix of ``s``, normalized by ``replace_prefix_and_suffix``.

    The prefix is the longest leading run of letters, digits, '_' and the characters
    '~', ':', '<', '>', ',' and space. If ``s`` does not start with such a character,
    the whole string is normalized instead.
    """
    leading = re.match(r'^[~<, >:a-zA-Z0-9_]+', s)
    prefix = s if leading is None else leading.group(0)
    return replace_prefix_and_suffix(prefix)

# Begin Analysis!
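Load the rocHPL trace data. Each cell below converts an OTF2 trace to CSV files and then builds an `Ensemble` from those CSVs, timing both steps.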

In [8]:
# Single-node rocHPL run: convert the OTF2 trace with the parallel Chapel converter,
# then load the CSVs. 8 MPI ranks on node0 (ranks 0-7).
HPL_NODE_RANKS = {
    'node0': [f'MPI Rank {rank}' for rank in range(8)],
}
start_convert = time.time()
exit_status = os.system("LD_LIBRARY_PATH=\"/opt/rocm-6.4.1/lib:/lustre/orion/csc688/world-shared/scorep-amd/install/lib:$LD_LIBRARY_PATH\" CHPL_RT_NUM_THREADS_PER_LOCALE=64 ./fast-OTF2/trace_to_csv_parallel_real -nl1 --trace ./frontier-1-node-single-HPL-run/traces.otf2 --outputDir ./frontier-1-node-single-HPL-run")
# Fail loudly: otherwise a failed conversion silently loads stale CSVs from an earlier run.
if exit_status != 0:
    raise RuntimeError(f"Trace conversion failed with exit status {exit_status}")
start_load = time.time()
ens = Ensemble.from_trace_paths(['./frontier-1-node-single-HPL-run'], HPL_NODE_RANKS)
end = time.time()

Number of locations: 45
Number of readers: 45
Event Summary: total events read across readers = 17056588 in 2.83616 seconds
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 7_Master_thread_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 3_Master_thread_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 5_Master_thread_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 6_Master_thread_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 4_Master_thread_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 2_Master_thread_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 0_Master_thread_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 0_UnknownLocation_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 1_Master_thread_callgraph.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 0_metrics.csv
Writing: ./frontier-1-node-single-HPL-run/MPI Rank 3_metrics.csv
Writing: ./fron

                                                                 

In [9]:
# Timing summary for the conversion + load cell above.
print(
    f"Trace conversion finished in {start_load - start_convert:.2f} seconds.\n"
    f"Loaded Ensemble in {end - start_load:.2f} seconds.\n"
    f"Converted and loaded ensemble in {end - start_convert:.2f} seconds."
)

Trace conversion finished in 10.54 seconds.
Loaded Ensemble in 1.57 seconds.
Converted and loaded ensemble in 12.11 seconds.


In [10]:
# 16-node rocHPL run: 8 MPI ranks per node, numbered consecutively across nodes
# (node0 -> ranks 0-7, node1 -> ranks 8-15, ..., node15 -> ranks 120-127).
RANKS_PER_NODE = 8
NUM_NODES = 16
HPL_NODE_RANKS = {
    f'node{node}': [f'MPI Rank {node * RANKS_PER_NODE + local}' for local in range(RANKS_PER_NODE)]
    for node in range(NUM_NODES)
}

start_convert = time.time()
exit_status = os.system("LD_LIBRARY_PATH=\"/opt/rocm-6.4.1/lib:/lustre/orion/csc688/world-shared/scorep-amd/install/lib:$LD_LIBRARY_PATH\" CHPL_RT_NUM_THREADS_PER_LOCALE=64 ./fast-OTF2/trace_to_csv_parallel_real -nl1 --trace ./frontier-16-node-single-HPL-run/traces.otf2 --outputDir ./frontier-16-node-single-HPL-run")
# Fail loudly: otherwise a failed conversion silently loads stale CSVs from an earlier run.
if exit_status != 0:
    raise RuntimeError(f"Trace conversion failed with exit status {exit_status}")
start_load = time.time()
ens = Ensemble.from_trace_paths(['./frontier-16-node-single-HPL-run'], HPL_NODE_RANKS)
end = time.time()

Number of locations: 816
Number of readers: 64
Event Summary: total events read across readers = 848806084 in 104.969 seconds
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 17_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 11_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 127_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 121_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 102_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 107_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 123_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 106_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 103_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-HPL-run/MPI Rank 105_Master_thread_callgraph.csv
Writing: ./frontier-16-node-single-H

                                                                     

In [11]:
# Timing summary for the conversion + load cell above.
print(
    f"Trace conversion finished in {start_load - start_convert:.2f} seconds.\n"
    f"Loaded Ensemble in {end - start_load:.2f} seconds.\n"
    f"Converted and loaded ensemble in {end - start_convert:.2f} seconds."
)

Trace conversion finished in 292.81 seconds.
Loaded Ensemble in 23.30 seconds.
Converted and loaded ensemble in 316.11 seconds.


In [12]:
# Single-node rocHPL run again, this time converted with the `otf2csv` tool
# (run from inside the trace directory). 8 MPI ranks on node0 (ranks 0-7).
HPL_NODE_RANKS = {
    'node0': [f'MPI Rank {rank}' for rank in range(8)],
}
start_convert = time.time()
exit_status = os.system("cd frontier-1-node-single-HPL-run/ && LD_LIBRARY_PATH=\"/opt/rocm-6.4.1/lib:/lustre/orion/csc688/world-shared/scorep-amd/install/lib:$LD_LIBRARY_PATH\" ../otf2csv ./traces.otf2")
# Fail loudly: otherwise a failed conversion silently loads stale CSVs from an earlier run.
if exit_status != 0:
    raise RuntimeError(f"Trace conversion failed with exit status {exit_status}")
start_load = time.time()
ens = Ensemble.from_trace_paths(['./frontier-1-node-single-HPL-run'], HPL_NODE_RANKS)
end = time.time()

Option enabled: Skipping consecutive duplicate metric values.
Processing events...
CSV conversion completed in 9.000000 seconds.
Preparing trace data for run ./frontier-1-node-single-HPL-run, node node0...
Metrics will be loaded from: ./frontier-1-node-single-HPL-run/MPI Rank 0_metrics.csv


                                                                 

In [13]:
# Timing summary for the conversion + load cell above.
print(
    f"Trace conversion finished in {start_load - start_convert:.2f} seconds.\n"
    f"Loaded Ensemble in {end - start_load:.2f} seconds.\n"
    f"Converted and loaded ensemble in {end - start_convert:.2f} seconds."
)

Trace conversion finished in 8.80 seconds.
Loaded Ensemble in 0.64 seconds.
Converted and loaded ensemble in 9.44 seconds.


In [14]:
# 16-node rocHPL run converted with `otf2csv`: 8 MPI ranks per node, numbered
# consecutively across nodes (node0 -> ranks 0-7, ..., node15 -> ranks 120-127).
RANKS_PER_NODE = 8
NUM_NODES = 16
HPL_NODE_RANKS = {
    f'node{node}': [f'MPI Rank {node * RANKS_PER_NODE + local}' for local in range(RANKS_PER_NODE)]
    for node in range(NUM_NODES)
}

start_convert = time.time()
exit_status = os.system("cd frontier-16-node-single-HPL-run/ && LD_LIBRARY_PATH=\"/opt/rocm-6.4.1/lib:/lustre/orion/csc688/world-shared/scorep-amd/install/lib:$LD_LIBRARY_PATH\" ../otf2csv ./traces.otf2")
# Fail loudly: otherwise a failed conversion silently loads stale CSVs from an earlier run.
if exit_status != 0:
    raise RuntimeError(f"Trace conversion failed with exit status {exit_status}")
start_load = time.time()
ens = Ensemble.from_trace_paths(['./frontier-16-node-single-HPL-run'], HPL_NODE_RANKS)
end = time.time()

Option enabled: Skipping consecutive duplicate metric values.
Processing events...
CSV conversion completed in 658.000000 seconds.
Preparing trace data for run ./frontier-16-node-single-HPL-run, node node0...
Metrics will be loaded from: ./frontier-16-node-single-HPL-run/MPI Rank 0_metrics.csv
Preparing trace data for run ./frontier-16-node-single-HPL-run, node node1...
Metrics will be loaded from: ./frontier-16-node-single-HPL-run/MPI Rank 8_metrics.csv
Preparing trace data for run ./frontier-16-node-single-HPL-run, node node2...
Metrics will be loaded from: ./frontier-16-node-single-HPL-run/MPI Rank 16_metrics.csv
Preparing trace data for run ./frontier-16-node-single-HPL-run, node node3...
Metrics will be loaded from: ./frontier-16-node-single-HPL-run/MPI Rank 24_metrics.csv
Preparing trace data for run ./frontier-16-node-single-HPL-run, node node4...
Metrics will be loaded from: ./frontier-16-node-single-HPL-run/MPI Rank 32_metrics.csv
Preparing trace data for run ./frontier-16-nod

                                                                     

In [15]:
# Timing summary for the conversion + load cell above.
print(
    f"Trace conversion finished in {start_load - start_convert:.2f} seconds.\n"
    f"Loaded Ensemble in {end - start_load:.2f} seconds.\n"
    f"Converted and loaded ensemble in {end - start_convert:.2f} seconds."
)

Trace conversion finished in 657.94 seconds.
Loaded Ensemble in 20.38 seconds.
Converted and loaded ensemble in 678.33 seconds.


In [16]:
# Record when the notebook finished and how long the whole run took
# (NOTEBOOK_START_TIME is set in the second cell).
NOTEBOOK_END_TIME = time.time()
NOTEBOOK_DURATION = NOTEBOOK_END_TIME - NOTEBOOK_START_TIME
print(
    f"Notebook end time: {NOTEBOOK_END_TIME}\n"
    f"Notebook end at {NOTEBOOK_END_TIME} ({datetime.datetime.fromtimestamp(NOTEBOOK_END_TIME)})\n"
    f"Notebook execution time: {NOTEBOOK_DURATION:.2f} seconds"
)

Notebook end time: 1767994863.4637973
Notebook end at 1767994863.4637973 (2026-01-09 16:41:03.463797)
Notebook execution time: 1016.33 seconds
