# 04 — LDA Analysis

In [None]:

# Folder that the CSV reads in the first cells are resolved against.
# Edit this to point at your local copy of the data.
base_path = r"D:\IITB\STData\1"
# Alternative root: r"D:\IITB\STData" (the parent folder) if your files live there instead.
# Output folders, relative to the current working directory; both are created just below.
# NOTE(review): later cells write to "../models" and "../notebooks/figures" instead — confirm which is intended.
save_models_to = r"./models"
save_fig_to = r"./notebooks/figures"

import os, pandas as pd, numpy as np, matplotlib.pyplot as plt
# Create both output folders up front; folders that already exist are left untouched.
for out_dir in (save_models_to, save_fig_to):
    os.makedirs(out_dir, exist_ok=True)

def read_csv(name):
    """Load the CSV file called `name` from `base_path` and return it as a DataFrame."""
    return pd.read_csv(os.path.join(base_path, name))

# Echo the configured data folder.
print("Using base_path:", base_path)


In [None]:

import os, pandas as pd, numpy as np, pickle
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import cross_val_score

# Features come from the cleaned table; the target is built from the merged table.
# NOTE(review): assumes both CSVs hold the same rows in the same order — confirm.
clean_csv = os.path.join(base_path, "processed_clean.csv")
merged_csv = os.path.join(base_path, "processed_merged.csv")
X = pd.read_csv(clean_csv)
df_raw = pd.read_csv(merged_csv)

# Binary target: Correct > 0 when that column exists, otherwise above-median Engagement.
if 'Correct' in df_raw.columns:
    positive = df_raw['Correct'] > 0
else:
    engagement = df_raw['Engagement']
    positive = engagement > engagement.median()
y = positive.astype(int).values

# Cross-validated accuracy of a default LDA over 5 folds.
lda = LinearDiscriminantAnalysis()
scores = cross_val_score(lda, X.values, y, cv=5)
print("LDA CV accuracy:", scores.mean().round(3))

# Fit on every row, rank features by absolute coefficient, and export the top 20.
lda.fit(X.values, y)
raw_coef = pd.Series(lda.coef_[0], index=X.columns)
coef = raw_coef.sort_values(key=lambda s: s.abs(), ascending=False)
top_features_csv = os.path.join(base_path, "lda_top_features.csv")
coef.head(20).to_csv(top_features_csv, index=True)

# Persist the fitted estimator with pickle.
model_file = os.path.join(save_models_to, "lda_model.pkl")
with open(model_file, "wb") as f:
    pickle.dump(lda, f)
print("Saved LDA model")


In [None]:
# ONE-CELL: robust labels + LDA using your column names (pupil_mean, fix_mean, sac_mean, …)
import os, glob, numpy as np, pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# Root data folder for this cell; its digit-named subfolders are scanned when labels must be derived.
# NOTE(review): differs from `base_path` in the first cell, which points at the "1" subfolder — confirm intended.
BASE = r"D:\IITB\STData"
# Feature table loaded below.
FEATURE_CSV = os.path.join(BASE, "eye_features_all_students.csv")
LABELS_CSV  = os.path.join(BASE, "labels.csv")  # optional; if not present we derive

def try_read_csv(p):
    """Best-effort CSV loader.

    Returns the DataFrame parsed from path `p`, or None when reading fails
    for any reason (missing file, parse error, ...).
    """
    try:
        frame = pd.read_csv(p)
    except Exception:
        # Deliberately broad: callers treat an unreadable file as "no data".
        frame = None
    return frame

