# In [None]:
# =============================================
# Academic Performance Pipeline (Tabular + XAI)
# =============================================
# Requirements (install as needed):
# pip install pandas numpy scikit-learn scipy torch torchvision torchaudio shap lime transformers einops

import os
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

from typing import Dict, List, Tuple, Optional
from collections import defaultdict

from sklearn.model_selection import StratifiedKFold, train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.feature_selection import mutual_info_classif
from sklearn.feature_selection import chi2
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.utils import check_random_state

from scipy import stats

# Optional libraries
try:
    import shap
    HAVE_SHAP = True
except Exception:
    HAVE_SHAP = False

try:
    from lime.lime_tabular import LimeTabularExplainer
    HAVE_LIME = True
except Exception:
    HAVE_LIME = False

# Deep/transformer libs
try:
    import torch
    import torch.nn as nn
    from torch.utils.data import Dataset, DataLoader
    from einops import rearrange
    HAVE_TORCH = True
except Exception:
    HAVE_TORCH = False

try:
    from transformers import AutoTokenizer, AutoModelForSequenceClassification
    HAVE_TRANSFORMERS = True
except Exception:
    HAVE_TRANSFORMERS = False


# -----------------------------
# 0) CONFIG: paths & columns
# -----------------------------
# Seed passed as ``random_state`` to the train/test splits and the seeded estimators below.
RNG = 42
DATA_PATH = "your_dataset.csv"  # <-- set your CSV path here

TARGET_COL = "CGPA_label"  # Binary or multi-class label for academic performance
TEXT_COL = None            # e.g., "essay" if you have a text field; otherwise leave None

# Feature groups (edit to match your data).
# Each group is scored and evaluated on its own; names missing from the loaded
# CSV are filtered out with keep_existing() before use.
MENTAL_FEATURES = [
    "Financial_Stress","Anxiety_Score","Depression_Score","Stress_Level",
    "Sleep_Quality","Counseling_Service_Use","Substance_Use","Chronic_Illness"
]
PERSONAL_FEATURES = ["Age","Relationship_Status","Extracurricular_Involvement","Family_History","Gender"]
SOCIAL_FEATURES = ["Residence_Type","Physical_Activity","Social_Support","Diet_Quality"]
COURSE_COL = "Course"     # for subgroup analysis
# Columns the preprocessor median-imputes and standard-scales.
# NOTE(review): "CGPA" is listed here while main() can derive the label from
# CGPA; any model that is fed this column sees the label source — confirm intended.
NUMERIC_FEATURES = ["Age","CGPA","Stress_Level","Depression_Score","Anxiety_Score",
                    "Financial_Stress","Semester_Credit_Load"]
# Columns the preprocessor mode-imputes and one-hot encodes.
CATEGORICAL_FEATURES = [
    "Course","Gender","Sleep_Quality","Physical_Activity","Diet_Quality","Social_Support",
    "Relationship_Status","Substance_Use","Counseling_Service_Use","Family_History",
    "Chronic_Illness","Extracurricular_Involvement","Residence_Type"
]

# Helper: safe intersection with dataframe columns
def keep_existing(cols: List[str], df: pd.DataFrame) -> List[str]:
    """Return the entries of ``cols`` that are column labels of ``df``.

    The order of ``cols`` is preserved; names absent from ``df`` are dropped.
    """
    available = df.columns
    present = []
    for name in cols:
        if name in available:
            present.append(name)
    return present

# -----------------------------------------
# 1) DATA LOAD (expects a CSV)
# -----------------------------------------
def load_data(path: str) -> pd.DataFrame:
    """Read the CSV at ``path`` with pandas' default parser settings and return the frame."""
    return pd.read_csv(path)

# -----------------------------------------
# 2) PREPROCESS (impute/scale/encode)
# 2.1) feature categorization (already via lists)
# -----------------------------------------
def make_preprocessor(df: pd.DataFrame):
    """Build an unfitted ColumnTransformer for the configured feature lists.

    Numeric columns are median-imputed then standard-scaled; categorical
    columns are mode-imputed then one-hot encoded (dense output, unknown
    categories ignored at transform time). Only configured columns that
    exist in ``df`` are wired in; every other column is dropped.

    Returns:
        (preprocessor, numeric_columns, categorical_columns)
    """
    cat_cols = keep_existing(CATEGORICAL_FEATURES, df)
    num_cols = keep_existing(NUMERIC_FEATURES, df)

    numeric_steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
    categorical_steps = [
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ]
    branches = [
        ("num", Pipeline(steps=numeric_steps), num_cols),
        ("cat", Pipeline(steps=categorical_steps), cat_cols),
    ]
    return ColumnTransformer(transformers=branches, remainder="drop"), num_cols, cat_cols

