In [None]:
# Multi-Target Regression and SHAP Analysis

import os
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.multioutput import MultiOutputRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge, LinearRegression
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, r2_score
import shap
import matplotlib.pyplot as plt
import seaborn as sns

class MultiTargetRegression:
    """Plotting helper that writes regression and SHAP figures into one results folder."""

    def __init__(self, results_folder):
        """
        Remember the output folder and make sure it exists on disk.

        Parameters:
        - results_folder (str): Folder path to save results, plots, and models.
        """
        self.results_folder = results_folder
        # exist_ok=True makes repeated construction against the same folder harmless
        os.makedirs(results_folder, exist_ok=True)

    def _save_current_figure(self, filename, **savefig_options):
        """
        Write the active matplotlib figure into the results folder at 300 dpi and close it.

        Parameters:
        - filename (str): File name (not a full path) of the image to write.
        - savefig_options: Extra keyword arguments forwarded to plt.savefig.
        """
        destination = os.path.join(self.results_folder, filename)
        plt.savefig(destination, dpi=300, **savefig_options)
        plt.close()

    def plot_true_vs_pred(self, y_true, y_pred, target):
        """
        Scatter the true values against the predictions of one target and save the figure.

        Predictions go on the x-axis and true values on the y-axis; the dashed
        red y = x line is drawn over the range of the predictions.

        Parameters:
        - y_true (array): True values of the target variable.
        - y_pred (array): Predicted values of the target variable.
        - target (str): Name of the target variable (used in the file name).
        """
        plt.figure(figsize=(5, 5))
        sns.scatterplot(x=y_pred, y=y_true, alpha=0.3, s=15)
        # Perfect-prediction reference line
        lowest, highest = y_pred.min(), y_pred.max()
        plt.plot([lowest, highest], [lowest, highest],
                 color='red', linestyle='--', lw=1)
        plt.xticks(fontsize=10)
        plt.yticks(fontsize=10)
        plt.xlabel("Predicted Values", fontsize=12)
        plt.ylabel("True Values", fontsize=12)
        plt.tight_layout()
        plt.grid(True)
        self._save_current_figure(f"true_vs_predicted_{target}.png")

    def plot_feature_importance(self, feature_importance_df):
        """
        Draw a horizontal bar chart of the first 15 rows of the importance table.

        The rows are taken as-is with head(15), so the caller is expected to
        pass the table already sorted by importance.

        Parameters:
        - feature_importance_df (DataFrame): Table with "Feature" and "SHAP Importance" columns.
        """
        leading_rows = feature_importance_df.head(15)
        plt.figure(figsize=(5, 3))
        sns.barplot(x="SHAP Importance", y="Feature", data=leading_rows)
        plt.xticks(fontsize=8)
        plt.yticks(fontsize=8)
        plt.xlabel("SHAP Importance", fontsize=12)
        plt.tight_layout()
        self._save_current_figure("shap_top15_feature_importance.png")

    def plot_shap_summary(self, shap_values, X_test):
        """
        Render the SHAP summary plot for the given values and save it.

        Parameters:
        - shap_values (array): SHAP values for the test dataset.
        - X_test (DataFrame): Test dataset (features only).
        """
        figure_size = (6, 5)
        plt.figure(figsize=figure_size)
        # show=False keeps the figure open so it can be restyled and saved below
        shap.summary_plot(shap_values, X_test, show=False, plot_size=figure_size)
        plt.xticks(fontsize=8)
        plt.yticks(fontsize=8)
        plt.xlabel("SHAP Value (Impact on Model Output)", fontsize=10)
        self._save_current_figure("shap_summary_plot.png", bbox_inches="tight")

# Instantiate the MultiTargetRegression class
# (constructing it also creates the results folder on disk)
results_folder = "Regression analysis"
mt_regression = MultiTargetRegression(results_folder)

# Step 1: Load the data
# Load the input CSV file (path is relative to the current working directory)
file_path = 'AI_datasets4.csv'
data = pd.read_csv(file_path, encoding='ISO-8859-1')

# Step 2: Define features and targets
# Specify categorical and continuous feature columns
# categorical_features drives the one-hot encoding in Step 4
categorical_features = ['Province', 'Food']
# NOTE(review): continuous_features is not referenced again in the visible part of
# this cell; Step 5 takes every non-target column as a feature instead.
continuous_features = [
    'Urbanization_Rate', 'Population_Density', 'Per_Capita_GDP', 'Average_Elevation',
    'Annual_Average_Temperature', 'Annual_Average_Rainfall', "Industry_GDP", 'Longitude', 'Latitude'
]
# Define the target variables (outputs); their order fixes the column order of y
# and therefore the order of the multi-output predictions.
target_variables = ['TEQ_PCDDFs', 'TEQ_dlPCBs', 'TEQ_total', 'mPCBs_total', 'PBDEs_total']

