In [7]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression, SGDRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import joblib
import os
import warnings
# Suppress every warning category for the whole process so the printed
# report stays readable.
# NOTE(review): this also hides DeprecationWarning/FutureWarning messages
# emitted by the imported libraries; consider narrowing the filter.
warnings.filterwarnings('ignore')

# Seed NumPy's global RNG for reproducibility. The estimators and the
# train/test splits in this file additionally pass random_state=42 explicitly.
np.random.seed(42)

def analyze_and_preprocess(df, target_column='Exam_Score'):
    """
    Summarize the dataset, engineer features and fill in missing values.

    Prints the shape, dtypes, missing-value counts and describe() output,
    adds the 'Study_Efficiency' and 'Sleep_Study_Ratio' columns, drops
    'Gender' and 'Distance_from_Home' when present, and imputes remaining
    gaps (median for the numeric features, most frequent value otherwise).

    The input frame is never modified; all changes happen on a copy.

    Args:
        df: Raw DataFrame. Must contain 'Previous_Scores', 'Hours_Studied'
            and 'Sleep_Hours'.
        target_column: Name of the target column; it is excluded from the
            returned numeric feature list.

    Returns:
        Tuple ``(df, numeric_features, categorical_features)`` where ``df``
        is the processed copy and the two lists hold column names.

    Raises:
        KeyError: If one of the columns needed for feature engineering is
            missing.
    """
    print("\n==== DATASET OVERVIEW ====")
    print(f"Shape: {df.shape}")
    print("\nColumn data types:")
    print(df.dtypes)

    print("\nMissing values:")
    missing_values = df.isnull().sum()
    print(missing_values[missing_values > 0] if missing_values.sum() > 0 else "No missing values")

    # Basic statistics for numerical columns
    print("\nBasic statistics for numerical features:")
    print(df.describe())

    # Work on a copy so the engineered columns and the imputation below do
    # not leak back into the caller's DataFrame.
    df = df.copy()

    # Feature engineering - the +1 keeps the ratio finite when no hours
    # were studied.
    df['Study_Efficiency'] = df['Previous_Scores'] / (df['Hours_Studied'] + 1)
    df['Sleep_Study_Ratio'] = df['Sleep_Hours'] / (df['Hours_Studied'] + 1)

    # Feature selection - drop less important features based on domain knowledge
    features_to_drop = ['Gender', 'Distance_from_Home']  # These showed low correlation in EDA
    # errors='ignore' lets the function run on frames where these columns
    # were already removed.
    df = df.drop(columns=features_to_drop, errors='ignore')

    # Identify numerical and categorical columns among what is left
    numeric_features = [
        name
        for name in df.select_dtypes(include=['int64', 'float64']).columns
        if name != target_column
    ]
    categorical_features = df.select_dtypes(include=['object', 'category']).columns.tolist()

    print(f"\nNumeric features: {len(numeric_features)}")
    print(f"Categorical features: {len(categorical_features)}")

    # Handle any missing values. Assign the filled column back instead of
    # calling fillna(inplace=True) on a column selection: the chained form
    # is deprecated and has no effect under pandas Copy-on-Write.
    for col in df.columns:
        if not df[col].isnull().any():
            continue
        if col in numeric_features:
            fill_value = df[col].median()
        else:
            modes = df[col].mode()
            if modes.empty:
                # Column is entirely missing; there is nothing to impute from.
                continue
            fill_value = modes.iloc[0]
        df[col] = df[col].fillna(fill_value)

    return df, numeric_features, categorical_features