# -----------------------------------------
# 3) FEATURE SELECTION
#    IG, Gain Ratio, Entropy, Gini
# -----------------------------------------
def entropy_of_series(x: pd.Series) -> float:
    """Return the Shannon entropy, in bits, of the values in ``x``.

    Values are compared by their string form, so numeric and categorical
    series are handled alike (missing values become their own level).

    The result is always >= 0: a constant or empty series yields exactly 0.0.
    No smoothing epsilon is needed because ``np.unique`` only reports levels
    that actually occur, so every probability is strictly positive.
    """
    _, counts = np.unique(x.astype(str), return_counts=True)
    if counts.size == 0:
        return 0.0
    p = counts / counts.sum()
    # sum(p * log2(1/p)) is the same quantity as -sum(p * log2(p)) but cannot
    # produce a negative zero for a single-level series.
    return float(np.sum(p * np.log2(1.0 / p)))

def gini_of_series(x: pd.Series) -> float:
    """Return the Gini impurity ``1 - sum(p_i ** 2)`` of the values in ``x``.

    Values are compared by their string form before counting.
    """
    as_text = x.astype(str)
    _, freq = np.unique(as_text, return_counts=True)
    share = freq / freq.sum()
    return 1.0 - np.square(share).sum()

def info_gain(feature: pd.Series, target: pd.Series) -> float:
    """Return the information gain ``H(Y) - H(Y | X)`` of ``feature`` about ``target``.

    ``feature`` is treated as categorical through the string form of its
    values. Both series are expected to share the same index.

    Args:
        feature: candidate predictor X.
        target: label series Y.

    Returns:
        Entropy of ``target`` minus the size-weighted entropy of ``target``
        within each level of ``feature``.
    """
    h_y = entropy_of_series(target)

    # Convert once; the previous version re-ran astype(str) and built a full
    # boolean mask for every level, i.e. O(levels * rows) work.
    levels = feature.astype(str)
    n_rows = len(levels)

    # conditional entropy H(Y|X) = sum_v P(X=v) * H(Y | X=v)
    cond_entropy = 0.0
    for _, y_part in target.groupby(levels):
        cond_entropy += (len(y_part) / n_rows) * entropy_of_series(y_part)
    return h_y - cond_entropy

def gain_ratio(feature: pd.Series, target: pd.Series) -> float:
    """Return information gain divided by the feature's own entropy (split information).

    A 1e-12 term is added to the denominator so a single-level feature does
    not divide by zero.
    """
    split_info = entropy_of_series(feature)
    gain = info_gain(feature, target)
    denominator = split_info + 1e-12
    return gain / denominator

def feature_scores(df: pd.DataFrame, y: pd.Series, feature_list: List[str]) -> pd.DataFrame:
    """Score each feature against ``y`` and return a table sorted by information gain.

    Args:
        df: frame containing every column named in ``feature_list``.
        y: target series aligned with ``df``.
        feature_list: columns to score.

    Returns:
        DataFrame with columns ``feature``, ``info_gain``, ``gain_ratio``,
        ``entropy`` and ``gini``, sorted by descending ``info_gain``. An empty
        ``feature_list`` gives an empty frame with those columns (previously a
        KeyError from sorting a column-less frame).
    """
    columns = ["feature", "info_gain", "gain_ratio", "entropy", "gini"]
    # NOTE(review): every feature is scored via the string form of its values,
    # so a numeric column with many distinct values is treated as one level
    # per value, which inflates info_gain — consider binning such columns.
    rows = [
        {
            "feature": f,
            "info_gain": info_gain(df[f], y),
            "gain_ratio": gain_ratio(df[f], y),
            "entropy": entropy_of_series(df[f]),
            "gini": gini_of_series(df[f]),
        }
        for f in feature_list
    ]
    # Explicit columns keep the schema stable even when there are no rows.
    table = pd.DataFrame(rows, columns=columns)
    return table.sort_values("info_gain", ascending=False).reset_index(drop=True)

