In [None]:
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Kernel, StationaryKernelMixin, NormalizedKernelMixin
import geopandas as gpd
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from scipy.optimize import fmin_l_bfgs_b
import warnings
warnings.filterwarnings("ignore")

class SpatialSimilarityKernel(StationaryKernelMixin, NormalizedKernelMixin, Kernel):
    """
    Spatial Similarity kernel class that extends scikit-learn's Gaussian Process kernels.
    Inherits from StationaryKernelMixin (for stationarity) and NormalizedKernelMixin.
    """

    def __init__(self, P_function, sigma_values, epsilon=0.0001):
        """
        Initializes the spatial similarity kernel.
        :param P_function: An array of weights that will be used to combine the exponential terms (feature importance).
        :param sigma_values: An array of variance estimates for each feature, used in the exponential decay.
        :param epsilon: A small value added to diagonal elements for numerical stability.
        """
        self.P_function = P_function
        self.sigma_values = sigma_values
        self.epsilon = epsilon

    def _calculate_similarity(self, u, v):
        """
        Computes element-wise exponential similarity between two matrices (u, v).
        Exponential decay is applied based on the squared differences scaled by sigma_values.

        :param u: Feature matrix X (shape [n_samples, n_features])
        :param v: Feature matrix Y (shape [m_samples, n_features])
        :return: An array E_i containing spatial similarity for each feature.
        """
        # Add extra dimensions to broadcast easily
        u = u[:, np.newaxis, :]
        v = v[np.newaxis, :, :]

        # Compute squared differences between each pair of points
        sq_diff = (u - v) ** 2

        # Calculate exponential decay for each feature using sigma_values
        E_i = np.exp(-((sq_diff) / (2 * (self.sigma_values))))
        return E_i

    def __call__(self, X, Y=None, eval_gradient=False):
        """
        Main function used by GaussianProcessRegressor to compute the kernel matrix.

        :param X: Feature matrix of shape [n_samples, n_features]
        :param Y: (Optional) Another feature matrix. If None, use X for both.
        :param eval_gradient: (Unused) Determines if gradient is computed, not used here.
        :return: The kernel similarity matrix of shape [n_samples, m_samples].
        """
        if Y is None:
            Y = X

        # Get exponential similarities feature-wise
        E_i = self._calculate_similarity(X, Y)

        # Weighted average across features using P_function as weights
        S_uv = np.average(E_i, axis=2, weights=self.P_function)
        return S_uv

    def diag(self, X):
        """
        Returns the diagonal of the kernel matrix for a given X.
        Includes a small epsilon for numerical stability.
        """
        return np.diag(np.ones(X.shape[0])) + self.epsilon

    def is_stationary(self):
        """
        Indicates that the kernel is stationary (depends on distances, not absolute positions).
        """
        return True


def loss_function(P_function, X_train, y_train, sigma_values, alpha=1e-0):
    """
    Defines a loss function for optimization based on Gaussian Process Regressor predictions.

    :param P_function: Current set of feature weights to be evaluated.
    :param X_train: Training features.
    :param y_train: Training target values.
    :param sigma_values: Array of variances used in the exponential similarity calculation.
    :param alpha: Regularization parameter for the Gaussian Process Regressor.
    :return: RMSE (root mean squared error) for the given set of feature weights.
    """
    # Instantiate the spatial similarity kernel with the current P_function
    kernel = SpatialSimilarityKernel(P_function=P_function, sigma_values=sigma_values)
    # Create a GPR model with the spatial similarity kernel
    gpr = GaussianProcessRegressor(kernel=kernel, alpha=alpha, optimizer=None)
    # Fit the model on the training set
    gpr.fit(X_train, y_train)
    # Predict on the same training data
    y_pred = gpr.predict(X_train, return_std=False)
    # Compute the RMSE for this set of predictions
    rmse = mean_squared_error(y_train, y_pred, squared=False)
    return rmse

def objective(P_function):
    """
    Wrapper function for the optimizer (fmin_l_bfgs_b).
    Returns the RMSE for the current P_function, which the optimizer tries to minimize.
    """
    return loss_function(P_function, X_train, y_train, sigma_values, alpha=1e-0)

