In [None]:
### Imports ###

# general
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# preprocessing
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.decomposition import PCA

# models
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from sklearn.linear_model import Lasso, Ridge, ElasticNet

# training and evaluation
from sklearn.model_selection import KFold, GridSearchCV, RandomizedSearchCV, cross_val_score, cross_val_predict

# analysis
from sklearn.inspection import permutation_importance
import shap

# warnings and outputs
import warnings
from sklearn.exceptions import ConvergenceWarning

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings('ignore', category=UserWarning, message=".*ConvergenceWarning.*")

np.random.seed(42)

# Prediction

In [2]:
def preprocess_data(
    dataset, 
    label_column='Trabajo Final', 
    remove_columns=['Team'], 
    pca_components=None, 
    test_indices=None
):
    """
    Preprocess the data by separating features and target, scaling numerical data,
    encoding categorical data, and optionally applying PCA. Optionally split into 
    train and test using provided test indices.

    Parameters:
    - dataset (pd.DataFrame): The input dataset.
    - label_column (str): The target column.
    - remove_columns (list): List of additional columns to remove.
    - pca_components (int): Number of PCA components to retain (optional).
    - test_indices (list or pd.Index, optional): Indices for the test set.

    Returns:
    - dict: A dictionary with processed train (and optionally test) data, targets.
    """
    # Separate features and target
    X = dataset.drop(columns=[label_column] + remove_columns)
    y = dataset[label_column]

    # Initialize train and test sets as None
    X_train, X_test, y_train, y_test = X, None, y, None

    # Split data into train and test sets if test_indices are provided
    if test_indices is not None:
        train_indices = dataset.index.difference(test_indices)
        X_train, X_test = X.loc[train_indices], X.loc[test_indices]
        y_train, y_test = y.loc[train_indices], y.loc[test_indices]

    # Identify numerical and categorical columns
    numerical_cols = X_train.select_dtypes(include=['number']).columns
    categorical_cols = X_train.select_dtypes(include=['object', 'category']).columns

    # Scale numerical columns
    if not numerical_cols.empty:
        scaler = StandardScaler()
        X_train[numerical_cols] = scaler.fit_transform(X_train[numerical_cols])
        if X_test is not None:
            X_test[numerical_cols] = scaler.transform(X_test[numerical_cols])

    # Encode categorical columns
    if not categorical_cols.empty:
        encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
        encoded_train = pd.DataFrame(
            encoder.fit_transform(X_train[categorical_cols]),
            columns=encoder.get_feature_names_out(categorical_cols),
            index=X_train.index
        )
        X_train = X_train.drop(columns=categorical_cols).join(encoded_train)

        if X_test is not None:
            encoded_test = pd.DataFrame(
                encoder.transform(X_test[categorical_cols]),
                columns=encoder.get_feature_names_out(categorical_cols),
                index=X_test.index
            )
            X_test = X_test.drop(columns=categorical_cols).join(encoded_test)

    # Apply PCA if specified
    if pca_components:
        pca = PCA(n_components=pca_components)
        X_train_pca = pca.fit_transform(X_train)
        X_train = pd.DataFrame(
            X_train_pca, 
            columns=[f"PC{i+1}" for i in range(X_train_pca.shape[1])],
            index=X_train.index
        )
        if X_test is not None:
            X_test_pca = pca.transform(X_test)
            X_test = pd.DataFrame(
                X_test_pca, 
                columns=[f"PC{i+1}" for i in range(X_test_pca.shape[1])],
                index=X_test.index
            )

    # Return processed train and (optionally) test sets
    result = {
        'X_train': X_train,
        'y_train': y_train
    }
    
    # If test set exists, return it too
    if X_test is not None and y_test is not None:
        result.update({
            'X_test': X_test,
            'y_test': y_test
        })

    return result

