# **Denoising Autoencoder Using PyTorch Framework**

## Check NVIDIA GPU Setting

In [1]:
!nvidia-smi

/bin/bash: line 1: nvidia-smi: command not found


## Load MNIST Dataset

In [None]:
from torchvision.datasets import MNIST
from torchvision import transforms

In [None]:
train_dataset = MNIST(root="content",
                      train=True,
                      transform=transforms.Compose([transforms.Resize((32,32)), transforms.ToTensor()]),
                      download=True)
test_dataset = MNIST(root="content",
                     train=False,
                     transform=transforms.Compose([transforms.Resize((32,32)), transforms.ToTensor()]),
                     download=True)

In [None]:
import torch
from torch import nn
import torch.nn.functional as F

In [None]:
class MyModel(nn.Module) :
  def __init__(self, opt) :
    super(MyModel, self).__init__()

    input_dim, hidden_dim = opt["input_dim"], opt["hidden_dim"]

    self.encoder = nn.Sequential(nn.Conv2d(input_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.MaxPool2d(kernel_size=2, stride=2),
                                 nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.MaxPool2d(kernel_size=2, stride=2),
                                 nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.MaxPool2d(kernel_size=2, stride=2),
                                 nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU())
    self.decoder = nn.Sequential(nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.Upsample(scale_factor=2),
                                 nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.Upsample(scale_factor=2),
                                 nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.Upsample(scale_factor=2),
                                 nn.Conv2d(hidden_dim, input_dim, kernel_size=3, stride=1, padding=1))

    self.bottleneck0 = nn.Linear(((opt["input_shape"]//8)**2)*hidden_dim, 2)
    self.bottleneck1 = nn.Linear(2, ((opt["input_shape"]//8)**2)*hidden_dim)

  def forward(self, input) :
    noisy = torch.clamp(input+torch.randn_like(input)*(50/255), 0, 1)

    output = self.encoder(noisy)
    n, c, h, w = output.size()

    lnt_vec = self.bottleneck0(output.view(-1, c*h*w))

    output = self.decoder(self.bottleneck1(lnt_vec).view(n, c, h, w))

    return lnt_vec, noisy, output

## Train DL Model

In [None]:
from torch.utils.data import DataLoader
from torch import optim

from torchsummary import summary

from tqdm import tqdm

### Fix Seed

In [None]:
import random
import numpy as np

In [None]:
def fix_seed(seed) :
  random.seed(seed)
  np.random.seed(seed)
  torch.manual_seed(seed)
  torch.cuda.manual_seed(seed)
  torch.cuda.manual_seed_all(seed)
  torch.backends.cudnn.deterministic = True
  torch.backends.cudnn.benchmark = False

## Create Average Meter Instance

In [None]:
class AverageMeter(object):
  def __init__(self):
    self.reset()

  def reset(self):
    self.val = 0
    self.avg = 0
    self.sum = 0
    self.count = 0

  def update(self, val, n=1):
    self.val = val
    self.sum += val*n
    self.count += n
    self.avg = self.sum / self.count

## Training Code as a Function (Abstraction)

In [None]:
def train(opt, train_dataset, test_dataset, MyModel, criterion) :
  fix_seed(opt["seed"])

  train_dataloader = DataLoader(train_dataset, batch_size=opt["batch_size"], shuffle=True, drop_last=True)
  test_dataloader = DataLoader(test_dataset, batch_size=opt["batch_size"], shuffle=False, drop_last=False)

  fix_seed(opt["seed"])
  model = MyModel(opt)
  if opt["cuda"] :
    model = model.cuda()

  summary(model, (opt["input_dim"], opt["input_shape"], opt["input_shape"]))

  optimizer = optim.Adam(model.parameters(), lr=opt["lr"])

  train_loss, test_loss = AverageMeter(), AverageMeter()
  train_loss_list, test_loss_list = [], []
  best_loss = torch.inf

  for epoch in range(1, opt["epochs"]+1) :
    train_bar = tqdm(train_dataloader)
    train_loss.reset()

    for data in train_bar :
      input, _ = data
      if opt["cuda"] :
        input = input.cuda()

      optimizer.zero_grad()

      pred = model(input)

      loss = criterion(pred[-1], input)
      loss.backward()

      optimizer.step()

      train_loss.update(loss.item(), opt["batch_size"])
      train_bar.set_description(desc=f"[{epoch}/{opt['epochs']}] [Train] < Loss:{train_loss.avg:.6f} >")

    train_loss_list.append(train_loss.avg)

    test_bar = tqdm(test_dataloader)
    test_loss.reset()

    for data in test_bar :
      input, _ = data
      if opt["cuda"] :
        input = input.cuda()

      model.eval()
      with torch.no_grad() :
        pred = model(input)
        loss = criterion(pred[-1], input)

        test_loss.update(loss.item(), opt["batch_size"])

        test_bar.set_description(desc=f"[{epoch}/{opt['epochs']}] [Test] < Loss:{test_loss.avg:.6f} >")

    test_loss_list.append(test_loss.avg)

    if test_loss.avg < best_loss :
      best_loss = test_loss.avg
      torch.save(model.state_dict(), "best_model.pth")

    torch.save(model.state_dict(), "latest_model.pth")

  return (train_loss_list, test_loss_list)

## Create Training Option (Hyperparameter) Dictionary

In [None]:
opt = {"input_shape":32,
       "seed":42,
       "input_dim":1,
       "hidden_dim":64,
       "batch_size":16,
       "lr":1e-4,
       "epochs":10,
       "cuda":torch.cuda.is_available()}

## Train Model

In [None]:
loss_list = train(opt, train_dataset, test_dataset, MyModel, nn.L1Loss())

## Plot Training vs. Test Loss Graph

In [None]:
import matplotlib.pyplot as plt

In [None]:
plt.figure(figsize=(20,10))

plt.plot(np.arange(0, opt["epochs"], 1), loss_list[0], label="Training Loss")
plt.plot(np.arange(0, opt["epochs"], 1), loss_list[1], label="Test Loss")

plt.xlabel("Epoch")
plt.ylabel("L1 Loss")
plt.legend(loc="best")

plt.show()

## Extract Latent Vector

### Load Trained Model

In [None]:
weights = torch.load("/content/best_model.pth")

model = MyModel(opt)
model.load_state_dict(weights)
if opt["cuda"] :
  model = model.cuda()

### Get Model Structure

In [None]:
print(model)

### Load Test Dataset

In [None]:
test_dataloader = DataLoader(test_dataset, batch_size=opt["batch_size"], shuffle=False, drop_last=False)

### Create Dictionary Instance for Saving Result

In [None]:
class_dict = {0:[], 1:[], 2:[], 3:[], 4:[], 5:[], 6:[], 7:[], 8:[], 9:[]}

### Add Result

In [None]:
import cv2

In [None]:
%mkdir "output_samples"

In [None]:
num_sample = 0

for input, target in test_dataloader :
  if opt["cuda"] :
    input = input.cuda()
    lnt_vec, noisy, output = model(input)

  for i, label in enumerate(target) :
    class_dict[int(label)].append(lnt_vec[i].view(-1).detach().cpu().numpy())

    sample_noisy = noisy[i].squeeze(0).detach().cpu().numpy()
    sample_output = output[i].squeeze(0).detach().cpu().numpy()
    sample_target = input[i].squeeze(0).detach().cpu().numpy()

    sample_noisy = np.clip(sample_noisy*225, 0, 255)
    sample_output = np.clip(sample_output*225, 0, 255)
    sample_target = np.clip(sample_target*225, 0, 255)

    concat = np.hstack((sample_noisy, sample_output, sample_target))

    cv2.imwrite(f"output_samples/sample_{num_sample}.png", concat)
    num_sample += 1

In [None]:
class_dict

### Visualize Result

In [None]:
plt.figure(figsize=(10,10))

for i in range(10) :
  x_list, y_list = [], []
  for sub_lnt_vec in class_dict[i] :
    x_list.append(sub_lnt_vec[0])
    y_list.append(sub_lnt_vec[1])
  plt.scatter(x_list, y_list, label=f"class-{i}", s=10)

plt.legend(loc="best")
plt.show()

In [None]:
concat = []

for i in range(10) :
  sub_concat = []
  for j in range(10) :
    sub_concat.append(cv2.imread(f"/content/output_samples/sample_{i*10+j}.png"))
  concat.append(np.vstack(sub_concat))

concat = np.hstack(concat)

In [None]:
plt.figure(figsize=(20,20))
plt.imshow(concat)
plt.show()