# Libraries

In [3]:
import os
import numpy as np
import scipy
from scipy.linalg import sqrtm
import torchvision.transforms as transforms
from torchvision.utils import save_image
from torch.utils.data import DataLoader
from torchvision import datasets
from torch.autograd import Variable
import torch.autograd as autograd
from torchvision.utils import make_grid
import torch.nn as nn
import torch.nn.functional as F
import torch
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
from IPython.display import clear_output
import itertools
import glob
import random
from torch.utils.data import Dataset
from PIL import Image
from skimage import io
import sys
import time
import datetime
import tensorflow as tf

In [None]:
!nvidia-smi

# Neural Network Architecture

In [None]:
# RESIDUAL BLOCK

class ResidualBlock(nn.Module):
    def __init__(self, in_channel):
        super(ResidualBlock, self).__init__()

        self.block = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_channel, in_channel, kernel_size=3, stride=1, padding=0, bias=False),
            nn.InstanceNorm2d(in_channel),
            nn.ReLU(inplace=True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_channel, in_channel, kernel_size=3, stride=1, padding=0, bias=False),
            nn.InstanceNorm2d(in_channel),
        )
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        return self.relu(x + self.block(x))


In [None]:
# GENERATOR BLOCK

class GeneratorResNet(nn.Module):
    def __init__(self, input_shape, num_residual_blocks):
        super().__init__()

        channels = input_shape[0]
        out_channels = 64

        model = [
            nn.ReflectionPad2d(channels),
            nn.Conv2d(channels, out_channels, kernel_size=7, padding=0),
            nn.InstanceNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        in_channels = out_channels

        for _ in range(2):
            out_channels *= 2
            model += [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1),
                nn.InstanceNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
            in_channels = out_channels

        for _ in range(num_residual_blocks):
            model += [ResidualBlock(out_channels)]

        # upsampling
        for _ in range(2):
            out_channels //= 2
            model += [
                nn.Upsample(scale_factor=2, mode='nearest'),
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1),
                nn.InstanceNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
            in_channels = out_channels

        model += [
            nn.ReflectionPad2d(channels),
            nn.Conv2d(out_channels, channels, kernel_size=7, padding=0),
            nn.Tanh(),
        ]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)


In [None]:
# DISCRIMINATOR BLOCK

class Discriminator(nn.Module):
    def __init__(self, input_shape):
        super(Discriminator, self).__init__()

        channels, height, width = input_shape
        self.output_shape = (1, height // 2 ** 4, width // 2 ** 4)

        def discriminator_block(in_channels, out_channels, normalize=True):
            layers = []
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1))
            if normalize:
                layers.append(nn.InstanceNorm2d(out_channels))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            return nn.Sequential(*layers)

        self.model = nn.Sequential(
            discriminator_block(channels, out_channels=64, normalize=False),
            discriminator_block(64, out_channels=128),
            discriminator_block(128, out_channels=256),
            discriminator_block(256, out_channels=512),
            nn.ZeroPad2d((1, 0, 1, 0)),
            nn.Conv2d(in_channels=512, out_channels=1, kernel_size=4, padding=1)
        )
       
    def forward(self, img):
        return self.model(img)


# DataPreprocessing and Utilities

In [None]:
#dataloader and rgb confirmation

def convert_to_rgb(image):
    rgb_image = Image.new("RGB", image.size)
    rgb_image.paste(image)
    return rgb_image


class ImageDataset(Dataset):
    def __init__(self, root, transforms_=None, unaligned=False, mode="train"):
        self.transform = transforms.Compose(transforms_) if transforms_ else None
        self.unaligned = unaligned        
        self.files_A = sorted(glob.glob(os.path.join(root, f"{mode}A", "*.*")))
        self.files_B = sorted(glob.glob(os.path.join(root, f"{mode}B", "*.*")))

    def __getitem__(self, index):
        image_A = Image.open(self.files_A[index % len(self.files_A)])
        if self.unaligned:
            image_B = Image.open(self.files_B[random.randint(0, len(self.files_B) - 1)])
        else:
            image_B = Image.open(self.files_B[index % len(self.files_B)])
        if image_A.mode != "RGB":
            image_A = convert_to_rgb(image_A)
        if image_B.mode != "RGB":
            image_B = convert_to_rgb(image_B)
        item_A = self.transform(image_A) if self.transform else image_A
        item_B = self.transform(image_B) if self.transform else image_B
        return {"A": item_A, "B": item_B}

    def __len__(self):
        return max(len(self.files_A), len(self.files_B))


In [None]:
#transposing dimensions between plt and torch