def create_visualizations(df, numeric_features, categorical_features, target_column):
    """
    Render exploratory plots and save them as PNG files in 'visualizations/'.

    Produces a correlation heatmap, a histogram of the target, scatter plots
    of the (up to four) numeric features most correlated with the target,
    and box plots of the target for the first (up to four) categorical
    features that have at most ten distinct values.

    Args:
        df: DataFrame holding the features and the target column.
        numeric_features: List of numeric column names.
        categorical_features: List of categorical column names.
        target_column: Name of the target column.
    """
    print("\n==== CREATING VISUALIZATIONS ====")
    os.makedirs('visualizations', exist_ok=True)

    # --- Correlation heatmap: numeric features plus the target ---
    plt.figure(figsize=(14, 12))
    heatmap_columns = list(numeric_features) + [target_column]
    sns.heatmap(df[heatmap_columns].corr(), annot=True, cmap='coolwarm', fmt='.2f')
    plt.title('Correlation Heatmap of Numeric Features')
    plt.tight_layout()
    plt.savefig('visualizations/correlation_heatmap.png')
    plt.close()

    # --- Histogram (with KDE) of the target ---
    plt.figure(figsize=(10, 6))
    sns.histplot(df[target_column], kde=True)
    plt.title(f'Distribution of {target_column}')
    plt.xlabel(target_column)
    plt.savefig(f'visualizations/{target_column}_distribution.png')
    plt.close()

    # --- Scatter plots for the numeric features most correlated with the target ---
    if len(numeric_features):
        strength = df[numeric_features].corrwith(df[target_column]).abs()
        # Slicing clamps to the available length, so fewer than four is fine.
        strongest = strength.sort_values(ascending=False).index[:4].tolist()

        plt.figure(figsize=(15, 10))
        for position, column in enumerate(strongest, start=1):
            plt.subplot(2, 2, position)
            sns.scatterplot(x=column, y=target_column, data=df)
            plt.title(f'{column} vs {target_column}')
        plt.tight_layout()
        plt.savefig('visualizations/feature_relationships.png')
        plt.close()

    # --- Box plots for the first categorical features (up to 4) ---
    if len(categorical_features):
        candidates = categorical_features[:4]

        plt.figure(figsize=(15, 15))
        for position, column in enumerate(candidates, start=1):
            # High-cardinality features are skipped; their grid cell stays empty.
            if df[column].nunique() > 10:
                continue
            plt.subplot(2, 2, position)
            sns.boxplot(x=column, y=target_column, data=df)
            plt.title(f'{target_column} by {column}')
            plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig('visualizations/categorical_boxplots.png')
        plt.close()

    print("Visualizations created and saved in 'visualizations' folder")

def _build_preprocessor(numeric_features, categorical_features):
    """Return a fresh, unfitted ColumnTransformer for the given feature lists."""
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    return ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])


