# Import

In [1]:
import random
import os
import numpy as np
import torch
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split, Subset
from skimage.metrics import structural_similarity as ssim
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from skimage.metrics import structural_similarity as ssim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import cv2
import zipfile
from tqdm import tqdm
import time

In [2]:
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


# Hyperparameter Setting

In [3]:
CFG = {
    'EPOCHS':30,
    'LEARNING_RATE':3e-4,
    'BATCH_SIZE':16,
    'SEED':42
}

# Fixed RandomSeed

In [4]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

# CustomDataset

In [5]:
class CustomDataset(Dataset):
    def __init__(self, damage_dir, origin_dir, transform=None):
        self.damage_dir = damage_dir
        self.origin_dir = origin_dir
        self.transform = transform
        self.damage_files = sorted(os.listdir(damage_dir))
        self.origin_files = sorted(os.listdir(origin_dir))

        # 각 디렉토리의 파일 수를 출력하여 문제 파악하기
        print(f"Number of images in damage_dir: {len(self.damage_files)}")
        print(f"Number of images in origin_dir: {len(self.origin_files)}")

        # 파일 수가 동일하지 않으면 AssertionError 발생
        assert len(self.damage_files) == len(self.origin_files), "The number of images in gray and color folders must match"

    def __len__(self):
        return len(self.damage_files)

    def __getitem__(self, idx):
        damage_img_name = self.damage_files[idx]
        origin_img_name = self.origin_files[idx]

        damage_img_path = os.path.join(self.damage_dir, damage_img_name)
        origin_img_path = os.path.join(self.origin_dir, origin_img_name)

        damage_img = Image.open(damage_img_path).convert("RGB")
        origin_img = Image.open(origin_img_path).convert("RGB")

        if self.transform:
            damage_img = self.transform(damage_img)
            origin_img = self.transform(origin_img)

        return {'A': damage_img, 'B': origin_img}




# Data Load

In [6]:
# 경로 설정
origin_dir = '/home/work/Donghyeon/dataset/train_gt'  # 원본 이미지 폴더 경로
damage_dir = '/home/work/Donghyeon/dataset/train_input'  # 손상된 이미지 폴더 경로
test_dir = '/home/work/Donghyeon/dataset/test_input'     # test 이미지 폴더 경로

# 데이터 전처리 설정
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])
])

# 데이터셋 및 DataLoader 생성
dataset = CustomDataset(damage_dir=damage_dir, origin_dir=origin_dir, transform=transform)
dataloader = DataLoader(dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=1)

Number of images in damage_dir: 29603
Number of images in origin_dir: 29603


# Model Define

## AdamW -변함 없음

In [None]:

class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        self.enc1 = self.conv_block(3, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)
        self.enc5 = self.conv_block(512, 1024)

        self.up1 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec1 = self.conv_block(1024 + 512, 512)

        self.up2 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec2 = self.conv_block(512 + 256, 256)

        self.up3 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec3 = self.conv_block(256 + 128, 128)

        self.up4 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec4 = self.conv_block(128 + 64, 64)

        self.final = nn.Conv2d(64, 3, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        e1 = self.enc1(x)
        e2 = self.enc2(nn.MaxPool2d(2)(e1))
        e3 = self.enc3(nn.MaxPool2d(2)(e2))
        e4 = self.enc4(nn.MaxPool2d(2)(e3))
        e5 = self.enc5(nn.MaxPool2d(2)(e4))

        d1 = self.dec1(torch.cat([self.up1(e5), e4], dim=1))
        d2 = self.dec2(torch.cat([self.up2(d1), e3], dim=1))
        d3 = self.dec3(torch.cat([self.up3(d2), e2], dim=1))
        d4 = self.dec4(torch.cat([self.up4(d3), e1], dim=1))

        return torch.sigmoid(self.final(d4))

class PatchGANDiscriminator(nn.Module):
    def __init__(self, in_channels=3):
        super(PatchGANDiscriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, kernel_size=4, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x)

class ImageDataset(Dataset):
    def __init__(self, input_dir, gt_dir, transform=None):
        self.input_dir = input_dir
        self.gt_dir = gt_dir
        self.input_images = sorted(os.listdir(input_dir))
        self.gt_images = sorted(os.listdir(gt_dir))
        self.transform = transform

    def __len__(self):
        return len(self.input_images)

    def __getitem__(self, idx):
        input_path = os.path.join(self.input_dir, self.input_images[idx])
        gt_path = os.path.join(self.gt_dir, self.gt_images[idx])
        input_image = cv2.imread(input_path)
        gt_image = cv2.imread(gt_path)
        if self.transform:
            input_image = self.transform(input_image)
            gt_image = self.transform(gt_image)
        return (
            torch.tensor(input_image).permute(2, 0, 1).float() / 255.0,
            torch.tensor(gt_image).permute(2, 0, 1).float() / 255.0
        )

generator = UNet().to(device)
discriminator = PatchGANDiscriminator().to(device)

generator = nn.DataParallel(generator, device_ids=[0])
discriminator = nn.DataParallel(discriminator, device_ids=[0])

adversarial_loss = nn.BCEWithLogitsLoss()
pixel_loss = nn.combined_loss()

optimizer_G = optim.AdamW(generator.parameters(), lr=0.00001, betas=(0.5, 0.999))
optimizer_D = optim.AdamW(discriminator.parameters(), lr=0.0001, betas=(0.5, 0.999))

train_dataset = ImageDataset("/home/work/Donghyeon/dataset/train_input", "/home/work/Donghyeon/dataset/train_gt")
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=1, pin_memory=True)

epochs = 100
result_dir = "/home/work/Donghyeon/Com_Loss"
os.makedirs(result_dir, exist_ok=True)
checkpoint_path = "checkpoint.pth"

