In [4]:
import os
import pickle
from typing import Tuple, Dict, Any

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings

from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, confusion_matrix, f1_score, balanced_accuracy_score
from sklearn.inspection import permutation_importance
import sklearn

# CONFIG
# Module-level settings read by main() and train_and_evaluate().
DATA_PATH = "student_wellbeing_dataset.csv"  # set to your file path if different
OUTPUT_DIR = "data"                          # directory where outputs will be saved
# Seed passed to the train/test split, the CV splitter, every classifier and
# permutation importance so repeated runs are reproducible.
RANDOM_STATE = 42

# NOTE: this silences *all* warnings for the whole process, not only those
# raised by this script.
warnings.filterwarnings("ignore")


def load_data(path: str) -> pd.DataFrame:
    """Read the CSV file at ``path`` into a DataFrame.

    Args:
        path: Location of the CSV file.

    Returns:
        The parsed contents of the file.

    Raises:
        FileNotFoundError: If nothing exists at ``path``.
    """
    if os.path.exists(path):
        return pd.read_csv(path)
    raise FileNotFoundError(f"Data file not found at: {path}")


def detect_and_prepare_target(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
    """Locate the target column and normalise its labels.

    The target is the first column whose lower-cased name is one of a fixed
    set of well-known names; if none matches, the first column with at most
    five distinct values is used instead. Label spellings such as
    ``"high_risk"`` or ``" Low "`` are mapped to the canonical labels
    ``'Safe'``, ``'Moderate Risk'`` and ``'High Risk'``; values that are not
    recognised are kept (stripped of surrounding whitespace). Rows whose
    target is missing are dropped.

    Args:
        df: Raw dataset. It is not modified; a copy is returned.

    Returns:
        A tuple ``(prepared_df, target_col)`` where ``prepared_df`` has a
        fresh 0..n-1 index and ``target_col`` is the name of the target.

    Raises:
        ValueError: If no column qualifies as a target.
    """
    known_target_names = {
        "target", "label", "wellbeing", "well_being", "risk", "category",
        "risk_level", "wellbeing_category", "status",
    }
    # str() keeps the lookup working for non-string column labels.
    target_col = next(
        (c for c in df.columns if str(c).lower() in known_target_names), None
    )

    if target_col is None:
        target_col = next((c for c in df.columns if df[c].nunique() <= 5), None)
        if target_col is None:
            raise ValueError("No obvious low-cardinality target column found. Please specify the target column.")

    # Work on a copy so the caller's DataFrame is not silently rewritten.
    df = df.copy()

    # Normalize and map common forms to exact labels
    vals = df[target_col].dropna().astype(str).str.strip()
    # regex=False: "_" and "-" are literal separators, independent of the
    # pandas version's default for ``regex``.
    vals_normal = (
        vals.str.lower()
        .str.replace("_", " ", regex=False)
        .str.replace("-", " ", regex=False)
    )

    mapping = {
        'moderate': 'Moderate Risk',
        'moderate risk': 'Moderate Risk',
        'moderaterisk': 'Moderate Risk',
        'med': 'Moderate Risk',
        'high': 'High Risk',
        'high risk': 'High Risk',
        'highrisk': 'High Risk',
        'low': 'Safe',
        'safe': 'Safe',
    }

    mapped = vals_normal.map(mapping)
    # where mapping failed, keep original trimmed value; rows that were
    # missing are absent from ``vals`` and therefore stay NaN after alignment
    df[target_col] = mapped.fillna(vals)

    df = df[df[target_col].notna()].reset_index(drop=True)
    return df, target_col


def split_features_target(df: pd.DataFrame, target_col: str) -> Tuple[pd.DataFrame, pd.Series]:
    """Split ``df`` into a feature frame and the target series.

    Identifier-like columns are removed from the features. A column counts as
    an identifier when its name is ``id``/``sid``/``student_id``/``studentid``
    (case-insensitive), ends in a separated ``id`` suffix (``_id``, ``-id``,
    `` id``, ``.id``), or ends in a camelCase ``Id``/``ID`` suffix such as
    ``studentId`` or ``StudentID``. Ordinary words that merely end in the
    letters "id" (``paid``, ``valid``, ``humid``) are kept as features.

    Args:
        df: Dataset containing the target column.
        target_col: Name of the target column.

    Returns:
        ``(X, y)``: the features without the target and identifier columns,
        and the target column.
    """
    def _is_id_like(column) -> bool:
        name = str(column).strip()
        lowered = name.lower()
        if lowered in {'id', 'student_id', 'studentid', 'sid'}:
            return True
        if lowered.endswith(('_id', '-id', ' id', '.id')):
            return True
        # camelCase / PascalCase suffix: the character before "Id"/"ID" must
        # be lower-case or a digit, so all-caps words like "PAID" do not match.
        return (
            len(name) > 2
            and name.endswith(('Id', 'ID'))
            and (name[-3].islower() or name[-3].isdigit())
        )

    y = df[target_col]
    X = df.drop(columns=[target_col])
    id_like = [c for c in X.columns if _is_id_like(c)]
    if id_like:
        X = X.drop(columns=id_like)
    return X, y


def build_preprocessor(X: pd.DataFrame) -> Tuple[ColumnTransformer, list, list]:
    """Build an unfitted preprocessing transformer for ``X``.

    Numeric columns are median-imputed and standardised; object, category and
    bool columns are imputed with the most frequent value and one-hot encoded
    into a dense array. Columns of any other dtype are dropped.

    Args:
        X: Feature frame whose dtypes decide the numeric/categorical split.

    Returns:
        ``(preprocessor, numeric_cols, categorical_cols)``.
    """
    numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = X.select_dtypes(include=['object', 'category', 'bool']).columns.tolist()

    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # compatibility with sklearn versions: newer releases take ``sparse_output``
    # while older ones only know ``sparse``. Probe the constructor instead of
    # comparing version strings, which is lexicographic ("1.10" < "1.2").
    try:
        onehot = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
    except TypeError:
        onehot = OneHotEncoder(handle_unknown='ignore', sparse=False)

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', onehot)
    ])

    preprocessor = ColumnTransformer(transformers=[
        ('num', numeric_transformer, numeric_cols),
        ('cat', categorical_transformer, categorical_cols)
    ], remainder='drop')

    return preprocessor, numeric_cols, categorical_cols


