In [2]:
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
from sklearn.preprocessing import MinMaxScaler
from sklearn.neighbors import kneighbors_graph
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
from scipy import sparse
from warnings import filterwarnings

filterwarnings('ignore')

# -----------------------
# Define RBF kernel
# -----------------------
def rbf(X1, X2, **kwargs):
    return np.exp(-cdist(X1, X2) ** 2 * kwargs['gamma'])

# -----------------------
# Define LapRLS Model
# -----------------------
class LapRLS:
    def __init__(self, opt):
        self.opt = opt

    def fit(self, X, Y):
        self.X_labeled = X
        self.Y_labeled = Y.flatten()

        if self.opt['neighbor_mode'] == 'distance':
            W = kneighbors_graph(self.X_labeled, self.opt['n_neighbor'], mode='distance', include_self=False)
            W = W.maximum(W.T)
            W = sparse.csr_matrix(
                (np.exp(-W.data ** 2 / (4 * self.opt['t'])), W.indices, W.indptr),
                shape=(self.X_labeled.shape[0], self.X_labeled.shape[0])
            )
        else:
            raise Exception("Only 'distance' mode supported in this version")

        L = sparse.diags(np.array(W.sum(0))[0]).tocsr() - W
        K = self.opt['kernel_function'](self.X_labeled, self.X_labeled, **self.opt['kernel_parameters'])
        l = X.shape[0]
        I_l = np.identity(l)

        self.alpha = np.linalg.inv(
            K + self.opt['gamma_A'] * l * I_l + (self.opt['gamma_I'] * l / (l ** 2)) * L.dot(K)
        ).dot(self.Y_labeled)

    def decision_function(self, X):
        new_K = self.opt['kernel_function'](self.X_labeled, X, **self.opt['kernel_parameters'])
        return np.squeeze(self.alpha.dot(new_K))

# -----------------------
# Hyperparameters
# -----------------------
opt = {
    'neighbor_mode': 'distance',
    'n_neighbor': 3,
    't': 1,
    'gamma_A': 0.0001,
    'gamma_I': 1,
    'kernel_function': rbf,
    'kernel_parameters': {
        'gamma': 1
    }
}

# -----------------------
# Load and preprocess data
# -----------------------
DATA_PATH = '100_cylindrical_anomalies.csv'
df = pd.read_csv(DATA_PATH)
X = np.abs(df.iloc[:, :22].values)
y = df.iloc[:, -1].values
X = MinMaxScaler().fit_transform(X)

# -----------------------
# Split: train 80%, val 10%, test 10%
# -----------------------
seed = 11
X_train, X_temp, y_train, y_temp = train_test_split(X, y, train_size=0.8, random_state=seed)
_, X_test, _, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=seed)

# -----------------------
# Train and evaluate only on test set
# -----------------------
model = LapRLS(opt)
model.fit(X_train, y_train)
y_test_pred = model.decision_function(X_test)

print("R² score on test data:", r2_score(y_test, y_test_pred))
print("MSE on test data:", mean_squared_error(y_test, y_test_pred))


R² score on test data: 0.8914756964942363
MSE on test data: 0.23656798792479253
