# Gaussian Process Regression with Uncertainty Quantification

## Overview
This notebook implements a complete pipeline for Gaussian Process Regression with:
- **QuantileTransformer** for input features (X)
- **LogTransformer** for target variable (y)
- **Hyperparameter optimization** using CV set
- **Uncertainty quantification** with standard deviations
- **Comprehensive visualization** of predictions and uncertainty

## Key Features
1. Scikit-learn pipeline architecture
2. Multiple kernel options tested
3. Proper handling of std predictions
4. Visualization of first 50 predictions with uncertainty bounds
5. Model persistence for later use

In [1]:
# Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin
from sklearn.preprocessing import QuantileTransformer
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import (
    RBF, Matern, WhiteKernel, ConstantKernel as C, RationalQuadratic
)
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, r2_score, root_mean_squared_error
import joblib
from SALib.sample import saltelli
from SALib.analyze import sobol
import seaborn as sns

warnings.filterwarnings('ignore')
np.random.seed(42)

print("All libraries imported successfully!")


All libraries imported successfully!


## 1. Configurations

In [None]:
# Configuration
CONFIG = {
    'random_state': 42,
    'n_quantiles': 100,
    'output_distribution': 'normal',
    'n_restarts_optimizer': 10,
    'alpha': 1e-10,
    'normalize_y': False,
    'plot_samples': 50,
    'save_model': True,
    'model_path': 'gp_model_optimized.pkl',
}

print("Configuration set!")
for key, value in CONFIG.items():
    print(f"  {key}: {value}")


Configuration set!
  random_state: 42
  n_quantiles: 100
  output_distribution: normal
  n_restarts_optimizer: 10
  alpha: 1e-10
  normalize_y: False
  plot_samples: 50
  save_model: True
  model_path: gp_model_optimized.pkl


## 2. Custom Transformers and Wrapper

In [3]:
class LogTransformer(BaseEstimator, TransformerMixin):
    """Log transformation for target variable."""
    
    def __init__(self):
        self.fitted_ = False
    
    def fit(self, y, **fit_params):
        self.fitted_ = True
        return self
    
    def transform(self, y):
        if not self.fitted_:
            raise RuntimeError("Transformer must be fitted before transform")
        return np.log1p(y)
    
    def inverse_transform(self, y):
        return np.expm1(y)
    
    def get_feature_names_out(self, input_features=None):
        if input_features is None:
            return None
        return [f"{name}_log" for name in input_features]


class GPRegressorWithStd(BaseEstimator, RegressorMixin):
    """Wrapper for GaussianProcessRegressor that stores std predictions."""
    
    def __init__(self, kernel=None, alpha=1e-10, n_restarts_optimizer=10, 
                 normalize_y=False, random_state=None):
        self.kernel = kernel
        self.alpha = alpha
        self.n_restarts_optimizer = n_restarts_optimizer
        self.normalize_y = normalize_y
        self.random_state = random_state
        self.gp_ = None
        self.y_std_ = None
        
    def fit(self, X, y):
        self.gp_ = GaussianProcessRegressor(
            kernel=self.kernel,
            alpha=self.alpha,
            n_restarts_optimizer=self.n_restarts_optimizer,
            normalize_y=self.normalize_y,
            random_state=self.random_state
        )
        self.gp_.fit(X, y)
        return self
    
    def predict(self, X, return_std=False):
        if return_std:
            y_pred, y_std = self.gp_.predict(X, return_std=True)
            self.y_std_ = y_std
            return y_pred
        else:
            return self.gp_.predict(X, return_std=False)
    
    def get_params(self, deep=True):
        return {
            'kernel': self.kernel,
            'alpha': self.alpha,
            'n_restarts_optimizer': self.n_restarts_optimizer,
            'normalize_y': self.normalize_y,
            'random_state': self.random_state
        }
    
    def set_params(self, **params):
        for key, value in params.items():
            setattr(self, key, value)
        return self
    
    def score(self, X, y):
        y_pred = self.predict(X)
        return r2_score(y, y_pred)


print("Custom transformers defined!")


Custom transformers defined!


## 3. Kernel Definitions

