# 라이브러리

In [None]:
from __future__ import print_function

import os
import os.path
from os.path import join
from os import listdir

import numpy as np
import random
from collections import OrderedDict
import cv2
from PIL import Image
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, CenterCrop, ToTensor, Resize
import torchvision

import skimage.color as sc

import time
from math import log10, sqrt
import zipfile

import gdown

# 데이터셋 

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Every competition archive lives in the same Drive folder and is unpacked
# into /content/<archive name>. The context manager guarantees the zip handle
# is closed even when extraction fails part-way through.
for _archive_name in ('khuggle_train_clean',
                      'khuggle_train_degraded',
                      'khuggle_val',
                      'khuggle_test'):
    _archive_path = '/content/drive/MyDrive/KHUGGLE/' + _archive_name + '.zip'
    with zipfile.ZipFile(_archive_path) as _archive:
        _archive.extractall('/content/' + _archive_name)

Mounted at /content/drive


# 이미지 처리 및 로드 함수, 데이터 로더 클래스들을 정의

In [None]:
def is_image_file(filename):
    """Return True when ``filename`` ends with ".png", ".jpg" or ".jpeg".

    The suffix comparison is case-sensitive.
    """
    return filename.endswith((".png", ".jpg", ".jpeg"))

def load_img(filepath):
    """Open the image stored at ``filepath`` with PIL and return it unchanged."""
    return Image.open(filepath)
    
def calculate_valid_crop_size(crop_size, upscale_factor):
    """Trim ``crop_size`` so that it is divisible by ``upscale_factor``.

    Returns ``crop_size`` minus its remainder modulo ``upscale_factor``.
    """
    remainder = crop_size % upscale_factor
    return crop_size - remainder

def input_transform(crop_size, upscale_factor):
    """Build the transform applied to the network input images.

    The pipeline center-crops to ``crop_size``, resizes with bicubic
    interpolation to ``crop_size // upscale_factor`` and converts the result
    to a tensor.

    Args:
        crop_size: side length passed to ``CenterCrop``.
        upscale_factor: integer factor by which the crop is shrunk.

    Returns:
        A torchvision ``Compose`` of the three steps above.
    """
    low_res_size = crop_size // upscale_factor
    return Compose([
        CenterCrop(crop_size),
        Resize(low_res_size, Image.BICUBIC),
        ToTensor(),
    ])

def target_transform(crop_size):
    """Build the transform applied to the ground-truth images.

    The pipeline center-crops to ``crop_size`` and converts the crop to a
    tensor; no resizing is performed.
    """
    steps = [CenterCrop(crop_size), ToTensor()]
    return Compose(steps)
class DatasetFromFolder(data.Dataset):
    """Dataset of (input, target) image pairs read from two directories.

    Item ``i`` pairs the ``i``-th image file of ``lr_dir`` with the ``i``-th
    image file of ``hr_dir`` after both listings have been sorted.
    NOTE(review): this presumes matching files sort to the same position in
    both folders (e.g. identical file names) - confirm against the data.

    Args:
        hr_dir: directory holding the target images.
        lr_dir: directory holding the input images.
        input_transform: optional callable applied to each input image.
        target_transform: optional callable applied to each target image.
    """

    def __init__(self, hr_dir, lr_dir, input_transform=None, target_transform=None):
        super(DatasetFromFolder, self).__init__()
        # os.listdir returns entries in arbitrary order, so both listings are
        # sorted to keep index i pointing at the same image in each folder.
        self.hr_filenames = sorted(join(hr_dir, x) for x in listdir(hr_dir) if is_image_file(x))
        self.lr_filenames = sorted(join(lr_dir, x) for x in listdir(lr_dir) if is_image_file(x))

        self.input_transform = input_transform
        self.target_transform = target_transform

    def __getitem__(self, index):
        """Load, transform and return the ``(input, target)`` pair at ``index``."""
        lr_image = load_img(self.lr_filenames[index])
        hr_image = load_img(self.hr_filenames[index])
        if self.input_transform:
            lr_image = self.input_transform(lr_image)
        if self.target_transform:
            hr_image = self.target_transform(hr_image)

        return lr_image, hr_image

    def __len__(self):
        """Return the number of target images found in ``hr_dir``."""
        return len(self.hr_filenames)

