# CycleGAN pretraining

In [None]:
!pip install pytorch_fid

In [2]:
import pickle
import argparse
import itertools
import os
import random
import glob

from PIL import Image
from matplotlib import pyplot as plt
from torch.autograd import Variable
from torchvision.utils import save_image, make_grid
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import numpy as np
import matplotlib.image as mpimg
import torch.nn as nn
import torch
import torch.nn.functional as F
from pytorch_fid import fid_score

## Подготовка Датасета

In [3]:
!wget "http://efrosgans.eecs.berkeley.edu/cyclegan/datasets/vangogh2photo.zip"
!unzip -q vangogh2photo.zip -d dataset

--2024-10-08 14:33:30--  http://efrosgans.eecs.berkeley.edu/cyclegan/datasets/vangogh2photo.zip
Resolving efrosgans.eecs.berkeley.edu (efrosgans.eecs.berkeley.edu)... 128.32.244.190
Connecting to efrosgans.eecs.berkeley.edu (efrosgans.eecs.berkeley.edu)|128.32.244.190|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 306590349 (292M) [application/zip]
Saving to: ‘vangogh2photo.zip’


2024-10-08 14:34:44 (4.00 MB/s) - ‘vangogh2photo.zip’ saved [306590349/306590349]



In [4]:
# Вспомогательные функции

def convert_to_rgb(image):
    rgb_image = Image.new("RGB", image.size)
    rgb_image.paste(image)
    return rgb_image

def show_img(img,size=10):
  img = img / 2 + 0.5
  npimg = img.numpy()
  plt.figure(figsize=(size, size))
  plt.imshow(np.transpose(npimg, (1, 2, 0)))
  plt.show()

def plot_output(path, x, y):
    img = mpimg.imread(path)
    plt.figure(figsize=(x,y))
    plt.imshow(img)
    plt.show()

In [5]:
class ImageDataset(Dataset):
    def __init__(self, root, transforms_=None, unaligned=False, mode="train"):
        self.transform = transforms.Compose(transforms_) # применяем необходимые трансформации
        self.unaligned = unaligned
        self.root_A = root[0]
        self.root_B = root[1]

        self.files_A = sorted(glob.glob(self.root_A + "/*.*"))
        self.files_B = sorted(glob.glob(self.root_B + "/*.*"))

    def __getitem__(self, index):
        image_A = Image.open(self.files_A[index % len(self.files_A)])

        if self.unaligned:
            image_B = Image.open(self.files_B[random.randint(0, len(self.files_B) - 1)])
        else:
            image_B = Image.open(self.files_B[index % len(self.files_B)])

        if image_A.mode != "RGB":
            image_A = convert_to_rgb(image_A)
        if image_B.mode != "RGB":
            image_B = convert_to_rgb(image_B)

        item_A = self.transform(image_A)
        item_B = self.transform(image_B)

        # Возвращаем словарь
        return {"A": item_A, "B": item_B}

    def __len__(self):
        return max(len(self.files_A), len(self.files_B))

## Модель CycleGAN
Будем строить архитектуру на основе ResNet

Для поддержания разнообразия и эффективности обучения, будем использовать буфер изображений

Когда буфер заполнен, для каждого нового изображения сгенерированного моделью:

 1) Либо добавляется новое изображение, заменяя одно из старых с вероятностью 0.5.

 2) Либо возвращается одно из старых изображений.

Таким образом, буфер помогает обучению генеративной модели стабилизироваться и не зацикливаться на одном и том же наборе изображений

In [6]:
class ReplayBuffer:
    def __init__(self, max_size=50):
        assert max_size > 0, "Empty buffer."
        self.max_size = max_size
        self.data = []

    def push_and_pop(self, data):
        to_return = []
        for element in data.data:
            element = torch.unsqueeze(element, 0)
            if len(self.data) < self.max_size:
                self.data.append(element)
                to_return.append(element)
            else:
                # если больше 0.5 возвращаем новое
                if random.uniform(0, 1) > 0.5:
                    i = random.randint(0, self.max_size - 1)
                    to_return.append(self.data[i].clone())
                    self.data[i] = element  # заменяем старое
                else:
                    # иначе отправляем старое
                    to_return.append(element)
        return Variable(torch.cat(to_return))