# Step 3: Define function to filter zeros and outliers
def filter_outliers_and_zeros(df, target, iqr_multiplier=2):
    """
    Filter out rows with zero values and extreme outliers for a specific target variable.

    Rows whose target equals zero are removed first; the quartiles are then
    computed on the remaining rows, and only rows inside the inclusive range
    [Q1 - k*IQR, Q3 + k*IQR] are kept, where k is ``iqr_multiplier``.
    Rows whose target is NaN never satisfy the range test and are dropped too.
    The input DataFrame is not modified.

    Parameters:
    - df (DataFrame): Input dataset.
    - target (str): Name of the target variable.
    - iqr_multiplier (float): Width of the outlier fence in IQR units.
      Defaults to 2, the value that was previously hard-coded.

    Returns:
    - df (DataFrame): Filtered dataset.
    """
    nonzero = df[df[target] != 0]  # Remove rows where target is zero
    if nonzero.empty:
        # Nothing left to compute quartiles from; return the empty frame as-is
        return nonzero
    q1, q3 = nonzero[target].quantile([0.25, 0.75])  # First and third quartiles
    fence = iqr_multiplier * (q3 - q1)  # Allowed distance beyond the quartiles
    # Series.between is inclusive on both ends, matching the >= / <= pair
    return nonzero[nonzero[target].between(q1 - fence, q3 + fence)]

# Step 4: Preprocess the data
# Convert categorical features into one-hot encoded features
# (drop_first=True omits the first level of each categorical column)
data = pd.get_dummies(data, columns=categorical_features, drop_first=True)
# Filter outliers and zeros for each target variable.
# The filters are applied one after another on the same frame, so the quartiles
# for each target are computed only on rows that survived the earlier targets.
for target in target_variables:
    data = filter_outliers_and_zeros(data, target)
# Drop rows with missing values in target variables
data = data.dropna(subset=target_variables)
# Save the processed dataset (the evaluation cell reads a file named processed_data.csv)
processed_data_path = os.path.join(results_folder, "processed_data.csv")
data.to_csv(processed_data_path, index=False)

# Step 5: Split the data into training and testing sets
# Every column that is not a target is used as a feature.
X = data.drop(columns=target_variables)  # Features
y = data[target_variables]  # Targets
# 10% of the rows are held out; random_state fixes the split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

# Step 6: Define and evaluate models
# Define a dictionary of base models for evaluation
base_models = {
    "RandomForest": RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1),
    "GradientBoosting": GradientBoostingRegressor(random_state=42),
    "Ridge": Ridge(alpha=1.0),
    "LinearRegression": LinearRegression(),
    "SVR": SVR(kernel='rbf')
}

results = []  # Store results for each model
best_model = None  # Placeholder for the best-performing model
best_score = -np.inf  # Track the highest mean R² score
model_performance = {}  # Store performance metrics for all models (not read again in this cell)

# Train and evaluate each model
for name, model in base_models.items():
    print(f"Training and evaluating model: {name}")
    # Wrap the model in MultiOutputRegressor for multi-target regression
    multi_output_model = MultiOutputRegressor(model)
    # Perform cross-validation and calculate mean R² score
    # (5 folds on the training split only; the test split is not touched here)
    scores = cross_val_score(multi_output_model, X_train, y_train, scoring='r2', cv=5)
    mean_r2 = np.mean(scores)
    # Train the model on the training data
    multi_output_model.fit(X_train, y_train)
    # Make predictions on the test set
    y_pred = multi_output_model.predict(X_test)
    # Calculate test metrics, averaged uniformly over the targets
    mse = mean_squared_error(y_test, y_pred, multioutput='uniform_average')
    r2 = r2_score(y_test, y_pred, multioutput='uniform_average')
    # Save performance metrics
    model_performance[name] = {"R2": r2, "MSE": mse}
    results.append({"Model": name, "Mean_R2": mean_r2, "Test_R2": r2, "Test_MSE": mse})
    # Update the best model if the current one performs better.
    # Selection uses the cross-validated mean R², not the test-set R².
    if mean_r2 > best_score:
        best_score = mean_r2
        best_model = multi_output_model

# Save the best model to a file
best_model_path = os.path.join(results_folder, "best_model.pkl")
joblib.dump(best_model, best_model_path)

# Save the model comparison results
results_df = pd.DataFrame(results)
results_df.to_csv(os.path.join(results_folder, "model_comparison_results.csv"), index=False)

# Step 7: Predictions and Visualization
# Generate True vs Predicted plots for the best model
# (column i of the prediction array is paired with the i-th entry of target_variables)
y_pred_best = best_model.predict(X_test)
for i, target in enumerate(target_variables):
    mt_regression.plot_true_vs_pred(y_test.iloc[:, i], y_pred_best[:, i], target)

# Step 8: SHAP Feature Importance
# Use the first estimator in the best multi-output model.
# NOTE(review): only this one estimator is explained, so the SHAP outputs below
# presumably describe the first target in target_variables only - confirm.
best_estimator = best_model.estimators_[0]
# Calculate SHAP values
# X_test is passed both to the explainer constructor and as the data to explain
explainer = shap.Explainer(best_estimator, X_test)
shap_values = explainer(X_test)