In [None]:
def get_kernel_options():
    """Define various kernel options for testing."""
    
    # Common bounds (reuse variables for cleaner code)
    ls_bounds = (1e-2, 1e2)
    c_bounds = (1e-3, 1e3)
    noise_bounds = (1e-10, 1e0)
    
    kernels = {
        # --- Standard Kernels ---
        # 'matern_1.5': C(1.0, c_bounds) * Matern(
        #     length_scale=1.0, length_scale_bounds=ls_bounds, nu=1.5
        # ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=noise_bounds),
        
        # 'matern_2.5': C(1.0, c_bounds) * Matern(
        #     length_scale=1.0, length_scale_bounds=ls_bounds, nu=2.5
        # ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=noise_bounds),
        
        # 'rbf': C(1.0, c_bounds) * RBF(
        #     length_scale=1.0, length_scale_bounds=ls_bounds
        # ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=noise_bounds),
        
        # 'rational_quadratic': C(1.0, c_bounds) * RationalQuadratic(
        #     length_scale=1.0, alpha=1.0,
        #     length_scale_bounds=ls_bounds, alpha_bounds=ls_bounds
        # ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=noise_bounds),
        
        # --- COMBINED KERNELS (NEW) ---
        
        # Option 1: Additive (Best for Structural Mechanics)
        # RBF captures global smooth trends, Matern captures local interactions
        'rbf_plus_matern': (
            C(1.0, c_bounds) * RBF(length_scale=1.0, length_scale_bounds=ls_bounds) + 
            C(1.0, c_bounds) * Matern(length_scale=1.0, length_scale_bounds=ls_bounds, nu=2.5)
        ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=noise_bounds),
        
        # Option 2: Multiplicative
        # Creates a complex correlation structure
        'rbf_times_matern': (
            C(1.0, c_bounds) * 
            RBF(length_scale=1.0, length_scale_bounds=ls_bounds) * 
            Matern(length_scale=1.0, length_scale_bounds=ls_bounds, nu=2.5)
        ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=noise_bounds),
    }
    return kernels


In [None]:
# def get_kernel_options():
#     """Define various kernel options for testing."""
#     kernels = {
#         'matern_1.5': C(1.0, (1e-3, 1e3)) * Matern(
#             length_scale=1.0, length_scale_bounds=(1e-2, 1e2), nu=1.5
#         ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-10, 1e0)),
        
#         'matern_2.5': C(1.0, (1e-3, 1e3)) * Matern(
#             length_scale=1.0, length_scale_bounds=(1e-2, 1e2), nu=2.5
#         ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-10, 1e0)),
        
#         'rbf': C(1.0, (1e-3, 1e3)) * RBF(
#             length_scale=1.0, length_scale_bounds=(1e-2, 1e2)
#         ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-10, 1e0)),
        
#         'rational_quadratic': C(1.0, (1e-3, 1e3)) * RationalQuadratic(
#             length_scale=1.0, alpha=1.0,
#             length_scale_bounds=(1e-2, 1e2), alpha_bounds=(1e-2, 1e2)
#         ) + WhiteKernel(noise_level=1e-5, noise_level_bounds=(1e-10, 1e0)),
        
#         'rbf_simple': C(1.0, (1e-3, 1e3)) * RBF(
#             length_scale=1.0, length_scale_bounds=(1e-2, 1e2)
#         ),
#     }
#     return kernels

# kernels = get_kernel_options()
# print(f"Defined {len(kernels)} kernel options:")
# for name in kernels.keys():
#     print(f"  - {name}")


Defined 5 kernel options:
  - matern_1.5
  - matern_2.5
  - rbf
  - rational_quadratic
  - rbf_simple


## 4. Load Data

In [None]:
df = pd.read_csv(r'/Volumes/SG_Apple/Sensitivity Analysis/Final Model Fitting/sph.csv')


In [13]:
sample_df = df.groupby('Strat_cat').sample(frac=0.5,random_state=42)
sample_df['Strat_cat'].value_counts()

train_set, hold_set = train_test_split(
    sample_df, test_size=0.5, shuffle=True, stratify=sample_df['Strat_cat'], random_state=42)
cv_set, test_set = train_test_split(
    hold_set, test_size=0.5, shuffle=True, stratify=hold_set['Strat_cat'], random_state=42)