for epoch in range(epochs):
    generator.train()
    discriminator.train()
    running_loss_G = 0.0
    running_loss_D = 0.0

    with tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs}", unit="batch") as pbar:
        for input_images, gt_images in train_loader:
            input_images, gt_images = input_images.to(device), gt_images.to(device)

            real_labels = torch.ones_like(discriminator(gt_images)).to(device)
            fake_labels = torch.zeros_like(discriminator(input_images)).to(device)

            optimizer_G.zero_grad()
            fake_images = generator(input_images)
            pred_fake = discriminator(fake_images)

            g_loss_adv = adversarial_loss(pred_fake, real_labels)
            g_loss_pixel = pixel_loss(fake_images, gt_images)
            g_loss = g_loss_adv + 100 * g_loss_pixel
            g_loss.backward()
            optimizer_G.step()

            optimizer_D.zero_grad()
            pred_real = discriminator(gt_images)
            loss_real = adversarial_loss(pred_real, real_labels)

            pred_fake = discriminator(fake_images.detach())
            loss_fake = adversarial_loss(pred_fake, fake_labels)

            d_loss = (loss_real + loss_fake) / 2
            d_loss.backward()
            optimizer_D.step()

            running_loss_G += g_loss.item()
            running_loss_D += d_loss.item()

            pbar.set_postfix(generator_loss=g_loss.item(), discriminator_loss=d_loss.item())
            pbar.update(1)

    print(f"Epoch [{epoch+1}/{epochs}] - Generator Loss: {running_loss_G / len(train_loader):.4f}, Discriminator Loss: {running_loss_D / len(train_loader):.4f}")

    test_input_dir = "/home/work/Donghyeon/dataset/test_input"
    output_dir = f"output_images_epoch_{epoch+1}"
    os.makedirs(output_dir, exist_ok=True)
    with torch.no_grad():
        for img_name in sorted(os.listdir(test_input_dir)):
            img_path = os.path.join(test_input_dir, img_name)
            img = cv2.imread(img_path)
            input_tensor = torch.tensor(img).permute(2, 0, 1).unsqueeze(0).float().to(device) / 255.0
            output = generator(input_tensor).squeeze().permute(1, 2, 0).cpu().numpy() * 255.0
            output = output.astype(np.uint8)
            cv2.imwrite(os.path.join(output_dir, img_name), output)

    zip_filename = os.path.join(result_dir, f"epoch_{epoch+1}.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for img_name in os.listdir(output_dir):
            zipf.write(os.path.join(output_dir, img_name), arcname=img_name)
    print(f"Epoch {epoch+1} results saved to {zip_filename}")

    torch.save({
        'epoch': epoch,
        'generator_state_dict': generator.state_dict(),
        'discriminator_state_dict': discriminator.state_dict(),
        'optimizer_G_state_dict': optimizer_G.state_dict(),
        'optimizer_D_state_dict': optimizer_D.state_dict()
    }, checkpoint_path)

generator.train()  
discriminator.train()  

## Combine Loss(MSE + L1)

In [None]:
import os
import cv2
import zipfile
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        self.enc1 = self.conv_block(3, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)
        self.enc5 = self.conv_block(512, 1024)

        self.up1 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec1 = self.conv_block(1024 + 512, 512)

        self.up2 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec2 = self.conv_block(512 + 256, 256)

        self.up3 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec3 = self.conv_block(256 + 128, 128)

        self.up4 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec4 = self.conv_block(128 + 64, 64)

        self.final = nn.Conv2d(64, 3, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        e1 = self.enc1(x)
        e2 = self.enc2(nn.MaxPool2d(2)(e1))
        e3 = self.enc3(nn.MaxPool2d(2)(e2))
        e4 = self.enc4(nn.MaxPool2d(2)(e3))
        e5 = self.enc5(nn.MaxPool2d(2)(e4))

        d1 = self.dec1(torch.cat([self.up1(e5), e4], dim=1))
        d2 = self.dec2(torch.cat([self.up2(d1), e3], dim=1))
        d3 = self.dec3(torch.cat([self.up3(d2), e2], dim=1))
        d4 = self.dec4(torch.cat([self.up4(d3), e1], dim=1))

        return torch.sigmoid(self.final(d4))

class PatchGANDiscriminator(nn.Module):
    def __init__(self, in_channels=3):
        super(PatchGANDiscriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, kernel_size=4, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)
        )

    def forward(self, x):
        return self.model(x)

class ImageDataset(Dataset):
    def __init__(self, input_dir, gt_dir, transform=None):
        self.input_dir = input_dir
        self.gt_dir = gt_dir
        self.input_images = sorted(os.listdir(input_dir))
        self.gt_images = sorted(os.listdir(gt_dir))
        self.transform = transform

    def __len__(self):
        return len(self.input_images)

    def __getitem__(self, idx):
        input_path = os.path.join(self.input_dir, self.input_images[idx])
        gt_path = os.path.join(self.gt_dir, self.gt_images[idx])
        input_image = cv2.imread(input_path)
        gt_image = cv2.imread(gt_path)
        if self.transform:
            input_image = self.transform(input_image)
            gt_image = self.transform(gt_image)
        return (
            torch.tensor(input_image).permute(2, 0, 1).float() / 255.0,
            torch.tensor(gt_image).permute(2, 0, 1).float() / 255.0
        )

lambda_l1 = 0.5
lambda_mse = 0.5
lambda_adv = 1.0
lambda_pixel = 1.0


adversarial_loss = nn.BCEWithLogitsLoss()
pixel_loss = lambda fake, target: lambda_l1 * nn.L1Loss()(fake, target) + lambda_mse * nn.MSELoss()(fake, target)

generator = UNet().to(device)
discriminator = PatchGANDiscriminator().to(device)

optimizer_G = optim.AdamW(generator.parameters(), lr=0.0001, betas=(0.5, 0.999))
optimizer_D = optim.AdamW(discriminator.parameters(), lr=0.0004, betas=(0.5, 0.999))

train_dataset = ImageDataset("/home/work/Donghyeon/dataset/train_input", "/home/work/Donghyeon/dataset/train_gt")
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4, pin_memory=True)