# Calculate and save SHAP feature importance
# (mean absolute SHAP value per feature, taken over the explained rows)
shap_importance = np.abs(shap_values.values).mean(axis=0)
feature_importance_df = pd.DataFrame({
    "Feature": X_test.columns,
    "SHAP Importance": shap_importance
}).sort_values(by="SHAP Importance", ascending=False)

feature_importance_df.to_csv(os.path.join(results_folder, "shap_feature_importance.csv"), index=False)
# The table is sorted descending above, so the bar plot's head(15) shows the top 15
mt_regression.plot_feature_importance(feature_importance_df)
mt_regression.plot_shap_summary(shap_values, X_test)

print("Best model:", best_model)


In [None]:
# Multi-Target Regression Analysis - Model Evaluation

import os
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split, cross_val_predict
from sklearn.metrics import mean_squared_error, median_absolute_error, mean_absolute_error, r2_score
from math import sqrt
from sklearn.multioutput import MultiOutputRegressor
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

class ModelEvaluator:
    """
    Evaluate a saved multi-target regression model against its processed dataset.

    The evaluator reads "best_model.pkl" and "processed_data.csv" from the
    results folder and writes metrics and plots into an "evaluation_results"
    sub-folder.
    """

    def __init__(self, results_folder, target_variables):
        """
        Initialize the ModelEvaluator class.

        Parameters:
        - results_folder (str): Folder containing the trained model and processed dataset.
        - target_variables (list): List of target variables to evaluate.
        """
        self.results_folder = results_folder
        self.target_variables = target_variables
        self.evaluation_folder = os.path.join(results_folder, "evaluation_results")
        # Create the evaluation folder if it doesn't exist
        os.makedirs(self.evaluation_folder, exist_ok=True)

    def load_model_and_data(self):
        """
        Load the trained model and processed dataset.

        Loading is best-effort: any failure is printed and reported to the
        caller as (None, None) instead of being raised.

        Returns:
        - model (object): Loaded trained model, or None on failure.
        - data (DataFrame): Processed dataset, or None on failure.
        """
        model_path = os.path.join(self.results_folder, "best_model.pkl")
        data_path = os.path.join(self.results_folder, "processed_data.csv")
        try:
            # NOTE: joblib.load unpickles the file; only load models from a trusted source.
            model = joblib.load(model_path)  # Load the model
            data = pd.read_csv(data_path)  # Load the data
            print("Model and data loaded successfully.")
            return model, data
        except Exception as e:
            print(f"Error loading model/data: {e}")
            return None, None

    def calculate_metrics(self, y_true, y_pred):
        """
        Calculate regression metrics for model evaluation.

        Parameters:
        - y_true (array): True target values.
        - y_pred (array): Predicted target values.

        Returns:
        - metrics (dict): Dictionary containing RMSE, MAE, MedAE, and R².
        """
        return {
            "RMSE": sqrt(mean_squared_error(y_true, y_pred)),
            "MAE": mean_absolute_error(y_true, y_pred),
            "MedAE": median_absolute_error(y_true, y_pred),
            "R2": r2_score(y_true, y_pred),
        }

    def calculate_q2(self, model, X, y):
        """
        Calculate Q² (cross-validated R²) for the model.

        Parameters:
        - model (object): Estimator to cross-validate.
        - X (DataFrame): Feature set.
        - y (DataFrame): Target set.

        Returns:
        - mean_q2 (float): Average Q² across all targets.
        - y_pred (array): Cross-validated predictions.
        """
        y_pred = cross_val_predict(model, X, y, cv=5)  # Perform 5-fold cross-validation
        q2_values = []
        for i in range(y.shape[1]):  # Iterate over all target variables
            y_true_col = y.iloc[:, i] if isinstance(y, pd.DataFrame) else y[:, i]
            y_pred_col = y_pred[:, i]
            ss_total = np.sum((y_true_col - np.mean(y_true_col)) ** 2)  # Total sum of squares
            ss_residual = np.sum((y_true_col - y_pred_col) ** 2)  # Residual sum of squares
            q2_values.append(1 - (ss_residual / ss_total))  # Q² for this target
        return np.mean(q2_values), y_pred

    def plot_true_vs_pred(self, y_true, y_pred, target):
        """
        Plot True vs Predicted values for a specific target variable.

        Parameters:
        - y_true (array): True values of the target variable.
        - y_pred (array): Predicted values of the target variable.
        - target (str): Name of the target variable.
        """
        plt.figure(figsize=(6, 4))
        sns.scatterplot(x=y_true, y=y_pred, alpha=0.7)
        plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)  # Perfect prediction line
        plt.xlabel("True Values")
        plt.ylabel("Predicted Values")
        plt.title(f"True vs Predicted for {target}")
        plt.grid()
        plt.tight_layout()
        # Save the plot
        plot_path = os.path.join(self.evaluation_folder, f"true_vs_predicted_{target}.png")
        plt.savefig(plot_path, dpi=300)
        plt.close()

    def plot_r2_q2_single(self, r2, q2, target):
        """
        Plot R² vs Q² for a single target variable.

        Parameters:
        - r2 (float): R² value for the target variable.
        - q2 (float): Q² value for the target variable.
        - target (str): Name of the target variable.
        """
        plt.figure(figsize=(6, 4))
        plt.scatter([r2], [q2], label=f"{target}", color="blue", alpha=0.7)
        # The reference diagonal needs two distinct end points to be visible:
        # plotting the single point (r2, r2) with a line style draws nothing.
        # Span at least [0, 1] and stretch further if the point lies outside it.
        line_lo = min(0.0, r2, q2)
        line_hi = max(1.0, r2, q2)
        plt.plot([line_lo, line_hi], [line_lo, line_hi], 'r--', lw=1, label="R² = Q² Line")  # Reference line
        plt.xlabel("R²")
        plt.ylabel("Q²")
        plt.title(f"R² vs Q² for {target}")
        plt.legend()
        plt.grid()
        # Save the plot
        plot_path = os.path.join(self.evaluation_folder, f"r2_q2_{target}.png")
        plt.savefig(plot_path, dpi=300)
        plt.close()
        print(f"R²-Q² plot saved for {target} at {plot_path}")

    def permutation_test(self, model, X, y, n_permutations=100):
        """
        Perform a permutation test to evaluate model significance.

        The model is refitted in place on (X, y) and then on each shuffled copy
        of y, and every score is measured on the same rows it was fitted on.
        After the call the passed-in model is left fitted on the last shuffle.

        Parameters:
        - model (object): Estimator to refit.
        - X (DataFrame): Feature set.
        - y (DataFrame): Target set.
        - n_permutations (int): Number of permutations.

        Returns:
        - original_r2 (float): R² of the original model.
        - permuted_r2_scores (list): R² scores of permuted models.
        - p_value (float): Proportion of permuted R² scores >= the original R².
        """
        print(f"Starting permutation test with {n_permutations} permutations...")
        original_r2 = r2_score(y, model.fit(X, y).predict(X))  # Original R² score
        permuted_r2_scores = []

        for _ in tqdm(range(n_permutations), desc="Permutations"):
            # random_state=None: a fresh, non-reproducible shuffle on every iteration
            y_permuted = shuffle(y, random_state=None)  # Shuffle target values
            permuted_r2 = r2_score(y_permuted, model.fit(X, y_permuted).predict(X))
            permuted_r2_scores.append(permuted_r2)

        # Calculate p-value as the proportion of permuted R² scores >= original R²
        p_value = np.sum(np.array(permuted_r2_scores) >= original_r2) / n_permutations
        print(f"Permutation test completed. P-value: {p_value}")
        return original_r2, permuted_r2_scores, p_value

    def evaluate_model(self, model, data):
        """
        Perform full evaluation of the model, including metrics, R²-Q² plots, and permutation tests.

        The model is refitted in place several times (cross-validation works on
        copies, but the hold-out fit and the permutation test call model.fit).

        Parameters:
        - model (object): Trained model.
        - data (DataFrame): Processed dataset.
        """
        X = data.drop(columns=self.target_variables)
        y = data[self.target_variables]

        # Split data into training and testing sets
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # Cross-validated predictions over the full dataset; the averaged Q² is
        # not reported here because a per-target Q² is computed in the loop below.
        _mean_q2, y_pred_cv = self.calculate_q2(model, X, y)
        # Predict on the test set
        y_pred = model.fit(X_train, y_train).predict(X_test)

        metrics_list = []
        for i, target in enumerate(self.target_variables):
            y_test_target = y_test.iloc[:, i]
            y_pred_target = y_pred[:, i]
            y_cv_target = y_pred_cv[:, i]

            # Calculate metrics on the hold-out split
            metrics = self.calculate_metrics(y_test_target, y_pred_target)
            metrics["Target"] = target

            # R² comes from the hold-out split, Q² from the cross-validated predictions
            r2 = metrics["R2"]
            q2 = r2_score(y.iloc[:, i], y_cv_target)
            metrics["Q2"] = q2

            metrics_list.append(metrics)

            print(f"Metrics for {target}: {metrics}")
            # Plot True vs Predicted values
            self.plot_true_vs_pred(y_test_target, y_pred_target, target)
            # Plot R² vs Q²
            self.plot_r2_q2_single(r2, q2, target)

        # Save metrics to a CSV file
        metrics_df = pd.DataFrame(metrics_list)
        metrics_path = os.path.join(self.evaluation_folder, "overall_metrics.csv")
        metrics_df.to_csv(metrics_path, index=False)
        print(f"Metrics saved to {metrics_path}")

        # Run permutation test
        original_r2, permuted_r2_scores, p_value = self.permutation_test(model, X, y)
        print(f"Original R²: {original_r2:.4f}, P-value: {p_value:.4f}")

        # Save permutation test results
        plt.figure(figsize=(6, 4))
        sns.histplot(permuted_r2_scores, kde=True, color="skyblue", label="Permuted R²")
        plt.axvline(original_r2, color='red', linestyle='--', label='Original R²')
        plt.xlabel("R² Scores")
        plt.ylabel("Frequency")
        plt.legend()
        plt.title("Permutation Test Results")
        perm_test_path = os.path.join(self.evaluation_folder, "permutation_test_results.png")
        plt.savefig(perm_test_path, dpi=300)
        plt.close()
        print(f"Permutation test results saved to {perm_test_path}. Evaluation completed.")

    def main(self):
        """
        Main method to load the model and data, and perform the evaluation.

        Nothing is evaluated when loading failed (either value is None).
        """
        model, data = self.load_model_and_data()
        if model is not None and data is not None:
            self.evaluate_model(model, data)
