In [1]:
import os
import numpy as np
import pandas as pd
import joblib
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score


# ================================
# CONFIG
# ================================
SPLIT_DIR   = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\split_Data_5fold_subjects"
MODEL_DIR   = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\saved_models_LR_EEG_Binary"
RESULTS_CSV = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\LR_EEG_Binary_results.csv"
PLOTS_DIR   = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\LR_EEG_Binary_plots"

os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)

fold_ids = [f"fold{idx:02d}" for idx in range(1, 24)]
results = []

WIN_SEC = 2
WIN_PER_HOUR = 3600 / WIN_SEC
TAU = 60   # Firing Power window length


# ================================
# EEG FEATURE SELECTOR
# ================================
def extract_eeg_features(columns):
    """Select only EEG features: ch1_* and ch2_*"""
    return [c for c in columns if c.startswith("ch1_") or c.startswith("ch2_")]


# ================================
# FP + METRICS FUNCTIONS
# ================================
def compute_firing_power(prob, thr, tau=TAU):
    """
    prob: predicted probability for class=1
    thr:  threshold to binarize prob → O(n)
    tau:  window size for firing power
    """
    O = (prob >= thr).astype(float)
    kernel = np.ones(tau) / tau
    fp = np.convolve(O, kernel, mode="full")[:len(O)]
    return fp, O


def compute_ss_fpr(fp, y_true, thr):
    """
    Compute Sensitivity and FPR/h for binary labels (0/1)
    y_true: 0 = interictal, 1 = preictal
    """
    alarms = (fp >= thr).astype(int)

    TP = np.sum((y_true == 1) & (alarms == 1))
    FN = np.sum((y_true == 1) & (alarms == 0))
    FP = np.sum((y_true == 0) & (alarms == 1))

    ss = TP / (TP + FN + 1e-9)
    total_hours = len(y_true) / WIN_PER_HOUR
    fpr_h = FP / (total_hours + 1e-9)

    return ss, fpr_h, FP


def tune_threshold(prob_val, y_val, tau=TAU):
    """
    Grid-search over threshold T using validation data.
    Criterion: Maximize sensitivity, then minimize FPR/h.
    """
    thr_grid = np.arange(0.1, 0.91, 0.05)
    best_thr, best_ss, best_fprh = 0.5, -1, np.inf

    for thr in thr_grid:
        fp_val, _ = compute_firing_power(prob_val, thr, tau)
        ss, fprh, _ = compute_ss_fpr(fp_val, y_val, thr)

        if ss > best_ss or (np.isclose(ss, best_ss) and fprh < best_fprh):
            best_thr = thr
            best_ss = ss
            best_fprh = fprh

    return best_thr, best_ss, best_fprh


# ================================
# Plot Functions
# ================================
def plot_prediction_view(time_sec, fp, y_true, thr, fold_id, save_dir):
    plt.figure(figsize=(12, 4))
    plt.plot(time_sec, fp, label="Firing Power", linewidth=1.5)
    plt.axhline(thr, color="red", linestyle="--", label=f"Threshold = {thr:.2f}")

    seizure = (y_true == 1)
    plt.fill_between(
        time_sec, 0, 1,
        where=seizure,
        color="orange",
        alpha=0.2,
        transform=plt.gca().get_xaxis_transform(),
        label="Preictal"
    )

    plt.xlabel("Time (sec)")
    plt.ylabel("Firing Power")
    plt.title(f"Prediction Mode — {fold_id}")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, f"{fold_id}_prediction.png"), dpi=200)
    plt.close()


def plot_risk_view(time_sec, fp, fold_id, save_dir, low_thr=0.3, high_thr=0.7):
    risk = np.zeros_like(fp)
    risk[(fp >= low_thr) & (fp < high_thr)] = 1
    risk[fp >= high_thr] = 2

    colors = np.array(["green", "yellow", "red"])
    c = colors[risk.astype(int)]   

    plt.figure(figsize=(12, 2))
    plt.scatter(time_sec, np.zeros_like(time_sec), c=c, s=12, marker="s")
    plt.yticks([])
    plt.xlabel("Time (sec)")
    plt.title(f"Risk Levels — {fold_id}")
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, f"{fold_id}_risk.png"), dpi=200)
    plt.close()



