# Import

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
import os
import shutil

import random
import numpy as np
import time
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, ToTensor
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from torch.cuda.amp import autocast, GradScaler
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchvision.transforms import CenterCrop, Resize
from PIL import Image

import warnings
warnings.filterwarnings(action='ignore')


# Path Setting

In [1]:
cd /content/drive/MyDrive/Colab Notebooks

/content/drive/MyDrive/Colab Notebooks


# Hyperparameter Setting

In [14]:
CFG = {
    'IMG_SIZE':224,
    'EPOCHS':15,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':16,
    'SEED':42
}

# Fixed RandomSeed

In [5]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

# Data Pre-processing (한 번만 실행하면 됨)

In [None]:
import os
import shutil
import random

def Preprocess(base_dir, num_images=None):
    # 처리된 이미지가 저장될 디렉토리 설정
    processed_dir = os.path.join('/content/drive/MyDrive/Processed', base_dir.split('/')[-1])
    clean_dir = os.path.join(processed_dir, 'clean')
    noisy_dir = os.path.join(processed_dir, 'noisy')

    os.makedirs(clean_dir, exist_ok=True)
    os.makedirs(noisy_dir, exist_ok=True)

    # GT 디렉토리 (Label 데이터) 찾기
    source_dirs = []
    for root, dirs, files in os.walk(base_dir):
        for dir_name in dirs:
            if 'GT' in dir_name:
                source_dirs.append(os.path.join(root, dir_name))

    if not source_dirs:
        raise ValueError("No directory containing 'GT' found")

    # GT가 포함된 디렉토리에서 모든 clean 이미지를 복사하여 저장
    for source_dir in source_dirs:
        for filename in os.listdir(source_dir):
            if filename.endswith('.jpg'):
                shutil.copy(os.path.join(source_dir, filename), os.path.join(clean_dir, filename))

    # '밝은 조도'와 '어두운 조도' 폴더에서 동일한 갯수의 noisy 이미지를 가져오기
    lighting_conditions = ['밝은 조도', '어두운 조도']
    for condition in lighting_conditions:
        condition_dir = os.path.join(base_dir, condition)
        if not os.path.exists(condition_dir):
            print(f"{condition} 폴더가 없습니다.")
            continue

        for root, dirs, files in os.walk(condition_dir):
            for dir_name in dirs:
                if 'GT' in dir_name:
                    continue  # GT 디렉토리는 이미 처리되었으므로 건너뜀

                current_dir = os.path.join(root, dir_name)
                image_files = [f for f in os.listdir(current_dir) if f.endswith('.jpg')]
                print(f'{len(image_files)} files in {current_dir}')
                # 이미지 수가 지정되면 그 수만큼 랜덤으로 선택, 아니면 전체를 선택
                if num_images is not None and num_images < len(image_files):
                    image_files = random.sample(image_files, num_images)

                # 선택된 이미지를 noisy 폴더에 복사
                for filename in image_files:
                    shutil.copy(os.path.join(current_dir, filename), os.path.join(noisy_dir, f"{condition}_{filename}"))

    print('preprocessing done')

In [None]:
data_dir = '/content/drive/MyDrive/event (1)'
training_base_dir = os.path.join(data_dir, 'Training')
validation_base_dir = os.path.join(data_dir, 'Validation')

Preprocess(training_base_dir)
Preprocess(validation_base_dir)

KeyboardInterrupt: 

# CustomDataset

In [6]:
# 원하는 유형의 데이터셋을 로드할 수 있도록 변경함.
class CustomDataset(Dataset):
    def __init__(self, clean_image_paths, noisy_image_paths, image_types=None, transform=None):
        self.clean_image_paths = [os.path.join(clean_image_paths, x) for x in os.listdir(clean_image_paths)]
        self.noisy_image_paths = [os.path.join(noisy_image_paths, x) for x in os.listdir(noisy_image_paths)]
        self.transform = transform
        self.center_crop = CenterCrop(1080)
        self.resize = Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE']))
        self.image_types = image_types  # 추가된 파라미터로 원하는 이미지 유형 리스트

        # Create a list of (noisy, clean) pairs
        self.noisy_clean_pairs = self._create_noisy_clean_pairs()

    def _create_noisy_clean_pairs(self):
        clean_to_noisy = {}
        for clean_path in self.clean_image_paths:
            clean_id = '_'.join(os.path.basename(clean_path).split('_')[:-1])
            clean_to_noisy[clean_id] = clean_path

        noisy_clean_pairs = []
        for noisy_path in self.noisy_image_paths:
            noisy_id = '_'.join(os.path.basename(noisy_path).split('_')[:-1])
            image_type = os.path.basename(noisy_path).split('_')[-1].replace(".jpg", "")  # 이미지 유형 추출

            # 이미지 유형이 주어졌을 경우 필터링
            if self.image_types is None or image_type in self.image_types:
                if noisy_id in clean_to_noisy:
                    clean_path = clean_to_noisy[noisy_id]
                    noisy_clean_pairs.append((noisy_path, clean_path))

        return noisy_clean_pairs

    def __len__(self):
        return len(self.noisy_clean_pairs)

    def __getitem__(self, index):
        noisy_image_path, clean_image_path = self.noisy_clean_pairs[index]

        noisy_image = Image.open(noisy_image_path).convert("RGB")
        clean_image = Image.open(clean_image_path).convert("RGB")

        # Central Crop and Resize
        noisy_image = self.center_crop(noisy_image)
        clean_image = self.center_crop(clean_image)
        noisy_image = self.resize(noisy_image)
        clean_image = self.resize(clean_image)

        if self.transform:
            noisy_image = self.transform(noisy_image)
            clean_image = self.transform(clean_image)

        return noisy_image, clean_image


# Model Define

### Baseline

In [None]:
class MDTA(nn.Module):
    def __init__(self, channels, num_heads):
        super(MDTA, self).__init__()
        self.num_heads = num_heads
        self.temperature = nn.Parameter(torch.ones(1, num_heads, 1, 1))

        self.qkv = nn.Conv2d(channels, channels * 3, kernel_size=1, bias=False)
        self.qkv_conv = nn.Conv2d(channels * 3, channels * 3, kernel_size=3, padding=1, groups=channels * 3, bias=False)
        self.project_out = nn.Conv2d(channels, channels, kernel_size=1, bias=False)

    def forward(self, x):
        b, c, h, w = x.shape
        q, k, v = self.qkv_conv(self.qkv(x)).chunk(3, dim=1)

        q = q.reshape(b, self.num_heads, -1, h * w)
        k = k.reshape(b, self.num_heads, -1, h * w)
        v = v.reshape(b, self.num_heads, -1, h * w)
        q, k = F.normalize(q, dim=-1), F.normalize(k, dim=-1)

        attn = torch.softmax(torch.matmul(q, k.transpose(-2, -1).contiguous()) * self.temperature, dim=-1)
        out = self.project_out(torch.matmul(attn, v).reshape(b, -1, h, w))
        return out


class GDFN(nn.Module):
    def __init__(self, channels, expansion_factor):
        super(GDFN, self).__init__()

        hidden_channels = int(channels * expansion_factor)
        self.project_in = nn.Conv2d(channels, hidden_channels * 2, kernel_size=1, bias=False)
        self.conv = nn.Conv2d(hidden_channels * 2, hidden_channels * 2, kernel_size=3, padding=1,
                              groups=hidden_channels * 2, bias=False)
        self.project_out = nn.Conv2d(hidden_channels, channels, kernel_size=1, bias=False)

    def forward(self, x):
        x1, x2 = self.conv(self.project_in(x)).chunk(2, dim=1)
        x = self.project_out(F.gelu(x1) * x2)
        return x