if __name__ == "__main__":
    # Folder containing the model and data.
    # The evaluator looks for "best_model.pkl" and "processed_data.csv" inside it.
    # NOTE(review): the training cell in this file writes those two files to
    # "Regression analysis", not to this folder - confirm the path is intended.
    results_folder = "multi_output_best_model_results1-5"
    # Target variables for multi-target regression
    target_variables = ['TEQ_PCDDFs', 'TEQ_dlPCBs', 'TEQ_total', 'mPCBs_total', 'PBDEs_total']
    # Initialize and run the evaluator (creates <results_folder>/evaluation_results)
    evaluator = ModelEvaluator(results_folder, target_variables)
    evaluator.main()


In [None]:
#Individual regression model and SHAP analysis
import os
import pandas as pd
import re
from pycaret.regression import setup, compare_models, tune_model, predict_model, save_model, pull
import seaborn as sns
import matplotlib.pyplot as plt
# Step 1: Load the data
file_path = 'AI_datasets4.csv'  # Path to the input dataset, relative to the working directory
data = pd.read_csv(file_path, encoding='ISO-8859-1')  # Load the dataset with specified encoding

# Step 2: Define feature groups
# Define original categorical and continuous features
# (the categorical ones are one-hot encoded per target inside the training loop)
original_categorical_features = ['Province', 'Food']
# Passed to PyCaret's setup() as numeric_features
continuous_features = [
    'Urbanization_Rate', 'Population_Density', 'Per_Capita_GDP', 'Average_Elevation', 
    'Annual_Average_Temperature', 'Annual_Average_Rainfall', "Industry_GDP", 'Longitude', 'Latitude'
]
# Define the target variables for regression; one model is trained per entry
target_variables = ['TEQ_PCDDFs', 'TEQ_dlPCBs', 'TEQ_total', 'mPCBs_total', 'PBDEs_total']