def build_and_train_models(X, y, numeric_features, categorical_features):
    """
    Build, train and evaluate the candidate regression pipelines.

    Splits the data 70/30 (random_state=42), fits four pipelines (SGD,
    ordinary least squares, decision tree, random forest), prints
    train/test MSE, MAE and test R² for each, and saves a bar chart of the
    MSE values to 'visualizations/model_comparison_mse.png'.

    Args:
        X: Feature DataFrame.
        y: Target values aligned with ``X``.
        numeric_features: Column names routed through median imputation and
            standard scaling.
        categorical_features: Column names routed through most-frequent
            imputation and one-hot encoding.

    Returns:
        Tuple ``(models, results, best_model_name)``: the fitted pipelines
        keyed by name, a dict of metric dicts keyed by name (keys
        'train_mse', 'test_mse', 'train_mae', 'test_mae', 'r2'), and the
        name of the pipeline with the lowest test MSE.
    """
    print("\n==== BUILDING AND TRAINING MODELS ====")

    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    regressors = {
        'SGD Linear Regression': SGDRegressor(max_iter=1000, tol=1e-3, random_state=42, early_stopping=True),
        'Linear Regression': LinearRegression(),
        'Decision Tree': DecisionTreeRegressor(random_state=42),
        'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
    }

    # Every pipeline gets its own preprocessor. Pipeline.fit fits its steps
    # in place, so a shared instance would be refit by each model and all
    # returned pipelines would point at the same transformer object.
    models = {
        name: Pipeline([
            ('preprocessor', _build_preprocessor(numeric_features, categorical_features)),
            ('regressor', regressor)
        ])
        for name, regressor in regressors.items()
    }

    # Dictionary to store results
    results = {}

    # Train and evaluate each model
    for name, model in models.items():
        print(f"\nTraining {name}...")

        # Fit the model
        model.fit(X_train, y_train)

        # Predictions
        y_train_pred = model.predict(X_train)
        y_test_pred = model.predict(X_test)

        # Calculate metrics
        train_mse = mean_squared_error(y_train, y_train_pred)
        test_mse = mean_squared_error(y_test, y_test_pred)
        train_mae = mean_absolute_error(y_train, y_train_pred)
        test_mae = mean_absolute_error(y_test, y_test_pred)
        r2 = r2_score(y_test, y_test_pred)

        # Store results
        results[name] = {
            'train_mse': train_mse,
            'test_mse': test_mse,
            'train_mae': train_mae,
            'test_mae': test_mae,
            'r2': r2
        }

        print(f"{name} - Train MSE: {train_mse:.2f}, Test MSE: {test_mse:.2f}")
        print(f"{name} - Train MAE: {train_mae:.2f}, Test MAE: {test_mae:.2f}")
        print(f"{name} - R² Score: {r2:.2f}")

    # Plot model comparison
    plt.figure(figsize=(12, 6))
    models_list = list(results.keys())
    train_mse_list = [results[model]['train_mse'] for model in models_list]
    test_mse_list = [results[model]['test_mse'] for model in models_list]

    x = np.arange(len(models_list))
    width = 0.35

    plt.bar(x - width/2, train_mse_list, width, label='Train MSE')
    plt.bar(x + width/2, test_mse_list, width, label='Test MSE')

    plt.xlabel('Models')
    plt.ylabel('Mean Squared Error')
    plt.title('Training and Test MSE by Model')
    plt.xticks(x, models_list, rotation=45)
    plt.legend()
    plt.tight_layout()
    # Make sure the output folder exists so this function does not depend on
    # create_visualizations() having been called first.
    os.makedirs('visualizations', exist_ok=True)
    plt.savefig('visualizations/model_comparison_mse.png')
    plt.close()

    # Find the best model (lowest test MSE); ties go to the first name in
    # insertion order.
    best_model_name = min(results, key=lambda k: results[k]['test_mse'])
    print(f"\nBest model based on test MSE: {best_model_name}")

    return models, results, best_model_name

def visualize_linear_regression(models, X, y, numeric_features, target_column):
    """
    Plot diagnostics for the linear models and save them in 'visualizations/'.

    Writes an actual-vs-predicted scatter plot for the 'Linear Regression'
    pipeline and, if the fitted SGD regressor exposes a ``loss_curve_``
    attribute, a loss curve for 'SGD Linear Regression'.

    Args:
        models: Dict of fitted pipelines keyed by model name.
        X: Feature DataFrame.
        y: Target values aligned with ``X``.
        numeric_features: Unused; kept for interface compatibility.
        target_column: Unused; kept for interface compatibility.
    """
    print("\n==== VISUALIZING LINEAR REGRESSION ====")

    # Make sure the output folder exists so this function does not depend on
    # create_visualizations() having been called first.
    os.makedirs('visualizations', exist_ok=True)

    # Same split parameters as build_and_train_models, so X_test/y_test are
    # the rows the pipelines were evaluated on. Only the test part is needed.
    _, X_test, _, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    linear_model = models.get('Linear Regression')
    if linear_model is not None:
        y_pred = linear_model.predict(X_test)

        # Plot predicted vs actual values
        plt.figure(figsize=(10, 6))
        plt.scatter(y_test, y_pred, alpha=0.5)
        # Vectorised min/max instead of iterating element by element.
        min_val = min(np.min(y_test), np.min(y_pred))
        max_val = max(np.max(y_test), np.max(y_pred))
        # Diagonal reference line: points on it are perfect predictions.
        plt.plot([min_val, max_val], [min_val, max_val], 'r--')
        plt.xlabel('Actual Values')
        plt.ylabel('Predicted Values')
        plt.title('Linear Regression: Actual vs Predicted Values')
        plt.grid(True)
        plt.tight_layout()
        plt.savefig('visualizations/linear_regression_predictions.png')
        plt.close()

    # Plot loss curve for SGD. This is independent of the plain linear model,
    # so it is no longer nested under the 'Linear Regression' check.
    # NOTE(review): scikit-learn's SGDRegressor does not appear to expose
    # loss_curve_ (MLPRegressor does), so this branch is likely never taken
    # - confirm against the installed version.
    sgd_pipeline = models.get('SGD Linear Regression')
    if sgd_pipeline is not None:
        sgd_model = sgd_pipeline.named_steps['regressor']
        if hasattr(sgd_model, 'loss_curve_'):
            plt.figure(figsize=(10, 6))
            plt.plot(sgd_model.loss_curve_)
            plt.title('SGD Loss Curve')
            plt.xlabel('Iterations')
            plt.ylabel('Loss')
            plt.grid(True)
            plt.tight_layout()
            plt.savefig('visualizations/sgd_loss_curve.png')
            plt.close()

    print("Linear regression visualizations saved in 'visualizations' folder")

