In [1]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from IPython.display import clear_output
import time

In [2]:
!cp -r /kaggle/input/ex-123/* /kaggle/working/

'cp' is not recognized as an internal or external command,
operable program or batch file.


In [3]:
def function(*, X: np.ndarray, theta: np.ndarray, add_ones: True) -> np.ndarray:
    """
    Predicts output values using the input features and model parameters.

    Args:
        X (np.ndarray): Input features.
        theta (np.ndarray): Model parameters (theta).

    Returns:
        np.ndarray: Predicted output values.
    """
    if add_ones:
        ones = np.ones(len(X)).reshape(-1, 1)  # Add a column of ones for the intercept term
        X_with_ones = np.hstack((ones, X))  # Combine input with ones
    else: 
        X_with_ones = X
    y_pred = np.sum(X_with_ones * theta.reshape(-1,), axis=1)  # Compute predictions
        
    return y_pred

def plot_graph(*, title_1: str, title_2: str, title_3: 0, style_1: str, style_2: str,  style_3: 0,
               x1: np.ndarray, y1: np.ndarray, x2: np.ndarray, y2: np.ndarray,  x3: 0, y3: 0):
    """
    Plots two sets of data points using Plotly.

    Args:
        title_1 (str): Name of the first plot line.
        title_2 (str): Name of the second plot line.
        style_1 (str): Plot style for the first line (e.g., 'markers').
        style_2 (str): Plot style for the second line (e.g., 'lines').
        x1, y1, x2, y2 (np.ndarray): Data points for both lines.
    """
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=x1, y=y1, mode=style_1, name=title_1))
    fig.add_trace(go.Scatter(x=x2, y=y2, mode=style_2, name=title_2))
    fig.add_trace(go.Scatter(x=x3, y=y3, mode=style_3, name=title_3))
    fig.show()

def normalize_vector(vector: np.ndarray) -> np.ndarray:
    """
    Normalizes a vector to have zero mean and unit variance.

    Args:
        vector (np.ndarray): Input vector.

    Returns:
        np.ndarray: Normalized vector.
    """
    mean = np.mean(vector)  # Calculate the mean
    std = np.std(vector)  # Calculate the standard deviation
    normalized_vector = (vector - mean) / std  # Normalize the vector
    return normalized_vector

def create_polynomial_features(*, X: np.ndarray, order: int) -> np.ndarray:
    """
    Converts an input vector into a matrix of polynomial features.

    Args:
        X (np.ndarray): Input feature vector.
        degree (int): Degree of the polynomial.

    Returns:
        np.ndarray: Matrix of polynomial features.
    """
    X = X.reshape(-1,)
    poly_matrix = np.ones((len(X), order))  # Initialize the matrix with ones
    for i in range(order):
        poly_matrix[:, i] = X ** (i + 1)  # Raise each column to the respective power
    return poly_matrix

def compute_true_theta(*, X: np.ndarray, Y: np.ndarray) -> np.ndarray:
    """
    Computes the true value of theta using the normal equation.

    Args:
        X (np.ndarray): Input feature matrix.
        Y (np.ndarray): Output vector.

    Returns:
        np.ndarray: Computed theta values.
    """
    ones = np.ones(len(X)).reshape(-1, 1)
    X_with_ones = np.hstack((ones, X))  # Add a column of ones for the intercept term
    inv_XT_X = np.linalg.inv(np.dot(X_with_ones.T, X_with_ones))  # Compute (X.T * X)^-1
    XT_Y = np.dot(X_with_ones.T, Y.reshape(-1, 1))  # Compute (X.T * Y)
    theta_true = np.dot(inv_XT_X, XT_Y)  # Compute theta
    return theta_true

def denormalize_theta(*, theta_norm: np.ndarray, X: np.ndarray, Y: np.ndarray) -> np.ndarray:
    """
    Converts normalized theta values back to the original scale.

    Args:
        theta_norm (np.ndarray): Normalized theta values.
        X (np.ndarray): Input features.
        Y (np.ndarray): Output values.

    Returns:
        np.ndarray: Denormalized theta values.
    """
    theta = np.zeros_like(theta_norm)

    mean_X = np.mean(X, axis=0)
    std_X = np.std(X, axis=0)

    mean_Y = np.mean(Y, axis=0)
    std_Y = np.std(Y, axis=0)

    theta[1:] = std_Y * theta_norm[1:] / std_X.reshape(-1, 1)
    theta[0] = mean_Y + std_Y * theta_norm[0] - np.dot(std_Y * mean_X / std_X, theta_norm[1:])
    return theta

In [4]:
class Linear_Regression_Multivariables:
    def __init__(self, *, number_of_feature: int) -> None:
        """
        Initializes the Linear Regression model.
        Args:
            num_features (int): Number of features in the input data.
        """
        self.number_of_features = number_of_feature

    def normalize_vector(self, vector: np.ndarray) -> np.ndarray:
        """
        Normalizes the input vector.
        """
        mean = np.mean(vector)
        std = np.std(vector)
        return (vector - mean) / std
    
    def normalize_input_output(self, *, X: np.ndarray, Y: np.ndarray) -> tuple:
        """
        Normalizes the input features and output values.

        Args:
            X (np.ndarray): Input features.
            Y (np.ndarray): Output values.

        Returns:
            tuple: Normalized input features and output values.
        """
        norm_X = np.apply_along_axis(self.normalize_vector, arr=X, axis=0).reshape(-1, self.number_of_features)
        norm_Y = np.apply_along_axis(self.normalize_vector, arr=Y, axis=0).reshape(-1, 1)
        return norm_X, norm_Y

    def add_ones_columns(self, *, normalized_input: np.ndarray) -> np.ndarray:
        """
        Adds a column of ones to the input features for the intercept term.

        Args:
            X (np.ndarray): Normalized input features.

        Returns:
            np.ndarray: Input features with an added column of ones.
        """
        ones = np.ones(len(normalized_input)).reshape(-1, 1)
        x_add = np.hstack((ones, normalized_input))
        return x_add

    def predict(self, *, theta: np.ndarray, normalized_input: np.ndarray) -> np.ndarray:
        y_pred = np.matmul(normalized_input, theta)
        return y_pred
    
    def compute_loss(self, *, y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
        m = len(y_true)
        E = y_pred - y_true
        J = np.sum((E)**2) / (2*m)
        return J
    
    def update_params(self, *, theta: np.ndarray, lr: float, y_pred: np.ndarray, 
                      y_true: np.ndarray, normalized_input: np.ndarray) -> np.ndarray:
        m = len(y_true)
        E = y_pred - y_true
        dJ_dtheta = np.dot(normalized_input.T, E) / (m)
        theta_updated = theta - lr*dJ_dtheta
        return theta_updated
    
    def denormalize_theta(self, *, theta_normalized: np.ndarray, input: np.ndarray, output: np.ndarray) -> np.ndarray:
        theta = np.zeros_like(theta_normalized)

        mean_x = np.mean(input,  axis= 0)
        std_x = np.std(input,  axis= 0)

        mean_y = np.mean(output, axis= 0)
        std_y = np.std(output, axis= 0)

        theta[1:] = std_y*theta_normalized[1:]/(std_x.reshape(-1, 1))
        theta[0] = mean_y + std_y*theta_normalized[0] - np.dot(std_y*mean_x/std_x, theta_normalized[1:])

        return theta
    
    def plot_graph(self, data):
        clear_output(wait= True)
        fig = go.FigureWidget()
        for i in range(len(data)):
            fig.add_trace(go.Scatter(x=data[i]['x'], y=data[i]['y'],
                                     mode= data[i]['mode'], name= data[i]['title']))
        fig.show()
        time.sleep(1/500)
    
    def train(self, *, epoch: int, theta: np.ndarray, input: np.ndarray, 
              output: np.ndarray, lr: float, plot_graph: False) -> np.ndarray:
        """
        Trains the Linear Regression model using gradient descent.

        Args:
            epochs (int): Number of training iterations.
            theta (np.ndarray): Initial model parameters.
            X (np.ndarray): Input features.
            Y (np.ndarray): Output values.
            lr (float): Learning rate for parameter updates.

        Returns:
            tuple: Array of loss values and the trained model parameters.
        """
        normalized_input, normalized_ouput = self.normalize_input_output(X= input, Y= output)
        normalized_input_with_ones = self.add_ones_columns(normalized_input= normalized_input)

        J_array = np.array([])
        for i in range(epoch):
            y_pred = self.predict(theta= theta, normalized_input= normalized_input_with_ones)
            J = self.compute_loss(y_true= normalized_ouput, y_pred= y_pred)
            theta = self.update_params(theta= theta, lr= lr, y_pred= y_pred, 
                                       y_true= normalized_ouput, normalized_input= normalized_input_with_ones)
            J_array = np.append(arr= J_array, values= J)
            
            if plot_graph == True:
                x_with_ones = self.add_ones_columns(normalized_input= input)
                theta_praph = self.denormalize_theta(theta_normalized= theta, input= input, output= output)
                data_1 = dict(x= input[:, 0].reshape(-1,), y= output.reshape(-1,), mode= 'markers', title= 'Data')
                data_2 = dict(x= input[:, 0].reshape(-1,), y= function(X= x_with_ones, theta= theta_praph, add_ones= False) , mode= 'lines', title= 'Predicted')
                data = [data_1, data_2]
                self.plot_graph(data= data)
        
        theta = self.denormalize_theta(theta_normalized= theta, input= input, output= output)
        return J_array, theta

### ex3

In [5]:
# Read csv file ex2.csv
pd_ex3 = pd.read_csv('ex3.csv')

# Get collumns of file 
X_cols = pd_ex3.columns[:-1]
Y_col = pd_ex3.columns[-1]

In [6]:
# Get vector input and output
x_value = pd_ex3[X_cols].values
y_value = pd_ex3[Y_col].values

In [7]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=x_value.reshape(-1,), y=y_value,
                        mode= 'markers', name= f'Data'))
fig.update_xaxes(title= 'x')
fig.update_yaxes(title= 'y', tickangle= 0)
fig.show()

In [8]:
order= 2
model_poly = Linear_Regression_Multivariables(number_of_feature= order)

In [9]:
X_poly = create_polynomial_features(X= x_value, order= order)
Y_poly = y_value.reshape(-1, 1)

In [10]:
np.random.seed(1)

theta_init = np.random.randn(order + 1, 1)
theta_init.shape

(3, 1)

In [11]:
learning_rate_ = [0.01, 0.001, 0.003, 0.3, 0.04, 0.1]
J = np.zeros((1000, ))
for i in learning_rate_:
    
    J_arr, theta_arr = model_poly.train(epoch= 1000, theta= theta_init, 
                                           input= X_poly, output= Y_poly, lr= i, plot_graph= False )
    
    J = np.vstack([J, J_arr])

fig = go.Figure()
for i in range(len(J[1:, :])):
    fig.add_trace(go.Scatter(x=np.arange(1000), y=J[(1+i), :],
                            mode= 'lines', name= f'lr: {learning_rate_[i]}'))
fig.update_xaxes(title= 'epochs')
fig.update_yaxes(title= 'J', tickangle= 0)
fig.show()

In [12]:
J_arr, theta_arr = model_poly.train(epoch= 1000, theta= theta_init, 
                                           input= X_poly, output= Y_poly, lr= 0.3, plot_graph= True )