# ================================
# CLASS WEIGHTS (VALIDATION SEARCH)
# ================================
WEIGHTS = [
    {"name": "w1", "weights": {0: 1, 1: 5}},
    {"name": "w2", "weights": {0: 1, 1: 10}},
    {"name": "w3", "weights": {0: 1, 1: 15}},
    {"name": "balanced", "weights": "balanced"},
]


# ================================
# MAIN LOOP OVER FOLDS
# ================================
for fold in fold_ids:

    train_path = os.path.join(SPLIT_DIR, f"{fold}_train.csv")
    val_path   = os.path.join(SPLIT_DIR, f"{fold}_val.csv")
    test_path  = os.path.join(SPLIT_DIR, f"{fold}_test.csv")

    if not (os.path.exists(train_path) and os.path.exists(val_path) and os.path.exists(test_path)):
        print(f"Skipping {fold}")
        continue

    print("\n============")
    print(f"Processing {fold}")
    print("============")

    df_train = pd.read_csv(train_path)
    df_val   = pd.read_csv(val_path)
    df_test  = pd.read_csv(test_path)

    # 1) Drop label=2 → binary problem: 0 vs 1
    df_train = df_train[df_train["label"] != 2]
    df_val   = df_val[df_val["label"] != 2]
    df_test  = df_test[df_test["label"] != 2]

    df_train["label"] = (df_train["label"] == 1).astype(int)
    df_val["label"]   = (df_val["label"] == 1).astype(int)
    df_test["label"]  = (df_test["label"] == 1).astype(int)

    # 2) Select EEG-only features
    eeg_feats = extract_eeg_features(df_train.columns)

    X_train = df_train[eeg_feats].fillna(0).to_numpy()
    y_train = df_train["label"].to_numpy()

    X_val = df_val[eeg_feats].fillna(0).to_numpy()
    y_val = df_val["label"].to_numpy()

    X_test = df_test[eeg_feats].fillna(0).to_numpy()
    y_test = df_test["label"].to_numpy()

    # Time axis for plotting
    if "start_time_sec" in df_test.columns:
        t_test = df_test["start_time_sec"].to_numpy()
    else:
        t_test = np.arange(len(df_test)) * WIN_SEC

    # 3) Standardization
    scaler = StandardScaler()
    X_train_s = scaler.fit_transform(X_train)
    X_val_s   = scaler.transform(X_val)
    X_test_s  = scaler.transform(X_test)

    # 4) Validation — choose best class_weight using validation F1
    best_model = None
    best_name = None
    best_f1 = -1

    for w in WEIGHTS:
        model = LogisticRegression(
            max_iter=2000,
            n_jobs=-1,
            random_state=42,
            class_weight=w["weights"]
        )
        model.fit(X_train_s, y_train)

        pred_val = model.predict(X_val_s)
        f1_val = f1_score(y_val, pred_val)

        print(f"{fold} | {w['name']} | Val F1 = {f1_val:.4f}")

        if f1_val > best_f1:
            best_f1 = f1_val
            best_model = model
            best_name = w["name"]

    print(f">>> Best weight = {best_name} (F1={best_f1:.4f})")

    # 5) Get probabilities for class=1
    p_val  = best_model.predict_proba(X_val_s)[:, 1]
    p_test = best_model.predict_proba(X_test_s)[:, 1]

    # 6) Tune threshold on validation
    best_thr, ss_val_best, fprh_val_best = tune_threshold(p_val, y_val)
    print(f"{fold} | Best threshold from val = {best_thr:.2f}")

    # 7) Compute test metrics with Firing Power
    fp_test, _ = compute_firing_power(p_test, best_thr)
    ss_test, fprh_test, FP_test = compute_ss_fpr(fp_test, y_test, best_thr)

    # 8) Save metrics
    results.append({
        "fold": fold,
        "best_weight": best_name,
        "threshold": best_thr,
        "F1_val": best_f1,
        "SS_test": ss_test,
        "FPRh_test": fprh_test,
        "FP_test": FP_test
    })

    # 9) Save model + scaler
    joblib.dump(best_model, os.path.join(MODEL_DIR, f"{fold}_{best_name}.pkl"))
    joblib.dump(scaler,     os.path.join(MODEL_DIR, f"{fold}_{best_name}_scaler.pkl"))

    # 10) Plots
    plot_prediction_view(t_test, fp_test, y_test, best_thr, fold, PLOTS_DIR)
    plot_risk_view(t_test, fp_test, fold, PLOTS_DIR)

