# Imports

In [1]:
import os
import torch
import torch.nn as nn
import numpy as np
import cv2
import matplotlib.pyplot as plt
import itertools
import torch_fidelity
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import ImageFolder
from PIL import Image
import torchvision.transforms as transforms
import pandas as pd
import random
import torch.nn.functional as F
from piqa import SSIM
import sys
from tqdm import tqdm

# SEED = 42
SEED = random.randrange(2**32 - 1)
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
print("Random Seed:", SEED)


Random Seed: 1836286533


# Work

In [2]:
from models.generators import UNetGenerator
from models.discriminators import PatchDiscriminator, RandomKaggleDiscriminator

"""
Step 4. Initalize G and D¶
"""
G_AB = UNetGenerator()
D_B = PatchDiscriminator()
G_BA = UNetGenerator()
D_A = PatchDiscriminator()

## Total parameters in CycleGAN should be less than 60MB
total_params = sum(p.numel() for p in G_AB.parameters()) + \
               sum(p.numel() for p in G_BA.parameters()) + \
               sum(p.numel() for p in D_A.parameters()) + \
               sum(p.numel() for p in D_B.parameters())


total_params_million = total_params / (1024 * 1024)
print(f'Total parameters in CycleGAN model: {total_params_million:.2f} million')

Total parameters in CycleGAN model: 23.10 million


In [3]:
class HingeAdversarialLoss(nn.Module):
    def forward(self, pred, is_real):
        if is_real:
            return F.relu(1 - pred).mean()
        else:
            return F.relu(1 + pred).mean()


class EdgeConsistencyLoss(nn.Module):
    def __init__(self, data_range=1.0):
        super().__init__()
        self.ssim = SSIM(n_channels=1, value_range=data_range)
        
        # Register 4D Sobel kernels (out_channels, in_channels, H, W)
        sobel_x = torch.tensor([[[[-1., 0., 1.], 
                                  [-2., 0., 2.], 
                                  [-1., 0., 1.]]]])
        sobel_y = torch.tensor([[[[-1., -2., -1.], 
                                  [0., 0., 0.], 
                                  [1., 2., 1.]]]])
        
        self.register_buffer('sobel_x', sobel_x)
        self.register_buffer('sobel_y', sobel_y)

    def rgb_to_grayscale(self, x):
        return 0.299 * x[:, 0:1] + 0.587 * x[:, 1:2] + 0.114 * x[:, 2:3]

    def get_edge_map(self, x):
        x_gray = self.rgb_to_grayscale(x)
        
        # Compute gradients with proper kernel dimensions
        grad_x = F.conv2d(x_gray, self.sobel_x, padding=1)
        grad_y = F.conv2d(x_gray, self.sobel_y, padding=1)
        
        edge_mag = torch.sqrt(grad_x**2 + grad_y**2)
        return (edge_mag - edge_mag.min()) / (edge_mag.max() - edge_mag.min() + 1e-8)

    def forward(self, x, y):
        edge_x = self.get_edge_map(x)
        edge_y = self.get_edge_map(y)
        return 1.0 - self.ssim(edge_x, edge_y)

class IdentityPreservationLoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse_loss = nn.MSELoss()
        self.ssim = EdgeConsistencyLoss(data_range=1.0)
        
    def forward(self, input, target):
        ssim_loss = self.ssim(input, target)
        gram_input = self.gram_matrix(input)
        gram_target = self.gram_matrix(target)
        style_loss = self.mse_loss(gram_input, gram_target)
        return 0.7 * ssim_loss + 0.3 * style_loss
    
    def gram_matrix(self, x):
        b, c, h, w = x.size()
        features = x.view(b, c, h * w)
        gram = torch.bmm(features, features.transpose(1, 2))
        return gram / (c * h * w)

class SobelOperator(nn.Module):
    def __init__(self):
        super().__init__()
        kernel_x = torch.tensor([[[[-1., 0., 1.],
                                   [-2., 0., 2.],
                                   [-1., 0., 1.]]]])
        kernel_y = torch.tensor([[[[-1., -2., -1.],
                                   [0., 0., 0.],
                                   [1., 2., 1.]]]])
        
        self.register_buffer('kernel_x', kernel_x)
        self.register_buffer('kernel_y', kernel_y)

    def forward(self, x):
        b, c, h, w = x.size()
        
        # Expand kernels for multi-channel input
        kernel_x = self.kernel_x.repeat(c, 1, 1, 1)
        kernel_y = self.kernel_y.repeat(c, 1, 1, 1)
        
        grad_x = F.conv2d(x, kernel_x, padding=1, groups=c)
        grad_y = F.conv2d(x, kernel_y, padding=1, groups=c)
        return torch.sqrt(grad_x**2 + grad_y**2 + 1e-8)

class GradientPreservationLoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.sobel = SobelOperator()
        
    def forward(self, generated, target):
        grad_gen = self.sobel(generated)
        grad_target = self.sobel(target)
        return F.l1_loss(grad_gen, grad_target)


In [4]:
"""
Step 3. Define Loss
"""
# criterion_GAN = HingeAdversarialLoss()
criterion_GAN = nn.BCELoss()
criterion_cycle = nn.L1Loss()
criterion_identity = IdentityPreservationLoss()
criterion_gradient = GradientPreservationLoss()


def multi_scale_adversarial_loss(real, fake, discriminators):
    loss = 0
    for scale, disc in enumerate(discriminators):
        weight = 1 / (2 ** scale)
        real_feats = disc(real)
        fake_feats = disc(fake.detach())
        loss += weight * sum([criterion_GAN(f, True).mean() for f in real_feats])
        loss += weight * sum([criterion_GAN(f, False).mean() for f in fake_feats])
    return loss

In [5]:
lambda_adv = 1.0
lambda_cycle = 10.0
lambda_edge = 5.0
lambda_identity = 2.0
lambda_fm = 1.0
lambda_grad = 3.0


In [6]:
if torch.cuda.is_available():
    print(f"Current GPU: {torch.cuda.current_device()}")
    print(f"Current GPU name: {torch.cuda.get_device_name(torch.cuda.current_device())}")
    device = torch.device("cuda")
    G_AB = G_AB.cuda()
    D_B = D_B.cuda()
    G_BA = G_BA.cuda()
    D_A = D_A.cuda()
    criterion_GAN = criterion_GAN.cuda()
    criterion_cycle = criterion_cycle.cuda()
    criterion_identity = criterion_identity.cuda()
    Tensor = torch.cuda.FloatTensor
else:
    print("PyTorch does not have access to GPU, falling back to CPU")
    Tensor = torch.Tensor
    device = torch.device("cpu")



Current GPU: 0
Current GPU name: NVIDIA GeForce RTX 4070 Ti


In [7]:
"""
Step 5. Configure Optimizers
"""

def get_lr_scheduler(optimizer, n_epochs=100, n_epochs_decay=100, lr_policy='linear'):
    if lr_policy == 'linear':
        def lambda_rule(epoch):
            # Keep constant for first n_epochs, then linearly decay to zero
            lr_l = 1.0 - max(0, epoch - n_epochs) / float(n_epochs_decay + 1)
            return lr_l
        scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda_rule)
    else:
        raise NotImplementedError(f'learning rate policy {lr_policy} not implemented')
    return scheduler

# Optimizer setup
optimizer_G = torch.optim.Adam(itertools.chain(G_AB.parameters(), G_BA.parameters()),
                               lr=0.0002, betas=(0.5, 0.999))
optimizer_D = torch.optim.Adam(itertools.chain(D_A.parameters(), D_B.parameters()),
                               lr=0.0002, betas=(0.5, 0.999))

# Learning rate schedulers
scheduler_G = get_lr_scheduler(optimizer_G, n_epochs=100, n_epochs_decay=100)
scheduler_D = get_lr_scheduler(optimizer_D, n_epochs=100, n_epochs_decay=100)


In [8]:
from typing import Callable, Literal


