In [41]:
import os
import glob
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.utils import save_image

In [42]:
import gc

gc.collect()

2886

In [43]:
print("학습 데이터셋 A와 B의 개수:", len(next(os.walk('./stable_diffusion_1108_V/train/'))[2]))
print("테스트 데이터셋 A와 B의 개수:", len(next(os.walk('./stable_diffusion_1108_V/test/'))[2]))
print("평가 데이터셋 A와 B의 개수:", len(next(os.walk('./stable_diffusion_1108_V/val/'))[2]))

학습 데이터셋 A와 B의 개수: 20
테스트 데이터셋 A와 B의 개수: 0
평가 데이터셋 A와 B의 개수: 5


In [44]:
class ImageDataset(Dataset):
    def __init__(self, root, transforms_=None, mode="train"):
        self.transform = transforms_

        self.files = sorted(glob.glob(os.path.join(root, mode) + "/*.jpg"))
        # 데이터의 개수가 적기 때문에 테스트 데이터를 학습 시기에 사용
        if mode == "train":
            self.files.extend(sorted(glob.glob(os.path.join(root, "test") + "/*.jpg")))

    def __getitem__(self, index):
        img = Image.open(self.files[index % len(self.files)])
        w, h = img.size
        img_A = img.crop((0, 0, w / 2, h)) # 이미지의 왼쪽 절반
        img_B = img.crop((w / 2, 0, w, h)) # 이미지의 오른쪽 절반

        # 데이터 증진(data augmentation)을 위한 좌우 반전(horizontal flip)
        if np.random.random() < 0.5:
            img_A = Image.fromarray(np.array(img_A)[:, ::-1, :], "RGB")
            img_B = Image.fromarray(np.array(img_B)[:, ::-1, :], "RGB")

        img_A = self.transform(img_A)
        img_B = self.transform(img_B)
        
        return {"A": img_A, "B": img_B}

    def __len__(self):
        return len(self.files)

In [45]:
transforms_ = transforms.Compose([
    transforms.Resize((1024, 1024), Image.BICUBIC),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_dataset = ImageDataset("stable_diffusion_1108_V", transforms_=transforms_, mode='train')
val_dataset = ImageDataset("stable_diffusion_1108_V", transforms_=transforms_, mode='val')

train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=0)
val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=0)

In [46]:
print(len(train_dataloader))
print(len(val_dataloader))

20
5


In [47]:
# U-Net 아키텍처의 다운 샘플링(Down Sampling) 모듈
class UNetDown(nn.Module):
    def __init__(self, in_channels, out_channels, normalize=True, dropout=0.0):
        super(UNetDown, self).__init__()
        # 너비와 높이가 2배씩 감소
        layers = [nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False)]
        if normalize:
            layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(nn.LeakyReLU(0.2))
        if dropout:
            layers.append(nn.Dropout(dropout))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)


# U-Net 아키텍처의 업 샘플링(Up Sampling) 모듈: Skip Connection 사용
class UNetUp(nn.Module):
    def __init__(self, in_channels, out_channels, dropout=0.0):
        super(UNetUp, self).__init__()
        # 너비와 높이가 2배씩 증가
        layers = [nn.ConvTranspose2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False)]
        layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(nn.ReLU(inplace=True))
        if dropout:
            layers.append(nn.Dropout(dropout))
        self.model = nn.Sequential(*layers)

    def forward(self, x, skip_input):
        x = self.model(x)
        x = torch.cat((x, skip_input), 1) # 채널 레벨에서 합치기(concatenation)

        return x