# Load data
chicago = gpd.read_file("/data/chicago_scv.geojson")
chicago['pct_white'] = 1 - chicago['pct_nonwhi']
chicago['PD_log'] = np.log(chicago['population'])
y = np.log(chicago['TripCount'].values)

# Select features
X_vars = ['pct_white','pct_bachel', 'pct_no_veh','PD_log','job_entrop','TripMiles_']
X = chicago[X_vars]

# Standardize the feature matrix
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.9, random_state=999)

# Compute the variance of each feature in the training set
sigma_values = np.array(np.var(X_train, axis=0))

# Initialize P_function (weights) for all features
initial_P_function = np.ones(len(X_vars)) * 1

# Set bounds for each weight (between 0 and 5)
bounds = [(0, 5)] * len(X_vars)

# Use L-BFGS-B to optimize the objective function
optimized_P_function, min_loss, info = fmin_l_bfgs_b(
    objective,
    initial_P_function,
    bounds=bounds,
    approx_grad=True
)

print("Optimized P_function: ", optimized_P_function)

# Build a GaussianProcessRegressor using the optimized weights
kernel = SpatialSimilarityKernel(P_function=optimized_P_function, sigma_values=sigma_values)
gpr = GaussianProcessRegressor(kernel=kernel, alpha=1e-1, optimizer=None)

# Fit the final model on the training data
gpr.fit(X_train, y_train)

# Predict on the test set
y_pred = gpr.predict(X_test, return_std=False)

# Compute metrics (MSE, MAE, MAPE, R²) on the test set
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
mape = mean_absolute_percentage_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
r2 = round(r2, 4)

print("MSE: ", mse.round(4))
print("MAE: ", mae.round(4))
print("MAPE: ", mape.round(4))
print("R² Score: ", r2)

Optimized P_function:  [0.04688675 2.24758405 0.43979082 0.71099497 0.12188657 1.72973757]
MSE:  0.6014
MAE:  0.5837
MAPE:  0.0661
R² Score:  0.8223


In [None]:
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Kernel, StationaryKernelMixin, NormalizedKernelMixin
import geopandas as gpd
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from scipy.optimize import fmin_l_bfgs_b
import warnings
warnings.filterwarnings("ignore")

class SpatialSimilarityKernel(StationaryKernelMixin, NormalizedKernelMixin, Kernel):
    """
    Spatial Similarity kernel class that extends scikit-learn's Gaussian Process kernels.
    Inherits from StationaryKernelMixin (for stationarity) and NormalizedKernelMixin.
    """

    def __init__(self, P_function, sigma_values, epsilon=0.0001):
        """
        Initializes the spatial similarity kernel.
        :param P_function: An array of weights that will be used to combine the exponential terms (feature importance).
        :param sigma_values: An array of variance estimates for each feature, used in the exponential decay.
        :param epsilon: A small value added to diagonal elements for numerical stability.
        """
        self.P_function = P_function
        self.sigma_values = sigma_values
        self.epsilon = epsilon

    def _calculate_similarity(self, u, v):
        """
        Computes element-wise exponential similarity between two matrices (u, v).
        Exponential decay is applied based on the squared differences scaled by sigma_values.

        :param u: Feature matrix X (shape [n_samples, n_features])
        :param v: Feature matrix Y (shape [m_samples, n_features])
        :return: An array E_i containing spatial similarity for each feature.
        """
        # Add extra dimensions to broadcast easily
        u = u[:, np.newaxis, :]
        v = v[np.newaxis, :, :]

        # Compute squared differences between each pair of points
        sq_diff = (u - v) ** 2

        # Calculate exponential decay for each feature using sigma_values
        E_i = np.exp(-((sq_diff) / (2 * (self.sigma_values))))
        return E_i

    def __call__(self, X, Y=None, eval_gradient=False):
        """
        Main function used by GaussianProcessRegressor to compute the kernel matrix.

        :param X: Feature matrix of shape [n_samples, n_features]
        :param Y: (Optional) Another feature matrix. If None, use X for both.
        :param eval_gradient: (Unused) Determines if gradient is computed, not used here.
        :return: The kernel similarity matrix of shape [n_samples, m_samples].
        """
        if Y is None:
            Y = X

        # Get exponential similarities feature-wise
        E_i = self._calculate_similarity(X, Y)

        # Weighted average across features using P_function as weights
        S_uv = np.average(E_i, axis=2, weights=self.P_function)
        return S_uv

    def diag(self, X):
        """
        Returns the diagonal of the kernel matrix for a given X.
        Includes a small epsilon for numerical stability.
        """
        return np.diag(np.ones(X.shape[0])) + self.epsilon

    def is_stationary(self):
        """
        Indicates that the kernel is stationary (depends on distances, not absolute positions).
        """
        return True