def select_by_criterion(scores: pd.DataFrame, criterion: str, top_k: Optional[int]=None, thr: Optional[float]=None) -> List[str]:
    """Rank features by ``criterion`` and return the chosen feature names.

    ``info_gain`` and ``gain_ratio`` rank highest-first; ``entropy`` and
    ``gini`` rank lowest-first. ``thr`` keeps rows at or beyond the threshold
    in the ranking direction (``>=`` for highest-first, ``<=`` for
    lowest-first); ``top_k`` then truncates the ranked list. With neither
    set, every feature is returned in ranked order.
    """
    assert criterion in {"info_gain","gain_ratio","entropy","gini"}
    lowest_first = criterion in {"entropy", "gini"}
    ranked = scores.sort_values(criterion, ascending=lowest_first)

    if thr is not None:
        column = ranked[criterion]
        keep = (column <= thr) if lowest_first else (column >= thr)
        ranked = ranked[keep]

    if top_k is not None:
        ranked = ranked.head(top_k)
    return ranked["feature"].tolist()

# -----------------------------------------
# 4) MODELS
# -----------------------------------------
def classical_models(random_state=RNG) -> Dict[str, object]:
    """Return fresh, unfitted classical classifiers keyed by short name.

    ``random_state`` is forwarded to the SVM, random forest and gradient
    boosting models; the logistic regression is built without it.
    """
    models: Dict[str, object] = {}
    models["SVM"] = SVC(kernel="rbf", probability=True, random_state=random_state)
    models["LR"] = LogisticRegression(max_iter=2000, n_jobs=None)
    models["RF"] = RandomForestClassifier(
        n_estimators=300, max_depth=None, random_state=random_state
    )
    models["GB"] = GradientBoostingClassifier(random_state=random_state)
    return models

# Optional LSTM for tabular (treats rows as sequences of features)
class TabularLSTM(nn.Module):
    """LSTM classifier that reads a feature row as a sequence of scalars.

    Each of the ``n_features`` columns is one time step of width 1. The
    hidden states of all steps are concatenated and fed to a linear head.
    """

    def __init__(self, n_features, hidden=64, num_classes=2):
        super().__init__()
        # One scalar per step, hence a recurrent input width of 1.
        self.lstm = nn.LSTM(batch_first=True, num_layers=1, hidden_size=hidden, input_size=1)
        flat_width = n_features * hidden
        self.fc = nn.Linear(flat_width, num_classes)

    def forward(self, x):
        steps = x[..., None]                      # (B, F) -> (B, F, 1)
        hidden_states, _state = self.lstm(steps)  # (B, F, hidden)
        flat = hidden_states.flatten(start_dim=1) # (B, F * hidden)
        return self.fc(flat)

# Minimal TabTransformer (TaBERT-style) for tabular
class TabDataset(Dataset):
    """In-memory dataset pairing float32 feature rows with int64 class labels.

    ``X`` and ``y`` may be NumPy arrays, lists or pandas objects. Both are
    converted through ``np.asarray`` first: handing a pandas Series straight
    to ``torch.tensor`` makes torch index it by *label*, which raises
    KeyError (or reads the wrong rows) when the index is not 0..n-1, e.g.
    after a shuffled train/test split.

    Raises:
        ValueError: if ``X`` and ``y`` have different lengths.
    """

    def __init__(self, X, y):
        features = np.asarray(X)
        labels = np.asarray(y)
        if len(features) != len(labels):
            raise ValueError(
                f"X and y must have the same length, got {len(features)} and {len(labels)}"
            )
        # torch.tensor copies, so later changes to the caller's arrays do not leak in.
        self.X = torch.tensor(features, dtype=torch.float32)
        self.y = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, i):
        return self.X[i], self.y[i]

