## Import Libraries

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import optuna
import warnings

# Sklearn Core
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score

# Models
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier

# Apply the 'ggplot' stylesheet to the matplotlib figures drawn in this notebook
plt.style.use('ggplot')
# Single seed reused as random_state by the data splits and the classifiers
SEED = 42

ModuleNotFoundError: No module named 'xgboost'

## Load and Prepare Data

In [None]:
def load_and_prep_data(filepath, low_cutoff=65, high_cutoff=80):
    """
    Loads data and creates the classification target based on Exam_Score.

    Exact duplicate rows are removed, rows with a missing Exam_Score are
    dropped, and the continuous score is replaced by a string label.

    Parameters
    ----------
    filepath : str or path-like
        CSV file passed to pandas.read_csv; must contain an 'Exam_Score' column.
    low_cutoff : number, default 65
        Scores strictly below this value are labelled 'Low'.
    high_cutoff : number, default 80
        Scores at or above this value are labelled 'High'; scores in
        [low_cutoff, high_cutoff) are labelled 'Average'.

    Returns
    -------
    pandas.DataFrame
        The cleaned data without 'Exam_Score' and with a new
        'Performance_Category' column holding 'Low', 'Average' or 'High'.
        The original row index is kept (it is not reset).

    Raises
    ------
    KeyError
        If the file has no 'Exam_Score' column.
    """
    df = pd.read_csv(filepath)

    if 'Exam_Score' not in df.columns:
        raise KeyError("Column 'Exam_Score' is required to build the target")

    # Drop duplicates if any
    df = df.drop_duplicates()

    # A NaN score fails both '<' comparisons below and would silently fall
    # through to 'High', so rows without a target are removed up front.
    n_missing = int(df['Exam_Score'].isna().sum())
    if n_missing:
        print(f"Dropping {n_missing} row(s) with missing Exam_Score")
        df = df.dropna(subset=['Exam_Score'])

    # CLASSIFICATION LOGIC:
    # The continuous Exam_Score is mapped to 3 string labels:
    #   'Low'     : score <  low_cutoff
    #   'Average' : low_cutoff <= score < high_cutoff
    #   'High'    : score >= high_cutoff
    # The labels stay strings here; integer encoding is left to the caller.
    scores = df['Exam_Score']
    category = pd.Series('High', index=df.index, dtype=object)
    # Order matters: the wider 'Average' mask is applied first, then 'Low'
    # overwrites the part of it that lies below low_cutoff.
    category = category.mask(scores < high_cutoff, 'Average')
    category = category.mask(scores < low_cutoff, 'Low')

    # Create Target
    df = df.assign(Performance_Category=category)

    # Drop the original continuous target to prevent data leakage
    df = df.drop(columns=['Exam_Score'])

    return df

# Run the loader on the project CSV and report what came back
df = load_and_prep_data('StudentPerformanceFactors.csv')
print(f"Data Loaded. Shape: {df.shape}")

# Share of rows per performance label (value_counts with normalize=True)
class_share = df['Performance_Category'].value_counts(normalize=True)
print(f"Class Distribution:\n{class_share}")

## Exploratory Data Analysis

In [None]:
def perform_eda(df):
    """
    Print a numerical summary of `df` and display three diagnostic figures.

    The figures are, in order: a count plot of 'Performance_Category', a
    correlation heatmap of the float64/int64 columns, and a box plot of
    'Hours_Studied' per performance category. Nothing is returned.
    """
    # Display order shared by the two category-based plots
    label_order = ['Low', 'Average', 'High']

    # --- Summary statistics -------------------------------------------------
    print("\n--- Numerical Summary ---")
    print(df.describe())

    # --- Target distribution ------------------------------------------------
    plt.figure(figsize=(6, 4))
    sns.countplot(
        data=df,
        x='Performance_Category',
        order=label_order,
        palette='viridis',
    )
    plt.title('Target Distribution: Student Performance')
    plt.show()

    # --- Correlation matrix (float64 / int64 columns only) ------------------
    plt.figure(figsize=(10, 8))
    correlations = df.select_dtypes(include=['float64', 'int64']).corr()
    sns.heatmap(
        correlations,
        annot=True,
        fmt=".2f",
        cmap='coolwarm',
        linewidths=0.5,
    )
    plt.title('Feature Correlation Matrix')
    plt.show()

    # --- Hours studied per performance category -----------------------------
    plt.figure(figsize=(8, 5))
    sns.boxplot(
        data=df,
        x='Performance_Category',
        y='Hours_Studied',
        order=label_order,
    )
    plt.title('Impact of Study Hours on Performance')
    plt.show()


perform_eda(df)

## Feature Engineering