# U-Net 생성자(Generator) 아키텍처
class GeneratorUNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=3):
        super(GeneratorUNet, self).__init__()

        self.down1 = UNetDown(in_channels, 64, normalize=False) # 출력: [64 X 128 X 128]

        self.down2 = UNetDown(64, 128) # 출력: [128 X 64 X 64]
        self.down3 = UNetDown(128, 256) # 출력: [256 X 32 X 32]
        self.down4 = UNetDown(256, 512, dropout=0.5) # 출력: [512 X 16 X 16]
        self.down5 = UNetDown(512, 512, dropout=0.5) # 출력: [512 X 8 X 8]
        self.down6 = UNetDown(512, 512, dropout=0.5) # 출력: [512 X 4 X 4]
        self.down7 = UNetDown(512, 512, dropout=0.5) # 출력: [512 X 2 X 2]
        self.down8 = UNetDown(512, 512, dropout=0.5) # 출력: [512 X 2 X 2]
        self.down9 = UNetDown(512, 512, normalize=False, dropout=0.5) # 출력: [512 X 1 X 1]

        # Skip Connection 사용(출력 채널의 크기 X 2 == 다음 입력 채널의 크기)
        self.up1 = UNetUp(512, 512, dropout=0.5) # 출력: [1024 X 2 X 2]
        self.up2 = UNetUp(1024, 512, dropout=0.5) # 출력: [1024 X 4 X 4]
        self.up3 = UNetUp(1024, 512, dropout=0.5) # 출력: [1024 X 4 X 4]
        self.up4 = UNetUp(1024, 512, dropout=0.5) # 출력: [1024 X 8 X 8]
        self.up5 = UNetUp(1024, 512, dropout=0.5) # 출력: [1024 X 16 X 16]
        self.up6 = UNetUp(1024, 256) # 출력: [512 X 32 X 32]
        self.up7 = UNetUp(512, 128) # 출력: [256 X 64 X 64]
        self.up8 = UNetUp(256, 64) # 출력: [128 X 128 X 128]

        self.final = nn.Sequential(
            nn.Upsample(scale_factor=2), # 출력: [128 X 256 X 256]
            nn.ZeroPad2d((1, 0, 1, 0)),
            nn.Conv2d(128, out_channels, kernel_size=4, padding=1), # 출력: [3 X 256 X 256]
            nn.Tanh(),
        )

    def forward(self, x):
        # 인코더부터 디코더까지 순전파하는 U-Net 생성자(Generator)
        d1 = self.down1(x)
        d2 = self.down2(d1)
        d3 = self.down3(d2)
        d4 = self.down4(d3)
        d5 = self.down5(d4)
        d6 = self.down6(d5)
        d7 = self.down7(d6)
        d8 = self.down8(d7)
        d9 = self.down9(d8)
        u1 = self.up1(d9, d8)
        u2 = self.up2(u1, d7)
        u3 = self.up3(u2, d6)
        u4 = self.up4(u3, d5)
        u5 = self.up5(u4, d4)
        u6 = self.up6(u5, d3)
        u7 = self.up7(u6, d2)
        u8 = self.up8(u7, d1)

        return self.final(u8)


# U-Net 판별자(Discriminator) 아키텍처
class Discriminator(nn.Module):
    def __init__(self, in_channels=3):
        super(Discriminator, self).__init__()

        def discriminator_block(in_channels, out_channels, normalization=True):
            # 너비와 높이가 2배씩 감소
            layers = [nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1)]
            if normalization:
                layers.append(nn.InstanceNorm2d(out_channels))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        self.model = nn.Sequential(
            # 두 개의 이미지(실제/변환된 이미지, 조건 이미지)를 입력 받으므로 입력 채널의 크기는 2배
            *discriminator_block(in_channels * 2, 64, normalization=False), # 출력: [64 X 128 X 128]
            *discriminator_block(64, 128), # 출력: [128 X 64 X 64]
            *discriminator_block(128, 256), # 출력: [256 X 32 X 32]
            *discriminator_block(256, 512), # 출력: [512 X 16 X 16]
            nn.ZeroPad2d((1, 0, 1, 0)),
            nn.Conv2d(512, 1, kernel_size=4, padding=1, bias=False) # 출력: [1 X 16 X 16]
        )

    # img_A: 실제/변환된 이미지, img_B: 조건(condition)
    def forward(self, img_A, img_B):
        # 이미지 두 개를 채널 레벨에서 연결하여(concatenate) 입력 데이터 생성
        img_input = torch.cat((img_A, img_B), 1)
        return self.model(img_input)

In [48]:
def weights_init_normal(m):
    classname = m.__class__.__name__
    if classname.find("Conv") != -1:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find("BatchNorm2d") != -1:
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(m.bias.data, 0.0)


# best모델 저장을 위한 디렉터리 생성 (필요한 경우)
os.makedirs("Best_saved_models", exist_ok=True)