In [3]:
# Function to evaluate models
def evaluate_models(X, y, models, param_grids=None, random_configurations=None, random_seed=42):
    print(f'Results for {len(X.columns)} features.')

    # Initialize a dictionary to store results
    results = {}
    predictions = {}
    best_model_name = None
    best_model_score = -float('inf')

    # Define cross-validation strategy
    kfold = KFold(n_splits=5, shuffle=True, random_state=random_seed)

    # Iterate over each model
    for model_name, model in models.items():
        print(f"Evaluating {model_name}...")

        # Set random_state for models that support it
        if hasattr(model, 'random_state'):
            model.set_params(random_state=random_seed)
        
        # Get the parameter grid for the current model, if available
        param_grid = param_grids.get(model_name, {}) if param_grids else {}
        
        # If there's no parameter grid, we just use the base model (no hyperparameter tuning)
        if not param_grid:
            print(f"Using base model with default parameters for {model_name}.")
            # Use cross_val_score directly for base model evaluation without GridSearchCV
            scores = cross_val_score(model, X, y, cv=kfold.split(X, y), scoring='neg_mean_squared_error', n_jobs=-1)
            best_score = best_score
            average_score = scores.mean()
            best_params = None
            fitted_model = model.fit(X, y)
            grid_search = None
            prediction = fitted_model.predict(X)
        else:
            # Perform Grid Search or Randomized Search with cross-validation if parameter grid exists
            if random_configurations:
                grid_search = RandomizedSearchCV(
                    estimator=model, 
                    param_distributions=param_grid, 
                    n_iter=random_configurations,
                    cv=kfold.split(X, y),
                    n_jobs=-1,
                    scoring='neg_mean_squared_error',
                    random_state=random_seed,
                    verbose=1)
            else: 
                grid_search = GridSearchCV(estimator=model, 
                    param_grid=param_grid, 
                    cv=kfold.split(X, y),
                    n_jobs=-1,
                    scoring='neg_mean_squared_error',
                    verbose=1)
            
            # Fit the model with the grid search
            grid_search.fit(X, y)

            # Extract results from GridSearchCV
            mean_scores = grid_search.cv_results_['mean_test_score']
            param_combinations = grid_search.cv_results_['params']

            # Find the best parameter combination
            best_index = mean_scores.argmax() # Index where the mean negative MSE is the least negative (highest)
            best_params = param_combinations[best_index]
            best_score = mean_scores[best_index]

        print(f"\nBest Parameters: {best_params}")
        print(f"Best Mean MSE (over 5 folds): {-best_score:.4f}\n")
        
        fitted_model = grid_search.best_estimator_
        prediction = cross_val_predict(fitted_model, X, y, cv=kfold)

        # Calculate R^2 score using cross-validated predictions
        #r2_scores = cross_val_score(fitted_model, X, y, cv=kfold, scoring='r2', n_jobs=-1)
        #mean_r2 = r2_scores.mean()
        r2 = r2_score(y, prediction)
        print(f"Mean R^2 score (over 5 folds): {r2:.4f}\n")

        # Save the best model and prediction
        results[model_name] = {
            'best_params': best_params,
            'mse': best_score,
            'r2': r2,
            'fitted_model': fitted_model,
            'grid_search': grid_search
        }
        predictions[model_name] = prediction
        
        # Track the best model
        if best_score > best_model_score:
            best_model_score = best_score
            best_model_name = model_name

    # Print the best model and its score
    print(f"The best model is {best_model_name} with a MSE score of {best_model_score:.4f}")
    
    return results, predictions

In [4]:
# Define models and parameter grids
models = {
    'RandomForest': RandomForestRegressor(),
    'GradientBoosting': GradientBoostingRegressor(),
    'XGBoost': XGBRegressor(),
    'Lasso': Lasso(),
    'Ridge': Ridge(),
    #'ElasticNet': ElasticNet(),
    'LightGBM': LGBMRegressor()
}