def show_img(img, size=10):
    img = img / 2 + 0.5     
    npimg = img.numpy()
    plt.figure(figsize=(size, size))
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

def to_img(x):    
    x = x.view(x.size(0)*2, hp.channels, hp.img_size, hp.img_size)
    return x

def plot_output(path, x, y):
    img = plt.imread(path)
    plt.figure(figsize=(x,y))
    plt.imshow(img)  
    plt.show()

In [None]:
#Image Buffer of 50 images

class ReplayBuffer:
    def __init__(self, max_size=50):
        if max_size <= 0:
            raise ValueError("max_size should be greater than 0.")
        self.max_size = max_size
        self.buffer = []

    def push_and_pop(self, data):
        to_return = []
        for element in data:
            element = torch.unsqueeze(element, 0)
            if len(self.buffer) < self.max_size:
                self.buffer.append(element)
                to_return.append(element)
            else:
                # Randomly replace an existing element with the new element with a 0.5 probability.
                if random.uniform(0, 1) > 0.5:
                    idx = random.randint(0, self.max_size - 1)
                    to_return.append(self.buffer[idx].clone())
                    self.buffer[idx] = element
                else:
                    to_return.append(element)
        return torch.cat(to_return, dim=0)


In [None]:
# Learning Rate and Decay

class LambdaLR:
    def __init__(self, n_epochs, offset, decay_start_epoch):
        assert (n_epochs - decay_start_epoch) > 0, "Error in class for learning rate"
        self.n_epochs = n_epochs
        self.offset = offset
        self.decay_start_epoch = decay_start_epoch

    def step(self, epoch):
        numerator = max(0, epoch + self.offset - self.decay_start_epoch)
        denominator = self.n_epochs - self.decay_start_epoch
        return 1.0 - numerator / denominator if denominator > 0 else 1.0


In [None]:
# Convolution weights

def initialize_conv_weights_normal(m):
    if isinstance(m, torch.nn.Conv2d):
        torch.nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
        if m.bias is not None:
            torch.nn.init.constant_(m.bias.data, 0.0)
    elif isinstance(m, torch.nn.BatchNorm2d):
        torch.nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        torch.nn.init.constant_(m.bias.data, 0.0)


# Hyperparamters and Optimization

In [None]:
# HYPERPARAMETERS 
class Hyperparameters:
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

hp = Hyperparameters(
    epoch=0,
    n_epochs=200,    
    dataset_train_mode="train",
    dataset_test_mode="test", 
    batch_size=4,        
    lr=.0002,
    decay_start_epoch=100,
    b1=.5,
    b2=0.999,
    n_cpu=8,
    img_size=128,
    channels=3,
    n_critic=5,
    sample_interval=100,
    num_residual_blocks=18,
    lambda_cyc=10.0,
    lambda_id=5.0
)

# Root Path for Google Drive
root_path = '/content/drive/MyDrive/CycleGAN/Images/maps'
save_path = '/content/drive/MyDrive/CycleGAN/Images/save'


In [None]:
def save_img_samples(batches_done):
    """Saves a generated sample from the test set"""
    print('batches_done ', batches_done)
    imgs = next(iter(val_dataloader))
    Gen_AB.eval()
    Gen_BA.eval()
    real_A = Variable(imgs["A"].type(Tensor))
    fake_B = Gen_AB(real_A)
    real_B = Variable(imgs["B"].type(Tensor))
    fake_A = Gen_BA(real_B)
    
    for i in range(real_A.size(0)):
        path = save_path + "/real_A_%s.png" % (i + batches_done*real_A.size(0))
        save_image(real_A[i], path, normalize=True)

    for i in range(fake_B.size(0)):
        path = save_path + "/fake_B_%s.png" % (i + batches_done*fake_B.size(0))
        save_image(fake_B[i], path, normalize=True)

    for i in range(real_B.size(0)):
        path = save_path + "/real_B_%s.png" % (i + batches_done*real_B.size(0))
        save_image(real_B[i], path, normalize=True)

    for i in range(fake_A.size(0)):
        path = save_path + "/fake_A_%s.png" % (i + batches_done*fake_A.size(0))
        save_image(fake_A[i], path, normalize=True)
    
    return path

In [None]:
cuda = torch.cuda.is_available()
device = torch.device('cuda' if cuda else 'cpu')
print("CUDA" if cuda else "No CUDA")

# Loss functions
criterion_GAN = torch.nn.MSELoss()
criterion_cycle = torch.nn.L1Loss()
criterion_identity = torch.nn.L1Loss()

input_shape = (hp.channels, hp.img_size, hp.img_size)