# best모델 저장을 위한 디렉터리 생성 (필요한 경우)
os.makedirs("5epoch_saved_models", exist_ok=True)

# best모델 저장을 위한 디렉터리 생성 (필요한 경우)
os.makedirs("Last_saved_models", exist_ok=True)



G_model_name = "G_best_size1024.pt"
D_model_name = "D_best_size1024.pt"

print(os.listdir('Best_saved_models'))
if G_model_name in os.listdir('Best_saved_models'):
    # 생성자(generator)와 판별자(discriminator) 초기화
    generator = GeneratorUNet()
    discriminator = Discriminator()

    # 생성자와 판별자 불러오기
    generator.load_state_dict(torch.load(os.path.join("Best_saved_models",G_model_name)))         # 추가 학습할 generate 모델 이름
    discriminator.load_state_dict(torch.load(os.path.join("Best_saved_models",D_model_name)))  

    # gpu올리기
    generator.cuda()
    discriminator.cuda()

    # 학습모드로 변경
    generator.train()
    discriminator.train()

    print(f"저장된 {G_model_name}로드 완료")

else:
    # 생성자(generator)와 판별자(discriminator) 초기화
    generator = GeneratorUNet()
    discriminator = Discriminator()

    generator.cuda()
    discriminator.cuda()

    # 가중치(weights) 초기화
    generator.apply(weights_init_normal)
    discriminator.apply(weights_init_normal)
    print("새로운 모델 로드")




# 손실 함수(loss function)
criterion_GAN = torch.nn.MSELoss()
criterion_pixelwise = torch.nn.L1Loss()

criterion_GAN.cuda()
criterion_pixelwise.cuda()

# 학습률(learning rate) 설정
lr = 0.0002

# 생성자와 판별자를 위한 최적화 함수
optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

[]
새로운 모델 로드


In [49]:
if "best_loss.txt" in os.listdir():
    # 저장된 best_loss값 불러오기
    with open('best_loss.txt', 'r') as file:
        # 파일의 내용을 읽어와 data 변수에 저장
        data = file.read()
        data = int(float(data))
        best_loss = data
else:
    best_loss = np.inf
print(f"best_loss값: {best_loss}")


best_loss값: inf


In [50]:
import time


n_epochs = 500          # 학습의 횟수(epoch) 설정
sample_interval = 1000  # 몇 번의 배치(batch)마다 결과를 출력할 것인지 설정

# 변환된 이미지와 정답 이미지 사이의 L1 픽셀 단위(pixel-wise) 손실 가중치(weight) 파라미터
lambda_pixel = 100

start_time = time.time()