def run_one_epoch(
    G_AB: nn.Module,
    G_BA: nn.Module,
    D_A: nn.Module,
    D_B: nn.Module,
    state: Literal["train", "eval"],
    dataloader: DataLoader,
    criterion_identity: Callable,
    criterion_GAN: Callable,
    criterion_cycle: Callable,
    optimizer_G: torch.optim.Optimizer,
    optimizer_D: torch.optim.Optimizer,
    # optimizer_D_A: torch.optim.Optimizer,
    # optimizer_D_B: torch.optim.Optimizer,
    weights: tuple[float, float, float],
) -> dict[str, float]:

    # Set training/evaluation mode only when necessary
    if state == "train":
        G_AB.train(), G_BA.train()
        D_A.train(), D_B.train()
    else:
        G_AB.eval(), G_BA.eval()
        D_A.eval(), D_B.eval()

    weight_identity, weight_GAN, weight_cycle = weights

    running_losses = {
        "G": 0.0, "D_A": 0.0, "D_B": 0.0,
        "identity": 0.0, "GAN": 0.0, "cycle": 0.0
    }

    # No gradient tracking during evaluation
    torch.set_grad_enabled(state == "train")
    with tqdm(dataloader, unit="batch", desc="Training" if state == "train" else "Validation") as tepoch:
        for real_A, real_B in tepoch:
            optimizer_G.zero_grad()
            real_A, real_B = real_A.to(device, dtype=torch.float32, non_blocking=True), real_B.to(device, dtype=torch.float32, non_blocking=True)

            # Prepare ground truth labels
            valid, fake = torch.ones(real_A.size(0), 1, device=device, requires_grad=False), torch.zeros(real_A.size(0), 1, device=device, requires_grad=False)

            # Train Generators
            fake_B = G_AB(real_A)
            fake_A = G_BA(real_B)

            # Identity loss
            loss_id_A = criterion_identity(fake_B, real_A)
            loss_id_B = criterion_identity(fake_A, real_B)
            loss_identity = (loss_id_A + loss_id_B) / 2

            # GAN loss
            loss_GAN_AB = criterion_GAN(D_B(fake_B), valid)
            loss_GAN_BA = criterion_GAN(D_A(fake_A), valid)
            loss_GAN = (loss_GAN_AB + loss_GAN_BA) / 2

            # Cycle loss
            recov_A = G_BA(fake_B)
            recov_B = G_AB(fake_A)
            loss_cycle_A = criterion_cycle(recov_A, real_A)
            loss_cycle_B = criterion_cycle(recov_B, real_B)
            loss_cycle = (loss_cycle_A + loss_cycle_B) / 2

            # Total generator loss
            loss_G = weight_identity * loss_identity + weight_GAN * loss_GAN + weight_cycle * loss_cycle
            if state == "train":
                loss_G.backward()
                optimizer_G.step()

                optimizer_D.zero_grad()
                # Train Discriminators
                # optimizer_D_A.zero_grad()
                # optimizer_D_B.zero_grad()
                
                loss_real_A = criterion_GAN(D_A(real_A), valid)
                loss_fake_A = criterion_GAN(D_A(fake_A.detach()), fake)
                loss_D_A = (loss_real_A + loss_fake_A) / 2
                
                loss_real_B = criterion_GAN(D_B(real_B), valid)
                loss_fake_B = criterion_GAN(D_B(fake_B.detach()), fake)
                loss_D_B = (loss_real_B + loss_fake_B) / 2
                
                loss_D = (loss_D_A + loss_D_B)
                loss_D.backward()
                optimizer_D.step()
                # optimizer_D_A.step()
                # optimizer_D_B.step()

            # Accumulate losses
            running_losses["G"] += loss_G.item()
            running_losses["D_A"] += loss_D_A.item()
            running_losses["D_B"] += loss_D_B.item()
            running_losses["identity"] += loss_identity.item()
            running_losses["GAN"] += loss_GAN.item()
            running_losses["cycle"] += loss_cycle.item()

    print(f'[G loss: {loss_G.item()} | identity: {loss_identity.item()} GAN: {loss_GAN.item()} cycle: {loss_cycle.item()}]')
    print(f'[D loss: {loss_D.item()} | D_A: {loss_D_A.item()} D_B: {loss_D_B.item()}]')
    # Average the losses over the dataset
    return {k: v / len(dataloader) for k, v in running_losses.items()}

In [None]:

"""
Step 6. DataLoader
"""
from CustomImageDataset import CustomImageDataset as ImageDataset


# data_dir = '/kaggle/input/group-project/image_image_translation'
data_dir = ''