# 11) Save CSV file with metrics
pd.DataFrame(results).to_csv(RESULTS_CSV, index=False)

print("\n=== DONE BINARY EEG PREDICTION ===")
print("Saved models to:", MODEL_DIR)
print("Saved plots to:", PLOTS_DIR)
print("Saved results to:", RESULTS_CSV)


Processing fold01
fold01 | w1 | Val F1 = 0.3084
fold01 | w2 | Val F1 = 0.3091
fold01 | w3 | Val F1 = 0.3089
fold01 | balanced | Val F1 = 0.2890
>>> Best weight = w2 (F1=0.3091)
fold01 | Best threshold from val = 0.50

Processing fold02
fold02 | w1 | Val F1 = 0.3065
fold02 | w2 | Val F1 = 0.3072
fold02 | w3 | Val F1 = 0.3071
fold02 | balanced | Val F1 = 0.2874
>>> Best weight = w2 (F1=0.3072)
fold02 | Best threshold from val = 0.45

Processing fold03
fold03 | w1 | Val F1 = 0.3098
fold03 | w2 | Val F1 = 0.3101
fold03 | w3 | Val F1 = 0.3100
fold03 | balanced | Val F1 = 0.2830
>>> Best weight = w2 (F1=0.3101)
fold03 | Best threshold from val = 0.45

Processing fold04
fold04 | w1 | Val F1 = 0.3062
fold04 | w2 | Val F1 = 0.3069
fold04 | w3 | Val F1 = 0.3068
fold04 | balanced | Val F1 = 0.2896
>>> Best weight = w2 (F1=0.3069)
fold04 | Best threshold from val = 0.50

Processing fold05
fold05 | w1 | Val F1 = 0.3094
fold05 | w2 | Val F1 = 0.3094
fold05 | w3 | Val F1 = 0.3092
fold05 | balanced |

In [2]:
import os
import numpy as np
import pandas as pd
import joblib
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score


# ================================
# CONFIG
# ================================
SPLIT_DIR   = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\split_Data_5fold_subjects"
MODEL_DIR   = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\saved_models_LR_AllSignals_Binary"
RESULTS_CSV = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\LR_AllSignals_Binary_results.csv"
PLOTS_DIR   = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\LR_AllSignals_Binary_plots"

os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)

fold_ids = [f"fold{idx:02d}" for idx in range(1, 24)]
results = []

WIN_SEC = 2
WIN_PER_HOUR = 3600 / WIN_SEC
TAU = 60   # Firing Power window length


# ================================
# FEATURE SELECTOR: ALL SIGNALS
# ================================
def extract_all_features(columns):
    """
    Use all feature columns, excluding meta info and label.
    """
    drop_cols = [
        "label", "subject", "patient",
        "session", "run", "start_time",
        "start_time_sec", "window_idx"
    ]
    drop_cols = [c for c in drop_cols if c in columns]
    return [c for c in columns if c not in drop_cols]


# ================================
# FP + METRICS FUNCTIONS
# ================================
def compute_firing_power(prob, thr, tau=TAU):
    O = (prob >= thr).astype(float)
    kernel = np.ones(tau) / tau
    fp = np.convolve(O, kernel, mode="full")[:len(O)]
    return fp, O


