In [None]:
import pathlib

import matplotlib.pyplot as plt
import numpy as np
from sklearn import datasets, model_selection, pipeline, preprocessing, utils

# Linear Regression from Scratch with Numpy

## Small example with synthetic data

In [None]:
prng = np.random.RandomState(42)
m = 100
features = [
    np.ones((m, 1)),
    prng.normal(loc=1.0, scale=1.0, size=(m, 1))
]
X = np.hstack(features)
error = prng.normal(loc=0.0, scale=5e-1, size=(m, 1))
beta = np.array([[3.0], [1.5]])
y = X @ beta + error

In [None]:
_ = plt.plot(X[:, 1], y, 'o')
_ = plt.xlabel(r"$X_1$", fontsize=15)
_ = plt.ylabel("y", fontsize=15, rotation=0)

### Train-test split

In [None]:
train_features, test_features, train_target, test_target = model_selection.train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=prng
)

### Train the model

In [None]:
def linear_regression(X, y):
    return np.linalg.inv(X.T @ X) @ X.T @ y

In [None]:
beta_hat = linear_regression(train_features, train_target)

In [None]:
beta_hat

In [None]:
def loss_fn(y, y_hat):
    return 0.5 * np.mean((y - y_hat)**2)

In [None]:
train_predictions = train_features @ beta_hat
loss_fn(train_target, train_predictions)

### Evaluate on the test set

In [None]:
test_predictions = test_features @ beta_hat
loss_fn(test_target, test_predictions)

### Using Stochastic Gradient Descent 

In [None]:
# initialize weights
learned_parameters = prng.normal(loc=0, scale=1, size=(2, 1))


def model_fn(X):
    return X @ learned_parameters

def grad_fn(X, y, y_hat):
    m, _ = y.shape
    return -(1 / m) * (X.T @ (y - y_hat))


In [None]:
learning_rate = 0.001
batch_size = 1
epochs = 100
log_epochs = 10

for epoch in range(epochs):

    total_loss = 0.0
    for batch_ixs in utils.gen_batches(len(train_target), batch_size):
        features, target = train_features[batch_ixs], train_target[batch_ixs]

        # forward pass
        predictions = model_fn(features)
        loss = loss_fn(target, predictions)
        total_loss += loss
        
        # backward pass
        grad = grad_fn(features, target, predictions)
        learned_parameters -= grad * learning_rate
  
    if epoch % log_epochs == 0:
        print(f'Epoch {epoch}  Loss {total_loss / len(train_target):.4f}')

In [None]:
print(f'Final Parameters: {learned_parameters[:, 0]}')

In [None]:
total_loss = 0
for batch_ixs in utils.gen_batches(len(test_target), batch_size):
    features, target = test_features[batch_ixs], test_target[batch_ixs]
    predictions = model_fn(features)
    loss = loss_fn(target, predictions)
    total_loss += loss

print(f"Average test loss: {total_loss / len(test_target)}")

In [None]:
_ = plt.plot(X[:, 1], y, 'o')
_ = plt.xlabel(r"$X_1$", fontsize=15)
_ = plt.ylabel("y", fontsize=15, rotation=0)

new_features = [
    np.ones((m, 1)),
    np.linspace(-2, 4, m).reshape((-1, 1))    
]
X_new = np.hstack(new_features)
y_new = model_fn(X_new)

_ = plt.plot(X_new[:, 1], y_new)

## Predicting house prices

In [None]:
datasets.fetch_california_housing?

In [None]:
features, targets = datasets.fetch_california_housing(return_X_y=True, as_frame=True)

In [None]:
features.info()

In [None]:
features.describe()

In [None]:
targets.describe() # units are 100k USD

### Train-test split

In [None]:
random_state = np.random.RandomState(42)
train_features, test_features, train_targets, test_targets = model_selection.train_test_split(
    features,
    targets,
    test_size=0.1,
    random_state=random_state,
)

In [None]:
features_preprocessing_pipeline = pipeline.make_pipeline(
    preprocessing.StandardScaler(),
)

targets_preprocessing_pipeline = pipeline.make_pipeline(
    preprocessing.FunctionTransformer(lambda X: X.to_numpy()),
    preprocessing.FunctionTransformer(lambda X: X.reshape(-1, 1)),    
)

In [None]:
train_features_array = features_preprocessing_pipeline.fit_transform(train_features)
train_targets_array = targets_preprocessing_pipeline.fit_transform(train_targets)

In [None]:
test_features_array = features_preprocessing_pipeline.transform(test_features)
test_targets_array = targets_preprocessing_pipeline.transform(test_targets)

### Analytic Solution

In [None]:
learned_parameters = linear_regression(train_features_array, train_targets_array)

In [None]:
predictions_array = model_fn(train_features_array)
training_loss = loss_fn(predictions_array, train_targets_array)
print(f"Training loss: {np.sqrt(training_loss) * 100_000} USD")

In [None]:
predictions_array = model_fn(test_features_array)
test_loss = loss_fn(predictions_array, test_targets_array)
print(f"Test loss: {np.sqrt(test_loss) * 100_000} USD")

### Using Stochastic Gradient Descent

In [None]:
# initialize weights
_, n = train_features_array.shape
learned_parameters = prng.normal(loc=0, scale=1, size=(n, 1))

In [None]:
learning_rate = 0.001
batch_size = 32
epochs = 200
log_epochs = 10

for epoch in range(epochs):

    total_loss = 0.0
    for batch_ixs in utils.gen_batches(len(train_target), batch_size):
        features, target = train_features_array[batch_ixs], train_targets_array[batch_ixs]

        # forward pass
        predictions = model_fn(features)
        loss = loss_fn(target, predictions)
        total_loss += loss
        
        # backward pass
        grad = grad_fn(features, target, predictions)
        learned_parameters -= grad * learning_rate
  
    if epoch % log_epochs == 0:
        print(f'Epoch {epoch}  Loss {total_loss / len(train_targets_array):.4f}')

In [None]:
predictions_array = model_fn(train_features_array)

training_loss = loss_fn(predictions_array, train_targets_array)
print(f"Training loss: {np.sqrt(training_loss) * 100_000} USD")

In [None]:
predictions_array = model_fn(test_features_array)
test_loss = loss_fn(predictions_array, test_targets_array)
print(f"Test loss: {np.sqrt(test_loss) * 100_000} USD")