image_size = (128, 128)
transforms_ = transforms.Compose([
    transforms.Resize(image_size),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

loader_params = {
    "batch_size": 20,
    "num_workers": 2,
    "pin_memory": False,
    "shuffle": True,
    # "prefetch_factor": 2,
    "persistent_workers": True
}


trainloader = DataLoader(
    ImageDataset(data_dir, mode='train', transform=transforms_, seed=SEED),
    **loader_params
)

loader_params["shuffle"] = False
validloader = DataLoader(
    ImageDataset(data_dir, mode='valid', transform=transforms_, seed=SEED),
    **loader_params
)

"""
Step 7. Training
"""
n_epochs = 50
for epoch in range(n_epochs):
    print(f'[Epoch {epoch+1}/{n_epochs}]')
    run_one_epoch(
        G_AB, G_BA, D_A, D_B, "train", trainloader,
        criterion_identity, criterion_GAN, criterion_cycle,
        optimizer_G, optimizer_D,
        (lambda_identity, lambda_adv, lambda_cycle)
    )

    # validation
    # if (epoch+1) % 10 == 0:
    #     valid_real_A, valid_real_B = next(iter(testloader))
    #     sample_images(valid_real_A, valid_real_B)

    #     loss_D = (loss_D_A + loss_D_B) / 2
    #     print(f'[Epoch {epoch+1}/{n_epochs}]')
    #     print(f'[G loss: {loss_G.item()} | identity: {loss_identity.item()} GAN: {loss_GAN.item()} cycle: {loss_cycle.item()}]')
    #     print(f'[D loss: {loss_D.item()} | D_A: {loss_D_A.item()} D_B: {loss_D_B.item()}]')


[Epoch 1/50]


Training:   0%|          | 0/160 [00:00<?, ?batch/s]

In [None]:
from typing import Type
def evaluate(
    model: torch.nn.Module,
    input_dir: str,
    output_dir: str,
    ref_dir: str,
    batch_size: int,
    image_size: int,
    tensor_type: Type[torch.Tensor]
) -> float:
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    generate_transforms = transforms.Compose([
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    to_image = transforms.ToPILImage()

    files = [os.path.join(input_dir, name) for name in os.listdir(input_dir)]

    model.eval()
    for i in range(0, len(files), batch_size):
        # Read and transform images
        imgs = [generate_transforms(Image.open(files[j])) for j in range(i, min(len(files), i + batch_size))]
        imgs = torch.stack(imgs, 0).type(tensor_type)

        # Generate images
        fake_imgs = model(imgs).detach().cpu()

        # Save generated images
        for j in range(fake_imgs.size(0)):
            img = fake_imgs[j].squeeze().permute(1, 2, 0).numpy()
            img = (img - np.min(img)) * 255 / (np.max(img) - np.min(img))
            img = to_image(img.astype(np.uint8))
            _, name = os.path.split(files[i + j])
            img.save(os.path.join(output_dir, name))

    # Compute metrics
    metrics: dict[str, float] = torch_fidelity.calculate_metrics(
        input1=output_dir,
        input2=ref_dir,
        cuda=True,
        fid=True,
        isc=True
    )

    fid_score: float = metrics["frechet_inception_distance"]
    is_score: float = metrics["inception_score_mean"]

    if is_score > 0:
        gms: float = np.sqrt(fid_score / is_score)
        print("Geometric Mean Score:", gms)
        return gms
    else:
        print("IS is 0, GMS cannot be computed!")
        return 0


In [None]:
# Parameters
image_size = 128
batch_size = loader_params["batch_size"]

# data_dir = '/kaggle/input/group-project/image_image_translation'
data_dir = '.'

# Raw to Cartoon
s_value_1 = evaluate(
    model=G_BA,
    input_dir=os.path.join(data_dir, 'VAE_generation/test'),
    output_dir='../Cartoon_images',
    ref_dir=f"{data_dir}/VAE_generation_Cartoon/test",
    batch_size=batch_size,
    image_size=image_size,
    tensor_type=Tensor
)

# Cartoon to Raw
s_value_2 = evaluate(
    model=G_BA,
    input_dir=os.path.join(data_dir, 'VAE_generation_Cartoon/test'),
    output_dir='../Raw_images',
    ref_dir=f"{data_dir}/VAE_generation/test",
    batch_size=batch_size,
    image_size=image_size,
    tensor_type=Tensor
)


# Output Results

In [None]:
s_value = np.round((s_value_1+s_value_2)/2, 5)
df = pd.DataFrame({'id': [1], 'label': [s_value]})

csv_path = "Username.csv"
df.to_csv(csv_path, index=False)

print(f"CSV saved to {csv_path}")