for col in (train_set,cv_set,test_set):
    col.drop(['Strat_cat'],axis=1,inplace=True)


In [None]:
X_train = train_set.iloc[:, :10].values
y_train = train_set.iloc[:, 10].values
X_test = test_set.iloc[:, :10].values
y_test = test_set.iloc[:, 10].values
X_cv = cv_set.iloc[:, :10].values
y_cv = cv_set.iloc[:, 10].values

print(f"Dataset loaded:")
print(f"  Train: X={X_train.shape}, y={y_train.shape}")
print(f"  CV:    X={X_cv.shape}, y={y_cv.shape}")
print(f"  Test:  X={X_test.shape}, y={y_test.shape}")


Dataset loaded:
  Train: X=(2500, 10), y=(2500,)
  CV:    X=(1250, 10), y=(1250,)
  Test:  X=(1250, 10), y=(1250,)


## 5. Transform Target Variable

In [8]:
# # Create and fit log transformer for target
# qt_y = LogTransformer()

# y_train_scaled = qt_y.fit_transform(y_train.reshape(-1, 1)).flatten()
# y_cv_scaled = qt_y.transform(y_cv.reshape(-1, 1)).flatten()
# y_test_scaled = qt_y.transform(y_test.reshape(-1, 1)).flatten()

# print("Target variable transformed (log1p):")
# print(f"  Original range: [{y_train.min():.2f}, {y_train.max():.2f}]")
# print(f"  Scaled range:   [{y_train_scaled.min():.2f}, {y_train_scaled.max():.2f}]")


## 6. Hyperparameter Optimization

In [None]:
#test different kernels on the CV set to find the best configuration.


# def create_pipeline(kernel, config=CONFIG):
#     """Create pipeline with given kernel."""
#     return Pipeline([
#         ('quantile_transform', QuantileTransformer(
#             n_quantiles=config['n_quantiles'],
#             output_distribution=config['output_distribution'],
#             random_state=config['random_state']
#         )),
#         ('gp_regressor', GPRegressorWithStd(
#             kernel=kernel,
#             alpha=config['alpha'],
#             n_restarts_optimizer=config['n_restarts_optimizer'],
#             normalize_y=config['normalize_y'],
#             random_state=config['random_state']
#         ))
#     ])


# # Test each kernel
# print("\n" + "="*70)
# print("TESTING DIFFERENT KERNELS")
# print("="*70)

# results = {}
# kernels = get_kernel_options()

# for kernel_name, kernel in kernels.items():
#     print(f"\nTesting: {kernel_name}")
#     print("-" * 50)
    
#     try:
#         # Create and fit pipeline
#         pipeline = create_pipeline(kernel)
#         pipeline.fit(X_train, y_train_scaled)
        
#         # Predict on CV set
#         y_cv_pred = pipeline.predict(X_cv)
        
#         # Compute metrics
#         mse = np.mean((y_cv_scaled - y_cv_pred) ** 2)
#         rmse = np.sqrt(mse)
#         mae = np.mean(np.abs(y_cv_scaled - y_cv_pred))
#         r2 = r2_score(y_cv_scaled, y_cv_pred)
        
#         results[kernel_name] = {
#             'pipeline': pipeline,
#             'mse': mse,
#             'rmse': rmse,
#             'mae': mae,
#             'r2': r2
#         }
        
#         print(f"  MSE:  {mse:.6f}")
#         print(f"  RMSE: {rmse:.6f}")
#         print(f"  MAE:  {mae:.6f}")
#         print(f"  R²:   {r2:.6f}")
        
#     except Exception as e:
#         print(f"  ❌ Failed: {str(e)}")
#         results[kernel_name] = None

# # Find best kernel
# valid_results = {k: v for k, v in results.items() if v is not None}
# best_kernel_name = min(valid_results, key=lambda k: valid_results[k]['mse'])
# best_pipeline = valid_results[best_kernel_name]['pipeline']

# print("\n" + "="*70)
# print("BEST KERNEL")
# print("="*70)
# print(f"Kernel: {best_kernel_name}")
# print(f"CV MSE:  {valid_results[best_kernel_name]['mse']:.6f}")
# print(f"CV RMSE: {valid_results[best_kernel_name]['rmse']:.6f}")
# print(f"CV R²:   {valid_results[best_kernel_name]['r2']:.6f}")