def loss_function(P_function, X_train, y_train, sigma_values, alpha=1e-1):
    """
    Defines a loss function for optimization based on Gaussian Process Regressor predictions.

    :param P_function: Current set of feature weights to be evaluated.
    :param X_train: Training features.
    :param y_train: Training target values.
    :param sigma_values: Array of variances used in the exponential similarity calculation.
    :param alpha: Regularization parameter for the Gaussian Process Regressor.
    :return: RMSE (root mean squared error) for the given set of feature weights.
    """
    # Instantiate the spatial similarity kernel with the current P_function
    kernel = SpatialSimilarityKernel(P_function=P_function, sigma_values=sigma_values)

    # Create a GPR model with the spatial similarity kernel
    gpr = GaussianProcessRegressor(kernel=kernel, alpha=alpha, optimizer=None)

    # Fit the model on the training set
    gpr.fit(X_train, y_train)

    # Predict on the same training data
    y_pred = gpr.predict(X_train, return_std=False)

    # Compute the RMSE for this set of predictions
    rmse = mean_squared_error(y_train, y_pred, squared=False)
    return rmse

def objective(P_function):
    """
    Wrapper function for the optimizer (fmin_l_bfgs_b).
    Returns the RMSE for the current P_function, which the optimizer tries to minimize.
    """
    return loss_function(P_function, X_train, y_train, sigma_values, alpha=1e-1)


# Load data
Berxit = gpd.read_file("/data/leave_data.geojson")
y = Berxit['leave'].values
X_vars = ['to15','over65','lhosp','manu','badhealth','bornuk']
X = Berxit[X_vars]

# Standardize feature matrix
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Standardize the target as well
y_scaler = StandardScaler()
y_scaled = y_scaler.fit_transform(y.reshape(-1, 1))

# Split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.9, random_state=42)

# Compute the variance of each feature in the training set
sigma_values = np.array(np.var(X_train, axis=0))

# Initialize P_function (weights) for all features
initial_P_function = np.ones(len(X_vars)) * 1

# Set bounds for each weight (between 0 and 5)
bounds = [(0, 5)] * len(X_vars)

# Use L-BFGS-B to optimize the objective function
optimized_P_function, min_loss, info = fmin_l_bfgs_b(
    objective,
    initial_P_function,
    bounds=bounds,
    approx_grad=True
)

print("Optimized P_function: ", optimized_P_function)

# Build a GaussianProcessRegressor using the optimized weights
kernel = SpatialSimilarityKernel(P_function=optimized_P_function, sigma_values=sigma_values)
gpr = GaussianProcessRegressor(kernel=kernel, alpha=1e-1, optimizer=None)

# Fit the final model on the training data
gpr.fit(X_train, y_train)

# Predict on the test set
y_pred = gpr.predict(X_test, return_std=False)

# Compute metrics (MSE, MAE, MAPE, R²) on the test set
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
mape = mean_absolute_percentage_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
r2 = round(r2, 4)

print("MSE: ", mse.round(4))
print("MAE: ", mae.round(4))
print("MAPE: ", mape.round(4))
print("R² Score: ", r2)


Optimized P_function:  [1.53809149 1.69200801 0.58865995 0.23002332 0.78079264 0.41889233]
MSE:  0.3735
MAE:  0.4637
MAPE:  1.7959
R² Score:  0.6193
