In [23]:
import numpy as np
from random import random, seed
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
from matplotlib.ticker import LinearLocator, FormatStrFormatter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


def FrankeFunction(x,y):
    """Evaluate the Franke Function: a two-variables function to create the dataset of vanilla problems"""
    term1 = 0.75*np.exp(-(0.25*(9*x-2)**2) - 0.25*((9*y-2)**2))
    term2 = 0.75*np.exp(-((9*x+1)**2)/49.0 - 0.1*(9*y+1))
    term3 = 0.5*np.exp(-(9*x-7)**2/4.0 - 0.25*((9*y-3)**2))
    term4 = -0.2*np.exp(-(9*x-4)**2 - (9*y-7)**2)
    return term1 + term2 + term3 + term4
 
def Plot_FrankeFunction(x,y,z, title="Dataset"):
    """3D plot, suitable for plotting the Franke Function"""
    
    fig = plt.figure(figsize=(8, 7))
    ax = fig.add_subplot(projection='3d')
    #ax = fig.gca(projection="3d")

    # Plot the surface.
    surf = ax.plot_surface(x, y, z, cmap=cm.coolwarm, linewidth=0, antialiased=False)

    # Customize the z axis.
    ax.set_zlim(-0.10, 1.40)
    ax.set_xlabel(r"$x$")
    ax.set_ylabel(r"$y$")
    ax.set_zlabel(r"$z$")
    ax.zaxis.set_major_locator(LinearLocator(10))
    ax.zaxis.set_major_formatter(FormatStrFormatter('%.02f'))

    # Add a color bar which maps values to colors.
    fig.colorbar(surf, shrink=0.5, aspect=5)
    plt.title(title)
    plt.show()
    
    
def create_xyz_dataset(n, mu, sigma):
    """ Create xyz dataset from the FrankeFunction with a added normal distributed noise.
    x,y variables are taken evenly distributed in the interval [0,1]
    
    Args:
    n (int): squared root of total number of datapoints
    mu (float): mean value of the normal distribution of the noise
    sigma (float): standard deviation of the normal distribution of the noise

    Returns x,y,z values, mashed on a grid.
    """
    x = np.linspace(0,1,n)
    y = np.linspace(0,1,n)

    x,y = np.meshgrid(x,y)
    z = FrankeFunction(x,y) + mu + sigma * np.random.randn(n,n)
    
    return x,y,z

def create_X(x, y, n):
    """Design matrix for two indipendent variables x,y"""
    if len(x.shape) > 1:
        x = np.ravel(x)
        y = np.ravel(y)

    N = len(x)
    l = int((n+1)*(n+2)/2)		# Number of elements in beta, number of feutures (degree of polynomial)
    X = np.ones((N,l))

    for i in range(1,n+1):
        q = int((i)*(i+1)/2)
        for k in range(i+1):
            X[:,q+k] = (x**(i-k))*(y**k)

    return X

In [111]:
# The MIT License (MIT)
#
# Copyright © 2021 Adele Zaini
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the “Software”), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
# the Software. THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

import numpy as np
from random import random, seed
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


# Error analysis
def R2(z_data, z_model):
    """Compute the R2 score of the two given values"""
    return 1 - np.sum((z_data - z_model) ** 2) / np.sum((z_data - np.mean(z_data)) ** 2)
def MSE(z_data,z_model):
    """Compute the Mean Square Error of the two given values"""
    n = np.size(z_model)
    return np.sum((z_data-z_model)**2)/n

def SVD(A):
    """ Application of SVD theorem.
    Useful for debugging. """
    U, S, VT = np.linalg.svd(A,full_matrices=True)
    D = np.zeros((len(U),len(VT)))
    print("shape D= ", np.shape(D))
    print("Shape S= ",np.shape(S))
    print("lenVT =",len(VT))
    print("lenU =",len(U))
    D = np.eye(len(U),len(VT))*S
    """
    for i in range(0,VT.shape[0]): #was len(VT)
        D[i,i]=S[i]
        print("i=",i)"""
    return U @ D @ VT
    
def SVDinv(A):
    """Evaluate the inverse of a matrix using the SVD theorem"""
    U, s, VT = np.linalg.svd(A)
    # reciprocals of singular values of s
    d = 1.0 / s
    # create m x n D matrix
    D = np.zeros(A.shape)
    # populate D with n x n diagonal matrix
    D[:A.shape[1], :A.shape[1]] = np.diag(d)
    UT = np.transpose(U)
    V = np.transpose(VT)
    return np.matmul(V,np.matmul(D.T,UT))
    
    