In [None]:
def feature_engineering(X):
    """
    Return a copy of `X` with up to two derived columns appended.

    Written as a plain function so it can be wrapped in a FunctionTransformer.
    Each derived column is only added when both of its source columns are
    present; otherwise it is skipped without error. The input is not modified.

    Added columns:
      - 'Effective_Study': Hours_Studied * (Attendance / 100)
      - 'Wellness_Index' : Sleep_Hours + Physical_Activity
    """
    X_new = X.copy()
    available = set(X_new.columns)

    # Interaction: study hours weighted by the attendance fraction, on the
    # rationale that studying a lot matters less if attendance is poor.
    if {'Hours_Studied', 'Attendance'} <= available:
        attendance_fraction = X_new['Attendance'] / 100
        X_new['Effective_Study'] = X_new['Hours_Studied'] * attendance_fraction

    # Aggregation: sleep plus physical activity as one wellness signal.
    if {'Sleep_Hours', 'Physical_Activity'} <= available:
        X_new['Wellness_Index'] = X_new['Sleep_Hours'] + X_new['Physical_Activity']

    return X_new

# Target is the performance label; features are every other column
y = df['Performance_Category']
X = df.drop(columns=['Performance_Category'])

# Turn the string labels into integer class ids (XGBoost needs integers);
# le.classes_ holds the label that each id stands for.
le = LabelEncoder()
le.fit(y)
y_encoded = le.transform(y)
class_names = le.classes_
print(f"Target Mapping: {dict(enumerate(class_names))}")

# 70 / 15 / 15 split, stratified on the encoded target at both stages.
# Stage 1: hold out 30% of the rows as a temporary pool.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y_encoded, test_size=0.30, random_state=SEED, stratify=y_encoded
)
# Stage 2: cut that pool in half -> 15% validation and 15% test overall.
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.50, random_state=SEED, stratify=y_temp
)

print(f"Train Shape: {X_train.shape}, Val Shape: {X_val.shape}, Test Shape: {X_test.shape}")

## Processing Pipeline

In [None]:
# Column groups, chosen by pandas dtype on the raw feature frame X.
# NOTE: these lists are fixed here, so columns created later (e.g. by
# feature_engineering) are not part of either group.
numeric_features = list(X.select_dtypes(include=['int64', 'float64']).columns)
categorical_features = list(X.select_dtypes(include=['object']).columns)

# Numeric branch: fill gaps with the column median, then standardize
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical branch: fill gaps with the most frequent value, then one-hot
# encode; categories unseen during fit are ignored at transform time
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])

# Route each column group to its branch. No `remainder` argument is passed,
# so any column outside the two lists gets ColumnTransformer's default handling.
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features),
])

# Wrap the custom feature function so it can be used as a pipeline step
feature_eng_transformer = FunctionTransformer(func=feature_engineering, validate=False)

# 5. Full Pipeline Construction (Feature Engineering -> Preprocessing -> Classifier)
def get_pipeline(classifier):
    """
    Build an end-to-end pipeline ending in `classifier`.

    Steps, in order:
      1. 'feat_eng'     - feature_eng_transformer (adds the derived columns)
      2. 'preprocessor' - a ColumnTransformer that selects columns by dtype
      3. 'classifier'   - the estimator passed in

    The module-level `preprocessor` routes columns using lists computed from
    the raw feature frame, so columns created in step 1 appear in neither
    list and would not be passed on to the classifier. To make the engineered
    features actually count, this function builds its own ColumnTransformer
    whose column selection is resolved by dtype when the pipeline is fitted,
    i.e. after feature engineering has run.

    Parameters
    ----------
    classifier : estimator
        Any scikit-learn compatible classifier to place at the end.

    Returns
    -------
    sklearn.pipeline.Pipeline
        An unfitted pipeline; its input must be a pandas DataFrame because
        both feature engineering and dtype-based selection rely on columns.
    """
    # Local import keeps this definition self-contained within the cell
    from sklearn.compose import make_column_selector

    # Same dtype rules as numeric_features / categorical_features, but
    # evaluated on the frame that comes out of the feature engineering step.
    dtype_based_preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer,
             make_column_selector(dtype_include=['int64', 'float64'])),
            ('cat', categorical_transformer,
             make_column_selector(dtype_include=['object'])),
        ])

    return Pipeline(steps=[
        ('feat_eng', feature_eng_transformer),
        ('preprocessor', dtype_based_preprocessor),
        ('classifier', classifier)
    ])

## Hyperparameter Tuning

In [None]:
def objective(trial):
    """
    Optuna objective: score one sampled classifier configuration.

    The trial first picks between 'RandomForest' and 'XGBoost', then samples
    that model's hyperparameters. The model is placed behind the shared
    `preprocessor` and scored with 3-fold cross-validation on the training
    split only.

    Returns the mean weighted F1 across the folds (higher is better).
    """
    chosen = trial.suggest_categorical('classifier', ['RandomForest', 'XGBoost'])

    if chosen == 'RandomForest':
        clf = RandomForestClassifier(
            n_estimators=trial.suggest_int('rf_n_estimators', 50, 300),
            max_depth=trial.suggest_int('rf_max_depth', 5, 30),
            min_samples_split=trial.suggest_int('rf_min_samples_split', 2, 15),
            class_weight='balanced',
            random_state=SEED,
            n_jobs=-1,
        )
    else:
        # Only two choices exist, so anything else is XGBoost
        clf = XGBClassifier(
            n_estimators=trial.suggest_int('xgb_n_estimators', 50, 300),
            max_depth=trial.suggest_int('xgb_max_depth', 3, 10),
            learning_rate=trial.suggest_float('xgb_learning_rate', 0.01, 0.3),
            subsample=trial.suggest_float('xgb_subsample', 0.6, 1.0),
            colsample_bytree=trial.suggest_float('xgb_colsample_bytree', 0.6, 1.0),
            eval_metric='mlogloss',
            random_state=SEED,
            n_jobs=-1,
        )

    # Preprocessing + classifier only: the feature engineering step is not
    # part of this pipeline, so the model sees the original columns.
    candidate = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', clf),
    ])

    # Weighted F1 is used because the classes may be imbalanced
    fold_scores = cross_val_score(
        candidate, X_train, y_train, cv=3, scoring='f1_weighted'
    )
    return fold_scores.mean()

