# Get Dataset from Google Drive  
Please upload your dataset on google drive first.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import zipfile
import tqdm

file_name = "Multimedia_dataset.zip"
zip_path = os.path.join('/content/drive/MyDrive/Multimedia/Multimedia_dataset/Multimedia_dataset.zip')

!cp "{zip_path}" .
!unzip -q "{file_name}"
!rm "{file_name}"

# Noise Transform  
If you want to change how much noise you are giving, change the stddev and mean values at 'gaussian_noise' function.

In [3]:
import torch
from torch.autograd import Variable
from torchvision import transforms

import random

class NoiseTransform(object):
  def __init__(self, size=180, mode="training"):
    super(NoiseTransform, self).__init__()
    self.size = size
    self.mode = mode
  
  def gaussian_noise(self, img):
    mean = 0
    stddev = 25
    noise = Variable(torch.zeros(img.size()))
    noise = noise.data.normal_(mean, stddev/255.)

    return noise

  def __call__(self, img):
    if (self.mode == "training") | (self.mode == "validation"):
      self.gt_transform = transforms.Compose([
        transforms.Resize((self.size, self.size), interpolation=2),
        transforms.ToTensor()])
      self.noise_transform = transforms.Compose([
        transforms.Resize((self.size, self.size), interpolation=2),
        transforms.ToTensor(),
        transforms.Lambda(self.gaussian_noise),
      ])
      return self.gt_transform(img), self.noise_transform(img)

    elif self.mode == "testing":
      self.gt_transform = transforms.Compose([
        transforms.ToTensor()])
      return self.gt_transform(img)
    else:
      return NotImplementedError


# Dataloader for Noise Dataset

In [4]:
import torch
import torch.utils.data  as data
import os
from PIL import Image
import numpy as np
import torch.nn as nn

import matplotlib.pyplot as plt

class NoiseDataset(data.Dataset):
  def __init__(self, root_path, size):
    super(NoiseDataset, self).__init__()

    self.root_path = root_path
    self.size = size
    self.transforms = None
    self.examples = None

  def set_mode(self, mode):
    self.mode = mode
    self.transforms = NoiseTransform(self.size, mode)
    if mode == "training":
      train_dir = os.path.join(self.root_path, "train")
      self.examples = [os.path.join(self.root_path, "train", dirs) for dirs in os.listdir(train_dir)]
    elif mode == "validation":
      val_dir = os.path.join(self.root_path, "validation")
      self.examples = [os.path.join(self.root_path, "validation", dirs) for dirs in os.listdir(val_dir)]
    elif mode == "testing":
      test_dir = os.path.join(self.root_path, "test")
      self.examples = [os.path.join(self.root_path, "test", dirs) for dirs in os.listdir(test_dir)]
    else:
      raise NotImplementedError
  
  def __len__(self):
    return len(self.examples)

  def __getitem__(self, idx):
    file_name = self.examples[idx]
    image = Image.open(file_name)


    if self.mode == "testing":
      input_img = self.transforms(image)
      sample = {"img": input_img,
                "file_name": "image_%06d.png" % int(os.path.basename(file_name).split('.')[0])}
    else:
      clean, noise = self.transforms(image)
      sample = {"img": clean, "noise": noise}

    return sample

# Example for Loading

In [5]:
import torch
import torch.utils.data  as data
import os
import matplotlib.pyplot as plt
from torchvision import transforms
import tqdm
from PIL import Image

def image_show(img):
  if isinstance(img, torch.Tensor):
    # PIL image로 바꿔준다.
    img = transforms.ToPILImage()(img)
  plt.imshow(img)
  plt.show()



# Change to your data root directory
root_path = "/content/"
# Depend on runtime setting
use_cuda = True

train_dataset = NoiseDataset(root_path, 128) #128은 size
train_dataset.set_mode("training")


