In [1]:
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import numpy as np
from time import time

# Load the diabetes regression dataset as a feature matrix X and target vector y.
diabetes = load_diabetes()
X = diabetes.data
y = diabetes.target
m = X.shape[0]  # number of samples
n = X.shape[1]  # number of features

# Fix the seed so the train/test split -- and therefore every metric computed
# further down -- is identical on each Restart & Run All.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42
)

# Fit the scaler on the training split only and reuse its statistics for the
# test split, so no information from the test data leaks into training.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Prepend a column of ones so that theta[0] plays the role of the intercept.
# Equivalent one-liner: X_train = np.insert(X_train, 0, 1, axis=1)
intercept = np.ones((X_train.shape[0], 1))
X_train = np.concatenate((intercept, X_train), axis=1)
intercept = np.ones((X_test.shape[0], 1))
X_test = np.concatenate((intercept, X_test), axis=1)

In [2]:
class LinearRegression:
    """Linear regression trained with gradient descent.

    Three update schemes are supported through ``method``:

    * ``"batch"`` -- every step uses the whole training set;
    * ``"mini"``  -- every step uses ``mini_batch_size`` rows;
    * ``"sto"``   -- every step uses a single row.

    For ``"mini"`` and ``"sto"`` the rows are drawn *without replacement*:
    the row indices are shuffled, consumed front to back, and reshuffled once
    every row has been used.

    The gradient is the un-normalised sum ``X.T @ error``, so its magnitude
    grows with the number of rows in the step; use a smaller ``alpha`` for
    ``"batch"`` than for ``"sto"``.

    Parameters
    ----------
    alpha : float
        Learning rate.
    max_iter : int
        Maximum number of gradient steps.
    loss_old : float
        Loss value the first iteration is compared against for early stopping.
    tol : float
        Training stops when the absolute change in loss between two
        consecutive steps is below this value.
    method : {"batch", "mini", "sto"}
        Update scheme, see above.
    mini_batch_size : int
        Rows per step when ``method == "mini"`` (capped at the sample count).
    random_state : int, numpy.random.Generator or None
        Seed (or generator) for the row shuffling; ``None`` is unseeded.

    Attributes
    ----------
    theta : numpy.ndarray
        Learned coefficients, one per column of ``X``; set by ``fit``.
    loss_old : float
        Loss of the most recent step that did not trigger early stopping.
    """

    _METHODS = ("batch", "mini", "sto")

    def __init__(self, alpha=0.0001, max_iter=1000,
                 loss_old=np.inf, tol=1e-5, method="batch",
                 mini_batch_size=100, random_state=None):
        self.alpha = alpha
        self.max_iter = max_iter
        self.loss_old = loss_old
        # Remembered separately so every call to fit() starts from the same
        # reference loss instead of the one left over from a previous fit.
        self._initial_loss = loss_old
        self.tol = tol
        self.method = method
        self.mini_batch_size = mini_batch_size
        self.random_state = random_state

    def fit(self, X, y):
        """Estimate ``theta`` from ``X`` (n_samples, n_features) and ``y``.

        ``X`` is used as given: include a column of ones if an intercept is
        wanted. Prints the elapsed time and the index of the last iteration.

        Raises
        ------
        ValueError
            If ``method`` is unknown, ``mini_batch_size`` is below 1, ``X`` is
            not 2-D, the data are empty, or ``X`` and ``y`` disagree in length.
        """
        if self.method not in self._METHODS:
            raise ValueError(
                "method must be one of %r, got %r" % (self._METHODS, self.method)
            )

        X = np.asarray(X, dtype=float)
        # Flatten so a column-vector target cannot broadcast into a matrix.
        y = np.asarray(y, dtype=float).reshape(-1)
        if X.ndim != 2:
            raise ValueError("X must be 2-dimensional (n_samples, n_features)")
        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError("cannot fit on an empty dataset")
        if y.shape[0] != n_samples:
            raise ValueError("X and y must contain the same number of samples")

        if self.method == "mini":
            if self.mini_batch_size < 1:
                raise ValueError("mini_batch_size must be at least 1")
            batch_size = min(int(self.mini_batch_size), n_samples)
        elif self.method == "sto":
            batch_size = 1
        else:
            batch_size = n_samples

        self.theta = np.zeros(X.shape[1])
        self.loss_old = self._initial_loss

        # Shuffled row order for without-replacement sampling; starting the
        # cursor past the end forces a shuffle on the first non-batch step.
        rng = np.random.default_rng(self.random_state)
        order = None
        cursor = n_samples

        i = 0  # keeps the final report valid even when max_iter == 0
        start = time()

        for i in range(self.max_iter):
            if self.method == "batch":
                X_batch, y_batch = X, y
            else:
                if cursor >= n_samples:
                    # Every row has been used once: begin a new epoch.
                    order = rng.permutation(n_samples)
                    cursor = 0
                rows = order[cursor:cursor + batch_size]
                cursor += batch_size
                X_batch, y_batch = X[rows], y[rows]

            yhat = self.h_theta(X_batch)
            error = yhat - y_batch

            # Early stopping: compare this step's loss with the previous one.
            loss_new = self.mse(yhat, y_batch)
            if self.delta_loss(loss_new, self.loss_old, self.tol):
                break
            self.loss_old = loss_new

            grad = self.gradient(X_batch, error)
            self.theta = self.theta - self.alpha * grad

        time_taken = time() - start
        print("Time taken: ", time_taken)
        print("Stop at iteration: ", i)

    def h_theta(self, X):
        """Return the linear hypothesis ``X @ theta`` for the rows of ``X``."""
        return X @ self.theta

    def predict(self, X):
        """Alias of ``h_theta`` under the conventional estimator name."""
        return self.h_theta(X)

    def mse(self, yhat, y):
        """Return the mean squared error between ``yhat`` and ``y``."""
        return ((yhat - y) ** 2 / yhat.shape[0]).sum()

    def delta_loss(self, loss_new, loss_old, tol):
        """Return True when the loss changed by less than ``tol``."""
        return np.abs(loss_new - loss_old) < tol

    def gradient(self, X, error):
        """Return the un-normalised gradient ``X.T @ error``."""
        return X.T @ error

# Train on the training split, then score the fitted model on the test split.
model = LinearRegression(method="batch")  # method can be "batch", "mini" or "sto"
model.fit(X_train, y_train)
yhat = model.h_theta(X_test)  # predictions for the test split
mse = model.mse(yhat, y_test)

# print the mse
print("MSE: ", mse)

Time taken:  0.019963979721069336
Stop at iteration:  999
MSE:  2718.280655118355