class TransformerBlock(nn.Module):
    def __init__(self, channels, num_heads, expansion_factor):
        super(TransformerBlock, self).__init__()

        self.norm1 = nn.LayerNorm(channels)
        self.attn = MDTA(channels, num_heads)
        self.norm2 = nn.LayerNorm(channels)
        self.ffn = GDFN(channels, expansion_factor)

    def forward(self, x):
        b, c, h, w = x.shape
        x = x + self.attn(self.norm1(x.reshape(b, c, -1).transpose(-2, -1).contiguous()).transpose(-2, -1)
                          .contiguous().reshape(b, c, h, w))
        x = x + self.ffn(self.norm2(x.reshape(b, c, -1).transpose(-2, -1).contiguous()).transpose(-2, -1)
                         .contiguous().reshape(b, c, h, w))
        return x


class DownSample(nn.Module):
    def __init__(self, channels):
        super(DownSample, self).__init__()
        self.body = nn.Sequential(nn.Conv2d(channels, channels // 2, kernel_size=3, padding=1, bias=False),
                                  nn.PixelUnshuffle(2))

    def forward(self, x):
        return self.body(x)


class UpSample(nn.Module):
    def __init__(self, channels):
        super(UpSample, self).__init__()
        self.body = nn.Sequential(nn.Conv2d(channels, channels * 2, kernel_size=3, padding=1, bias=False),
                                  nn.PixelShuffle(2))

    def forward(self, x):
        return self.body(x)


class Restormer(nn.Module):
    def __init__(self, num_blocks=[4, 6, 6, 8], num_heads=[1, 2, 4, 8], channels=[24, 48, 96, 192], num_refinement=4, expansion_factor=2.66):

        super(Restormer, self).__init__()

        self.embed_conv = nn.Conv2d(3, channels[0], kernel_size=3, padding=1, bias=False)

        self.encoders = nn.ModuleList([nn.Sequential(*[TransformerBlock(
            num_ch, num_ah, expansion_factor) for _ in range(num_tb)]) for num_tb, num_ah, num_ch in
                                       zip(num_blocks, num_heads, channels)])

        # the number of down sample or up sample == the number of encoder - 1
        self.downs = nn.ModuleList([DownSample(num_ch) for num_ch in channels[:-1]])
        self.ups = nn.ModuleList([UpSample(num_ch) for num_ch in list(reversed(channels))[:-1]])

        # the number of reduce block == the number of decoder - 1
        self.reduces = nn.ModuleList([nn.Conv2d(channels[i], channels[i - 1], kernel_size=1, bias=False)
                                      for i in reversed(range(2, len(channels)))])

        # the number of decoder == the number of encoder - 1
        self.decoders = nn.ModuleList([nn.Sequential(*[TransformerBlock(channels[2], num_heads[2], expansion_factor)
                                                       for _ in range(num_blocks[2])])])
        self.decoders.append(nn.Sequential(*[TransformerBlock(channels[1], num_heads[1], expansion_factor)
                                             for _ in range(num_blocks[1])]))

        # the channel of last one is not change
        self.decoders.append(nn.Sequential(*[TransformerBlock(channels[1], num_heads[0], expansion_factor)
                                             for _ in range(num_blocks[0])]))

        self.refinement = nn.Sequential(*[TransformerBlock(channels[1], num_heads[0], expansion_factor)
                                          for _ in range(num_refinement)])
        self.output = nn.Conv2d(channels[1], 3, kernel_size=3, padding=1, bias=False)

    def forward(self, x):
        fo = self.embed_conv(x)
        out_enc1 = self.encoders[0](fo)
        out_enc2 = self.encoders[1](self.downs[0](out_enc1))
        out_enc3 = self.encoders[2](self.downs[1](out_enc2))
        out_enc4 = self.encoders[3](self.downs[2](out_enc3))

        out_dec3 = self.decoders[0](self.reduces[0](torch.cat([self.ups[0](out_enc4), out_enc3], dim=1)))
        out_dec2 = self.decoders[1](self.reduces[1](torch.cat([self.ups[1](out_dec3), out_enc2], dim=1)))
        fd = self.decoders[2](torch.cat([self.ups[2](out_dec2), out_enc1], dim=1))
        fr = self.refinement(fd)
        out = self.output(fr) + x
        return out

### Pretrained

In [7]:
## Restormer: Efficient Transformer for High-Resolution Image Restoration
## Syed Waqas Zamir, Aditya Arora, Salman Khan, Munawar Hayat, Fahad Shahbaz Khan, and Ming-Hsuan Yang
## https://arxiv.org/abs/2111.09881


import torch
import torch.nn as nn
import torch.nn.functional as F
from pdb import set_trace as stx
import numbers

from einops import rearrange



##########################################################################
## Layer Norm

def to_3d(x):
    return rearrange(x, 'b c h w -> b (h w) c')

def to_4d(x,h,w):
    return rearrange(x, 'b (h w) c -> b c h w',h=h,w=w)

class BiasFree_LayerNorm(nn.Module):
    def __init__(self, normalized_shape):
        super(BiasFree_LayerNorm, self).__init__()
        if isinstance(normalized_shape, numbers.Integral):
            normalized_shape = (normalized_shape,)
        normalized_shape = torch.Size(normalized_shape)

        assert len(normalized_shape) == 1

        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.normalized_shape = normalized_shape

    def forward(self, x):
        sigma = x.var(-1, keepdim=True, unbiased=False)
        return x / torch.sqrt(sigma+1e-5) * self.weight

class WithBias_LayerNorm(nn.Module):
    def __init__(self, normalized_shape):
        super(WithBias_LayerNorm, self).__init__()
        if isinstance(normalized_shape, numbers.Integral):
            normalized_shape = (normalized_shape,)
        normalized_shape = torch.Size(normalized_shape)

        assert len(normalized_shape) == 1

        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.bias = nn.Parameter(torch.zeros(normalized_shape))
        self.normalized_shape = normalized_shape

    def forward(self, x):
        mu = x.mean(-1, keepdim=True)
        sigma = x.var(-1, keepdim=True, unbiased=False)
        return (x - mu) / torch.sqrt(sigma+1e-5) * self.weight + self.bias


class LayerNorm(nn.Module):
    def __init__(self, dim, LayerNorm_type):
        super(LayerNorm, self).__init__()
        if LayerNorm_type =='BiasFree':
            self.body = BiasFree_LayerNorm(dim)
        else:
            self.body = WithBias_LayerNorm(dim)

    def forward(self, x):
        h, w = x.shape[-2:]
        return to_4d(self.body(to_3d(x)), h, w)



##########################################################################
## Gated-Dconv Feed-Forward Network (GDFN)
class FeedForward(nn.Module):
    def __init__(self, dim, ffn_expansion_factor, bias):
        super(FeedForward, self).__init__()

        hidden_features = int(dim*ffn_expansion_factor)

        self.project_in = nn.Conv2d(dim, hidden_features*2, kernel_size=1, bias=bias)

        self.dwconv = nn.Conv2d(hidden_features*2, hidden_features*2, kernel_size=3, stride=1, padding=1, groups=hidden_features*2, bias=bias)

        self.project_out = nn.Conv2d(hidden_features, dim, kernel_size=1, bias=bias)

    def forward(self, x):
        x = self.project_in(x)
        x1, x2 = self.dwconv(x).chunk(2, dim=1)
        x = F.gelu(x1) * x2
        x = self.project_out(x)
        return x



##########################################################################
## Multi-DConv Head Transposed Self-Attention (MDTA)
class Attention(nn.Module):
    def __init__(self, dim, num_heads, bias):
        super(Attention, self).__init__()
        self.num_heads = num_heads
        self.temperature = nn.Parameter(torch.ones(num_heads, 1, 1))

        self.qkv = nn.Conv2d(dim, dim*3, kernel_size=1, bias=bias)
        self.qkv_dwconv = nn.Conv2d(dim*3, dim*3, kernel_size=3, stride=1, padding=1, groups=dim*3, bias=bias)
        self.project_out = nn.Conv2d(dim, dim, kernel_size=1, bias=bias)



    def forward(self, x):
        b,c,h,w = x.shape

        qkv = self.qkv_dwconv(self.qkv(x))
        q,k,v = qkv.chunk(3, dim=1)

        q = rearrange(q, 'b (head c) h w -> b head c (h w)', head=self.num_heads)
        k = rearrange(k, 'b (head c) h w -> b head c (h w)', head=self.num_heads)
        v = rearrange(v, 'b (head c) h w -> b head c (h w)', head=self.num_heads)

        q = torch.nn.functional.normalize(q, dim=-1)
        k = torch.nn.functional.normalize(k, dim=-1)

        attn = (q @ k.transpose(-2, -1)) * self.temperature
        attn = attn.softmax(dim=-1)

        out = (attn @ v)

        out = rearrange(out, 'b head c (h w) -> b (head c) h w', head=self.num_heads, h=h, w=w)

        out = self.project_out(out)
        return out



##########################################################################
class TransformerBlock(nn.Module):
    def __init__(self, dim, num_heads, ffn_expansion_factor, bias, LayerNorm_type):
        super(TransformerBlock, self).__init__()

        self.norm1 = LayerNorm(dim, LayerNorm_type)
        self.attn = Attention(dim, num_heads, bias)
        self.norm2 = LayerNorm(dim, LayerNorm_type)
        self.ffn = FeedForward(dim, ffn_expansion_factor, bias)

    def forward(self, x):
        x = x + self.attn(self.norm1(x))
        x = x + self.ffn(self.norm2(x))

        return x



##########################################################################
## Overlapped image patch embedding with 3x3 Conv
class OverlapPatchEmbed(nn.Module):
    def __init__(self, in_c=3, embed_dim=48, bias=False):
        super(OverlapPatchEmbed, self).__init__()

        self.proj = nn.Conv2d(in_c, embed_dim, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, x):
        x = self.proj(x)

        return x



##########################################################################
## Resizing modules
class Downsample(nn.Module):
    def __init__(self, n_feat):
        super(Downsample, self).__init__()

        self.body = nn.Sequential(nn.Conv2d(n_feat, n_feat//2, kernel_size=3, stride=1, padding=1, bias=False),
                                  nn.PixelUnshuffle(2))

    def forward(self, x):
        return self.body(x)

class Upsample(nn.Module):
    def __init__(self, n_feat):
        super(Upsample, self).__init__()

        self.body = nn.Sequential(nn.Conv2d(n_feat, n_feat*2, kernel_size=3, stride=1, padding=1, bias=False),
                                  nn.PixelShuffle(2))

    def forward(self, x):
        return self.body(x)

##########################################################################
##---------- Restormer -----------------------
class PretrainedRestormer(nn.Module):
    def __init__(self,
        inp_channels=3,
        out_channels=3,
        dim = 48,
        num_blocks = [4,6,6,8],
        num_refinement_blocks = 4,
        heads = [1,2,4,8],
        ffn_expansion_factor = 2.66,
        bias = False,
        LayerNorm_type = 'WithBias',   ## Other option 'BiasFree'
        dual_pixel_task = False        ## True for dual-pixel defocus deblurring only. Also set inp_channels=6
    ):

        super(PretrainedRestormer, self).__init__()

        self.patch_embed = OverlapPatchEmbed(inp_channels, dim)

        self.encoder_level1 = nn.Sequential(*[TransformerBlock(dim=dim, num_heads=heads[0], ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for i in range(num_blocks[0])])

        self.down1_2 = Downsample(dim) ## From Level 1 to Level 2
        self.encoder_level2 = nn.Sequential(*[TransformerBlock(dim=int(dim*2**1), num_heads=heads[1], ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for i in range(num_blocks[1])])

        self.down2_3 = Downsample(int(dim*2**1)) ## From Level 2 to Level 3
        self.encoder_level3 = nn.Sequential(*[TransformerBlock(dim=int(dim*2**2), num_heads=heads[2], ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for i in range(num_blocks[2])])

        self.down3_4 = Downsample(int(dim*2**2)) ## From Level 3 to Level 4
        self.latent = nn.Sequential(*[TransformerBlock(dim=int(dim*2**3), num_heads=heads[3], ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for i in range(num_blocks[3])])

        self.up4_3 = Upsample(int(dim*2**3)) ## From Level 4 to Level 3
        self.reduce_chan_level3 = nn.Conv2d(int(dim*2**3), int(dim*2**2), kernel_size=1, bias=bias)
        self.decoder_level3 = nn.Sequential(*[TransformerBlock(dim=int(dim*2**2), num_heads=heads[2], ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for i in range(num_blocks[2])])


        self.up3_2 = Upsample(int(dim*2**2)) ## From Level 3 to Level 2
        self.reduce_chan_level2 = nn.Conv2d(int(dim*2**2), int(dim*2**1), kernel_size=1, bias=bias)
        self.decoder_level2 = nn.Sequential(*[TransformerBlock(dim=int(dim*2**1), num_heads=heads[1], ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for i in range(num_blocks[1])])

        self.up2_1 = Upsample(int(dim*2**1))  ## From Level 2 to Level 1  (NO 1x1 conv to reduce channels)

        self.decoder_level1 = nn.Sequential(*[TransformerBlock(dim=int(dim*2**1), num_heads=heads[0], ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for i in range(num_blocks[0])])

        self.refinement = nn.Sequential(*[TransformerBlock(dim=int(dim*2**1), num_heads=heads[0], ffn_expansion_factor=ffn_expansion_factor, bias=bias, LayerNorm_type=LayerNorm_type) for i in range(num_refinement_blocks)])

        #### For Dual-Pixel Defocus Deblurring Task ####
        self.dual_pixel_task = dual_pixel_task
        if self.dual_pixel_task:
            self.skip_conv = nn.Conv2d(dim, int(dim*2**1), kernel_size=1, bias=bias)
        ###########################

        self.output = nn.Conv2d(int(dim*2**1), out_channels, kernel_size=3, stride=1, padding=1, bias=bias)

    def forward(self, inp_img):

        inp_enc_level1 = self.patch_embed(inp_img)
        out_enc_level1 = self.encoder_level1(inp_enc_level1)

        inp_enc_level2 = self.down1_2(out_enc_level1)
        out_enc_level2 = self.encoder_level2(inp_enc_level2)

        inp_enc_level3 = self.down2_3(out_enc_level2)
        out_enc_level3 = self.encoder_level3(inp_enc_level3)

        inp_enc_level4 = self.down3_4(out_enc_level3)
        latent = self.latent(inp_enc_level4)

        inp_dec_level3 = self.up4_3(latent)
        inp_dec_level3 = torch.cat([inp_dec_level3, out_enc_level3], 1)
        inp_dec_level3 = self.reduce_chan_level3(inp_dec_level3)
        out_dec_level3 = self.decoder_level3(inp_dec_level3)

        inp_dec_level2 = self.up3_2(out_dec_level3)
        inp_dec_level2 = torch.cat([inp_dec_level2, out_enc_level2], 1)
        inp_dec_level2 = self.reduce_chan_level2(inp_dec_level2)
        out_dec_level2 = self.decoder_level2(inp_dec_level2)

        inp_dec_level1 = self.up2_1(out_dec_level2)
        inp_dec_level1 = torch.cat([inp_dec_level1, out_enc_level1], 1)
        out_dec_level1 = self.decoder_level1(inp_dec_level1)

        out_dec_level1 = self.refinement(out_dec_level1)

        #### For Dual-Pixel Defocus Deblurring Task ####
        if self.dual_pixel_task:
            out_dec_level1 = out_dec_level1 + self.skip_conv(inp_enc_level1)
            out_dec_level1 = self.output(out_dec_level1)
        ###########################
        else:
            out_dec_level1 = self.output(out_dec_level1) + inp_img


        return out_dec_level1


# Train(Ablation)

### Baseline

In [None]:
# 시작 시간 기록
start_time = time.time()

def weights_init(m):
    if isinstance(m, nn.Conv2d):
        nn.init.kaiming_uniform_(m.weight.data, mode='fan_in', nonlinearity='relu')

def load_img(filepath):
    img = cv2.imread(filepath)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# 데이터셋 경로
noisy_image_paths = '/content/drive/MyDrive/event (1)/Training/noisy'
clean_image_paths = '/content/drive/MyDrive/event (1)/Training/clean'

# 데이터셋 로드 및 전처리
train_transform = Compose([
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# 커스텀 데이터셋 인스턴스 생성
train_dataset = CustomDataset(clean_image_paths, noisy_image_paths, transform=train_transform)
print(f"Train Dataset Size: {len(train_dataset)}")

# 데이터 로더 설정
num_cores = os.cpu_count()
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], num_workers=int(num_cores/2), shuffle=True)

# GPU 사용 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Restormer 모델 인스턴스 생성 및 GPU로 이동
model = Restormer().to(device)

# 손실 함수와 최적화 알고리즘 설정
optimizer = optim.AdamW(model.parameters(), lr = CFG['LEARNING_RATE'], weight_decay=1e-4)
criterion = nn.L1Loss()
scaler = GradScaler()
scheduler = CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

# 모델의 파라미터 수 계산
total_parameters = count_parameters(model)
print("Total Parameters:", total_parameters)

# 모델 학습
model.train()
best_loss = 1000

for epoch in range(CFG['EPOCHS']):
    model.train()
    epoch_start_time = time.time()
    mse_running_loss = 0.0

    for noisy_images, clean_images in train_loader:
        noisy_images = noisy_images.to(device)
        clean_images = clean_images.to(device)

        optimizer.zero_grad()

        with autocast():
            outputs = model(noisy_images)
            mse_loss = criterion(outputs, clean_images)

        scaler.scale(mse_loss).backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()
        scheduler.step()

        mse_running_loss += mse_loss.item() * noisy_images.size(0)

    current_lr = scheduler.get_last_lr()[0]
    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time
    minutes = int(epoch_time // 60)
    seconds = int(epoch_time % 60)
    hours = int(minutes // 60)
    minutes = int(minutes % 60)

    mse_epoch_loss = mse_running_loss / len(train_dataset)
    print(f"Epoch {epoch+1}/{CFG['EPOCHS']}, MSE Loss: {mse_epoch_loss:.4f}, Lr: {current_lr:.8f}")
    print(f"1epoch 훈련 소요 시간: {hours}시간 {minutes}분 {seconds}초")

    if mse_epoch_loss < best_loss:
        best_loss = mse_epoch_loss
        torch.save(model.state_dict(), 'best_Restormer.pth')
        print(f"{epoch+1}epoch 모델 저장 완료")

# 종료 시간 기록
end_time = time.time()

# 소요 시간 계산
training_time = end_time - start_time
minutes = int(training_time // 60)
seconds = int(training_time % 60)
hours = int(minutes // 60)
minutes = int(minutes % 60)

# 결과 출력
print(f"훈련 소요 시간: {hours}시간 {minutes}분 {seconds}초")

Train Dataset Size: 13921
Total Parameters: 6676132
Epoch 1/1, MSE Loss: 0.1670, Lr: 0.00000000
1epoch 훈련 소요 시간: 0시간 14분 49초
1epoch 모델 저장 완료
훈련 소요 시간: 0시간 14분 53초


### Finetuning - PSNR Ver 1

In [8]:
import time
import os
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.cuda.amp import autocast, GradScaler
from torchvision.transforms import Compose, ToTensor
from torchvision import transforms
from torch.optim.lr_scheduler import CosineAnnealingLR

# 시작 시간 기록
start_time = time.time()

# 데이터셋 경로
noisy_image_paths = '/content/drive/MyDrive/event/Training/noisy'
clean_image_paths = '/content/drive/MyDrive/event/Training/clean'

# 데이터 전처리 설정
train_transform = Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 줄이기
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# 데이터셋 및 데이터 로더 설정
train_dataset = CustomDataset(clean_image_paths, noisy_image_paths, transform=train_transform)
print(f"Train Dataset Size: {len(train_dataset)}")

# 데이터 로더 설정
num_cores = os.cpu_count()
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], num_workers=int(num_cores/2), shuffle=True)

# GPU 사용 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 모델 인스턴스 생성
pretrained_model = PretrainedRestormer().to(device)

# 사전학습된 가중치 로드
def load_pretrained_checkpoint(filepath, model):
    checkpoint = torch.load(filepath, map_location=device)
    state_dict = checkpoint.get('params') or checkpoint.get('state_dict') or checkpoint

    # 입력 채널이 불일치하는 경우 첫 번째 레이어 가중치 수정
    if model.patch_embed.proj.weight.shape[1] != state_dict['patch_embed.proj.weight'].shape[1]:
        print("Adjusting input channels to match model configuration")
        original_weight = state_dict['patch_embed.proj.weight']
        new_weight = original_weight[:, :3, :, :]  # 첫 3채널만 가져오기 (RGB)
        state_dict['patch_embed.proj.weight'] = new_weight

    # 가중치 로드
    model.load_state_dict(state_dict, strict=False)
    print("Checkpoint loaded successfully")

pretrained_checkpoint_path = "./pretrained_models/single_image_defocus_deblurring.pth"
load_pretrained_checkpoint(pretrained_checkpoint_path, pretrained_model)

# 손실 함수, 옵티마이저, 학습률 스케줄러 설정
def psnr_loss(pred, target, max_val=1.0):
    mse = torch.mean((pred - target) ** 2)
    return -10 * torch.log10(mse + 1e-10) + max_val

criterion = lambda pred, target: nn.L1Loss()(pred, target) - psnr_loss(pred, target)
# criterion = nn.L1Loss()

optimizer = optim.AdamW(pretrained_model.parameters(), lr=CFG['LEARNING_RATE'], weight_decay=1e-4)
scaler = GradScaler()  # AMP용 스케일러
scheduler = CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

# 파라미터 수 출력
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

total_parameters = count_parameters(pretrained_model)
print("Total Parameters:", total_parameters)

# 모델 파인튜닝
pretrained_model.train()
best_loss = float('inf')

# GPU 메모리 릴리스
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    print("GPU memory has been released.")

for epoch in range(CFG['EPOCHS']):
    pretrained_model.train()
    epoch_start_time = time.time()
    running_loss = 0.0

    for noisy_images, clean_images in train_loader:
        noisy_images = noisy_images.to(device)
        clean_images = clean_images.to(device)

        optimizer.zero_grad()

        # 자동 혼합 정밀도(AMP) 사용
        with autocast():
            outputs = pretrained_model(noisy_images)
            loss = criterion(outputs, clean_images)

        # 역전파 및 최적화
        scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(pretrained_model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item() * noisy_images.size(0)

    scheduler.step()  # 학습률 스케줄러 업데이트
    epoch_loss = running_loss / len(train_dataset)

    # 에포크 소요 시간 계산
    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time
    minutes = int(epoch_time // 60)
    seconds = int(epoch_time % 60)
    hours = int(minutes // 60)
    minutes = int(minutes % 60)

    # 에포크 손실 및 학습률 출력
    current_lr = scheduler.get_last_lr()[0]
    print(f"Epoch {epoch+1}/{CFG['EPOCHS']}, Loss: {epoch_loss:.4f}, LR: {current_lr:.8f}")
    print(f"Epoch Time: {hours}h {minutes}m {seconds}s")

    # 최상의 모델 저장
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        torch.save(pretrained_model.state_dict(), 'fine_tuned_PSNRloss.pth')
        print(f"Best model saved at epoch {epoch+1}")

# 전체 학습 소요 시간 계산
end_time = time.time()
training_time = end_time - start_time
minutes = int(training_time // 60)
seconds = int(training_time % 60)
hours = int(minutes // 60)
minutes = int(minutes % 60)

# 전체 훈련 시간 출력
print(f"Total Training Time: {hours}h {minutes}m {seconds}s")


Train Dataset Size: 13921
Checkpoint loaded successfully
Total Parameters: 26126644
GPU memory has been released.
Epoch 1/15, Loss: -14.1498, LR: 0.00029672
Epoch Time: 0h 7m 19s
Best model saved at epoch 1
Epoch 2/15, Loss: -15.6837, LR: 0.00028703
Epoch Time: 0h 5m 43s
Best model saved at epoch 2
Epoch 3/15, Loss: -16.6854, LR: 0.00027135
Epoch Time: 0h 5m 42s
Best model saved at epoch 3
Epoch 4/15, Loss: -17.4790, LR: 0.00025037
Epoch Time: 0h 5m 42s
Best model saved at epoch 4
Epoch 5/15, Loss: -18.1463, LR: 0.00022500
Epoch Time: 0h 5m 42s
Best model saved at epoch 5
Epoch 6/15, Loss: -18.8069, LR: 0.00019635
Epoch Time: 0h 5m 43s
Best model saved at epoch 6
Epoch 7/15, Loss: -19.4515, LR: 0.00016568
Epoch Time: 0h 5m 43s
Best model saved at epoch 7
Epoch 8/15, Loss: -19.8453, LR: 0.00013432
Epoch Time: 0h 5m 43s
Best model saved at epoch 8
Epoch 9/15, Loss: -20.4983, LR: 0.00010365
Epoch Time: 0h 5m 43s
Best model saved at epoch 9
Epoch 10/15, Loss: -21.3030, LR: 0.00007500
Epoch

### Finetuning - PSNR Ver 2

In [15]:
import time
import os
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.cuda.amp import autocast, GradScaler
from torchvision.transforms import Compose, ToTensor
from torchvision import transforms
from torch.optim.lr_scheduler import CosineAnnealingLR

# 시작 시간 기록
start_time = time.time()

# 데이터셋 경로
noisy_image_paths = '/content/drive/MyDrive/event/Training/noisy'
clean_image_paths = '/content/drive/MyDrive/event/Training/clean'

# 데이터 전처리 설정
train_transform = Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 줄이기
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# 데이터셋 및 데이터 로더 설정
train_dataset = CustomDataset(clean_image_paths, noisy_image_paths, transform=train_transform)
print(f"Train Dataset Size: {len(train_dataset)}")

# 데이터 로더 설정
num_cores = os.cpu_count()
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], num_workers=int(num_cores/2), shuffle=True)

# GPU 사용 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 모델 인스턴스 생성
pretrained_model = PretrainedRestormer().to(device)

# 사전학습된 가중치 로드
def load_pretrained_checkpoint(filepath, model):
    checkpoint = torch.load(filepath, map_location=device)
    state_dict = checkpoint.get('params') or checkpoint.get('state_dict') or checkpoint

    # 입력 채널이 불일치하는 경우 첫 번째 레이어 가중치 수정
    if model.patch_embed.proj.weight.shape[1] != state_dict['patch_embed.proj.weight'].shape[1]:
        print("Adjusting input channels to match model configuration")
        original_weight = state_dict['patch_embed.proj.weight']
        new_weight = original_weight[:, :3, :, :]  # 첫 3채널만 가져오기 (RGB)
        state_dict['patch_embed.proj.weight'] = new_weight

    # 가중치 로드
    model.load_state_dict(state_dict, strict=False)
    print("Checkpoint loaded successfully")

pretrained_checkpoint_path = "./pretrained_models/single_image_defocus_deblurring.pth"
load_pretrained_checkpoint(pretrained_checkpoint_path, pretrained_model)

# 손실 함수, 옵티마이저, 학습률 스케줄러 설정
def psnr_loss(pred, target, max_val=1.0):
    # PSNR 계산
    mse = torch.mean((pred - target) ** 2)
    psnr = 10 * torch.log10((max_val ** 2) / (mse + 1e-10))

    # PSNR을 음수로 변환하여 손실 함수로 사용
    return -psnr

criterion = psnr_loss
# criterion = nn.L1Loss()

optimizer = optim.AdamW(pretrained_model.parameters(), lr=CFG['LEARNING_RATE'], weight_decay=1e-4)
scaler = GradScaler()  # AMP용 스케일러
scheduler = CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

# 파라미터 수 출력
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

total_parameters = count_parameters(pretrained_model)
print("Total Parameters:", total_parameters)

# 모델 파인튜닝
pretrained_model.train()
best_loss = float('inf')

# GPU 메모리 릴리스
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    print("GPU memory has been released.")

for epoch in range(CFG['EPOCHS']):
    pretrained_model.train()
    epoch_start_time = time.time()
    running_loss = 0.0

    for noisy_images, clean_images in train_loader:
        noisy_images = noisy_images.to(device)
        clean_images = clean_images.to(device)

        optimizer.zero_grad()

        # 자동 혼합 정밀도(AMP) 사용
        with autocast():
            outputs = pretrained_model(noisy_images)
            loss = criterion(outputs, clean_images)

        # 역전파 및 최적화
        scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(pretrained_model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item() * noisy_images.size(0)

    scheduler.step()  # 학습률 스케줄러 업데이트
    epoch_loss = running_loss / len(train_dataset)

    # 에포크 소요 시간 계산
    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time
    minutes = int(epoch_time // 60)
    seconds = int(epoch_time % 60)
    hours = int(minutes // 60)
    minutes = int(minutes % 60)

    # 에포크 손실 및 학습률 출력
    current_lr = scheduler.get_last_lr()[0]
    print(f"Epoch {epoch+1}/{CFG['EPOCHS']}, Loss: {epoch_loss:.4f}, LR: {current_lr:.8f}")
    print(f"Epoch Time: {hours}h {minutes}m {seconds}s")

    # 최상의 모델 저장
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        torch.save(pretrained_model.state_dict(), 'fine_tuned_PSNRloss2_20epoch.pth')
        print(f"Best model saved at epoch {epoch+1}")

# 전체 학습 소요 시간 계산
end_time = time.time()
training_time = end_time - start_time
minutes = int(training_time // 60)
seconds = int(training_time % 60)
hours = int(minutes // 60)
minutes = int(minutes % 60)

# 전체 훈련 시간 출력
print(f"Total Training Time: {hours}h {minutes}m {seconds}s")


Train Dataset Size: 13921
Checkpoint loaded successfully
Total Parameters: 26126644
GPU memory has been released.
Epoch 1/20, Loss: -13.0708, LR: 0.00029815
Epoch Time: 0h 7m 7s
Best model saved at epoch 1
Epoch 2/20, Loss: -14.8423, LR: 0.00029266
Epoch Time: 0h 5m 43s
Best model saved at epoch 2
Epoch 3/20, Loss: -15.7329, LR: 0.00028365
Epoch Time: 0h 5m 44s
Best model saved at epoch 3
Epoch 4/20, Loss: -16.6085, LR: 0.00027135
Epoch Time: 0h 5m 44s
Best model saved at epoch 4
Epoch 5/20, Loss: -17.0950, LR: 0.00025607
Epoch Time: 0h 5m 44s
Best model saved at epoch 5
Epoch 6/20, Loss: -17.8065, LR: 0.00023817
Epoch Time: 0h 5m 44s
Best model saved at epoch 6
Epoch 7/20, Loss: -18.5695, LR: 0.00021810
Epoch Time: 0h 5m 43s
Best model saved at epoch 7
Epoch 8/20, Loss: -19.0824, LR: 0.00019635
Epoch Time: 0h 5m 43s
Best model saved at epoch 8
Epoch 9/20, Loss: -19.6512, LR: 0.00017347
Epoch Time: 0h 5m 43s
Best model saved at epoch 9
Epoch 10/20, Loss: -19.7329, LR: 0.00015000
Epoch 

### Finetuning - Layer Freeze(Train only Attention)

In [None]:
import time
import os
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.cuda.amp import autocast, GradScaler
from torchvision.transforms import Compose, ToTensor
from torchvision import transforms
from torch.optim.lr_scheduler import CosineAnnealingLR

# 시작 시간 기록
start_time = time.time()

# 데이터셋 경로
noisy_image_paths = '/content/drive/MyDrive/event/Training/noisy'
clean_image_paths = '/content/drive/MyDrive/event/Training/clean'

# 데이터 전처리 설정
train_transform = Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 줄이기
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# 데이터셋 및 데이터 로더 설정
train_dataset = CustomDataset(clean_image_paths, noisy_image_paths, transform=train_transform)
print(f"Train Dataset Size: {len(train_dataset)}")

# 데이터 로더 설정
num_cores = os.cpu_count()
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], num_workers=int(num_cores/2), shuffle=True)

# GPU 사용 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 모델 인스턴스 생성
pretrained_model = PretrainedRestormer().to(device)

# 사전학습된 가중치 로드
def load_pretrained_checkpoint(filepath, model):
    checkpoint = torch.load(filepath, map_location=device)
    state_dict = checkpoint.get('params') or checkpoint.get('state_dict') or checkpoint

    # 입력 채널이 불일치하는 경우 첫 번째 레이어 가중치 수정
    if model.patch_embed.proj.weight.shape[1] != state_dict['patch_embed.proj.weight'].shape[1]:
        print("Adjusting input channels to match model configuration")
        original_weight = state_dict['patch_embed.proj.weight']
        new_weight = original_weight[:, :3, :, :]  # 첫 3채널만 가져오기 (RGB)
        state_dict['patch_embed.proj.weight'] = new_weight

    # 가중치 로드
    model.load_state_dict(state_dict, strict=False)
    print("Checkpoint loaded successfully")

pretrained_checkpoint_path = "./pretrained_models/single_image_defocus_deblurring.pth"
load_pretrained_checkpoint(pretrained_checkpoint_path, pretrained_model)

# 모든 파라미터 고정 (학습되지 않도록 설정)
for param in pretrained_model.parameters():
    param.requires_grad = False

# 디코더의 마지막 레이어만 학습 가능하도록 설정
for param in pretrained_model.decoder_level1.parameters():
    param.requires_grad = True

# GDFN 및 MDTA 모듈만 학습 가능하도록 설정
for module in pretrained_model.modules():
    if isinstance(module, FeedForward) or isinstance(module, Attention):
        for param in module.parameters():
            param.requires_grad = True

# 손실 함수 설정
def psnr_loss(pred, target, max_val=1.0):
    mse = torch.mean((pred - target) ** 2)
    return -10 * torch.log10(mse + 1e-10) + max_val

criterion = lambda pred, target: nn.L1Loss()(pred, target) - psnr_loss(pred, target)

# 필요한 부분만 업데이트하도록 옵티마이저 설정
optimizer = optim.AdamW(
    filter(lambda p: p.requires_grad, pretrained_model.parameters()),
    lr=CFG['LEARNING_RATE'],
    weight_decay=1e-4
)
scaler = GradScaler()  # AMP용 스케일러
scheduler = CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

# 모델 파인튜닝
pretrained_model.train()
best_loss = float('inf')

# GPU 메모리 릴리스
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    print("GPU memory has been released.")

for epoch in range(CFG['EPOCHS']):
    pretrained_model.train()
    epoch_start_time = time.time()
    running_loss = 0.0

    for noisy_images, clean_images in train_loader:
        noisy_images = noisy_images.to(device)
        clean_images = clean_images.to(device)

        optimizer.zero_grad()

        # 자동 혼합 정밀도(AMP) 사용
        with autocast():
            outputs = pretrained_model(noisy_images)
            loss = criterion(outputs, clean_images)

        # 역전파 및 최적화
        scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(filter(lambda p: p.requires_grad, pretrained_model.parameters()), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item() * noisy_images.size(0)

    scheduler.step()  # 학습률 스케줄러 업데이트
    epoch_loss = running_loss / len(train_dataset)

    # 에포크 소요 시간 계산
    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time
    minutes = int(epoch_time // 60)
    seconds = int(epoch_time % 60)
    hours = int(minutes // 60)
    minutes = int(minutes % 60)

    # 에포크 손실 및 학습률 출력
    current_lr = scheduler.get_last_lr()[0]
    print(f"Epoch {epoch+1}/{CFG['EPOCHS']}, Loss: {epoch_loss:.4f}, LR: {current_lr:.8f}")
    print(f"Epoch Time: {hours}h {minutes}m {seconds}s")

    # 최상의 모델 저장
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        torch.save(pretrained_model.state_dict(), 'finetuned_Restormer_Layerfreeze.pth')
        print(f"Best model saved at epoch {epoch+1}")

# 전체 학습 소요 시간 계산
end_time = time.time()
training_time = end_time - start_time
minutes = int(training_time // 60)
seconds = int(training_time % 60)
hours = int(minutes // 60)
minutes = int(minutes % 60)

# 전체 훈련 시간 출력
print(f"Total Training Time: {hours}h {minutes}m {seconds}s")


Train Dataset Size: 13921
Checkpoint loaded successfully
GPU memory has been released.
Epoch 1/5, Loss: -14.1384, LR: 0.00027135
Epoch Time: 0h 6m 52s
Best model saved at epoch 1
Epoch 2/5, Loss: -15.9794, LR: 0.00019635
Epoch Time: 0h 5m 30s
Best model saved at epoch 2
Epoch 3/5, Loss: -17.0297, LR: 0.00010365
Epoch Time: 0h 5m 30s
Best model saved at epoch 3
Epoch 4/5, Loss: -17.7327, LR: 0.00002865
Epoch Time: 0h 5m 30s
Best model saved at epoch 4
Epoch 5/5, Loss: -18.4395, LR: 0.00000000
Epoch Time: 0h 5m 30s
Best model saved at epoch 5
Total Training Time: 0h 28m 57s


### Finetuning - Decoder Tuning


In [22]:
import time
import os
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.cuda.amp import autocast, GradScaler
from torchvision.transforms import Compose, ToTensor
from torchvision import transforms
from torch.optim.lr_scheduler import CosineAnnealingLR

# 시작 시간 기록
start_time = time.time()

# 데이터셋 경로
noisy_image_paths = '/content/drive/MyDrive/event/Training/noisy'
clean_image_paths = '/content/drive/MyDrive/event/Training/clean'

# 데이터 전처리 설정
train_transform = Compose([
    transforms.Resize((128, 128)),  # 이미지 크기 줄이기
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# 데이터셋 및 데이터 로더 설정
train_dataset = CustomDataset(clean_image_paths, noisy_image_paths, transform=train_transform)
print(f"Train Dataset Size: {len(train_dataset)}")

# 데이터 로더 설정
num_cores = os.cpu_count()
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], num_workers=int(num_cores/2), shuffle=True)

# GPU 사용 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 모델 인스턴스 생성
pretrained_model = PretrainedRestormer().to(device)

# 사전학습된 가중치 로드
def load_pretrained_checkpoint(filepath, model):
    checkpoint = torch.load(filepath, map_location=device)
    state_dict = checkpoint.get('params') or checkpoint.get('state_dict') or checkpoint

    # 입력 채널이 불일치하는 경우 첫 번째 레이어 가중치 수정
    if model.patch_embed.proj.weight.shape[1] != state_dict['patch_embed.proj.weight'].shape[1]:
        print("Adjusting input channels to match model configuration")
        original_weight = state_dict['patch_embed.proj.weight']
        new_weight = original_weight[:, :3, :, :]  # 첫 3채널만 가져오기 (RGB)
        state_dict['patch_embed.proj.weight'] = new_weight

    # 가중치 로드
    model.load_state_dict(state_dict, strict=False)
    print("Checkpoint loaded successfully")

pretrained_checkpoint_path = "./pretrained_models/single_image_defocus_deblurring.pth"
load_pretrained_checkpoint(pretrained_checkpoint_path, pretrained_model)

# 특정 레이어만 파인튜닝 설정
for param in pretrained_model.parameters():
    param.requires_grad = False

for param in pretrained_model.decoder_level1.parameters():
    param.requires_grad = True
for param in pretrained_model.refinement.parameters():
    param.requires_grad = True

# 손실 함수, 옵티마이저, 학습률 스케줄러 설정
def psnr_loss(pred, target, max_val=1.0):
    mse = torch.mean((pred - target) ** 2)
    return -10 * torch.log10(mse + 1e-10) + max_val

criterion = lambda pred, target: nn.L1Loss()(pred, target) - psnr_loss(pred, target)
# criterion = nn.L1Loss()

# 필요한 부분만 업데이트하도록 옵티마이저 설정
optimizer = optim.AdamW(
    filter(lambda p: p.requires_grad, pretrained_model.parameters()),
    lr=CFG['LEARNING_RATE'],
    weight_decay=1e-4
)
scaler = GradScaler()  # AMP용 스케일러
scheduler = CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

# 파라미터 수 출력
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

total_parameters = count_parameters(pretrained_model)
print("Total Parameters:", total_parameters)

# 모델 파인튜닝
pretrained_model.train()
best_loss = float('inf')

for epoch in range(CFG['EPOCHS']):
    pretrained_model.train()
    epoch_start_time = time.time()
    running_loss = 0.0

    for noisy_images, clean_images in train_loader:
        noisy_images = noisy_images.to(device)
        clean_images = clean_images.to(device)

        optimizer.zero_grad()

        # 자동 혼합 정밀도(AMP) 사용
        with autocast():
            outputs = pretrained_model(noisy_images)
            loss = criterion(outputs, clean_images)

        # 역전파 및 최적화
        scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(filter(lambda p: p.requires_grad, pretrained_model.parameters()), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item() * noisy_images.size(0)

    scheduler.step()  # 학습률 스케줄러 업데이트
    epoch_loss = running_loss / len(train_dataset)

    # 에포크 소요 시간 계산
    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time
    minutes = int(epoch_time // 60)
    seconds = int(epoch_time % 60)
    hours = int(minutes // 60)
    minutes = int(minutes % 60)

    # 에포크 손실 및 학습률 출력
    current_lr = scheduler.get_last_lr()[0]
    print(f"Epoch {epoch+1}/{CFG['EPOCHS']}, Loss: {epoch_loss:.4f}, LR: {current_lr:.8f}")
    print(f"Epoch Time: {hours}h {minutes}m {seconds}s")

    # 최상의 모델 저장
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        torch.save(pretrained_model.state_dict(), 'finetuned_Restormer_decoder_refinement.pth')
        print(f"Best model saved at epoch {epoch+1}")

# 전체 학습 소요 시간 출력
end_time = time.time()
training_time = end_time - start_time
minutes = int(training_time // 60)
seconds = int(training_time % 60)
hours = int(minutes // 60)
minutes = int(minutes % 60)

print(f"Total Training Time: {hours}h {minutes}m {seconds}s")


Train Dataset Size: 13921
Checkpoint loaded successfully
Total Parameters: 942968
Epoch 1/5, Loss: -13.2496, LR: 0.00027135
Epoch Time: 0h 8m 56s
Best model saved at epoch 1
Epoch 2/5, Loss: -14.0997, LR: 0.00019635
Epoch Time: 0h 3m 37s
Best model saved at epoch 2
Epoch 3/5, Loss: -14.4894, LR: 0.00010365
Epoch Time: 0h 3m 37s
Best model saved at epoch 3
Epoch 4/5, Loss: -14.8983, LR: 0.00002865
Epoch Time: 0h 3m 37s
Best model saved at epoch 4
Epoch 5/5, Loss: -15.1820, LR: 0.00000000
Epoch Time: 0h 3m 36s
Best model saved at epoch 5
Total Training Time: 0h 24m 11s


# Validation

In [16]:
import torch
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.transforms import ToTensor
from PIL import Image
import os

class CustomDatasetVal(data.Dataset):
    def __init__(self, noisy_image_paths, clean_image_paths, transform=None):
        self.noisy_image_paths = [os.path.join(noisy_image_paths, x) for x in os.listdir(noisy_image_paths)]
        self.clean_image_paths = [os.path.join(clean_image_paths, x) for x in os.listdir(clean_image_paths)]
        self.transform = transform

    def __len__(self):
        return len(self.noisy_image_paths)

    def __getitem__(self, index):
        noisy_image_path = self.noisy_image_paths[index]
        clean_image_path = self.clean_image_paths[index]

        noisy_image = load_img(noisy_image_path)
        clean_image = load_img(clean_image_path)

        # Convert numpy array to PIL image
        if isinstance(noisy_image, np.ndarray):
            noisy_image = Image.fromarray(noisy_image)
        if isinstance(clean_image, np.ndarray):
            clean_image = Image.fromarray(clean_image)

        if self.transform:
            noisy_image = self.transform(noisy_image)
            clean_image = self.transform(clean_image)

        return noisy_image, clean_image

def calculate_psnr(img1, img2, epsilon=1e-10):
    mse = F.mse_loss(img1, img2)
    psnr = 20 * torch.log10(1.0 / (torch.sqrt(mse) + epsilon))
    return psnr

# Checkpoint 로드 함수 수정
def load_pretrained_checkpoint(filepath, model):
    checkpoint = torch.load(filepath, map_location=device)
    state_dict = checkpoint.get('params') or checkpoint.get('state_dict') or checkpoint

    # 입력 채널이 불일치하는 경우 첫 번째 레이어 가중치 수정
    if model.patch_embed.proj.weight.shape[1] != state_dict['patch_embed.proj.weight'].shape[1]:
        print("Adjusting input channels to match model configuration")
        original_weight = state_dict['patch_embed.proj.weight']
        new_weight = original_weight[:, :3, :, :]  # 첫 3채널만 가져오기 (RGB)
        state_dict['patch_embed.proj.weight'] = new_weight

    # 가중치 로드
    model.load_state_dict(state_dict, strict=False)
    print("Checkpoint loaded successfully")

def load_checkpoint(filepath):
    checkpoint = torch.load(filepath, map_location=device)
    model.load_state_dict(checkpoint)
    print("Checkpoint loaded successfully")

# Validation 성능 평가 함수
def validate(model, val_loader, criterion, epsilon=1e-10):
    model.eval()
    psnr_values = []
    val_loss = 0.0

    with torch.no_grad():
        for noisy_images, clean_images in val_loader:
            noisy_images = noisy_images.to(device)
            clean_images = clean_images.to(device)

            outputs = model(noisy_images)
            loss = criterion(outputs, clean_images)
            val_loss += loss.item()

            # PSNR 계산
            outputs = outputs.clamp(0, 1)  # 클램핑
            clean_images = clean_images.clamp(0, 1)  # 클램핑
            psnr = calculate_psnr(outputs, clean_images, epsilon)
            psnr_values.append(psnr.item())

    avg_val_loss = val_loss / len(val_loader)
    avg_psnr = sum(psnr_values) / len(psnr_values)
    return avg_val_loss, avg_psnr


### One Model

In [17]:
# 전처리 변환 정의
val_transform = Compose([
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# 데이터셋 경로
noisy_val_image_paths = '/content/drive/MyDrive/event/Validation/noisy'
clean_val_image_paths = '/content/drive/MyDrive/event/Validation/clean'

# Validation 데이터셋 로드 및 전처리
val_dataset = CustomDataset(clean_val_image_paths, noisy_val_image_paths, transform=val_transform)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)
print(f"Validation Dataset Size: {len(val_dataset)}")

# 모델의 체크포인트 경로
finetuned_model_path = 'fine_tuned_PSNRloss_15epoch.pth'

# GPU 사용 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# Restormer 모델 생성 - 논문 설정 적용
finetuned_model = PretrainedRestormer().to(device)

# 모델을 평가 모드로 전환
finetuned_model.eval()

# Checkpoint 로드
load_pretrained_checkpoint(finetuned_model_path, finetuned_model)

# Validation 실행
avg_val_loss, avg_psnr = validate(finetuned_model, val_loader, criterion)
print(f"Validation Loss: {avg_val_loss:.4f}, Validation PSNR: {avg_psnr:.2f} dB")

Validation Dataset Size: 183
Checkpoint loaded successfully
Validation Loss: -17.0970, Validation PSNR: 38.59 dB


# Inference

### One Model

In [21]:
class CustomDatasetTest(data.Dataset):
    def __init__(self, noisy_image_paths, transform=None):
        self.noisy_image_paths = [os.path.join(noisy_image_paths, x) for x in os.listdir(noisy_image_paths)]
        self.transform = transform

    def __len__(self):
        return len(self.noisy_image_paths)

    def __getitem__(self, index):
        noisy_image_path = self.noisy_image_paths[index]
        noisy_image = load_img(self.noisy_image_paths[index])

        # Convert numpy array to PIL image
        if isinstance(noisy_image, np.ndarray):
            noisy_image = Image.fromarray(noisy_image)

        if self.transform:
            noisy_image = self.transform(noisy_image)

        return noisy_image, noisy_image_path

# 모델의 체크포인트 경로
finetuned_model_path = 'fine_tuned_PSNRloss_15epoch.pth'

# Restormer 모델 생성 - 논문 설정 적용
finetuned_model = PretrainedRestormer().to(device)

# 모델을 평가 모드로 전환
finetuned_model.eval()

# Checkpoint 로드
load_pretrained_checkpoint(finetuned_model_path, finetuned_model)

test_transform = Compose([
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

def load_img(filepath):
    img = cv2.imread(filepath)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

# model = Restormer()
# model.load_state_dict(torch.load(pretrained_model_path))


# GPU 사용 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
finetuned_model.to(device)


# 데이터셋 경로
test_data_path = '/content/drive/MyDrive/open 2/test/Input'
output_path = '/content/drive/MyDrive/open 2/test/submission'

# 데이터셋 로드 및 전처리
test_dataset = CustomDatasetTest(test_data_path, transform=test_transform)

# 데이터 로더 설정
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

if not os.path.exists(output_path):
    os.makedirs(output_path)

# 이미지 denoising 및 저장
for noisy_image, noisy_image_path in test_loader:
    noisy_image = noisy_image.to(device)
    denoised_image = finetuned_model(noisy_image)

    # denoised_image를 CPU로 이동하여 이미지 저장
    denoised_image = denoised_image.cpu().squeeze(0)
    denoised_image = (denoised_image * 0.5 + 0.5).clamp(0, 1)
    denoised_image = transforms.ToPILImage()(denoised_image)

    # Save denoised image
    output_filename = noisy_image_path[0]
    denoised_filename = output_path + '/' + output_filename.split('/')[-1][:-4] + '.jpg'
    denoised_image.save(denoised_filename)

    print(f'Saved denoised image: {denoised_filename}')

Checkpoint loaded successfully
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_014.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_015.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_029.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_007.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_013.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_039.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_004.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_011.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_001.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_012.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_028.jpg
Saved denoised image: /content/drive/MyDrive/open 2/test/submission/TEST_038.j

In [28]:
# GPU 메모리 릴리스
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    print("GPU memory has been released.")

GPU memory has been released.


# Submission

In [20]:
def zip_folder(folder_path, output_zip):
    shutil.make_archive(output_zip, 'zip', folder_path)
    print(f"Created {output_zip}.zip successfully.")

zip_folder(output_path, './submission')

Created ./submission.zip successfully.
