<a href="https://colab.research.google.com/github/JK-the-Ko/Thermo-Fluid-Dynamics-Experiment/blob/main/2023-2/%EC%97%B4%EC%9C%A0%EC%B2%B4%EA%B3%B5%ED%95%99%EC%8B%A4%ED%97%981_Week13_PyTorch_Denoising_Autoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Denoising Autoencoder Using PyTorch Framework

## Check NVIDIA GPU Setting

In [None]:
!nvidia-smi

## Load MNIST Dataset

In [None]:
from torchvision.datasets import MNIST
from torchvision import transforms

In [None]:
trainDataset = MNIST(root="content",
                     train=True,
                     transform=transforms.Compose([transforms.Resize((32,32)), transforms.ToTensor()]),
                     download=True)
testDataset = MNIST(root="content",
                    train=False,
                    transform=transforms.Compose([transforms.Resize((32,32)), transforms.ToTensor()]),
                    download=True)

In [None]:
from torch import nn
import torch.nn.functional as F

In [None]:
class myModel(nn.Module) :
  def __init__(self, opt) :
    super(myModel, self).__init__()

    inputDim, channels = opt["inputDim"], opt["channels"]

    self.encoder = nn.Sequential(nn.Conv2d(inputDim, channels, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.MaxPool2d(kernel_size=2, stride=2),
                                 nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.MaxPool2d(kernel_size=2, stride=2),
                                 nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.MaxPool2d(kernel_size=2, stride=2),
                                 nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU())
    self.decoder = nn.Sequential(nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.Upsample(scale_factor=2),
                                 nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.Upsample(scale_factor=2),
                                 nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1),
                                 nn.ReLU(),
                                 nn.Upsample(scale_factor=2),
                                 nn.Conv2d(channels, inputDim, kernel_size=3, stride=1, padding=1))

  def forward(self, input) :
    noisy = torch.clamp(input+torch.randn_like(input)*(50/255), 0, 1)

    output = self.decoder(self.encoder(noisy))

    return noisy, output

## Train DL Model

In [None]:
import torch
from torch.utils.data import DataLoader
from torch import optim

from torchsummary import summary

from tqdm import tqdm

### Fix Seed

In [None]:
import random
import numpy as np

In [None]:
def fixSeed(seed) :
  random.seed(seed)
  np.random.seed(seed)
  torch.manual_seed(seed)
  torch.cuda.manual_seed(seed)
  torch.cuda.manual_seed_all(seed)
  torch.backends.cudnn.deterministic = True
  torch.backends.cudnn.benchmark = False

## Create Average Meter Instance

In [None]:
class AverageMeter(object):
  def __init__(self):
    self.reset()

  def reset(self):
    self.val = 0
    self.avg = 0
    self.sum = 0
    self.count = 0

  def update(self, val, n=1):
    self.val = val
    self.sum += val*n
    self.count += n
    self.avg = self.sum / self.count

## Training Code as a Function (Abstraction)

