In [1]:
import numpy as np
import plotly.graph_objects as go
import plotly.express as px

In [2]:
class LinearRegression:
    def __init__(
        self, 
        reg=1e-3, 
        lr=1e-2, 
        epochs=1000,
        threshold=1e-8,
        method='normal_eqn',
        seed=None
        ):
        self.lr = lr
        self.reg = reg
        self.epochs = epochs
        self.seed = seed
        self.threshold = threshold
        self.method = method
    
    def fit(self, X_train, y_train):
        self.X_train = np.concatenate((np.ones(shape=(X_train.shape[0], 1)), X_train), axis=1)
        self.y_train = y_train
        if self.method == 'normal_eqn':
            weights = self.normal_eqn()
        if self.method == 'gradient_descent':
            weights = self.gradient_descent()
        
        return weights
    
    def predict(self, X):
        X = np.concatenate((np.ones(shape=(X.shape[0], 1)), X), axis=1)
        y_pred = np.matmul(X, self.learnt_weights)
        return y_pred
    
    def init_weights(self,):
        m = self.X_train.shape[1]
        np.random.seed(self.seed)
        random_weights = np.random.rand(m, 1)
        
        return random_weights
    
    def gradient_descent(self,):
        random_weights = self.init_weights()        
        weights = random_weights.copy()
        y_pred = np.matmul(self.X_train, weights)
        
        cost_history = [self.mean_squared_error(self.y_train, y_pred) + self.l2_penalty(weights)]
        weights_history = [weights]
        
        for i in range(self.epochs):
            gradient = -2 * np.mean(np.matmul(self.X_train.T, (self.y_train - y_pred))) + self.reg*weights
            update_value = -self.lr*gradient
            weights += update_value
            
            y_pred = np.matmul(self.X_train, weights)
            cost_history.append(self.mean_squared_error(self.y_train, y_pred))
            weights_history.append(weights) 
            
            # if abs(cost_history[-1] - cost_history[-2]) <= self.threshold:
            #     print(f'Stopping algorithm because cost has stopped decreasing beyond threshold at epoch {i}')
            #     break
            
        self.weights_history, self.cost_history = weights_history, cost_history
        self.learnt_weights = weights
        
        return weights 
    
    def normal_eqn(self,):
        m = self.X_train.shape[1]        
        XTy = np.matmul(self.X_train.T, self.y_train)
        penalty_term = self.reg * np.eye(m)
        
        inverse_term = np.linalg.inv(penalty_term + np.matmul(self.X_train.T, self.X_train))
        weights = np.matmul(inverse_term, XTy)
        self.learnt_weights = weights
        
        return weights
    
    def mean_squared_error(self, y, y_hat):
        return np.mean((y - y_hat)**2)
            
    def l2_penalty(self, weights):
        return 0.5 * self.reg * np.matmul(weights.T, weights)

In [3]:
def cross_validation(
    model,
    folds=10,
    root_name_X = 'trainInput',
    root_name_y = 'trainTarget',
    root_path = '/home/jivjotsingh/Desktop/PracticeML/ML@Waterloo/regression-dataset/'
    ):
    X_subsets = [np.loadtxt(root_path + root_name_X + f'{i+1}.csv', delimiter=',') for i in range(folds)]
    y_subsets = [np.loadtxt(root_path + root_name_y + f'{i+1}.csv', delimiter=',') for i in range(folds)]
    train_error = 0
    validation_error = 0
    
    for i in range(folds):
        val_X_subset, val_y_subset = X_subsets[0], y_subsets[0]
        X_subsets.remove(val_X_subset)
        y_subsets.remove(val_y_subset)
        
        X_stacked, y_stacked = np.vstack(tuple(X_subsets)), np.hstack(tuple(y_subsets))
        
        model.fit(X_stacked, y_stacked)
        train_pred, val_pred = model.predict(X_stacked), model.predict(val_X_subset)
        train_error += model.mean_squared_error(y_stacked, train_pred)
        validation_error += model.mean_squared_error(val_y_subset, val_pred)
        
        X_subsets.append(val_X_subset)
        y_subsets.append(val_y_subset)
        
    return train_error/folds, validation_error/folds

In [4]:
train_record, validation_record = [], []
reg_range = np.arange(0, 4.1, 0.1)
for reg in reg_range:
    LinReg = LinearRegression(reg=reg, method='normal_eqn')
    t, v = cross_validation(LinReg, 10)
    train_record.append(t), validation_record.append(v)

trace_train = go.Scatter(x=reg_range, y=train_record,
                         mode='lines', line=dict(color='blue'), name='Training')
trace_validation = go.Scatter(x=reg_range, y=validation_record,
                              mode='lines', line=dict(color='red'), name='Validation')
layout = go.Layout(xaxis=dict(title='Regularization Constant'), yaxis=dict(title='Error'),
                   legend=dict(x=1.05, y=1),
                   title='Training and Validation Curves'
                   )
fig = go.Figure(data=[trace_train, trace_validation], layout=layout)
fig.update_layout(width=600, height=600)
fig.show()
reg_star = reg_range[np.argmin(validation_record)]
print(f'Optimal Regularization constant value: {reg_star}')

Optimal Regularization constant value: 1.3


In [5]:
root_path = '/home/jivjotsingh/Desktop/PracticeML/ML@Waterloo/regression-dataset/'
X_ = [np.loadtxt(root_path+f'trainInput{i+1}.csv', delimiter=',') for i in range(10)]
y_ = [np.loadtxt(root_path+f'trainTarget{i+1}.csv', delimiter=',') for i in range(10)]
X_test = np.loadtxt(root_path+'testInput.csv', delimiter=',')
y_test = np.loadtxt(root_path+'testTarget.csv', delimiter=',')
X_stacked, y_stacked = np.vstack(tuple(X_)), np.hstack(tuple(y_))

In [6]:
LinReg = LinearRegression(reg_star, method='normal_eqn')
LinReg.fit(X_stacked, y_stacked)
train_pred, test_pred = LinReg.predict(X_stacked), LinReg.predict(X_test)
print(f'Training Loss: {round(LinReg.mean_squared_error(y_stacked, train_pred),3)}')
print(f'Test Loss: {round(LinReg.mean_squared_error(y_test, test_pred),3)}')

Training Loss: 1.306
Test Loss: 1.436


In [7]:
def visualize_3Dsolution(model, X, y, mesh_size=0.02):
    learnt_weights = model.learnt_weights
    x1_min, x1_max = X[:, 0].min(), X[:, 0].max()
    x2_min, x2_max = X[:, 1].min(), X[:, 1].max()
    x1range = np.arange(x1_min, x1_max, mesh_size)
    x2range = np.arange(x2_min, x2_max, mesh_size)
    x1x1, x2x2 = np.meshgrid(x1range, x2range)
    Y_predicted = np.matmul(np.c_[np.ones((x1x1.ravel().shape[0], 1)), x1x1.ravel(), x2x2.ravel()],
                            learnt_weights
                            )
    fig = px.scatter_3d(x=X[:,0], y=X[:,1], z=y)
    fig.update_traces(marker=dict(size=3))
    fig.add_traces(go.Surface(x=x1range, y=x2range, z=Y_predicted.reshape(x1x1.shape),
                              colorscale='speed', name='Predicted Surface', showscale=False, showlegend=False
                              ))
    fig.update_layout(showlegend=True, template='plotly_dark',
                      width=600, height=600,
                      scene_camera = dict(
                          eye=dict(x=2, y=0.8, z=0.5)
                          ))
    return fig

fig = visualize_3Dsolution(LinReg, X_stacked, y_stacked)
fig.show()