In [21]:
import numpy as np
import random
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from typing import Tuple
# from typing import Annotated

In [22]:
# from platform import python_version
# print(python_version())

3.7.3


<h2> Linear Regression </h2>

<h3> Normal Equation </h3>

In [11]:
def linear_regression_NE(X:np.ndarray, y:np.ndarray, X_test:np.ndarray, y_pred:np.ndarray) -> Tuple[np.ndarray, float]:
    """
    Implementation of linear regression using Normal Equation.
    by Aurélien Géron
    m - number of training instances, n - number of features

    - fast for large m
    - no out-of-core support
    - slow for large n
    - 0 hyperparameters
    - no scaling required

    """
    
    X_b = np.c_[np.ones((X.shape[0], 1)), X]
    theta_best = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y)
    
    X_test_b = np.c_[np.ones((X_test.shape[0], 1)), X_test]
    y_pred = X_test_b.dot(theta_best)
    
    mse = mean_squared_error(y_test, y_pred)
    
    return y_pred, mse

<h3> Singular Value Decomposition </h3>

In [15]:
def linear_regression_SVD(X:np.ndarray, y:np.ndarray, X_test:np.ndarray, y_pred:np.ndarray, threshold=0.0001) -> Tuple[np.ndarray, float]:
    """
    Simplified implementation of linear regression using Singular Value Decomposition.
    
    - fast for large m
    - no out-of-core support
    - slow for large n
    - 0 hyperparameters
    - no scaling required
    - from sklearn.linear_model import LinearRegression
    """
    
    X_b = np.c_[np.ones((X.shape[0], 1)), X]
    X_test_b = np.c_[np.ones((X_test.shape[0], 1)), X_test]
    
    # Calculate SVD
    U, E_vec, V_t = np.linalg.svd(X_b)

    # Calculate pseudoinverse

    for i in range(len(E_vec)):
        if E_vec[i] < threshold:
            E_vec[i] = 0
        else:
            E_vec[i] = 1 / E_vec[i]

    E_vec[E_vec < threshold] = 0
    E_ = np.vstack([np.diag(E_vec), np.zeros([X_b.shape[0] - len(np.diag(E_vec)), X_b.shape[1]])])
    X_b_ = V_t.T.dot(E_.T).dot(U.T)

    # Calculate theta
    theta = X_b_.dot(y)

    # Prediction

    y_pred = X_test_b.dot(theta)
    mse = mean_squared_error(y_test, y_pred)
    
    return y_pred, mse

<h3> Batch Gradient Descent </h3>

In [31]:
def linear_regression_BGD(X:np.ndarray, y:np.ndarray, X_test:np.ndarray, y_pred:np.ndarray, theta:list, tolerance:float=0.0001, message_frequency:int=20, 
n_iterations:int=500, eta:float=0.01, debugger:bool=True) -> Tuple[float, np.ndarray, np.ndarray, float]:
    """
    Implementation of linear regression using Batch Gradient Descent.
    mostly by Aurélien Géron
    - slow for large m
    - no out-of-core support
    - fast for large n
    - 2 hyperparameters
    - scaling required
    """
    
    X_b = np.c_[np.ones((X.shape[0], 1)), X]
    X_test_b = np.c_[np.ones((X_test.shape[0], 1)), X_test]
    
    m = X_b.shape[0]
    theta_path_BGD = [theta]

    for i in range(n_iterations):
        gradient = 2/m * X_b.T.dot(X_b.dot(theta) - y)
        theta = theta - eta * gradient
        theta_path_BGD.append(theta)
        
        if np.linalg.norm(theta_path_BGD[-1]-theta_path_BGD[-2]) < tolerance:
            break
        if i % message_frequency == 0:
            print('theta: ',theta)
        if debugger and not np.isfinite(theta).all():
            print('eta too large!')
            break
            
    theta_best = theta
    theta_path_BGD = np.array(theta_path_BGD)
    
    y_pred = X_test_b.dot(theta)
    mse = mean_squared_error(y_test, y_pred)
    
    return theta, theta_path_BGD, y_pred, mse

<h3> Stochastic Gradient Descent </h3>