param_grids = {
    'RandomForest': {
        'verbose': [0],
        'n_estimators': [50, 100, 200, 500],
        'max_depth': [3, 5, 7, 10],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 5],
        'max_features': ['sqrt', 'log2'],
        'bootstrap': [True, False]
    },
    'GradientBoosting': {
        'verbose': [0],
        'n_estimators': [50, 100, 200, 500],
        'learning_rate': [0.01, 0.05, 0.1, 0.2],
        'max_depth': [3, 5, 7, 10],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 5],
        'subsample': [0.7, 0.8, 0.9, 1.0],
        'max_features': ['sqrt', 'log2']
    },
    'XGBoost': {
        'n_estimators': [50, 100, 200, 500],
        'learning_rate': [0.01, 0.05, 0.1, 0.2],
        'max_depth': [3, 5, 7, 10],
        'min_child_weight': [1, 3, 5],
        'subsample': [0.7, 0.8, 0.9, 1.0],
        'colsample_bytree': [0.7, 0.8, 1.0],
        'reg_alpha': [0, 0.01, 0.1, 1],
        'reg_lambda': [0, 0.1, 0.5, 1],
        'gamma': [0, 0.1, 0.5, 1]
    },
    'Lasso': {
        'alpha': [0.001, 0.01, 0.1, 1, 10],
        'max_iter': [1000, 5000, 10000],
        'tol': [1e-3, 1e-4, 1e-5]
    },
    'Ridge': {
        'alpha': [0.001, 0.01, 0.1, 1, 10, 100],
        'max_iter': [1000, 5000, 10000],
        'tol': [1e-3, 1e-4, 1e-5]
    },
    'ElasticNet': {
        'alpha': [0.001, 0.01, 0.1, 1, 10],
        'l1_ratio': [0.1, 0.5, 0.9],
        'max_iter': [1000, 5000, 10000],
        'tol': [1e-3, 1e-4, 1e-5]
    },
    'LightGBM': {
        'verbose': [-1],
        'n_estimators': [50, 100, 200, 500],
        'learning_rate': [0.01, 0.05, 0.1],
        'max_depth': [3, 5, 7, 10, -1],
        'num_leaves': [31, 50, 100],
        'min_child_samples': [5, 10, 20],
        'subsample': [0.7, 0.8, 0.9, 1.0],
        'colsample_bytree': [0.7, 0.8, 1.0],
        'reg_alpha': [0, 0.01, 0.1, 1],
        'reg_lambda': [0, 0.1, 0.5, 1]
    }
}