class LinearRegression:
    """A class that gathers OLS, Ridge, Lasso methods

    The 'fit' method needs to be implemented."""

    def __init__(self, X, z):
        self.X = X
        self.z = z
        
    def split(self,test_size=0.2):
        self.X_train, self.X_test, self.z_train, self.z_test = train_test_split(self.X, self.z, test_size=test_size)
        return self.X_train, self.X_test, self.z_train, self.z_test
            
    def rescale(self, with_std=False): #Improvement: pass the Scaler
        """ z needs to be raveled """
        scaler_X = StandardScaler(with_std=with_std)
        scaler_X.fit(self.X_train)
        self.X_train = scaler_X.transform(self.X_train)
        self.X_test = scaler_X.transform(self.X_test)

        scaler_z = StandardScaler(with_std=with_std)
        self.z_train = np.squeeze(scaler_z.fit_transform(self.z_train.reshape(-1, 1))) #scaler_z.fit_transform(z_train) #
        self.z_test = np.squeeze(scaler_z.transform(self.z_test.reshape(-1, 1))) #scaler_z.transform(z_test) #
        
        return self.X_train, self.X_test, self.z_train, self.z_test
    
    def fit(self):
        """Fit the model and return beta-values"""
        raise NotImplementedError("Method LinearRegression.fit is abstract and cannot be called")
        
    def fitSVD(self):
        """Fit the model and return beta-values, using SVD theorem to evalute the inverse of the matrix"""
        raise NotImplementedError("Method LinearRegression.fitSVD is abstract and cannot be called")
    
    def predict_train(self):
        return self.X_train @ self.beta
        
    def predict_test(self):
        return self.X_test @ self.beta
    
    def MSE_train(self, prec=4):
        return np.round(MSE(self.z_train,self.predict_train()),prec)
        
    def MSE_test(self, prec=4):
        return np.round(MSE(self.z_test,self.predict_test()),prec)
        
    def R2_train(self, prec=4):
        return np.round(R2(self.z_train,self.predict_train()),prec)
        
    def R2_test(self, prec=4):
        return np.round(R2(self.z_test,self.predict_test()),prec)
        
    def Confidence_Interval(self, sigma=1):
        #Calculates variance of beta, extracting just the diagonal elements of the matrix
        #var(B_j)=sigma^2*(X^T*X)^{-1}_{jj}
        beta_variance = np.diag(sigma**2 * np.linalg.pinv(self.X.T @ self.X))
        ci1 = self.beta - 1.96 * np.sqrt(beta_variance)/(self.X.shape[0])
        ci2 = self.beta + 1.96 * np.sqrt(beta_variance)/(self.X.shape[0])
        print('Confidence interval of β-estimator at 95 %:')
        ci_df = {r'$β_{-}$': ci1,
                 r'$β_{ols}$': self.beta,
                 r'$β_{+}$': ci2}
        ci_df = pd.DataFrame(ci_df)
        display(np.round(ci_df,3))
        return ci1, ci2
        
"""
    def predict(self, X):
        Fit the model and return the prediction
        
        Args:
        X (array): design matrix

        Returns X*beta
        
        raise NotImplementedError("Method LinearRegression.predict is abstract and cannot be called")
"""

class OLSRegression(LinearRegression):
    
    def __init__(self, X, z):
        super().__init__(X, z)
        
    def split(self, test_size=0.2):
        return super().split(test_size=test_size)
        
    def rescale(self, with_std=False):
        return super().rescale(with_std=with_std)
        
    def fit(self):
        self.beta = np.linalg.pinv(self.X_train.T @ self.X_train) @ self.X_train.T @ self.z_train
        return self.beta
        
    def fitSVD(self):
        self.beta = SVDinv(self.X_train.T @ self.X_train) @ self.X_train.T @ self.z_train
        return self.beta
          
    def predict_train(self):
        return super().predict_train()
        
    def predict_test(self):
        return super().predict_test()
        
    def MSE_train(self, prec=4):
        return super().MSE_train(prec=prec)
        
    def MSE_test(self, prec=4):
        return super().MSE_test(prec=prec)
        
    def R2_train(self, prec=4):
        return super().R2_train(prec=prec)
        
    def R2_test(self, prec=4):
        return super().R2_test(prec=prec)
        
    def Confidence_Interval(self, sigma=1):
        return super().Confidence_Interval(sigma=sigma)
        
