In [7]:
import pandas as pd
from pathlib import Path

# Directory the metric CSV files are read from (relative to the working directory).
experiment_dir = Path("experiments/Metrics")
# Last expression of the cell: shows whether that directory exists.
experiment_dir.exists()

True

In [8]:
import re
from typing import Literal



# Metric identifiers; each one is the "<metric>" part of a CSV file name.
metric_names = [
    "LPIPS",
    "PSNR",
    "SSIM",
    "Ref_LPIPS",
    "Ref_PSNR",
    "Ref_SSIM",
    "FID_0",
    "FID_4",
    "FID_8",
]

# Experiment identifiers; each one is the "<experiment>" prefix of a CSV file name.
experiment_names = [
    "Finetune_Diffusion",
    "Neurad_Base",
    "Neurad_Checkpoint",
    "Neurad_Training",
    "Neurad_Finetuned_Diffusion",
]

# Rank values used in the "-Rank<rank>-" part of the Finetune_Diffusion file names.
ranks = ["4", "128"]

# Grouping of the experiment names above into the two experiment families.
neurad_experiments = {
    "Neurad_Base",
    "Neurad_Checkpoint",
    "Neurad_Training",
    "Neurad_Finetuned_Diffusion",
}
finetune_experiments = {"Finetune_Diffusion"}


def get_all_metrics_infos(experiment_name):
    """List the metric CSV files that exist on disk for one experiment.

    For "Finetune_Diffusion" the files are looked up per rank as
    "<experiment>-Rank<rank>-<metric>.csv" and each entry carries the keys
    "path", "rank" and "metric". For every other experiment the files are
    looked up as "<experiment>-<metric>.csv" and each entry carries "path"
    and "metric" only.

    Returns:
        A list of dicts, one per existing file, ordered by rank (if any) and
        then by the order of ``metric_names``.
    """
    if experiment_name != "Finetune_Diffusion":
        candidates = (
            {
                "path": experiment_dir / f"{experiment_name}-{metric}.csv",
                "metric": metric,
            }
            for metric in metric_names
        )
    else:
        candidates = (
            {
                "path": experiment_dir / f"{experiment_name}-Rank{rank}-{metric}.csv",
                "rank": rank,
                "metric": metric,
            }
            for rank in ranks
            for metric in metric_names
        )

    # Only files that are actually present are reported.
    return [info for info in candidates if info["path"].exists()]

    

def get_metric_by_name(experiment_name, metric, rank=None):
    """Load the CSV table of one metric for one experiment.

    Args:
        experiment_name: Experiment prefix used in the CSV file name.
        metric: Metric name used in the CSV file name.
        rank: Only consulted for "Finetune_Diffusion". When given, exactly the
            file "<experiment>-Rank<rank>-<metric>.csv" is loaded. When left
            as ``None`` (the previous behaviour), the entries of ``ranks`` are
            tried in order and the first existing file wins, so a later rank
            is never returned while an earlier one exists.

    Returns:
        The table as a ``pandas.DataFrame``, or ``None`` when no matching
        file exists.
    """
    if experiment_name == "Finetune_Diffusion":
        candidate_ranks = ranks if rank is None else [str(rank)]
        for candidate in candidate_ranks:
            path = experiment_dir / f"{experiment_name}-Rank{candidate}-{metric}.csv"
            if path.exists():
                return pd.read_csv(path)

        if rank is not None:
            # An explicitly requested rank must not silently fall back to a
            # different (unranked) file.
            return None

    path = experiment_dir / f"{experiment_name}-{metric}.csv"
    if not path.exists():
        return None
    return pd.read_csv(path)


# Matches names that start with "<digits>_<digits>_<digits>" and captures the
# third digit group as "run"; anything after it is ignored.
parse_run_pattern1 = re.compile(r"\d+_\d+_(?P<run>\d+).*")


def filter_column_name(col_name, pattern, value_name = "run"):
    """Return the named group ``value_name`` captured from ``col_name``.

    The pattern is anchored at the start of the name (``pattern.match``).
    Names that do not match are returned unchanged.
    """
    found = pattern.match(col_name)
    return found.groupdict()[value_name] if found else col_name


def parse_run_from_column_name(col_name, pattern = parse_run_pattern1):
    """Map a column name to "run_<id>" using the pattern's "run" group.

    Names that do not match the pattern (for example "Step") are returned
    unchanged.
    """
    found = pattern.match(col_name)
    if found:
        return "run_" + found.group("run")
    return col_name

