In [1]:
import numpy as np
import matplotlib.pyplot as plt

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression as LR

In [2]:
def plot_regression_line(X, y, lr): 
    # plotting the actual points as scatter plot 
    plt.scatter(
        X[:, 0], y, color = "m", 
        marker = "o", s = 30
    ) 
  
    # predicted response vector 
    y_pred = lr.predict(X)
    y_pred = y_pred.flatten()
    # plotting the regression line 
    plt.plot(X[:, 0], y_pred, color = "g") 
  
    # putting labels 
    plt.xlabel('X') 
    plt.ylabel('y') 
  
    # function to show plot 
    plt.show() 

In [3]:
def l2_penalty(penalty, W):
    return penalty * (np.sum(W*W) / 2)

In [4]:
def mse(y_true, y_pred):
    return np.mean(np.square(y_true - y_pred))

In [5]:
def r2_score(y_true, y_pred):
    SS_tot = np.sum(np.square(y_true - np.mean(y_true)))
    SS_res = np.sum(np.square(y_true - y_pred))
    return 1 - SS_res / SS_tot

In [6]:
class LinearRegression:
    def __init__(self, regularization_penalty=0):
        self.W = None
        self.b = None
        self.penalty = 0

    def fit(self, X, y, method='analytic', **kwargs):
        try:
            y.shape[1]
        except IndexError:
            y = y.reshape(-1, 1)
        if method[0] == 'a':
            self._analytic_method(X, y)
        elif method[0] == 'g':
            np.random.seed(42)
            self.W = np.random.normal(loc=0, scale=0.1, size=(X.shape[1]))
            self.b = 0
            self._gradient_descent_runner(X, y, **kwargs)

    def predict(self, X):
        return (np.dot(X, self.W) + self.b).reshape(-1, 1)
    
    def evaluate(self, X, y):
        try:
            y.shape[1]
        except IndexError:
            y = y.reshape(-1, 1)
        return self._compute_error(X, y)

    def _analytic_method(self, X, y):
        y_ = np.mean(y, axis=0)
        X_ = np.mean(X, axis=0)

        SS_xy = np.sum((X - X_) * (y - y_), axis=0)
        SS_xx = np.sum(np.square(X - X_), axis=0)
        
        self.W = SS_xy / SS_xx
        self.b = (y_ - np.dot(X_, self.W))

    def _compute_error(self, X, y):
        N = len(y)
        y_hat = (np.dot(X, self.W) + self.b).reshape(-1, 1)
        totalError = (1/N) * np.sum(np.square(y - y_hat))
        return totalError
    
    def _step_gradient(self, X, y, learning_rate, batch_size):
        N = len(y)
        if batch_size is None:
            batch_size = N
        batch_size = min(batch_size, N)

        for b in range(0, N, batch_size):
            X_batch = X[b:b + batch_size]
            y_batch = y[b:b + batch_size]
            y_hat = (np.dot(X_batch, self.W) + self.b).reshape(-1, 1)

            dW = -2/N * np.sum(X_batch * (y_batch - y_hat), axis=0)
            dW -= l2_penalty(self.penalty, self.W)
            db = -2/N * np.sum(y_batch - y_hat)
            db -= l2_penalty(self.penalty, self.b)

            self.W -= (dW * learning_rate) / max(1, batch_size % N)
            self.b -= (db * learning_rate) / max(1, batch_size % N)

    def _gradient_descent_runner(
        self, X, y, learning_rate=0.1, batch_size=None, epochs=10, tool=None, verbose=False
    ):
        old_error = self._compute_error(X, y)
        for e in range(epochs):
            self._step_gradient(X, y, learning_rate, batch_size)
            error = self._compute_error(X, y)
            if tool and abs(old_error - error) < tool:
                break
            old_error = error
            if verbose:
                print('epoch {}: loss = {}'.format(e + 1, error))

In [7]:
class PolynomialLinearRegression(LinearRegression):
    def __init__(self, p, regularization_penalty=0):
        super(PolynomialLinearRegression, self).__init__(regularization_penalty)
        self.p = p
    
    def fit(self, X, y, method='analytic', **kwargs):
        X = np.concatenate([np.power(X, i) for i in range(1, self.p + 1)], axis=1)
        super().fit(X, y, method, **kwargs)

    def predict(self, X):
        X = np.concatenate([np.power(X, i) for i in range(1, self.p + 1)], axis=1)
        return super().predict(X)

In [8]:
df = pd.read_csv('../data/trab1_advertising.csv', index_col=0)

features = df.drop(columns='sales')
features = (features - features.mean()) / features.std()
labels = df['sales']

X_train, X_test, y_train, y_test = train_test_split(
    features.values, 
    labels.values, 
    random_state=42
)

In [9]:
my_lr = LinearRegression(regularization_penalty=0)
my_lr.fit(X_train, y_train, method='a', learning_rate=0.1, epochs=10)

sk_lr = LR()
sk_lr.fit(X_train, y_train)

y_pred = my_lr.predict(X_train)
print('Train my_lr -> r2: {}, mse: {}'.format(
    r2_score(y_train, y_pred),
    mse(y_train, y_pred),
))
y_pred = sk_lr.predict(X_train)
print('Train sk_lr -> r2: {}, mse: {}'.format(
    r2_score(y_train, y_pred),
    mse(y_train, y_pred),
))
print()
y_pred = my_lr.predict(X_test)
print('Test my_lr -> r2: {}, mse: {}'.format(
    r2_score(y_test, y_pred),
    mse(y_test, y_pred),
))
y_pred = sk_lr.predict(X_test)
print('Test sk_lr -> r2: {}, mse: {}'.format(
    r2_score(y_test, y_pred),
    mse(y_test, y_pred),
))

Train my_lr -> r2: -319.4278096721214, mse: 57.20771049389113
Train sk_lr -> r2: 0.8966445527601498, mse: 2.767891078046973

Test my_lr -> r2: -110.67062895770702, mse: 60.40627026525228
Test sk_lr -> r2: 0.8935163320163658, mse: 2.880023730094191
