In [2]:
# Import necessary libraries
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from scipy.spatial.distance import cdist
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.rcParams['figure.dpi'] = 120

# Kernel functions encapsulated in a class
class Kernel:
    @staticmethod
    def Gaussian(x):
        return np.where(np.abs(x) > 4, 0, 1 / np.sqrt(2 * np.pi) * np.exp(-0.5 * x**2))

    @staticmethod
    def Tricubic(x):
        return np.where(np.abs(x) > 1, 0, (1 - np.abs(x)**3)**3)

    @staticmethod
    def Epanechnikov(x):
        return np.where(np.abs(x) > 1, 0, 3 / 4 * (1 - np.abs(x)**2))

    @staticmethod
    def Quartic(x):
        return np.where(np.abs(x) > 1, 0, 15 / 16 * (1 - np.abs(x)**2)**2)

# Utility function to calculate pairwise distances
def calculate_distances(X):
    return cdist(X, X, metric='euclidean')

# Function to calculate kernel-based weights
def kernel_function(distances, kernel, tau):
    return kernel(distances / (2 * tau))

# Ridge Regression Model with Kernel Weighting
class KernelWeightedRidgeModel:
    def __init__(self, kernel=Kernel.Gaussian, tau=0.05, alpha=0.001, max_iter=5000):
        self.kernel = kernel
        self.tau = tau
        self.alpha = alpha
        self.max_iter = max_iter
        self.model = Ridge(alpha=self.alpha, max_iter=self.max_iter)
    
    def fit(self, X, y, distances):
        # Calculate weights for each data point
        self.weights = kernel_function(distances, self.kernel, self.tau)
        
        # Train the model with weighted data
        W = np.diag(self.weights[:, 0])  # Use weights for the first data point (example)
        self.model.fit(W @ X, W @ y)
    
    def predict(self, X_new):
        return self.model.predict(X_new.reshape(1, -1))

    def evaluate(self, X_test, y_test):
        y_pred = self.model.predict(X_test)
        return mean_squared_error(y_test, y_pred)

data = pd.read_csv('https://github.com/dvasiliu/AAML/blob/main/Data%20Sets/mtcars.csv?raw=True')
X = data.drop(columns=['model', 'mpg']).values
y = data['mpg'].values

# Scale the features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Main workflow to train and evaluate the model
def main():
    # Calculate distances
    distances = calculate_distances(X_scaled)
    
    # Initialize model
    model = KernelWeightedRidgeModel(kernel=Kernel.Gaussian, tau=0.05, alpha=0.001, max_iter=5000)
    
    # Fit the model
    model.fit(X_scaled, y, distances)
    
    # Make a prediction for the first test point
    y_pred = model.predict(X_scaled[0])
    print(f"Predicted value: {y_pred[0]}, Actual value: {y[0]}")
    
    # Evaluate the model
    mse = model.evaluate(X_scaled, y)
    print(f"Mean Squared Error: {mse}")

# Run the main function
if __name__ == "__main__":
    main()

Predicted value: 20.97295036473242, Actual value: 21.0
Mean Squared Error: 554.421766688445


In [30]:
# Import necessary libraries
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from scipy.spatial.distance import cdist
from usearch.index import search, MetricKind, Matches, BatchMatches
from sklearn import linear_model
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from scipy.spatial import Delaunay
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import KFold, GridSearchCV
from sklearn.metrics import mean_squared_error
from scipy import linalg
from sklearn.base import BaseEstimator, RegressorMixin
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split, KFold, GridSearchCV
from sklearn.preprocessing import MinMaxScaler
from scipy.spatial.distance import cdist
from scipy.interpolate import interp1d
from sklearn.utils.validation import check_is_fitted
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted

In [3]:
# We will be using mtcars dataset
data = pd.read_csv('https://github.com/dvasiliu/AAML/blob/main/Data%20Sets/mtcars.csv?raw=True')

In [4]:
# We will be having multiple features - cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb
X = data.drop(columns=['model', 'mpg']).values
y = data['mpg'].values