# batch=4 단위로 data를 load
train_dataloader = data.DataLoader(train_dataset, batch_size=4, shuffle=True)
"""
# tqdm은 진행표시바
for i, data in enumerate(tqdm.tqdm(train_dataloader)):
  #CUDA는 NVIDIA에서 개발한 GPU 개발 툴로 많은 양의 연산을 동시에 처리하는 것이 목표
  if use_cuda:
    img = data["img"].to('cuda')
    noise = data["noise"].to('cuda')
  
  model_input = img + noise

  # clamp는 최대 최소 값을 정해주는 함수
  # (최소~최대 범위에 포함되지 않는게 있으면 그 값을 최소 최대값으로 변경)
  noise_image = torch.clamp(model_input, 0, 1)

  image_show(img[0])
  image_show(noise[0])



  input()
"""

'\n# tqdm은 진행표시바\nfor i, data in enumerate(tqdm.tqdm(train_dataloader)):\n  #CUDA는 NVIDIA에서 개발한 GPU 개발 툴로 많은 양의 연산을 동시에 처리하는 것이 목표\n  if use_cuda:\n    img = data["img"].to(\'cuda\')\n    noise = data["noise"].to(\'cuda\')\n  \n  model_input = img + noise\n\n  # clamp는 최대 최소 값을 정해주는 함수\n  # (최소~최대 범위에 포함되지 않는게 있으면 그 값을 최소 최대값으로 변경)\n  noise_image = torch.clamp(model_input, 0, 1)\n\n  image_show(img[0])\n  image_show(noise[0])\n\n\n\n  input()\n'

# UNet Network

In [6]:
import os
import numpy as np
import math
import torch
import torch.nn as nn

