# Statistical Deep Learning HW2 (Appendix)

## 王泓達 R13546017

In [None]:
import torch
from torch import nn
from torch.nn import functional as F
import torch.optim as optim
import torch.utils.data as data_utils
from torch.utils.data import Dataset, DataLoader, random_split
import tensorflow as tf
import lightning as L
from lightning.pytorch import LightningModule, LightningDataModule, Trainer
from lightning.pytorch.callbacks import EarlyStopping
from torch.utils.tensorboard import SummaryWriter
from pytorch_lightning.loggers import TensorBoardLogger
from lightning.pytorch.callbacks import Callback
import os
import math
import kagglehub
import pandas as pd
import random
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

## Question 1
Consider a 2-feature linear regression problem in which $x_1$ always lies in the range $[8, 9]$ and $x_2$ always lies in the range $[8000, 9000]$. $\\$ Comment on why gradient descent is likely to perform more efficiently after feature normalization by inspecting the 
objective function $\\$ over a toy training set with three examples.

### Data Generating

In [None]:
numbers = 3
np.random.seed(10)
x1 = np.random.normal(8, 9, numbers)
x2 = np.random.normal(8000, 9000, numbers)

features = np.vstack((np.ones(numbers), x1, x2)).T
target_o =   2 * x1 + 3 * x2 + np.random.normal(-9000,9000,numbers)
target =   target_o.reshape((3,1))

plt.figure(figsize=(10, 6))
plt.plot(x1, label="x1")
plt.plot(x2, label="x2")
plt.plot(target, label="target")
plt.xlabel("Sample Index")
plt.ylabel("Value")
plt.title("Toy training set with three examples.")
plt.legend()
plt.grid(True)
plt.show()

### Steepest Descent

In [None]:
scaler = StandardScaler()

for i in range(3):
    features_Normalization = (features - features.mean())/features.std()
    target_Normalization = (target - target.mean())/target.std()

def gradient_descent(X, y, w=np.zeros((3, 1)), learning_rate=0.001, iterations=10000):
    weights = [w]
    counter = 0
    for i in range(iterations):
        predictions = X.dot(w)
        gradient = (1/3) * (X.transpose().dot(predictions - y))
        tmp_w = w - learning_rate * gradient
        if np.linalg.norm(weights[-1] - tmp_w) < 1e-6 or np.linalg.norm(weights[-1] - tmp_w) > 1e6:
            break
        w = tmp_w
        counter += 1
        # print(w) 
    return w, counter

w_original, history = gradient_descent(features, target)
w_Normalization, history_Normalization = gradient_descent(features_Normalization, target_Normalization)
print(f"Iterations without Normalization: {history}")
print(f"Iterations with Normalization: {history_Normalization}")

res1 = (features[0].dot(w_original))[0]-target_o[0]
res2 = (features[1].dot(w_original))[0]-target_o[1]
res3 = (features[2].dot(w_original))[0]-target_o[2]
MSE_w = (res1**2 + res2**2 + res3**2)/3

res1 = (features_Normalization[0].dot(w_Normalization))[0]-target_Normalization.reshape((3))[0]
res2 = (features_Normalization[1].dot(w_Normalization))[0]-target_Normalization.reshape((3))[1]
res3 = (features_Normalization[2].dot(w_Normalization))[0]-target_Normalization.reshape((3))[2]
MSE_n = (res1**2 + res2**2 + res3**2)/3

print(f"MSE without Normalization: {MSE_w}")
print(f"MSE with Normalization: {MSE_n}")

### Comparison of Accuracy 

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(target, label='Targets')
plt.plot(features.dot(w_original), label='With outNormalization')
plt.xlabel("Sample Index")
plt.ylabel("Value")
plt.title("Toy training set with three examples.")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
plt.figure(figsize=(10, 6))
plt.plot(target_Normalization, label='Targets')
plt.plot(features_Normalization.dot(w_StandardScaler), label='With StandardScaler')
plt.xlabel("Sample Index")
plt.ylabel("Value")
plt.title("Toy training set with three examples.")
plt.legend()
plt.grid(True)
plt.show()


## 2. 

### Reading the file