def ensure_student_id_col(df, df_name="df"):
    """Return a copy of `df` that has a clean integer `student_id` column.

    The id column is `student_id` if present; otherwise the first match among
    `subject`, `Subject`, `student`, `Unnamed: 0`, and finally the frame's
    first column, which is renamed to `student_id`. Values are coerced to
    numeric, rows whose id cannot be parsed are dropped, and the column is
    cast to plain `int`.

    The input frame is never modified (the previous version overwrote the
    caller's `student_id` column in place when it already existed).

    Args:
        df: DataFrame to normalise. `None` or an empty frame is returned as-is.
        df_name: Label used in the error message only.

    Returns:
        A new DataFrame with the normalised `student_id` column, or `df`
        itself when it is `None`/empty.

    Raises:
        KeyError: if no id column can be determined. Because the first column
            is the last fallback, this is not expected for a non-empty frame.
    """
    if df is None or df.empty:
        return df

    cols = list(df.columns)
    if "student_id" in cols:
        id_col = "student_id"
    else:
        fallbacks = ("subject", "Subject", "student", "Unnamed: 0", cols[0])
        id_col = next((cand for cand in fallbacks if cand in cols), None)
    if id_col is None:
        raise KeyError(f"{df_name}: could not determine 'student_id' column from columns={cols}")

    # Nullable Int64 lets unparseable ids survive as <NA> until they are dropped.
    ids = pd.to_numeric(df[id_col], errors="coerce").astype("Int64")

    # rename() and assign() both return new frames, so the caller's data stays untouched.
    out = df.rename(columns={id_col: "student_id"}).assign(student_id=ids)
    out = out.dropna(subset=["student_id"]).copy()
    out["student_id"] = out["student_id"].astype(int)
    return out

def derive_labels_from_subject_folders(base_dir):
    """Build one binary label per digit-named subfolder of `base_dir`.

    For a folder named `<sid>`, `<sid>_PSY.csv` and `<sid>_ENG.csv` are read
    with `try_read_csv`. The label is 1 when at least half of the PSY
    `Correct` values are > 0 (unparseable values count as 0). If PSY has no
    `Correct` column, the label is 1 when at least half of the ENG
    `Engagement` values exceed that file's own median. Folders offering
    neither column are skipped.

    Returns:
        DataFrame with columns `student_id` (int) and `label` (0/1); empty
        (but with both columns) when no folder could be labelled.
    """
    records = []
    for entry in sorted(glob.glob(os.path.join(base_dir, "*"))):
        folder = os.path.basename(entry)
        if not (os.path.isdir(entry) and folder.isdigit()):
            continue
        student = int(folder)

        psy = try_read_csv(os.path.join(entry, f"{folder}_PSY.csv"))
        eng = try_read_csv(os.path.join(entry, f"{folder}_ENG.csv"))

        if psy is not None and "Correct" in psy.columns:
            correct = pd.to_numeric(psy["Correct"], errors="coerce").fillna(0)
            share = (correct > 0).mean()
        elif eng is not None and "Engagement" in eng.columns:
            engagement = pd.to_numeric(eng["Engagement"], errors="coerce")
            # NOTE(review): at most half of a series lies strictly above its own
            # median, so this branch can only reach 0.5 exactly — confirm intended.
            share = (engagement > engagement.median()).mean()
        else:
            continue

        records.append({"student_id": student, "label": int(share >= 0.5)})

    if records:
        return pd.DataFrame(records).drop_duplicates("student_id")
    return pd.DataFrame(columns=["student_id","label"])

# 1) Load the feature table; nothing below can run without it.
feat = try_read_csv(FEATURE_CSV)
if feat is None or feat.empty:
    raise FileNotFoundError(f"Could not read features from {FEATURE_CSV}")
feat = ensure_student_id_col(feat, "features")

# Coerce every non-id column to numeric; cells that cannot be parsed become NaN.
value_cols = [col for col in feat.columns if col != "student_id"]
for col in value_cols:
    feat[col] = pd.to_numeric(feat[col], errors="coerce")

print("Feature columns:", feat.columns.tolist())

# 2) Get/derive labels
# Sources are tried in order: labels.csv, then per-student PSY/ENG files,
# then a median split on one of the feature columns.
labels_df = None
if os.path.exists(LABELS_CSV):
    labels_df = try_read_csv(LABELS_CSV)
    if labels_df is not None and not labels_df.empty:
        labels_df = ensure_student_id_col(labels_df, "labels_csv")
        if "label" not in labels_df.columns:
            # Accept a few alternative names for the target column.
            for cand in ["y","Label","target","class"]:
                if cand in labels_df.columns:
                    labels_df = labels_df.rename(columns={cand:"label"})
                    break
        if "label" in labels_df.columns:
            # Coerce, drop unparseable labels, and only then cast: casting a
            # column that still holds NaN to int raises.
            labels_df["label"] = pd.to_numeric(labels_df["label"], errors="coerce")
            labels_df = labels_df.dropna(subset=["label"]).copy()
            labels_df["label"] = labels_df["label"].astype(int)
        else:
            labels_df = None

