# Import

In [1]:
import os
import shutil

import random
import numpy as np
import time
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, ToTensor
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from torch.cuda.amp import autocast, GradScaler
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau
from torchvision.transforms import CenterCrop, Resize
from PIL import Image

from swinmodel import *

import warnings
warnings.filterwarnings(action='ignore')


  warn(f"Failed to load image Python extension: {e}")


# Hyperparameter Setting

In [2]:
CFG = {
    'IMG_SIZE':224,
    'EPOCHS':100,
    'LEARNING_RATE':2e-4,
    'BATCH_SIZE':16,
    'SEED':7
}

# Fixed RandomSeed

In [3]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

# Data Pre-processing

In [4]:
data_dir = './data'
training_base_dir = os.path.join(data_dir, 'Training')
validation_base_dir = os.path.join(data_dir, 'Validation')

# CustomDataset

In [5]:
class CustomDataset(Dataset):
    def __init__(self, clean_image_paths, noisy_image_paths, transform=None):
        self.clean_image_paths = [os.path.join(clean_image_paths, x) for x in os.listdir(clean_image_paths)]
        self.noisy_image_paths = [os.path.join(noisy_image_paths, x) for x in os.listdir(noisy_image_paths)]
        self.transform = transform
        self.center_crop = CenterCrop(1080)
        self.resize = Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE']))

        # Create a list of (noisy, clean) pairs
        self.noisy_clean_pairs = self._create_noisy_clean_pairs()

    def _create_noisy_clean_pairs(self):
        clean_to_noisy = {}
        for clean_path in self.clean_image_paths:
            clean_id = '_'.join(os.path.basename(clean_path).split('_')[:-1])
            clean_to_noisy[clean_id] = clean_path
        
        noisy_clean_pairs = []
        for noisy_path in self.noisy_image_paths:
            noisy_id = '_'.join(os.path.basename(noisy_path).split('_')[:-1])
            if noisy_id in clean_to_noisy:
                clean_path = clean_to_noisy[noisy_id]
                noisy_clean_pairs.append((noisy_path, clean_path))
            else:
                pass
        
        return noisy_clean_pairs

    def __len__(self):
        return len(self.noisy_clean_pairs)

    def __getitem__(self, index):
        noisy_image_path, clean_image_path = self.noisy_clean_pairs[index]

        noisy_image = Image.open(noisy_image_path).convert("RGB")
        clean_image = Image.open(clean_image_path).convert("RGB")
        
        # Central Crop and Resize
        noisy_image = self.center_crop(noisy_image)
        clean_image = self.center_crop(clean_image)
        noisy_image = self.resize(noisy_image)
        clean_image = self.resize(clean_image)
        
        if self.transform:
            noisy_image = self.transform(noisy_image)
            clean_image = self.transform(clean_image)
        
        return noisy_image, clean_image

# Model Define

# Train

In [6]:
def weights_init(m):
    if isinstance(m, nn.Conv2d):
        nn.init.kaiming_uniform_(m.weight.data, mode='fan_in', nonlinearity='relu')

def load_img(filepath):
    img = cv2.imread(filepath)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# 데이터셋 경로
noisy_image_paths = './data/Training/noisy'
clean_image_paths = './data/Training/clean'

# 데이터셋 로드 및 전처리
train_transform = Compose([
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
#train_transform = transforms.Compose([
#    transforms.RandomCrop(128),
#    transforms.RandomHorizontalFlip(),
#    transforms.RandomVerticalFlip(),
#    transforms.RandomRotation(90),
#    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
#    transforms.ToTensor(),
#    transforms.Lambda(lambda x: x + torch.randn_like(x) * 0.1)
#])
train_transform = transforms.Compose([
    #transforms.RandomCrop(128),
    #transforms.RandomHorizontalFlip(p=0.2),
    #transforms.RandomVerticalFlip(p=0.2),
    #transforms.RandomRotation(20),
    transforms.ToTensor(), 
])

In [7]:
# 커스텀 데이터셋 인스턴스 생성
train_dataset = CustomDataset(clean_image_paths, noisy_image_paths, transform=train_transform)

# 데이터 로더 설정
num_cores = os.cpu_count()
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], num_workers=int(num_cores/2), shuffle=True)