class TabTransformer(nn.Module):
    """Transformer-encoder classifier over a single projected row token.

    The whole feature row is linearly projected to one ``d_model``-wide
    token, passed through ``depth`` encoder layers and classified from that
    token.

    NOTE(review): because the sequence length is 1, self-attention has only
    one position to attend to; features are not tokenised individually. If
    per-feature attention is wanted, the input projection needs redesigning.

    Args:
        n_features: width of each input row.
        d_model: token/embedding width (must be divisible by ``n_heads``).
        n_heads: attention heads per encoder layer.
        depth: number of encoder layers.
        num_classes: number of output logits.
        p: dropout probability inside the encoder layers.
    """

    def __init__(self, n_features, d_model=64, n_heads=4, depth=2, num_classes=2, p=0.1):
        super().__init__()
        self.in_proj = nn.Linear(n_features, d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads, batch_first=True, dropout=p)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=depth)
        self.cls = nn.Linear(d_model, num_classes)

    def forward(self, x):
        # x: (B, F) -> one token per row: (B, 1, d_model).
        # The former ``.repeat(1, 1, 1)`` only made an identical copy and was dropped.
        token = self.in_proj(x).unsqueeze(1)
        h = self.encoder(token)          # (B, 1, d_model)
        return self.cls(h[:, 0, :])      # (B, num_classes)

def train_torch_classifier(model, X_train, y_train, X_val, y_val, epochs=20, lr=1e-3, batch=64):
    """Train ``model`` with AdamW and cross-entropy, then predict the validation rows.

    Runs on CUDA when available, otherwise on CPU. Training batches are
    shuffled; validation batches are not. ``y_val`` is only wrapped in the
    validation dataset and is not used for early stopping or model selection.

    Returns:
        (y_pred, model): argmax class predictions for ``X_val`` as a NumPy
        array, and the trained model (left on the training device, in eval mode).
    """
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda"
    model.to(device)

    train_set = TabDataset(X_train, y_train)
    val_set = TabDataset(X_val, y_val)
    train_loader = DataLoader(train_set, batch_size=batch, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch, shuffle=False)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for _epoch in range(epochs):
        model.train()
        for features, labels in train_loader:
            features = features.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            batch_loss = criterion(model(features), labels)
            batch_loss.backward()
            optimizer.step()

    # Inference pass over the validation split.
    model.eval()
    batch_predictions = []
    with torch.no_grad():
        for features, _labels in val_loader:
            scores = model(features.to(device))
            batch_predictions.append(torch.argmax(scores, dim=1).cpu().numpy())
    return np.concatenate(batch_predictions), model

# Optional text transformers (BERT/RoBERTa) if a text column exists
def infer_text_transformer(model_name, texts, labels):
    """Placeholder for text-transformer inference; currently always returns None.

    No model is loaded and none of the arguments are used. Fine-tuning or
    inference would have to be implemented here before a text column is useful.
    """
    result = None  # intentionally a no-op to avoid heavy training here
    return result


# -----------------------------------------
# 5) Evaluation helpers
# -----------------------------------------
def score_pack(y_true, y_pred) -> Dict[str, float]:
    """Return accuracy plus weighted precision, recall and F1 for the predictions.

    The weighted metrics use ``zero_division=0``, so a class that is never
    predicted contributes 0 rather than raising a warning.
    """
    weighted = {"average": "weighted", "zero_division": 0}
    scores = {"acc": accuracy_score(y_true, y_pred)}
    metric_table = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for key, metric in metric_table:
        scores[key] = metric(y_true, y_pred, **weighted)
    return scores

def run_classical(X, y, models: Dict[str, object]):
    """Fit every model on a stratified 80/20 split and score it on the held-out 20%.

    The split is seeded with ``RNG``. The estimators in ``models`` are fitted
    in place.

    Returns:
        Dict mapping each model name to its ``score_pack`` metrics.
    """
    split = train_test_split(X, y, test_size=0.2, random_state=RNG, stratify=y)
    X_fit, X_holdout, y_fit, y_holdout = split

    def _fit_and_score(estimator):
        estimator.fit(X_fit, y_fit)
        return score_pack(y_holdout, estimator.predict(X_holdout))

    return {label: _fit_and_score(estimator) for label, estimator in models.items()}

