<a href="https://colab.research.google.com/github/howru0321/AIGS538_Deep_Learning/blob/main/DL_project_CA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch.utils.data as data_utils
import os

from tqdm import tqdm
from torchvision import datasets
from torch.utils.data import DataLoader
from torch import optim
from datetime import datetime
from PIL import Image

current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
base_dir = f'CA/{current_time}'

num_epochs=1000
num_save=100

In [None]:
class Preprocess():
  def __init__(self, train_data, test_data):
    self.train_data = train_data
    self.test_data = test_data

    self.features = train_data.columns
    self.in_features = [col for col in train_data.columns if train_data[col].dtype == float and col not in ('median_house_value')]
    self.out_features = ['median_house_value']

    mean, std = self.get_features_mean_std(train_data)
    self.mean = mean
    self.std = std

  def get_features_mean_std(self, data):
    return data[self.features].mean(), data[self.features].std()

  def get_train_input_output(self):
    normalized = (self.train_data[self.features]-self.mean)/self.std
    input = torch.Tensor(normalized[self.in_features].values)
    output = torch.Tensor(normalized[self.out_features].values)
    return [input, output]

  def get_test_input_output(self):
    normalized = (self.test_data[self.features]-self.mean)/self.std
    input = torch.Tensor(normalized[self.in_features].values)
    output = torch.Tensor(normalized[self.out_features].values)
    return [input, output]

# get data
train_data = pd.read_csv('./sample_data/california_housing_train.csv')
test_data = pd.read_csv('./sample_data/california_housing_test.csv')

preprocessor = Preprocess(train_data=train_data, test_data=test_data)
train_dataset = preprocessor.get_train_input_output()
test_dataset = preprocessor.get_train_input_output()

train_dataset = data_utils.TensorDataset(train_dataset[0], train_dataset[1])
test_dataset = data_utils.TensorDataset(test_dataset[0], test_dataset[1])

# get dataloader
batch_size = 256
train_loader = DataLoader(dataset=train_dataset,
                         batch_size=batch_size,
                         shuffle=True)
test_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size,
                         shuffle=True)

In [None]:
class MLP(nn.Module):
  def __init__(self, regularizer_type: str, train_loader, test_loader, _lambda=1e-4):
    super().__init__()

    if regularizer_type == 'l1':
      self._lambda = _lambda
      self.norm = 1

    if regularizer_type == 'l2':
      self._lambda = _lambda
      self.norm = 2

    if regularizer_type == 'l3':
      self._lambda = _lambda
      self.norm = 3

    self.train_loader = train_loader
    self.test_loader = test_loader

    self.mlp = nn.Sequential(
      nn.Linear(8, 128),
      nn.ReLU(),
      nn.Linear(128, 128),
      nn.ReLU(),
      nn.Linear(128, 1)
    )

    self.loss = nn.MSELoss()
    self.optimizer = optim.Adam(self.mlp.parameters(), lr=0.01)

  def forward(self, x):
    output = self.mlp(x)
    return output

  def get_weight(self):
    return self.mlp[0].weight

  def get_loss(self, output, label):
    first_layer_weight = self.get_weight()
    loss = self.loss(output, label) + self._lambda * torch.norm(first_layer_weight, self.norm)
    return loss

  def train(self, epochs, save):
    self.test_error_list=[]
    self.train_error_list=[]

    self.mlp.train()
    for epoch in tqdm(range(epochs)):
      train_error = 0
      for x, gt in self.train_loader:
        self.optimizer.zero_grad()
        outputs = self(x)

        # train error
        train_error += torch.abs(gt-outputs).sum()

        loss = self.get_loss(outputs, gt)
        loss.backward()
        self.optimizer.step()

      if (epoch+1)%save == 0 or epoch == 0:
        self.show_weight_map(epoch, base_dir)

      error = float(train_error/len(self.train_loader))
      self.train_error_list.append(error)
      if (epoch+1)%50 == 0 or epoch == 0:
        print(f'\nTrain Error : {error}')
      self.test(epoch)

    # # save error
    # with open(f'./L{self.norm}_test_error.txt', 'w') as f1:
    #   f1.write('\n'.join(self.test_error_list))
    # with open(f'./L{self.norm}_train_error.txt', 'w') as f2:
    #   f2.write('\n'.join(self.train_error_list))

  def test(self, epoch):
      error=0
      self.mlp.eval()
      with torch.no_grad():
        for x, gt in self.test_loader:
          outputs = self(x)
          error += torch.abs(gt-outputs).sum()

      error = float(error/len(self.test_loader))
      self.test_error_list.append(error)
      if (epoch+1)%50 == 0 or epoch == 0:
        print(f'Test Error : {error}')
      self.mlp.train()

  def show_weight_map(self, epoch, base_dir):
    base_dir = os.path.join(base_dir, f'L{self.norm}')
    os.makedirs(base_dir, exist_ok=True)
    w = self.get_weight()
    w = torch.mean(w, dim=0)

    # print(f'\n MAX : {w.max()}, MIN : {w.min()}')
    norm_w = torch.abs(w/torch.abs(w).max())
    image = np.repeat(norm_w.view(1,8,1).detach().numpy(),3,-1)

    # upscale image
    scale = 50
    new_image = np.zeros((image.shape[0]*scale, image.shape[1]*scale, image.shape[2]))

    for i in range(image.shape[0]):
      for j in range(image.shape[1]):
        new_image[i*scale:(i+1)*scale, j*scale:(j+1)*scale] = image[i,j]

    plt.axis('off')
    save_path = os.path.join(base_dir, f'weight_{epoch}.png')
    plt.imsave(save_path, new_image, cmap='gray')
    plt.imshow(image, cmap='gray')

