<h2>Assignment 2 - Using a Neural Network to fit the California Housing data</h2>

# Data Preparation

In [None]:
# California Housing dataset
import urllib.request
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer

# load data from csv file
urllib.request.urlretrieve("https://raw.githubusercontent.com/ageron/handson-ml2/master/datasets/housing/housing.csv", "housing.csv")
housing = pd.read_csv('housing.csv')

# Using the setting inplace=False, drop() creates a copy of the data and does not affect housing dataset
housing_data = housing.drop("median_house_value", axis=1, inplace=False)
housing_target = housing["median_house_value"].copy()
feature_names = list(housing_data.columns)

#  Transformation pipeline at https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html
num_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy="median")),
    ('std_scaler', StandardScaler()),
])

full_pipeline = ColumnTransformer([
    ('num', num_pipeline, feature_names[:-1]),
    ('cat', OneHotEncoder(), [feature_names[-1]]),
])

housing_preprocessed = full_pipeline.fit_transform(housing_data)

print(housing_preprocessed.shape)

(20640, 13)


In [None]:
X = housing_preprocessed
y = housing_target.to_numpy()

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# (for comparsion) Using scikit-learn's Linear Regression model to fit the data

In [None]:
from sklearn.linear_model import LinearRegression

# documentation at https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LinearRegression.html
lr = LinearRegression()

lr.fit(X_train, y_train)
print("model training error : %.3f" % lr.score(X_train, y_train))
print("model testing error: %.3f" % lr.score(X_test, y_test))

model training error : 0.647
model testing error: 0.640


In [None]:
# (Task 1 of Assignment 2): Using PyTorch nn.Sequential() to build a neural network to fit the data

In [None]:
import torch
from torch import nn
from torch import optim

# Number of input features
input_dim = X_train.shape[1]

# Convert the numpy arrays to PyTorch tensors
X_train_torch = torch.tensor(X_train, dtype=torch.float32)
y_train_torch = torch.tensor(y_train, dtype=torch.float32).squeeze()
X_test_torch = torch.tensor(X_test, dtype=torch.float32)
y_test_torch = torch.tensor(y_test, dtype=torch.float32).squeeze()

# Define the model with deeper architecture and more neurons
model = nn.Sequential(
    nn.Linear(input_dim, 256),  # First hidden layer with 256 neurons
    nn.ReLU(),
    nn.Linear(256, 128),        # Second hidden layer with 128 neurons
    nn.ReLU(),
    nn.Linear(128, 64),         # Third hidden layer with 64 neurons
    nn.ReLU(),
    nn.Linear(64, 32),          # Fourth hidden layer with 32 neurons
    nn.ReLU(),
    nn.Linear(32, 1)            # Output layer
)

# Xavier initialization for weights
for layer in model.children():
    if isinstance(layer, nn.Linear):
        torch.nn.init.xavier_uniform_(layer.weight)

# Define the loss function
loss_fn = nn.MSELoss()

# Define the optimizer (Adam with a lower learning rate)
optimizer = optim.Adam(model.parameters(), lr=0.0005)

# Training loop with increased epochs
num_epochs = 500
for epoch in range(num_epochs):
    # Forward pass
    y_pred = model(X_train_torch).squeeze()
    # Compute loss
    loss = loss_fn(y_pred, y_train_torch)

    # Zero gradients
    optimizer.zero_grad()
    # Backward pass
    loss.backward()
    # Clip gradients to avoid exploding gradients
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    # Update weights
    optimizer.step()

    # Print the loss during training
    if epoch % 50 == 0:
        print(f"Epoch {epoch}/{num_epochs}, Loss: {loss.item()}")

# Evaluation
model.eval()
with torch.no_grad():
    y_pred_train = model(X_train_torch).squeeze()
    train_error = loss_fn(y_pred_train, y_train_torch)

    y_pred_test = model(X_test_torch).squeeze()
    test_error = loss_fn(y_pred_test, y_test_torch)

