#1.1 Linear Regression with Closed-Form Solution

In [1]:
# helper functions
import numpy as np

def add_bias(X):
    n_samples = X.shape[0]
    bias = np.ones((n_samples, 1))
    X_bias = np.hstack((bias, X))
    return X_bias

def loss(X, y, weights):
    predictions = X.dot(weights)
    errors = y - predictions
    mse = np.mean(errors ** 2)
    return mse, predictions, errors


def standardize(X_train, X_test):
    mean = X_train.mean(axis=0)
    std = X_train.std(axis=0)

    std_replaced = np.where(std == 0, 1, std)
    X_train_std = (X_train - mean) / std_replaced
    X_test_std = (X_test - mean) / std_replaced
    return X_train_std, X_test_std

In [2]:
import numpy as np


# closed form solution to svd
def svd_linear_regression(X, y):
    # bias already added
    X_bias = X

    # performing singular value decomposition
    U, sigma, VT = np.linalg.svd(X_bias, full_matrices = False)

    # pseudo inverse of sigma
    sigma_inv = np.zeros_like(sigma)

    for i in range(len(sigma)):
        if sigma[i] > 1e-10:
            sigma_inv[i] = 1 / sigma[i]

    Sigma_inv = np.diag(sigma_inv)

    # pseudo inverse of X
    VT_transposed = VT.T
    U_transposed = U.T
    # X = U * Σ * V^T
    X_inv = VT_transposed.dot(Sigma_inv).dot(U_transposed)

    # (X_inv * y) where X_inv = V * Σ^+ * U^T
    weights = X_inv.dot(y)
    return weights

# closed form solution to ridge regression (can use either svd or this for handling numerical instability)
def linear_regression_ridge(X, y):
    lamd = 1e-6

    X_bias = X # bias already added

    X_bias_transposed = X_bias.T
    X_bias_transposed_X = X_bias_transposed.dot(X_bias)

    n_features = X.shape[1]
    identity_matrix = np.eye(n_features)
    identity_matrix[0,0] = 0 # to not regularize bias term

    # (X^T * X + lamda * I)
    X_bias_transposed_X_reg = X_bias_transposed_X + lamd * identity_matrix

    # X^T * y
    X_bias_transposed_y = X_bias_transposed.dot(y)

    # weights (X^T * X + lamda * I)^-1 * X^T * y
    weights = np.linalg.inv(X_bias_transposed_X_reg).dot(X_bias_transposed_y)
    return weights



# 1.2 Gradient Descent

In [3]:
import numpy as np
# random seed for reproducibility
np.random.seed(42)



def batch_gradient_descent(X, y, num_iterations, verbose = False):
    n_samples, n_features = X.shape
    weights = np.random.randn(n_features) * 0.0001
    loss_history = []
    alpha = 1e-7
    tolerance = 1e-6

    for i in range(num_iterations):
        # loss
        mse, predictions, errors = loss(X, y, weights)
        loss_history.append(mse)

        if i > 0 and abs(loss_history[-2] - loss_history[-1]) < tolerance:
            print(f"Batch GD converged at iteration {i + 1}")
            break

        X_tranposed = X.T
        # ∇L(wt) = -2/n * X^T * (y - Xw)
        gradient = (-2 / n_samples) * (X_tranposed.dot(errors))

        # wt+1 = wt - α * ∇L(wt)
        weights = weights - alpha * gradient

        if verbose and (i + 1) % 1000 == 0:
            print(f"Iteration {i + 1}: Loss = {mse}")

    return weights, loss_history



def stochastic_gradient_descent(X, y, max_epochs=100, verbose = False):
    np.random.seed(42)
    n_samples, n_features = X.shape
    weights = np.random.randn(n_features) * 0.0001
    loss_history = []
    alpha = 1e-7
    tolerance = 1e-6
    previous_loss = float('inf')

    for epoch in range(max_epochs):
        random_index = np.random.permutation(n_samples)
        X_shuffled = X[random_index]
        y_shuffled = y[random_index]

        for j in range(n_samples):
            xi = X_shuffled[j]
            yi = y_shuffled[j]

            mse, predictions, errors = loss(xi, yi, weights)

            xi_transposed = xi.T
            # ∇L(wt) = -2* Xi * (y - Xw)
            gradient = (-2 * xi * errors)

            # wt+1 = wt - α * ∇L(wt)
            weights = weights - alpha * gradient

            # current loss every 100 updates
            if (epoch  * n_samples + j) % 100 == 0:
                current_loss, predictions, errors = loss(X, y, weights)
                loss_history.append(current_loss)

                # check convergence
                if abs(previous_loss - current_loss) < tolerance:
                    if verbose:
                        print(f"SGD converged at epoch {epoch + 1}, sample {j + 1}")
                    return weights, loss_history
                previous_loss = current_loss

        if verbose:
            current_loss, predictions, errors = loss(X, y, weights)
            print(f"SGD Epoch {epoch + 1}: Loss = {current_loss}")

    return weights, loss_history