In [None]:
def train(opt, trainDataset, testDataset, myModel, criterion) :
  fixSeed(opt["seed"])

  trainDataLoader = DataLoader(trainDataset, batch_size=opt["batchSize"], shuffle=True, drop_last=True)
  testDataLoader = DataLoader(testDataset, batch_size=opt["batchSize"], shuffle=False, drop_last=False)

  fixSeed(opt["seed"])
  model = myModel(opt)
  if opt["isCUDA"] :
    model = model.cuda()

  summary(model, (opt["inputDim"], opt["inputSize"], opt["inputSize"]))

  optimizer = optim.Adam(model.parameters(), lr=opt["lr"])

  trainLoss, testLoss = AverageMeter(), AverageMeter()
  trainLossList, testLossList = [], []
  bestLoss = torch.inf

  for epoch in range(1, opt["epochs"]+1) :
    trainBar = tqdm(trainDataLoader)
    trainLoss.reset()

    for data in trainBar :
      input, target = data
      if opt["isCUDA"] :
        input = input.cuda()
      optimizer.zero_grad()
      pred = model(input)
      loss = criterion(pred[-1], input)
      loss.backward()
      optimizer.step()

      trainLoss.update(loss.item(), opt["batchSize"])
      trainBar.set_description(desc=f"[{epoch}/{opt['epochs']}] [Train] < Loss:{trainLoss.avg:.6f} >")

    trainLossList.append(trainLoss.avg)

    testBar = tqdm(testDataLoader)
    testLoss.reset()

    for data in testBar :
      input, target = data
      if opt["isCUDA"] :
        input = input.cuda()

      model.eval()
      with torch.no_grad() :
        pred = model(input)
        loss = criterion(pred[-1], input)

        testLoss.update(loss.item(), opt["batchSize"])

        testBar.set_description(desc=f"[{epoch}/{opt['epochs']}] [Test] < Loss:{testLoss.avg:.6f} >")

    testLossList.append(testLoss.avg)

    if testLoss.avg < bestLoss :
      bestLoss = testLoss.avg
      torch.save(model.state_dict(), "bestModel.pth")

    torch.save(model.state_dict(), "latestModel.pth")

  return (trainLossList, testLossList)

## Create Training Option (Hyperparameter) Dictionary

In [None]:
opt = {"inputSize":32,
       "seed":42,
       "inputDim":1,
       "channels":64,
       "batchSize":16,
       "lr":1e-4,
       "epochs":5,
       "isCUDA":torch.cuda.is_available()}

## Train Model

In [None]:
lossList = train(opt, trainDataset, testDataset, myModel, nn.L1Loss())

## Plot Training vs. Test Loss Graph

In [None]:
import matplotlib.pyplot as plt

In [None]:
plt.figure(figsize=(20,10))

plt.plot(np.arange(0, opt["epochs"], 1), lossList[0], label="Training Loss")
plt.plot(np.arange(0, opt["epochs"], 1), lossList[1], label="Test Loss")

plt.xlabel("Epoch")
plt.ylabel("L1 Loss")
plt.legend(loc="best")

plt.show()

## Extract Latent Vector

### Load Trained Model

In [None]:
weights = torch.load("/content/bestModel.pth")

model = myModel(opt)
model.load_state_dict(weights)
if opt["isCUDA"] :
  model = model.cuda()

### Get Model Structure

In [None]:
print(model)

### Load Test Dataset

In [None]:
testDataLoader = DataLoader(testDataset, batch_size=opt["batchSize"], shuffle=False, drop_last=False)

### Add Result

In [None]:
import cv2

In [None]:
%mkdir "output-samples"

In [None]:
numSample = 0

for input, target in testDataLoader :
  if opt["isCUDA"] :
    input = input.cuda()
    noisy, output = model(input)

  for i, label in enumerate(target) :
    noisySample = noisy[i].squeeze(0).detach().cpu().numpy()
    outputSample = output[i].squeeze(0).detach().cpu().numpy()
    targetSample = input[i].squeeze(0).detach().cpu().numpy()

    noisySample = np.clip(noisySample*225, 0, 255)
    outputSample = np.clip(outputSample*225, 0, 255)
    targetSample = np.clip(targetSample*225, 0, 255)

    concat = np.hstack((noisySample, outputSample, targetSample))

    cv2.imwrite(f"output-samples/sample-{numSample}.png", concat)
    numSample += 1

### Visualize Result

In [None]:
import matplotlib.pyplot as plt

In [None]:
concat = []

for i in range(10) :
  subConcat = []
  for j in range(10) :
    subConcat.append(cv2.imread(f"/content/output-samples/sample-{i*10+j}.png"))
  concat.append(np.vstack(subConcat))

concat = np.hstack(concat)

In [None]:
plt.figure(figsize=(20,20))
plt.imshow(concat)
plt.show()