# Run Optimization
print("Starting Optuna Study...")
# Seed the sampler with the notebook-wide SEED: without it every run samples
# different trials, so the "best" model is not reproducible even though the
# splits and classifiers are seeded.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=SEED),
)
study.optimize(objective, n_trials=20) # Increase n_trials for better results

print(f"Best Classifier: {study.best_params['classifier']}")
print(f"Best F1 Score: {study.best_value:.4f}")

## Final Evaluation & Feature Importance

In [None]:
# Extract Best Params
# Work on a copy so that popping 'classifier' cannot alter the study's record
best_params = dict(study.best_params)
model_type = best_params.pop('classifier')

# Instantiate Best Model
# The tuned parameters were registered with a model prefix ('rf_' / 'xgb_').
# Only a leading prefix is removed; str.replace would also delete the same
# characters if they appeared in the middle of a parameter name.
if model_type == 'RandomForest':
    rf_params = {
        k[len('rf_'):]: v for k, v in best_params.items() if k.startswith('rf_')
    }
    # Same fixed settings as used during tuning (balanced weights, all cores)
    final_clf = RandomForestClassifier(
        **rf_params, random_state=SEED, class_weight='balanced', n_jobs=-1
    )
else:
    xgb_params = {
        k[len('xgb_'):]: v for k, v in best_params.items() if k.startswith('xgb_')
    }
    # Same fixed settings as used during tuning (mlogloss metric, all cores)
    final_clf = XGBClassifier(
        **xgb_params, random_state=SEED, eval_metric='mlogloss', n_jobs=-1
    )

# Final Pipeline: shared preprocessing followed by the tuned classifier
final_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', final_clf)
])

# Refit on train + validation together (optional, but often good practice);
# the test split stays untouched until the prediction below.
X_final_train = pd.concat((X_train, X_val), axis=0)
y_final_train = np.concatenate((y_train, y_val))
final_pipeline.fit(X_final_train, y_final_train)

# Held-out predictions used by every metric and plot that follows
y_pred = final_pipeline.predict(X_test)

# --- METRICS ---
banner = "=" * 30
test_accuracy = accuracy_score(y_test, y_pred)
test_f1_weighted = f1_score(y_test, y_pred, average='weighted')

print("\n" + banner)
print(f"FINAL EVALUATION: {model_type}")
print(banner)
print(f"Accuracy: {test_accuracy:.4f}")
print(f"F1 Score (Weighted): {test_f1_weighted:.4f}")
print("\nClassification Report:\n")
print(classification_report(y_test, y_pred, target_names=class_names))

# --- CONFUSION MATRIX ---
# confusion_matrix(y_true, y_pred): actual classes on the y-axis, predicted on the x-axis
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(6, 5))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.title(f'Confusion Matrix - {model_type}')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()

# --- FEATURE IMPORTANCE ---
# Extracting feature names after OneHotEncoding is tricky but necessary for interpretation.
# This section is best-effort: any failure is reported and the notebook continues.
if hasattr(final_clf, 'feature_importances_'):
    try:
        # Access the fitted preprocessor step
        prep = final_pipeline.named_steps['preprocessor']

        # Numeric names: the 'num' branch is listed first in the preprocessor
        # and keeps one output column per input column
        num_names = numeric_features

        # Categorical names: expanded by the fitted one-hot encoder
        cat_names = prep.named_transformers_['cat']['onehot'].get_feature_names_out(categorical_features)

        feature_names = np.r_[num_names, cat_names]
        importances = final_clf.feature_importances_

        # Guard against mislabelled bars: every importance needs exactly one name
        if len(feature_names) != len(importances):
            raise ValueError(
                f"{len(feature_names)} feature names for {len(importances)} importances"
            )

        # Plot the top 10, or fewer when the model has fewer than 10 features
        # (a fixed range(10) would not match the data length in that case)
        top_n = min(10, len(importances))
        indices = np.argsort(importances)[::-1][:top_n]

        # The figure is only created once all inputs are known to be valid
        plt.figure(figsize=(10, 6))
        plt.title(f"Top {top_n} Feature Importances")
        plt.barh(range(top_n), importances[indices], align="center")
        plt.yticks(range(top_n), feature_names[indices])
        plt.gca().invert_yaxis()  # most important feature at the top
        plt.show()
    except Exception as e:
        print(f"Could not extract feature names directly: {e}")