if labels_df is None or labels_df.empty:
    print("labels.csv missing/invalid → deriving from PSY/ENG …")
    labels_df = derive_labels_from_subject_folders(BASE)

# Fallback: build proxy labels from your feature names
if labels_df is None or labels_df.empty:
    print("No PSY/ENG labels found → creating proxy labels from features (median split).")
    # <—— these match your CSV: pupil_mean, fix_mean, sac_mean (and counts/stds as backups)
    proxy_candidates = [
        "pupil_mean",
        "fix_mean", "sac_mean",
        "fix_count", "sac_count",
        "pupil_std", "fix_std", "sac_std",
    ]
    # First candidate that actually exists in the feature table wins.
    proxy_col = next((c for c in proxy_candidates if c in feat.columns), None)
    if proxy_col is None:
        raise KeyError(f"No suitable proxy feature found. Looked for {proxy_candidates}. "
                       f"Available: {feat.columns.tolist()}")
    # NOTE(review): the proxy column stays in `feat`, so a model trained on these
    # labels also sees the column they were derived from — expect optimistic accuracy.
    tmp = feat[["student_id", proxy_col]].dropna().copy()
    m = tmp[proxy_col].median()
    tmp["label"] = (tmp[proxy_col] > m).astype(int)
    labels_df = tmp[["student_id","label"]].copy()

# Final normalisation, whatever the source: clean ids, numeric labels, one row per student.
labels_df = ensure_student_id_col(labels_df, "labels_final")
labels_df["label"] = pd.to_numeric(labels_df["label"], errors="coerce")
# Drop bad rows before the int cast so a NaN label is discarded instead of crashing.
labels_df = labels_df.dropna(subset=["student_id","label"]).drop_duplicates("student_id").copy()
labels_df["label"] = labels_df["label"].astype(int)
print("Label counts:", labels_df["label"].value_counts().to_dict())

# 3) Join features with labels on student_id and build the model inputs.
df = pd.merge(feat, labels_df, on="student_id", how="inner")
if df.empty:
    raise RuntimeError("Merge produced 0 rows. Check overlap between features and labels.")

# Keep numeric predictors only; the id and the target are not features.
numeric_part = df.drop(columns=["student_id","label"]).select_dtypes(include=[np.number]).copy()
y = df["label"].astype(int).values

# Columns with a single distinct non-null value carry no information; drop them.
varying = [name for name in numeric_part.columns if numeric_part[name].nunique(dropna=True) > 1]
X = numeric_part[varying]
print("Merged shape:", df.shape, "| Final feature count:", X.shape[1])

# 4) Hold-out sanity check: 75/25 split, then impute -> scale -> LDA.
# Stratify the split only when exactly two classes are present.
stratify_on = y if len(np.unique(y)) == 2 else None
X_train, X_test, y_train, y_test = train_test_split(
    X.values, y, test_size=0.25, random_state=42, stratify=stratify_on
)