In [None]:
# Download latest version
path = kagglehub.dataset_download("satvshr/top-4-used-car-sales-datasets-combined")
# read the data as a Pandas dataframe
df = pd.read_csv(path+"/output.csv")
df

### 2.a. 'feature': [Numbers, Rate]
Summarize the missing rates for each variable. That is, for each predictor $x$, what is the proportion of samples with missing values?

In [None]:
variable_is_na = {}

for i in df.columns:
    variable_is_na[i] = [0,0]
    for j in df[i]:
        if(pd.isna(j)):
            variable_is_na[i][0] += 1
    variable_is_na[i][1] += variable_is_na[i][0] / len(df[i])

variable_is_na

### 2.b. Drop col-na(%) > 0.2 and row-na
Remove variables with missing rate greater than $0.2$ and samples with missing variables. How many samples are left?

In [None]:
columns_to_drop = []

for i in variable_is_na.keys():
    if variable_is_na[i][1] > 0.2:
        columns_to_drop.append(i)

print(f"columns_to_drop: {columns_to_drop}")

df.drop(columns=columns_to_drop, inplace=True)
df = df.dropna(axis=0)
df

##### Result: We have 29352 samples left

### 2.d. Data Transformation
Transform the categorical variables into one-hot encoding. $\\$
Standardize the numerical variables. Use the training dataset to fit the standardization model. $\\$
Apply the same model to the validation and testing datasets.$\\$ 
Summarize the transformed datasets.


#### Drop the element that rarely apeears

In [None]:
brand = []
for i in df["brand"]:
    if i not in brand:
        brand.append(i)
model = []
for i in df["model"]:
    if i not in model:
        model.append(i)
transmission = []
for i in df["transmission"]:
    if i not in transmission:
        transmission.append(i)
fuel = []
for i in df["fuel"]:
    if i not in fuel:
        fuel.append(i)

#### Selected main features

In [None]:
brand_times = {}
for i in brand:
    brand_times[i] = 0
for i in df["brand"]:
    brand_times[i] += 1 / 29352 

model_times = {}
for i in model:
    model_times[i] = 0
for i in df["model"]:
    model_times[i] += 1 / 29352 

transmission_times = {}
for i in transmission:
    transmission_times[i] = 0
for i in df["transmission"]:
    transmission_times[i] += 1 / 29352 

fuel_times = {}
for i in fuel:
    fuel_times[i] = 0
for i in df["fuel"]:
    fuel_times[i] += 1 / 29352 

brand_final = []
for i in brand_times:
    if brand_times[i] > 0.01:
        brand_final.append(i)
print(brand_final)
model_final = []
for i in model_times:
    if model_times[i] > 0.01:
        model_final.append(i)
print(model_final)
transmission_final = []
for i in transmission_times:
    if transmission_times[i] > 0.01:
        transmission_final.append(i)
print(transmission_final)
fuel_final = []
for i in fuel_times:
    if fuel_times[i] > 0.01:
        fuel_final.append(i)
print(fuel_final)

#### New interested dataframe

In [None]:
df_e = df.copy()

for i in ["brand", "model", "fuel", "transmission"]:
    for j in df_e.index:
        if df_e[i][j] not in (brand_final + fuel_final + model_final + transmission_final):
            df_e[i][j] = np.nan
        
df_e = df_e.dropna(axis=0)
df_e

In [None]:
data_number = len(df_e)
features_age = np.zeros((19502,1))
features_km = np.zeros((19502,1))
features_seats = np.zeros((19502,1))
target_e = np.zeros((data_number, 1))
counter_e = 0

for i in df_e.index:
    features_age[counter_e,0] = float(df_e["age"][i])
    features_km[counter_e,0] = float(df_e["km"][i])
    features_seats[counter_e,0] = float(df_e["seats"][i])
    target_e[counter_e,0] = float(df_e["price"][i])
    counter_e += 1

df_e.drop(columns="age", inplace=True)
df_e.drop(columns="km", inplace=True)
df_e.drop(columns="seats", inplace=True)
df_e.drop(columns="price", inplace=True)

#### Standardize the numerical variables

In [None]:
scaler = StandardScaler()
features_age_standardized = scaler.fit_transform(features_age)
features_km_standardized = scaler.fit_transform(features_km)
features_seats_standardized = scaler.fit_transform(features_seats)
target_e_standardized = scaler.fit_transform(target_e)