class RidgeRegression(LinearRegression):
    
    def __init__(self, X, z, lmb = 1e-12):
        super().__init__(X, z)
        self.lmd = lmd
        
    def split(self, test_size=0.2):
        return super().split(test_size=test_size)
        
    def rescale(self, with_std=False):
        return super().rescale(with_std=with_std)
    
    def fit(self):
        self.beta = np.linalg.pinv(self.X_train.T @ self.X_train + self.lmd * np.eye(len(self.X_train.T))) @ self.X_train.T @ self.z_train
        return self.beta
        
    def fitSVD(self):
        self.beta = SVDinv(self.X_train.T @ self.X_train + self.lmd * np.eye(len(self.X_train.T))) @ self.X_train.T @ self.z_train
        return self.beta
          
    def predict_train():
        return super().predict_train()
        
    def predict_test(self):
        return super().predict_test()

    def MSE_train(self, prec=4):
        return super().MSE_train(prec=prec)
        
    def MSE_test(self, prec=4):
        return super().MSE_test(prec=prec)
        
    def R2_train(self, prec=4):
        return super().R2_train(prec=prec)
        
    def R2_test(self, prec=4):
        return super().R2_test(prec=prec)
        
    def Confidence_Interval(self, sigma=1):
        return super().Confidence_Interval(sigma=sigma)

class LassoRegression(LinearRegression):

    def __init__(self, X, z, lmb = 1e-12):
        super().__init__(X, z)
        self.lmd = lmd
        
    def split(self, test_size=0.2):
        return super().split(test_size=test_size)
        
    def rescale(self, with_std=False):
        return super().rescale(with_std=with_std)
    
    def fit(self):
        RegLasso = linear_model.Lasso(self.lmd)
        self.beta = RegLasso.fit(self.X_train,self.z_train)
        return self.beta
        
    def fit(self):
        return fit()
          
    def predict_train(self):
        return super().predict_train()
        
    def predict_test(self):
        return super().predict_test()
        
    def MSE_train(self, prec=4):
        return super().MSE_train(prec=prec)
        
    def MSE_test(self, prec=4):
        return super().MSE_test(prec=prec)
        
    def R2_train(self, prec=4):
        return super().R2_train(prec=prec)
        
    def R2_test(self, prec=4):
        return super().R2_test(prec=prec)
        
    def Confidence_Interval(self, sigma=1):
        return super().Confidence_Interval(sigma=sigma)
    
"""
def ols_reg(X_train, X_test, z_train, z_test):

	# Calculating Beta Ordinary Least Square Equation with matrix pseudoinverse
    # Altervatively to Numpy pseudoinverse it is possible to use the SVD theorem to evalute the inverse of a matrix (even in case it is singular). Just replace 'np.linalg.pinv' with 'SVDinv'.
	ols_beta = np.linalg.pinv(X_train.T @ X_train) @ X_train.T @ z_train

	z_tilde = X_train @ ols_beta # z_prediction of the train data
	z_predict = X_test @ ols_beta # z_prediction of the test data
  
	return ols_beta, z_tilde, z_predict
 
 def ridge_reg(X_train, X_test, z_train, z_test, lmd = 10**(-12)):
 
    ridge_beta = np.linalg.pinv(X_train.T @ X_train + lmd*np.eye(len(X_train.T))) @ X_train.T @ z_train #psudoinverse
    z_model = X_train @ ridge_beta #calculates model
    z_predict = X_test @ ridge_beta

    #finds the lambda that gave the best MSE
    #best_lamda = lambdas[np.where(MSE_values == np.min(MSE_values))[0]]

    return ridge_beta, z_model, z_predict
    
def lasso_reg(X_train, X_test, z_train, z_test, lmd = 10**(-12)):

    RegLasso = linear_model.Lasso(lmd)
    _ = RegLasso.fit(X_train,z_train)
    z_model = RegLasso.predict(X_train)
    z_predict = RegLasso.predict(X_test)

    return z_model, z_predict
"""

# Return the rolling mean of a vector and two values at one sigma from the rolling average
def Rolling_Mean(vector, windows=3):
    vector_df = pd.DataFrame({'vector': vector})
    # computing the rolling average
    rolling_mean = vector_df.vector.rolling(windows).mean().to_numpy()
    # computing the values at two sigmas from the rolling average
    rolling_std = vector_df.vector.rolling(windows).std().to_numpy()
    value_up = rolling_mean + rolling_std
    value_down = rolling_mean - rolling_std
    
    return rolling_mean, value_down, value_up
    