# GPU 사용 여부 확인
#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cuda')
print(device)
# Restormer 모델 인스턴스 생성 및 GPU로 이동
#model = Restormer().to(device)
model = SwinIR(img_size=64, 
               patch_size=1, 
               in_chans=3,
               embed_dim=96, 
               depths=[6, 6, 6, 6], 
               num_heads=[6, 6, 6, 6], 
               window_size=7, 
               mlp_ratio=2, 
               upsampler='pixelshuffledirect').to(device)

model = nn.DataParallel(model, device_ids=[0,1,2,3]).to(device)

cuda


In [8]:
import matplotlib.pyplot as plt
from torch.cuda import memory_allocated, max_memory_allocated
from tqdm import tqdm

In [9]:
import pytorch_ssim

# 커스텀 손실 함수 정의
def combined_loss(output, target):
    l1_loss = nn.L1Loss()(output, target)
    ssim_loss = 1 - pytorch_ssim.ssim(output, target)
    return l1_loss + ssim_loss * 0.1  # 가중치는 필요에 따라 조절

# 손실 함수 업데이트
#criterion = combined_loss

In [10]:
# 손실 함수와 최적화 알고리즘 설정
#optimizer = optim.Adam(model.parameters(), lr = CFG['LEARNING_RATE'])
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.MSELoss()
scaler = GradScaler()
scheduler = CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])
#scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
# 모델의 파라미터 수 계산
total_parameters = count_parameters(model)
print("Total Parameters:", total_parameters)

Total Parameters: 2247804


In [11]:
from tqdm import tqdm

In [12]:
import math
import torch
import time
from torch import nn, optim
from torch.cuda.amp import GradScaler, autocast
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch.nn.functional as F

# Define the configuration (example CFG)

# Function to count model parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Calculate and print model parameters
total_parameters = count_parameters(model)
print("Total Parameters:", total_parameters)

# PSNR calculation function
def calculate_psnr(img1, img2):
    mse = torch.mean((img1 - img2) ** 2)
    if mse == 0:
        return float('inf')
    psnr = 20 * math.log10(1.0 / math.sqrt(mse))
    return psnr

# Model training
model.train()
best_loss = 1000
best_psnr = 0