features_num = np.hstack((features_age_standardized,features_km_standardized,target_e_standardized))
features_num

#### Transform the categorical variables into one-hot encoding

In [None]:
features_cate = pd.get_dummies(df_e)
features_cat = features_cate.to_numpy()
features_cat = np.array(features_cat, dtype=np.float32)

features_cate

In [None]:
features_e = np.hstack((features_num, features_cat))
data_columns = len(features_e[0])
features_e

### 2.c. Split the entire dataset into training, validation, and testing datasets with a 70-20-10 ratio

In [None]:
feature_train_e = features_e[0:math.ceil(data_number*0.7)]
feature_val_e = features_e[math.ceil(data_number*0.7):math.ceil(data_number*0.9)]
feature_test_e = features_e[math.ceil(data_number*0.9):data_number]

target_train_e = target_e_standardized[0:math.ceil(data_number*0.7)]
target_val_e = target_e_standardized[math.ceil(data_number*0.7):math.ceil(data_number*0.9)]
target_test_e = target_e_standardized[math.ceil(data_number*0.9):data_number]

print(f"Numbers of feature_train : {len(feature_train_e)}")
print(f"Numbers of feature_validation : {len(feature_val_e)}")
print(f"Numbers of feature_test : {len(feature_test_e)}")

### 2.e. Build the Model with Less than 100 Trainable Parameters.
Design a neural network model to predict the price of used cars. The model should have less than 100 trainable parameters. $\\$ 
Summarize the model architecture.


In [None]:
class datasetSetUp(Dataset):
    def __init__(self, features, targets):
        self.features = features
        self.targets = targets

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        x = self.features[idx]
        y = self.targets[idx]
        return x, y

class dataModule(LightningDataModule):
    def __init__(self, feature_train_e, target_train_e, feature_val_e, target_val_e, batch_size=10):
        super().__init__()
        self.feature_train_e = torch.tensor(feature_train_e, dtype=torch.float32)
        self.target_train_e = torch.tensor(target_train_e, dtype=torch.float32)
        self.feature_val_e = torch.tensor(feature_val_e, dtype=torch.float32)
        self.target_val_e = torch.tensor(target_val_e, dtype=torch.float32)
        self.feature_test_e = torch.tensor(feature_test_e, dtype=torch.float32)
        self.target_test_e = torch.tensor(target_test_e, dtype=torch.float32)
        self.batch_size = batch_size

    def setup(self, stage=None):
        self.train_dataset_e = datasetSetUp(self.feature_train_e, self.target_train_e)
        self.val_dataset_e = datasetSetUp(self.feature_val_e, self.target_val_e)
        self.test_dataset_e = datasetSetUp(self.feature_test_e, self.target_test_e)

    def train_dataloader(self):
        return DataLoader(self.train_dataset_e, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset_e, batch_size=self.batch_size, shuffle=False)
    
    def test_dataloader(self):
        return DataLoader(self.test_dataset_e, batch_size=self.batch_size, shuffle=False)

class neuralNetwork(LightningModule):
    def __init__(self):
        super().__init__()
        self.layer_1 = torch.nn.Linear(46, 2)
        self.layer_2 = torch.nn.Linear(2, 1)
        self.train_losses = []
        self.val_losses = []  

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = self.layer_1(x)
        x = torch.relu(x)
        x = self.layer_2(x)
        return x

    def MSE(self, logits, labels):
        return F.mse_loss(logits, labels)

    def training_step(self, train_batch, batch_idx):
        x, y = train_batch
        logits = self.forward(x)
        loss = self.MSE(logits, y)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        logits = self.forward(x)
        loss = self.MSE(logits, y)
        self.log('val_loss', loss)

    def on_train_epoch_end(self):
        avg_train_loss = self.trainer.callback_metrics['train_loss'].item()
        self.train_losses.append(avg_train_loss)

    def on_validation_epoch_end(self):
        avg_val_loss = self.trainer.callback_metrics['val_loss'].item()
        self.val_losses.append(avg_val_loss)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)

data_module = dataModule(feature_train_e, target_train_e, feature_val_e, target_val_e, batch_size=10)
model = neuralNetwork()