def compute_ss_fpr(fp, y_true, thr):
    alarms = (fp >= thr).astype(int)

    TP = np.sum((y_true == 1) & (alarms == 1))
    FN = np.sum((y_true == 1) & (alarms == 0))
    FP = np.sum((y_true == 0) & (alarms == 1))

    ss = TP / (TP + FN + 1e-9)
    total_hours = len(y_true) / WIN_PER_HOUR
    fpr_h = FP / (total_hours + 1e-9)

    return ss, fpr_h, FP


def tune_threshold(prob_val, y_val, tau=TAU):
    thr_grid = np.arange(0.1, 0.91, 0.05)
    best_thr, best_ss, best_fprh = 0.5, -1, np.inf

    for thr in thr_grid:
        fp_val, _ = compute_firing_power(prob_val, thr, tau)
        ss, fprh, _ = compute_ss_fpr(fp_val, y_val, thr)

        if ss > best_ss or (np.isclose(ss, best_ss) and fprh < best_fprh):
            best_thr = thr
            best_ss = ss
            best_fprh = fprh

    return best_thr, best_ss, best_fprh


# ================================
# Plot Functions
# ================================
def plot_prediction_view(time_sec, fp, y_true, thr, fold_id, save_dir):
    plt.figure(figsize=(12, 4))
    plt.plot(time_sec, fp, label="Firing Power", linewidth=1.5)
    plt.axhline(thr, color="red", linestyle="--", label=f"Threshold = {thr:.2f}")

    seizure = (y_true == 1)
    plt.fill_between(
        time_sec, 0, 1,
        where=seizure,
        color="orange",
        alpha=0.2,
        transform=plt.gca().get_xaxis_transform(),
        label="Preictal"
    )

    plt.xlabel("Time (sec)")
    plt.ylabel("Firing Power")
    plt.title(f"Prediction Mode — {fold_id}")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, f"{fold_id}_prediction.png"), dpi=200)
    plt.close()


def plot_risk_view(time_sec, fp, fold_id, save_dir, low_thr=0.3, high_thr=0.7):
    risk = np.zeros_like(fp)
    risk[(fp >= low_thr) & (fp < high_thr)] = 1
    risk[fp >= high_thr] = 2

    colors = np.array(["green", "yellow", "red"])
    c = colors[risk.astype(int)] 

    plt.figure(figsize=(12, 2))
    plt.scatter(time_sec, np.zeros_like(time_sec), c=c, s=12, marker="s")
    plt.yticks([])
    plt.xlabel("Time (sec)")
    plt.title(f"Risk Levels — {fold_id}")
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, f"{fold_id}_risk.png"), dpi=200)
    plt.close()


# ================================
# CLASS WEIGHTS (VALIDATION SEARCH)
# ================================
WEIGHTS = [
    {"name": "w1", "weights": {0: 1, 1: 5}},
    {"name": "w2", "weights": {0: 1, 1: 10}},
    {"name": "w3", "weights": {0: 1, 1: 15}},
    {"name": "balanced", "weights": "balanced"},
]