def get_training_set(hr_dir, lr_dir, upscale_factor):
    """Create the paired training dataset.

    A base crop of 64 pixels is trimmed to a multiple of ``upscale_factor``;
    inputs are cropped and downscaled, targets are only cropped.
    """
    crop = calculate_valid_crop_size(64, upscale_factor)
    lr_pipeline = input_transform(crop, upscale_factor)
    hr_pipeline = target_transform(crop)
    return DatasetFromFolder(
        hr_dir,
        lr_dir,
        input_transform=lr_pipeline,
        target_transform=hr_pipeline,
    )

def get_test_set(hr_dir, lr_dir, upscale_factor):
    """Create the paired evaluation dataset.

    Uses the same 64-pixel base crop and the same transforms as
    ``get_training_set``.
    """
    crop = calculate_valid_crop_size(64, upscale_factor)
    lr_pipeline = input_transform(crop, upscale_factor)
    hr_pipeline = target_transform(crop)
    return DatasetFromFolder(
        hr_dir,
        lr_dir,
        input_transform=lr_pipeline,
        target_transform=hr_pipeline,
    )

# 모델 정의(RRDB Net)

In [None]:
import functools
import torch
import torch.nn as nn
import torch.nn.functional as F


def make_layer(block, n_layers):
    """Chain ``n_layers`` freshly built blocks into one ``nn.Sequential``.

    Args:
        block: zero-argument callable returning a new module on each call.
        n_layers: how many times ``block`` is invoked.
    """
    return nn.Sequential(*(block() for _ in range(n_layers)))


class ResidualDenseBlock_5C(nn.Module):
    """Five-convolution dense block with a scaled residual connection.

    Each of the first four convolutions sees the block input concatenated
    with all earlier intermediate outputs; the fifth maps everything back to
    ``nf`` channels and is added to the input with weight 0.2.

    Args:
        nf: number of input/output channels.
        gc: growth channels, i.e. channels produced by each intermediate conv.
        bias: whether the convolutions use a bias term.
    """

    def __init__(self, nf=64, gc=32, bias=True):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, stride=1, padding=1, bias=bias)
        # Input width grows by ``gc`` for every preceding convolution.
        self.conv1 = nn.Conv2d(nf, gc, **conv_kwargs)
        self.conv2 = nn.Conv2d(nf + gc, gc, **conv_kwargs)
        self.conv3 = nn.Conv2d(nf + 2 * gc, gc, **conv_kwargs)
        self.conv4 = nn.Conv2d(nf + 3 * gc, gc, **conv_kwargs)
        self.conv5 = nn.Conv2d(nf + 4 * gc, nf, **conv_kwargs)
        self.lrelu = nn.LeakyReLU(negative_slope=0.2, inplace=True)

    def forward(self, x):
        """Return ``0.2 * conv5(dense features) + x``."""
        features = [x]
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            # The first convolution consumes x directly; later ones consume
            # the channel-wise concatenation of everything produced so far.
            stacked = features[0] if len(features) == 1 else torch.cat(features, 1)
            features.append(self.lrelu(conv(stacked)))
        residual = self.conv5(torch.cat(features, 1))
        return residual * 0.2 + x


class RRDB(nn.Module):
    '''Residual in Residual Dense Block.

    Three ``ResidualDenseBlock_5C`` modules applied in sequence, with the
    result scaled by 0.2 and added back to the block input.
    '''

    def __init__(self, nf, gc=32):
        super().__init__()
        self.RDB1 = ResidualDenseBlock_5C(nf, gc)
        self.RDB2 = ResidualDenseBlock_5C(nf, gc)
        self.RDB3 = ResidualDenseBlock_5C(nf, gc)

    def forward(self, x):
        """Return ``0.2 * RDB3(RDB2(RDB1(x))) + x``."""
        out = x
        for dense_block in (self.RDB1, self.RDB2, self.RDB3):
            out = dense_block(out)
        return out * 0.2 + x