def run_torch_model(X, y, model_ctor, **kwargs):
    """Train a torch classifier on a stratified 80/20 split and score the hold-out.

    Args:
        X: 2-D feature matrix (already preprocessed).
        y: class labels; any label values are accepted, they are mapped to
            0..C-1 internally because the cross-entropy loss needs that range.
        model_ctor: callable invoked as
            ``model_ctor(n_features=..., num_classes=..., **kwargs)``.
        **kwargs: ``epochs`` (default 20) is consumed here as a training
            option; everything else is forwarded to ``model_ctor``.

    Returns:
        ``({"TorchModel": metrics}, trained_model)``. When torch is not
        installed the result is ``({}, None)``, so callers can always unpack
        two values (previously a bare ``{}`` was returned in that case).
    """
    if not HAVE_TORCH:
        return {}, None

    # Work on plain arrays: a pandas Series with a shuffled index must not
    # reach the tensor conversion. return_inverse yields contiguous class ids.
    classes, y_idx = np.unique(np.asarray(y), return_inverse=True)
    y_idx = y_idx.reshape(-1)

    X_tr, X_te, y_tr, y_te = train_test_split(
        X, y_idx, test_size=0.2, random_state=RNG, stratify=y_idx
    )

    # ``epochs`` configures training, not the model; keeping it out of the
    # constructor call lets plain model classes be passed as ``model_ctor``.
    epochs = kwargs.pop("epochs", 20)
    model = model_ctor(n_features=X.shape[1], num_classes=len(classes), **kwargs)
    y_pred, trained = train_torch_classifier(model, X_tr, y_tr, X_te, y_te, epochs=epochs)

    # Score in the caller's original label space.
    return {"TorchModel": score_pack(classes[y_te], classes[y_pred])}, trained

# -----------------------------------------
# 5.x) Category × Selector × Models
# -----------------------------------------
def prepare_matrix(df, target_col, feature_list, preprocessor):
    """Return the raw features, the integer target and the preprocessed matrix.

    ``preprocessor`` is fitted on ``df[feature_list]`` as a side effect.

    Returns:
        (X, y, X_proc): a copy of the selected feature columns, the target
        cast to ``int``, and the output of ``preprocessor.fit_transform``.
    """
    raw_features = df[feature_list].copy()
    labels = df[target_col].copy().astype(int)
    transformed = preprocessor.fit_transform(df[feature_list])
    return raw_features, labels, transformed

def evaluate_block(df, target_col, group_features, selector_name, scores_df, pre_template, top_k=None):
    """Select features by one criterion and evaluate every model on them.

    Args:
        df: full data frame.
        target_col: name of the label column (cast to ``int``).
        group_features: feature names of the group; kept for interface
            compatibility — the selection is driven by ``scores_df``.
        selector_name: criterion passed to ``select_by_criterion``.
        scores_df: output of ``feature_scores`` for the group.
        pre_template: preprocessor used as a template. When it is a
            ColumnTransformer, an unfitted copy restricted to the selected
            columns is fitted instead of the template itself (assumes its
            column specs are lists of column names). Any other object is
            fitted as given.
        top_k: optional cap on the number of selected features.

    Returns:
        ``{"selected": [...], "results": {model_name: metrics}}``; results are
        empty when nothing is selected or preprocessing yields no columns.
    """
    from sklearn.base import clone  # local import: sklearn is already a file dependency

    selected = select_by_criterion(scores_df, selector_name, top_k=top_k)
    if not selected:
        return {"selected": [], "results": {}}

    X = df[selected]
    # Plain array so the torch path never indexes a pandas Series by label.
    y = df[target_col].astype(int).to_numpy()

    if isinstance(pre_template, ColumnTransformer):
        # The template lists columns of the *whole* frame; fitting it on the
        # selected subset raises ValueError for the missing ones. Rebuild it
        # on the intersection, which also leaves the shared template unfitted.
        chosen = set(selected)
        pre = ColumnTransformer(
            transformers=[
                (name, clone(transformer, safe=False), [c for c in columns if c in chosen])
                for name, transformer, columns in pre_template.transformers
            ],
            remainder=pre_template.remainder,
        )
    else:
        pre = pre_template
    X_proc = pre.fit_transform(X)
    if X_proc.shape[1] == 0:
        # None of the selected columns is handled by the preprocessor.
        return {"selected": selected, "results": {}}

    results = dict(run_classical(X_proc, y, classical_models()))
    if HAVE_TORCH:
        # LSTM
        res_lstm, _ = run_torch_model(
            X_proc, y,
            model_ctor=lambda n_features, num_classes, **kw: TabularLSTM(
                n_features, hidden=64, num_classes=num_classes),
            epochs=20,
        )
        # TabTransformer (TaBERT-style)
        res_tab, _ = run_torch_model(
            X_proc, y,
            model_ctor=lambda n_features, num_classes, **kw: TabTransformer(
                n_features, d_model=64, n_heads=4, depth=2, num_classes=num_classes),
            epochs=20,
        )
        results["LSTM"] = res_lstm["TorchModel"]
        results["TaBERT"] = res_tab["TorchModel"]
    return {"selected": selected, "results": results}

