In [1]:
import random
import pandas as pd
import numpy as np
import os
import cv2
import zipfile
import math
import glob

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from tqdm.auto import tqdm

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torchvision.models as models
from torchvision import transforms

import warnings
warnings.filterwarnings(action='ignore')


In [2]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


## Hyperparameter Setting

In [4]:
# Global hyperparameters read by the dataset, the loaders, the optimizer and
# the training loop.
CFG = dict(
    IMG_SIZE=1024,        # side length (pixels) the LR input is resized to
    EPOCHS=30,            # number of passes over the training set
    LEARNING_RATE=1e-4,   # initial learning rate handed to the optimizer
    BATCH_SIZE=8,         # samples per batch for both loaders
    SEED=41,              # seed passed to seed_everything()
)

## Fix Random Seed

In [5]:
def seed_everything(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and every CUDA device),
    exports ``PYTHONHASHSEED`` and configures cuDNN for deterministic runs.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # Cover multi-GPU setups as well; this is a no-op when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different convolution
    # algorithms from run to run, which defeats the determinism requested
    # above. It must be disabled for reproducible results.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # Fix the seed

## Load Data Paths

In [6]:
# Collect image paths in sorted order. LR and HR files are paired by position
# later on (presumably both folders use matching file names -- verify).
train_lr_paths = glob.glob('./open/train/lr/*.jpg')
train_lr_paths.sort()
train_hr_paths = glob.glob('./open/train/hr/*.jpg')
train_hr_paths.sort()

test_lr_paths = glob.glob('./open/test/lr/*.jpg')
test_lr_paths.sort()

## CustomDataset

In [7]:
class CustomDataset(Dataset):
    """Low-resolution images, optionally paired with high-resolution targets.

    In train mode each item is ``(lr_img, hr_img)``; otherwise it is
    ``(lr_img, file_name)`` where ``file_name`` is the base name of the LR file.

    Args:
        lr_paths: Sequence of low-resolution image paths.
        hr_paths: Sequence of high-resolution image paths aligned by index with
            ``lr_paths``; only read when ``train_mode`` is true.
        transforms: Callable invoked as ``transforms(image=..., label=...)``
            (train) or ``transforms(image=...)`` (test) and returning a mapping
            with the same keys, or ``None`` to return the raw arrays.
        train_mode: Whether items carry an HR target.
    """

    def __init__(self, lr_paths, hr_paths, transforms, train_mode):
        self.lr_paths = lr_paths
        self.hr_paths = hr_paths
        self.transforms = transforms
        self.train_mode = train_mode

    @staticmethod
    def _read_image(path):
        """Read an image with OpenCV, failing loudly when it cannot be decoded."""
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f'Could not read image: {path}')
        return img

    def __getitem__(self, index):
        lr_path = self.lr_paths[index]
        lr_img = self._read_image(lr_path)
        # Upscale the LR input to the working resolution with bicubic interpolation.
        lr_img = cv2.resize(lr_img, (CFG['IMG_SIZE'], CFG['IMG_SIZE']), interpolation=cv2.INTER_CUBIC)
        if self.train_mode:
            # NOTE(review): the HR image is used at its native size; the model
            # output has the LR input's size, so HR files are assumed to be
            # IMG_SIZE x IMG_SIZE -- confirm against the data.
            hr_img = self._read_image(self.hr_paths[index])
            # Check the instance attribute: the bare name `transforms` is the
            # imported torchvision module and is never None.
            if self.transforms is not None:
                transformed = self.transforms(image=lr_img, label=hr_img)
                lr_img = transformed['image'] / 255.
                hr_img = transformed['label'] / 255.
            return lr_img, hr_img

        # basename handles both '/' and the OS-specific separator glob may emit.
        file_name = os.path.basename(lr_path)
        if self.transforms is not None:
            transformed = self.transforms(image=lr_img)
            lr_img = transformed['image'] / 255.
        return lr_img, file_name

    def __len__(self):
        return len(self.lr_paths)

