In [None]:
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


# ==========================
#      CONFIG À ADAPTER
# ==========================

# Dossier qui contient les splits (train, eval, test, etc.)
# Exemple chez toi: C:\Users\...\tuh_eeg_reduit\tuh_eeg_reduit\data
DATA_ROOT = Path(r"edf/")

MIN_PREICTAL = 120.0  # seuil pour dire qu'on a ≥ 2 min avant la 1ère crise


# ==========================
#     FONCTIONS UTILITAIRES
# ==========================

Interval = Tuple[float, float]


def read_duration_from_csv_bi(csv_bi_path: Path) -> Optional[float]:
    """
    Lit la durée de l'enregistrement dans les lignes de commentaires du csv_bi.

    On cherche une ligne du type:
    '# duration = 301.0000 secs'
    """
    try:
        with csv_bi_path.open("r", encoding="utf-8") as f:
            for line in f:
                if line.startswith("# duration"):
                    # ex: "# duration = 301.0000 secs"
                    try:
                        parts = line.strip().split("=")
                        if len(parts) >= 2:
                            right = parts[1].strip()  # "301.0000 secs"
                            dur_str = right.split()[0]  # "301.0000"
                            return float(dur_str)
                    except Exception:
                        return None
                # si on a dépassé les commentaires, on arrête
                if not line.startswith("#"):
                    break
    except Exception:
        return None
    return None


def read_seizure_intervals(csv_bi_path: Path) -> List[Interval]:
    """
    Lit le fichier csv_bi (annotations globales) et renvoie la liste des intervalles 'seiz'.
    """
    df = pd.read_csv(csv_bi_path, comment="#")
    if "label" not in df.columns or "start_time" not in df.columns or "stop_time" not in df.columns:
        return []
    mask = df["label"].astype(str).str.contains("seiz", case=False)
    seizures = df[mask]
    return [(float(row.start_time), float(row.stop_time)) for row in seizures.itertuples(index=False)]


def get_patient_session_from_path(edf_path: Path) -> Tuple[str, str]:
    """
    Extrait patient et session à partir du chemin TUSZ.
    Exemple:
      .../train/aaaaaaac/s001_2002/02_tcp_le/aaaaaaac_s001_t000.edf
      -> patient = aaaaaaac, session = s001_2002
    """
    parts = edf_path.parts
    if len(parts) < 4:
        return "unknown", "unknown"
    patient = parts[-4]
    session = parts[-3]
    return patient, session


# ==========================
#    PARCOURS ET STATISTIQUES
# ==========================

def scan_dataset(data_root: Path) -> pd.DataFrame:
    """
    Parcourt tous les splits dans data_root et retourne un DataFrame
    avec une ligne par enregistrement EDF.
    """
    rows: List[Dict[str, Any]] = []

    # Chaque sous-dossier direct de data_root est un split (train, eval, test, ...)
    for split_dir in sorted(p for p in data_root.iterdir() if p.is_dir()):
        split_name = split_dir.name
        print(f"Scan du split: {split_name}")

        edf_files = sorted(split_dir.rglob("*.edf"))
        print(f"  {len(edf_files)} fichiers EDF trouvés.")

        for edf_path in edf_files:
            patient, session = get_patient_session_from_path(edf_path)
            recording = edf_path.stem

            csv_bi_path = edf_path.with_suffix(".csv_bi")
            if not csv_bi_path.exists():
                # fallback éventuel .csv
                alt = edf_path.with_suffix(".csv")
                if alt.exists():
                    csv_bi_path = alt
                else:
                    print(f"  [WARN] csv_bi introuvable pour {edf_path}")
                    continue

            # Lire durée
            duration = read_duration_from_csv_bi(csv_bi_path)

            # Lire crises
            seizures = read_seizure_intervals(csv_bi_path)
            n_seiz = len(seizures)

            if n_seiz > 0:
                onsets = [s for (s, e) in seizures]
                first_onset = float(min(onsets))
                total_seiz_dur = float(sum(e - s for (s, e) in seizures))
            else:
                first_onset = np.nan
                total_seiz_dur = 0.0

            has_seiz = n_seiz > 0
            has_preictal_ge_2min = has_seiz and (first_onset >= MIN_PREICTAL)

            rows.append(
                dict(
                    split=split_name,
                    patient=patient,
                    session=session,
                    recording=recording,
                    duration_s=duration,
                    n_seizures=n_seiz,
                    has_seizure=has_seiz,
                    first_seiz_onset_s=first_onset,
                    total_seiz_dur_s=total_seiz_dur,
                    has_preictal_ge_120s=has_preictal_ge_2min,
                )
            )

    df = pd.DataFrame(rows)
    return df