In [6]:
# if we compute distances we need to put the attributes on the same scale
scaler = StandardScaler()
xscaled = scaler.fit_transform(X)

In [8]:
many_in_many: BatchMatches = search(xscaled, xscaled,len(xscaled) , MetricKind.L2sq, exact=True)
one_in_many: Matches = search(xscaled, xscaled[0], len(xscaled), MetricKind.L2sq, exact=True)

In [9]:
dist = one_in_many.to_list()
distances = cdist(xscaled,xscaled,metric='Euclidean')

In [10]:
# Kernels

# Gaussian Kernel
def Gaussian(x):
  return np.where(np.abs(x)>4,0,1/(np.sqrt(2*np.pi))*np.exp(-1/2*x**2))
  
def Tricubic(x):
  return np.where(np.abs(x)>1,0,(1-np.abs(x)**3)**3)

def kernel_function(d_ij,kern, tau):
    return kern(d_ij/(2*tau))

kern = Gaussian
weights = kernel_function(distances,kern,tau=0.05)

In [11]:
model = Ridge(alpha=0.001,max_iter=5000)
model.fit(np.diag(weights[:,0])@xscaled, np.diag(weights[:,0])@y)

Ridge(alpha=0.001, max_iter=5000)

In [12]:
# Result - array([17.06248486])
model.predict(xscaled[0].reshape(1,-1))

array([20.97295036])

In [13]:
# Result - 18,0
y[0]

21.0

In [14]:
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

In [15]:
# Personalize code - Add Cauchy Kernel
def Cauchy(x):
    return 1 / (1 + x**2)

def Tricubic(x):
    return np.where(np.abs(x) > 1, 0, (1 - np.abs(x)**3)**3)

In [16]:
def kernel_function(distances, kernel, tau):
    return kernel(distances / (2 * tau))

def calculate_distances(X):
    return cdist(X, X, metric='Euclidean')

In [17]:
class Lowess(BaseEstimator, RegressorMixin):
    def __init__(self, kernel=Cauchy, tau=0.1, regularization=0.001):
        self.kernel = kernel
        self.tau = tau
        self.regularization = regularization
        self.fitted_ = False  

    def fit(self, X, y):
        self.X_train = X
        self.y_train = y
        self.distances = calculate_distances(self.X_train)
        self.weights = kernel_function(self.distances, self.kernel, self.tau)
        self.fitted_ = True  

    def predict(self, X_new):
        if not self.fitted_:
            raise NotFittedError(f"This Lowess instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.")

        predictions = []

        for i in range(len(X_new)):
            w = self.weights[:, i]
            model = Ridge(alpha=self.regularization)
            model.fit(np.diag(w) @ self.X_train, np.diag(w) @ self.y_train)
            prediction = model.predict(X_new[i].reshape(1, -1))
            predictions.append(prediction)

        return np.array(predictions)

    def get_params(self, deep=True):
        return {"kernel": self.kernel, "tau": self.tau, "regularization": self.regularization}

    def set_params(self, **params):
        for key, value in params.items():
            setattr(self, key, value)
        return self


In [18]:
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

model = Lowess(kernel=Cauchy, tau=0.15)
model.fit(X_train, y_train)

y_pred = model.predict(X_test)

print(y_pred[:5])
print(y_test[:5])

[[ -1.13821791]
 [  3.33646953]
 [-13.3106584 ]
 [-13.69685591]
 [ -7.97232186]]
[19.7 10.4 19.2 32.4 22.8]


In [19]:
from sklearn.metrics import mean_squared_error

mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error: {mse}")

Mean Squared Error: 708.2549571475354


In [20]:
param_grid = {'tau': np.linspace(0.01, 0.5, 10)}
grid_search = GridSearchCV(Lowess(), param_grid, scoring='neg_mean_squared_error', cv=5)

grid_search.fit(X_train, y_train)
best_tau = grid_search.best_params_['tau']

print(f"Best tau: {best_tau}")