# ================================
# MAIN LOOP OVER FOLDS
# ================================
for fold in fold_ids:

    train_path = os.path.join(SPLIT_DIR, f"{fold}_train.csv")
    val_path   = os.path.join(SPLIT_DIR, f"{fold}_val.csv")
    test_path  = os.path.join(SPLIT_DIR, f"{fold}_test.csv")

    if not (os.path.exists(train_path) and os.path.exists(val_path) and os.path.exists(test_path)):
        print(f"Skipping {fold}")
        continue

    print("\n============")
    print(f"Processing {fold}")
    print("============")

    df_train = pd.read_csv(train_path)
    df_val   = pd.read_csv(val_path)
    df_test  = pd.read_csv(test_path)

    # 1) Drop label=2 → binary 0/1
    df_train = df_train[df_train["label"] != 2]
    df_val   = df_val[df_val["label"] != 2]
    df_test  = df_test[df_test["label"] != 2]

    df_train["label"] = (df_train["label"] == 1).astype(int)
    df_val["label"]   = (df_val["label"] == 1).astype(int)
    df_test["label"]  = (df_test["label"] == 1).astype(int)

    # 2) All-signal features
    feat_cols = extract_all_features(df_train.columns)

    X_train = df_train[feat_cols].fillna(0).to_numpy()
    y_train = df_train["label"].to_numpy()

    X_val = df_val[feat_cols].fillna(0).to_numpy()
    y_val = df_val["label"].to_numpy()

    X_test = df_test[feat_cols].fillna(0).to_numpy()
    y_test = df_test["label"].to_numpy()

    if "start_time_sec" in df_test.columns:
        t_test = df_test["start_time_sec"].to_numpy()
    else:
        t_test = np.arange(len(df_test)) * WIN_SEC

    # 3) Scaling
    scaler = StandardScaler()
    X_train_s = scaler.fit_transform(X_train)
    X_val_s   = scaler.transform(X_val)
    X_test_s  = scaler.transform(X_test)

    # 4) Validation: choose best class_weight
    best_model = None
    best_name = None
    best_f1 = -1

    for w in WEIGHTS:
        model = LogisticRegression(
            max_iter=2000,
            n_jobs=-1,
            random_state=42,
            class_weight=w["weights"]
        )
        model.fit(X_train_s, y_train)

        pred_val = model.predict(X_val_s)
        f1_val = f1_score(y_val, pred_val)

        print(f"{fold} | {w['name']} | Val F1 = {f1_val:.4f}")

        if f1_val > best_f1:
            best_f1 = f1_val
            best_model = model
            best_name = w["name"]

    print(f">>> Best weight = {best_name} (F1={best_f1:.4f})")

    # 5) Probabilities
    p_val  = best_model.predict_proba(X_val_s)[:, 1]
    p_test = best_model.predict_proba(X_test_s)[:, 1]

    # 6) Threshold tuning on validation
    best_thr, ss_val_best, fprh_val_best = tune_threshold(p_val, y_val)
    print(f"{fold} | Best threshold from val = {best_thr:.2f}")

    # 7) Test FP metrics
    fp_test, _ = compute_firing_power(p_test, best_thr)
    ss_test, fprh_test, FP_test = compute_ss_fpr(fp_test, y_test, best_thr)

    results.append({
        "fold": fold,
        "best_weight": best_name,
        "threshold": best_thr,
        "F1_val": best_f1,
        "SS_test": ss_test,
        "FPRh_test": fprh_test,
        "FP_test": FP_test
    })

    # 8) Save model + scaler
    joblib.dump(best_model, os.path.join(MODEL_DIR, f"{fold}_{best_name}.pkl"))
    joblib.dump(scaler,     os.path.join(MODEL_DIR, f"{fold}_{best_name}_scaler.pkl"))

    # 9) Plots
    plot_prediction_view(t_test, fp_test, y_test, best_thr, fold, PLOTS_DIR)
    plot_risk_view(t_test, fp_test, fold, PLOTS_DIR)

# 10) Save CSV
pd.DataFrame(results).to_csv(RESULTS_CSV, index=False)

print("\n=== DONE BINARY ALL-SIGNALS PREDICTION ===")
print("Saved models to:", MODEL_DIR)
print("Saved plots to:", PLOTS_DIR)
print("Saved results to:", RESULTS_CSV)



Processing fold01
fold01 | w1 | Val F1 = 0.3104
fold01 | w2 | Val F1 = 0.3092
fold01 | w3 | Val F1 = 0.3089
fold01 | balanced | Val F1 = 0.3029
>>> Best weight = w1 (F1=0.3104)
fold01 | Best threshold from val = 0.30