# Step 3: Prepare the output location and helper functions
# Specify the folder to store results and ensure its existence
results_folder = "Regression analysis"
os.makedirs(results_folder, exist_ok=True)

def is_tunable(model):
    """
    Report whether PyCaret can tune the given model.

    The check is done by actually running a 10-iteration, non-verbose
    tune_model call and treating any exception as "not tunable"; the tuned
    result itself is discarded.

    Parameters:
    - model: Model object from PyCaret.

    Returns:
    - bool: True if the trial tuning run finished without raising, False otherwise.
    """
    try:
        tune_model(model, n_iter=10, verbose=False)
    except Exception:
        return False
    else:
        return True

def filter_outliers_and_zeros(df, target, iqr_multiplier=2):
    """
    Remove zero values and outliers from the target variable.

    Zero-valued rows are dropped first. The quartiles of what remains define
    an inclusive keep-range [Q1 - k*IQR, Q3 + k*IQR] with k = ``iqr_multiplier``;
    rows outside it (including rows whose target is NaN) are dropped.
    The input DataFrame is left untouched.

    Parameters:
    - df (DataFrame): Input dataset.
    - target (str): Target variable name.
    - iqr_multiplier (float): Width of the outlier fence in IQR units.
      Defaults to 2, the value that was previously hard-coded.

    Returns:
    - df (DataFrame): Cleaned dataset.
    """
    nonzero = df[df[target] != 0]  # Remove rows where target value is zero
    if nonzero.empty:
        # No rows left, so there are no quartiles to compute
        return nonzero
    q1, q3 = nonzero[target].quantile([0.25, 0.75])  # Calculate quartiles
    fence = iqr_multiplier * (q3 - q1)  # Allowed distance beyond the quartiles
    # Series.between is inclusive on both ends, matching the >= / <= pair
    return nonzero[nonzero[target].between(q1 - fence, q3 + fence)]

def encode_and_map_features(data, categorical_features):
    """
    Perform one-hot encoding for categorical features.

    The first level of every categorical feature is dropped (drop_first=True).
    The input DataFrame is not modified.

    Parameters:
    - data (DataFrame): Input dataset.
    - categorical_features (list): List of categorical feature names.

    Returns:
    - data (DataFrame): Dataset with encoded features.
    - encoded_columns (list): Names of the columns created by the encoding,
      in the order they appear in the returned dataset.
    """
    columns_before = set(data.columns)
    encoded = pd.get_dummies(data, columns=categorical_features, drop_first=True)  # One-hot encoding
    # Report only columns that the encoding actually created. A name-prefix test
    # would also pick up unrelated pre-existing columns whose name merely starts
    # with a categorical feature name (e.g. "Food_Intake" next to "Food").
    encoded_columns = [col for col in encoded.columns if col not in columns_before]
    return encoded, encoded_columns