scaler = torch.cuda.amp.GradScaler()


epochs = 100
result_dir = "/home/work/Donghyeon/Com_Loss"
os.makedirs(result_dir, exist_ok=True)
checkpoint_path = "checkpoint.pth"

for epoch in range(epochs):
    generator.train()
    discriminator.train()
    running_loss_G = 0.0
    running_loss_D = 0.0

    with tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs}", unit="batch") as pbar:
        for input_images, gt_images in train_loader:
            input_images, gt_images = input_images.to(device), gt_images.to(device)

            real_labels = torch.ones_like(discriminator(gt_images)).to(device)
            fake_labels = torch.zeros_like(discriminator(input_images)).to(device)

            # Train Generator
            optimizer_G.zero_grad()
            fake_images = generator(input_images)
            pred_fake = discriminator(fake_images)

            g_loss_adv = lambda_adv * adversarial_loss(pred_fake, real_labels)
            g_loss_pixel = lambda_pixel * pixel_loss(fake_images, gt_images)
            g_loss = g_loss_adv + g_loss_pixel
            g_loss.backward()
            optimizer_G.step()

            # Train Discriminator
            optimizer_D.zero_grad()
            pred_real = discriminator(gt_images)
            loss_real = adversarial_loss(pred_real, real_labels)

            pred_fake = discriminator(fake_images.detach())
            loss_fake = adversarial_loss(pred_fake, fake_labels)

            d_loss = (loss_real + loss_fake) / 2
            d_loss.backward()
            optimizer_D.step()

            running_loss_G += g_loss.item()
            running_loss_D += d_loss.item()

            pbar.set_postfix(generator_loss=g_loss.item(), discriminator_loss=d_loss.item())
            pbar.update(1)

    print(f"Epoch [{epoch+1}/{epochs}] - Generator Loss: {running_loss_G / len(train_loader):.4f}, Discriminator Loss: {running_loss_D / len(train_loader):.4f}")

    test_input_dir = "/home/work/Donghyeon/dataset/test_input"
    output_dir = f"perloss_images_epoch_{epoch+1}"
    os.makedirs(output_dir, exist_ok=True)
    with torch.no_grad():
        for img_name in sorted(os.listdir(test_input_dir)):
            img_path = os.path.join(test_input_dir, img_name)
            img = cv2.imread(img_path)
            input_tensor = torch.tensor(img).permute(2, 0, 1).unsqueeze(0).float().to(device) / 255.0
            output = generator(input_tensor).squeeze().permute(1, 2, 0).cpu().numpy() * 255.0
            output = output.astype(np.uint8)
            cv2.imwrite(os.path.join(output_dir, img_name), output)

    zip_filename = os.path.join(result_dir, f"epoch_{epoch+1}.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for img_name in os.listdir(output_dir):
            zipf.write(os.path.join(output_dir, img_name), arcname=img_name)
    print(f"Epoch {epoch+1} results saved to {zip_filename}")

    torch.save({
        'epoch': epoch,
        'generator_state_dict': generator.state_dict(),
        'discriminator_state_dict': discriminator.state_dict(),
        'optimizer_G_state_dict': optimizer_G.state_dict(),
        'optimizer_D_state_dict': optimizer_D.state_dict()
    }, checkpoint_path)

generator.train()  
discriminator.train()


## Adamw - perceptual loss 변화

In [None]:
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        self.enc1 = self.conv_block(3, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)
        self.enc5 = self.conv_block(512, 1024)

        self.up1 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec1 = self.conv_block(1024 + 512, 512)

        self.up2 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec2 = self.conv_block(512 + 256, 256)

        self.up3 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec3 = self.conv_block(256 + 128, 128)

        self.up4 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec4 = self.conv_block(128 + 64, 64)

        self.final = nn.Conv2d(64, 3, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        e1 = self.enc1(x)
        e2 = self.enc2(nn.MaxPool2d(2)(e1))
        e3 = self.enc3(nn.MaxPool2d(2)(e2))
        e4 = self.enc4(nn.MaxPool2d(2)(e3))
        e5 = self.enc5(nn.MaxPool2d(2)(e4))

        d1 = self.dec1(torch.cat([self.up1(e5), e4], dim=1))
        d2 = self.dec2(torch.cat([self.up2(d1), e3], dim=1))
        d3 = self.dec3(torch.cat([self.up3(d2), e2], dim=1))
        d4 = self.dec4(torch.cat([self.up4(d3), e1], dim=1))

        return torch.sigmoid(self.final(d4))

class PatchGANDiscriminator(nn.Module):
    def __init__(self, in_channels=3):
        super(PatchGANDiscriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, kernel_size=4, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x)

class ImageDataset(Dataset):
    def __init__(self, input_dir, gt_dir, transform=None):
        self.input_dir = input_dir
        self.gt_dir = gt_dir
        self.input_images = sorted(os.listdir(input_dir))
        self.gt_images = sorted(os.listdir(gt_dir))
        self.transform = transform

    def __len__(self):
        return len(self.input_images)

    def __getitem__(self, idx):
        input_path = os.path.join(self.input_dir, self.input_images[idx])
        gt_path = os.path.join(self.gt_dir, self.gt_images[idx])
        input_image = cv2.imread(input_path)
        gt_image = cv2.imread(gt_path)
        if self.transform:
            input_image = self.transform(input_image)
            gt_image = self.transform(gt_image)
        return (
            torch.tensor(input_image).permute(2, 0, 1).float() / 255.0,
            torch.tensor(gt_image).permute(2, 0, 1).float() / 255.0
        )

generator = UNet().to(device)
discriminator = PatchGANDiscriminator().to(device)

generator = nn.DataParallel(generator, device_ids=[0])
discriminator = nn.DataParallel(discriminator, device_ids=[0])

adversarial_loss = nn.BCELoss()  
pixel_loss = nn.MSELoss()

optimizer_G = optim.AdamW(generator.parameters(), lr=0.0001, betas=(0.5, 0.999))
optimizer_D = optim.AdamW(discriminator.parameters(), lr=0.0001, betas=(0.5, 0.999))

train_dataset = ImageDataset("/home/work/Donghyeon/dataset/train_input", "/home/work/Donghyeon/dataset/train_gt")
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=1, pin_memory=True)

# New loss weights
lambda_adv = 1.0
lambda_pixel = 100.0
lambda_perceptual = 0.1

# Placeholder for perceptual loss, in this case using L1 for simplicity
perceptual_loss = nn.L1Loss()

epochs = 50
result_dir = "/home/work/Donghyeon/hackerton"
os.makedirs(result_dir, exist_ok=True)
checkpoint_path = "checkpoint.pth"

for epoch in range(epochs):
    generator.train()
    discriminator.train()
    running_loss_G = 0.0
    running_loss_D = 0.0

    with tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs}", unit="batch") as pbar:
        for input_images, gt_images in train_loader:
            input_images, gt_images = input_images.to(device), gt_images.to(device)

            real_labels = torch.ones_like(discriminator(gt_images)).to(device)
            fake_labels = torch.zeros_like(discriminator(input_images)).to(device)

            # Train Generator
            optimizer_G.zero_grad()
            fake_images = generator(input_images)
            pred_fake = discriminator(fake_images)

            g_loss_adv = lambda_adv * adversarial_loss(pred_fake, real_labels)
            g_loss_pixel = lambda_pixel * pixel_loss(fake_images, gt_images)
            g_loss_perceptual = lambda_perceptual * perceptual_loss(fake_images, gt_images)
            g_loss = g_loss_adv + g_loss_pixel + g_loss_perceptual
            g_loss.backward()
            optimizer_G.step()

            # Train Discriminator
            optimizer_D.zero_grad()
            pred_real = discriminator(gt_images)
            loss_real = adversarial_loss(pred_real, real_labels)

            pred_fake = discriminator(fake_images.detach())
            loss_fake = adversarial_loss(pred_fake, fake_labels)

            d_loss = (loss_real + loss_fake) / 2
            d_loss.backward()
            optimizer_D.step()

            running_loss_G += g_loss.item()
            running_loss_D += d_loss.item()

            pbar.set_postfix(generator_loss=g_loss.item(), discriminator_loss=d_loss.item())
            pbar.update(1)

    print(f"Epoch [{epoch+1}/{epochs}] - Generator Loss: {running_loss_G / len(train_loader):.4f}, Discriminator Loss: {running_loss_D / len(train_loader):.4f}")

    test_input_dir = "/home/work/Donghyeon/dataset/test_input"
    output_dir = f"perloss_images_epoch_{epoch+1}"
    os.makedirs(output_dir, exist_ok=True)
    with torch.no_grad():
        for img_name in sorted(os.listdir(test_input_dir)):
            img_path = os.path.join(test_input_dir, img_name)
            img = cv2.imread(img_path)
            input_tensor = torch.tensor(img).permute(2, 0, 1).unsqueeze(0).float().to(device) / 255.0
            output = generator(input_tensor).squeeze().permute(1, 2, 0).cpu().numpy() * 255.0
            output = output.astype(np.uint8)
            cv2.imwrite(os.path.join(output_dir, img_name), output)

    zip_filename = os.path.join(result_dir, f"epoch_{epoch+1}.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for img_name in os.listdir(output_dir):
            zipf.write(os.path.join(output_dir, img_name), arcname=img_name)
    print(f"Epoch {epoch+1} results saved to {zip_filename}")

    torch.save({
        'epoch': epoch,
        'generator_state_dict': generator.state_dict(),
        'discriminator_state_dict': discriminator.state_dict(),
        'optimizer_G_state_dict': optimizer_G.state_dict(),
        'optimizer_D_state_dict': optimizer_D.state_dict()
    }, checkpoint_path)

generator.train()  
discriminator.train()

## Model - TV Loss

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
import cv2
import numpy as np
from tqdm import tqdm
import zipfile
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        self.enc1 = self.conv_block(3, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)
        self.enc5 = self.conv_block(512, 1024)

        self.up1 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec1 = self.conv_block(1024 + 512, 512)

        self.up2 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec2 = self.conv_block(512 + 256, 256)

        self.up3 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec3 = self.conv_block(256 + 128, 128)

        self.up4 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.dec4 = self.conv_block(128 + 64, 64)

        self.final = nn.Conv2d(64, 3, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        e1 = self.enc1(x)
        e2 = self.enc2(nn.MaxPool2d(2)(e1))
        e3 = self.enc3(nn.MaxPool2d(2)(e2))
        e4 = self.enc4(nn.MaxPool2d(2)(e3))
        e5 = self.enc5(nn.MaxPool2d(2)(e4))

        d1 = self.dec1(torch.cat([self.up1(e5), e4], dim=1))
        d2 = self.dec2(torch.cat([self.up2(d1), e3], dim=1))
        d3 = self.dec3(torch.cat([self.up3(d2), e2], dim=1))
        d4 = self.dec4(torch.cat([self.up4(d3), e1], dim=1))

        return torch.sigmoid(self.final(d4))

class PatchGANDiscriminator(nn.Module):
    def __init__(self, in_channels=3):
        super(PatchGANDiscriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, kernel_size=4, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x)

class ImageDataset(Dataset):
    def __init__(self, input_dir, gt_dir, transform=None):
        self.input_dir = input_dir
        self.gt_dir = gt_dir
        self.input_images = sorted(os.listdir(input_dir))
        self.gt_images = sorted(os.listdir(gt_dir))
        self.transform = transform

    def __len__(self):
        return len(self.input_images)

    def __getitem__(self, idx):
        input_path = os.path.join(self.input_dir, self.input_images[idx])
        gt_path = os.path.join(self.gt_dir, self.gt_images[idx])
        input_image = cv2.imread(input_path)
        gt_image = cv2.imread(gt_path)
        if self.transform:
            input_image = self.transform(input_image)
            gt_image = self.transform(gt_image)
        return (
            torch.tensor(input_image).permute(2, 0, 1).float() / 255.0,
            torch.tensor(gt_image).permute(2, 0, 1).float() / 255.0
        )

# TV loss function
def tv_loss(img, tv_weight):
    w_variance = torch.sum(torch.pow(img[:, :, :, :-1] - img[:, :, :, 1:], 2))
    h_variance = torch.sum(torch.pow(img[:, :, :-1, :] - img[:, :, 1:, :], 2))
    loss = tv_weight * (h_variance + w_variance)
    return loss

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize models, loss functions, optimizers, and dataloader (already defined in your code)
generator = UNet().to(device)
discriminator = PatchGANDiscriminator().to(device)

# DataParallel for multi-GPU
generator = nn.DataParallel(generator, device_ids=[0])
discriminator = nn.DataParallel(discriminator, device_ids=[0])

# Loss functions
adversarial_loss = nn.BCELoss()
pixel_loss = nn.MSELoss()

# Optimizers
optimizer_G = optim.AdamW(generator.parameters(), lr=0.0001, betas=(0.5, 0.999))
optimizer_D = optim.AdamW(discriminator.parameters(), lr=0.0001, betas=(0.5, 0.999))

# Dataset and Dataloader
train_dataset = ImageDataset("/home/work/Donghyeon/dataset/train_input", "/home/work/Donghyeon/dataset/train_gt")
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=1, pin_memory=True)

# Training parameters
epochs = 100
result_dir = "/home/work/Donghyeon/TVLoss"
os.makedirs(result_dir, exist_ok=True)
checkpoint_path = "checkpoint.pth"
tv_weight = 1e-6  # TV loss weight

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm
import os
import cv2
import numpy as np
import zipfile

# Define learning rate schedulers
scheduler_G = optim.lr_scheduler.StepLR(optimizer_G, step_size=10, gamma=0.5)
scheduler_D = optim.lr_scheduler.StepLR(optimizer_D, step_size=10, gamma=0.5)

# Training loop
for epoch in range(epochs):
    generator.train()
    discriminator.train()
    running_loss_G = 0.0
    running_loss_D = 0.0

    # Learning rate scheduler (optional)
    scheduler_G.step()
    scheduler_D.step()

    with tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs}", unit="batch") as pbar:
        for i, (input_images, gt_images) in enumerate(train_loader):
            input_images, gt_images = input_images.to(device), gt_images.to(device)

            # Real and fake labels
            real_labels = torch.ones_like(discriminator(gt_images)).to(device)
            fake_labels = torch.zeros_like(discriminator(input_images)).to(device)

            # Train discriminator
            optimizer_D.zero_grad()
            with torch.no_grad():
                fake_images = generator(input_images)

            pred_real = discriminator(gt_images)
            loss_real = adversarial_loss(pred_real, real_labels)

            pred_fake = discriminator(fake_images.detach())
            loss_fake = adversarial_loss(pred_fake, fake_labels)

            d_loss = (loss_real + loss_fake) / 2
            d_loss.backward()
            nn.utils.clip_grad_norm_(discriminator.parameters(), max_norm=1.0)  # Gradient clipping
            optimizer_D.step()

            # Train generator
            if i % 2 == 0:  # Update generator every 2 discriminator updates
                optimizer_G.zero_grad()
                fake_images = generator(input_images)
                pred_fake = discriminator(fake_images)

                g_loss_adv = adversarial_loss(pred_fake, real_labels)
                g_loss_pixel = pixel_loss(fake_images, gt_images)
                g_loss_tv = tv_loss(fake_images, tv_weight)
                g_loss = g_loss_adv + 50 * g_loss_pixel + 0.1 * g_loss_tv

                g_loss.backward()
                nn.utils.clip_grad_norm_(generator.parameters(), max_norm=1.0)  # Gradient clipping
                optimizer_G.step()

            running_loss_G += g_loss.item()
            running_loss_D += d_loss.item()

            pbar.set_postfix(generator_loss=g_loss.item(), discriminator_loss=d_loss.item())
            pbar.update(1)

    print(f"Epoch [{epoch+1}/{epochs}] - Generator Loss: {running_loss_G / len(train_loader):.4f}, Discriminator Loss: {running_loss_D / len(train_loader):.4f}")

    # Save and test results
    test_input_dir = "/home/work/Donghyeon/dataset/test_input"
    output_dir = f"output_images_epoch_{epoch+1}"
    os.makedirs(output_dir, exist_ok=True)
    with torch.no_grad():
        for img_name in sorted(os.listdir(test_input_dir)):
            img_path = os.path.join(test_input_dir, img_name)
            img = cv2.imread(img_path)
            input_tensor = torch.tensor(img).permute(2, 0, 1).unsqueeze(0).float().to(device) / 255.0
            output = generator(input_tensor).squeeze().permute(1, 2, 0).cpu().numpy() * 255.0
            output = output.astype(np.uint8)
            cv2.imwrite(os.path.join(output_dir, img_name), output)

    zip_filename = os.path.join(result_dir, f"epoch_{epoch+1}.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for img_name in os.listdir(output_dir):
            zipf.write(os.path.join(output_dir, img_name), arcname=img_name)
    print(f"Epoch {epoch+1} results saved to {zip_filename}")

    torch.save({
        'epoch': epoch,
        'generator_state_dict': generator.state_dict(),
        'discriminator_state_dict': discriminator.state_dict(),
        'optimizer_G_state_dict': optimizer_G.state_dict(),
        'optimizer_D_state_dict': optimizer_D.state_dict()
    }, checkpoint_path)

generator.train()
discriminator.train()

Epoch 1/100: 100%|██████████| 3701/3701 [15:04<00:00,  4.09batch/s, discriminator_loss=0.663, generator_loss=1.88] 


Epoch [1/100] - Generator Loss: 3.0199, Discriminator Loss: 0.5230
Epoch 1 results saved to /home/work/Donghyeon/TVLoss/epoch_1.zip


Epoch 2/100: 100%|██████████| 3701/3701 [30:03<00:00,  2.05batch/s, discriminator_loss=0.518, generator_loss=1.67] 


Epoch [2/100] - Generator Loss: 1.8420, Discriminator Loss: 0.6256
Epoch 2 results saved to /home/work/Donghyeon/TVLoss/epoch_2.zip


Epoch 3/100: 100%|██████████| 3701/3701 [36:24<00:00,  1.69batch/s, discriminator_loss=0.631, generator_loss=1.64] 


Epoch [3/100] - Generator Loss: 1.7634, Discriminator Loss: 0.6295
Epoch 3 results saved to /home/work/Donghyeon/TVLoss/epoch_3.zip


Epoch 4/100: 100%|██████████| 3701/3701 [41:23<00:00,  1.49batch/s, discriminator_loss=0.644, generator_loss=1.59] 


Epoch [4/100] - Generator Loss: 1.6974, Discriminator Loss: 0.6347
Epoch 4 results saved to /home/work/Donghyeon/TVLoss/epoch_4.zip


Epoch 5/100: 100%|██████████| 3701/3701 [38:52<00:00,  1.59batch/s, discriminator_loss=0.743, generator_loss=1.93] 


Epoch [5/100] - Generator Loss: 1.6494, Discriminator Loss: 0.6270
Epoch 5 results saved to /home/work/Donghyeon/TVLoss/epoch_5.zip


Epoch 6/100: 100%|██████████| 3701/3701 [23:54<00:00,  2.58batch/s, discriminator_loss=1.03, generator_loss=1.09]  


Epoch [6/100] - Generator Loss: 1.5688, Discriminator Loss: 0.6361
Epoch 6 results saved to /home/work/Donghyeon/TVLoss/epoch_6.zip


Epoch 7/100: 100%|██████████| 3701/3701 [18:41<00:00,  3.30batch/s, discriminator_loss=0.882, generator_loss=1.42] 


Epoch [7/100] - Generator Loss: 1.5458, Discriminator Loss: 0.6307
Epoch 7 results saved to /home/work/Donghyeon/TVLoss/epoch_7.zip


Epoch 8/100: 100%|██████████| 3701/3701 [19:54<00:00,  3.10batch/s, discriminator_loss=0.682, generator_loss=1.19] 


Epoch [8/100] - Generator Loss: 1.5422, Discriminator Loss: 0.6252
Epoch 8 results saved to /home/work/Donghyeon/TVLoss/epoch_8.zip


Epoch 9/100: 100%|██████████| 3701/3701 [18:47<00:00,  3.28batch/s, discriminator_loss=0.57, generator_loss=1.89]  


Epoch [9/100] - Generator Loss: 1.5176, Discriminator Loss: 0.6295
Epoch 9 results saved to /home/work/Donghyeon/TVLoss/epoch_9.zip


Epoch 10/100: 100%|██████████| 3701/3701 [19:56<00:00,  3.09batch/s, discriminator_loss=0.688, generator_loss=1.87] 


Epoch [10/100] - Generator Loss: 1.3810, Discriminator Loss: 0.6052
Epoch 10 results saved to /home/work/Donghyeon/TVLoss/epoch_10.zip


Epoch 11/100: 100%|██████████| 3701/3701 [18:51<00:00,  3.27batch/s, discriminator_loss=0.655, generator_loss=1.57] 


Epoch [11/100] - Generator Loss: 1.3934, Discriminator Loss: 0.6075
Epoch 11 results saved to /home/work/Donghyeon/TVLoss/epoch_11.zip


Epoch 12/100: 100%|██████████| 3701/3701 [19:46<00:00,  3.12batch/s, discriminator_loss=0.748, generator_loss=1.34] 


Epoch [12/100] - Generator Loss: 1.4236, Discriminator Loss: 0.6044
Epoch 12 results saved to /home/work/Donghyeon/TVLoss/epoch_12.zip


Epoch 13/100: 100%|██████████| 3701/3701 [19:08<00:00,  3.22batch/s, discriminator_loss=0.599, generator_loss=1.51] 


Epoch [13/100] - Generator Loss: 1.4161, Discriminator Loss: 0.6057
Epoch 13 results saved to /home/work/Donghyeon/TVLoss/epoch_13.zip


Epoch 14/100: 100%|██████████| 3701/3701 [19:30<00:00,  3.16batch/s, discriminator_loss=0.627, generator_loss=1.26] 


Epoch [14/100] - Generator Loss: 1.4203, Discriminator Loss: 0.6039
Epoch 14 results saved to /home/work/Donghyeon/TVLoss/epoch_14.zip


Epoch 15/100: 100%|██████████| 3701/3701 [19:21<00:00,  3.19batch/s, discriminator_loss=0.534, generator_loss=1.38] 


Epoch [15/100] - Generator Loss: 1.4319, Discriminator Loss: 0.6013
Epoch 15 results saved to /home/work/Donghyeon/TVLoss/epoch_15.zip


Epoch 16/100: 100%|██████████| 3701/3701 [19:18<00:00,  3.20batch/s, discriminator_loss=0.711, generator_loss=1.34] 


Epoch [16/100] - Generator Loss: 1.4174, Discriminator Loss: 0.6044
Epoch 16 results saved to /home/work/Donghyeon/TVLoss/epoch_16.zip


Epoch 17/100: 100%|██████████| 3701/3701 [19:33<00:00,  3.15batch/s, discriminator_loss=0.939, generator_loss=1.21] 


Epoch [17/100] - Generator Loss: 1.4235, Discriminator Loss: 0.6025
Epoch 17 results saved to /home/work/Donghyeon/TVLoss/epoch_17.zip


Epoch 18/100: 100%|██████████| 3701/3701 [19:03<00:00,  3.24batch/s, discriminator_loss=0.558, generator_loss=1.31] 


Epoch [18/100] - Generator Loss: 1.4130, Discriminator Loss: 0.5991
Epoch 18 results saved to /home/work/Donghyeon/TVLoss/epoch_18.zip


Epoch 19/100: 100%|██████████| 3701/3701 [19:47<00:00,  3.12batch/s, discriminator_loss=0.451, generator_loss=1.87] 


Epoch [19/100] - Generator Loss: 1.4465, Discriminator Loss: 0.5896
Epoch 19 results saved to /home/work/Donghyeon/TVLoss/epoch_19.zip


Epoch 20/100: 100%|██████████| 3701/3701 [18:53<00:00,  3.26batch/s, discriminator_loss=0.365, generator_loss=1.72] 


Epoch [20/100] - Generator Loss: 1.3642, Discriminator Loss: 0.5648
Epoch 20 results saved to /home/work/Donghyeon/TVLoss/epoch_20.zip


Epoch 21/100: 100%|██████████| 3701/3701 [19:54<00:00,  3.10batch/s, discriminator_loss=0.62, generator_loss=1.6]   


Epoch [21/100] - Generator Loss: 1.3811, Discriminator Loss: 0.5619
Epoch 21 results saved to /home/work/Donghyeon/TVLoss/epoch_21.zip


Epoch 22/100: 100%|██████████| 3701/3701 [18:42<00:00,  3.30batch/s, discriminator_loss=0.565, generator_loss=1.27] 


Epoch [22/100] - Generator Loss: 1.3972, Discriminator Loss: 0.5575
Epoch 22 results saved to /home/work/Donghyeon/TVLoss/epoch_22.zip


Epoch 23/100: 100%|██████████| 3701/3701 [19:57<00:00,  3.09batch/s, discriminator_loss=0.584, generator_loss=1.8]  


Epoch [23/100] - Generator Loss: 1.4182, Discriminator Loss: 0.5531
Epoch 23 results saved to /home/work/Donghyeon/TVLoss/epoch_23.zip


Epoch 24/100: 100%|██████████| 3701/3701 [18:39<00:00,  3.31batch/s, discriminator_loss=0.479, generator_loss=1.29] 


Epoch [24/100] - Generator Loss: 1.4421, Discriminator Loss: 0.5454
Epoch 24 results saved to /home/work/Donghyeon/TVLoss/epoch_24.zip


Epoch 25/100: 100%|██████████| 3701/3701 [19:55<00:00,  3.10batch/s, discriminator_loss=0.513, generator_loss=1.52] 


Epoch [25/100] - Generator Loss: 1.4651, Discriminator Loss: 0.5392
Epoch 25 results saved to /home/work/Donghyeon/TVLoss/epoch_25.zip


Epoch 26/100: 100%|██████████| 3701/3701 [20:25<00:00,  3.02batch/s, discriminator_loss=0.558, generator_loss=1.3]  


Epoch [26/100] - Generator Loss: 1.4821, Discriminator Loss: 0.5328
Epoch 26 results saved to /home/work/Donghyeon/TVLoss/epoch_26.zip


Epoch 27/100:  27%|██▋       | 982/3701 [06:08<15:03,  3.01batch/s, discriminator_loss=0.449, generator_loss=1.42]

In [20]:
import torch

if torch.cuda.is_available():
    num_gpus = torch.cuda.device_count()
    print(f"Available GPUs: {num_gpus}")
    for i in range(num_gpus):
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
else:
    print("No GPU available")


Available GPUs: 1
GPU 0: CUDA GPU


In [9]:
import shutil
import os

def compress_folder(folder_path, output_path):
    """
    폴더를 압축하여 지정된 경로에 저장합니다.

    :param folder_path: 압축할 폴더의 경로
    :param output_path: 압축 파일의 저장 경로 (확장자 제외)
    :return: 압축 파일의 경로
    """
    if not os.path.exists(folder_path):
        raise FileNotFoundError(f"폴더가 존재하지 않습니다: {folder_path}")
    
    # shutil.make_archive는 압축 파일의 경로와 포맷을 지정합니다.
    archive_path = shutil.make_archive(output_path, 'zip', folder_path)
    return archive_path


# 예제 사용
if __name__ == "__main__":
    folder_to_compress = "/home/work/Donghyeon/result/Unet/output_images_epoch_29"  # 압축할 폴더 경로
    output_file = "/home/work/Donghyeon/result"  # 결과 파일 경로 (확장자 제외)

    try:
        compressed_file_path = compress_folder(folder_to_compress, output_file)
        print(f"폴더가 성공적으로 압축되었습니다: {compressed_file_path}")
    except Exception as e:
        print(f"오류 발생: {e}")


폴더가 성공적으로 압축되었습니다: /home/work/Donghyeon/result.zip


# Train

In [30]:
# 모델 저장을 위한 디렉토리 생성
model_save_dir = "/home/work/Donghyeon/div_datasetsaved_models"
os.makedirs(model_save_dir, exist_ok=True)

best_loss = float("inf")
lambda_pixel = 100  # 픽셀 손실에 대한 가중치
        
# Generator와 Discriminator 초기화
generator = UNetGenerator()
generator = generator.to(device)
discriminator = PatchGANDiscriminator()
discriminator = discriminator.to(device)

generator.apply(weights_init_normal).to(device)
discriminator.apply(weights_init_normal).to(device)

# 손실 함수 및 옵티마이저 설정
criterion_GAN = nn.MSELoss()
criterion_pixelwise = nn.L1Loss()

optimizer_G = optim.Adam(generator.parameters(), lr = CFG["LEARNING_RATE"])
optimizer_D = optim.Adam(discriminator.parameters(), lr = CFG["LEARNING_RATE"]) 

# 학습
for epoch in range(1, CFG['EPOCHS'] + 1):
    for i, batch in enumerate(dataloader):
        real_A = batch['A'].to(device)
        real_B = batch['B'].to(device)

        # Generator 훈련
        optimizer_G.zero_grad()
        fake_B = generator(real_A)
        pred_fake = discriminator(fake_B, real_A)
        loss_GAN = criterion_GAN(pred_fake, torch.ones_like(pred_fake).to(device))
        loss_pixel = criterion_pixelwise(fake_B, real_B)
        loss_G = loss_GAN + lambda_pixel * loss_pixel
        loss_G.backward()
        optimizer_G.step()

        # Discriminator 훈련
        optimizer_D.zero_grad()
        pred_real = discriminator(real_B, real_A)
        loss_real = criterion_GAN(pred_real, torch.ones_like(pred_real).to(device))
        pred_fake = discriminator(fake_B.detach(), real_A)
        loss_fake = criterion_GAN(pred_fake, torch.zeros_like(pred_fake).to(device))
        loss_D = 0.5 * (loss_real + loss_fake)
        loss_D.backward()
        optimizer_D.step()

        # 진행 상황 출력
        print(f"[Epoch {epoch}/{CFG['EPOCHS']}] [Batch {i}/{len(dataloader)}] [D loss: {loss_D.item()}] [G loss: {loss_G.item()}]")

        # 현재 에포크에서의 손실이 best_loss보다 작으면 모델 저장
        if loss_G.item() < best_loss:
            best_loss = loss_G.item()
            torch.save(generator.state_dict(), os.path.join(model_save_dir, "best_generator.pth"))
            torch.save(discriminator.state_dict(), os.path.join(model_save_dir, "best_discriminator.pth"))
            print(f"Best model saved at epoch {epoch}, batch {i} with G loss: {loss_G.item()} and D loss: {loss_D.item()}")

[Epoch 1/10] [Batch 0/1851] [D loss: 3.027674436569214] [G loss: 53.319435119628906]
Best model saved at epoch 1, batch 0 with G loss: 53.319435119628906 and D loss: 3.027674436569214
[Epoch 1/10] [Batch 1/1851] [D loss: 39.19083023071289] [G loss: 88.07061767578125]
[Epoch 1/10] [Batch 2/1851] [D loss: 1.6809589862823486] [G loss: 48.0106315612793]
Best model saved at epoch 1, batch 2 with G loss: 48.0106315612793 and D loss: 1.6809589862823486
[Epoch 1/10] [Batch 3/1851] [D loss: 6.1320109367370605] [G loss: 49.246726989746094]
[Epoch 1/10] [Batch 4/1851] [D loss: 1.5387701988220215] [G loss: 44.518348693847656]
Best model saved at epoch 1, batch 4 with G loss: 44.518348693847656 and D loss: 1.5387701988220215
[Epoch 1/10] [Batch 5/1851] [D loss: 2.8624863624572754] [G loss: 47.39918899536133]
[Epoch 1/10] [Batch 6/1851] [D loss: 2.0438361167907715] [G loss: 43.60688018798828]
Best model saved at epoch 1, batch 6 with G loss: 43.60688018798828 and D loss: 2.0438361167907715
[Epoch 1/

# Inference

In [None]:
# test_transform 설정
test_dir = '/home/work/.project1/data/test_input'     # test 이미지 폴더 경로

# 데이터 전처리 설정
transform_test = transform_test.Compose([
    transform_test.Resize((256, 256)),
    transform_test.ToTensor(),
    transform_test.Normalize([0.5], [0.5])
])

# 데이터셋 및 DataLoader 생성
dataset = CustomDataset(damage_dir=damage_dir, origin_dir=origin_dir, transform=transform_test)
dataloader = DataLoader(dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=1) 

In [31]:
# 저장할 디렉토리 설정
submission_dir = "./submission"
os.makedirs(submission_dir, exist_ok=True)

# 이미지 로드 및 전처리
def load_image(image_path):
    image = Image.open(image_path).convert("RGB")
    image = transform(image)
    image = image.unsqueeze(0)  # 배치 차원을 추가합니다.
    return image

# 모델 경로 설정
generator_path = os.path.join(model_save_dir, "best_generator.pth")

# 모델 로드 및 설정
model = UNetGenerator().to(device)
model.load_state_dict(torch.load(generator_path))
model.eval()

# 파일 리스트 불러오기
test_images = sorted(os.listdir(test_dir))

# 모든 테스트 이미지에 대해 추론 수행
for image_name in test_images:
    test_image_path = os.path.join(test_dir, image_name)

    # 손상된 테스트 이미지 로드 및 전처리
    test_image = load_image(test_image_path).to(device)

    with torch.no_grad():
        # 모델로 예측
        pred_image = model(test_image)
        pred_image = pred_image.cpu().squeeze(0)  # 배치 차원 제거
        pred_image = pred_image * 0.5 + 0.5  # 역정규화
        pred_image = pred_image.numpy().transpose(1, 2, 0)  # HWC로 변경
        pred_image = (pred_image * 255).astype('uint8')  # 0-255 범위로 변환
        
        # 예측된 이미지를 실제 이미지와 같은 512x512로 리사이즈
        pred_image_resized = cv2.resize(pred_image, (512, 512), interpolation=cv2.INTER_LINEAR)

    # 결과 이미지 저장
    output_path = os.path.join(submission_dir, image_name)
    cv2.imwrite(output_path, cv2.cvtColor(pred_image_resized, cv2.COLOR_RGB2BGR))    
    
print(f"Saved all images")

  model.load_state_dict(torch.load(generator_path))


Saved all images


# Submission

In [32]:
# 저장된 결과 이미지를 ZIP 파일로 압축
zip_filename = "Only_augmentation.zip"
with zipfile.ZipFile(zip_filename, 'w') as submission_zip:
    for image_name in test_images:
        image_path = os.path.join(submission_dir, image_name)
        submission_zip.write(image_path, arcname=image_name)

print(f"All images saved in {zip_filename}")

All images saved in Only_augmentation.zip