## 네트워크 구축하기
class UNet(nn.Module):
    def __init__(self, nch, nker=64, learning_type="plain", norm="bnorm"):
        super(UNet, self).__init__()

        self.learning_type = learning_type

        # Contracting path
        self.enc1_1 = CBR2d(in_channels=nch, out_channels=1 * nker, norm=norm)
        self.enc1_2 = CBR2d(in_channels=1 * nker, out_channels=1 * nker, norm=norm)

        self.pool1 = nn.MaxPool2d(kernel_size=2)

        self.enc2_1 = CBR2d(in_channels=nker, out_channels=2 * nker, norm=norm)
        self.enc2_2 = CBR2d(in_channels=2 * nker, out_channels=2 * nker, norm=norm)

        self.pool2 = nn.MaxPool2d(kernel_size=2)

        self.enc3_1 = CBR2d(in_channels=2 * nker, out_channels=4 * nker, norm=norm)
        self.enc3_2 = CBR2d(in_channels=4 * nker, out_channels=4 * nker, norm=norm)

        self.pool3 = nn.MaxPool2d(kernel_size=2)

        self.enc4_1 = CBR2d(in_channels=4 * nker, out_channels=8 * nker, norm=norm)
        self.enc4_2 = CBR2d(in_channels=8 * nker, out_channels=8 * nker, norm=norm)

        self.pool4 = nn.MaxPool2d(kernel_size=2)

        self.enc5_1 = CBR2d(in_channels=8 * nker, out_channels=16 * nker, norm=norm)


        # Expansive path
        self.dec5_1 = CBR2d(in_channels=16 * nker, out_channels=8 * nker, norm=norm)

        self.unpool4 = nn.ConvTranspose2d(in_channels=8 * nker, out_channels=8 * nker,
                                          kernel_size=2, stride=2, padding=0, bias=True)

        self.dec4_2 = CBR2d(in_channels=2 * 8 * nker, out_channels=8 * nker, norm=norm)
        self.dec4_1 = CBR2d(in_channels=8 * nker, out_channels=4 * nker, norm=norm)

        self.unpool3 = nn.ConvTranspose2d(in_channels=4 * nker, out_channels=4 * nker,
                                          kernel_size=2, stride=2, padding=0, bias=True)

        self.dec3_2 = CBR2d(in_channels=2 * 4 * nker, out_channels=4 * nker, norm=norm)
        self.dec3_1 = CBR2d(in_channels=4 * nker, out_channels=2 * nker, norm=norm)

        self.unpool2 = nn.ConvTranspose2d(in_channels=2 * nker, out_channels=2 * nker,
                                          kernel_size=2, stride=2, padding=0, bias=True)

        self.dec2_2 = CBR2d(in_channels=2 * 2 * nker, out_channels=2 * nker, norm=norm)
        self.dec2_1 = CBR2d(in_channels=2 * nker, out_channels=1 * nker, norm=norm)

        self.unpool1 = nn.ConvTranspose2d(in_channels=1 * nker, out_channels=1 * nker,
                                          kernel_size=2, stride=2, padding=0, bias=True)

        self.dec1_2 = CBR2d(in_channels=2 * 1 * nker, out_channels=1 * nker, norm=norm)
        self.dec1_1 = CBR2d(in_channels=1 * nker, out_channels=1 * nker, norm=norm)

        self.fc = nn.Conv2d(in_channels=1 * nker, out_channels=nch, kernel_size=1, stride=1, padding=0, bias=True)

    # Unet Layer 연결 (Forwarding)
    # 위에서 정의한 것을 순서대로 실행한다고 생각하면 됨
    def forward(self, x):
        # forward encoder
        enc1_1 = self.enc1_1(x)
        enc1_2 = self.enc1_2(enc1_1)
        pool1 = self.pool1(enc1_2)

        enc2_1 = self.enc2_1(pool1)
        enc2_2 = self.enc2_2(enc2_1)
        pool2 = self.pool2(enc2_2)

        enc3_1 = self.enc3_1(pool2)
        enc3_2 = self.enc3_2(enc3_1)
        pool3 = self.pool3(enc3_2)

        enc4_1 = self.enc4_1(pool3)
        enc4_2 = self.enc4_2(enc4_1)
        pool4 = self.pool4(enc4_2)

        enc5_1 = self.enc5_1(pool4)

        # forward decoder
        dec5_1 = self.dec5_1(enc5_1)

        unpool4 = self.unpool4(dec5_1)
        # cat은 이전 step의 output channel과 skip connection을 연결해주는 부분
        # dim = [0: batch, 1: channel, 2: height, 3: width] <- dim은 해당 방향을 알려준다.
        cat4 = torch.cat((unpool4, enc4_2), dim=1)
        dec4_2 = self.dec4_2(cat4)
        dec4_1 = self.dec4_1(dec4_2)

        unpool3 = self.unpool3(dec4_1)
        cat3 = torch.cat((unpool3, enc3_2), dim=1)
        dec3_2 = self.dec3_2(cat3)
        dec3_1 = self.dec3_1(dec3_2)

        unpool2 = self.unpool2(dec3_1)
        cat2 = torch.cat((unpool2, enc2_2), dim=1)
        dec2_2 = self.dec2_2(cat2)
        dec2_1 = self.dec2_1(dec2_2)

        unpool1 = self.unpool1(dec2_1)
        cat1 = torch.cat((unpool1, enc1_2), dim=1)
        dec1_2 = self.dec1_2(cat1)
        dec1_1 = self.dec1_1(dec1_2)

        if self.learning_type == "plain":
            x = self.fc(dec1_1)
        elif self.learning_type == "residual":
            x = x + self.fc(dec1_1)

        return x
  
## 네트워크 저장하기
def save(ckpt_dir, net, optim, epoch):
    if not os.path.exists(ckpt_dir):
        os.makedirs(ckpt_dir)

    torch.save({'net': net.state_dict(), 'optim': optim.state_dict()},
               "%s/model_epoch%d.pth" % (ckpt_dir, epoch))

## 네트워크 불러오기
def load(ckpt_dir, net, optim):
    if not os.path.exists(ckpt_dir):
        epoch = 0
        return net, optim, epoch

    ckpt_lst = os.listdir(ckpt_dir)
    ckpt_lst.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))

    dict_model = torch.load('%s/%s' % (ckpt_dir, ckpt_lst[-1]))

    net.load_state_dict(dict_model['net'])
    optim.load_state_dict(dict_model['optim'])
    epoch = int(ckpt_lst[-1].split('epoch')[1].split('.pth')[0])

    return net, optim, epoch