for epoch in range(CFG['EPOCHS']):
    model.train()
    epoch_start_time = time.time()
    mse_running_loss = 0.0
    psnr_running = 0.0
    
    with tqdm(train_loader, desc=f"Epoch {epoch+1}/{CFG['EPOCHS']}", leave=False) as pbar:
        for batch_idx, (noisy_images, clean_images) in enumerate(pbar):
            noisy_images = noisy_images.to(device)
            clean_images = clean_images.to(device)
            
            optimizer.zero_grad()
            
            with autocast():
                outputs = model(noisy_images)
                # Ensure output size matches the clean image size
                outputs = F.interpolate(outputs, size=clean_images.shape[2:], mode='bilinear', align_corners=False)
                
                # Calculate MSE loss
                mse_loss = criterion(outputs, clean_images)
            
            # Scale the loss for mixed precision
            scaler.scale(mse_loss).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()
            
            # Accumulate the running loss
            mse_running_loss += mse_loss.item() * noisy_images.size(0)
            
            # Calculate PSNR
            psnr = calculate_psnr(outputs, clean_images)
            psnr_running += psnr * noisy_images.size(0)

            # Update progress bar with current batch stats
            pbar.set_postfix({
                'Batch Loss': mse_loss.item(),
                'PSNR': psnr,
                'LR': optimizer.param_groups[0]['lr']  # Getting LR directly from optimizer
            })

    current_lr = optimizer.param_groups[0]['lr']
    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time
    minutes = int(epoch_time // 60)
    seconds = int(epoch_time % 60)
    hours = int(minutes // 60)
    minutes = int(minutes % 60)

    # Calculate epoch-level average loss and PSNR
    mse_epoch_loss = mse_running_loss / len(train_loader.dataset)
    psnr_epoch = psnr_running / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{CFG['EPOCHS']}, MSE Loss: {mse_epoch_loss:.4f}, PSNR: {psnr_epoch:.2f} dB, Lr: {current_lr:.8f}")
    print(f"1epoch 훈련 소요 시간: {hours}시간 {minutes}분 {seconds}초")

    # Step the scheduler with epoch-level average loss
    scheduler.step(mse_epoch_loss)
    
    # Save model if PSNR is the best so far
    if psnr_epoch > best_psnr:
        best_psnr = psnr_epoch
        torch.save(model.state_dict(), 'b16_noarg.pth')
        print(f"{epoch+1}epoch 모델 저장 완료 (PSNR: {best_psnr:.2f} dB)")

Total Parameters: 2247804


                                                                                                        

Epoch 1/100, MSE Loss: 0.0555, PSNR: 12.63 dB, Lr: 0.00010000
1epoch 훈련 소요 시간: 0시간 16분 1초
1epoch 모델 저장 완료 (PSNR: 12.63 dB)


                                                                                                       

Epoch 2/100, MSE Loss: 0.0542, PSNR: 12.72 dB, Lr: 0.00010000
1epoch 훈련 소요 시간: 0시간 15분 48초
2epoch 모델 저장 완료 (PSNR: 12.72 dB)


                                                                                                       

Epoch 3/100, MSE Loss: 0.0535, PSNR: 12.79 dB, Lr: 0.00010000
1epoch 훈련 소요 시간: 0시간 15분 48초
3epoch 모델 저장 완료 (PSNR: 12.79 dB)


                                                                                                       

Epoch 4/100, MSE Loss: 0.0534, PSNR: 12.79 dB, Lr: 0.00010000
1epoch 훈련 소요 시간: 0시간 16분 8초
4epoch 모델 저장 완료 (PSNR: 12.79 dB)


                                                                                                        

Epoch 5/100, MSE Loss: 0.0531, PSNR: 12.82 dB, Lr: 0.00010000
1epoch 훈련 소요 시간: 0시간 15분 51초
5epoch 모델 저장 완료 (PSNR: 12.82 dB)


                                                                                                       

Epoch 6/100, MSE Loss: 0.0528, PSNR: 12.84 dB, Lr: 0.00010000
1epoch 훈련 소요 시간: 0시간 15분 47초
6epoch 모델 저장 완료 (PSNR: 12.84 dB)


                                                                                                       

Epoch 7/100, MSE Loss: 0.0528, PSNR: 12.85 dB, Lr: 0.00010000
1epoch 훈련 소요 시간: 0시간 15분 56초
7epoch 모델 저장 완료 (PSNR: 12.85 dB)


                                                                                                       

Epoch 8/100, MSE Loss: 0.0524, PSNR: 12.88 dB, Lr: 0.00010000
1epoch 훈련 소요 시간: 0시간 15분 53초
8epoch 모델 저장 완료 (PSNR: 12.88 dB)


Epoch 9/100:  85%|████████▍ | 740/871 [14:53<02:44,  1.25s/it, Batch Loss=0.0456, PSNR=13.4, LR=0.0001]

In [13]:
import math
import torch


# 손실 함수와 최적화 알고리즘 설정
#optimizer = optim.Adam(model.parameters(), lr = CFG['LEARNING_RATE'])
#criterion = nn.L1Loss()
#scaler = GradScaler()
#scheduler = CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

# 모델의 파라미터 수 계산
total_parameters = count_parameters(model)
print("Total Parameters:", total_parameters)


# PSNR 계산 함수 정의
def calculate_psnr(img1, img2):
    mse = torch.mean((img1 - img2) ** 2)
    if mse == 0:
        return float('inf')
    psnr = 20 * math.log10(1.0 / math.sqrt(mse))
    return psnr

# 모델 학습
model.train()
best_loss = 1000
best_psnr = 0

for epoch in range(CFG['EPOCHS']):
    model.train()
    epoch_start_time = time.time()
    mse_running_loss = 0.0
    psnr_running = 0.0
    
    with tqdm(train_loader, desc=f"Epoch {epoch+1}/{CFG['EPOCHS']}", leave=False) as pbar:
        for batch_idx, (noisy_images, clean_images) in enumerate(pbar):
            noisy_images = noisy_images.to(device)
            clean_images = clean_images.to(device)
            
            optimizer.zero_grad()
            
            with autocast():
                outputs = model(noisy_images)
                outputs = F.interpolate(outputs, size=clean_images.shape[2:], mode='bilinear', align_corners=False)
                
                mse_loss = criterion(outputs, clean_images)
            
            scaler.scale(mse_loss).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()
            
            mse_running_loss += mse_loss.item() * noisy_images.size(0)
            
            # PSNR 계산
            psnr = calculate_psnr(outputs, clean_images)
            psnr_running += psnr * noisy_images.size(0)

            # 진행 상황 업데이트
            pbar.set_postfix({
                'Batch Loss': mse_loss.item(),
                'PSNR': psnr,
                'LR': scheduler.get_last_lr()[0]
            })

    current_lr = scheduler.get_last_lr()[0]
    epoch_end_time = time.time()
    epoch_time = epoch_end_time - epoch_start_time
    minutes = int(epoch_time // 60)
    seconds = int(epoch_time % 60)
    hours = int(minutes // 60)
    minutes = int(minutes % 60)

    # Epoch 단위 평균 손실 및 PSNR 계산
    mse_epoch_loss = mse_running_loss / len(train_dataset)
    psnr_epoch = psnr_running / len(train_dataset)
    print(f"Epoch {epoch+1}/{CFG['EPOCHS']}, MSE Loss: {mse_epoch_loss:.4f}, PSNR: {psnr_epoch:.2f} dB, Lr: {current_lr:.8f}")
    print(f"1epoch 훈련 소요 시간: {hours}시간 {minutes}분 {seconds}초")

    # PSNR이 최고일 때 모델 저장
    if psnr_epoch > best_psnr:
        best_psnr = psnr_epoch
        torch.save(model.state_dict(), 'best_Swin_psnr.pth')
        print(f"{epoch+1}epoch 모델 저장 완료 (PSNR: {best_psnr:.2f} dB)")

Total Parameters: 2247804


                                                    

RuntimeError: module must have its parameters and buffers on device cuda:0 (device_ids[0]) but found one of them on device: cuda:3

# Inference

In [None]:
class CustomDatasetTest(data.Dataset):
    def __init__(self, noisy_image_paths, transform=None):
        self.noisy_image_paths = [os.path.join(noisy_image_paths, x) for x in os.listdir(noisy_image_paths)]
        self.transform = transform

    def __len__(self):
        return len(self.noisy_image_paths)

    def __getitem__(self, index):
        noisy_image_path = self.noisy_image_paths[index]
        noisy_image = load_img(self.noisy_image_paths[index])
        
        # Convert numpy array to PIL image
        if isinstance(noisy_image, np.ndarray):
            noisy_image = Image.fromarray(noisy_image)

        if self.transform:
            noisy_image = self.transform(noisy_image)

        return noisy_image, noisy_image_path


test_transform = Compose([
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

def load_img(filepath):
    img = cv2.imread(filepath)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

In [None]:
device = torch.device('cuda:3')
print(device)
# Restormer 모델 인스턴스 생성 및 GPU로 이동
#model = Restormer()#.to(device)
model = SwinIR(img_size=64, 
               patch_size=1, 
               in_chans=3,
               embed_dim=96, 
               depths=[6, 6, 6, 6], 
               num_heads=[6, 6, 6, 6], 
               window_size=7, 
               mlp_ratio=2, 
               upsampler='pixelshuffledirect')

model = nn.DataParallel(model, device_ids=[0,3]).to(device)
# State dict 불러오기
state_dict = torch.load('best_Restormer1.pth')

# "module." 제거
from collections import OrderedDict

new_state_dict = OrderedDict()
for k, v in state_dict.items():
    name = k[7:] if k.startswith("module.") else k  # "module." 제거
    new_state_dict[name] = v

In [None]:
model = SwinIR(img_size=64, 
               patch_size=1, 
               in_chans=3,
               embed_dim=96, 
               depths=[6, 6, 6, 6], 
               num_heads=[6, 6, 6, 6], 
               window_size=7, 
               mlp_ratio=2, 
               upsampler='pixelshuffledirect')

# GPU 설정
device = torch.device('cuda:3' if torch.cuda.is_available() else 'cpu')
model.to(device)

# State dict 불러오기 후 "module." 제거
state_dict = torch.load('best_Restormer1.pth')
new_state_dict = OrderedDict((k[7:] if k.startswith("module.") else k, v) for k, v in state_dict.items())
model.load_state_dict(new_state_dict)

test_data_path = './data/Validation/noisy'
output_path = './data/Validation/output'

# 데이터셋 로드 및 전처리
test_dataset = CustomDatasetTest(test_data_path, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

if not os.path.exists(output_path):
    os.makedirs(output_path)

# 이미지 denoising 및 저장
for noisy_image, noisy_image_path in test_loader:
    noisy_image = noisy_image.to(device)
    with torch.no_grad():
        denoised_image = model(noisy_image)
    
    # denoised_image를 CPU로 이동하여 이미지 저장
    denoised_image = denoised_image.cpu().squeeze(0)
    denoised_image = (denoised_image * 0.5 + 0.5).clamp(0, 1)
    denoised_image = transforms.ToPILImage()(denoised_image)

    # 저장 경로 지정 및 저장
    output_filename = noisy_image_path[0] if isinstance(noisy_image_path, list) else noisy_image_path
    print(output_filename)
    denoised_filename = output_path + '/' + output_filename[0].split('/')[-1][:-4] + '.jpg'
    denoised_image.save(denoised_filename)
    
    print(f'Saved denoised image: {denoised_filename}')

In [None]:

model = SwinIR(img_size=64, 
               patch_size=1, 
               in_chans=3,
               embed_dim=96, 
               depths=[6, 6, 6, 6], 
               num_heads=[6, 6, 6, 6], 
               window_size=7, 
               mlp_ratio=2, 
               upsampler='pixelshuffledirect')

# State dict 불러오기
state_dict = torch.load('best_Restormer1.pth')

# "module." 제거
from collections import OrderedDict

new_state_dict = OrderedDict()
for k, v in state_dict.items():
    name = k[7:] if k.startswith("module.") else k  # "module." 제거
    new_state_dict[name] = v

# 모델에 새로운 state dict 로드
model.load_state_dict(new_state_dict)

# GPU 사용 여부 확인
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cuda:3')
model.to(device)


test_data_path = './data/Validation/noisy'
output_path = './data/Validation/output'

# 데이터셋 로드 및 전처리
test_dataset = CustomDatasetTest(test_data_path, transform=test_transform)

# 데이터 로더 설정
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)


if not os.path.exists(output_path):
    os.makedirs(output_path)

# 이미지 denoising 및 저장
for noisy_image, noisy_image_path in test_loader:
    noisy_image = noisy_image.to(device)
    denoised_image = model(noisy_image)
    
    # denoised_image를 CPU로 이동하여 이미지 저장
    denoised_image = denoised_image.cpu().squeeze(0)
    denoised_image = (denoised_image * 0.5 + 0.5).clamp(0, 1)
    denoised_image = transforms.ToPILImage()(denoised_image)

    # Save denoised image
    output_filename = noisy_image_path[0]
    denoised_filename = output_path + '/' + output_filename.split('/')[-1][:-4] + '.jpg'
    denoised_image.save(denoised_filename) 
    
    print(f'Saved denoised image: {denoised_filename}')

# Submission

In [None]:
def zip_folder(folder_path, output_zip):
    shutil.make_archive(output_zip, 'zip', folder_path)
    print(f"Created {output_zip}.zip successfully.")

zip_folder(output_path, './submission')