def _encoded_feature_names(pipe, numeric_cols: list, categorical_cols: list) -> list:
    """Return the column names produced by a fitted pipeline's 'pre' step."""
    if not categorical_cols:
        return list(numeric_cols)
    onehot = pipe.named_steps['pre'].named_transformers_['cat'].named_steps['onehot']
    return list(numeric_cols) + onehot.get_feature_names_out(categorical_cols).tolist()


def train_and_evaluate(X: pd.DataFrame, y: pd.Series, preprocessor: ColumnTransformer,
                       numeric_cols: list, categorical_cols: list) -> Dict[str, Any]:
    """Train three classifiers, evaluate them, and save the best one's artifacts.

    Each model is cross-validated (5-fold stratified, macro-F1) on an 80% training
    split, refit on that split, and scored on the held-out 20%. The model with
    the highest mean CV score is selected as best. Predictions for the whole
    dataset, the pickled best pipeline and a class-distribution plot are written
    to ``OUTPUT_DIR``.

    Args:
        X: Feature frame.
        y: Target labels aligned with ``X``.
        preprocessor: Unfitted preprocessing transformer; it is cloned per model
            and left unfitted.
        numeric_cols: Numeric column names handled by ``preprocessor``.
        categorical_cols: Categorical column names handled by ``preprocessor``.

    Returns:
        A dict keyed by model name (each holding ``pipeline``, ``cv_scores``,
        ``cv_mean_f1_macro``, ``test_f1_macro``, ``balanced_accuracy``,
        ``classification_report`` and ``confusion_matrix``), plus ``'best'``
        (``name`` and ``model``) and, when it could be computed,
        ``'permutation_importance'``.
    """
    from sklearn.base import clone  # local import: not among the file-level imports

    # Make the function usable on its own, not only after main() created the dir.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    classifiers = {
        'LogisticRegression': LogisticRegression(max_iter=1000, class_weight='balanced',
                                                 random_state=RANDOM_STATE),
        'RandomForest': RandomForestClassifier(n_estimators=200, class_weight='balanced',
                                               random_state=RANDOM_STATE),
        'GradientBoosting': GradientBoostingClassifier(n_estimators=200,
                                                       random_state=RANDOM_STATE),
    }
    # Each pipeline gets its own preprocessor clone; sharing one instance would
    # let every fit overwrite the state the previously fitted pipelines rely on.
    models = {
        name: Pipeline(steps=[('pre', clone(preprocessor)), ('clf', clf)])
        for name, clf in classifiers.items()
    }

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y, random_state=RANDOM_STATE
    )

    # Confusion-matrix labels: the canonical three in a fixed order, followed by
    # any other class found in y so that nothing is silently left out. If none
    # of the canonical labels occurs, only the observed classes are used.
    canonical = ['Safe', 'Moderate Risk', 'High Risk']
    short_names = {'Safe': 'Safe', 'Moderate Risk': 'Moderate', 'High Risk': 'High'}
    observed = y.unique().tolist()
    extras = sorted((lbl for lbl in observed if lbl not in canonical), key=str)
    has_canonical = any(lbl in observed for lbl in canonical)
    cm_labels = (canonical if has_canonical else []) + extras
    cm_index = [f"True_{short_names.get(lbl, lbl)}" for lbl in cm_labels]
    cm_columns = [f"Pred_{short_names.get(lbl, lbl)}" for lbl in cm_labels]

    results = {}
    for name, pipe in models.items():
        print(f"\nTraining {name}...")
        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
        scores = cross_val_score(pipe, X_train, y_train, cv=cv, scoring='f1_macro')
        print(f"CV macro-F1 scores: {scores}, mean={scores.mean():.4f}")
        pipe.fit(X_train, y_train)

        preds = pipe.predict(X_test)
        report = classification_report(y_test, preds, zero_division=0)
        cm = confusion_matrix(y_test, preds, labels=cm_labels)
        f1 = f1_score(y_test, preds, average='macro', zero_division=0)
        bal_acc = balanced_accuracy_score(y_test, preds)

        print(f"Test macro F1: {f1:.4f}, Balanced accuracy: {bal_acc:.4f}")
        print(report)
        print("Confusion matrix (rows=true, cols=pred):")
        print(pd.DataFrame(cm, index=cm_index, columns=cm_columns))

        results[name] = {
            'pipeline': pipe,
            'cv_scores': scores,
            'cv_mean_f1_macro': scores.mean(),
            'test_f1_macro': f1,
            'balanced_accuracy': bal_acc,
            'classification_report': report,
            'confusion_matrix': cm
        }

    # Select on cross-validation performance, not on the test split: choosing
    # the winner by its test score would make that score optimistically biased.
    best_name = max(models, key=lambda k: results[k]['cv_mean_f1_macro'])
    best_model = results[best_name]['pipeline']
    results['best'] = {'name': best_name, 'model': best_model}

    # RandomForest feature importances (best effort: failure is reported, not raised)
    try:
        rf_pipe = results['RandomForest']['pipeline']
        # Importances refer to the *transformed* columns, hence the encoded names.
        feature_names = _encoded_feature_names(rf_pipe, numeric_cols, categorical_cols)
        importances = rf_pipe.named_steps['clf'].feature_importances_
        fi_df = pd.DataFrame({'feature': feature_names, 'importance': importances}).sort_values('importance', ascending=False)
        results['RandomForest']['feature_importances'] = fi_df
        print("\nTop 10 RandomForest features:")
        print(fi_df.head(10))
    except Exception as e:
        print("Could not extract RandomForest importances:", e)

    # Permutation importance on best model (best effort)
    try:
        print("\nComputing permutation importance on test set for best model...")
        r = permutation_importance(best_model, X_test, y_test, n_repeats=10, random_state=RANDOM_STATE, scoring='f1_macro')
        # The whole pipeline is permuted on raw inputs, so there is one value per
        # column of X_test -- not one per one-hot encoded column.
        perm_df = pd.DataFrame({
            'feature': X_test.columns.tolist(),
            'importance_mean': r.importances_mean
        }).sort_values('importance_mean', ascending=False)
        results['permutation_importance'] = perm_df
        print(perm_df.head(10))
    except Exception as e:
        print("Permutation importance failed:", e)

    # Save predictions (note: covers the full dataset, training rows included)
    preds_full = best_model.predict(X)
    out_df = X.copy()
    out_df['true_label'] = y.values
    out_df['predicted_label'] = preds_full
    preds_path = os.path.join(OUTPUT_DIR, 'predictions_with_labels.csv')
    out_df.to_csv(preds_path, index=False)
    print(f"\nSaved predictions to: {preds_path}")

    # Save model
    model_path = os.path.join(OUTPUT_DIR, 'best_model.pkl')
    with open(model_path, 'wb') as f:
        pickle.dump(best_model, f)
    print(f"Saved best model ({best_name}) to: {model_path}")

    # Class distribution plot
    plt.figure(figsize=(6, 4))
    y.value_counts().plot(kind='bar')
    plt.title('Class distribution in dataset')
    plt.xlabel('Class')
    plt.ylabel('Count')
    plt.tight_layout()
    plt_path = os.path.join(OUTPUT_DIR, 'class_distribution.png')
    plt.savefig(plt_path)
    plt.close()
    print(f"Saved class distribution plot to: {plt_path}")

    return results