def run_all_blocks(df: pd.DataFrame, target_col: str):
    """Evaluate every feature group under every selector.

    For each group (Mental, Social, Personal and their union) the features
    present in ``df`` are scored against ``df[target_col]`` and then
    evaluated once per selector (``info_gain``, ``gain_ratio``, ``entropy``).

    Args:
        df: full data frame.
        target_col: label column. It is used for both scoring and
            evaluation; the global ``TARGET_COL`` was used before, which
            silently ignored this argument.

    Returns:
        List of dicts with keys ``group``, ``selector``,
        ``selected_features`` and ``results``.
    """
    # Preprocessor template handed to each block.
    pre, _, _ = make_preprocessor(df)

    groups = [
        ("Mental", keep_existing(MENTAL_FEATURES, df)),
        ("Social", keep_existing(SOCIAL_FEATURES, df)),
        ("Personal", keep_existing(PERSONAL_FEATURES, df)),
        ("CGPA_Target", keep_existing(MENTAL_FEATURES+SOCIAL_FEATURES+PERSONAL_FEATURES, df)),
    ]

    blocks = []
    for group_name, feats in groups:
        if not feats:
            continue
        scores = feature_scores(df[feats], df[target_col], feats)
        # NOTE(review): with top_k=None and no threshold each selector keeps
        # every feature and only the order differs, so the three runs use the
        # same feature set — pass a top_k if the selectors should differ.
        for sel in ["info_gain", "gain_ratio", "entropy"]:
            block = evaluate_block(df, target_col, feats, sel, scores, pre, top_k=None)
            blocks.append({
                "group": group_name,
                "selector": sel,
                "selected_features": block["selected"],
                "results": block["results"]
            })
    return blocks

# -----------------------------------------
# 6) Course-wise performance
# -----------------------------------------
def course_wise_performance(df: pd.DataFrame, target_col: str):
    """Evaluate the classical models separately within each course.

    Courses are skipped when they cannot support a stratified split: fewer
    than two classes, a class with a single member, or too few rows for the
    hold-out to contain every class.

    Args:
        df: full data frame containing ``COURSE_COL`` and ``target_col``.
        target_col: label column (cast to ``int``).

    Returns:
        DataFrame with one row per evaluated course and one
        ``"<model>_<metric>"`` column per model/metric pair. Empty when no
        course qualifies.
    """
    out = []
    pre, _, _ = make_preprocessor(df)
    for c, dsub in df.groupby(COURSE_COL):
        y = dsub[target_col].astype(int)
        class_counts = y.value_counts()
        # Stratification needs >= 2 classes with >= 2 members each.
        if len(class_counts) < 2 or class_counts.min() < 2:
            continue

        # NOTE(review): X keeps every non-target column, so when the label was
        # derived from a raw "CGPA" column (main() does this if the label is
        # missing) that column stays among the predictors — confirm intended.
        X = dsub.drop(columns=[target_col])
        X_proc = pre.fit_transform(X)
        try:
            # Fresh estimators per course so no state carries over.
            res = run_classical(X_proc, y, classical_models())
        except ValueError as err:
            # Raised by the stratified split when the course is too small for
            # the hold-out to contain every class; report and move on.
            print(f"[course_wise_performance] skipping course {c!r}: {err}")
            continue

        row = {"course": c}
        for model_name, metrics in res.items():
            for metric_name, value in metrics.items():
                row[f"{model_name}_{metric_name}"] = value
        out.append(row)
    return pd.DataFrame(out)