class RRDBNet(nn.Module):
    """RRDB generator: feature extraction, RRDB trunk and two 2x upsamplers.

    Args:
        in_nc: channels of the input image.
        out_nc: channels of the produced image.
        nf: feature channels used throughout the network.
        nb: number of RRDB blocks in the trunk.
        gc: growth channels inside every dense block.
    """

    def __init__(self, in_nc, out_nc, nf, nb, gc=32):
        super().__init__()

        def conv3x3(cin, cout):
            # Every convolution here is 3x3, stride 1, padding 1, with bias.
            return nn.Conv2d(cin, cout, 3, 1, 1, bias=True)

        self.conv_first = conv3x3(in_nc, nf)
        self.RRDB_trunk = make_layer(lambda: RRDB(nf=nf, gc=gc), nb)
        self.trunk_conv = conv3x3(nf, nf)
        # Upsampling branch
        self.upconv1 = conv3x3(nf, nf)
        self.upconv2 = conv3x3(nf, nf)
        self.HRconv = conv3x3(nf, nf)
        self.conv_last = conv3x3(nf, out_nc)

        self.lrelu = nn.LeakyReLU(negative_slope=0.2, inplace=True)

    def forward(self, x):
        """Map ``x`` to an output whose spatial size is 4x that of ``x``."""
        shallow = self.conv_first(x)
        # Global residual connection around the whole RRDB trunk.
        fea = shallow + self.trunk_conv(self.RRDB_trunk(shallow))

        # Two nearest-neighbour 2x interpolations, each followed by a conv.
        for upconv in (self.upconv1, self.upconv2):
            enlarged = F.interpolate(fea, scale_factor=2, mode='nearest')
            fea = self.lrelu(upconv(enlarged))

        return self.conv_last(self.lrelu(self.HRconv(fea)))

# Pretrain 가중치 다운로드

In [None]:
# Download two pretrained RRDB checkpoints from Google Drive by file id.
# According to the recorded cell output they are stored as
# /content/RRDB_ESRGAN_x4.pth and /content/RRDB_PSNR_x4.pth respectively.
!gdown --id 1TPrz5QKd8DHHt1k8SRtm6tMiPjz_Qene
!gdown --id 1pJ_T-V1dpb1ewoEra1TGSWl5e6H7M4NN

Downloading...
From: https://drive.google.com/uc?id=1TPrz5QKd8DHHt1k8SRtm6tMiPjz_Qene
To: /content/RRDB_ESRGAN_x4.pth
100% 66.9M/66.9M [00:00<00:00, 158MB/s]
Downloading...
From: https://drive.google.com/uc?id=1pJ_T-V1dpb1ewoEra1TGSWl5e6H7M4NN
To: /content/RRDB_PSNR_x4.pth
100% 66.9M/66.9M [00:00<00:00, 184MB/s]


# 다른 Dataset에서 Pretrain된 가중치들을 interpolation

In [None]:
import sys
import torch
from collections import OrderedDict

import os

# Blend factor: 0 keeps the PSNR-oriented weights, 1 keeps the ESRGAN weights.
alpha = 0.5

net_PSNR_path = '/content/RRDB_PSNR_x4.pth'
net_ESRGAN_path = '/content/RRDB_ESRGAN_x4.pth'
net_interp_path = '/content/drive/MyDrive/KHUGGLE/models/interp_{:02d}.pth'.format(int(alpha*10))

# Load onto the CPU so the blend works regardless of the device the
# checkpoints were saved from.
net_PSNR = torch.load(net_PSNR_path, map_location='cpu')
net_ESRGAN = torch.load(net_ESRGAN_path, map_location='cpu')
net_interp = OrderedDict()

print('Interpolating with alpha = ', alpha)

# Linear interpolation of every tensor; both checkpoints must share keys.
for k, v_PSNR in net_PSNR.items():
    v_ESRGAN = net_ESRGAN[k]
    net_interp[k] = (1 - alpha) * v_PSNR + alpha * v_ESRGAN

# torch.save does not create missing parent directories.
os.makedirs(os.path.dirname(net_interp_path), exist_ok=True)
torch.save(net_interp, net_interp_path)

Interpolating with alpha =  0.5


# Loss Function 정의

In [None]:
# -*- coding: utf-8 -*-
"""
Created on Thu Dec  3 00:28:15 2020
@author: Yunpeng Li, Tianjin University
"""


import torch
import torch.nn as nn
import torch.nn.functional as F