def main():
    """Run the pipeline end to end.

    Loads the CSV at ``DATA_PATH``, prepares the target, trains and evaluates
    the models, prints the per-class intervention recommendations, and writes
    a one-row summary CSV into ``OUTPUT_DIR``.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    print("Loading data...")
    raw = load_data(DATA_PATH)
    print(f"Loaded shape: {raw.shape}")

    df, target_col = detect_and_prepare_target(raw)
    print(f"Using target column: {target_col}")

    features, labels = split_features_target(df, target_col)
    preprocessor, numeric_cols, categorical_cols = build_preprocessor(features)
    results = train_and_evaluate(features, labels, preprocessor, numeric_cols, categorical_cols)

    # Suggested follow-up per predicted class, printed in this order.
    recommendations = [
        ('Safe',
         'General well-being resources; periodic check-ins; encourage healthy study-life balance.'),
        ('Moderate Risk',
         'Early counseling, stress-management workshops, parental + teacher notification, monitor academic load.'),
        ('High Risk',
         'Immediate counseling referral, mental health evaluation, crisis intervention plan, involve guardians and healthcare professionals.'),
    ]
    print('\nIntervention recommendations:')
    for risk_class, advice in recommendations:
        print(f"- {risk_class}: {advice}")

    # One-row summary of the run; the shape is that of the prepared frame.
    winner = results['best']['name']
    summary_df = pd.DataFrame({
        'dataset_shape': [df.shape],
        'target_column': [target_col],
        'best_model': [winner],
        'best_model_test_macro_f1': [results[winner]['test_f1_macro']],
    })
    summary_path = os.path.join(OUTPUT_DIR, 'pipeline_summary.csv')
    summary_df.to_csv(summary_path, index=False)
    print(f"Saved pipeline summary to: {summary_path}")


# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()


Loading data...
Loaded shape: (5000, 15)
Using target column: Risk_Level

Training LogisticRegression...
CV macro-F1 scores: [0.95350411 0.96599333 0.96445071 0.95823217 0.95355663], mean=0.9591
Test macro F1: 0.9631, Balanced accuracy: 0.9591
               precision    recall  f1-score   support

    High Risk       1.00      0.93      0.96       241
Moderate Risk       0.99      0.95      0.97       384
         Safe       0.92      1.00      0.96       375

     accuracy                           0.96      1000
    macro avg       0.97      0.96      0.96      1000
 weighted avg       0.97      0.96      0.96      1000

Confusion matrix (rows=true, cols=pred):
               Pred_Safe  Pred_Moderate  Pred_High
True_Safe            375              0          0
True_Moderate         20            364          0
True_High             12              5        224

Training RandomForest...
CV macro-F1 scores: [0.95710432 0.96458566 0.96693557 0.95820067 0.9557069 ], mean=0.9605
Test ma