# -----------------------------------------
# 7) LIME & SHAP
# -----------------------------------------
def run_xai_examples(df: pd.DataFrame, target_col: str, feature_list: List[str], preprocessor):
    """Fit a random forest on ``feature_list`` and produce SHAP / LIME summaries.

    Args:
        df: full data frame.
        target_col: label column (cast to ``int``).
        feature_list: columns to model.
        preprocessor: preprocessing template. When it is a ColumnTransformer,
            an unfitted copy restricted to ``feature_list`` is fitted, because
            a transformer built for the whole frame raises ValueError on a
            column subset (assumes its column specs are lists of column
            names). Any other object is fitted as given.

    Returns:
        Dict with ``"shap_values_shape"`` (when shap is installed) and
        ``"lime_top"`` (when lime is installed). The forest is fitted and
        explained on the same rows; nothing is held out.
    """
    from sklearn.base import clone  # local import: sklearn is already a file dependency

    X = df[feature_list]
    y = df[target_col].astype(int)

    if isinstance(preprocessor, ColumnTransformer):
        wanted = set(feature_list)
        pre = ColumnTransformer(
            transformers=[
                (name, clone(transformer, safe=False), [c for c in columns if c in wanted])
                for name, transformer, columns in preprocessor.transformers
            ],
            remainder=preprocessor.remainder,
        )
    else:
        pre = preprocessor
    X_proc = pre.fit_transform(X)

    rf = RandomForestClassifier(n_estimators=300, random_state=RNG).fit(X_proc, y)

    xai = {}
    if HAVE_SHAP:
        explainer = shap.TreeExplainer(rf)
        shap_values = explainer.shap_values(X_proc[:500])  # first 500 rows only, to bound runtime
        # Depending on the shap version the result is a list (one array per
        # class) or a single array; report shapes for either form.
        if isinstance(shap_values, list):
            xai["shap_values_shape"] = [np.array(v).shape for v in shap_values]
        else:
            xai["shap_values_shape"] = np.array(shap_values).shape

    if HAVE_LIME:
        explainer = LimeTabularExplainer(
            training_data=np.array(X_proc),
            mode="classification",
            # Positional names: one-hot encoding changes the column count.
            feature_names=[f"f{i}" for i in range(X_proc.shape[1])],
            discretize_continuous=False
        )
        explanation = explainer.explain_instance(
            data_row=X_proc[0],
            predict_fn=rf.predict_proba,
            num_features=10
        )
        xai["lime_top"] = explanation.as_list()

    return xai

# -----------------------------------------
# 8) Statistical tests
# -----------------------------------------
def statistical_tests(df: pd.DataFrame, target_col: str, feature_list: List[str]) -> pd.DataFrame:
    """
    For numeric features: t-test / ANOVA across target classes.
    For categorical features: chi-square (contingency) and z-test (two-proportion if binary).

    Missing feature values are dropped per class before the numeric tests, so
    the Welch t-test and the ANOVA see the same observations (the ANOVA used
    to return NaN as soon as one value was missing). Rows with a missing
    target do not form a class. Boolean columns are treated as categorical.

    Returns:
        DataFrame with columns ``feature``, ``t_test_p``, ``anova_p``,
        ``chi2_p`` and ``ztest_p``; tests that do not apply are NaN.
    """
    # pandas' dtype helpers also understand extension dtypes (category,
    # string, nullable ints), on which np.issubdtype raises TypeError.
    from pandas.api.types import is_bool_dtype, is_numeric_dtype

    rows = []
    y = df[target_col]
    classes = sorted(y.dropna().unique())

    for f in feature_list:
        s = df[f]
        p_t = p_a = p_chi = p_z = np.nan

        if is_numeric_dtype(s) and not is_bool_dtype(s):
            groups = [s[y == cl].dropna() for cl in classes]
            has_data = all(len(g) > 0 for g in groups)
            # Welch t-test only if binary classes
            if len(groups) == 2 and has_data:
                p_t = stats.ttest_ind(groups[0], groups[1], equal_var=False).pvalue
            # one-way ANOVA if >=2 classes
            if len(groups) >= 2 and has_data:
                p_a = stats.f_oneway(*groups).pvalue
        else:
            # chi-square on the feature x target contingency table
            tbl = pd.crosstab(s, y)
            if tbl.size > 0:
                _, p_chi, _, _ = stats.chi2_contingency(tbl)
            # pooled two-proportion z-test, only for a genuine 2x2 table
            # (a class can vanish from the table when the feature is missing)
            if len(classes) == 2 and tbl.shape == (2, 2):
                counts = tbl.values
                n1 = counts[:, 0].sum()
                n2 = counts[:, 1].sum()
                # share of the first feature level within each class
                p1 = counts[0, 0] / n1
                p2 = counts[0, 1] / n2
                p_pool = (counts[0, 0] + counts[0, 1]) / counts.sum()
                se = np.sqrt(p_pool * (1 - p_pool) * (1 / n1 + 1 / n2))
                z = (p1 - p2) / (se + 1e-12)
                # survival function keeps precision for large |z|
                p_z = 2 * stats.norm.sf(abs(z))

        rows.append({"feature": f, "t_test_p": p_t, "anova_p": p_a, "chi2_p": p_chi, "ztest_p": p_z})
    return pd.DataFrame(rows)