steps = [
    ("imp", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
    ("lda", LinearDiscriminantAnalysis()),
]
lda_pipe = Pipeline(steps)
lda_pipe.fit(X_train, y_train)
pred = lda_pipe.predict(X_test)

# Report hold-out performance.
print("LDA accuracy:", round(accuracy_score(y_test, pred), 3))
print("Confusion matrix:\n", confusion_matrix(y_test, pred))
print("Classification report:\n", classification_report(y_test, pred, digits=3))


In [None]:
# === Save the fitted LDA pipeline, then reload it as a round-trip check ===
import os
from joblib import dump, load

models_dir = "../models"  # notebook is in notebooks/, so ../models is repo/models
lda_path = os.path.join(models_dir, "lda_model.pkl")

os.makedirs(models_dir, exist_ok=True)
dump(lda_pipe, lda_path)
print(f"✅ LDA pipeline saved to: {lda_path}")

# Reading the file back confirms it deserialises; the loaded object is discarded.
_ = load(lda_path)
print("✅ Loaded OK.")


In [None]:
# === PCA vs LDA comparison ===
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Restrict X to numeric columns before handing it to the pipeline steps.
X_num = X.select_dtypes(include=[np.number]).copy()

# Run X through the pipeline's fitted preprocessing (every step except the final LDA).
pre_X = lda_pipe[:-1].transform(X_num)           # impute + scale

# Fit PCA on the preprocessed matrix rather than on raw X_num: PCA rejects NaN
# input, and this way both projections are computed from the same imputed,
# standardised data.
pca2 = PCA(n_components=2, random_state=42)
Xp = pca2.fit_transform(pre_X)

# LDA projection: a single discriminant axis for two classes.
Xd_1d = lda_pipe[-1].transform(pre_X)            # LDA -> 1D for 2 classes
Xd = np.c_[Xd_1d, np.zeros_like(Xd_1d)]          # stack a zero y to plot on a line

# Plot both projections side by side, coloured by label.
fig, axs = plt.subplots(1, 2, figsize=(12, 5))

# PCA scatter
axs[0].scatter(Xp[:, 0], Xp[:, 1], c=y, s=40)
axs[0].set_title("PCA (2D) of student features")
axs[0].set_xlabel("PC1")
axs[0].set_ylabel("PC2")

# LDA scatter (1D shown as line)
axs[1].scatter(Xd[:, 0], Xd[:, 1], c=y, s=40)
axs[1].set_title("LDA projection (1D shown on x-axis)")
axs[1].set_xlabel("LD1")
axs[1].set_yticks([])

plt.tight_layout()
plt.show()

print("Explained variance (PCA):", np.round(pca2.explained_variance_ratio_, 3))


In [None]:
# === Save PCA (2D) and LDA (1D) projections ===
import os, numpy as np, pandas as pd
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

exports = "../notebooks/figures"
os.makedirs(exports, exist_ok=True)

# numeric X
X_num = X.select_dtypes(include=[np.number]).copy()

# Shared preprocessing from the fitted pipeline (every step except the final LDA).
pre_X = lda_pipe[:-1].transform(X_num)      # impute+scale

# PCA 2D — fitted on the preprocessed matrix, because PCA rejects NaN input and
# this keeps PCA and LDA on the same imputed, standardised data.
pca2 = PCA(n_components=2, random_state=42)
Xp = pca2.fit_transform(pre_X)

pca_df = pd.DataFrame(Xp, columns=["PC1","PC2"])
pca_df["label"] = y
pca_df.to_csv("../eye_features_pca.csv", index=False)

plt.figure(figsize=(6,5))
plt.scatter(Xp[:,0], Xp[:,1], c=y, s=40)
plt.title("PCA (2D) of student features")
plt.xlabel("PC1"); plt.ylabel("PC2")
plt.tight_layout(); plt.savefig(os.path.join(exports,"04_pca_scatter.png"), dpi=200)
plt.show()

# LDA 1D — flattened to a vector; assumes a single discriminant axis (two classes).
ld1 = lda_pipe[-1].transform(pre_X).ravel() # 1D

lda_df = pd.DataFrame({"LD1": ld1, "label": y})
lda_df.to_csv("../eye_features_lda1d.csv", index=False)

plt.figure(figsize=(6,2.8))
plt.scatter(ld1, np.zeros_like(ld1), c=y, s=40)
plt.title("LDA projection (1D)"); plt.yticks([])
plt.xlabel("LD1")
plt.tight_layout(); plt.savefig(os.path.join(exports,"04_lda_1d.png"), dpi=200)
plt.show()

print("Saved:", "../eye_features_pca.csv", "and", "../eye_features_lda1d.csv")


In [None]:
# NOTE(review): no cell visible in this notebook imports seaborn — confirm this install is still needed.
pip install seaborn


In [None]:
# === Save confusion matrix & classification report as files ===
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np, matplotlib.pyplot as plt, os

exports = "../notebooks/figures"
os.makedirs(exports, exist_ok=True)

# Classes seen in either the truth or the predictions, in sorted order. Passing
# them explicitly keeps the matrix rows/columns and the axis ticks in agreement
# even when a class shows up only in `pred` or only in `y_test`.
class_labels = np.unique(np.concatenate([np.asarray(y_test), np.asarray(pred)]))
cm = confusion_matrix(y_test, pred, labels=class_labels)
cr = classification_report(y_test, pred, digits=3)

# Save text (explicit encoding so the output does not depend on the platform default)
with open("../lda_classification_report.txt", "w", encoding="utf-8") as f:
    f.write(cr)

# Plot confusion matrix with matplotlib
plt.figure(figsize=(4,3))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title("LDA Confusion Matrix")
plt.colorbar()
# One tick per matrix row/column, labelled with the actual class value.
tick_marks = np.arange(len(class_labels))
plt.xticks(tick_marks, class_labels)
plt.yticks(tick_marks, class_labels)
plt.xlabel("Predicted")
plt.ylabel("True")

# Add numbers
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, cm[i, j], ha="center", va="center", color="red")