def clean_metrics(experiment_name, metrics):
    """Return a copy of ``metrics`` with aggregate columns dropped and run columns renamed.

    Columns whose name ends in "__MIN" or "__MAX" are removed. Every remaining
    column name is passed through ``parse_run_from_column_name``, so names
    matching ``parse_run_pattern1`` become "run_<id>" and all others are kept.
    ``experiment_name`` is accepted but not used.
    """
    aggregate_suffixes = ("__MIN", "__MAX")
    kept = metrics.drop(
        [name for name in metrics.columns if name.endswith(aggregate_suffixes)],
        axis=1,
    )
    kept.columns = [
        parse_run_from_column_name(name, parse_run_pattern1)
        for name in kept.columns
    ]
    return kept

# Example: load one metric table, clean it, and display it as the cell result.
ex_name = "Neurad_Finetuned_Diffusion"
ex_metric = "LPIPS"

# get_metric_by_name returns None when the CSV file is missing; clean_metrics
# would then fail when it reads `metrics.columns`.
metrics = get_metric_by_name(ex_name, ex_metric)
metrics = clean_metrics(ex_name, metrics)
metrics

Unnamed: 0,Step,run_12146255,run_12146254,run_12146251,run_12146253,run_12146252,run_12146250,run_12146249,run_12146248,run_12146247,run_12146246,run_12146245,run_12146244
0,5000,0.164532,0.229889,0.221506,0.184695,0.156776,0.189177,0.1665,0.222872,0.198615,0.195498,0.229031,0.200974
1,10000,0.156522,0.210406,0.212082,0.181136,0.159866,0.177039,0.364607,0.211682,0.180468,0.353962,0.212767,0.179689
2,15000,0.14407,0.201924,0.200672,0.172626,0.143719,0.168676,0.554299,0.199954,0.167611,,0.200343,0.170008
3,20000,0.165186,0.195418,0.197309,0.166059,0.16612,0.164382,0.572561,0.197184,0.1659,,0.195899,0.166445
4,25000,0.370172,0.193881,0.192417,0.161945,0.191269,0.162209,0.637893,0.19015,0.162846,,0.190852,0.161966
5,30000,0.377656,0.191825,0.483814,0.163782,,0.162221,0.594238,0.192888,0.159024,,0.190852,0.159033
6,35000,0.368122,0.18962,,0.164174,,0.157841,,0.192694,0.162861,,0.188724,0.157851
7,40000,,0.188671,,0.163171,,0.15768,,0.186542,0.404629,,0.188499,0.158182


In [9]:
# Matches the "run_<digits>" column names.
run_column_pattern = re.compile(r"run_(?P<run>\d+)")


def select_columns_matching_pattern(df, pattern):
    """Return the part of ``df`` whose labels match ``pattern``.

    Delegates to ``df.filter(regex=pattern)``.
    """
    matching = df.filter(regex=pattern)
    return matching

In [10]:
# For each metric, whether the best value is the largest ("max") or the
# smallest ("min") one; find_best_step uses this to pick idxmax or idxmin.
improve_direction = {
    "SSIM": "max",
    "PSNR": "max",
    "LPIPS": "min",
    "Ref_SSIM": "max",
    "Ref_PSNR": "max",
    "Ref_LPIPS": "min",
    "FID_0": "min",
    "FID_4": "min",
    "FID_8": "min"
}

def find_best_step(metrics, metric_name):
    """Find, for every run column, the row where the metric is best.

    Args:
        metrics: Table with "run_<id>" columns (other columns are ignored).
        metric_name: Key into ``improve_direction``; "max" selects the largest
            value per run, anything else the smallest.

    Returns:
        A Series indexed by run column name whose values are row index labels
        of ``metrics`` (not values of a "Step" column).

    Raises:
        KeyError: If ``metric_name`` is not a key of ``improve_direction``.
    """
    run_values = select_columns_matching_pattern(metrics, run_column_pattern)
    wants_maximum = improve_direction[metric_name] == "max"
    return run_values.idxmax() if wants_maximum else run_values.idxmin()

# Per-run row index label (not the "Step" value) of the best value in the
# example table; displayed as the cell result.
best_steps = find_best_step(metrics, ex_metric)
best_steps

run_12146255    2
run_12146254    7
run_12146251    4
run_12146253    4
run_12146252    2
run_12146250    7
run_12146249    0
run_12146248    7
run_12146247    5
run_12146246    0
run_12146245    7
run_12146244    6
dtype: int64

