In [1]:
import glob
import os
import re
from datetime import datetime
from multiprocessing import Pool

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import yaml
from IPython.display import display


def get_color_map(df, column):
    """Assign a hex colour from matplotlib's "Set2" palette to each distinct value.

    Args:
        df: DataFrame holding the column to colour.
        column: Name of the column whose distinct values get a colour.

    Returns:
        dict mapping every distinct value of ``df[column]`` to a hex colour
        string. The palette is cycled when there are more distinct values
        than palette entries, so colours may repeat.
    """
    palette = plt.get_cmap("Set2").colors
    palette_size = len(palette)

    color_map = {}
    for position, value in enumerate(df[column].unique()):
        # Wrap around the palette so any number of distinct values is supported.
        rgb = palette[position % palette_size]
        color_map[value] = matplotlib.colors.rgb2hex(rgb)
    return color_map


def styling_fn(styler, color_map):
    """Register the report's cell styles on a pandas Styler.

    Args:
        styler: Styler over a frame that has ``status`` and ``git_commit``
            columns.
        color_map: dict mapping ``git_commit`` values to CSS colour strings.

    Returns:
        The same ``styler`` object, with the styles registered on it.
    """

    def status_highlight(val):
        # Anything that is neither Complete nor Incomplete is shown in red.
        if val == "Complete":
            return "color: green"
        if val == "Incomplete":
            return "color: orange"
        return "color: red"

    def git_commit_highlight(val):
        # Commits absent from the map fall back to white.
        return "color: {}".format(color_map.get(val, "#ffffff"))

    styler.map(status_highlight, subset=["status"])
    # Keep embedded newlines (multi-line cells) visible in the rendered table.
    styler.set_properties(**{"white-space": "pre-wrap"})
    styler.map(git_commit_highlight, subset=["git_commit"])
    return styler


# Show every row of a displayed frame instead of pandas' truncated preview.
pd.options.display.max_rows = None

In [2]:
def extract_datetime_from_filename(filename):
    """Parse the ISO-8601 timestamp embedded in a file name.

    Args:
        filename: File name or path that may contain a timestamp such as
            ``2025-06-22T00:10:41.690531``.

    Returns:
        A ``datetime`` built from the first matching timestamp, or ``None``
        when the name contains no timestamp or the matched digits do not form
        a valid date/time.
    """
    # Raw string: "\d" in a normal string literal is an invalid escape sequence
    # (a SyntaxWarning on recent Python versions).
    patterns = [
        r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})"  # e.g., 2025-06-22T00:10:41.690531
    ]
    for pattern in patterns:
        match = re.search(pattern, filename)
        if not match:
            continue
        try:
            return datetime.fromisoformat(match.group(1))
        except ValueError:
            # The digits have the right shape but are not a real date
            # (e.g. month 13). Treat it as "no timestamp" rather than raising.
            continue
    return None


def get_latest_file(file_list):
    """Pick the file whose embedded timestamp is the most recent.

    Args:
        file_list: Iterable of file names or paths.

    Returns:
        The entry with the latest timestamp as reported by
        ``extract_datetime_from_filename``, or ``None`` when no entry yields
        a timestamp. When several entries share the latest timestamp, the one
        that comes first in ``file_list`` is returned.
    """
    newest_stamp = None
    newest_file = None
    for candidate in file_list:
        stamp = extract_datetime_from_filename(candidate)
        if not stamp:
            # Entries without a recognisable timestamp are ignored.
            continue
        # Strict ">" keeps the earliest entry on ties.
        if newest_stamp is None or stamp > newest_stamp:
            newest_stamp, newest_file = stamp, candidate
    return newest_file

