In [1]:
import matplotlib.pyplot as plt
from sklearn import datasets
import numpy as np
from IPython import display
import pylab as pl


In [None]:
    
def generate_linear_regression_data(n=100, input_dim=1, output_dim=1, noise=20):
    return datasets.make_regression(n_samples=n, n_features=input_dim, n_targets=output_dim, noise=noise)
    
    
def linear_regression(X, y):
    X = np.c_[np.ones(X.shape[0]), X]
    beta = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(y)
    return beta


class LinearRegression():
    def __init__(self, X, y, batch_size=None):
        self.X = X
        self.y = y
        self.num_samples = self.X.shape[0]
        self.input_dim = 1 if len(self.X.shape) == 1 else self.X.shape[1]
        self.output_dim = 1 if len(self.y.shape) == 1 else self.y.shape[1]
        self.d = self.input_dim + 1
        
        # n x d => n data points
        self.data = np.c_[np.ones(self.num_samples), self.X]
        
        self.betha = self.init_params()
        self.actual_betha = self.analytical_solution(self.X, self.y)
        self.loss_function = lambda y, y_hat: np.mean(np.square(y.reshape(self.output_dim, 1) - y_hat.reshape(self.output_dim, 1)))
        self.loss_function_derivative = lambda y_i, y_i_hat, x_i: -2 * (y_i_hat - y_i).reshape(self.output_dim, 1) @ x_i.reshape(1, self.d)
        self.is_batch = batch_size is not None
        self.batch_size = batch_size
        
    def init_params(self):
        return (np.random.rand(self.output_dim, self.d) - 0.5) * 100
    
    def analytical_solution(self, X, y):
        X = np.c_[np.ones(X.shape[0]), X]
        beta = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(y)
        return beta.T  
    
    def step(self, lr=1e-3):
        dE_db = np.zeros((self.output_dim, self.d))
        n = self.num_samples
        loss_sum = 0

        if self.is_batch:
            indices = np.random.permutation(self.num_samples)[:self.batch_size]
            batch_data = self.data[indices]
            batch_labels = self.y[indices]
            n = self.batch_size
        else:
            batch_data = self.data
            batch_labels = self.y
                        
        for j in range(n):
            x_j = batch_data[j]
            y_j = batch_labels[j]
                
            # Forward pass
            outputs = self.betha @ x_j
                
            # # Error / Loss
            loss = self.loss_function(outputs, y_j)
            loss_sum += loss
                
            # Accumulate derivative
            # Do some vector calculus shit
            dE_db += self.loss_function_derivative(outputs, y_j, x_j)
            
        self.betha -= lr * dE_db / self.num_samples
        
        # Returns parameters and average loss over batch
        return self.betha, loss_sum / n
    
    def calculate_total_loss(self, betha=None):
        total_loss = 0
        
        betha = self.betha if betha is None else betha
        
        for i in range(self.data.shape[0]):
            total_loss += self.loss_function(betha @ self.data[i], self.y[i])    
        
        return total_loss / self.num_samples
    
    def reset(self):
        self.betha = self.init_params()
        
    def step_n(self, n=1000, lr=1e-2, visualize=False):
        if visualize:
            visualizer = RegressionVisualizer(self)
            return visualizer.visualize_run(n=n, lr=lr)
        
        # Else, without visualization
        for step_id in range(n):
            parameters, loss = self.step(lr=lr)
            
        return parameters, loss

        
class RegressionVisualizer():
    def __init__(self, model):
        self.model = model
        
    def visualize_run(self, n=1000, lr=1e-2):   
        fig, axs = self.init_axis(self.model.d if self.model.input_dim > 1 else self.model.d+1)
        
        # Run n steps   
        for step_id in range(n):
            parameters, loss = self.model.step(lr)
            
            self.plot_parameters(axs, step_id, parameters)
            
            # Plot the regression line
            if self.model.input_dim == 1:
                self.plot_regereession_line(axs)

            display.clear_output(wait=True)
            display.display(pl.gcf()) 
            
            if self.model.input_dim == 1:
                self.remove_regression_lines(axs)
        
        return parameters, loss
    
    def init_axis(self, num_plots, row_height=5, col_width=5):
        fig, axs = plt.subplots(self.model.output_dim, num_plots, figsize=(num_plots*col_width, self.model.output_dim*row_height))
            
        # Initialize figure titles
        for i in range(self.model.output_dim):
            for j in range(self.model.d):
                axs[i][j].set_title(f'betha_{i}_{j}')
        
        return fig, axs
    
    def plot_parameters(self, axs, step_id, parameters):
        for j in range(self.model.output_dim):
            for k in range(self.model.d):
                axs[j][k].plot(step_id, self.model.actual_betha[j][k], 'bo')
                axs[j][k].plot(step_id, parameters[j][k], 'ro')
    
    def plot_regereession_line(self, axs):            
        for i in range(self.model.output_dim):
            axs[i][-1].scatter(self.model.X, model.y[:, i], c='b')
            axs[i][-1].plot(self.model.X, model.betha[i, :] @ model.data.T, 'r', label='Regression Line')
            
    def remove_regression_lines(self, axs):
        for i in range(self.model.output_dim):
            if len(axs[i][-1].lines) > 0:  # Check if there are lines plotted
                axs[i][-1].lines[-1].remove()  # Remove the last line     
        
        
        
data, labels = generate_linear_regression_data(n=1000, input_dim=1, output_dim=3, noise=30)    
model = LinearRegression(-1*data, labels)
params, loss = model.step_n(n=50, lr=1e-1, visualize=True)
print(params, loss)


    
    
    