for epoch in range(n_epochs+1):
    for i, batch in enumerate(train_dataloader):
        # 모델의 입력(input) 데이터 불러오기
        real_A = batch["A"].cuda()
        real_B = batch["B"].cuda()

        # 위 코드 경고시
        real = torch.tensor(np.ones((real_A.size(0), 1, 64, 64)), dtype=torch.float32, device='cuda')
        fake = torch.tensor(np.zeros((real_A.size(0), 1, 64, 64)), dtype=torch.float32, device='cuda')

        """ 생성자(generator)를 학습합니다. """
        optimizer_G.zero_grad()

        # 이미지 생성
        fake_B = generator(real_A)

        # 생성자(generator)의 손실(loss) 값 계산
        loss_GAN = criterion_GAN(discriminator(fake_B, real_A), real)

        # 픽셀 단위(pixel-wise) L1 손실 값 계산
        loss_pixel = criterion_pixelwise(fake_B, real_B)

        # 최종적인 손실(loss)
        loss_G = loss_GAN + lambda_pixel * loss_pixel

        # 생성자(generator) 업데이트
        loss_G.backward()
        optimizer_G.step()

        """ 판별자(discriminator)를 학습합니다. """
        optimizer_D.zero_grad()

        # 판별자(discriminator)의 손실(loss) 값 계산
        loss_real = criterion_GAN(discriminator(real_B, real_A), real) # 조건(condition): real_A
        loss_fake = criterion_GAN(discriminator(fake_B.detach(), real_A), fake)
        loss_D = (loss_real + loss_fake) / 2

        # 판별자(discriminator) 업데이트
        loss_D.backward()
        optimizer_D.step()

        done = epoch * len(train_dataloader) + i
        if done % sample_interval == 0:
            os.makedirs("check_processing", exist_ok=True)
            imgs = next(iter(val_dataloader)) # 10개의 이미지를 추출해 생성
            real_A = imgs["A"].cuda()
            real_B = imgs["B"].cuda()
            fake_B = generator(real_A)
            # real_A: 조건(condition), fake_B: 변환된 이미지(translated image), real_B: 정답 이미지
            img_sample = torch.cat((real_A.data, fake_B.data, real_B.data), -2) # 높이(height)를 기준으로 이미지를 연결하기
            save_image(img_sample, f"check_processing/{done}.jpg", nrow=5, normalize=True)


    """ 검증 손실 계산 """
    val_loss_pixel = 0
    val_loss_GAN = 0

    with torch.no_grad():
        for i, batch in enumerate(val_dataloader):
            real_A = batch["A"].cuda()
            real_B = batch["B"].cuda()
            fake_B = generator(real_A)

            # 생성자(generator)의 손실(loss) 값 계산
            loss_GAN_val = criterion_GAN(discriminator(fake_B, real_A), real)
            loss_pixel_val = criterion_pixelwise(fake_B, real_B)
            val_loss_GAN += loss_GAN_val.item()
            val_loss_pixel += loss_pixel_val.item()

    val_loss_GAN /= len(val_dataloader)
    val_loss_pixel /= len(val_dataloader)

    # 2. 검증 데이터셋에 대한 로그 출력
    print(f"[Epoch {epoch}/{n_epochs}] [Val G pixel loss: {val_loss_pixel:.6f}, adv loss: {val_loss_GAN}], Best loss: {best_loss:.6f}")

    # 3. 손실이 이전 최소값보다 작으면 모델 저장
    current_loss = val_loss_pixel + lambda_pixel * val_loss_GAN
    if current_loss < best_loss:
        best_loss = current_loss
        torch.save(generator.state_dict(), os.path.join("Best_saved_models", G_model_name))  
        torch.save(discriminator.state_dict(),  os.path.join("Best_saved_models", D_model_name))

        # best_loss값 저장하기
        data = str(best_loss)
        with open('best_loss.txt', 'w') as file:
            file.write(data)
        print(f"Saved best model with loss: {best_loss:.6f}")

    # 하나의 epoch이 끝날 때마다 로그(log) 출력
    print(f"[Epoch {epoch}/{n_epochs}] [D loss: {loss_D.item():.6f}] [G pixel loss: {loss_pixel.item():.6f}, adv loss: {loss_GAN.item()}] [Elapsed time: {time.time() - start_time:.2f}s]")

    # 50epoch마다 모델 저장
    if epoch%5 == 0:
        torch.save(generator.state_dict(), os.path.join("5epoch_saved_models", f"G_size1024_{epoch}.pt"))
        torch.save(discriminator.state_dict(),  os.path.join("5epoch_saved_models", f"D_size1024_{epoch}.pt"))
        print(f"{epoch}epoch_모델저장")
    


[Epoch 0/500] [Val G pixel loss: 0.180694, adv loss: 0.5919915676116944], Best loss: inf
Saved best model with loss: 59.379851
[Epoch 0/500] [D loss: 0.533917] [G pixel loss: 0.133123, adv loss: 0.6916313767433167] [Elapsed time: 8.45s]
0epoch_모델저장
[Epoch 1/500] [Val G pixel loss: 0.179004, adv loss: 0.7554353594779968], Best loss: 59.379851
[Epoch 1/500] [D loss: 0.299710] [G pixel loss: 0.147721, adv loss: 0.8297504186630249] [Elapsed time: 16.31s]
[Epoch 2/500] [Val G pixel loss: 0.177302, adv loss: 0.7815969944000244], Best loss: 59.379851
[Epoch 2/500] [D loss: 0.184606] [G pixel loss: 0.228377, adv loss: 0.6436433792114258] [Elapsed time: 23.48s]
[Epoch 3/500] [Val G pixel loss: 0.170971, adv loss: 0.5319607377052307], Best loss: 59.379851
Saved best model with loss: 53.367045
[Epoch 3/500] [D loss: 0.390278] [G pixel loss: 0.151496, adv loss: 0.8752924203872681] [Elapsed time: 31.08s]


: 