In [7]:
# scheduler
class LambdaLR:
    def __init__(self, n_epochs, offset, decay_start_epoch):
        if (n_epochs - decay_start_epoch) < 0:
            raise Exception("Decay should start before training ends."
                            "Change decay_start_epoch to a value less than {}.".format(n_epochs))
        self.n_epochs = n_epochs
        self.offset = offset
        self.decay_start_epoch = decay_start_epoch

    def step(self, epoch):
        return 1.0 - max(0, epoch + self.offset - self.decay_start_epoch) / (self.n_epochs - self.decay_start_epoch)

Определим главный строительный блок

In [8]:
class ResNetBlock(nn.Module):
    def __init__(self, channels):
        super(ResNetBlock, self).__init__()

        self.conv_block = nn.Sequential(
            nn.ReflectionPad2d(1), # лучше zero padding
            nn.Conv2d(channels, channels, kernel_size=3, padding=0, bias=True),
            nn.InstanceNorm2d(channels), # нормализуем не по батчам
            nn.ReLU(inplace=True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(channels, channels, kernel_size=3, padding=0, bias=True),
            nn.InstanceNorm2d(channels)
        )

    def forward(self, x):
        return x + self.conv_block(x) # прокидываем изображение через блок

### Генератор с backbone ResNet

In [9]:
class GeneratorResNet(nn.Module):
    def __init__(self, input_shape, num_residual_blocks):
        super(GeneratorResNet, self).__init__()

        channels = input_shape[0]

        # начальный convolution block
        out_channels = 64
        model = [
            nn.ReflectionPad2d(channels),
            nn.Conv2d(channels, out_channels, 7),
            nn.InstanceNorm2d(out_channels),
            nn.LeakyReLU(inplace=True),
        ]
        in_channels = out_channels

        # Encoder
        for _ in range(2):
            out_channels *= 2
            model += [
                nn.Conv2d(in_channels, out_channels, 3, stride=2, padding=1),
                nn.InstanceNorm2d(out_channels),
                nn.LeakyReLU(inplace=True),
            ]
            in_channels = out_channels

        # Residual блоки, усложняем представление

        for _ in range(num_residual_blocks):
            model += [ResNetBlock(out_channels)]

        # Decoder
        for _ in range(2):
            out_channels //= 2
            model += [
                nn.Upsample(scale_factor=2),
                nn.Conv2d(in_channels, out_channels, 3, stride=1, padding=1),
                nn.InstanceNorm2d(out_channels),
                nn.LeakyReLU(inplace=True),
            ]
            in_channels = out_channels

        # Output layer
        model += [
            nn.ReflectionPad2d(channels),
            nn.Conv2d(out_channels, channels, 7),
            nn.Tanh(), # для выходов от -1 до 1
        ]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)

### PatchGAN Discriminator
PatchGAN проверяет не все изображение целиком, а отдельные участки изображения (patches). На основе этих участков дискриминатор принимает решение о том, является ли каждый участок "реальным" или "сгенерированным".

In [10]:
class Discriminator(nn.Module):
    def __init__(self, input_shape):
        super(Discriminator, self).__init__()

        channels, height, width = input_shape

        # вычисляем выходную размерность PatchGAN
        self.output_shape = (1, height // 2 ** 4, width // 2 ** 4)

        def discriminator_block(in_channels, out_channels, normalize=True):
            layers = [nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1)]
            if normalize:
                layers.append(nn.InstanceNorm2d(out_channels))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return layers

        # C64 -> C128 -> C256 -> C512
        self.model = nn.Sequential(
            *discriminator_block(channels, out_channels=64, normalize=False),
            *discriminator_block(64, out_channels=128),
            *discriminator_block(128, out_channels=256),
            *discriminator_block(256, out_channels=512),
            nn.ZeroPad2d((1, 0, 1, 0)),
            nn.Conv2d(in_channels=512, out_channels=1, kernel_size=4, padding=1) # каждый пиксель теперь соответсвует патчу
        )

    def forward(self, img):
        return self.model(img)

### Зададим гиперпараметры

In [11]:
class Hyperparameters(object):
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)