In [11]:
def get_best_values(metrics, best_steps):
    """Read each run's metric value at that run's best row.

    Runs are paired with their best row by column *label*, not by position,
    so the result stays correct when ``metrics`` lists its run columns in a
    different order, or contains a different set of runs, than the table
    ``best_steps`` was computed from.

    Args:
        metrics: Table with one column per run (e.g. "run_123").
        best_steps: Series indexed by run column name, holding the row index
            label of the best row for that run (as produced by
            ``find_best_step``).

    Returns:
        Dict mapping run column name to the value as a ``float``, in the
        column order of ``metrics``. Columns without an entry in
        ``best_steps`` (such as "Step") are skipped. A run whose best row is
        missing (NaN) maps to NaN.

    Raises:
        KeyError: If a best row label does not exist in ``metrics``.
        TypeError, ValueError: If a selected value cannot be converted to float.
    """
    # NOTE(review): rows are matched by index label, which assumes every
    # metric table shares the row layout of the reference table -- confirm.
    best_values = {}

    for run in metrics.columns:
        if run not in best_steps.index:
            continue

        best_step = best_steps.loc[run]
        if pd.isna(best_step):
            # No best row could be determined for this run (e.g. all-NaN column).
            best_values[str(run)] = float("nan")
            continue

        best_value = metrics.loc[best_step, run]
        try:
            best_values[str(run)] = float(best_value)
        except (TypeError, ValueError) as err:
            raise type(err)(
                f"Cannot convert best value {best_value!r} of {run} "
                f"at row {best_step!r} to float"
            ) from err

    return best_values


# Value of each run in the example table at that run's best row.
get_best_values(metrics, best_steps)


{'run_12146255': 0.1440704315900802,
 'run_12146254': 0.1886714845895767,
 'run_12146251': 0.192417487502098,
 'run_12146253': 0.1619447767734527,
 'run_12146252': 0.1437189579010009,
 'run_12146250': 0.157680481672287,
 'run_12146249': 0.1665003001689911,
 'run_12146248': 0.1865417063236236,
 'run_12146247': 0.1590242981910705,
 'run_12146246': 0.1954975128173828,
 'run_12146245': 0.1884991377592086,
 'run_12146244': 0.1578506976366043}

In [None]:
def get_best_metrics(experiment_name, reference_metric: str):
    """Collect every metric of an experiment at each run's best reference row.

    The best row of each run is determined from ``reference_metric`` and then
    used to read the value of every available metric for that run.

    For experiments with per-rank files, the metric files are grouped by rank
    and the best rows are computed from the reference file of the *same*
    rank. Taking them from a single rank and applying them to all ranks
    would pair unrelated runs.

    Args:
        experiment_name: Experiment whose metric CSV files are read.
        reference_metric: Metric used to pick the best row per run.

    Returns:
        DataFrame with one row per run and one column per metric.

    Raises:
        FileNotFoundError: If no metric file exists for the experiment, or a
            rank group has no file for ``reference_metric``.
    """
    # Group the available files by rank; experiments without ranks end up in
    # a single group keyed by None.
    infos_by_rank = {}
    for metrics_info in get_all_metrics_infos(experiment_name):
        infos_by_rank.setdefault(metrics_info.get("rank"), []).append(metrics_info)

    if not infos_by_rank:
        raise FileNotFoundError(
            f"No metric CSV files found for experiment {experiment_name!r}"
        )

    merged_metrics = {}

    for rank, infos in infos_by_rank.items():
        reference_info = next(
            (info for info in infos if info["metric"] == reference_metric), None
        )
        if reference_info is None:
            rank_note = "" if rank is None else f" (rank {rank})"
            raise FileNotFoundError(
                f"No {reference_metric!r} CSV file found for experiment "
                f"{experiment_name!r}{rank_note}"
            )

        ref_metrics = clean_metrics(experiment_name, pd.read_csv(reference_info["path"]))
        best_steps = find_best_step(ref_metrics, reference_metric)

        for metrics_info in infos:
            metrics = clean_metrics(experiment_name, pd.read_csv(metrics_info["path"]))
            values = get_best_values(metrics, best_steps)

            metric_name = metrics_info["metric"]
            for run, value in values.items():
                merged_metrics.setdefault(run, {})[metric_name] = value

    return pd.DataFrame.from_dict(merged_metrics, orient="index")

# Build the best-metric table for "Finetune_Diffusion" with LPIPS as the reference metric.
get_best_metrics("Finetune_Diffusion", "LPIPS")
        