def mini_batch_gradient_descent(X, y, max_epochs=100, verbose=False):
    np.random.seed(42)
    n_samples, n_features = X.shape
    weights = np.random.randn(n_features) * 0.0001
    loss_history = []
    alpha = 1e-7
    batch_size = 64
    tolerance = 1e-6
    previous_loss = float('inf')

    for epoch in range(max_epochs):
        # shuffling data
        random_index = np.random.permutation(n_samples)
        X_shuffled = X[random_index]
        y_shuffled = y[random_index]

        for j in range(0, n_samples, batch_size):
            end_idx = j + batch_size
            batch_X = X_shuffled[j:end_idx]
            batch_y = y_shuffled[j:end_idx]

            mse, predictions, errors = loss(batch_X, batch_y, weights)
            loss_history.append(mse)

            batch_X_tranposed = batch_X.T
            # ∇L(wt) = -2/n * X^T * (y - Xw)
            gradient = (-2 / batch_X.shape[0]) * (batch_X_tranposed.dot(errors))
            # wt+1 = wt - α * ∇L(wt)
            weights = weights - alpha * gradient

            # check convergence
            if abs(previous_loss - mse) < tolerance:
                if verbose:
                    print(f"Mini-batch GD converged at epoch {epoch + 1}, batch {j // batch_size}")
                return weights, loss_history
            previous_loss = mse

        # current loss every 10 epochs
        if verbose and (epoch + 1) % 10 == 0:
            mse, predictions, errors = loss(batch_X, batch_y, weights)
            print(f"Mini-batch GD Epoch {epoch + 1}: Loss = {mse}")

    return weights, loss_history




# 1.3 Comparison with scikit-learn


In [4]:
import numpy as np
import time
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error


# Set random seed for reproducibility
np.random.seed(42)

# Load the California Housing Dataset
housing = fetch_california_housing()
X, y = housing.data, housing.target

# Feature names for reference
feature_names = housing.feature_names
print("Features:", feature_names)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

print("Training set shape:", X_train.shape)
print("Testing set shape:", X_test.shape)

# Standardized data
"""
X_train_std, X_test_std = standardize(X_train, X_test)

X_train_bias = add_bias(X_train_std)
X_test_bias = add_bias(X_test_std)
"""

# unstandardized data (currently in use)
X_train_bias = add_bias(X_train)
X_test_bias = add_bias(X_test)


def sklearn_linear_regression(X_train, y_train, X_test, y_test):
    model = LinearRegression()
    start_time = time.time()
    model.fit(X_train[:,1:], y_train)  # scikit-learn handles bias internally
    training_time = time.time() - start_time
    y_train_pred = model.predict(X_train[:,1:])
    y_test_pred = model.predict(X_test[:,1:])
    train_mse = mean_squared_error(y_train, y_train_pred)
    test_mse = mean_squared_error(y_test, y_test_pred)
    return model, train_mse, test_mse, training_time



# train scikit-learn's LinearRegression
model_sklearn, mse_train_sklearn, mse_test_sklearn, time_sklearn = sklearn_linear_regression(
    X_train_bias,
    y_train,
    X_test_bias,
    y_test
)

print("scikit-learn LinearRegression:")
print(f"Training MSE: {mse_train_sklearn:.4f}")
print(f"Testing MSE: {mse_test_sklearn:.4f}")
print(f"Training Time: {time_sklearn:.4f} seconds\n")



"""
SVD and Ridge Regression
"""


# Closed-Form Solution using SVD
start_time = time.time()
w_closed_svd = svd_linear_regression(X_train_bias, y_train)
time_closed_svd = time.time() - start_time
mse_train_closed_svd, predictions, errors = loss(X_train_bias, y_train, w_closed_svd)
mse_test_closed_svd, predictions, errors = loss(X_test_bias, y_test, w_closed_svd)

print("Closed-Form Solution (SVD):")
print(f"Training MSE: {mse_train_closed_svd:.4f}")
print(f"Testing MSE: {mse_test_closed_svd:.4f}")
print(f"Training Time: {time_closed_svd:.4f} seconds\n")

# Closed-Form Solution using Ridge Regression
start_time = time.time()
w_closed_ridge = linear_regression_ridge(X_train_bias, y_train)
time_closed_ridge = time.time() - start_time
mse_train_closed_ridge, predictions, errors = loss(X_train_bias, y_train, w_closed_ridge)
mse_test_closed_ridge, predictions, errors = loss(X_test_bias, y_test, w_closed_ridge)

print("Closed-Form Solution (Ridge Regression):")
print(f"Training MSE: {mse_train_closed_ridge:.4f}")
print(f"Testing MSE: {mse_test_closed_ridge:.4f}")
print(f"Training Time: {time_closed_ridge:.4f} seconds\n")