plt.tight_layout()
plt.savefig(os.path.join(exports, "04_lda_confusion.png"), dpi=200)
plt.show()

print("Saved:", "../lda_classification_report.txt", "and 04_lda_confusion.png")


In [None]:
# === 5-fold CV on the same pipeline ===
import numpy as np
from sklearn.model_selection import StratifiedKFold, cross_val_score

# Shuffled, stratified folds with a fixed seed so the split is repeatable.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(estimator=lda_pipe, X=X, y=y, cv=cv, scoring="accuracy")

fold_acc = np.round(scores, 3)
mean_acc = scores.mean().round(3)
print("5-fold CV accuracy:", fold_acc, " | mean:", mean_acc)


In [None]:
# === LDA coefficients as feature importances ===
import numpy as np, pandas as pd, matplotlib.pyplot as plt, os
from sklearn.base import clone

# Fit an unfitted *copy* of the pipeline on all rows. Refitting `lda_pipe`
# itself would silently replace the model that was evaluated on the hold-out
# split and saved to disk by the earlier cells.
full_pipe = clone(lda_pipe).fit(X, y)

# Coefficients are expressed in the pipeline's scaled feature space.
coef = full_pipe[-1].coef_.ravel()  # shape (n_features,) for two classes
names = X.columns.to_list()
imp = pd.Series(np.abs(coef), index=names).sort_values(ascending=False)

imp.to_csv("../lda_feature_importance.csv", header=["abs_coef"])

# Make sure the figure folder exists so this cell also works when run on its own.
os.makedirs("../notebooks/figures", exist_ok=True)
plt.figure(figsize=(7,4))
# Top 15, reversed so the largest bar ends up at the top of the horizontal chart.
imp.head(15).iloc[::-1].plot(kind="barh")
plt.title("LDA | Top features (|coef|)")
plt.tight_layout(); plt.savefig("../notebooks/figures/04_lda_feature_importance.png", dpi=200)
plt.show()

print("Saved:", "../lda_feature_importance.csv")


In [None]:
# === Inference: load lda_model.pkl and predict ===
from joblib import load
import pandas as pd

lda_loaded = load("../models/lda_model.pkl")

# Demo input: the first 3 rows of the existing feature table (swap in new rows as needed).
sample = X.head(3).copy()
pred_labels = lda_loaded.predict(sample)

# Probability of class index 1, when the loaded model exposes predict_proba.
if hasattr(lda_loaded, "predict_proba"):
    pred_proba = lda_loaded.predict_proba(sample)[:, 1]
else:
    pred_proba = None

# Attach predictions next to the inputs; the bare `out` displays the table in the notebook.
out = sample.assign(pred_label=pred_labels)
if pred_proba is not None:
    out = out.assign(pred_prob_1=pred_proba)
out