Processing fold02
fold02 | w1 | Val F1 = 0.3074
fold02 | w2 | Val F1 = 0.3074
fold02 | w3 | Val F1 = 0.3071
fold02 | balanced | Val F1 = 0.2971
>>> Best weight = w2 (F1=0.3074)
fold02 | Best threshold from val = 0.50

Processing fold03
fold03 | w1 | Val F1 = 0.3111
fold03 | w2 | Val F1 = 0.3103
fold03 | w3 | Val F1 = 0.3100
fold03 | balanced | Val F1 = 0.2979
>>> Best weight = w1 (F1=0.3111)
fold03 | Best threshold from val = 0.35

Processing fold04
fold04 | w1 | Val F1 = 0.3083
fold04 | w2 | Val F1 = 0.3070
fold04 | w3 | Val F1 = 0.3068
fold04 | balanced | Val F1 = 0.2991
>>> Best weight = w1 (F1=0.3083)
fold04 | Best threshold from val = 0.35

Processing fold05
fold05 | w1 | Val F1 = 0.3126
fold05 | w2 | Val F1 = 0.3096
fold05 | w3 | Val F1 = 0.3092
fold05 | balanced |

In [3]:
import pandas as pd

# Paths to result CSVs
EEG_CSV   = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\LR_EEG_Binary_results.csv"
ALL_CSV   = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\LR_AllSignals_Binary_results.csv"
OUTPUT_CSV = r"C:\Users\ririh\OneDrive\المستندات\ecg_project_new\LR_Binary_EEG_vs_AllSignals_Comparison.csv"

# Load
df_eeg = pd.read_csv(EEG_CSV)
df_all = pd.read_csv(ALL_CSV)

df_eeg["method"] = "EEG_only"
df_all["method"] = "All_signals"

# Merge
df = pd.concat([df_eeg, df_all], ignore_index=True)

# Compute averages per method
avg = df.groupby("method").mean(numeric_only=True)

print("\n===== AVERAGE METRICS (per method) =====\n")
print(avg.round(4))

# Build comparison table
comparison = avg.copy()
comparison = comparison[["F1_val", "SS_test", "FPRh_test", "FP_test"]]
comparison = comparison.rename(columns={
    "F1_val": "F1_val (validation)",
    "SS_test": "Sensitivity_test",
    "FPRh_test": "FPR/h_test",
    "FP_test": "FP_test"
})

print("\n===== COMPARISON: EEG_only vs All_signals =====\n")
print(comparison.round(4))

# Best method per metric
print("\n===== BEST METHOD PER METRIC =====\n")
print("Best F1_val (validation):  ", comparison["F1_val (validation)"].idxmax())
print("Best Sensitivity_test:     ", comparison["Sensitivity_test"].idxmax())
print("Lowest FPR/h_test:         ", comparison["FPR/h_test"].idxmin())
print("Lowest FP_test:            ", comparison["FP_test"].idxmin())

# Save
comparison.to_csv(OUTPUT_CSV)
print("\nSaved comparison to:", OUTPUT_CSV)



===== AVERAGE METRICS (per method) =====

             threshold  F1_val  SS_test  FPRh_test     FP_test
method                                                        
All_signals     0.3696  0.3082   0.9981  1463.1285  57026.4783
EEG_only        0.4478  0.3067   0.9979  1463.1830  57006.8696

===== COMPARISON: EEG_only vs All_signals =====

             F1_val (validation)  Sensitivity_test  FPR/h_test     FP_test
method                                                                    
All_signals               0.3082            0.9981   1463.1285  57026.4783
EEG_only                  0.3067            0.9979   1463.1830  57006.8696

===== BEST METHOD PER METRIC =====

Best F1_val (validation):   All_signals
Best Sensitivity_test:      All_signals
Lowest FPR/h_test:          All_signals
Lowest FP_test:             EEG_only

Saved comparison to: C:\Users\ririh\OneDrive\المستندات\ecg_project_new\LR_Binary_EEG_vs_AllSignals_Comparison.csv