In [12]:
hyperparameters = {
    "name": "CycleGan_VanGogh_Checkpoint",  # Название модели
    "n_epochs": 50,  # Количество эпох
    "batch_size": 4,  # Размер батча
    "lr": 0.0002,  # learning rate
    "decay_start_epoch": 5,  # Эпоха, с которой начинается уменьшение скорости обучения
    "b1": 0.5,  # Параметр для Adam оптимизатора (beta1)
    "b2": 0.999,  # Параметр для Adam оптимизатора (beta2)
    "img_size": 256,  # Размер изображения (например, 256x256)
    "channels": 3,  # Количество каналов
    "num_residual_blocks": 9,
    "lambda_cyc": 10.0,  # Взвешивающий коэффициент для циклической потери
    "lambda_id": 5.0,  # Взвешивающий коэффициент для идентификационной потери
    "data_dir_A": "dataset/vangogh2photo/trainA",  # Путь к данным A
    "data_dir_B": "dataset/vangogh2photo/trainB",  # Путь к данным B
    "val_data_dir_A": "dataset/vangogh2photo/testA",  # Путь к валидационным данным A
    "val_data_dir_B": "dataset/vangogh2photo/testB"  # Путь к валидационным данным B
}

hp = Hyperparameters(**hyperparameters)
print("Hyperparameters: \n")
print(hp)

Hyperparameters: 

<class '__main__.Hyperparameters'>: {'name': 'CycleGan_VanGogh_Checkpoint', 'n_epochs': 50, 'batch_size': 4, 'lr': 0.0002, 'decay_start_epoch': 5, 'b1': 0.5, 'b2': 0.999, 'img_size': 256, 'channels': 3, 'num_residual_blocks': 9, 'lambda_cyc': 10.0, 'lambda_id': 5.0, 'data_dir_A': 'dataset/vangogh2photo/trainA', 'data_dir_B': 'dataset/vangogh2photo/trainB', 'val_data_dir_A': 'dataset/vangogh2photo/testA', 'val_data_dir_B': 'dataset/vangogh2photo/testB'}


### Подготовка моделей