class MS_SSIM_L1_LOSS(nn.Module):
    """Weighted mix of an MS-SSIM term and a Gaussian-filtered L1 term.

    The loss operates on 3-channel image batches (the convolutions use
    ``groups=3``). Running on CUDA is strongly recommended because the
    Gaussian filters are large; the module falls back to the CPU when no GPU
    is available.

    Args:
        gaussian_sigmas: standard deviations of the Gaussian windows, one per
            scale; the largest (last) one fixes the filter size.
        data_range: dynamic range of the inputs (1.0 for tensors in [0, 1]).
        K: the two SSIM stabilisation constants.
        alpha: weight of the MS-SSIM term; the L1 term gets ``1 - alpha``.
        compensation: constant factor applied to the mixed loss.
        cuda_dev: CUDA device index the filters are placed on when CUDA is
            available.
    """

    def __init__(self, gaussian_sigmas=(0.5, 1.0, 2.0, 4.0, 8.0),
                 data_range=1.0,
                 K=(0.01, 0.03),
                 alpha=0.025,
                 compensation=200.0,
                 cuda_dev=0,):
        super(MS_SSIM_L1_LOSS, self).__init__()
        self.DR = data_range
        self.C1 = (K[0] * data_range) ** 2
        self.C2 = (K[1] * data_range) ** 2
        self.pad = int(2 * gaussian_sigmas[-1])
        self.alpha = alpha
        self.compensation = compensation
        filter_size = int(4 * gaussian_sigmas[-1] + 1)
        g_masks = torch.zeros((3 * len(gaussian_sigmas), 1, filter_size, filter_size))
        for idx, sigma in enumerate(gaussian_sigmas):
            # Three identical copies per sigma: r0,g0,b0,r1,g1,b1,...,rM,gM,bM
            g_masks[3 * idx:3 * idx + 3, 0] = self._fspecial_gauss_2d(filter_size, sigma)
        if torch.cuda.is_available():
            g_masks = g_masks.cuda(cuda_dev)
        # Non-persistent buffer: follows .to()/.cuda() calls on the module but
        # is not written to the state_dict.
        self.register_buffer("g_masks", g_masks, persistent=False)

    def _fspecial_gauss_1d(self, size, sigma):
        """Create 1-D gauss kernel
        Args:
            size (int): the size of gauss kernel
            sigma (float): sigma of normal distribution
        Returns:
            torch.Tensor: 1D kernel (size)
        """
        coords = torch.arange(size).to(dtype=torch.float)
        coords -= size // 2
        g = torch.exp(-(coords ** 2) / (2 * sigma ** 2))
        g /= g.sum()
        return g.reshape(-1)

    def _fspecial_gauss_2d(self, size, sigma):
        """Create 2-D gauss kernel
        Args:
            size (int): the size of gauss kernel
            sigma (float): sigma of normal distribution
        Returns:
            torch.Tensor: 2D kernel (size x size)
        """
        gaussian_vec = self._fspecial_gauss_1d(size, sigma)
        return torch.outer(gaussian_vec, gaussian_vec)

    def forward(self, x, y):
        """Return the scalar mixed loss between batches ``x`` and ``y``.

        Args:
            x: predicted images, expected as a ``[B, 3, H, W]`` tensor.
            y: reference images with the same shape as ``x``.

        Returns:
            Zero-dimensional tensor: the mean of the per-pixel mixed loss.
        """
        # No-op when the masks already live on x's device with x's dtype.
        g_masks = self.g_masks.to(device=x.device, dtype=x.dtype)

        def blur(t, masks=g_masks):
            return F.conv2d(t, masks, groups=3, padding=self.pad)

        mux = blur(x)
        muy = blur(y)

        mux2 = mux * mux
        muy2 = muy * muy
        muxy = mux * muy

        sigmax2 = blur(x * x) - mux2
        sigmay2 = blur(y * y) - muy2
        sigmaxy = blur(x * y) - muxy

        # l(j), cs(j) in MS-SSIM
        # NOTE(review): with groups=3, conv2d feeds output channels 0-4 from
        # input channel 0, 5-9 from channel 1 and 10-14 from channel 2, which
        # differs from the r0,g0,b0,... interleaving the mask layout suggests.
        # Behaviour is kept as in the original implementation - confirm intent.
        l = (2 * muxy + self.C1) / (mux2 + muy2 + self.C1)  # [B, 15, H, W] with default sigmas
        cs = (2 * sigmaxy + self.C2) / (sigmax2 + sigmay2 + self.C2)

        lM = l[:, -1, :, :] * l[:, -2, :, :] * l[:, -3, :, :]
        PIcs = cs.prod(dim=1)

        loss_ms_ssim = 1 - lM * PIcs  # [B, H, W]

        loss_l1 = F.l1_loss(x, y, reduction='none')  # [B, 3, H, W]
        # Filter the L1 map with the last three masks (largest sigma) and
        # average over the three channels.
        gaussian_l1 = F.conv2d(loss_l1, g_masks[-3:],
                               groups=3, padding=self.pad).mean(1)  # [B, H, W]

        loss_mix = self.alpha * loss_ms_ssim + (1 - self.alpha) * gaussian_l1 / self.DR
        loss_mix = self.compensation * loss_mix

        return loss_mix.mean()