# convolution, batch normalizatoin, Relu
class CBR2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=True, norm="bnorm", relu=0.0):
        super().__init__()

        layers = []
        layers += [nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                             kernel_size=kernel_size, stride=stride, padding=padding,
                             bias=bias)]

        if not norm is None:
            if norm == "bnorm":
                layers += [nn.BatchNorm2d(num_features=out_channels)]
            elif norm == "inorm":
                layers += [nn.InstanceNorm2d(num_features=out_channels)]

        if not relu is None and relu >= 0.0:
            layers += [nn.ReLU() if relu == 0 else nn.LeakyReLU(relu)]

        self.cbr = nn.Sequential(*layers)

    def forward(self, x):
        return self.cbr(x)


def psnr (mse):
  if mse == 100:
    return 100
  else:
    return 20*math.log(255/math.sqrt(mse))


# ResNet

In [7]:
class ResNet(nn.Module):
    def __init__(self, in_channels, out_channels, nker=64,
                 learning_type="plain", norm="bnorm", nblk=16):
        super(ResNet, self).__init__()
        self.learning_type = learning_type

        self.enc = CBR2d(in_channels=in_channels, out_channels= nker,
                         kernel_size=3, stride=1, padding=1,
                         bias=True, norm=None, relu=0.0)

        # res block정의
        res = []
        for i in range(nblk):
            res += [ResBlock(nker, nker, kernel_size=3, stride=1,
                             padding=1, bias=True, norm=norm, relu=0.0)]
        self.res = nn.Sequential(*res)

        # encoder part
        self.dec = CBR2d(nker, nker, kernel_size=3, stride=1,
                             padding=1, bias=True, norm=norm, relu=0.0)

        # single convolution layer 생성
        self.fc = nn.Conv2d(in_channels=nker, out_channels=out_channels,
                            kernel_size=1, stride=1, padding=0, bias=True)

    # forward function
    def forward(self, x):
        x0 =x

        x = self.enc(x)
        x = self.res(x)
        x = self.dec(x)

        if self.learning_type == "plain":
            x = self.fc(x)
        elif self.learning_type == "residual":
            x = x0 + self.fc(x)


class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels,
                 kernel_size=3, stride=1, padding=1,
                 bias=True, norm="bnorm", relu=0.0):
        super().__init__()

        layers =[]

        # 1st CBR2d
        layers += [CBR2d(in_channels, out_channels,
                        kernel_size=kernel_size, stride=stride,
                        padding=padding, bias=bias, norm=norm, relu=relu)]

        # 2nd CBR2d
        layers += [CBR2d(in_channels, out_channels,
                        kernel_size=kernel_size, stride=stride,
                        padding=padding, bias=bias, norm=norm, relu=None)]

        self.resblk = nn.Sequential(*layers)

    def forward(self, x):
        return x + self.resblk(x)


## 네트워크 저장하기
def save(ckpt_dir, net, optim, epoch):
    if not os.path.exists(ckpt_dir):
        os.makedirs(ckpt_dir)

    torch.save({'net': net.state_dict(), 'optim': optim.state_dict()},
               "%s/model_epoch%d.pth" % (ckpt_dir, epoch))

## 네트워크 불러오기
def load(ckpt_dir, net, optim):
    if not os.path.exists(ckpt_dir):
        epoch = 0
        return net, optim, epoch

    ckpt_lst = os.listdir(ckpt_dir)
    ckpt_lst.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))

    dict_model = torch.load('%s/%s' % (ckpt_dir, ckpt_lst[-1]))

    net.load_state_dict(dict_model['net'])
    optim.load_state_dict(dict_model['optim'])
    epoch = int(ckpt_lst[-1].split('epoch')[1].split('.pth')[0])

    return net, optim, epoch

# convolution, batch normalizatoin, Relu
class CBR2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=True, norm="bnorm", relu=0.0):
        super().__init__()

        layers = []
        layers += [nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                             kernel_size=kernel_size, stride=stride, padding=padding,
                             bias=bias)]

        if not norm is None:
            if norm == "bnorm":
                layers += [nn.BatchNorm2d(num_features=out_channels)]
            elif norm == "inorm":
                layers += [nn.InstanceNorm2d(num_features=out_channels)]

        if not relu is None and relu >= 0.0:
            layers += [nn.ReLU() if relu == 0 else nn.LeakyReLU(relu)]

        self.cbr = nn.Sequential(*layers)

    def forward(self, x):
        return self.cbr(x)