In [5]:
def analyze_results(model, X, y, X_test=None, y_test=None, model_name="Model", max_features=20):
    """
    Perform comprehensive evaluation of a model with optional test set support.

    Parameters:
    - model: The trained model to evaluate.
    - X (pd.DataFrame): Training/validation features.
    - y (pd.Series): Training/validation target.
    - X_test (pd.DataFrame): Test features, optional.
    - y_test (pd.Series): Test target, optional.
    - model_name (str): Name of the model for display purposes.
    - max_features (int): Number of features to display in SHAP and importance plots.

    Returns:
    None
    """
    print(f"=== Analysis for {model_name} ===")
    main_color = 'orange'

    # Test Results (If Available)
    if X_test is not None and y_test is not None:
        print("\nEvaluating on test data...")

        # Predict on test data
        y_test_pred = model.predict(X_test)

        # Residual Analysis: For test set
        def plot_residuals_test(y_true, y_pred, baseline_residuals=None, title=None):
            residuals = y_true - y_pred

            # Baseline residuals (using mean prediction)
            baseline_residuals = baseline_residuals if baseline_residuals is not None else y_true - np.mean(y)

            plt.figure(figsize=(10, 6))
            plt.scatter(y_pred, residuals, alpha=0.7, color=main_color, label='Residuals')
            plt.scatter(y_pred, baseline_residuals, alpha=0.4, color='gray', label='Baseline Residuals', marker='x')
            plt.axhline(0, color='red', linestyle='--', label='Zero Residual')
            plt.xlabel("Predicted Values")
            plt.ylabel("Residuals")
            plt.title(title)
            plt.legend()
            plt.show()

        print("\nPlotting residuals for test data...")
        plot_residuals_test(y_test, y_test_pred, baseline_residuals=y_test - np.mean(y), title=f"Residuals Plot for {model_name} (Test)")

        # Calculate R² score for test
        r2_test = r2_score(y_test, y_test_pred)
        print(f"Test R² Score: {r2_test:.4f}")

        # Calculate MSE for test
        mse_test = mean_squared_error(y_test, y_test_pred)
        print(f"Test MSE: {mse_test:.4f}")

        # Calculate MSE for baseline
        mse_test_baseline = mean_squared_error(y_test, np.full_like(y_test, np.mean(y)))
        print(f"Baseline MSE (Mean Prediction): {mse_test_baseline:.4f}")

        # SHAP Analysis: On test data
        print("\nCalculating SHAP values for the test data...")
        explainer = shap.Explainer(model, X_test)
        shap_values = explainer(X_test)
        shap.summary_plot(shap_values, X_test, max_display=max_features)

        # Permutation Feature Importance: On test data
        print("\nCalculating Permutation Feature Importance for the test data...")
        result = permutation_importance(model, X_test, y_test, n_repeats=100, random_state=42, n_jobs=-1)
        importance_df = pd.DataFrame({
            'feature': X_test.columns, 
            'importance': result.importances_mean
        }).sort_values(by='importance', ascending=False)
        print(importance_df)
        plt.figure(figsize=(10, 6))
        plt.barh(importance_df['feature'], importance_df['importance'], color=main_color)
        plt.xlabel("Permutation Importance")
        plt.ylabel("Features")
        plt.title(f"Permutation Importance for {model_name} (Test Data)")
        plt.gca().invert_yaxis()
        plt.show()

    # Training/Validation Results (Always Included)
    print("\nEvaluating on Training/validation data...")

    # Predict on Training/validation data
    y_val_pred = model.predict(X)

    # Residual Analysis: For Training/validation
    def plot_residuals(y_true, y_pred, baseline_residuals=None, title=None):
        residuals = y_true - y_pred

        # Baseline residuals (using mean prediction)
        baseline_residuals = baseline_residuals if baseline_residuals is not None else y_true - np.mean(y_true)

        plt.figure(figsize=(10, 6))
        plt.scatter(y_pred, residuals, alpha=0.7, color=main_color, label='Residuals')
        plt.scatter(y_pred, baseline_residuals, alpha=0.4, color='gray', label='Baseline Residuals', marker='x')
        plt.axhline(0, color='red', linestyle='--', label='Zero Residual')
        plt.xlabel("Predicted Values")
        plt.ylabel("Residuals")
        plt.title(title)
        plt.legend()
        plt.show()

    print("\nPlotting residuals for Training/validation data...")
    plot_residuals(y, y_val_pred, baseline_residuals=y - np.mean(y), title=f"Residuals Plot for {model_name} (Training/Validation)")

    # Calculate R² score for Training/validation
    r2_val = r2_score(y, y_val_pred)
    print(f"Training/Validation R² Score: {r2_val:.4f}")

    # Calculate MSE for Training/validation
    mse_val = mean_squared_error(y, y_val_pred)
    print(f"Training/Validation MSE: {mse_val:.4f}")

    # Calculate MSE for baseline
    mse_val_baseline = mean_squared_error(y, np.full_like(y, np.mean(y)))
    print(f"Baseline MSE (Mean Prediction): {mse_val_baseline:.4f}")

    # SHAP Analysis: On Training/validation data
    print("\nCalculating SHAP values for the Training/validation data...")
    explainer = shap.Explainer(model, X)
    shap_values = explainer(X)
    shap.summary_plot(shap_values, X, max_display=max_features)

    # Permutation Feature Importance: On Training/validation data
    print("\nCalculating Permutation Feature Importance for the Training/validation data...")
    result = permutation_importance(model, X, y, n_repeats=100, random_state=42, n_jobs=-1)
    importance_df = pd.DataFrame({
        'feature': X.columns, 
        'importance': result.importances_mean
    }).sort_values(by='importance', ascending=False)
    print(importance_df)
    plt.figure(figsize=(10, 6))
    plt.barh(importance_df['feature'], importance_df['importance'], color=main_color)
    plt.xlabel("Permutation Importance")
    plt.ylabel("Features")
    plt.title(f"Permutation Importance for {model_name} (Training/Validation Data)")
    plt.gca().invert_yaxis()
    plt.show()

    print("\nAnalysis completed.")

In [None]:
# Load data
prediction_data = pd.read_excel('prediction_data.xlsx', sheet_name='Sheet1')

# Determine test indices based on the first 2 characters of the "Team" column
test_indices = prediction_data[prediction_data['Team'].str.startswith('24')].index

# Use the preprocess_data function
processed_data = preprocess_data(
    dataset=prediction_data, 
    label_column='Trabajo Final',
    remove_columns=['Team'],
    pca_components = 7,
    test_indices=None
)

# Access processed data
X_train = processed_data['X_train']
#X_test = processed_data['X_test']
y_train = processed_data['y_train']
#y_test = processed_data['y_test']

# Evaluate all models
results, predictions = evaluate_models(X_train, y_train, models, param_grids=param_grids, random_configurations=10000)

In [None]:
analyze_results(results['LightGBM']['fitted_model'], X_train, y_train, X_test=X_test, y_test=y_test, model_name='LightGBM', max_features=20)