trainer = Trainer(callbacks=[EarlyStopping(monitor="val_loss", mode="min")])
trainer.fit(model, data_module)
torch.save(model.state_dict(), 'model_weights.pth')

### Plot the model in 2.e. (epochs $< 100$) 
Train the model for at most 100 epochs. $\\$
Plot the training and validation loss curves (𝑥-axis: epoch, 𝑦-axis: training/validation loss). $\\$ 
Is there any sign of overfitting/underfitting

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(model.train_losses, label='Train Loss')
plt.plot(model.val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.show()

#### Comments: There is not any sign of overfitting or underfitting.

### 2.g. Change early stopping strategy of model in 2.e. 
Use the early stopping strategy: stop training if the validation loss increases by $1%$ in less than $10$ epochs. 

In [None]:
class datasetSetUp(Dataset):
    def __init__(self, features, targets):
        self.features = features
        self.targets = targets

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        x = self.features[idx]
        y = self.targets[idx]
        return x, y

class dataModule(LightningDataModule):
    def __init__(self, feature_train_e, target_train_e, feature_val_e, target_val_e, batch_size=10):
        super().__init__()
        self.feature_train_e = torch.tensor(feature_train_e, dtype=torch.float32)
        self.target_train_e = torch.tensor(target_train_e, dtype=torch.float32)
        self.feature_val_e = torch.tensor(feature_val_e, dtype=torch.float32)
        self.target_val_e = torch.tensor(target_val_e, dtype=torch.float32)
        self.feature_test_e = torch.tensor(feature_test_e, dtype=torch.float32)
        self.target_test_e = torch.tensor(target_test_e, dtype=torch.float32)
        self.batch_size = batch_size

    def setup(self, stage=None):
        self.train_dataset_e = datasetSetUp(self.feature_train_e, self.target_train_e)
        self.val_dataset_e = datasetSetUp(self.feature_val_e, self.target_val_e)
        self.test_dataset_e = datasetSetUp(self.feature_test_e, self.target_test_e)

    def train_dataloader(self):
        return DataLoader(self.train_dataset_e, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset_e, batch_size=self.batch_size, shuffle=False)
    
    def test_dataloader(self):
        return DataLoader(self.test_dataset_e, batch_size=self.batch_size, shuffle=False)

class neuralNetwork(LightningModule):
    def __init__(self):
        super().__init__()
        self.layer_1 = torch.nn.Linear(46, 2)
        self.layer_2 = torch.nn.Linear(2, 1)
        self.train_losses = []
        self.val_losses = []  

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = self.layer_1(x)
        x = torch.relu(x)
        x = self.layer_2(x)
        return x

    def MSE(self, logits, labels):
        return F.mse_loss(logits, labels)

    def training_step(self, train_batch, batch_idx):
        x, y = train_batch
        logits = self.forward(x)
        loss = self.MSE(logits, y)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        logits = self.forward(x)
        loss = self.MSE(logits, y)
        self.log('val_loss', loss)

    def on_train_epoch_end(self):
        avg_train_loss = self.trainer.callback_metrics['train_loss'].item()
        self.train_losses.append(avg_train_loss)

    def on_validation_epoch_end(self):
        avg_val_loss = self.trainer.callback_metrics['val_loss'].item()
        self.val_losses.append(avg_val_loss)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)

class earlyStoppingStrtegy(Callback):
    def __init__(self, max_val_epoch=10, rate=0.01):
        super().__init__()
        self.max_val_epoch = max_val_epoch
        self.rate = rate
        self.loss_up = -999
        self.loss_up_up = -999
        self.loss_now = 999
        self.loss_past = 999
        self.counter = 0

    def on_validation_epoch_end(self, trainer, pl_module):
        current_loss = trainer.callback_metrics["val_loss"].item()
        self.loss_past = self.loss_now 
        self.loss_now = current_loss
        if self.loss_now > self.loss_past and self.counter == 0:
            self.loss_up = self.loss_now
            self.counter+1
        elif self.loss_now > self.loss_past and self.counter != 0:
            self.loss_up_up = self.loss_now
            self.counter+1
            if self.loss_up_up > self.loss_up*self.rate and self.counter <= self.max_val_epoch:
                trainer.should_stop = True
        else:
            self.loss_up = -999
            self.loss_up_up = -999
            self.loss_now = 999
            self.loss_past = 999