# Trainer

In [8]:
import argparse

import os
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
#import pytorch_ssim
!pip install pytorch_msssim 
import pytorch_msssim

import matplotlib.pyplot as plt

from torchvision import transforms


## 트레이닝 파라메터 설정하기
train_continue = "off"

lr = 1e-3 #learning rate
batch_size = 100
num_epoch = 100

ckpt_dir = "/content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/checkpoint/"
log_dir = "/content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/log"
result_dir = "/content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/result"

task = 'denoising'
opts = ['random', 4]


nch = 3
nker = 64

learning_type = 'residual'

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')



print("learning rate: %.4e" % lr)
print("batch size: %d" % batch_size)
print("number of epoch: %d" % num_epoch)

print("task: %s" % task)
print("opts: %s" % opts)

print("learning type: %s" % learning_type)

print("ckpt dir: %s" % ckpt_dir)
print("log dir: %s" % log_dir)
print("result dir: %s" % result_dir)

print("device: %s" % device)

## 디렉토리 생성하기
result_dir_train = os.path.join(result_dir, 'train')
result_dir_val = os.path.join(result_dir, 'val')

if not os.path.exists(result_dir):
    os.makedirs(os.path.join(result_dir_train, 'png'))
    # os.makedirs(os.path.join(result_dir_train, 'numpy'))

    os.makedirs(os.path.join(result_dir_val, 'png'))
    # os.makedirs(os.path.join(result_dir_val, 'numpy'))

    os.makedirs(os.path.join(result_dir_test, 'png'))
    os.makedirs(os.path.join(result_dir_test, 'numpy'))


# Change to your data root directory
root_path = "/content/"
# Depend on runtime setting
use_cuda = True



# training dataset 불러오기
dataset_train = NoiseDataset(root_path, 128) #128은 size
dataset_train.set_mode("training")
loader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)



# validation dataset불러오기
dataset_val = NoiseDataset(root_path, 128) #128은 size
dataset_val.set_mode("validation")
loader_val = DataLoader(dataset_val, batch_size=batch_size, shuffle=True)



# 그밖에 부수적인 variables 설정하기
num_data_train = len(dataset_train)
num_data_val = len(dataset_val)

num_batch_train = np.ceil(num_data_train / batch_size)
num_batch_val = np.ceil(num_data_val / batch_size)



## 네트워크 생성하기
net = UNet(nch=nch, nker=nker, learning_type=learning_type).to(device)
#net = ResNet(in_channels= nch, out_channels = nch, nker=nker, learning_type=learning_type, nblk=16).to(device)

## 손실함수 정의하기
fn_loss = nn.MSELoss().to(device)
#fn_loss = nn.L1Loss().to(device)
#fn_loss = pytorch_msssim.SSIM().to(device)
#fn_loss = MS_SSIM_L1_LOSS()
mse_loss = nn.MSELoss().to(device)

## Optimizer 설정하기
# adam optimizer사용
optim = torch.optim.Adam(net.parameters(), lr=lr)
"""
#scheduler
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer=optim,
                                        lr_lambda=lambda epoch: 0.95 ** epoch,
                                        last_epoch=-1,
                                        verbose=False)
"""


## 그밖에 부수적인 functions 설정하기
# output을 저장하기 위해 필요한 몇가지 함수들


# tensor에서 numpy로 변환하는 함수
fn_tonumpy = lambda x: x.to('cpu').detach().numpy().transpose(0, 2, 3, 1)
# normalize된 data를 반대로 denomalization시키는 함수
fn_denorm = lambda x, mean, std: (x * std) + mean
# 네트워크 output의 이미지를 binary class로 분류해주는 함수
fn_class = lambda x: 1.0 * (x > 0.5)

cmap = None


## Tensorboard 를 사용하기 위한 SummaryWriter 설정
writer_train = SummaryWriter(log_dir=os.path.join(log_dir, 'train'))
writer_val: SummaryWriter = SummaryWriter(log_dir=os.path.join(log_dir, 'val'))


## 네트워크 학습시키기
# training이 시작되는 epoch의 시작점 0으로 세팅
st_epoch = 0