# Initialize generator and discriminator
Gen_AB = GeneratorResNet(input_shape, hp.num_residual_blocks)
Gen_BA = GeneratorResNet(input_shape, hp.num_residual_blocks)
Disc_A = Discriminator(input_shape)
Disc_B = Discriminator(input_shape)

if cuda:
    Gen_AB = Gen_AB.cuda()
    Gen_BA = Gen_BA.cuda()
    Disc_A = Disc_A.cuda()
    Disc_B = Disc_B.cuda()
    criterion_GAN.cuda()
    criterion_cycle.cuda()
    criterion_identity.cuda()

Gen_AB.apply(initialize_conv_weights_normal)
Gen_BA.apply(initialize_conv_weights_normal)
Disc_A.apply(initialize_conv_weights_normal)
Disc_B.apply(initialize_conv_weights_normal)
fake_A_buffer = ReplayBuffer()
fake_B_buffer = ReplayBuffer()

CUDA


In [None]:
#OPTIMIZER

optimizer_G = torch.optim.Adam(itertools.chain(Gen_AB.parameters(), Gen_BA.parameters()), lr=hp.lr, betas=(hp.b1, hp.b2))
optimizer_Disc_A = torch.optim.Adam(Disc_A.parameters(), lr=hp.lr, betas=(hp.b1, hp.b2))
optimizer_Disc_B = torch.optim.Adam(Disc_B.parameters(), lr=hp.lr, betas=(hp.b1, hp.b2))

lr_scheduler_G = torch.optim.lr_scheduler.LambdaLR(
    optimizer_G, lr_lambda=LambdaLR(hp.n_epochs, hp.epoch, hp.decay_start_epoch).step
)

lr_scheduler_Disc_A = torch.optim.lr_scheduler.LambdaLR(
    optimizer_Disc_A, lr_lambda=LambdaLR(hp.n_epochs, hp.epoch, hp.decay_start_epoch).step
)

lr_scheduler_Disc_B = torch.optim.lr_scheduler.LambdaLR(
    optimizer_Disc_B, lr_lambda=LambdaLR(hp.n_epochs, hp.epoch, hp.decay_start_epoch).step
)

Tensor = torch.cuda.FloatTensor if cuda else torch.Tensor

# Training Execution

In [None]:
#initialising datasets, through dataloader

transforms_ = [
    transforms.Resize((hp.img_size, hp.img_size), Image.BICUBIC),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]

train_dataloader = DataLoader(
    ImageDataset(root_path, mode=hp.dataset_train_mode, transforms_=transforms_),
    batch_size=hp.batch_size,
    shuffle=True,
    num_workers=1,
)
val_dataloader = DataLoader(
    ImageDataset(root_path, mode=hp.dataset_test_mode, transforms_=transforms_),
    batch_size=16,
    shuffle=True,
    num_workers=1,
)

In [None]:
#Training Function

def train(
    Gen_BA,
    Gen_AB,
    Disc_A,
    Disc_B,
    train_dataloader,
    n_epochs,
    criterion_identity,
    criterion_cycle,
    lambda_cyc,
    criterion_GAN,
    optimizer_G,
    fake_A_buffer,
    fake_B_buffer,
    optimizer_Disc_A,
    optimizer_Disc_B,
    Tensor,
    sample_interval,
    lambda_id,
):
    prev_time = time.time()
    for epoch in range(hp.epoch, n_epochs):
        for i, batch in enumerate(train_dataloader):
            real_A = Variable(batch["A"].type(Tensor))
            real_B = Variable(batch["B"].type(Tensor))
            valid = Variable(
                Tensor(np.ones((real_A.size(0), *Disc_A.output_shape))),
                requires_grad=False,
            )
            fake = Variable(
                Tensor(np.zeros((real_A.size(0), *Disc_A.output_shape))),
                requires_grad=False,
            )

            #Generators
            Gen_AB.train() 
            Gen_BA.train() 
            optimizer_G.zero_grad()
            loss_identity = (criterion_identity(Gen_BA(real_A), real_A) + criterion_identity(Gen_AB(real_B), real_B))
            loss_GAN = (criterion_GAN(Disc_B( Gen_AB(real_A)), valid) + criterion_GAN(Disc_A(Gen_BA(real_B)), valid))
            loss_cycle = (criterion_cycle(Gen_BA(Gen_AB(real_A)), real_A) + criterion_cycle(Gen_AB(Gen_BA(real_B)), real_B))

            loss_G = loss_GAN + lambda_cyc * loss_cycle + lambda_id * loss_identity
            loss_G.backward()
            optimizer_G.step()

            #discriminators
            optimizer_Disc_A.zero_grad()
            loss_real = criterion_GAN(Disc_A(real_A), valid)
            fake_A_ = fake_A_buffer.push_and_pop(Gen_BA(real_B))
            loss_fake = criterion_GAN(Disc_A(fake_A_.detach()), fake)
            loss_Disc_A = (loss_real + loss_fake)
            loss_Disc_A.backward()
            optimizer_Disc_A.step()
            optimizer_Disc_B.zero_grad()

            loss_real = criterion_GAN(Disc_B(real_B), valid)
            fake_B_ = fake_B_buffer.push_and_pop(Gen_AB(real_A))
            loss_fake = criterion_GAN(Disc_B(fake_B_.detach()), fake)
            loss_Disc_B = (loss_real + loss_fake)
            loss_Disc_B.backward()
            optimizer_Disc_B.step()
            loss_D = (loss_Disc_A + loss_Disc_B)
           
            print(epoch,n_epochs,loss_G,loss_GAN,loss_cycle,loss_identity)