# 하이퍼 파라미터 정의

1epoch 학습할 시에 성능이 가장 좋았음. 1epoch만 학습

In [None]:
upscale_factor = 4   # factor by which training inputs are downscaled before being fed to the model
batchSize = 64       # batch size of the training DataLoader
testbatchSize = 1    # not referenced by the code visible in this notebook
nEpochs = 1          # number of passes over the training set
lr = 1e-4            # learning rate handed to the Adam optimizer
cuda = True          # select the "cuda" device; training aborts if no GPU is available

# 모델 학습
validation score와 test score 사이에 상관관계가 없어 validation score는 측정하지 않음

In [None]:
seed = 81
print("Seed: ", seed)

if cuda and not torch.cuda.is_available():
    raise Exception("No GPU found, please run without cuda")

# Seed once, before any random weights or shuffling order are generated.
torch.manual_seed(seed)

device = torch.device("cuda" if cuda else "cpu")

print('===> Loading datasets')

train_set = get_training_set(
    hr_dir='/content/khuggle_train_clean/khuggle_train_clean_hr',
    lr_dir='/content/khuggle_train_degraded/khuggle_train_degraded_hr',
    upscale_factor=upscale_factor,
)
training_data_loader = DataLoader(dataset=train_set, num_workers=2, batch_size=batchSize, shuffle=True)

print('===> Building model')
# Start from the interpolated PSNR/ESRGAN weights.
model_path = '/content/drive/MyDrive/KHUGGLE/models/interp_05.pth'
model = RRDBNet(3, 3, 64, 23, gc=32).to(device)
model.load_state_dict(torch.load(model_path, map_location=device), strict=True)

# The datasets convert images with ToTensor(), which yields values in
# [0, 1], so the loss must use data_range=1.0. With data_range=255 the SSIM
# constants C1/C2 dwarf the image statistics and the MS-SSIM term is
# effectively disabled.
criterion = MS_SSIM_L1_LOSS(data_range=1.0)
optimizer = optim.Adam(model.parameters(), lr=lr)

def print_network(net):
    """Print the module structure followed by its total parameter count."""
    num_params = sum(param.numel() for param in net.parameters())

    print(net)
    print('Total number of parameters: %d' % num_params)

def train(epoch):
    """Run one optimisation pass over ``training_data_loader``.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and
    ``device``. Prints the loss of every batch, then the average loss and the
    average PSNR of the epoch.

    Args:
        epoch: epoch number, used only in the printed messages.
    """
    model.train()
    epoch_loss = 0
    avg_psnr = 0
    n_batches = len(training_data_loader)
    for iteration, batch in enumerate(training_data_loader, 1):
        lr_batch, hr_batch = batch[0].to(device), batch[1].to(device)
        optimizer.zero_grad()
        prediction = model(lr_batch)
        loss = criterion(prediction, hr_batch)
        loss.backward()
        optimizer.step()

        loss_value = loss.item()
        epoch_loss += loss_value

        # PSNR is defined on the mean squared error, not on the training
        # loss. A peak value of 1 is used, which assumes targets lie in
        # [0, 1] (as produced by ToTensor in the dataset transforms).
        with torch.no_grad():
            mse = torch.mean((prediction.clamp(0, 1) - hr_batch) ** 2).item()
        psnr = 10 * log10(1 / mse) if mse > 0 else float("inf")
        avg_psnr += psnr

        print("===> Epoch[{}]({}/{}): Loss: {:.4f}".format(epoch, iteration, n_batches, loss_value))

    # Avoid a ZeroDivisionError when the loader yields no batches.
    denom = max(n_batches, 1)
    print("===> Epoch {} Complete: Avg. Loss: {:.4f}".format(epoch, epoch_loss / denom))
    print("===> Avg. PSNR: {:.4f} dB".format(avg_psnr / denom))

def checkpoint(epoch):
    """Write the state_dict of the module-level ``model`` to disk.

    The file is named ``model_epoch_<epoch>.pth`` and is created relative to
    the current working directory.
    """
    model_out_path = f"model_epoch_{epoch}.pth"
    weights = model.state_dict()
    torch.save(weights, model_out_path)
    print(f"Checkpoint saved to {model_out_path}")

print("===> Training")
# Dump the architecture and parameter count before training starts.
print_network(model)