# TRAIN 
if train_continue == "on":
    net, optim, st_epoch = load(ckpt_dir=ckpt_dir, net=net, optim=optim)
    #사전에 저장이 된 network가 있다면 연속적으로 학습하기 위해 불러와서 사용


# training을 한다고 network에 알림림
for epoch in range(st_epoch + 1, num_epoch + 1):
    net.train()
    loss_list = []
    loss_mse_list = []


    # network이 input을 받아 output을 출력하는 forward pass
    for batch, data in enumerate(loader_train, 1):
        # forward pass
        label = data['img'].to(device)
        noise = data['noise'].to(device)

        model_input = label + noise
        input = torch.clamp(model_input, 0, 1)

        #normalization
        input = (input - 0.5) / 0.5
        label = (label - 0.5) / 0.5

        output = net(input)


        # backpropagation을 한느 부분
        # backward pass
        optim.zero_grad()

        #print(output.shape)
        #print(label.shape)

        loss = fn_loss(output, label)
        loss_list +=[loss.item()]
        loss.backward()

        optim.step()
        #scheduler.step() 

        
        # mse loss 계산
        loss_mse = mse_loss(output, label)
        loss_mse_list += [loss_mse.item()]

        print("TRAIN: EPOCH %04d / %04d | BATCH %04d / %04d | L1+MS-SSIM LOSS %.4f | MSE LOSS %.4f | ACCURACY %.4f" %
                  (epoch, num_epoch, batch, num_batch_train, np.mean(loss_list), np.mean(loss_mse_list), 1-np.mean(loss_mse_list)))

        if batch % 10 == 0:
          # Tensorboard 저장하기
          # Tensorboard에 input, output, label을 저장
          label = fn_tonumpy(fn_denorm(label, mean=0.5, std=0.5))
          input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
          output = fn_tonumpy(fn_denorm(output, mean=0.5, std=0.5))

          input = np.clip(input, a_min=0, a_max=1)
          output = np.clip(output, a_min=0, a_max=1)

          id = num_batch_train * (epoch - 1) + batch

          plt.imsave(os.path.join(result_dir_train, 'png', '%04d_label.png' % id), label[0].squeeze(), cmap=cmap)
          plt.imsave(os.path.join(result_dir_train, 'png', '%04d_input.png' % id), input[0].squeeze(), cmap=cmap)
          plt.imsave(os.path.join(result_dir_train, 'png', '%04d_output.png' % id), output[0].squeeze(), cmap=cmap)


    # loss 를 tensorboard에 저장
    writer_train.add_scalar('loss', np.mean(loss_mse_list), epoch)
    
#=============================================================================네트워크를 training하는 부분 끝
    # network validation하는 부분
    # validatoin부분은 backpropagation이 없기에 이를 사전에 막기 위해 torch.no_grad()를 activate 시킨다.
    # network에게 현재 validatoin한다는 것을 알리기 위해 net.eval() 활성화
    with torch.no_grad():
        net.eval()
        loss_list = []
        loss_mse_list = []

        # training과 마찬가지로 forward pass진행
        for batch, data in enumerate(loader_val, 1):
            # forward pass
            label = data['img'].to(device)
            noise = data['noise'].to(device)

            model_input = label + noise
            input = torch.clamp(model_input, 0, 1)

            #normalization
            input = (input - 0.5) / 0.5
            label = (label - 0.5) / 0.5


            output = net(input)


            # 손실함수 계산하기
            loss = fn_loss(output, label)

            # 손실함수 계산
            loss_list += [loss.item()]

            # mse loss 계산
            loss_mse = mse_loss(output, label)
            loss_mse_list += [loss_mse.item()]
            #loss_psnr = psnr(np.mean(loss_mse))
            print("VALID: EPOCH %04d / %04d | BATCH %04d / %04d | MSE LOSS %.4f | ACCURACY %.4f" %
                  (epoch, num_epoch, batch, num_batch_train, np.mean(loss_mse_list), 1-np.mean(loss_mse_list)))
            
            if batch % 3 == 0:
              label = fn_tonumpy(fn_denorm(label, mean=0.5, std=0.5))
              input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
              output = fn_tonumpy(fn_denorm(output, mean=0.5, std=0.5))

              input = np.clip(input, a_min=0, a_max=1)
              output = np.clip(output, a_min=0, a_max=1)

              id = num_batch_val * (epoch - 1) + batch

              #결과를 png파일로 저장
              plt.imsave(os.path.join(result_dir_val, 'png', '%04d_label.png' % id), label[0].squeeze(), cmap=cmap)
              plt.imsave(os.path.join(result_dir_val, 'png', '%04d_input.png' % id), input[0].squeeze(), cmap=cmap)
              plt.imsave(os.path.join(result_dir_val, 'png', '%04d_output.png' % id), output[0].squeeze(), cmap=cmap)


    writer_val.add_scalar('loss', np.mean(loss_mse_list), epoch)

    # 20번마다 한번씩 network저장
    if epoch % 20 == 0:
      # epoch이 진행 될 때마다 network저장
        save(ckpt_dir=ckpt_dir, net=net, optim=optim, epoch=epoch)