data_module2 = dataModule(feature_train_e, target_train_e, feature_val_e, target_val_e, batch_size=10)
model2 = neuralNetwork()
early_stopping = earlyStoppingStrtegy(max_val_epoch=10, rate=0.01)
trainer2 = Trainer(callbacks=[early_stopping], max_epochs=100)

trainer2.fit(model2, data_module2)
torch.save(model2.state_dict(), 'model2_weights.pth')

#### Plot the loss

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(model2.train_losses, label='Train Loss')
plt.plot(model2.val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.show()

### 2.h.
Evaluate the models from (f) and (g) on the testing dataset. Report the mean squared error. $\\$
Run a linear regression model on the same dataset and report the mean squared error.$\\$ 
Comment on the results.


#### Compare MSE between (f) and (g)

In [None]:
model.load_state_dict(torch.load('model_weights.pth'))
model2.load_state_dict(torch.load('model2_weights.pth'))
model.eval()
model2.eval()

test_loader = data_module.test_dataloader()
features_for_testing = torch.tensor(feature_test_e, dtype = torch.float32)
target_for_testing = torch.tensor(target_test_e, dtype = torch.float32)

with torch.no_grad():  
    prediction = model(features_for_testing)  
    prediction2 = model2(features_for_testing)   

model_residual = (prediction-target_for_testing).numpy()
model_residual2 = (prediction2-target_for_testing).numpy()
residuals_model1 = []
residuals_model2 = []
MSE1_total = 0
MSE2_total = 0

for i in model_residual:
    residuals_model1.append(i[0])

for i in model_residual2:
    residuals_model2.append(i[0])

for i in residuals_model1:
    MSE1_total += i**2

for i in residuals_model2:
    MSE2_total += i**2

MSE1 = MSE1_total/len(model_residual)
MSE2 = MSE2_total/len(model_residual2)

print("MSE of model in (f): ", MSE1)
print("MSE of model in (g)", MSE2)

#### Linear regression model

In [None]:
predict_number = len(feature_test_e)
var_number = len(feature_test_e[0])

reg = LinearRegression().fit(feature_train_e, target_train_e)
predict_lr = np.zeros((1,predict_number))

for i in range(predict_number):
    predict_tmp = 0
    for j in range(var_number):
        predict_tmp += reg.coef_[0][j]*feature_test_e[i][j]
    predict_tmp += reg.intercept_
    predict_lr[0,i] = predict_tmp

#### MSE of linear regression model

In [None]:
MSE_3 = 0.0
for i in range(predict_number):
    MSE_3 = (predict_lr[0,i]-target_test_e[i])**2

MSE_3 = MSE_3 / predict_number

print("MSE of linear regression model: ", MSE_3[0])

#### MSE comparison of all methods

In [None]:
print("MSE of model in (f): ", MSE1)
print("MSE of model in (g)", MSE2)
print("MSE of linear regression model: ", MSE_3[0])

### 2.i. Draw the residual plots for the three models

$x$-axis: observed price; $y$: residual = predicted - observed $\\$
neural network from (f), neural network from (g), and linear regression

In [None]:
observation = target_test_e.reshape((1,1950))
residuals_model3 = predict_lr[0] - observation
fig, axes = plt.subplots(1, 3, figsize=(15, 5), sharey=True)

axes[0].scatter(observation, residuals_model1, color='firebrick', alpha=0.5)
axes[0].set_title('Residual Plot: Model 1')
axes[0].set_xlabel('Predicted Values')
axes[0].set_ylabel('Residuals')

axes[1].scatter(observation, residuals_model2, color='dodgerblue', alpha=0.5)
axes[1].set_title('Residual Plot: Model 2')
axes[1].set_xlabel('Predicted Values')

axes[2].scatter(observation, residuals_model3, color='mediumaquamarine', alpha=0.5)
axes[2].set_title('Residual Plot: Model 3')
axes[2].set_xlabel('Predicted Values')

plt.tight_layout()
plt.show()


## Tensorboard

In [None]:
# Start tensorboard.
%load_ext tensorboard
%tensorboard --logdir=lightning_logs/