In [99]:
def scale_Xz(X_train, X_test, z_train, z_test, with_std=False):
    scaler_X = StandardScaler(with_std=with_std) #with_std=False
    scaler_X.fit(X_train)
    X_train = scaler_X.transform(X_train)
    X_test = scaler_X.transform(X_test)

    scaler_z = StandardScaler(with_std=with_std) #with_std=False
    z_train = np.squeeze(scaler_z.fit_transform(z_train.reshape(-1, 1))) #scaler_z.fit_transform(z_train) #
    z_test = np.squeeze(scaler_z.transform(z_test.reshape(-1, 1))) #scaler_z.transform(z_test) #  
    return X_train, X_test, z_train, z_test

# Splitting and rescaling data (rescaling is optional)
# Default values: 20% of test data and the scaler is StandardScaler without std.dev.
def Split_and_Scale(X,z,test_size=0.2, scale=True, with_std=False):

    #Splitting training and test data
    X_train, X_test, z_train, z_test = train_test_split(X, z, test_size=test_size)

    # Rescaling X and z (optional)
    if scale:
        X_train, X_test, z_train, z_test = scale_Xz(X_train, X_test, z_train, z_test, with_std=with_std)
      
    return X_train, X_test, z_train, z_test

# OLS equation
def OLS_solver(X_train, X_test, z_train, z_test):

    # Calculating Beta Ordinary Least Square Equation with matrix pseudoinverse
    # Altervatively to Numpy pseudoinverse it is possible to use the SVD theorem to evalute the inverse of a matrix (even in case it is singular). Just replace 'np.linalg.pinv' with 'SVDinv'.
    ols_beta = np.linalg.pinv(X_train.T @ X_train) @ X_train.T @ z_train

    z_tilde = X_train @ ols_beta # z_prediction of the train data
    z_predict = X_test @ ols_beta # z_prediction of the test data
  
    return ols_beta, z_tilde, z_predict
    
def ridge_reg(X_train, X_test, z_train, z_test, lmd = 10**(-12)):
 
    ridge_beta = np.linalg.pinv(X_train.T @ X_train + lmd*np.eye(len(X_train.T))) @ X_train.T @ z_train #psudoinverse
    z_model = X_train @ ridge_beta #calculates model
    z_predict = X_test @ ridge_beta

    #finds the lambda that gave the best MSE
    #best_lamda = lambdas[np.where(MSE_values == np.min(MSE_values))[0]]

    return ridge_beta, z_model, z_predict
    
def lasso_reg(X_train, X_test, z_train, z_test, lmd = 10**(-12)):

    RegLasso = linear_model.Lasso(lmd)
    _ = RegLasso.fit(X_train,z_train)
    z_model = RegLasso.predict(X_train)
    z_predict = RegLasso.predict(X_test)

    return z_model, z_predict

In [103]:
np.random.seed(1234)

# Degree of the polynomial
degree=5
# Datapoints (squared root of datapoints -> meshgrid)
n = 25
# Paramaters of noise distribution
mu = 0; sigma = 0.1
# Parameter of splitting data
test_size = 0.2

x,y,z=create_xyz_dataset(n,mu,sigma)
#Plot_FrankeFunction(x,y,z)
z=z.ravel()
X=create_X(x,y,degree)

X_train, X_test, z_train, z_test = Split_and_Scale(X,np.ravel(z)) #StardardScaler, test_size=0.2, scale=true
print(X_train[:1])
print(X_test[:1])
print(z_train[:2])
print(z_test[:2])

ols_beta, z_tilde,z_predict = OLS_solver(X_train, X_test, z_train, z_test)

print(ols_beta[:10])
print(z_tilde[:10])
print(z_predict[:10])

prec=4
print("––––––––––––––––––––––––––––––––––––––––––––")
print("Train MSE:", np.round(MSE(z_train,z_tilde),prec))
print("Test MSE:", np.round(MSE(z_test,z_predict),prec))
print("––––––––––––––––––––––––––––––––––––––––––––")
print("Train R2:", np.round(R2(z_train,z_tilde),prec))
print("Test R2:", np.round(R2(z_test,z_predict),prec))
print("––––––––––––––––––––––––––––––––––––––––––––")

[[ 0.          0.08033333  0.21241667 -0.004625    0.16577083  0.16637153
  -0.06782841  0.07257494  0.12575231  0.09994083 -0.10346354  0.01118949
   0.05754982  0.08036039  0.04393807 -0.12024953 -0.02415488  0.01295761
   0.03514108  0.04366974  0.00213328]]