In [8]:
def get_train_transform():
    """Build the albumentations pipeline applied to training pairs.

    The pipeline consists of a single ``ToTensorV2`` step. ``'label'`` is
    registered as an additional image target so it is processed like ``'image'``.
    """
    steps = [ToTensorV2(p=1.0)]
    extra_targets = {'image': 'image', 'label': 'image'}
    return A.Compose(steps, additional_targets=extra_targets)

def get_test_transform():
    """Build the albumentations pipeline applied to test images.

    Mirrors the training pipeline: a single ``ToTensorV2`` step with the same
    additional-target registration.
    """
    steps = [ToTensorV2(p=1.0)]
    extra_targets = {'image': 'image', 'label': 'image'}
    return A.Compose(steps, additional_targets=extra_targets)

In [9]:
# Training data: LR/HR pairs, reshuffled every epoch.
train_dataset = CustomDataset(
    lr_paths=train_lr_paths,
    hr_paths=train_hr_paths,
    transforms=get_train_transform(),
    train_mode=True,
)
train_loader = DataLoader(dataset=train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True)

# Test data: LR images only, kept in file order so predictions map back to names.
test_dataset = CustomDataset(
    lr_paths=test_lr_paths,
    hr_paths=None,
    transforms=get_test_transform(),
    train_mode=False,
)
test_loader = DataLoader(dataset=test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

## Model Definition

In [10]:
class SRCNN(nn.Module):
    """Three-stage super-resolution CNN.

    The stages are feature extraction (9x9 conv), non-linear mapping (5x5 conv)
    and reconstruction (5x5 conv). Every convolution uses stride 1 with padding
    of half the kernel size, so the output keeps the input's spatial size.

    Args:
        num_channels: Channels of the input and of the reconstructed output.
        feature_dim: Number of feature maps produced by the first stage.
        map_dim: Number of feature maps produced by the mapping stage.
    """

    def __init__(self, num_channels=3, feature_dim=64, map_dim=32):
        super().__init__()
        # Stage 1: feature extraction.
        self.features = nn.Sequential(
            nn.Conv2d(num_channels, feature_dim, kernel_size=9, stride=1, padding=4),
            nn.ReLU(inplace=True),
        )
        # Stage 2: non-linear mapping.
        self.map = nn.Sequential(
            nn.Conv2d(feature_dim, map_dim, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
        )
        # Stage 3: reconstruction back to `num_channels` channels.
        self.reconstruction = nn.Conv2d(map_dim, num_channels, kernel_size=5, stride=1, padding=2)
        self._initialize_weights()

    def forward(self, x):
        return self.reconstruction(self.map(self.features(x)))

    def _initialize_weights(self) -> None:
        """Initialise all convolutions.

        Every conv weight is drawn from a zero-mean Gaussian with standard
        deviation sqrt(2 / (out_channels * kernel_area)) and every bias is set
        to zero. The reconstruction layer is then re-drawn with a fixed
        standard deviation of 0.001.
        """
        conv_layers = [m for m in self.modules() if isinstance(m, nn.Conv2d)]
        for conv in conv_layers:
            kernel_area = conv.kernel_size[0] * conv.kernel_size[1]
            std = math.sqrt(2 / (conv.out_channels * kernel_area))
            nn.init.normal_(conv.weight.data, 0.0, std)
            nn.init.zeros_(conv.bias.data)
        nn.init.normal_(self.reconstruction.weight.data, 0.0, 0.001)
        nn.init.zeros_(self.reconstruction.bias.data)

## Train

In [11]:
def train(model, optimizer, train_loader, scheduler, device):
    """Train `model` with MSE loss and return it with the best-epoch weights.

    Runs ``CFG['EPOCHS']`` epochs. After each epoch the mean training loss is
    printed; whenever it improves, a copy of the model's weights is stored.
    When training finishes those weights are loaded back into `model`.

    Args:
        model: Network mapping a batch of LR tensors to HR predictions.
        optimizer: Optimizer already bound to ``model.parameters()``.
        train_loader: Iterable yielding ``(lr_img, hr_img)`` batches.
        scheduler: Learning-rate scheduler stepped once per epoch, or ``None``.
        device: Device the model and the batches are moved to.

    Returns:
        `model` holding the weights from the lowest-loss epoch, or ``None`` if
        no epoch produced a comparable loss (e.g. zero epochs or an empty loader).
    """
    import copy  # local import so the cell does not depend on the import cell

    model.to(device)
    criterion = nn.MSELoss().to(device)
    best_state = None
    # Infinity rather than a magic 9999 so that any finite loss counts as an
    # improvement.
    best_loss = float('inf')
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for lr_img, hr_img in tqdm(iter(train_loader)):
            lr_img, hr_img = lr_img.float().to(device), hr_img.float().to(device)

            optimizer.zero_grad()

            pred_hr_img = model(lr_img)
            loss = criterion(pred_hr_img, hr_img)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        if scheduler is not None:
            scheduler.step()

        _train_loss = np.mean(train_loss)
        print(f'Epoch : [{epoch}] Train Loss : [{_train_loss:.5f}]')

        if best_loss > _train_loss:
            best_loss = _train_loss
            # Snapshot the weights. Keeping a reference to `model` itself would
            # track every later update and always end up as the last epoch.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is None:
        return None
    model.load_state_dict(best_state)
    return model

## Run!!

In [None]:
# Build the network, an Adam optimizer and a step scheduler that halves the
# learning rate every 5 epochs, then train.
model = SRCNN()
model.eval()

optimizer = torch.optim.Adam(model.parameters(), lr=CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=5, gamma=0.5)

infer_model = train(
    model=model,
    optimizer=optimizer,
    train_loader=train_loader,
    scheduler=scheduler,
    device=device,
)

  0%|          | 0/3701 [00:00<?, ?it/s]

## Inference

In [None]:
def inference(model, test_loader, device):
    """Run `model` over `test_loader` and collect uint8 images with their names.

    Args:
        model: Trained network; it is moved to `device` and put in eval mode.
        test_loader: Iterable yielding ``(lr_img, file_name)`` batches, where
            ``lr_img`` is a channel-first batch tensor and ``file_name`` is a
            sequence of strings, one per image.
        device: Device used for the forward pass.

    Returns:
        ``(pred_img_list, name_list)``: the predictions as height x width x
        channel ``uint8`` arrays and the matching file names, in loader order.
    """
    # Move the model that was passed in, not a notebook-level global.
    model.to(device)
    model.eval()
    pred_img_list = []
    name_list = []
    with torch.no_grad():
        for lr_img, file_name in tqdm(iter(test_loader)):
            lr_img = lr_img.float().to(device)

            pred_hr_img = model(lr_img)
            # The network output is unbounded. Clamp to [0, 1] before scaling
            # so values outside that range do not wrap around in the uint8
            # cast, and round instead of truncating.
            pred_hr_img = pred_hr_img.clamp(0.0, 1.0).mul(255.0).round().to(torch.uint8)
            # Channel-first -> channel-last for image writing.
            pred_hr_img = pred_hr_img.permute(0, 2, 3, 1).contiguous().cpu().numpy()

            for pred, name in zip(pred_hr_img, file_name):
                pred_img_list.append(pred)
                name_list.append(name)
    return pred_img_list, name_list

In [None]:
# Predict the test set with the model returned by train().
pred_img_list, pred_name_list = inference(infer_model, test_loader, device)

## Submission

In [None]:
# Write every prediction into ./submission and bundle the files into
# ./submission.zip, each stored under its bare file name.
# Paths are built explicitly instead of calling os.chdir(), so re-running this
# cell neither nests ./submission/submission nor changes the working directory
# for later cells.
submission_dir = './submission'
os.makedirs(submission_dir, exist_ok=True)
sub_imgs = []
for path, pred_img in tqdm(zip(pred_name_list, pred_img_list), total=len(pred_name_list)):
    out_path = os.path.join(submission_dir, path)
    # cv2.imwrite reports failure through its return value, not an exception.
    if not cv2.imwrite(out_path, pred_img):
        raise IOError(f'Failed to write image: {out_path}')
    sub_imgs.append(path)
# The context manager closes the archive even if adding a file fails.
with zipfile.ZipFile('./submission.zip', 'w') as submission:
    for path in sub_imgs:
        submission.write(os.path.join(submission_dir, path), arcname=path)
print('Done.')

In [None]:
## Hyperparameter Setting

# Global hyperparameters read by the dataset, the loaders, the optimizer and
# the training loop.
CFG = dict(
    IMG_SIZE=1024,        # side length (pixels) the LR input is resized to
    EPOCHS=30,            # number of passes over the training set
    LEARNING_RATE=1e-4,   # initial learning rate handed to the optimizer
    BATCH_SIZE=16,        # samples per batch for both loaders
    SEED=41,              # seed passed to seed_everything()
)

## Fix Random Seed

def seed_everything(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and every CUDA device),
    exports ``PYTHONHASHSEED`` and configures cuDNN for deterministic runs.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # Cover multi-GPU setups as well; this is a no-op when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different convolution
    # algorithms from run to run, which defeats the determinism requested
    # above. It must be disabled for reproducible results.
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # Fix the seed

## Load Data Paths

# Collect image paths in sorted order. LR and HR files are paired by position
# later on (presumably both folders use matching file names -- verify).
train_lr_paths = glob.glob('./train/lr/*.jpg')
train_lr_paths.sort()
train_hr_paths = glob.glob('./train/hr/*.jpg')
train_hr_paths.sort()

test_lr_paths = glob.glob('./test/lr/*.jpg')
test_lr_paths.sort()

## CustomDataset

class CustomDataset(Dataset):
    """Low-resolution images, optionally paired with high-resolution targets.

    In train mode each item is ``(lr_img, hr_img)``; otherwise it is
    ``(lr_img, file_name)`` where ``file_name`` is the base name of the LR file.

    Args:
        lr_paths: Sequence of low-resolution image paths.
        hr_paths: Sequence of high-resolution image paths aligned by index with
            ``lr_paths``; only read when ``train_mode`` is true.
        transforms: Callable invoked as ``transforms(image=..., label=...)``
            (train) or ``transforms(image=...)`` (test) and returning a mapping
            with the same keys, or ``None`` to return the raw arrays.
        train_mode: Whether items carry an HR target.
    """

    def __init__(self, lr_paths, hr_paths, transforms, train_mode):
        self.lr_paths = lr_paths
        self.hr_paths = hr_paths
        self.transforms = transforms
        self.train_mode = train_mode

    @staticmethod
    def _read_image(path):
        """Read an image with OpenCV, failing loudly when it cannot be decoded."""
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f'Could not read image: {path}')
        return img

    def __getitem__(self, index):
        lr_path = self.lr_paths[index]
        lr_img = self._read_image(lr_path)
        # Upscale the LR input to the working resolution with bicubic interpolation.
        lr_img = cv2.resize(lr_img, (CFG['IMG_SIZE'], CFG['IMG_SIZE']), interpolation=cv2.INTER_CUBIC)
        if self.train_mode:
            # NOTE(review): the HR image is used at its native size; the model
            # output has the LR input's size, so HR files are assumed to be
            # IMG_SIZE x IMG_SIZE -- confirm against the data.
            hr_img = self._read_image(self.hr_paths[index])
            # Check the instance attribute: the bare name `transforms` is the
            # imported torchvision module and is never None.
            if self.transforms is not None:
                transformed = self.transforms(image=lr_img, label=hr_img)
                lr_img = transformed['image'] / 255.
                hr_img = transformed['label'] / 255.
            return lr_img, hr_img

        # basename handles both '/' and the OS-specific separator glob may emit.
        file_name = os.path.basename(lr_path)
        if self.transforms is not None:
            transformed = self.transforms(image=lr_img)
            lr_img = transformed['image'] / 255.
        return lr_img, file_name

    def __len__(self):
        return len(self.lr_paths)

def get_train_transform():
    """Build the albumentations pipeline applied to training pairs.

    The pipeline consists of a single ``ToTensorV2`` step. ``'label'`` is
    registered as an additional image target so it is processed like ``'image'``.
    """
    steps = [ToTensorV2(p=1.0)]
    extra_targets = {'image': 'image', 'label': 'image'}
    return A.Compose(steps, additional_targets=extra_targets)

def get_test_transform():
    """Build the albumentations pipeline applied to test images.

    Mirrors the training pipeline: a single ``ToTensorV2`` step with the same
    additional-target registration.
    """
    steps = [ToTensorV2(p=1.0)]
    extra_targets = {'image': 'image', 'label': 'image'}
    return A.Compose(steps, additional_targets=extra_targets)

# Training data: LR/HR pairs, reshuffled every epoch.
train_dataset = CustomDataset(
    lr_paths=train_lr_paths,
    hr_paths=train_hr_paths,
    transforms=get_train_transform(),
    train_mode=True,
)
train_loader = DataLoader(dataset=train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True)

# Test data: LR images only, kept in file order so predictions map back to names.
test_dataset = CustomDataset(
    lr_paths=test_lr_paths,
    hr_paths=None,
    transforms=get_test_transform(),
    train_mode=False,
)
test_loader = DataLoader(dataset=test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

## Model Definition

class SRCNN(nn.Module):
    """Three-stage super-resolution CNN.

    The stages are feature extraction (9x9 conv), non-linear mapping (5x5 conv)
    and reconstruction (5x5 conv). Every convolution uses stride 1 with padding
    of half the kernel size, so the output keeps the input's spatial size.

    Args:
        num_channels: Channels of the input and of the reconstructed output.
        feature_dim: Number of feature maps produced by the first stage.
        map_dim: Number of feature maps produced by the mapping stage.
    """

    def __init__(self, num_channels=3, feature_dim=64, map_dim=32):
        super().__init__()
        # Stage 1: feature extraction.
        self.features = nn.Sequential(
            nn.Conv2d(num_channels, feature_dim, kernel_size=9, stride=1, padding=4),
            nn.ReLU(inplace=True),
        )
        # Stage 2: non-linear mapping.
        self.map = nn.Sequential(
            nn.Conv2d(feature_dim, map_dim, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
        )
        # Stage 3: reconstruction back to `num_channels` channels.
        self.reconstruction = nn.Conv2d(map_dim, num_channels, kernel_size=5, stride=1, padding=2)
        self._initialize_weights()

    def forward(self, x):
        return self.reconstruction(self.map(self.features(x)))

    def _initialize_weights(self) -> None:
        """Initialise all convolutions.

        Every conv weight is drawn from a zero-mean Gaussian with standard
        deviation sqrt(2 / (out_channels * kernel_area)) and every bias is set
        to zero. The reconstruction layer is then re-drawn with a fixed
        standard deviation of 0.001.
        """
        conv_layers = [m for m in self.modules() if isinstance(m, nn.Conv2d)]
        for conv in conv_layers:
            kernel_area = conv.kernel_size[0] * conv.kernel_size[1]
            std = math.sqrt(2 / (conv.out_channels * kernel_area))
            nn.init.normal_(conv.weight.data, 0.0, std)
            nn.init.zeros_(conv.bias.data)
        nn.init.normal_(self.reconstruction.weight.data, 0.0, 0.001)
        nn.init.zeros_(self.reconstruction.bias.data)

## Train

def train(model, optimizer, train_loader, scheduler, device):
    """Train `model` with MSE loss and return it with the best-epoch weights.

    Runs ``CFG['EPOCHS']`` epochs. After each epoch the mean training loss is
    printed; whenever it improves, a copy of the model's weights is stored.
    When training finishes those weights are loaded back into `model`.

    Args:
        model: Network mapping a batch of LR tensors to HR predictions.
        optimizer: Optimizer already bound to ``model.parameters()``.
        train_loader: Iterable yielding ``(lr_img, hr_img)`` batches.
        scheduler: Learning-rate scheduler stepped once per epoch, or ``None``.
        device: Device the model and the batches are moved to.

    Returns:
        `model` holding the weights from the lowest-loss epoch, or ``None`` if
        no epoch produced a comparable loss (e.g. zero epochs or an empty loader).
    """
    import copy  # local import so the cell does not depend on the import cell

    model.to(device)
    criterion = nn.MSELoss().to(device)
    best_state = None
    # Infinity rather than a magic 9999 so that any finite loss counts as an
    # improvement.
    best_loss = float('inf')
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for lr_img, hr_img in tqdm(iter(train_loader)):
            lr_img, hr_img = lr_img.float().to(device), hr_img.float().to(device)

            optimizer.zero_grad()

            pred_hr_img = model(lr_img)
            loss = criterion(pred_hr_img, hr_img)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        if scheduler is not None:
            scheduler.step()

        _train_loss = np.mean(train_loss)
        print(f'Epoch : [{epoch}] Train Loss : [{_train_loss:.5f}]')

        if best_loss > _train_loss:
            best_loss = _train_loss
            # Snapshot the weights. Keeping a reference to `model` itself would
            # track every later update and always end up as the last epoch.
            best_state = copy.deepcopy(model.state_dict())

    if best_state is None:
        return None
    model.load_state_dict(best_state)
    return model

## Run!!

# Build the network, an Adam optimizer and a step scheduler that halves the
# learning rate every 5 epochs, then train.
model = SRCNN()
model.eval()

optimizer = torch.optim.Adam(model.parameters(), lr=CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=5, gamma=0.5)

infer_model = train(
    model=model,
    optimizer=optimizer,
    train_loader=train_loader,
    scheduler=scheduler,
    device=device,
)

## Inference

def inference(model, test_loader, device):
    """Run `model` over `test_loader` and collect uint8 images with their names.

    Args:
        model: Trained network; it is moved to `device` and put in eval mode.
        test_loader: Iterable yielding ``(lr_img, file_name)`` batches, where
            ``lr_img`` is a channel-first batch tensor and ``file_name`` is a
            sequence of strings, one per image.
        device: Device used for the forward pass.

    Returns:
        ``(pred_img_list, name_list)``: the predictions as height x width x
        channel ``uint8`` arrays and the matching file names, in loader order.
    """
    # Move the model that was passed in, not a notebook-level global.
    model.to(device)
    model.eval()
    pred_img_list = []
    name_list = []
    with torch.no_grad():
        for lr_img, file_name in tqdm(iter(test_loader)):
            lr_img = lr_img.float().to(device)

            pred_hr_img = model(lr_img)
            # The network output is unbounded. Clamp to [0, 1] before scaling
            # so values outside that range do not wrap around in the uint8
            # cast, and round instead of truncating.
            pred_hr_img = pred_hr_img.clamp(0.0, 1.0).mul(255.0).round().to(torch.uint8)
            # Channel-first -> channel-last for image writing.
            pred_hr_img = pred_hr_img.permute(0, 2, 3, 1).contiguous().cpu().numpy()

            for pred, name in zip(pred_hr_img, file_name):
                pred_img_list.append(pred)
                name_list.append(name)
    return pred_img_list, name_list

# Predict the test set with the model returned by train().
pred_img_list, pred_name_list = inference(infer_model, test_loader, device)

## Submission

# Write every prediction into ./submission and bundle the files into
# ./submission.zip, each stored under its bare file name.
# Paths are built explicitly instead of calling os.chdir(), so re-running this
# cell neither nests ./submission/submission nor changes the working directory
# for later cells.
submission_dir = './submission'
os.makedirs(submission_dir, exist_ok=True)
sub_imgs = []
for path, pred_img in tqdm(zip(pred_name_list, pred_img_list), total=len(pred_name_list)):
    out_path = os.path.join(submission_dir, path)
    # cv2.imwrite reports failure through its return value, not an exception.
    if not cv2.imwrite(out_path, pred_img):
        raise IOError(f'Failed to write image: {out_path}')
    sub_imgs.append(path)
# The context manager closes the archive even if adding a file fails.
with zipfile.ZipFile('./submission.zip', 'w') as submission:
    for path in sub_imgs:
        submission.write(os.path.join(submission_dir, path), arcname=path)
print('Done.')