In [None]:
#TRAINING

train(
    Gen_BA = Gen_BA,
    Gen_AB = Gen_AB,
    Disc_A = Disc_A,
    Disc_B = Disc_B,
    train_dataloader = train_dataloader,
    n_epochs = hp.n_epochs,
    criterion_identity = criterion_identity,
    criterion_cycle = criterion_cycle,
    lambda_cyc = hp.lambda_cyc,
    criterion_GAN = criterion_GAN,
    optimizer_G = optimizer_G,
    fake_A_buffer = fake_A_buffer,
    fake_B_buffer = fake_B_buffer,
    optimizer_Disc_A = optimizer_Disc_A,
    optimizer_Disc_B = optimizer_Disc_B,
    Tensor = Tensor,
    sample_interval = hp.sample_interval,
    lambda_id = hp.lambda_id,
)


# Evaluation

In [None]:

def calculate_fcn_scores(real_images, fake_images, num_classes=3, batch_size=64):
    
    # Load VGG16 model
    vgg_model = tf.keras.applications.VGG16(include_top=False, weights='imagenet', input_shape=(128, 128, 3))
    
    # Preprocess real images and fake images for VGG16
    real_images = tf.keras.applications.vgg16.preprocess_input(real_images)
    fake_images = tf.keras.applications.vgg16.preprocess_input(fake_images)
    
    # Calculate features of real images and fake images
    real_features = vgg_model.predict(real_images, batch_size=batch_size)
    fake_features = vgg_model.predict(fake_images, batch_size=batch_size)
    
    # Calculate mean and covariance of real features
    real_mean = np.mean(real_features, axis=0)
    real_covariance = np.cov(np.transpose(real_features), bias=True)
    
    # Calculate mean and covariance of fake features
    fake_mean = np.mean(fake_features, axis=0)
    fake_covariance = np.cov(np.transpose(fake_features), bias=True)
    
    # Calculate squared Frobenius norm between real and fake features
    squared_norm = np.linalg.norm(real_mean - fake_mean) ** 2
    squared_trace = np.trace(real_covariance + fake_covariance - 2 * sqrtm(np.dot(real_covariance, fake_covariance)))
    
    # Calculate per-pixel accuracy
    per_pixel_acc = np.sum(np.argmax(real_images, axis=-1) == np.argmax(fake_images, axis=-1)) / np.prod(real_images.shape[:-1])
    
    # Calculate per-class accuracy
    per_class_acc = []
    for c in range(num_classes):
        real_class_mask = np.argmax(real_images, axis=-1) == c
        fake_class_mask = np.argmax(fake_images, axis=-1) == c
        class_acc = np.sum(real_class_mask == fake_class_mask) / np.sum(real_class_mask)
        per_class_acc.append(class_acc)
    
    # Calculate class IOU losses
    class_iou_losses = []
    for c in range(num_classes):
        real_class_mask = np.argmax(real_images, axis=-1) == c
        fake_class_mask = np.argmax(fake_images, axis=-1) == c
        intersection = np.sum(np.logical_and(real_class_mask, fake_class_mask))
        union = np.sum(np.logical_or(real_class_mask, fake_class_mask))
        iou_loss = 1 - intersection / union
        class_iou_losses.append(iou_loss)
    
    return per_pixel_acc, per_class_acc, class_iou_losses

# Example usage
real_images = io.imread("/content/real_B_984000.png")  
fake_images = io.imread("/content/fake_A_984000.png")
per_pixel_acc, per_class_acc, class_iou_losses = calculate_fcn_scores(real_images, fake_images)
print("FCN Score:", per_pixel_acc, per_class_acc, class_iou_losses)