def print_global_stats(df: pd.DataFrame):
    print("\n===== STATISTIQUES GLOBALES =====")

    print("\nNombre total d'enregistrements par split :")
    print(df.groupby("split")["recording"].count())

    print("\nEnregistrements avec au moins une crise par split :")
    print(df.groupby("split")["has_seizure"].sum())

    print("\nEnregistrements avec 1ère crise après 120 s (pré-ictal ≥ 2 min) par split :")
    print(df.groupby("split")["has_preictal_ge_120s"].sum())

    print("\nNombre total de crises par split :")
    print(df.groupby("split")["n_seizures"].sum())

    print("\nDurée totale de crises (s) par split :")
    print(df.groupby("split")["total_seiz_dur_s"].sum())

    # Petite stat globale sur les onsets
    has_seiz_df = df[df["has_seizure"]]
    if not has_seiz_df.empty:
        print("\nTemps de première crise (tous splits confondus) :")
        print("  min  :", has_seiz_df["first_seiz_onset_s"].min())
        print("  médian :", has_seiz_df["first_seiz_onset_s"].median())
        print("  max  :", has_seiz_df["first_seiz_onset_s"].max())


def make_plots(df: pd.DataFrame):
    """
    Produit quelques visualisations simples avec matplotlib.
    """

    # Histogramme des temps de première crise
    has_seiz_df = df[df["has_seizure"] & df["first_seiz_onset_s"].notna()]
    if not has_seiz_df.empty:
        plt.figure()
        has_seiz_df["first_seiz_onset_s"].hist(bins=50)
        plt.axvline(MIN_PREICTAL, linestyle="--", label="120 s")
        plt.title("Distribution des temps de première crise")
        plt.xlabel("Temps de première crise (s)")
        plt.ylabel("Nombre d'enregistrements")
        plt.legend()
        plt.tight_layout()

    # Histogramme des durées d'enregistrement (si les durées sont dispo)
    if df["duration_s"].notna().any():
        plt.figure()
        df["duration_s"].dropna().hist(bins=50)
        plt.title("Distribution des durées d'enregistrements")
        plt.xlabel("Durée (s)")
        plt.ylabel("Nombre d'enregistrements")
        plt.tight_layout()

    # Barplot : nombre d'enregistrements avec crise par split
    plt.figure()
    df.groupby("split")["has_seizure"].sum().plot(kind="bar")
    plt.title("Nombre d'enregistrements avec crises par split")
    plt.ylabel("Nombre d'enregistrements")
    plt.tight_layout()

    # Barplot : nombre d'enregistrements avec pré-ictal ≥ 2 min par split
    plt.figure()
    df.groupby("split")["has_preictal_ge_120s"].sum().plot(kind="bar")
    plt.title("Enregistrements avec pré-ictal ≥ 120 s par split")
    plt.ylabel("Nombre d'enregistrements")
    plt.tight_layout()

    plt.show()


def main():
    if not DATA_ROOT.exists():
        raise FileNotFoundError(f"DATA_ROOT introuvable: {DATA_ROOT}")

    print(f"Racine des données: {DATA_ROOT}")

    df = scan_dataset(DATA_ROOT)

    # Sauvegarder aussi les stats brutes si tu veux
    out_csv = DATA_ROOT / "tusz_global_stats.csv"
    df.to_csv(out_csv, index=False)
    print(f"\nDataFrame global sauvegardé dans: {out_csv}")
    print(f"{len(df)} enregistrements au total.")

    print_global_stats(df)
    make_plots(df)


if __name__ == "__main__":
    main()


: 