# Metrics calculation and visualization

## Metrics calculation functions

In [None]:
import numpy as np
from typing import Sequence, Dict
from sklearn.metrics import (
    mean_absolute_error,
    accuracy_score,
)
from scipy.stats import pearsonr
from pathlib import Path
import pandas as pd
import re


In [None]:

def _to_np(x: Sequence[int]) -> np.ndarray:
    """Return ``x`` as a flat (1-D) NumPy array with dtype ``int64``."""
    return np.asarray(x, dtype=np.int64).ravel()

def r2_from_corr(y_true, y_pred):
    """
    Squared Pearson correlation between ground truth and prediction.

    Args:
        y_true: array-like of ground-truth values.
        y_pred: array-like of predicted values, same shape as ``y_true``.

    Returns:
        ``r ** 2`` where ``r`` is the Pearson correlation coefficient, or NaN
        when fewer than two samples are given (the correlation is undefined).

    Raises:
        ValueError: if the two inputs do not have the same shape.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)

    if y_true.shape != y_pred.shape:
        raise ValueError(f"Shape mismatch: y_true {y_true.shape}, y_pred {y_pred.shape}")

    # pearsonr rejects inputs shorter than two samples; report "undefined"
    # as NaN so a file with a single valid pair still yields its other metrics.
    if y_true.size < 2:
        return np.nan

    r, _ = pearsonr(y_true, y_pred)
    return r ** 2

def frame_regression_metrics(
    y_true: Sequence[int],
    y_pred: Sequence[int],
) -> Dict[str, float]:
    """
    Regression metrics between ground-truth and predicted frame indices.

    Both inputs are first flattened to ``int64`` arrays via ``_to_np``.

    Returns:
        dict with
          - "MAE": mean absolute error
          - "R2_corr": squared Pearson correlation (``r2_from_corr``)
    """
    truth, guess = _to_np(y_true), _to_np(y_pred)
    return {
        "MAE": mean_absolute_error(truth, guess),
        "R2_corr": r2_from_corr(truth, guess),
    }


In [None]:

def first_non_nan(series):
    """Return the first non-missing value of ``series``, or NaN if there is none."""
    valid = series.dropna()
    if valid.empty:
        return np.nan
    return valid.iloc[0]

def analyze_single_file(path, aggregate_runs=True):
    """
    Analyze one JSONL result file of the EDV/ESV frame-localization task.

    Steps:
      1. Load the JSONL file.
      2. Convert gt_edv / gt_esv / pred_edv / pred_esv to numeric (invalid
         entries become NaN); columns missing from the file are added as NaN.
      3. Optionally collapse repeated runs to one row per ``video_index``.
      4. Compute regression metrics for every target that has at least one
         row with both GT and prediction:
           - only EDV has valid pairs -> EDV metrics only
           - only ESV has valid pairs -> ESV metrics only
           - both have valid pairs    -> both

    Args:
        path: path to the JSONL result file.
        aggregate_runs: when True and the file has both ``run_id`` and
            ``video_index`` columns, group by ``video_index`` (GT: first
            non-NaN value, predictions and latency: mean, run_id: number of
            distinct runs, everything else: first value).

    Returns:
        (df, metrics)
          - df: the numeric (and possibly aggregated) DataFrame, all rows kept.
          - metrics: ``{"frame_errors": {"EDV": {...}, "ESV": {...}}}`` with
            the available targets, or None when no GT/pred pair exists.
    """
    path = Path(path)
    df = pd.read_json(path, lines=True)

    for col in ["gt_edv", "gt_esv", "pred_edv", "pred_esv"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        else:
            # still keep non-existing columns for later processing
            df[col] = np.nan

    # Aggregate multiple runs for the same video_index. Both columns must be
    # present: grouping needs video_index, and without run_id there is nothing
    # to collapse.
    if aggregate_runs and "run_id" in df.columns and "video_index" in df.columns:
        agg_dict = {}
        for col in df.columns:
            if col in ["gt_edv", "gt_esv"]:
                agg_dict[col] = first_non_nan
            elif col in ["pred_edv", "pred_esv", "latency_sec"]:
                agg_dict[col] = "mean"
            elif col == "run_id":
                agg_dict[col] = "nunique"
            else:
                agg_dict[col] = "first"

        df = df.groupby("video_index", as_index=False).agg(agg_dict)

    # A row is usable for a target only when both GT and prediction are present.
    mask_edv_pair = df["gt_edv"].notna() & df["pred_edv"].notna()
    mask_esv_pair = df["gt_esv"].notna() & df["pred_esv"].notna()

    n_edv_pair = int(mask_edv_pair.sum())
    n_esv_pair = int(mask_esv_pair.sum())

    if n_edv_pair == 0 and n_esv_pair == 0:
        print(f"[WARN] {path.name}: no EDV or ESV pairs with both GT and Pred. Skip metrics.")
        return df, None

    metrics = {
        "frame_errors": {},
    }

    # calculate for EDV and ESV separately
    if n_edv_pair > 0:
        edv_true = df.loc[mask_edv_pair, "gt_edv"].values
        edv_pred = df.loc[mask_edv_pair, "pred_edv"].values

        metrics["frame_errors"]["EDV"] = frame_regression_metrics(edv_true, edv_pred)

    if n_esv_pair > 0:
        esv_true = df.loc[mask_esv_pair, "gt_esv"].values
        esv_pred = df.loc[mask_esv_pair, "pred_esv"].values

        metrics["frame_errors"]["ESV"] = frame_regression_metrics(esv_true, esv_pred)

    return df, metrics


In [None]:
def _as_binary_labels(values, name):
    """Round ``values`` to the nearest integer and check every label is 0 or 1."""
    arr = np.asarray(values, dtype=float).reshape(-1)
    if not np.all(np.isfinite(arr)):
        raise ValueError(f"{name} contains NaN or infinite values")

    # Round instead of truncating: a run-averaged vote such as 0.67 must count
    # as label 1. np.rint rounds exact ties (0.5) to the even neighbour, i.e. 0.
    labels = np.rint(arr).astype(int)
    if not set(np.unique(labels)).issubset({0, 1}):
        raise ValueError(f"{name} must only contain binary labels 0/1")
    return labels


def compute_binary_metrics(y_true, y_pred):
    """
    Binary classification metrics with label 1 as the positive class.

    Args:
        y_true, y_pred: array-likes of equal length with values in {0, 1}.
            Fractional values (e.g. the mean of several 0/1 predictions for
            the same sample) are rounded to the nearest label.

    Returns:
        dict with
          - n_samples
          - accuracy, precision, recall, f1 (NaN where the ratio is undefined)
          - confusion_matrix: {"tn", "fp", "fn", "tp"}

    Raises:
        ValueError: if an input contains non-finite values or labels other
            than 0/1 after rounding.
    """
    y_true = _as_binary_labels(y_true, "y_true")
    y_pred = _as_binary_labels(y_pred, "y_pred")

    tp = int(((y_true == 1) & (y_pred == 1)).sum())
    tn = int(((y_true == 0) & (y_pred == 0)).sum())
    fp = int(((y_true == 0) & (y_pred == 1)).sum())
    fn = int(((y_true == 1) & (y_pred == 0)).sum())

    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total > 0 else np.nan
    precision = tp / (tp + fp) if (tp + fp) > 0 else np.nan
    recall = tp / (tp + fn) if (tp + fn) > 0 else np.nan
    if np.isnan(precision) or np.isnan(recall) or (precision + recall) == 0:
        f1 = np.nan
    else:
        f1 = 2 * precision * recall / (precision + recall)

    return {
        "n_samples": int(total),
        "accuracy": float(accuracy),
        "precision": float(precision),
        "recall": float(recall),
        "f1": float(f1),
        "confusion_matrix": {
            "tn": tn,
            "fp": fp,
            "fn": fn,
            "tp": tp,
        },
    }

def analyze_single_file_binary(path, aggregate_runs=True):
    """
    Analyze one JSONL result file of a binary (0/1) classification task.

    The task is inferred from the prediction columns: a file whose only
    non-NaN predictions are in ``pred_edv`` is an EDV task, one whose only
    non-NaN predictions are in ``pred_esv`` is an ESV task.

    Args:
        path: path to the JSONL result file.
        aggregate_runs: when True and the file has both ``run_id`` and
            ``video_index`` columns, collapse repeated runs to one row per
            ``video_index`` (GT: first non-NaN value, prediction: mean over
            runs, run_id: number of distinct runs, other columns: first value).

    Returns:
        (df, metrics)
          - df: numeric (and possibly aggregated) DataFrame. After aggregation
            the prediction column holds the mean vote, not a 0/1 label.
          - metrics: ``{"task": "EDV" | "ESV", "binary_metrics": {...}}``, or
            None when the task cannot be determined or no GT/pred pair exists.
    """
    path = Path(path)
    df = pd.read_json(path, lines=True)

    # Convert to numeric, keep NaN
    for col in ["gt_edv", "gt_esv", "pred_edv", "pred_esv"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        else:
            df[col] = np.nan

    # Determine if this is an EDV or ESV task
    n_pred_edv_non_nan = int(df["pred_edv"].notna().sum())
    n_pred_esv_non_nan = int(df["pred_esv"].notna().sum())

    if n_pred_edv_non_nan > 0 and n_pred_esv_non_nan == 0:
        task = "EDV"
        gt_col = "gt_edv"
        pred_col = "pred_edv"
    elif n_pred_esv_non_nan > 0 and n_pred_edv_non_nan == 0:
        task = "ESV"
        gt_col = "gt_esv"
        pred_col = "pred_esv"
    else:
        # Neither or both prediction columns are populated: the task is
        # ambiguous, so skip the file instead of failing on unbound names.
        print(
            f"[WARN] {path.name}: cannot determine binary task "
            f"(non-NaN pred_edv={n_pred_edv_non_nan}, pred_esv={n_pred_esv_non_nan}). "
            "Skip metrics."
        )
        return df, None

    print(
        f"[INFO] {path.name}: detected binary task={task}, "
        f"using GT={gt_col}, PRED={pred_col}"
    )

    mask_pair = df[gt_col].notna() & df[pred_col].notna()
    n_pair = int(mask_pair.sum())
    print(f"[INFO] {path.name}: raw n_gt_pred_pair={n_pair}")

    if n_pair == 0:
        print(f"[WARN] {path.name}: no valid ({gt_col}, {pred_col}) pairs. Skip metrics.")
        return df, None

    # aggregate multiple run_id to video_index level
    if aggregate_runs and "run_id" in df.columns and "video_index" in df.columns:
        agg_dict = {}
        for col in df.columns:
            if col == gt_col:
                agg_dict[col] = first_non_nan
            elif col == pred_col:
                agg_dict[col] = "mean"
            elif col == "run_id":
                agg_dict[col] = "nunique"
            else:
                agg_dict[col] = "first"

        df = df.groupby("video_index", as_index=False).agg(agg_dict)

        # the row set changed, so the valid-pair mask must be rebuilt
        mask_pair = df[gt_col].notna() & df[pred_col].notna()
        n_pair = int(mask_pair.sum())
        print(f"[INFO] {path.name} after aggregation: n_gt_pred_pair={n_pair}")
        if n_pair == 0:
            print(
                f"[WARN] {path.name}: no valid ({gt_col}, {pred_col}) pairs "
                "after aggregation. Skip metrics."
            )
            return df, None

    y_true = df.loc[mask_pair, gt_col].to_numpy()
    # Turn the mean vote over runs back into a 0/1 label by rounding; a plain
    # integer cast would truncate e.g. 0.67 to 0. Exact ties (0.5) round to 0.
    y_pred = np.rint(df.loc[mask_pair, pred_col].to_numpy(dtype=float))

    metrics_binary = compute_binary_metrics(y_true, y_pred)

    print(
        f"[METRICS] {path.name} [{task}]: "
        f"n={metrics_binary['n_samples']}, "
        f"acc={metrics_binary['accuracy']:.4f}, "
        f"prec={metrics_binary['precision']:.4f}, "
        f"rec={metrics_binary['recall']:.4f}, "
        f"f1={metrics_binary['f1']:.4f}"
    )

    metrics = {
        "task": task,
        "binary_metrics": metrics_binary,
    }
    return df, metrics


In [None]:

def out_pattern_region(prompt_end="esv_edv"):
    """
    Build the case-insensitive regex that extracts the answer indices from a
    model output, depending on which indices the prompt asked for.

    prompt_end:
      - "edv_esv": LARGEST_AREA_INDEX first, then SMALLEST_AREA_INDEX (2 groups)
      - "esv_edv": SMALLEST_AREA_INDEX first, then LARGEST_AREA_INDEX (2 groups)
      - "edv":     LARGEST_AREA_INDEX only (1 group)
      - "esv":     SMALLEST_AREA_INDEX only (1 group)

    Returns the compiled pattern, or None for any other ``prompt_end``.
    """
    sources = {
        "edv_esv": r"LARGEST_AREA_INDEX\s*=\s*(-?\d+)\s*,\s*SMALLEST_AREA_INDEX\s*=\s*(-?\d+)",
        "esv_edv": r"SMALLEST_AREA_INDEX\s*=\s*(-?\d+)\s*,\s*LARGEST_AREA_INDEX\s*=\s*(-?\d+)",
        "edv": r"LARGEST_AREA_INDEX\s*=\s*(-?\d+)",
        "esv": r"SMALLEST_AREA_INDEX\s*=\s*(-?\d+)",
    }
    source = sources.get(prompt_end)
    if source is None:
        return None
    return re.compile(source, re.IGNORECASE)

In [None]:


def analyze_single_file_region(path, tol_frame=3, aggregate_runs=True):
    """
    Analyze one JSONL result file whose predictions must be parsed from the
    raw model output text.

    Steps:
      1. Read the JSONL file and convert gt_edv / gt_esv / pred_edv / pred_esv
         to numeric (missing columns are added as NaN).
      2. Derive the task suffix (edv_esv / esv_edv / edv / esv) and the seed
         from the file name.
      3. Parse every ``output`` string with the matching pattern and write the
         extracted indices into pred_edv / pred_esv. Rows without a match keep
         their existing values.
      4. Optionally collapse repeated runs to one row per ``video_index``.
      5. Compute MAE and R2_corr for every target with more than one valid
         GT/pred pair.

    Args:
        path: path to the JSONL result file.
        tol_frame: currently unused; kept for interface compatibility.
        aggregate_runs: when True and the file has both ``run_id`` and
            ``video_index`` columns, group by ``video_index``.

    Returns:
        (df, metrics)
          - df: numeric (and possibly aggregated) DataFrame.
          - metrics: ``{"frame_errors": {...}, "seed": seed}``, or None when
            there is no valid pair at all, or when EDV or ESV has exactly one
            valid pair.

    Raises:
        ValueError: if the task suffix parsed from the file name is not one
            of the supported values.
    """
    path = Path(path)
    filename = path.name
    df = pd.read_json(path, lines=True)

    for col in ["gt_edv", "gt_esv", "pred_edv", "pred_esv"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        else:
            # still keep non-existing columns for later processing
            df[col] = np.nan

    if "segmented" in filename:
        task = filename.replace("results_", "").replace("_video_random_seed", "").replace("_segmented_echo", "").replace(".jsonl", "")
    else:
        task = filename.replace("results_", "").replace("_video_random_seed", "").replace("_echo", "").replace(".jsonl", "")
    seed = task.split("_")[-1]

    # Everything after the first underscore, minus the seed token, is the
    # task suffix that selects the output pattern.
    task_end = task.split("_", 1)[-1].replace(f"_{seed}", "")
    print(f"[INFO] {path.name}: detected task_end={task_end}, seed={seed}")
    outpat = out_pattern_region(prompt_end=task_end)
    if outpat is None:
        raise ValueError(
            f"{path.name}: unsupported task suffix {task_end!r} parsed from file name"
        )

    # Regex group order -> prediction column, per task suffix.
    group_columns = {
        "edv_esv": ("pred_edv", "pred_esv"),
        "esv_edv": ("pred_esv", "pred_edv"),
        "edv": ("pred_edv",),
        "esv": ("pred_esv",),
    }[task_end]

    if "output" in df.columns:
        for row_label, text in df["output"].items():
            # Missing outputs load as NaN/None and cannot be searched.
            if not isinstance(text, str):
                continue
            match = outpat.search(text)
            if match is None:
                continue
            for group_no, column in enumerate(group_columns, start=1):
                df.at[row_label, column] = int(match.group(group_no))

    # aggregate multiple runs for the same video_index
    if aggregate_runs and "run_id" in df.columns and "video_index" in df.columns:
        agg_dict = {}
        for col in df.columns:
            if col in ["gt_edv", "gt_esv"]:
                agg_dict[col] = first_non_nan
            elif col in ["pred_edv", "pred_esv", "latency_sec"]:
                agg_dict[col] = "mean"
            elif col == "run_id":
                agg_dict[col] = "nunique"
            else:
                agg_dict[col] = "first"

        df = df.groupby("video_index", as_index=False).agg(agg_dict)

    mask_gt_edv = df["gt_edv"].notna()
    mask_gt_esv = df["gt_esv"].notna()
    mask_pred_edv = df["pred_edv"].notna()
    mask_pred_esv = df["pred_esv"].notna()

    n_gt_edv = int(mask_gt_edv.sum())
    n_gt_esv = int(mask_gt_esv.sum())

    n_pred_edv = int(mask_pred_edv.sum())
    n_pred_esv = int(mask_pred_esv.sum())

    mask_edv_pair = mask_gt_edv & mask_pred_edv
    mask_esv_pair = mask_gt_esv & mask_pred_esv

    n_edv_pair = int(mask_edv_pair.sum())
    n_esv_pair = int(mask_esv_pair.sum())

    print(f"[INFO] {path.name}: "
          f"n_gt_edv={n_gt_edv}, n_pred_edv={n_pred_edv}, n_edv_pair={n_edv_pair}; "
          f"n_gt_esv={n_gt_esv}, n_pred_esv={n_pred_esv}, n_esv_pair={n_esv_pair}")

    if n_edv_pair == 0 and n_esv_pair == 0:
        print(f"[WARN] {path.name}: no EDV or ESV pairs with both GT and Pred. Skip metrics.")
        return df, None

    # A single pair is skipped as well: the correlation-based metric needs at
    # least two samples.
    if n_edv_pair == 1:
        print(f"[WARN] {path.name}: only one EDV pairs with both GT and Pred. Skip metrics.")
        return df, None

    if n_esv_pair == 1:
        print(f"[WARN] {path.name}: only one ESV pairs with both GT and Pred. Skip metrics.")
        return df, None

    metrics = {
        "frame_errors": {},
        "seed": seed,
    }

    # calculate for EDV and ESV separately
    if n_edv_pair > 1:
        edv_true = df.loc[mask_edv_pair, "gt_edv"].values
        edv_pred = df.loc[mask_edv_pair, "pred_edv"].values

        metrics["frame_errors"]["EDV"] = frame_regression_metrics(edv_true, edv_pred)

    if n_esv_pair > 1:
        esv_true = df.loc[mask_esv_pair, "gt_esv"].values
        esv_pred = df.loc[mask_esv_pair, "pred_esv"].values

        metrics["frame_errors"]["ESV"] = frame_regression_metrics(esv_true, esv_pred)

    return df, metrics


In [None]:
def analyze_single_file_binary_region(path, aggregate_runs=True):
    """
    Analyze one JSONL result file of a binary (0/1) task whose predictions
    must be parsed from the raw model output text.

    Steps:
      1. Read the JSONL file and convert gt_edv / gt_esv / pred_edv / pred_esv
         to numeric (missing columns are added as NaN).
      2. Derive the task suffix (edv_esv / esv_edv / edv / esv) from the file
         name and parse every ``output`` string with the matching pattern,
         writing the extracted values into pred_edv / pred_esv.
      3. Infer the task (EDV or ESV) from whichever prediction column is the
         only one holding values.
      4. Optionally collapse repeated runs to one row per ``video_index``
         (prediction: mean over runs) and round the mean back to a 0/1 label.
      5. Compute binary classification metrics.

    Args:
        path: path to the JSONL result file.
        aggregate_runs: when True and the file has both ``run_id`` and
            ``video_index`` columns, group by ``video_index``.

    Returns:
        (df, metrics)
          - df: numeric (and possibly aggregated) DataFrame. After aggregation
            the prediction column holds the mean vote, not a 0/1 label.
          - metrics: ``{"task": "EDV" | "ESV", "binary_metrics": {...}}``, or
            None when the task cannot be determined or no GT/pred pair exists.

    Raises:
        ValueError: if the task suffix parsed from the file name is not one
            of the supported values.
    """
    path = Path(path)
    filename = path.name
    df = pd.read_json(path, lines=True)

    for col in ["gt_edv", "gt_esv", "pred_edv", "pred_esv"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        else:
            df[col] = np.nan

    if "segmented" in filename:
        stem = filename.replace("results_", "").replace("_frame_random_seed", "").replace("_segmented_echo", "").replace(".jsonl", "")
    else:
        stem = filename.replace("results_", "").replace("_frame_random_seed", "").replace("_echo", "").replace(".jsonl", "")
    seed = stem.split("_")[-1]

    # Everything after the first underscore, minus the seed token, is the
    # task suffix that selects the output pattern.
    task_end = stem.split("_", 1)[-1].replace(f"_{seed}", "")
    outpat = out_pattern_region(prompt_end=task_end)
    if outpat is None:
        raise ValueError(
            f"{path.name}: unsupported task suffix {task_end!r} parsed from file name"
        )

    # Regex group order -> prediction column, per task suffix.
    group_columns = {
        "edv_esv": ("pred_edv", "pred_esv"),
        "esv_edv": ("pred_esv", "pred_edv"),
        "edv": ("pred_edv",),
        "esv": ("pred_esv",),
    }[task_end]

    if "output" in df.columns:
        for row_label, text in df["output"].items():
            # Missing outputs load as NaN/None and cannot be searched.
            if not isinstance(text, str):
                continue
            match = outpat.search(text)
            if match is None:
                continue
            for group_no, column in enumerate(group_columns, start=1):
                df.at[row_label, column] = int(match.group(group_no))

    n_pred_edv_non_nan = int(df["pred_edv"].notna().sum())
    n_pred_esv_non_nan = int(df["pred_esv"].notna().sum())

    if n_pred_edv_non_nan > 0 and n_pred_esv_non_nan == 0:
        task = "EDV"
        gt_col = "gt_edv"
        pred_col = "pred_edv"
    elif n_pred_esv_non_nan > 0 and n_pred_edv_non_nan == 0:
        task = "ESV"
        gt_col = "gt_esv"
        pred_col = "pred_esv"
    else:
        # Neither or both prediction columns are populated: the task is
        # ambiguous, so skip the file instead of failing on unbound names.
        print(
            f"[WARN] {path.name}: cannot determine binary task "
            f"(non-NaN pred_edv={n_pred_edv_non_nan}, pred_esv={n_pred_esv_non_nan}). "
            "Skip metrics."
        )
        return df, None

    print(
        f"[INFO] {path.name}: detected binary task={task}, "
        f"using GT={gt_col}, PRED={pred_col}"
    )

    mask_pair = df[gt_col].notna() & df[pred_col].notna()
    n_pair = int(mask_pair.sum())
    print(f"[INFO] {path.name}: raw n_gt_pred_pair={n_pair}")

    if n_pair == 0:
        print(f"[WARN] {path.name}: no valid ({gt_col}, {pred_col}) pairs. Skip metrics.")
        return df, None

    # aggregate multiple runs for the same video_index
    if aggregate_runs and "run_id" in df.columns and "video_index" in df.columns:
        agg_dict = {}
        for col in df.columns:
            if col == gt_col:
                agg_dict[col] = first_non_nan
            elif col == pred_col:
                # multiple runs: keep the mean vote here, it is rounded to a
                # 0/1 label right before the metrics are computed
                agg_dict[col] = "mean"
            elif col == "run_id":
                agg_dict[col] = "nunique"
            else:
                agg_dict[col] = "first"

        df = df.groupby("video_index", as_index=False).agg(agg_dict)

        # construct valid pair mask again
        mask_pair = df[gt_col].notna() & df[pred_col].notna()
        n_pair = int(mask_pair.sum())
        print(f"[INFO] {path.name} after aggregation: n_gt_pred_pair={n_pair}")
        if n_pair == 0:
            print(
                f"[WARN] {path.name}: no valid ({gt_col}, {pred_col}) pairs "
                "after aggregation. Skip metrics."
            )
            return df, None

    # final arrays for metrics
    y_true = df.loc[mask_pair, gt_col].to_numpy()
    # Round the mean vote to the nearest label (majority vote); a plain integer
    # cast would truncate e.g. 0.67 to 0. Exact ties (0.5) round to 0.
    y_pred = np.rint(df.loc[mask_pair, pred_col].to_numpy(dtype=float))

    metrics_binary = compute_binary_metrics(y_true, y_pred)

    print(
        f"[METRICS] {path.name} [{task}]: "
        f"n={metrics_binary['n_samples']}, "
        f"acc={metrics_binary['accuracy']:.4f}, "
        f"prec={metrics_binary['precision']:.4f}, "
        f"rec={metrics_binary['recall']:.4f}, "
        f"f1={metrics_binary['f1']:.4f}"
    )

    metrics = {
        "task": task,
        "binary_metrics": metrics_binary,
    }
    return df, metrics



## Metrics calculation

In [None]:
# Example configuration: root folder of the result files and the per-model
# sub-directories (one directory per evaluated model) to process.
file_dir = "your/path/to/results/"
dir_list = [
    "LLaVA-NeXT-Video-34B",
    "LLaVA-NeXT-Video-7B",
    "Qwen3-VL-32B",
    "Qwen3-VL-8B",
    "gemma-3n",
    "llava-interleave",
]

In [None]:
# Suffix that each model family appends to its directory name inside the
# result file names ("results_<dirname><suffix>_..."). The first key found in
# the directory name wins.
model_file_suffix = {
    "Qwen": "-Instruct",
    "NeXT": "-hf",
    "gemma": "-e4b-it",
    "interleave": "-qwen-7b-hf",
}

# Metric columns exported to the two CSV files. Columns that no file produced
# are still written (as empty columns) so the CSV layout stays stable.
reg_metric_cols = [
    'metrics_frame_errors_EDV_MAE', 'metrics_frame_errors_EDV_R2_corr',
    'metrics_frame_errors_ESV_MAE', 'metrics_frame_errors_ESV_RMSE',
    'metrics_frame_errors_ESV_R2_score', 'metrics_frame_errors_ESV_R2_corr',
]
bin_metric_cols = [
    'metrics_binary_metrics_accuracy', 'metrics_binary_metrics_precision',
    'metrics_binary_metrics_recall', 'metrics_binary_metrics_f1',
]

for dirname in dir_list:
    model_dir = Path(file_dir) / dirname
    file_list = sorted(entry.name for entry in model_dir.iterdir())
    print(f"Processing directory: {dirname}")

    metric_list = []
    for filename in file_list:
        if "video" in filename:
            # video: EDV/ESV localization task
            analyze = analyze_single_file
        elif "frame" in filename:
            # frame: binary classification task
            analyze = analyze_single_file_binary
        else:
            # Unrelated file: skip it rather than re-recording the metrics of
            # the previously processed file under this file's name.
            print(f"[WARN] skip {filename}: neither a 'video' nor a 'frame' result file")
            continue

        result_file = str(model_dir / filename)
        print(f"Processing result file: {result_file}")
        df_num, metrics = analyze(result_file)

        metric_list.append({
            "file_name": filename,
            "result_file": result_file,
            "metrics": metrics,
        })

    if not metric_list:
        print(f"[WARN] {dirname}: no result files found, nothing to export")
        continue

    df = pd.json_normalize(metric_list, sep="_")

    # Strip the model-specific prefix, the "echo_" marker and the extension
    # from the file name; what remains identifies the task.
    suffix = next(
        (sfx for key, sfx in model_file_suffix.items() if key in dirname),
        "",
    )
    df['task_type'] = (
        df['file_name']
        .str.replace(f'results_{dirname}{suffix}_', '', regex=False)
        .str.replace('echo_', '', regex=False)
        .str.replace('.jsonl', '', regex=False)
    )
    df['model_name'] = dirname

    # choose and rename columns for output
    # reindex (instead of plain selection) tolerates metric columns that were
    # not produced by any file in this directory.
    df_reg = (
        df.reindex(columns=['task_type', *reg_metric_cols, 'model_name'])
        .dropna(how="all", subset=reg_metric_cols)
    )
    df_reg.columns = df_reg.columns.str.replace('metrics_frame_errors_', '', regex=False)
    df_reg.to_csv(f"your/path/to/{dirname}_regression_metrics.csv", index=False)

    df_bin = (
        df.reindex(columns=['task_type', *bin_metric_cols, 'model_name'])
        .dropna(how="all", subset=bin_metric_cols)
    )
    df_bin.columns = df_bin.columns.str.replace('metrics_binary_metrics_', '', regex=False)
    df_bin.to_csv(f"your/path/to/{dirname}_binary_metrics.csv", index=False)


## Monte Carlo baseline calculation

### Frame localization random baseline

In [None]:
import numpy as np

def random_predict_edv_esv(num_frames):
    """
    Draw one uniformly random EDV and ESV frame index per video.

    num_frames: iterable of clip lengths T; each index is drawn from [0, T)
                with NumPy's global random state (EDV first, then ESV).
    Returns:
      (pred_edv, pred_esv): two arrays with one entry per video.
    """
    draws = [
        (np.random.randint(0, T), np.random.randint(0, T))
        for T in num_frames
    ]
    pred_edv = np.array([edv for edv, _ in draws])
    pred_esv = np.array([esv for _, esv in draws])
    return pred_edv, pred_esv


In [5]:
# Load the frame-localization annotations.
annotation_file = "/home/dili10/scripts/vlm_private/segmented_frame_locate_annotation.csv"
annotations_df = pd.read_csv(annotation_file)

# Distinct video file names, in order of first appearance.
video_names = annotations_df["FileName"].dropna().unique().tolist()

# Only the first 100 videos are used for the baseline.
videos = video_names[:100]

In [8]:
# For every selected video collect the annotated EDV / ESV frame and the
# length of the clip from which a random prediction will be drawn.
video_lengths = []
gt_edv_list = []
gt_esv_list = []
for vn in videos:
    video_data = annotations_df[annotations_df['FileName'] == vn]

    # first annotated frame carrying each label
    edv_idx = video_data.loc[video_data['Label'] == 'EDV', 'Frame'].iloc[0]
    esv_idx = video_data.loc[video_data['Label'] == 'ESV', 'Frame'].iloc[0]
    gt_edv_list.append(edv_idx)
    gt_esv_list.append(esv_idx)

    # Clip length: 7/3 of the inclusive EDV-ESV frame span, floored. Stored as
    # a plain int because it is used as the exclusive upper bound of randint.
    pick_length = int((abs(edv_idx - esv_idx) + 1) * 7 // 3)
    video_lengths.append(pick_length)


In [10]:
# Monte Carlo simulation of the random baseline: every run draws one random
# EDV/ESV index per video and records the mean absolute error against GT.
n_runs = 10000
gt_edv_arr = np.array(gt_edv_list)
gt_esv_arr = np.array(gt_esv_list)

mae_edv_random = np.empty(n_runs)
mae_esv_random = np.empty(n_runs)

for run in range(n_runs):
    pred_edv_r, pred_esv_r = random_predict_edv_esv(video_lengths)
    mae_edv_random[run] = np.abs(pred_edv_r - gt_edv_arr).mean()
    mae_esv_random[run] = np.abs(pred_esv_r - gt_esv_arr).mean()

In [12]:
# Mean and central 95% interval (2.5th / 97.5th percentile) of the simulated
# random-baseline MAE, for EDV and ESV.
mean_random_edv = mae_edv_random.mean()
low_95_edv, high_95_edv = np.percentile(mae_edv_random, [2.5, 97.5])

mean_random_esv = mae_esv_random.mean()
low_95_esv, high_95_esv = np.percentile(mae_esv_random, [2.5, 97.5])

In [13]:
# Display the 95% interval bounds (EDV, then ESV) followed by the two means.
low_95_edv, high_95_edv, low_95_esv, high_95_esv, mean_random_edv, mean_random_esv

(np.float64(50.92),
 np.float64(55.64),
 np.float64(66.32975),
 np.float64(71.01),
 np.float64(53.271193999999994),
 np.float64(68.63563099999999))

### Binary task random baseline

In [None]:
import re
import numpy as np
import pandas as pd


# Results of the binary task; the code below reads the columns pred_esv,
# output, gt_esv, gt_edv, name and video_index from this table.
all_df = pd.read_csv("your/path/to/binary_results.csv")

def extract_pred_esv(row):
    """
    Return the binary ESV prediction of one result row.

    The stored ``pred_esv`` value wins when it is present; otherwise the
    value is parsed from the ``output`` text (``SMALLEST_AREA_INDEX = 0|1``).
    NaN is returned when neither source yields a prediction.
    """
    stored = row["pred_esv"]
    if pd.notna(stored):
        return int(stored)

    found = re.search(r"SMALLEST_AREA_INDEX\s*=\s*([01])", str(row["output"]))
    return int(found.group(1)) if found else np.nan

# Add the label columns: ground truth as float (so missing values stay NaN)
# and the prediction extracted row by row.
all_df["y_true"] = all_df["gt_esv"].astype(float)
all_df["y_pred"] = all_df.apply(extract_pred_esv, axis=1)

# Keep only rows where both labels are known, then store them as integers.
df_clean = all_df.dropna(subset=["y_true", "y_pred"]).copy()
df_clean = df_clean.astype({"y_true": int, "y_pred": int})


In [None]:

def monte_carlo_random_baseline(y_true, n_trials=10000, rng=None):
    """
    Estimate the accuracy of coin-flip guessing against the labels ``y_true``.

    In every trial each sample is independently predicted as 0 or 1 with
    probability 0.5 and the accuracy against ``y_true`` is recorded.

    Args:
        y_true: ground-truth labels.
        n_trials: number of simulated trials.
        rng: NumPy random Generator; a generator seeded with 2025 is created
             when omitted.
    Returns:
        mean_acc: mean accuracy over all trials
        ci_low, ci_high: 2.5th and 97.5th percentile of the trial accuracies
    """
    labels = np.asarray(y_true, dtype=int)
    n_labels = labels.shape[0]

    generator = np.random.default_rng(2025) if rng is None else rng

    accs = np.array(
        [
            (generator.integers(0, 2, size=n_labels) == labels).mean()
            for _ in range(n_trials)
        ],
        dtype=float,
    )

    ci_low, ci_high = np.percentile(accs, [2.5, 97.5])
    return accs.mean(), ci_low, ci_high


# One random-baseline estimate per "name" group. A single generator is shared
# across the groups, so the results depend on the group iteration order.
rng = np.random.default_rng(2025)
rows = []
expected = set(range(100))

for name, g in all_df.groupby("name"):
    # every group must cover exactly the video indices 0..99
    idx_set = set(g["video_index"].tolist())
    if idx_set != expected:
        raise ValueError(f"Missing video_index in group {name}: found {sorted(idx_set)}, expected {sorted(expected)}")

    # groups whose name mentions "edv" are scored against gt_edv, all others
    # against gt_esv
    gt_column = "gt_edv" if 'edv' in name.lower() else "gt_esv"
    y_true = g[gt_column].to_numpy()
    # drop NaN
    y_true = y_true[~pd.isna(y_true)].astype(int)

    n_samples = len(y_true)
    if n_samples == 0:
        continue

    mean_acc, ci_low, ci_high = monte_carlo_random_baseline(
        y_true, n_trials=10000, rng=rng
    )

    rows.append(
        dict(
            name=name.replace('.', ''),
            n_samples=n_samples,
            random_mean_accuracy=mean_acc,
            random_ci_low=ci_low,
            random_ci_high=ci_high,
        )
    )

mc_df = pd.DataFrame(rows)
mc_df.to_csv("your/path/to/monte_carlo_random_baseline_results.csv", index=False)

## Metrics plotting

### Binary task plotting

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.lines import Line2D
from matplotlib.patches import Patch


def plot_metric_all_models_tasks(
    df,
    metric="accuracy",
    task_order=None,
    model_order=None,
    palette=None,
    save_path=None,
    show_random=True,
    task_labels=None,
):
    """
    Box + strip plot of one binary metric, grouped by model (x axis) and
    task (hue), optionally with the random-baseline band.

    df: binary_metrics containing metrics from all models and tasks; must have
        the columns model_name, task_group, task_group_pretty and ``metric``
    metric: "accuracy" / "precision" / "recall" / "f1"
    task_order: list of task_group to specify order
    model_order: list of model_name to specify order
    palette: dict (pretty task label -> color) or palette name/list or None
    save_path: str or None
    show_random: bool, whether to show random baseline if available. It is
        drawn only for metric == "accuracy" and when df has the columns
        random_accuracy, random_ci_low and random_ci_high.
    task_labels: dict task_group -> pretty label. When None, the module-level
        ``task_pretty`` mapping is used, which must then be defined before
        the function is called.
    """

    if task_order is None:
        task_order = sorted(df["task_group"].unique())

    if model_order is None:
        model_order = sorted(df["model_name"].unique())

    if task_labels is None:
        # backward compatible default: notebook-level mapping
        task_labels = task_pretty

    pretty_order = [task_labels[t] for t in task_order]

    # palette_dict: pretty_label -> color
    if isinstance(palette, dict):
        palette_dict = palette
    else:
        colors = sns.color_palette(palette, n_colors=len(pretty_order)) if palette is not None \
                 else sns.color_palette(n_colors=len(pretty_order))
        palette_dict = dict(zip(pretty_order, colors))

    plt.figure(figsize=(2.5 * len(model_order), 8))

    # boxplot
    ax = sns.boxplot(
        data=df,
        x="model_name",
        y=metric,
        hue="task_group_pretty",
        order=model_order,
        hue_order=pretty_order,
        palette=palette_dict,
        width=0.7,
        showfliers=False,
        zorder=2,
    )

    # individual points on top of the boxes
    sns.stripplot(
        data=df,
        x="model_name",
        y=metric,
        hue="task_group_pretty",
        order=model_order,
        hue_order=pretty_order,
        dodge=True,
        alpha=0.4,
        linewidth=0,
        size=6,
        palette=palette_dict,
        ax=ax,
        zorder=3,
    )

    # color and style for random baseline
    band_color = "0.9"
    band_alpha = 0.5
    line_color = "0.3"
    line_alpha = 0.9
    line_width = 1.5
    line_style = "--"

    # set once at least one baseline band has been drawn; controls whether the
    # legend gets the two extra baseline entries
    baseline_drawn = False

    if show_random and {
        "random_accuracy", "random_ci_low", "random_ci_high"
    }.issubset(df.columns) and metric == "accuracy":

        # one baseline row per task group
        baseline = (
            df[[
                "task_group",
                "task_group_pretty",
                "random_accuracy",
                "random_ci_low",
                "random_ci_high",
            ]]
            .drop_duplicates(subset=["task_group"])
        )

        # expand along x axis
        x_min, x_max = -0.5, len(model_order) - 0.5

        for _, row in baseline.iterrows():
            y_mean = row["random_accuracy"]
            y_low = row["random_ci_low"]
            y_high = row["random_ci_high"]

            # CI interval: light band
            ax.fill_between(
                [x_min, x_max],
                [y_low, y_low],
                [y_high, y_high],
                color=band_color,
                alpha=band_alpha,
                zorder=0,
            )

            # Mean line: dashed line
            ax.hlines(
                y_mean,
                x_min,
                x_max,
                colors=line_color,
                linestyles=line_style,
                linewidth=line_width,
                alpha=line_alpha,
                zorder=1,
            )

            baseline_drawn = True

    # remove duplicate legend entries (boxplot and stripplot both add the
    # hue levels; keep only the first set)
    handles, labels = ax.get_legend_handles_labels()
    handles = handles[:len(pretty_order)]
    labels = labels[:len(pretty_order)]

    if baseline_drawn:
        band_proxy = Patch(
            facecolor=band_color,
            edgecolor="none",
            alpha=band_alpha,
        )
        line_proxy = Line2D(
            [0], [0],
            color=line_color,
            linestyle=line_style,
            linewidth=line_width,
        )

        handles = handles + [band_proxy, line_proxy]
        labels = labels + ["Random 95% CI", "Random mean"]

    ax.legend(
        handles,
        labels,
        title="Task",
        loc="upper left",
        bbox_to_anchor=(0.02, 0.98),
        borderaxespad=0.,
        fontsize=12,
        title_fontsize=15,
        markerscale=1.2,
        handlelength=1.5,
    )

    ax.set_xlabel("")
    ax.set_ylabel(metric.capitalize(), fontsize=15)

    plt.xticks(rotation=20, ha="center", fontsize=15)
    plt.yticks(fontsize=15)
    plt.tight_layout()

    if save_path is not None:
        plt.savefig(save_path, dpi=300, bbox_inches="tight")

    plt.show()


In [None]:
# example usage
binary_metrics = pd.read_csv("your/path/to/binary_metrics.csv")

# task groups in plotting order
task_order = [
    "edv_frame",
    "esv_frame",
    "segmented_edv_frame",
    "segmented_esv_frame",
    "non_medical_edv_frame",
    "non_medical_esv_frame",
]

# task group -> legend label; plot_metric_all_models_tasks reads this
# module-level mapping, so it must be defined before the call below
task_pretty = {
    "edv_frame": "EDV",
    "esv_frame": "ESV",
    "segmented_edv_frame": "Seg EDV",
    "segmented_esv_frame": "Seg ESV",
    "non_medical_edv_frame": "Non-med EDV",
    "non_medical_esv_frame": "Non-med ESV",
}

sns.set_theme(style="whitegrid", context="talk")
palette_colors = sns.color_palette("Set2", n_colors=len(task_order))

# one fixed color per legend label
palette = {
    task_pretty[t]: c
    for t, c in zip(task_order, palette_colors)
}

plot_metric_all_models_tasks(
    df=binary_metrics,
    metric="accuracy",
    task_order=task_order,
    model_order=[
        'LLaVA-Interleave', 'LLaVA-NeXT-Video-7B', 'LLaVA-NeXT-Video-34B',
        'Gemma-3n', 'Qwen3-VL-8B', 'Qwen3-VL-32B',
    ],
    palette=palette,
    save_path="t2_accuracy_with_random.png",
    show_random=True,
)


### Frame localization task plotting

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.lines import Line2D
from matplotlib.patches import Patch

def plot_reg_metric_all_models_tasks(
    df,
    metric="EDV_MAE",
    task_order=None,
    model_order=None,
    palette=None,
    save_path=None,
    random_band=None
):
    """
    Draw per-model box plots (with the individual points overlaid) of one
    regression metric, grouped by task, and optionally mark a random baseline.

    df: regression metrics containing metrics from all models and tasks.
        The columns "model_name", "task_group", "task_group_pretty" and
        `metric` are read.
    metric: name of the metric column to plot, e.g. "EDV_MAE" / "ESV_MAE".
        The y-axis label is the metric name with underscores shown as spaces.
    task_order: list of task_group to specify order
        (default: sorted unique values of df["task_group"])
    model_order: list of model_name to specify order
        (default: sorted unique values of df["model_name"])
    palette: dict or list or None, forwarded unchanged to seaborn
    save_path: str / path or None; when given, the figure is saved there
        at 300 dpi before being shown
    random_band: band info for random baseline, dict of metric -> (mean, low, high);
        ignored when None or when it has no entry for `metric`

    Returns None. Prints a "[WARN]" line and returns without plotting when
    there is nothing to draw (the metric is all-NaN, or none of the requested
    tasks has a non-NaN value).
    """
    if task_order is None:
        task_order = sorted(df["task_group"].unique())

    if model_order is None:
        model_order = sorted(df["model_name"].unique())

    sub = df.dropna(subset=[metric]).copy()
    if sub.empty:
        print(f"[WARN] metric {metric} has no non-NaN values, skip.")
        return

    # Computed once instead of once per task inside the comprehension.
    tasks_with_data = set(sub["task_group"].unique())

    # Display names in the requested order, restricted to tasks that have data.
    # dict.fromkeys drops repeated labels while keeping first-seen order, so
    # two task groups sharing a display name do not yield duplicate hue levels.
    pretty_order = list(dict.fromkeys(
        df.loc[df["task_group"] == t, "task_group_pretty"].iloc[0]
        for t in task_order
        if t in tasks_with_data
    ))
    if not pretty_order:
        print(f"[WARN] metric {metric} has no data for the requested tasks, skip.")
        return

    # max(..., 1) keeps the figure width positive for an empty model_order.
    fig, ax = plt.subplots(figsize=(2.5 * max(len(model_order), 1), 15))

    # boxplot; outliers are hidden because the strip plot shows every point
    sns.boxplot(
        data=sub,
        x="model_name",
        y=metric,
        hue="task_group_pretty",
        order=model_order,
        hue_order=pretty_order,
        palette=palette,
        width=0.7,
        zorder=2,
        showfliers=False,
        ax=ax,
    )

    sns.stripplot(
        data=sub,
        x="model_name",
        y=metric,
        hue="task_group_pretty",
        order=model_order,
        hue_order=pretty_order,
        dodge=True,
        alpha=0.4,
        linewidth=0,
        size=6,
        palette=palette,
        ax=ax,
        zorder=3,
    )

    # remove duplicate legend entries: both plots register one handle per
    # task, so only the first len(pretty_order) handles are kept
    handles, labels = ax.get_legend_handles_labels()
    handles = handles[:len(pretty_order)]
    labels = labels[:len(pretty_order)]

    if random_band is not None and metric in random_band:
        y_mean, y_low, y_high = (float(v) for v in random_band[metric])

        # color and style for random baseline
        band_color = "0.9"   # light gray
        band_alpha = 0.5
        line_color = "0.3"   # dark gray
        line_alpha = 0.9
        line_width = 1.5
        line_style = "--"

        # Categorical positions are 0..n-1; pad by half a slot on each side.
        x_min, x_max = -0.5, len(model_order) - 0.5

        ax.fill_between(
            [x_min, x_max],
            [y_low, y_low],
            [y_high, y_high],
            color=band_color,
            alpha=band_alpha,
            zorder=0,
        )

        ax.hlines(
            y_mean,
            x_min,
            x_max,
            colors=line_color,
            linestyles=line_style,
            linewidth=line_width,
            alpha=line_alpha,
            zorder=1,
        )

        # Proxy artists so the band and the mean line get legend entries.
        band_proxy = Patch(
            facecolor=band_color,
            edgecolor="none",
            alpha=band_alpha,
        )
        line_proxy = Line2D(
            [0], [0],
            color=line_color,
            linestyle=line_style,
            linewidth=line_width,
        )
        handles = handles + [band_proxy, line_proxy]
        labels = labels + ["Random 95% CI", "Random mean"]

    ax.legend(
        handles,
        labels,
        title="Task",
        loc="upper right",
        bbox_to_anchor=(0.98, 0.98),
        borderaxespad=0.,
        fontsize=12,
        title_fontsize=15,
    )

    ax.set_xlabel("")
    # "EDV_MAE" -> "EDV MAE", "ESV_MAE" -> "ESV MAE"; other metrics are
    # labelled the same way instead of keeping the default label.
    ax.set_ylabel(metric.replace("_", " "), fontsize=15)

    ax.tick_params(axis="x", labelsize=15)
    plt.xticks(rotation=20, ha="center", fontsize=15)

    fig.tight_layout()

    if save_path is not None:
        fig.savefig(save_path, dpi=300, bbox_inches="tight")

    plt.show()


In [None]:
# example usage
reg_all = pd.read_csv("your/path/to/regression_metrics.csv")

# Preferred plotting order; only the groups actually present in the CSV survive.
available_groups = set(reg_all["task_group"].unique())
task_order = [
    group
    for group in (
        "edv_esv_video",
        "esv_edv_video",
        "edv_video",
        "esv_video",
        "segmented_edv_esv_video",
        "segmented_esv_edv_video",
        "segmented_edv_video",
        "segmented_esv_video",
        "non_medical_edv_esv_video",
        "non_medical_esv_edv_video",
        "non_medical_edv_video",
        "non_medical_esv_video",
    )
    if group in available_groups
]

# set your display names
task_pretty = dict(
    edv_esv_video="EDV+ESV",
    esv_edv_video="ESV+EDV",
    edv_video="EDV-only",
    esv_video="ESV-only",
    segmented_edv_esv_video="Segm EDV+ESV",
    segmented_esv_edv_video="Segm ESV+EDV",
    segmented_edv_video="Segm EDV",
    segmented_esv_video="Segm ESV",
    non_medical_edv_esv_video="Non-med EDV+ESV",
    non_medical_esv_edv_video="Non-med ESV+EDV",
    non_medical_edv_video="Non-med EDV",
    non_medical_esv_video="Non-med ESV",
)

# Groups without a display name keep their raw task_group value.
reg_all["task_group_pretty"] = reg_all["task_group"].apply(
    lambda name: task_pretty.get(name, name)
)

sns.set_theme(context="talk", style="whitegrid")
palette = sns.color_palette("Set2", n_colors=len(task_order))


In [None]:

# Random-baseline MAE statistics per target: mean, then the two bounds that
# the variable names mark as the 95% interval.
mean_random_edv, low_95_edv, high_95_edv = 11.873749, 10.28, 13.53
mean_random_esv, low_95_esv, high_95_esv = 11.616163999999998, 10.02, 13.22

# metric column -> (mean, low, high), the layout the plotting function unpacks
random_band = dict(
    EDV_MAE=(mean_random_edv, low_95_edv, high_95_edv),
    ESV_MAE=(mean_random_esv, low_95_esv, high_95_esv),
)

# Output folder for the figures; created together with any missing parents.
out_dir = metrics_dir / "mae_boxplots_all_models"
out_dir.mkdir(parents=True, exist_ok=True)

reg_metrics = ["EDV_MAE", "ESV_MAE"]

# Left-to-right order of the models on the x axis.
boxplot_model_order = [
    "LLaVA-Interleave",
    "LLaVA-NeXT-Video-7B",
    "LLaVA-NeXT-Video-34B",
    "Gemma-3n",
    "Qwen3-VL-8B",
    "Qwen3-VL-32B",
]

# One figure per metric, each saved under out_dir.
for m in reg_metrics:
    figure_path = out_dir / f"seaborn_all_models_{m}.png"
    plot_reg_metric_all_models_tasks(
        df=reg_all,
        metric=m,
        task_order=task_order,
        model_order=boxplot_model_order,
        palette=palette,
        save_path=figure_path,
        random_band=random_band,
    )