In [13]:
train_transforms_ = [
        transforms.Resize((286, 286)),
        transforms.RandomRotation(degrees=(0,180)),
        transforms.RandomCrop(size=(hp.img_size,hp.img_size)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]

val_transforms_ = [
    transforms.Resize((hp.img_size, hp.img_size)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]

In [14]:
train_dataloader = DataLoader(
        ImageDataset(root=[hp.data_dir_A,hp.data_dir_B], transforms_=train_transforms_),
        batch_size=hp.batch_size,
        shuffle=True,
        num_workers=2)

val_dataloader = DataLoader(
    ImageDataset(root= [hp.val_data_dir_A,hp.val_data_dir_B], transforms_=val_transforms_),
    batch_size=8,
    shuffle=True,
    num_workers=2)

def to_img(x):
    x = x.view(x.size(0)*2, hp.channels, hp.img_size, hp.img_size)
    return x

In [15]:
cuda = True if torch.cuda.is_available() else False
print("Using CUDA" if cuda else "Not using CUDA")
if cuda is False:
    exit("CUDA is necessary to train the model.")
Tensor = torch.cuda.FloatTensor if cuda else torch.Tensor

Using CUDA


In [16]:
criterion_GAN = torch.nn.MSELoss()
criterion_cycle = torch.nn.L1Loss() # побуждаем генератор сохранить как можно больше исходных деталей
criterion_identity = torch.nn.L1Loss() # помогает генератору не искажать данные, если они уже принадлежат нужному домену

In [17]:
input_shape = (hp.channels, hp.img_size, hp.img_size)

# инициализируем generator и discriminator
Gen_AB = GeneratorResNet(input_shape, hp.num_residual_blocks)
Gen_BA = GeneratorResNet(input_shape, hp.num_residual_blocks)
Disc_A = Discriminator(input_shape)
Disc_B = Discriminator(input_shape)

if cuda:
    Gen_AB = nn.DataParallel(Gen_AB)
    Gen_AB = Gen_AB.cuda()
    Gen_BA = nn.DataParallel(Gen_BA)
    Gen_BA = Gen_BA.cuda()
    Disc_A = nn.DataParallel(Disc_A)
    Disc_A = Disc_A.cuda()
    Disc_B = nn.DataParallel(Disc_B)
    Disc_B = Disc_B.cuda()
    criterion_GAN.cuda()
    criterion_cycle.cuda()
    criterion_identity.cuda()

fake_A_buffer = ReplayBuffer()
fake_B_buffer = ReplayBuffer()

In [18]:
optimizer_G = torch.optim.Adam(itertools.chain(Gen_AB.parameters(),
                                               Gen_BA.parameters()),
                               lr=hp.lr,
                               betas=(hp.b1, hp.b2))

optimizer_Disc_A = torch.optim.Adam(Disc_A.parameters(), lr=hp.lr, betas=(hp.b1, hp.b2))
optimizer_Disc_B = torch.optim.Adam(Disc_B.parameters(), lr=hp.lr, betas=(hp.b1, hp.b2))

# schedulers
lr_scheduler_G = torch.optim.lr_scheduler.LambdaLR(
    optimizer_G, lr_lambda=LambdaLR(hp.n_epochs, 0, hp.decay_start_epoch).step)
lr_scheduler_Disc_A = torch.optim.lr_scheduler.LambdaLR(
    optimizer_Disc_A, lr_lambda=LambdaLR(hp.n_epochs, 0, hp.decay_start_epoch).step)
lr_scheduler_Disc_B = torch.optim.lr_scheduler.LambdaLR(
    optimizer_Disc_B, lr_lambda=LambdaLR(hp.n_epochs, 0, hp.decay_start_epoch).step)

In [19]:
checkpoint = torch.load("checkpoint\CycleGan_VanGogh_Checkpoint.pt") if os.path.exists("checkpoint\CycleGan_VanGogh_Checkpoint.pt") else None

In [20]:
def initialize_conv_weights_normal(m):
    classname = m.__class__.__name__
    if classname.find("Conv") != -1:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
        if hasattr(m, "bias") and m.bias is not None:
            torch.nn.init.constant_(m.bias.data, 0.0)
    elif classname.find("BatchNorm2d") != -1:
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(m.bias.data, 0.0)

In [21]:
if checkpoint is not None:
    print("Loading checkpoint...")
    Gen_AB.load_state_dict(checkpoint['Gen_AB'])
    Gen_BA.load_state_dict(checkpoint['Gen_BA'])
    Disc_A.load_state_dict(checkpoint['Disc_A'])
    Disc_B.load_state_dict(checkpoint['Disc_A'])
    optimizer_G.load_state_dict(checkpoint['optimizer_G'])
    optimizer_Disc_A.load_state_dict(checkpoint['optimizer_Disc_A'])
    optimizer_Disc_B.load_state_dict(checkpoint['optimizer_Disc_B'])
    print("Successfully loaded checkpoint.")
else:
    # инициализируем веса
    Gen_AB.apply(initialize_conv_weights_normal)
    Gen_BA.apply(initialize_conv_weights_normal)
    Disc_A.apply(initialize_conv_weights_normal)
    Disc_B.apply(initialize_conv_weights_normal)

## Train loop

In [22]:
def save_img_samples(epoch):
    # сохраняем изображения каждую эпоху
    Tensor = torch.cuda.FloatTensor if cuda else torch.Tensor
    imgs = next(iter(val_dataloader))
    Gen_AB.eval()
    Gen_BA.eval()
    real_A = Variable(imgs["A"].type(Tensor))
    fake_B = Gen_AB(real_A)
    real_B = Variable(imgs["B"].type(Tensor))
    fake_A = Gen_BA(real_B)

    # строим сетку из изображений
    real_A = make_grid(real_A, nrow=16, normalize=True)
    real_B = make_grid(real_B, nrow=16, normalize=True)
    fake_A = make_grid(fake_A, nrow=16, normalize=True)
    fake_B = make_grid(fake_B, nrow=16, normalize=True)

    image_grid = torch.cat((real_A, fake_B, real_B, fake_A), 1)
    path =  "outputs-{}.png".format(epoch)

    save_image(image_grid, path, normalize=False)
    return path

In [23]:
def train(name,Gen_BA,Gen_AB,Disc_A,Disc_B,train_dataloader,n_epochs,criterion_identity,
          criterion_cycle,lambda_cyc,criterion_GAN,optimizer_G,fake_A_buffer,fake_B_buffer,
          optimizer_Disc_A,optimizer_Disc_B,Tensor,lambda_id):

    disc_loss = 0
    gen_loss = 0
    id_loss = 0
    disc_loss_total,gen_loss_total, id_loss_total = [],[],[]

    for epoch in range(n_epochs):
        for batch in tqdm(train_dataloader):


            real_A = Variable(batch["A"].type(Tensor))
            real_B = Variable(batch["B"].type(Tensor))

            # ground truths
            valid = Variable(
                Tensor(np.ones((real_A.size(0), *Disc_A.module.output_shape))),
                requires_grad=False,
            )
            fake = Variable(
                Tensor(np.zeros((real_A.size(0), *Disc_A.module.output_shape))),
                requires_grad=False,
            )

            #########################
            #  Train Generators
            #########################

            Gen_AB.module.train() # Gen_AB(real_A) возьмет real_A и создаст fake_B
            Gen_BA.module.train() # Gen_BA(real_B) возьмет real_B и создаст fake_A

            optimizer_G.zero_grad()

            # Identity loss
            loss_id_A = criterion_identity(Gen_BA(real_A), real_A)

            loss_id_B = criterion_identity(Gen_AB(real_B), real_B)

            loss_identity = (loss_id_A + loss_id_B) / 2
            id_loss += loss_identity.item()

            # лосс для GAN_AB
            fake_B = Gen_AB(real_A)
            loss_GAN_AB = criterion_GAN(Disc_B(fake_B), valid)

            # лосс для GAN_BA
            fake_A = Gen_BA(real_B)
            loss_GAN_BA = criterion_GAN(Disc_A(fake_A), valid)

            loss_GAN = (loss_GAN_AB + loss_GAN_BA) / 2

            # Cycle Consistency losses
            reconstructed_A = Gen_BA(fake_B)

            loss_cycle_A = criterion_cycle(reconstructed_A, real_A)

            reconstructed_B = Gen_AB(fake_A)

            loss_cycle_B = criterion_cycle(reconstructed_B, real_B)

            loss_cycle = (loss_cycle_A + loss_cycle_B) / 2

            loss_G = loss_GAN + lambda_cyc * loss_cycle + lambda_id * loss_identity
            gen_loss+=loss_G.item()

            loss_G.backward()

            optimizer_G.step()

            #########################
            #  Train Discriminator A
            #########################

            optimizer_Disc_A.zero_grad()

            # Real loss
            loss_real = criterion_GAN(Disc_A(real_A), valid)
            # Fake loss по батчу сохраненных ранее
            fake_A_ = fake_A_buffer.push_and_pop(fake_A)
            loss_fake = criterion_GAN(Disc_A(fake_A_.detach()), fake)

            loss_Disc_A = (loss_real + loss_fake) / 2
            # disc_loss_A += loss_Disc_A.item()
            loss_Disc_A.backward()

            optimizer_Disc_A.step()

            #########################
            #  Train Discriminator B
            #########################

            optimizer_Disc_B.zero_grad()

            # Real loss
            loss_real = criterion_GAN(Disc_B(real_B), valid)
            # Fake loss по батчу сохраненных ранее
            fake_B_ = fake_B_buffer.push_and_pop(fake_B)
            loss_fake = criterion_GAN(Disc_B(fake_B_.detach()), fake)
            loss_Disc_B = (loss_real + loss_fake) / 2

            loss_Disc_B.backward()

            optimizer_Disc_B.step()

            loss_D = (loss_Disc_A + loss_Disc_B) / 2
            disc_loss+= loss_D.item()

        gen_loss = gen_loss/len(train_dataloader)
        disc_loss = disc_loss/len(train_dataloader)
        id_loss = id_loss/len(train_dataloader)
        gen_loss_total.append(gen_loss)
        disc_loss_total.append(disc_loss)
        id_loss_total.append(id_loss)
        plot_output(save_img_samples(epoch), 30, 40)

        path = "./checkpoint"
        if os.path.exists(path) is not True:
            os.mkdir(path)
        path = path + "/"+name+".pt"
        torch.save({
                    'epoch': epoch,
                    'Gen_AB': Gen_AB.state_dict(),
                    'Gen_BA': Gen_BA.state_dict(),
                    'Disc_A': Disc_A.state_dict(),
                    'Disc_B': Disc_B.state_dict(),
                    'optimizer_G': optimizer_G.state_dict(),
                    'optimizer_Disc_A': optimizer_Disc_A.state_dict(),
                    'optimizer_Disc_B': optimizer_Disc_B.state_dict()}, path)
        print(
                "\r[Epoch %d/%d] [Disc loss: %f] [Gen loss: %f] [Identity loss: %f]"
                % (
                    epoch+1,
                    n_epochs,
                    disc_loss,
                    gen_loss,
                    id_loss,
                )
            )
    losses = {"gen_loss": gen_loss_total,"disc_loss": disc_loss_total,"id_loss": id_loss_total}
    #with open('outputs/losses.pickle', 'wb') as handle:
    #    pickle.dump(losses, handle, protocol=pickle.HIGHEST_PROTOCOL)

    plt.figure(figsize=(10, 5))

    plt.subplot(1, 3, 1)
    plt.plot(gen_loss_total, label="Generator Loss")
    plt.title("Generator Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    plt.subplot(1, 3, 2)
    plt.plot(disc_loss_total, label="Discriminator Loss")
    plt.title("Discriminator Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    plt.subplot(1, 3, 3)
    plt.plot(id_loss_total, label="ID Loss")
    plt.title("ID Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
train(name = hp.name, Gen_BA = Gen_BA,Gen_AB = Gen_AB,Disc_A = Disc_A,Disc_B = Disc_B,train_dataloader = train_dataloader,
          n_epochs = hp.n_epochs,criterion_identity = criterion_identity,criterion_cycle = criterion_cycle, lambda_cyc = hp.lambda_cyc,
          criterion_GAN = criterion_GAN,optimizer_G = optimizer_G,fake_A_buffer = fake_A_buffer,fake_B_buffer = fake_B_buffer,
          optimizer_Disc_A = optimizer_Disc_A,optimizer_Disc_B = optimizer_Disc_B,Tensor = Tensor, lambda_id = hp.lambda_id)

## Замерим качество модели, обученной на 50 эпохах с помощью метрики FID (Fréchet Inception Distance)

In [24]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
model = GeneratorResNet((3,256,256), 9)
model.load_state_dict(torch.load('cycle.pth', map_location=device))
model.to(device)
model.eval()

In [26]:
def generate_images(model, dataloader, output_path, device):
    os.makedirs(output_path, exist_ok=True)
    with torch.no_grad():
        for i, batch in enumerate(tqdm(dataloader)):
            real_A = batch['B'].to(device)  # Входные изображения
            generated_B = model(real_A)  # Генерация изображений

            generated_image = (generated_B * 0.5 + 0.5).cpu().clamp(0, 1)  # Масштабируем к [0, 1]
            generated_image = transforms.ToPILImage()(generated_image.squeeze(0))
            generated_image.save(f"{output_path}/generated_{i}.png")

def save_vangogh_images(dataloader, output_path):
    os.makedirs(output_path, exist_ok=True)
    for i, batch in enumerate(tqdm(dataloader)):
        real_A = batch['A']
        real_A = (real_A * 0.5 + 0.5).cpu().clamp(0, 1)
        real_image = transforms.ToPILImage()(real_A.squeeze(0))
        real_image.save(f"{output_path}/real_{i}.png")

In [None]:
val_loader = DataLoader(
    ImageDataset(root= [hp.val_data_dir_A, hp.val_data_dir_B], transforms_=val_transforms_),
    batch_size=8,
    shuffle=False,
    num_workers=2)

save_vangogh_images(val_loader, 'vangogh_images1')
generate_images(model, val_loader, "generated_images", device)

In [38]:
fid_value = fid_score.calculate_fid_given_paths([hp.data_dir_A, "generated_images"], batch_size=8, device=device, dims=2048)

print(f"FID on vangogh: {fid_value}")

100%|██████████| 50/50 [00:02<00:00, 16.92it/s]
100%|██████████| 94/94 [00:04<00:00, 23.09it/s]


FID on vangogh: 134.88557965121998


In [34]:
fid_value = fid_score.calculate_fid_given_paths([hp.data_dir_B, "generated_images"], batch_size=4, device=device, dims=2048)

print(f"FID on real: {fid_value}")

100%|██████████| 1572/1572 [00:39<00:00, 40.09it/s]
100%|██████████| 188/188 [00:04<00:00, 42.62it/s]


FID on real: 98.51018236805703


### Как видно,метрика FID достаточно велика, что говорит об обучении на недостаточном количестве эпох, поэтому необохдимо будет в будущем продолжать дообучать модель до хотя бы 150-200 эпох

### расстояние до картин Ван Гога большое, сокроее всего это из-за малого количества картин, а вот расстояние до реальных фото заметно меньше, что говорит о том, что CycleGAN все же выдаетдостаточно реалистичные картины, в отличие от Алгоритма Гатиса, который имеет большее значение FID на рельных фото, то есть больше вносит "художественный стиль"