In [None]:
# define model
model_l1 = MLP(regularizer_type='l1', train_loader=train_loader, test_loader=test_loader)
# train
model_l1.train(epochs=num_epochs, save=num_save)

l1_tr_err = model_l1.train_error_list
l1_te_err = model_l1.test_error_list

In [None]:
# define model
model_l2 = MLP(regularizer_type='l2', train_loader=train_loader, test_loader=test_loader)
# train
model_l2.train(epochs=num_epochs, save=num_save)

l2_tr_err = model_l2.train_error_list
l2_te_err = model_l2.test_error_list

In [None]:
# define model
model_l3 = MLP(regularizer_type='l3', train_loader=train_loader, test_loader=test_loader)
# train
model_l3.train(epochs=num_epochs, save=num_save)

l3_tr_err = model_l3.train_error_list
l3_te_err = model_l3.test_error_list

In [None]:
# plot accuracy progress
plt.title("Train Error")
plt.plot(range(1,num_epochs+1),l1_tr_err,label="l1")
plt.plot(range(1,num_epochs+1),l2_tr_err,label="l2")
plt.plot(range(1,num_epochs+1),l3_tr_err,label="l3")
plt.ylabel("Error")
plt.xlabel("Training Epochs")
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# plot accuracy progress
plt.title("Test Error")
plt.plot(range(1,num_epochs+1),l1_te_err,label="l1")
plt.plot(range(1,num_epochs+1),l2_te_err,label="l2")
plt.plot(range(1,num_epochs+1),l3_te_err,label="l3")
plt.ylabel("Error")
plt.xlabel("Test Epochs")
plt.legend()
plt.show()

In [None]:
def plot_images_from_subdirs(base_dir):
    for subdir, _, files in sorted(os.walk(base_dir)):
        image_files = sorted([f for f in files if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))])
        image_files.sort(key=lambda x: int(x.split('_')[1].split('.')[0]))
        if image_files:
            images = []
            for image_file in image_files:
                img_path = os.path.join(subdir, image_file)
                img = Image.open(img_path)
                images.append(img)

            num_images = len(images)
            fig, axes = plt.subplots(1, num_images, figsize=(num_images * 3, 3))
            plt.suptitle(os.path.basename(subdir), fontsize=16)
            if num_images == 1:
                axes = [axes]

            for ax, img, file_name in zip(axes, images, image_files):
                ax.imshow(img)
                ax.set_title(file_name)
                ax.axis('off')

            plt.show()

plot_images_from_subdirs(base_dir)