# Epochs are numbered from 1; a checkpoint is written after every epoch.
for epoch in range(1, nEpochs + 1):
    train(epoch)
    checkpoint(epoch) 

Seed:  81
===> Loading datasets
===> Building model




===> Training
RRDBNet(
  (conv_first): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (RRDB_trunk): Sequential(
    (0): RRDB(
      (RDB1): ResidualDenseBlock_5C(
        (conv1): Conv2d(64, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (conv2): Conv2d(96, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (conv3): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (conv4): Conv2d(160, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (conv5): Conv2d(192, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (lrelu): LeakyReLU(negative_slope=0.2, inplace=True)
      )
      (RDB2): ResidualDenseBlock_5C(
        (conv1): Conv2d(64, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (conv2): Conv2d(96, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (conv3): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (conv4): Conv2d(160, 32, kernel_si

# 인퍼런스

In [None]:
import gc

# Rebuild the network and load the fine-tuned weights.
# NOTE(review): checkpoint() writes "model_epoch_<n>.pth" into the working
# directory, whereas this path points into Google Drive - confirm that the
# checkpoint was copied there.
model = RRDBNet(3, 3, 64, 23, gc=32)
model.load_state_dict(torch.load('/content/drive/MyDrive/model_epoch_1.pth', map_location='cpu'))
# Move to the GPU and wrap exactly once; doing this inside the loop would
# nest one more DataParallel wrapper per image.
model = nn.DataParallel(model.cuda())
model.eval()

# Collect the degraded test images and process them in file-name order.
infer_dataset = '/content/khuggle_test'
infer_dataset_images = sorted(f for f in listdir(infer_dataset) if is_image_file(f))

# Directory that receives the restored images.
output_path = '/content/drive/Shareddrives/SURE_KHUGGLE/results/ESRGAN'
os.makedirs(output_path, exist_ok=True)

np_list = []

# Inference needs no autograd graph; disabling it saves GPU memory.
with torch.no_grad():
    for file in infer_dataset_images:

        print(file)

        # Force an 8-bit, 3-channel image regardless of how the file is stored.
        img = cv2.imread(join(infer_dataset, file), cv2.IMREAD_COLOR)
        if img is None:
            raise IOError('Could not read image: ' + join(infer_dataset, file))

        # OpenCV returns BGR, but the network was trained on RGB tensors
        # loaded through PIL, so the channel order is flipped first.
        img = img[:, :, ::-1].astype(np.float32) / 255.
        lr_tensor = torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1).unsqueeze(0).cuda()

        # Pass the input through the model.
        out = model(lr_tensor)

        # Clamp to [0, 1], move to the host and convert to an HxWx3 uint8 array.
        out = out.squeeze(0).float().clamp_(0, 1).cpu().numpy()
        out = np.transpose(out, (1, 2, 0))
        out = np.uint8((out * 255.0).round())

        # cv2.imwrite expects BGR, so flip the channels back before saving.
        result_file = join(output_path, file)
        cv2.imwrite(result_file, np.ascontiguousarray(out[:, :, ::-1]))

        # Read the saved file back through PIL so the submission holds
        # exactly what was written to disk.
        out_np = np.asarray(Image.open(result_file))
        np_list.append(out_np)
        gc.collect()
        torch.cuda.empty_cache()


# Turn the list of arrays into a single numpy array.
if len({arr.shape for arr in np_list}) <= 1:
    np_submission_array = np.array(np_list)
else:
    # Results of different sizes cannot be stacked into a regular array, so
    # they are stored in a 1-D object array instead.
    # NOTE(review): presumably this is what the truncated warning in the
    # recorded output refers to - confirm the expected submission format.
    np_submission_array = np.empty(len(np_list), dtype=object)
    for i, arr in enumerate(np_list):
        np_submission_array[i] = arr

# np.save appends ".npy", producing the final 'submission.npy' file.
np.save('/content/submission', np_submission_array)

print('Submission file is complete. Good luck!')

0001.png
0002.png
0003.png
0004.png
0005.png
0006.png
0007.png
0008.png
0009.png
0010.png
0011.png
0012.png
0013.png
0014.png
0015.png
0016.png
0017.png
0018.png
0019.png
0020.png
0021.png
0022.png
0023.png
0024.png
0025.png
0026.png
0027.png
0028.png
0029.png
0030.png
Submission file is complete. Good luck!


  np_submission_array = np.array(np_list)