def _select_regressor_and_grid(model_name):
    """Return ``(regressor, param_grid)`` for a supported model name, else None."""
    if model_name == 'SGD Linear Regression':
        return SGDRegressor(random_state=42), {
            'regressor__max_iter': [1000, 2000],
            'regressor__alpha': [0.0001, 0.001],
            'regressor__learning_rate': ['constant', 'adaptive'],
            'regressor__eta0': [0.01, 0.1]
        }
    if model_name == 'Random Forest':
        return RandomForestRegressor(random_state=42), {
            'regressor__n_estimators': [100, 200],
            'regressor__max_depth': [None, 10, 20],
            'regressor__min_samples_split': [2, 5]
        }
    if model_name == 'Linear Regression':
        # Empty grid: nothing to search, the model is fitted as is.
        return LinearRegression(), {}
    if model_name == 'Decision Tree':
        return DecisionTreeRegressor(random_state=42), {
            'regressor__max_depth': [None, 5, 10, 20],
            'regressor__min_samples_split': [2, 5, 10],
            'regressor__min_samples_leaf': [1, 2, 4]
        }
    return None


def tune_best_model(best_model_name, X, y, numeric_features, categorical_features):
    """
    Perform hyperparameter tuning on the best model.

    Runs a 5-fold GridSearchCV (scored by negative MSE) on the training part
    of a 70/30 split (random_state=42) and evaluates the best estimator on
    the held-out part. 'Linear Regression' has no grid and is simply fitted
    and evaluated.

    Args:
        best_model_name: One of 'SGD Linear Regression', 'Linear Regression',
            'Decision Tree' or 'Random Forest'.
        X: Feature DataFrame.
        y: Target values aligned with ``X``.
        numeric_features: Column names for the numeric preprocessing branch.
        categorical_features: Column names for the categorical branch.

    Returns:
        Tuple ``(model, test_mse, r2)``. For an unrecognised
        ``best_model_name`` the tuple is ``(None, None, None)``, which the
        caller already checks for, instead of failing on a missing pipeline.
    """
    print(f"\n==== TUNING {best_model_name} ====")

    selection = _select_regressor_and_grid(best_model_name)
    if selection is None:
        # Bail out before touching the data: there is nothing to fit.
        print(f"No tuning configuration for '{best_model_name}'; skipping")
        return None, None, None
    regressor, param_grid = selection

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # Define preprocessing
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])

    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('regressor', regressor)
    ])

    if not param_grid:
        # Fit and evaluate even without tuning
        print(f"{best_model_name} doesn't have significant hyperparameters to tune")
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)
        test_mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        return pipeline, test_mse, r2

    grid_search = GridSearchCV(
        pipeline,
        param_grid,
        cv=5,
        scoring='neg_mean_squared_error',
        n_jobs=-1,
        verbose=1
    )

    print("Starting grid search...")
    grid_search.fit(X_train, y_train)

    print(f"Best parameters: {grid_search.best_params_}")

    # Evaluate best model
    best_model = grid_search.best_estimator_
    y_pred = best_model.predict(X_test)
    test_mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    print(f"Tuned model - Test MSE: {test_mse:.2f}, R² Score: {r2:.2f}")

    return best_model, test_mse, r2