"""
Batch Gradient Descent
Stochastic Gradient Descent
Mini-batch Gradient Descent
"""

# Batch Gradient Descent
num_iterations = 100000

start_time = time.time()
w_batch, loss_history_batch = batch_gradient_descent(
    X_train_bias,
    y_train,
    num_iterations,
    verbose=False
)
time_batch = time.time() - start_time
mse_train_batch, predictions, errors = loss(X_train_bias, y_train, w_batch)
mse_test_batch, predictions, errors = loss(X_test_bias, y_test, w_batch)

print("Batch Gradient Descent:")
print(f"Training MSE: {mse_train_batch:.4f}")
print(f"Testing MSE: {mse_test_batch:.4f}")
print(f"Training Time: {time_batch:.4f} seconds\n")

# Stochastic Gradient Descent

start_time = time.time()
w_sgd, loss_history_sgd = stochastic_gradient_descent(
    X_train_bias,
    y_train,
    max_epochs=100,
    verbose=False
)
time_sgd = time.time() - start_time
mse_train_sgd, predictions, errors = loss(X_train_bias, y_train, w_sgd)
mse_test_sgd, predictions, errors = loss(X_test_bias, y_test, w_sgd)

print("Stochastic Gradient Descent:")
print(f"Training MSE: {mse_train_sgd:.4f}")
print(f"Testing MSE: {mse_test_sgd:.4f}")
print(f"Training Time: {time_sgd:.4f} seconds\n")

# Mini-batch Gradient Descent

start_time = time.time()
w_mini, loss_history_mini = mini_batch_gradient_descent(
    X_train_bias,
    y_train,
    max_epochs=100,
    verbose=False
)
time_mini = time.time() - start_time
mse_train_mini, predictions, errors = loss(X_train_bias, y_train, w_mini)
mse_test_mini, predictions, errors = loss(X_test_bias, y_test, w_mini)

print("Mini-batch Gradient Descent:")
print(f"Training MSE: {mse_train_mini:.4f}")
print(f"Testing MSE: {mse_test_mini:.4f}")
print(f"Training Time: {time_mini:.4f} seconds\n")




Features: ['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population', 'AveOccup', 'Latitude', 'Longitude']
Training set shape: (16512, 8)
Testing set shape: (4128, 8)
scikit-learn LinearRegression:
Training MSE: 0.5179
Testing MSE: 0.5559
Training Time: 0.0205 seconds

Closed-Form Solution (SVD):
Training MSE: 0.5179
Testing MSE: 0.5559
Training Time: 0.0241 seconds

Closed-Form Solution (Ridge Regression):
Training MSE: 0.5179
Testing MSE: 0.5559
Training Time: 0.0013 seconds

Batch GD converged at iteration 67185
Batch Gradient Descent:
Training MSE: 1.2512
Testing MSE: 1.2259
Training Time: 25.2519 seconds

Stochastic Gradient Descent:
Training MSE: 0.8022
Testing MSE: 0.8048
Training Time: 54.8140 seconds

Mini-batch Gradient Descent:
Training MSE: 1.2951
Testing MSE: 1.2692
Training Time: 1.0335 seconds



Relative Difference in Learned Parameters

In [5]:
# Relative Difference in Learned Parameters
def relative_difference(w1, w2):
    return np.linalg.norm(w1 - w2) / np.linalg.norm(w2)

# Extract scikit-learn's weights (including intercept)
w_sklearn = np.concatenate(([model_sklearn.intercept_], model_sklearn.coef_))

# Compute relative differences
rel_diff_closed_svd = relative_difference(w_closed_svd, w_sklearn)
rel_diff_closed_ridge = relative_difference(w_closed_ridge, w_sklearn)
rel_diff_batch = relative_difference(w_batch, w_sklearn)
rel_diff_sgd = relative_difference(w_sgd, w_sklearn)
rel_diff_mini = relative_difference(w_mini, w_sklearn)

print("Relative Difference in Learned Parameters:")
print(f"Closed-Form Solution (SVD): {rel_diff_closed_svd:.6f}")
print(f"Closed-Form Solution (Ridge Regression): {rel_diff_closed_ridge:.6f}")
print(f"Batch Gradient Descent: {rel_diff_batch:.6f}")
print(f"Stochastic Gradient Descent (SGD): {rel_diff_sgd:.6f}")
print(f"Mini-batch Gradient Descent: {rel_diff_mini:.6f}\n")


Relative Difference in Learned Parameters:
Closed-Form Solution (SVD): 0.000000
Closed-Form Solution (Ridge Regression): 0.000000
Batch Gradient Descent: 0.999994
Stochastic Gradient Descent (SGD): 0.999912
Mini-batch Gradient Descent: 0.999998