# Step 4: Model training and evaluation for each target variable
# Store the best models and evaluation results for each target variable
best_models = {}
evaluation_results = []

for target in target_variables:
    print(f"\nProcessing target variable: {target}")
    sanitized_target = re.sub(r'[\\/*?:"<>|]', '_', target)  # Sanitize target name for file paths

    # Prepare data for the current target variable
    data_target = data.dropna(subset=[target])  # Remove rows where the target is missing
    data_target = filter_outliers_and_zeros(data_target, target)  # Remove zeros and outliers

    # Drop other target variables to avoid data leakage
    other_targets = [col for col in target_variables if col != target]
    data_target = data_target.drop(columns=other_targets, errors='ignore')

    # Encode categorical features
    data_target, encoded_columns = encode_and_map_features(data_target, original_categorical_features)
    categorical_features = encoded_columns

    # Save the processed data for the current target
    encoded_data_path = os.path.join(results_folder, f"encoded_data_{sanitized_target}.csv")
    data_target.to_csv(encoded_data_path, index=False)
    print(f"Encoded data saved to {encoded_data_path}")

    # Setup PyCaret regression environment
    regression_setup = setup(
        data=data_target,
        target=target,
        categorical_features=categorical_features,
        numeric_features=continuous_features,
        session_id=42,  # Ensure reproducibility
        normalize=True,
        verbose=False
    )

    # Compare and select the best models
    best_model = compare_models(n_select=5)  # Select the top 5 models
    # Tune the top-ranked model exactly once. Probing tunability with a separate
    # trial tune_model call and then tuning again would run the whole
    # hyperparameter search twice; a failed tuning run falls back to the
    # untuned model instead.
    try:
        tuned_model = tune_model(best_model[0])
    except Exception as tuning_error:
        print(f"Tuning failed for {target}, keeping the untuned model: {tuning_error}")
        tuned_model = best_model[0]

    # Save the tuned/best model
    model_path = os.path.join(results_folder, f'best_model_{sanitized_target}.pkl')
    save_model(tuned_model, model_path)
    print(f"Model for {target} saved to {model_path}")

    # Save evaluation metrics
    evaluation_table = pull()  # Get PyCaret's most recent score grid
    evaluation_table.to_csv(os.path.join(results_folder, f'evaluation_metrics_{sanitized_target}.csv'), index=False)
    print(f"Evaluation metrics for {target} saved.")

    # Save model predictions
    predictions = predict_model(tuned_model, data=data_target)
    predictions_file = os.path.join(results_folder, f'predictions_{sanitized_target}.csv')
    # The prediction column is named differently depending on what predict_model returns
    pred_col = 'prediction' if 'prediction' in predictions.columns else 'prediction_label'
    predictions[[target, pred_col]].to_csv(predictions_file, index=False)
    print(f"Predictions for {target} saved to {predictions_file}")

    # Generate True vs Predicted scatter plot
    plt.figure(figsize=(12, 6))
    sns.scatterplot(x=predictions[pred_col], y=predictions[target], alpha=0.6)
    plt.plot([predictions[pred_col].min(), predictions[pred_col].max()],
             [predictions[pred_col].min(), predictions[pred_col].max()],
             color='red', linestyle='--', lw=2)
    plt.title(f"True vs Predicted for {target}")
    plt.xlabel('Predicted Values')
    plt.ylabel('True Values')
    plt.grid(True)
    plot_path = os.path.join(results_folder, f'true_vs_predicted_{sanitized_target}.png')
    plt.tight_layout()
    plt.savefig(plot_path)
    plt.close()
    print(f"True vs Predicted plot saved for {target}.")

    # Feature Importance Analysis with SHAP
    # Best-effort: any failure (including a missing shap package) is reported
    # and the loop continues with the next target.
    try:
        import shap
        # Features only; built once and reused for the explainer, the values and the names
        feature_frame = data_target.drop(columns=[target], errors='ignore')
        # Create SHAP explainer and calculate SHAP values
        explainer = shap.Explainer(tuned_model, feature_frame)
        shap_values = explainer(feature_frame)

        # Save feature importance to a CSV file
        feature_importance = shap_values.abs.mean(0).values  # Average absolute SHAP values
        feature_names = feature_frame.columns
        feature_importance_df = pd.DataFrame({
            'Feature': feature_names,
            'Importance': feature_importance
        }).sort_values(by='Importance', ascending=False)
        feature_importance_path = os.path.join(results_folder, f"feature_importance_{sanitized_target}.csv")
        feature_importance_df.to_csv(feature_importance_path, index=False)
        print(f"All feature importance saved to {feature_importance_path}")

        # Top 15 features for SHAP summary plot (indices ordered most important first)
        top_features_idx = feature_importance.argsort()[-15:][::-1]
        top_features = feature_names[top_features_idx]
        shap_values_top = shap_values[:, top_features_idx]

        # Generate and save SHAP summary plot
        shap_summary_plot_path = os.path.join(results_folder, f"shap_summary_top15_{sanitized_target}.png")
        shap.summary_plot(shap_values_top, data_target[top_features], show=False)
        plt.savefig(shap_summary_plot_path, bbox_inches='tight')
        plt.close()
        print(f"SHAP summary plot for top 15 features saved to {shap_summary_plot_path}")

    except Exception as e:
        print(f"Feature importance not available for {target}: {e}")

    # Store evaluation metrics and models
    evaluation_table['Target'] = target
    evaluation_results.append(evaluation_table)
    best_models[target] = tuned_model