# -----------------------------------------
# MAIN
# -----------------------------------------
def main():
    """Run the whole pipeline on ``DATA_PATH`` and print every result table.

    Steps: load the CSV, derive the label from ``CGPA`` when it is missing,
    evaluate each feature group under each selector, evaluate per course,
    produce SHAP/LIME summaries for the top-10 information-gain features and
    run the statistical tests.

    Raises:
        ValueError: if the target column is absent and cannot be derived.
    """
    df = load_data(DATA_PATH)

    # (Optional) create/derive target if CGPA numeric exists:
    if TARGET_COL not in df.columns and "CGPA" in df.columns:
        # Example: high (>=3.5) vs others. Written to TARGET_COL (not a
        # hard-coded name) so changing the config keeps working.
        df[TARGET_COL] = (df["CGPA"] >= 3.5).astype(int)

    # Explicit raise instead of assert: asserts are stripped under ``python -O``.
    if TARGET_COL not in df.columns:
        raise ValueError(f"Target column '{TARGET_COL}' missing.")

    # ===== 5) SEPARATE RESULTS BY CATEGORY × SELECTOR =====
    all_blocks = run_all_blocks(df, TARGET_COL)
    print("\n=== Blocked Results (Group × Selector) ===")
    for b in all_blocks:
        print(f"\n[{b['group']}] via {b['selector']}")
        print("Selected features:", b["selected_features"])
        print(pd.DataFrame(b["results"]).T.round(3))

    # ===== 6) COURSE-WISE =====
    if COURSE_COL in df.columns:
        cw = course_wise_performance(df, TARGET_COL)
        print("\n=== Course-wise results (classical models) ===")
        print(cw.round(3).to_string(index=False))

    usable_feats = keep_existing(MENTAL_FEATURES+SOCIAL_FEATURES+PERSONAL_FEATURES, df)
    if not usable_feats:
        # Nothing to score, explain or test without any configured feature.
        print("\nNo configured features found in the data; skipping XAI and statistical tests.")
        return

    # ===== 7) XAI (LIME/SHAP) on a representative model with top features (e.g., IG over ALL features) =====
    sc_all = feature_scores(df[usable_feats], df[TARGET_COL], usable_feats)
    top10 = select_by_criterion(sc_all, "info_gain", top_k=10)
    # Build the preprocessor for exactly the columns being explained; one built
    # for the whole frame would reference columns absent from df[top10].
    pre_top, _, _ = make_preprocessor(df[top10])
    xai = run_xai_examples(df, TARGET_COL, top10, pre_top)
    print("\n=== XAI ===")
    if HAVE_SHAP:
        print("SHAP shapes:", xai.get("shap_values_shape"))
    else:
        print("SHAP not available.")
    if HAVE_LIME:
        print("LIME top features (first instance):", xai.get("lime_top"))
    else:
        print("LIME not available.")

    # ===== 8) Statistical tests =====
    stats_df = statistical_tests(df, TARGET_COL, usable_feats)
    print("\n=== Statistical tests ===")
    print(stats_df.round(4).to_string(index=False))

if __name__ == "__main__":
    main()