In [3]:
def run_level_integrity_report(model_name, model_path, run):
    """Check one evaluation run of one model for missing or unfinished outputs.

    The run is expected in ``<model_path>/run_<run>``. The latest timestamped
    YAML file in its ``configs`` folder supplies the task list. For every
    task/language pair that is not skipped, the matching inference file under
    ``inferences`` must exist and its first record must contain an
    ``individual_scores`` field to count as complete.

    Args:
        model_name: Model name; used as the prefix of inference file names.
        model_path: Folder containing the ``run_<n>`` sub-folders.
        run: Integer run index; compared against each task's ``max_n_runs``.

    Returns:
        Tuple ``(model_name, run, report)``. ``report`` is a dict with keys
        ``error``, ``missing_files``, ``incomplete_results`` (lists of str),
        ``complete_task_count``, ``task_count`` (int), ``is_base_model``,
        ``is_reasoning_model``, ``git_commit`` (copied from the config, or
        ``None``), ``status`` ("Complete", "Incomplete" or "Error") and
        ``remarks`` (str).
    """
    run_integrity_report = {
        "error": [],
        "missing_files": [],
        "incomplete_results": [],
        "complete_task_count": 0,
        "task_count": 0,
        "is_base_model": None,
        "is_reasoning_model": None,
        "git_commit": None,
        "status": None,
        # Present from the start so consumers can always index it.
        "remarks": "",
    }

    def _fail(error, remarks):
        """Record a fatal problem and build the function's return value."""
        run_integrity_report["error"].append(error)
        run_integrity_report["status"] = "Error"
        run_integrity_report["remarks"] = remarks
        return model_name, run, run_integrity_report

    run_folder = os.path.join(model_path, f"run_{run}")
    if not os.path.exists(run_folder):
        return _fail(
            "Missing run folder",
            "Run folder is missing, please check the run setup.",
        )

    if not glob.glob(os.path.join(run_folder, "results", "*.json")):
        return _fail(
            "Missing results json",
            "Missing results json: Is the model loading/running?",
        )

    config_files = glob.glob(os.path.join(run_folder, "configs", "*.yaml"))
    if not config_files:
        return _fail(
            "Missing config yaml",
            "Missing config yaml: Is the model loading/running?",
        )

    latest_config_file = get_latest_file(config_files)
    if not latest_config_file:
        # Without a config there is no task list to verify. Reporting an error
        # is safer than treating the run as having zero tasks, which would
        # mark it "Complete".
        return _fail(
            "Missing timestamped config yaml",
            "Config yaml file names carry no timestamp; cannot pick the latest config.",
        )

    try:
        # Context manager so the file handle is closed even if parsing fails.
        with open(latest_config_file, "r") as config_fh:
            config = yaml.safe_load(config_fh)
    except Exception as e:
        print(e)
        return _fail("Error reading config yaml", "Invalid config yaml")

    if not isinstance(config, dict):
        # An empty YAML file parses to None; any non-mapping is unusable here.
        return _fail("Error reading config yaml", "Invalid config yaml")

    # "or" also covers keys that are present but null in the YAML.
    run_args = config.get("run_args") or {}
    skip_tasks = run_args.get("skip_task") or []
    tasks = config.get("tasks") or {}
    run_env = config.get("run_env") or {}

    run_integrity_report["is_base_model"] = run_args.get("is_base_model", None)
    run_integrity_report["is_reasoning_model"] = run_args.get(
        "is_reasoning_model", None
    )
    run_integrity_report["git_commit"] = run_env.get("seahelm_git_hash", None)

    inferences_root = os.path.join(run_folder, "inferences")
    task_count = 0
    complete_task_count = 0

    for task, task_config in tasks.items():
        if task in skip_tasks:
            continue

        # Tasks capped at max_n_runs are not expected in later runs.
        n_runs = task_config.get("max_n_runs", None)
        if n_runs is not None and run >= n_runs:
            continue

        subfolder = task_config.get("aggregation_group", task)
        for lang in task_config["languages"]:
            task_count += 1
            inference_file = os.path.join(
                inferences_root,
                lang,
                subfolder,
                f"{model_name}_{task}_{lang}.jsonl",
            )
            rel_path = os.path.relpath(inference_file, start=inferences_root)

            if not os.path.exists(inference_file):
                run_integrity_report["missing_files"].append(rel_path)
                continue

            # only read the first line to speed up checks
            try:
                df = pd.read_json(inference_file, lines=True, nrows=1)
            except Exception as e:
                print(f"Error reading inference file: {rel_path}")
                print(e)
                # Unreadable files are reported together with missing ones.
                run_integrity_report["missing_files"].append(rel_path)
                continue

            # check for individual results as it is only written out once the evaluation is complete
            if "individual_scores" not in df.columns:
                run_integrity_report["incomplete_results"].append(rel_path)
                continue
            complete_task_count += 1

    run_integrity_report["task_count"] = task_count
    run_integrity_report["complete_task_count"] = complete_task_count

    # Fatal errors returned early above, so only missing/incomplete files
    # decide the final status.
    missing_count = len(run_integrity_report["missing_files"])
    incomplete_count = len(run_integrity_report["incomplete_results"])
    remark_parts = []
    if missing_count > 0:
        remark_parts.append(f"{missing_count} Missing files")
    if incomplete_count > 0:
        remark_parts.append(f"{incomplete_count} Incomplete results")

    run_integrity_report["status"] = "Incomplete" if remark_parts else "Complete"
    run_integrity_report["remarks"] = " | ".join(remark_parts)

    return model_name, run, run_integrity_report