In [17]:
def linear_regression_SGD(X:np.ndarray, y:np.ndarray, X_test:np.ndarray, y_pred:np.ndarray, theta:list,
                          n_epochs:int=10, t0:float=10, t1:float=100, message_frequency:int=20,
                          tolerance:float=0.0001, debugger:bool=True) -> Tuple[float, np.ndarray, np.ndarray, float]:
    """
    Implementation of linear regression using Stochastic Gradient Descent.
    mostly by Aurélien Géron
    - fast for large m
    - out-of-core support
    - fast for large n
    - 2 or more hyperparameters
    - scaling required
    - from sklearn.linear_model import SGDRegressor
    """
    
    def learning_rate(t):
        return t0 / (t + t1)

    X_b = np.c_[np.ones((X.shape[0], 1)), X]
    X_test_b = np.c_[np.ones((X_test.shape[0], 1)), X_test]
    
    theta_path_SGD = [theta]

    for i in range(n_epochs):
        for j in range(m):
            random_sample = np.random.randint(0, m)
            x_i = X_b[random_sample]
            y_i = y[random_sample]
            gradient = 2 * x_i.T.dot(x_i.dot(theta) - y_i)
            eta = learning_rate(i * m +j)
            theta = theta - eta * gradient
            theta_path_SGD.append(theta)
            
            if np.linalg.norm(theta_path_SGD[-1]-theta_path_SGD[-2]) < tolerance:
                break
            if j % message_frequency == 0:
                print('theta: ',theta)
            if debugger and not np.isfinite(theta).all():
                print('eta too large!')
                break
        else:
            continue
        break
    theta_best = theta
    theta_path_SGD = np.array(theta_path_SGD)
    
    y_pred = X_test_b.dot(theta_best)
    mse = mean_squared_error(y_test, y_pred)
    
    return theta, theta_path_SGD, y_pred, mse

In [19]:
def linear_regression_MbGD(X:np.ndarray, y:np.ndarray, X_test:np.ndarray, y_pred:np.ndarray, theta:list,
                          batch_size:int, n_epochs:int=10, t0:float=10, t1:float=100, message_frequency:int=20,
                          tolerance:float=0.0001, debugger:bool=True) -> Tuple[float, np.ndarray, np.ndarray, float]:
    """
    Implementation of linear regression using Mini-batch Gradient Descent.
    
    - fast for large m
    - out-of-core support
    - fast for large n
    - 2 or more hyperparameters
    - scaling required
    """
    
    def learning_rate(t):
        return t0 / (t + t1)
    
    X_b = np.c_[np.ones((X.shape[0], 1)), X]
    X_test_b = np.c_[np.ones((X_test.shape[0], 1)), X_test]
    
    theta_path_MbGD = [theta]

    for i in range(n_epochs):
        for j in range(m):
            random_samples = random.sample(range(0, m), batch_size)
            x_i = X_b[random_samples]
            y_i = y[random_samples]
            gradients = 2 / batch_size * x_i.T.dot(x_i.dot(theta) - y_i)
            eta = learning_rate(i * m +j)
            theta = theta - eta * gradients
            theta_path_MbGD.append(theta)
            
            if np.linalg.norm(theta_path_MbGD[-1]-theta_path_MbGD[-2]) < tolerance:
                break
            if j % message_frequency == 0:
                print('theta: ',theta)
            if debugger and not np.isfinite(theta).all():
                print('eta too large!')
                break
        else:
            continue
        break
        
    theta_best = theta
    theta_path_MbGD = np.array(theta_path_MbGD)
    
    y_pred = X_test_b.dot(theta_best)
    mse = mean_squared_error(y_test, y_pred)
    
    return theta, theta_path_MbGD, y_pred, mse

In [28]:
def linear_regression_compare_paths(paths:list, path_labels:list, figsize:tuple=(7,4), legend_loc:str='upper left', legend_fontsize:int=16,
                                   label_fontize:int=20, line_styles:list=['r-s', 'g-+', 'b-o']):
    """
    Plots theta paths for different linear regression step by step implementations.
    mostly by Aurélien Géron
    """
    
    plt.figure(figsize=figsize)
    for i in range(len(paths)):
        line_style = line_styles[i%len(line_styles)]
        path_label = 'path ' + str(i)
        plt.plot(paths[i][:, 0], paths[i][:, 1], line_style, linewidth=1+i, label=path_labels[i])
        

    #plt.plot(theta_path_BGD[:, 0], theta_path_BGD[:, 1], "r-s", linewidth=1, label="Batch")

    plt.legend(loc=legend_loc, fontsize=legend_fontsize)
    plt.xlabel(r"$\theta_0$", fontsize=label_fontize)
    plt.ylabel(r"$\theta_1$", fontsize=label_fontize, rotation=0)
    plt.axis()
    plt.show()