In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow as tf
import utils as utils

In [None]:
# Load the raw advertising records from the CSV file via the project helper
raw_data = utils.load_csv_data('<local-path>/data/Advertising.csv')

# Column names passed to prepare_data: the model inputs and the value to predict
feature_keys = ['tv', 'radio', 'newspaper']
target_key = 'sales'

# Build the feature matrix X and target vector y from the raw records
# NOTE(review): prepare_data lives in utils and is not visible here; the code
# below only relies on X and y exposing .shape, len() and slicing.
X, y = utils.prepare_data(raw_data, feature_keys, target_key)

# Sanity-check the feature matrix. X[:5] slices along the first axis, so it
# shows the first five rows (one row per sample), not five feature columns.
print("X Shape: ", X.shape)
print("X length: ", len(X))
print("X first 5 rows: ", X[:5])
print("X type: ", type(X))

# Sanity-check the targets. These are target values, not features.
print("y Shape: ", y.shape)
print("y length: ", len(y))
print("y first 5 values: ", y[:5])
print("y type: ", type(y))

In [None]:
# Plot only the first 5 rows of X against the matching first 5 targets.
# X[:5] / y[:5] slice along the first axis, i.e. they select rows (samples),
# not feature columns.
# NOTE(review): plot_features_vs_target is defined in utils and not visible
# here; presumably it draws one panel per entry of feature_keys - confirm.
print("Plot first 5 X vs y")
utils.plot_features_vs_target(X[:5], y[:5], feature_keys, target_key)

# Plot the complete dataset: every row of X against every target value.
# This call is the last expression of the cell, so if the helper returns a
# non-None value Jupyter will also display it as the cell output.
print("Plot entire X vs y")
utils.plot_features_vs_target(X, y, feature_keys, target_key)

In [None]:
# --- Partition the data into train / validation / test ---
# Hold out a quarter of the samples; the remaining 75% is the training set.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.25, random_state=55
)
# Cut the held-out quarter in half: validation and test each end up with
# roughly 12.5% of the original data.
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.50, random_state=55
)

# --- Feature scaling with a Keras Normalization layer ---
# The layer's statistics are learned from the training split only, so no
# information from the validation or test data leaks into preprocessing.
normalizer = tf.keras.layers.Normalization(axis=-1)
normalizer.adapt(X_train)

# --- Apply the fitted layer to every split ---
# Each split goes through the same adapted layer and is converted back to a
# NumPy array, in the order train, validation, test.
X_train_norm, X_val_norm, X_test_norm = (
    normalizer(split).numpy() for split in (X_train, X_val, X_test)
)

# Report the shape of every array as a quick check on the split sizes.
print("X_train_norm Shape: ", X_train_norm.shape)
print("y_train Shape: ", y_train.shape)
print("X_val_norm Shape: ", X_val_norm.shape)
print("y_val Shape: ", y_val.shape)
print("X_test_norm Shape: ", X_test_norm.shape)
print("y_test Shape: ", y_test.shape)

In [None]:
def predict(
    X: np.ndarray,
    W: np.ndarray,
    b: float
) -> np.ndarray:
    """
    Evaluate the linear model f(x) = X·W + b.

    Args:
        X (np.ndarray): Feature matrix of shape (n_samples, n_features)
        W (np.ndarray): Weight vector of shape (n_features,)
        b (float): Bias term, added to every prediction

    Returns:
        np.ndarray: One predicted value per row of X, shape (n_samples,)
    """

    # Weighted sum of the features for each sample (dot product of X and W)
    weighted_sum = np.dot(X, W)

    # Shift every weighted sum by the bias; b is broadcast across all samples
    return weighted_sum + b

In [None]:
def mean_squared_loss(
    X: np.ndarray,
    y: np.ndarray,
    W: np.ndarray,
    b: float
) -> float:
    """
    Compute the squared-error cost J = (1 / 2m) * sum((f(x) - y)^2).

    Note that the sum is divided by 2m, so the returned value is half of the
    plain mean squared error.

    Args:
        X (np.ndarray): Feature matrix of shape (m, n)
        y (np.ndarray): Target vector of shape (m,)
        W (np.ndarray): Weight vector of shape (n,)
        b (float): Bias term

    Returns:
        float: The cost for the given parameters
    """

    # m is the number of examples (rows of X)
    n_examples = X.shape[0]

    # Residual of each example: model output minus the true target
    residuals = predict(X, W, b) - y

    # Sum the squared residuals and scale by 1 / (2m)
    return np.sum(residuals ** 2) / (2 * n_examples)

In [None]:
def compute_gradient(
    X: np.ndarray,
    y: np.ndarray,
    W: np.ndarray,
    b: float
) -> tuple[np.ndarray, float]:
    """
    Compute the partial derivatives of the cost with respect to W and b.

    Args:
        X: Input features, shape (m, n) with m examples and n features
        y: Target values, shape (m,)
        W: Weight parameters, shape (n,)
        b: Bias parameter

    Returns:
        tuple: (d_dw, d_db)
            - d_dw: Gradient with respect to W, shape (n,)
            - d_db: Gradient with respect to b, scalar
    """

    # Unpacking two values requires X to be 2-D; only the row count is needed
    n_examples, _ = X.shape

    # Residuals of the current model: prediction minus target, per example
    residuals = predict(X, W, b) - y

    # dJ/dW: each feature column dotted with the residuals, averaged over m
    grad_w = np.dot(X.T, residuals) / n_examples

    # dJ/db: the average residual
    grad_b = np.sum(residuals) / n_examples

    return grad_w, grad_b