writer_train.close()
writer_val.close()





Collecting pytorch_msssim
  Downloading https://files.pythonhosted.org/packages/9d/d3/3cb0f397232cf79e1762323c3a8862e39ad53eca0bb5f6be9ccc8e7c070e/pytorch_msssim-0.2.1-py3-none-any.whl
Installing collected packages: pytorch-msssim
Successfully installed pytorch-msssim-0.2.1
learning rate: 1.0000e-03
batch size: 100
number of epoch: 100
task: denoising
opts: ['random', 4]
learning type: residual
ckpt dir: /content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/checkpoint/
log dir: /content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/log
result dir: /content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/result
device: cuda


  "Argument interpolation should be of type InterpolationMode instead of int. "


[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
TRAIN: EPOCH 0001 / 0100 | BATCH 0001 / 0045 | L1+MS-SSIM LOSS 0.2031 | MSE LOSS 0.2031 | ACCURACY 0.7969
TRAIN: EPOCH 0001 / 0100 | BATCH 0002 / 0045 | L1+MS-SSIM LOSS 0.2482 | MSE LOSS 0.2482 | ACCURACY 0.7518
TRAIN: EPOCH 0001 / 0100 | BATCH 0003 / 0045 | L1+MS-SSIM LOSS 0.2003 | MSE LOSS 0.2003 | ACCURACY 0.7997
TRAIN: EPOCH 0001 / 0100 | BATCH 0004 / 0045 | L1+MS-SSIM LOSS 0.1724 | MSE LOSS 0.1724 | ACCURACY 0.8276
TRAIN: EPOCH 0001 / 0100 | BATCH 0005 / 0045 | L1+MS-SSIM LOSS 0.1510 | MSE LOSS 0.1510 | ACCURACY 0.8490
TRAIN: EPOCH 0001 / 0100 | BATCH 0006 / 0045 | L1+MS-SSIM LOSS 0.1343 | MSE LOSS 0.1343 | ACCURACY 0.8657
TRAIN: EPOCH 0001 / 0100 | BATCH 0007 / 0045 | L1+MS-SSIM LOSS 0.1210 | MSE LOSS 0.1210 | ACCURACY 0.8790
TRAIN: EPOCH 0001 / 0100 | BATCH 0008 / 0045 | L1+MS-SSIM LOSS 0.1104 | MSE LOSS 0.1104 | ACCURACY 0.8896
TRAIN: EPOCH 0001 / 0100 | BATCH 0009 / 0045 | L1+MS-SSIM LOSS 0.1015 | MSE LOSS 0.1015 | ACCURACY 0.8

# Test

In [9]:
import argparse

import os
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter


import matplotlib.pyplot as plt

from torchvision import transforms

ckpt_dir = "/content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/checkpoint/"
log_dir = "/content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/log"
result_dir = "/content/drive/MyDrive/Multimedia/Unet(residual, MSE, batch100)/result"

## 디렉토리 생성하기
result_dir_test = os.path.join(result_dir, 'test')
if not os.path.exists(result_dir):
    os.makedirs(os.path.join(result_dir_test, 'png'))
    os.makedirs(os.path.join(result_dir_test, 'numpy'))


dataset_test = NoiseDataset(root_path, 128) #128은 size
dataset_test.set_mode("testing")
loader_test = DataLoader(dataset_test, batch_size=1, shuffle=False)


# 그밖에 부수적인 variables 설정하기
num_data_test = len(dataset_test)
num_batch_test = np.ceil(num_data_test / batch_size)



## 네트워크 생성하기
net = UNet(nch=nch, nker=nker, learning_type=learning_type).to(device)


## Optimizer 설정하기
# adam optimizer사용
optim = torch.optim.Adam(net.parameters(), lr=lr)


## 그밖에 부수적인 functions 설정하기
# output을 저장하기 위해 필요한 몇가지 함수들


# tensor에서 numpy로 변환하는 함수
fn_tonumpy = lambda x: x.to('cpu').detach().numpy().transpose(0, 2, 3, 1)
# normalize된 data를 반대로 denomalization시키는 함수
fn_denorm = lambda x, mean, std: (x * std) + mean
# 네트워크 output의 이미지를 binary class로 분류해주는 함수
fn_class = lambda x: 1.0 * (x > 0.5)

cmap = None

net, optim, st_epoch = load(ckpt_dir=ckpt_dir, net=net, optim=optim)


# backpropagation이 없기에 이를 사전에 막기 위해 torch.no_grad()를 activate 시킨다.
# network에게 현재 validatoin한다는 것을 알리기 위해 net.eval() 활성화
with torch.no_grad():
    net.eval()
    loss_mse = []

    for batch, data in enumerate(loader_test, 1):
        # forward pass
        input = data['img'].to(device)
        file_name = data["file_name"]

        #normalization
        input = (input - 0.5) / 0.5

        output = net(input)


        print("TEST: BATCH %04d / %04d" %
              (batch, num_batch_test))
        

        # Tensorboard 저장하기
        input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
        output = fn_tonumpy(fn_denorm(output, mean=0.5, std=0.5))

        for j in range(input.shape[0]):
            id = batch_size * (batch - 1) + j

            output_ = output[j]

            # 결과를 png파일로 저장장
            output_ = np.clip(output_, a_min=0, a_max=1)

            plt.imsave(os.path.join(result_dir_test, file_name[j]), output_, cmap=cmap)

TEST: BATCH 0001 / 0010
TEST: BATCH 0002 / 0010
TEST: BATCH 0003 / 0010
TEST: BATCH 0004 / 0010
TEST: BATCH 0005 / 0010
TEST: BATCH 0006 / 0010
TEST: BATCH 0007 / 0010
TEST: BATCH 0008 / 0010
TEST: BATCH 0009 / 0010
TEST: BATCH 0010 / 0010
TEST: BATCH 0011 / 0010
TEST: BATCH 0012 / 0010
TEST: BATCH 0013 / 0010
TEST: BATCH 0014 / 0010
TEST: BATCH 0015 / 0010
TEST: BATCH 0016 / 0010
TEST: BATCH 0017 / 0010
TEST: BATCH 0018 / 0010
TEST: BATCH 0019 / 0010
TEST: BATCH 0020 / 0010
TEST: BATCH 0021 / 0010
TEST: BATCH 0022 / 0010
TEST: BATCH 0023 / 0010
TEST: BATCH 0024 / 0010
TEST: BATCH 0025 / 0010
TEST: BATCH 0026 / 0010
TEST: BATCH 0027 / 0010
TEST: BATCH 0028 / 0010
TEST: BATCH 0029 / 0010
TEST: BATCH 0030 / 0010
TEST: BATCH 0031 / 0010
TEST: BATCH 0032 / 0010
TEST: BATCH 0033 / 0010
TEST: BATCH 0034 / 0010
TEST: BATCH 0035 / 0010
TEST: BATCH 0036 / 0010
TEST: BATCH 0037 / 0010
TEST: BATCH 0038 / 0010
TEST: BATCH 0039 / 0010
TEST: BATCH 0040 / 0010
TEST: BATCH 0041 / 0010
TEST: BATCH 0042