In [None]:
# Root folder of the evaluation outputs; set this before running the cell.
main_folder = ""
# Number of run_<n> folders expected under every model folder.
expected_n_runs = 8
# Size of the worker pool used to scan runs in parallel.
n_workers = 32

# Without recursive=True each "**" behaves like "*", so this matches entries
# exactly two levels below main_folder. Plain files are dropped: they cannot
# contain run folders and would only show up as models with every run missing.
model_path_list = [
    path
    for path in glob.glob(os.path.join(main_folder, "**", "**"))
    if os.path.isdir(path)
]

# One work item per (model, run) pair.
starmap_args = [
    (os.path.basename(model_path), model_path, run)
    for model_path in model_path_list
    for run in range(expected_n_runs)
]

# The context manager shuts the workers down once starmap has returned, so
# re-running the cell does not accumulate idle worker processes.
with Pool(n_workers) as pool:
    results = pool.starmap(run_level_integrity_report, starmap_args)

# NOTE: reports are keyed by folder basename, so two model folders with the
# same name under different parents overwrite each other's entries.
reports = {}
for model_name, run, run_integrity_report in results:
    reports.setdefault(model_name, {})[f"run_{run}"] = run_integrity_report

In [5]:
def parse_report_into_df(reports):
    """Flatten the nested ``{model: {run: report}}`` mapping into a DataFrame.

    Args:
        reports: dict mapping model name to a dict of run label -> run report,
            where each run report is a dict with the keys read below.

    Returns:
        DataFrame with one row per (model, run). List-valued report fields are
        joined with newlines and the task counts are rendered as
        ``"<complete>/<total>"``.
    """

    def to_row(model_name, run, run_report):
        # Optional fields use .get so reports lacking them still produce a row.
        return {
            "model": model_name,
            "run": run,
            "git_commit": run_report.get("git_commit", None),
            "status": run_report["status"],
            "task_count": "{}/{}".format(
                run_report["complete_task_count"], run_report["task_count"]
            ),
            "remarks": run_report["remarks"],
            "is_base_model": run_report.get("is_base_model", None),
            "is_reasoning_model": run_report.get("is_reasoning_model", None),
            "errors": "\n".join(run_report["error"]),
            "missing_files": "\n".join(run_report["missing_files"]),
            "incomplete_results": "\n".join(run_report["incomplete_results"]),
        }

    rows = [
        to_row(model_name, run, run_report)
        for model_name, model_report in reports.items()
        for run, run_report in model_report.items()
    ]
    return pd.DataFrame(rows)

In [None]:
# Flatten the collected reports into one table, one row per (model, run).
df = parse_report_into_df(reports)
# One colour per distinct commit, so runs produced by different commits stand out.
color_map = get_color_map(df, "git_commit")
# Apply the styles and render the table as the cell output.
display(styling_fn(df.style, color_map=color_map))