# Combine all evaluation metrics into a single CSV
combined_evaluation_path = os.path.join(results_folder, "combined_evaluation_metrics.csv")
pd.concat(evaluation_results, ignore_index=True).to_csv(combined_evaluation_path, index=False)
print(f"\nCombined evaluation metrics saved to {combined_evaluation_path}")


In [None]:
# Evaluation of individual regression model 
import os
import joblib
import pandas as pd
from sklearn.metrics import mean_squared_error, median_absolute_error, mean_absolute_error
from sklearn.model_selection import train_test_split
from math import sqrt
from scipy.spatial.distance import mahalanobis
from sklearn.preprocessing import StandardScaler
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

class ModelEvaluator:
    """
    Evaluate the saved per-target regression models.

    For every target the evaluator loads a model and its encoded dataset from
    the results folder and writes metrics and plots into an
    "evaluation_results2" sub-folder.
    """

    def __init__(self, results_folder, target_variables):
        """
        Initialize the ModelEvaluator class.

        Parameters:
        - results_folder (str): The folder where the models and data files are stored.
        - target_variables (list): List of target variables to evaluate.
        """
        self.results_folder = results_folder
        self.target_variables = target_variables
        self.evaluation_folder = os.path.join(results_folder, "evaluation_results2")

        # Ensure that the evaluation results folder exists
        os.makedirs(self.evaluation_folder, exist_ok=True)

    def load_model_and_data(self, target):
        """
        Load a pre-trained model and corresponding encoded data for a specific target variable.

        Loading is best-effort: any failure is printed and reported to the
        caller as (None, None) instead of being raised.

        Parameters:
        - target (str): The name of the target variable.

        Returns:
        - model (object): The loaded machine learning model, or None on failure.
        - data (DataFrame): The encoded dataset corresponding to the target variable, or None on failure.
        """
        # NOTE(review): the doubled ".pkl.pkl" suffix presumably mirrors a saver that
        # appends its own ".pkl" to a path already ending in ".pkl" - confirm
        # against the files actually present in the results folder.
        model_path = os.path.join(self.results_folder, f"best_model_{target}.pkl.pkl")
        data_path = os.path.join(self.results_folder, f"encoded_data_{target}.csv")
        try:
            # NOTE: joblib.load unpickles the file; only load models from a trusted source.
            model = joblib.load(model_path)
            data = pd.read_csv(data_path)
            print(f"Model and data loaded for {target}.")
            return model, data
        except Exception as e:
            print(f"Error loading model/data for {target}: {e}")
            return None, None

    def calculate_metrics(self, y_true, y_pred):
        """
        Calculate regression evaluation metrics.

        Parameters:
        - y_true (array): The true target values.
        - y_pred (array): The predicted target values.

        Returns:
        - metrics (dict): A dictionary containing RMSE, MAE, and MedAE values.
        """
        return {
            "RMSE": sqrt(mean_squared_error(y_true, y_pred)),  # Root Mean Squared Error
            "MAE": mean_absolute_error(y_true, y_pred),        # Mean Absolute Error
            "MedAE": median_absolute_error(y_true, y_pred),    # Median Absolute Error
        }

    def permutation_test(self, model, X_test, y_test, n_permutations=50):
        """
        Perform a permutation test to evaluate the robustness of the model.

        The model is never refitted: it is scored once against the real targets
        and then against randomly permuted copies of them.

        Parameters:
        - model (object): The machine learning model.
        - X_test (DataFrame): The test features.
        - y_test (array): The test target values.
        - n_permutations (int): The number of permutations to perform.

        Returns:
        - original_score (float): The score of the model on the real targets.
        - permuted_scores (list): Scores on the permuted targets; NaN where scoring failed.
        """
        original_score = model.score(X_test, y_test)  # Score on the unshuffled targets
        permuted_scores = []
        for _ in tqdm(range(n_permutations), desc="Permutation Test"):
            # Shuffle the target values
            y_test_permuted = np.random.permutation(y_test)
            try:
                # Calculate the model score on permuted data
                permuted_score = model.score(X_test, y_test_permuted)
            except Exception as e:
                # Keep going: one failed permutation is recorded as NaN
                print(f"Permutation test error: {e}")
                permuted_score = np.nan
            permuted_scores.append(permuted_score)
        return original_score, permuted_scores

    def applicability_domain(self, X_train, X_test, epsilon=1e-5):
        """
        Assess the applicability domain using Mahalanobis distance.

        Both feature sets are standardized with statistics fitted on the
        training set; distances are measured from the training centroid.

        Parameters:
        - X_train (DataFrame): The training features.
        - X_test (DataFrame): The test features.
        - epsilon (float): A small constant added to the covariance diagonal for numerical stability.

        Returns:
        - mahalanobis_distances (array): Mahalanobis distances for the test samples.
        """
        scaler = StandardScaler()  # Standardize features
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Calculate the covariance matrix and its pseudo-inverse
        cov_matrix = np.cov(X_train_scaled, rowvar=False) + epsilon * np.eye(X_train_scaled.shape[1])
        inv_cov_matrix = np.linalg.pinv(cov_matrix)

        # The training centroid is the same for every test sample, so compute it once
        train_center = np.mean(X_train_scaled, axis=0)

        # Calculate Mahalanobis distances for each test sample
        mahalanobis_distances = [
            mahalanobis(x, train_center, inv_cov_matrix) for x in X_test_scaled
        ]
        return np.array(mahalanobis_distances)

    def plot_true_vs_pred(self, y_true, y_pred, target):
        """
        Create a scatter plot of true vs. predicted values.

        Parameters:
        - y_true (array): The true target values.
        - y_pred (array): The predicted target values.
        - target (str): The name of the target variable.
        """
        plt.figure(figsize=(5, 3))
        sns.scatterplot(x=y_pred, y=y_true, alpha=0.3, s=15)
        plt.plot([y_pred.min(), y_pred.max()], [y_pred.min(), y_pred.max()],
                 color='red', linestyle='--', lw=1)  # Line of perfect prediction
        plt.xticks(fontsize=10)
        plt.yticks(fontsize=10)
        plt.xlabel("Predicted Values", fontsize=14)
        plt.ylabel("True Values", fontsize=14)
        plt.tight_layout()
        plt.grid(True)
        plot_path = os.path.join(self.evaluation_folder, f"true_vs_predicted_{target}.png")
        plt.savefig(plot_path, dpi=300)
        plt.close()

    def plot_permutation_test(self, original_score, permuted_scores, target):
        """
        Plot the results of the permutation test.

        Parameters:
        - original_score (float): The score on the real targets.
        - permuted_scores (list): A list of scores from permuted data.
        - target (str): The name of the target variable.
        """
        plt.figure(figsize=(5, 3))
        sns.histplot(permuted_scores, kde=True, bins=30, color='blue', alpha=0.5)
        plt.axvline(original_score, color='red', linestyle='--', lw=1, label='Original Score')
        plt.xticks(fontsize=10)
        plt.yticks(fontsize=10)
        plt.xlabel("Permuted Scores", fontsize=14)
        plt.ylabel("Frequency", fontsize=14)
        plt.legend(fontsize=8)
        plt.tight_layout()
        plot_path = os.path.join(self.evaluation_folder, f"permutation_test_{target}.png")
        plt.savefig(plot_path, dpi=300)
        plt.close()

    def evaluate_model(self, model, data, target):
        """
        Perform the full evaluation for a single model and dataset.

        The model is used as loaded (it is not refitted); only the 20% test
        portion of a fixed-seed split is used for metrics and plots.

        Parameters:
        - model (object): The pre-trained machine learning model.
        - data (DataFrame): The dataset corresponding to the target variable.
        - target (str): The name of the target variable.
        """
        X = data.drop(columns=[target], errors='ignore')  # Features
        y = data[target]  # Target

        # Split the data; the training portion is not needed here
        _, X_test, _, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        y_pred = model.predict(X_test)

        # Calculate and save evaluation metrics
        metrics = self.calculate_metrics(y_test, y_pred)
        metrics_df = pd.DataFrame([metrics])
        metrics_path = os.path.join(self.evaluation_folder, f"metrics_{target}.csv")
        metrics_df.to_csv(metrics_path, index=False)

        # Generate evaluation plots
        self.plot_true_vs_pred(y_test, y_pred, target)
        original_score, permuted_scores = self.permutation_test(model, X_test, y_test)
        self.plot_permutation_test(original_score, permuted_scores, target)

    def main(self):
        """
        Iterate through all target variables and evaluate their models.

        Targets whose model or data could not be loaded are skipped.
        """
        for target in self.target_variables:
            model, data = self.load_model_and_data(target)
            if model is not None and data is not None:
                self.evaluate_model(model, data, target)

if __name__ == "__main__":
    # Define the results folder and target variables.
    # For each target the evaluator expects "best_model_<target>.pkl.pkl" and
    # "encoded_data_<target>.csv" inside this folder.
    results_folder = "Regression analysis"
    target_variables = ['TEQ_PCDDFs', 'TEQ_dlPCBs', 'TEQ_total', 'mPCBs_total', 'PBDEs_total']

    # Initialize and run the ModelEvaluator (creates <results_folder>/evaluation_results2)
    evaluator = ModelEvaluator(results_folder, target_variables)
    evaluator.main()