best_model = Lowess(kernel=Cauchy, tau=best_tau)
best_model.fit(X_train, y_train)

best_y_pred = best_model.predict(X_test)
best_mse = mean_squared_error(y_test, best_y_pred)
print(f"Optimized Mean Squared Error: {best_mse}")

Best tau: 0.5
Optimized Mean Squared Error: 571.6161212691303


In [31]:
class ModelPipeline:
    def __init__(self, model_class, kernel, tau=0.15, param_grid=None):
        self.model_class = model_class
        self.kernel = kernel
        self.tau = tau
        self.param_grid = param_grid
        self.model = self.model_class(kernel=self.kernel, tau=self.tau)
        
        self.grid_search = None
        self.best_tau = None
        self.best_model = None

        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
    
    def split_data(self, X, y, test_size=0.2, random_state=42):
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    def train_model(self):
        self.model.fit(self.X_train, self.y_train)

    def predict(self):
        return self.model.predict(self.X_test)

    def evaluate(self, y_pred):
        mse = mean_squared_error(self.y_test, y_pred)
        print(f"Mean Squared Error: {mse}")
        return mse

    def tune_parameters(self):
        if self.param_grid:
            self.grid_search = GridSearchCV(self.model_class(kernel=self.kernel), self.param_grid, scoring='neg_mean_squared_error', cv=5)
            self.grid_search.fit(self.X_train, self.y_train)
            self.best_tau = self.grid_search.best_params_['tau']
            print(f"Best tau: {self.best_tau}")
            self.best_model = self.model_class(kernel=self.kernel, tau=self.best_tau)
            self.best_model.fit(self.X_train, self.y_train)
        else:
            print("No param_grid provided")
    
    def predict_best_model(self):
        if self.best_model:
            return self.best_model.predict(self.X_test)
        else:
            raise ValueError("Best model not available")
    
    def evaluate_best_model(self, y_pred):
        best_mse = mean_squared_error(self.y_test, y_pred)
        print(f"Optimized Mean Squared Error: {best_mse}")
        return best_mse
    
    def run_pipeline(self, X, y, tune=False):
        self.split_data(X, y)
        self.train_model()
        y_pred = self.predict()
        self.evaluate(y_pred)
        
        if tune and self.param_grid:
            self.tune_parameters()
            best_y_pred = self.predict_best_model()
            self.evaluate_best_model(best_y_pred)
        elif tune:
            print("No tuning parameters provided")

In [32]:
param_grid = {'tau': np.linspace(0.01, 0.5, 10)}
pipeline = ModelPipeline(Lowess, Cauchy, tau=0.15, param_grid=param_grid)
pipeline.run_pipeline(X_scaled, y, tune=True)

Mean Squared Error: 708.2549571475354
Best tau: 0.5
Optimized Mean Squared Error: 571.6161212691303


In [34]:
param_grid = {'tau': np.linspace(0.01, 0.5, 10)}
pipeline = ModelPipeline(Lowess, Cauchy, tau=0.5, param_grid=param_grid)
pipeline.run_pipeline(X_scaled, y, tune=True)

Mean Squared Error: 571.6161212691303
Best tau: 0.5
Optimized Mean Squared Error: 571.6161212691303


In [33]:
param_grid = {'tau': np.linspace(0.01, 0.5, 10)}
pipeline = ModelPipeline(Lowess, Gaussian, tau=0.15, param_grid=param_grid)
pipeline.run_pipeline(X_scaled, y, tune=True)

Mean Squared Error: 814.9926518560685
Best tau: 0.5
Optimized Mean Squared Error: 783.7627322556642


In [37]:
param_grid = {'tau': np.linspace(0.01, 0.5, 10)}
pipeline = ModelPipeline(Lowess, Tricubic, tau=0.5, param_grid=param_grid)
pipeline.run_pipeline(X_scaled, y, tune=True)

Mean Squared Error: 916.2222436264611
Best tau: 0.33666666666666667
Optimized Mean Squared Error: 787.9824387794472