def save_best_model(best_model, numeric_features, categorical_features, target_column):
    """
    Persist the model and its feature metadata as joblib files in 'models/'.

    Writes the model to 'models/best_student_performance_model.pkl' and a
    dict with the feature lists and target name to
    'models/model_features.pkl'.

    Args:
        best_model: Model object to serialize.
        numeric_features: Numeric column names stored alongside the model.
        categorical_features: Categorical column names stored alongside it.
        target_column: Name of the target column stored alongside it.
    """
    print("\n==== SAVING BEST MODEL ====")

    os.makedirs('models', exist_ok=True)

    model_file = 'models/best_student_performance_model.pkl'
    feature_file = 'models/model_features.pkl'

    # Metadata describing which columns the saved model was built with.
    feature_metadata = dict(
        numeric_features=numeric_features,
        categorical_features=categorical_features,
        target_column=target_column,
    )

    joblib.dump(best_model, model_file)
    joblib.dump(feature_metadata, feature_file)

    print(f"Best model saved as '{model_file}'")

# Script entry point: load the data, explore it, train, tune and save.
if __name__ == "__main__":
    print("==== STUDENT PERFORMANCE PREDICTION MODEL ====")

    # Load the raw data
    target_column = 'Exam_Score'
    df = pd.read_csv('studentPerformanceFactors.csv')

    # Step 1: summary, feature engineering and imputation
    df, numeric_features, categorical_features = analyze_and_preprocess(
        df, target_column
    )

    # Step 2: exploratory plots
    create_visualizations(
        df, numeric_features, categorical_features, target_column
    )

    # Step 3: separate the predictors from the target
    y = df[target_column]
    X = df.drop(columns=[target_column])

    # Step 4: fit and compare the candidate models
    models, results, best_model_name = build_and_train_models(
        X, y, numeric_features, categorical_features
    )

    # Step 5: diagnostic plots for the linear models
    visualize_linear_regression(
        models, X, y, numeric_features, target_column
    )

    # Step 6: hyperparameter search on the winner
    best_model, test_mse, r2 = tune_best_model(
        best_model_name, X, y, numeric_features, categorical_features
    )

    # Step 7: persist the result, but only when tuning returned a model
    if best_model is not None:
        save_best_model(
            best_model, numeric_features, categorical_features, target_column
        )

    print("\n==== MODEL TRAINING COMPLETE ====")
    print(f"Best model: {best_model_name}")
    # The metrics may be None when no model was produced.
    if test_mse is not None:
        print(f"Test MSE: {test_mse:.2f}")
    if r2 is not None:
        print(f"R² Score: {r2:.2f}")

==== STUDENT PERFORMANCE PREDICTION MODEL ====

==== DATASET OVERVIEW ====
Shape: (6607, 20)

Column data types:
Hours_Studied                  int64
Attendance                     int64
Parental_Involvement          object
Access_to_Resources           object
Extracurricular_Activities    object
Sleep_Hours                    int64
Previous_Scores                int64
Motivation_Level              object
Internet_Access               object
Tutoring_Sessions              int64
Family_Income                 object
Teacher_Quality               object
School_Type                   object
Peer_Influence                object
Physical_Activity              int64
Learning_Disabilities         object
Parental_Education_Level      object
Distance_from_Home            object
Gender                        object
Exam_Score                     int64
dtype: object

Missing values:
Teacher_Quality             78
Parental_Education_Level    90
Distance_from_Home          67
dtype: int64

Basic sta