In [None]:
def gradient_descent(
    X: np.ndarray,
    y: np.ndarray,
    W: np.ndarray,
    b: float,
    learning_rate: float
) -> tuple[np.ndarray, float]:
    """
    Take a single gradient-descent step and return the updated parameters.

    Args:
        X: Input features, shape (m, n) with m examples and n features
        y: Target values, shape (m,) or (m, 1)
        W: Current weight parameters, shape (n,) or (n, 1)
        b: Current bias parameter
        learning_rate: Step size that scales the gradient

    Returns:
        tuple: New weights and new bias after one update step
    """

    # Gradients of the cost at the current parameter values
    grad_W, grad_b = compute_gradient(X, y, W, b)

    # Step against the gradient; the inputs W and b are not modified in place
    return W - learning_rate * grad_W, b - learning_rate * grad_b

In [None]:
def train(
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_val: np.ndarray,
    y_val: np.ndarray,
    W: np.ndarray,
    b: float,
    learning_rate: float,
    epochs: int,
    log_every: int = 100
) -> tuple[np.ndarray, float, list[float], list[float]]:
    """
    Train a linear model using gradient descent optimization.

    Each epoch first records the training and validation loss for the current
    parameters and then performs one full-batch gradient-descent update. The
    histories therefore contain exactly `epochs` entries, the first of which is
    the loss of the initial parameters; the loss of the final returned
    parameters is not part of the histories.

    Args:
        X_train: Training features, shape (n_samples, n_features)
        y_train: Training target values, shape (n_samples,)
        X_val: Validation features, shape (n_samples, n_features)
        y_val: Validation target values, shape (n_samples,)
        W: Initial weight vector, shape (n_features,)
        b: Initial bias term
        learning_rate: Step size for gradient descent updates
        epochs: Number of training iterations
        log_every: Print the losses every `log_every` epochs (default 100).
            Pass 0 to disable progress output.

    Returns:
        tuple: Updated weights W, bias b, train loss history, and validation
        loss history after training
    """

    # Per-epoch loss values, measured before that epoch's parameter update
    train_loss_history = []
    val_loss_history = []

    for epoch in range(epochs):
        # Loss of the current parameters on the training data
        train_loss = mean_squared_loss(X_train, y_train, W, b)
        train_loss_history.append(train_loss)

        # Loss of the same parameters on the validation data (monitoring only;
        # the validation set never contributes to the gradient)
        val_loss = mean_squared_loss(X_val, y_val, W, b)
        val_loss_history.append(val_loss)

        # Periodic progress report; the truthiness check skips logging (and
        # avoids a modulo-by-zero) when log_every is 0
        if log_every and epoch % log_every == 0:
            print(f"Epoch {epoch}: Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}")

        # One gradient-descent step computed from the training data only
        W, b = gradient_descent(X_train, y_train, W, b, learning_rate)

    # Return the trained model parameters and loss histories
    return W, b, train_loss_history, val_loss_history

In [None]:
# Hyperparameters: gradient-descent step size and number of passes over the data
learning_rate, epochs = 0.01, 1000

# Start from an all-zero model: one weight per feature column and a zero bias
W = np.zeros(X_train.shape[1])  # shape (n_features,)
b = 0.0

# Fit on the normalized training split; the normalized validation split is
# passed along so its loss can be tracked next to the training loss
W_trained, b_trained, train_losses, val_losses = train(
    X_train=X_train_norm,
    y_train=y_train,
    X_val=X_val_norm,
    y_val=y_val,
    W=W,
    b=b,
    learning_rate=learning_rate,
    epochs=epochs,
)

# Show the learned parameters
print(f"Training parameters Weight: {W_trained}, bias {b_trained}")

# Plot both loss histories to check convergence and spot a gap between
# training and validation loss
utils.plot_loss_curve(train_losses, val_losses)

In [None]:
# Make predictions on the normalized test set using the trained weights and bias
y_predict = predict(X_test_norm, W_trained, b_trained)

# Evaluate the test-set loss with the same cost function used during training
# (mean_squared_loss divides the summed squared error by 2m)
y_predict_loss = mean_squared_loss(X_test_norm, y_test, W_trained, b_trained)

# Print the test loss
print(f"Test Loss: {y_predict_loss}")

# Create a plot comparing actual vs predicted sales
utils.plot_predictions(y_test, y_predict, 'Predicted vs Actual Sales', 'Actual Sales', 'Predicted Sales')

# Print up to the first 25 actual/predicted pairs for comparison.
# The header is printed once, and slicing + zip walks the entries positionally,
# so the loop cannot run past the end of a test split with fewer than 25 rows.
print("Print actual vs predicted values")
for actual, predicted in zip(y_test[:25], y_predict[:25]):
    print(f"Actual: {actual}, Predicted: {predicted:.1f}")