print(f'\nTraining error: {train_error.item()}')
print(f'Testing error: {test_error.item()}')


Epoch 0/500, Loss: 56189616128.0
Epoch 50/500, Loss: 56182284288.0
Epoch 100/500, Loss: 56134045696.0
Epoch 150/500, Loss: 55975866368.0
Epoch 200/500, Loss: 55602954240.0
Epoch 250/500, Loss: 54877401088.0
Epoch 300/500, Loss: 53624619008.0
Epoch 350/500, Loss: 51660603392.0
Epoch 400/500, Loss: 48791314432.0
Epoch 450/500, Loss: 44786122752.0

Training error: 39540543488.0
Testing error: 39334789120.0


# (Task 2 of Assignment 2): Subclassing nn.Module to build a neural network (the same network structure as Task 1) to fit the data

In [None]:
# Define the custom neural network by subclassing nn.Module
class CustomModel(nn.Module):
    def __init__(self, input_dim):
        super(CustomModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 256)  # First hidden layer with 256 neurons
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(256, 128)        # Second hidden layer with 128 neurons
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(128, 64)         # Third hidden layer with 64 neurons
        self.relu3 = nn.ReLU()
        self.fc4 = nn.Linear(64, 32)          # Fourth hidden layer with 32 neurons
        self.relu4 = nn.ReLU()
        self.output_layer = nn.Linear(32, 1)  # Output layer

        # Xavier initialization for weights
        for layer in [self.fc1, self.fc2, self.fc3, self.fc4, self.output_layer]:
            torch.nn.init.xavier_uniform_(layer.weight)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        x = self.fc3(x)
        x = self.relu3(x)
        x = self.fc4(x)
        x = self.relu4(x)
        x = self.output_layer(x)
        return x

# Number of input features
input_dim = X_train.shape[1]

# Convert the numpy arrays to PyTorch tensors
X_train_torch = torch.tensor(X_train, dtype=torch.float32)
y_train_torch = torch.tensor(y_train, dtype=torch.float32).squeeze()
X_test_torch = torch.tensor(X_test, dtype=torch.float32)
y_test_torch = torch.tensor(y_test, dtype=torch.float32).squeeze()

# Instantiate the custom model
model = CustomModel(input_dim)

# Define the loss function
loss_fn = nn.MSELoss()

# Define the optimizer (Adam with a lower learning rate)
optimizer = optim.Adam(model.parameters(), lr=0.0005)

# Training loop with increased epochs
num_epochs = 500
for epoch in range(num_epochs):
    # Forward pass
    y_pred = model(X_train_torch).squeeze()
    # Compute loss
    loss = loss_fn(y_pred, y_train_torch)

    # Zero gradients
    optimizer.zero_grad()
    # Backward pass
    loss.backward()
    # Clip gradients to avoid exploding gradients
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    # Update weights
    optimizer.step()

    # Print the loss during training
    if epoch % 50 == 0:
        print(f"Epoch {epoch}/{num_epochs}, Loss: {loss.item()}")

# Evaluation
model.eval()
with torch.no_grad():
    y_pred_train = model(X_train_torch).squeeze()
    train_error = loss_fn(y_pred_train, y_train_torch)

    y_pred_test = model(X_test_torch).squeeze()
    test_error = loss_fn(y_pred_test, y_test_torch)

print(f'\nTraining error: {train_error.item()}')
print(f'Testing error: {test_error.item()}')


Epoch 0/500, Loss: 56189612032.0
Epoch 50/500, Loss: 56181542912.0
Epoch 100/500, Loss: 56123289600.0
Epoch 150/500, Loss: 55922630656.0
Epoch 200/500, Loss: 55436750848.0
Epoch 250/500, Loss: 54467080192.0
Epoch 300/500, Loss: 52804628480.0
Epoch 350/500, Loss: 50219483136.0
Epoch 400/500, Loss: 46475104256.0
Epoch 450/500, Loss: 41399181312.0

Training error: 34970099712.0
Testing error: 34811768832.0