[[ 0.         -0.08633333  0.17075    -0.17129167  0.03035417  0.10907986
  -0.19398582 -0.05271441  0.0182581   0.04084071 -0.18911169 -0.08118609
  -0.03601934 -0.0034974  -0.01027019 -0.17523442 -0.08607818 -0.05448448
  -0.03435209 -0.02087356 -0.04449469]]
[-0.30980687  0.03407068]
[-0.32305563  0.00526445]
[  0.           6.0240481    2.77833014 -27.41271571 -11.59605851
  -4.54493467  38.33903483  34.4407874   17.5722142  -16.00905112]
[-0.29419307 -0.0832303  -0.25361917  0.30578607 -0.08940484  0.35642934
 -0.19681158  0.5963452  -0.15675562  0.23819927]
[-0.2410186   0.03289853 -0.32184282 -0.29944618  0.41458793  0.23171393
  0.10950287 -0.07146335  0.67845615 -0.33666404]
––––––––––––––––––––––––––––––––––––––––––––


In [110]:
np.random.seed(1234)

model=OLSRegression(X,z)
X_train, X_test, z_train, z_test = model.split()
X_train, X_test, z_train, z_test = model.rescale()
#X_train, X_test, z_train, z_test = model.X_train, model.X_test, model.z_train, model.z_test #StardardScaler, test_size=0.2, scale=true
"""print(X_train[:1])
print(X_test[:1])
print(z_train[:2])
print(z_test[:2])"""

beta = model.fit()
print(beta[:10])
z_tilde = model.predict_train()
z_predict = model.predict_test()
print(z_tilde[:10])
print(z_predict[:10])

prec=4
print("––––––––––––––––––––––––––––––––––––––––––––")
print("Train MSE:", np.round(MSE(z_train,z_tilde),prec))
print("Test MSE:", np.round(MSE(z_test,z_predict),prec))
print("––––––––––––––––––––––––––––––––––––––––––––")
print("Train R2:", np.round(R2(z_train,z_tilde),prec))
print("Test R2:", np.round(R2(z_test,z_predict),prec))
print("––––––––––––––––––––––––––––––––––––––––––––")

print("––––––––––––––––––––––––––––––––––––––––––––")
print("Train MSE:", model.MSE_train())
print("Test MSE:", model.MSE_test())
print("––––––––––––––––––––––––––––––––––––––––––––")
print("Train R2:", model.R2_train())
print("Test R2:", model.R2_test())
print("––––––––––––––––––––––––––––––––––––––––––––")
# Confidence interval
beta1, beta2 = model.Confidence_Interval(0.1)
print("––––––––––––––––––––––––––––––––––––––––––––")

[  0.           5.9775372    2.62744824 -26.82754565 -11.60503448
  -3.33370708  36.72760272  34.54406785  18.6218372  -20.17673741]
[-0.08469855 -0.11421559  0.13643301 -0.16757094  0.65560671 -0.00895539
  0.14397071  0.18081704  0.36333525 -0.12630596]
[ 0.54809917  0.12061255 -0.32207662  0.36061367 -0.12275633 -0.07422399
  0.09656214 -0.26153205 -0.10654021  0.2230573 ]
––––––––––––––––––––––––––––––––––––––––––––
Train MSE: 0.0108
Test MSE: 0.0132
––––––––––––––––––––––––––––––––––––––––––––
Train R2: 0.8857
Test R2: 0.8453
––––––––––––––––––––––––––––––––––––––––––––
––––––––––––––––––––––––––––––––––––––––––––
Train MSE: 0.0108
Test MSE: 0.0132
––––––––––––––––––––––––––––––––––––––––––––
Train R2: 0.8857
Test R2: 0.8453
––––––––––––––––––––––––––––––––––––––––––––
Confidence interval of β-estimator at 95 %:


Unnamed: 0,$β_{-}$,$β_{ols}$,$β_{+}$
0,-0.0,0.0,0.0
1,5.976,5.978,5.979
2,2.626,2.627,2.629
3,-26.837,-26.828,-26.818
4,-11.612,-11.605,-11.598
5,-3.343,-3.334,-3.324
6,36.706,36.728,36.749
7,34.528,34.544,34.56
8,18.606,18.622,18.638
9,-20.198,-20.177,-20.155


––––––––––––––––––––––––––––––––––––––––––––