In [15]:
# print("\nRetraining best model on combined train+CV set...")

# X_combined = np.vstack([X_train, X_cv])
# y_combined = np.concatenate([y_train_scaled, y_cv_scaled])

# best_pipeline.fit(X_combined, y_combined)

# print("✓ Final model trained!")
# print(f"  Training samples: {len(X_combined)}")
# print(f"  Features: {X_combined.shape[1]}")


In [16]:
# def compute_metrics(y_true, y_pred):
#     """Compute evaluation metrics."""
#     return {
#         "R2": f"{r2_score(y_true, y_pred):.3f}",
#         "MAE": f"{mean_absolute_error(y_true, y_pred):.3f}",
#         "RMSE": f"{root_mean_squared_error(y_true, y_pred):.3f}",
#     }


# print("\n" + "="*70)
# print("EVALUATION ON TEST SET")
# print("="*70)

# # Get predictions with standard deviation
# gp_model = best_pipeline.named_steps['gp_regressor']
# X_test_transformed = best_pipeline.named_steps['quantile_transform'].transform(X_test)

# y_pred_scaled = gp_model.predict(X_test_transformed, return_std=True)
# y_std_scaled = gp_model.y_std_

# # Metrics in scaled space
# metrics_scaled = compute_metrics(y_test_scaled, y_pred_scaled)

# print("\nMetrics in Scaled Space (log-transformed):")
# for metric, value in metrics_scaled.items():
#     print(f"  {metric:5s}: {value}")

# # Transform back to original space
# y_test_original = qt_y.inverse_transform(y_test_scaled)
# y_pred_original = qt_y.inverse_transform(y_pred_scaled)

# # Uncertainty bounds (±2σ)
# y_lower_original = qt_y.inverse_transform(y_pred_scaled - 2*y_std_scaled)
# y_upper_original = qt_y.inverse_transform(y_pred_scaled + 2*y_std_scaled)

# # Metrics in original space
# metrics_original = compute_metrics(y_test_original, y_pred_original)

# print("\nMetrics in Original Space:")
# for metric, value in metrics_original.items():
#     print(f"  {metric:5s}: {value}")

# print("\n" + "="*70)


In [None]:
# Plot first N samples with uncertainty bounds
n_samples = min(CONFIG['plot_samples'], len(y_test))
indices = np.arange(n_samples)

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))

# Plot 1: Predictions with uncertainty bounds
ax1.plot(indices, y_test_original[:n_samples], 'ko-', 
         label='True Values', markersize=6, linewidth=1.5, alpha=0.7)
ax1.plot(indices, y_pred_original[:n_samples], 'bs-', 
         label='Predictions', markersize=5, linewidth=1.5, alpha=0.7)

ax1.fill_between(indices, 
                 y_lower_original[:n_samples], 
                 y_upper_original[:n_samples],
                 alpha=0.3, color='blue', 
                 label='95% Confidence Interval (±2σ)')

ax1.set_xlabel('Sample Index', fontsize=12)
ax1.set_ylabel('Target Value (Omega)', fontsize=12)
ax1.set_title(f'Predictions with Uncertainty Bounds (First {n_samples} Samples)', 
              fontsize=14, fontweight='bold')
ax1.legend(loc='best', fontsize=10)
ax1.grid(True, alpha=0.3)

# Plot 2: Residuals with uncertainty
residuals = y_test_original[:n_samples] - y_pred_original[:n_samples]
std_width = y_upper_original[:n_samples] - y_pred_original[:n_samples]

ax2.scatter(indices, residuals, c='darkred', s=50, alpha=0.6, 
            label='Residuals')
ax2.fill_between(indices, -std_width, std_width,
                 alpha=0.2, color='gray', 
                 label='±2σ Uncertainty')
ax2.axhline(y=0, color='black', linestyle='--', linewidth=1.5)

ax2.set_xlabel('Sample Index', fontsize=12)
ax2.set_ylabel('Residual (True - Predicted)', fontsize=12)
ax2.set_title('Residuals with Uncertainty Bounds', fontsize=14, fontweight='bold')
ax2.legend(loc='best', fontsize=10)
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()
