## Import the libraries

In [None]:
import os
import sys
import cv2
import math
import yaml
import torch
import joblib
import zipfile
import argparse
import warnings
import unittest
import pandas as pd
from PIL import Image
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torchview import draw_graph
from torchvision import transforms
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

## Utility functions

In [2]:
def dump(value: str, filename: str):
    """Persist ``value`` to ``filename`` using joblib.

    Args:
        value: Object to serialize. Must not be ``None``.
        filename: Destination path. Must not be ``None``.

    Raises:
        ValueError: If ``value`` or ``filename`` is ``None``.
    """
    # Guard clause first so the happy path is not nested.
    if value is None or filename is None:
        raise ValueError(
            "Both 'value' and 'filename' should be provided.".capitalize()
        )

    joblib.dump(value=value, filename=filename)


def load(filename: str):
    """Load and return the object stored at ``filename`` using joblib.

    Args:
        filename: Path of the serialized object. Must not be ``None``.

    Returns:
        Whatever ``joblib.load`` reconstructs from the file.

    Raises:
        ValueError: If ``filename`` is ``None``.
    """
    if filename is None:
        raise ValueError("Please provide a valid 'filename' to load.".capitalize())

    # NOTE(review): joblib is pickle-based; only load files this project wrote.
    return joblib.load(filename=filename)
    
def config():
    """Parse ``../config.yml`` and return its contents.

    The path is relative to the current working directory, and the file is
    re-read on every call.

    Returns:
        The object produced by ``yaml.safe_load`` for the file.
    """
    with open("../config.yml", "r") as handle:
        settings = yaml.safe_load(handle)

    return settings
    

def weight_init(m):
    """Re-initialize ``m`` in place based on its class name.

    Classes whose name contains ``"Conv"`` get weights drawn from
    N(0, 0.02). Classes whose name contains ``"BatchNorm"`` get weights drawn
    from N(1, 0.02) and a zero bias. Any other module is left untouched.

    Args:
        m: The module to initialize.
    """
    layer_name = m.__class__.__name__

    if "Conv" in layer_name:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        return

    if "BatchNorm" in layer_name:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)


def device_init(device: str = "cuda"):
    """Resolve a device name to a ``torch.device``, falling back to CPU.

    Args:
        device: ``"cuda"`` or ``"mps"`` to request an accelerator. Any other
            value selects the CPU.

    Returns:
        The requested accelerator device when its backend reports itself
        available, otherwise ``torch.device("cpu")``.
    """
    if device not in ("cuda", "mps"):
        return torch.device("cpu")

    # Only probe the backend that was actually requested.
    if device == "cuda":
        available = torch.cuda.is_available()
    else:
        available = torch.backends.mps.is_available()

    return torch.device(device if available else "cpu")

## Dataloader

In [None]:
# Install a global "ignore" filter: every warning raised after this line is suppressed.
warnings.filterwarnings("ignore")

class Loader:
    """Build, persist and inspect the paired-image dataloaders.

    Typical flow: ``unzip_folder`` extracts the archive into the processed
    directory, ``create_dataloader`` turns the extracted images into pickled
    train/validation ``DataLoader`` objects, and ``display_images`` /
    ``dataset_details`` write artifacts describing them. All directories come
    from the module-level ``config()`` helper.

    Args:
        dataset: Path to the zip archive containing the raw images.
        image_size: Side length (pixels) every image is resized/cropped to.
        batch_size: Batch size used for both dataloaders.
        split_size: Fraction of samples held out for validation.
    """

    def __init__(
        self, dataset: str, image_size=32, batch_size: int = 8, split_size: float = 0.25
    ):
        self.dataset = dataset
        self.image_size = image_size
        self.batch_size = batch_size
        self.split_size = split_size

        # X1 holds the plain view of each image, X2 the augmented ("coupled") view.
        self.X1 = []
        self.X2 = []

    def transforms(self, type: str = "coupled"):
        """Return the preprocessing pipeline for one of the two image views.

        Args:
            type: ``"coupled"`` selects the augmented pipeline (random
                horizontal/vertical flips and a random rotation with
                ``degrees=45``). Any other value, including ``None``, selects
                the plain pipeline. The name shadows the builtin but is kept
                because callers pass it by keyword.

        Returns:
            A ``transforms.Compose`` that resizes, center-crops, converts to a
            tensor and normalizes each channel with mean 0.5 / std 0.5.
        """
        size = (self.image_size, self.image_size)
        steps = [
            transforms.Resize(size=size),
            transforms.CenterCrop(size=size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ]

        if type == "coupled":
            steps += [
                transforms.RandomHorizontalFlip(),
                transforms.RandomVerticalFlip(),
                transforms.RandomRotation(degrees=45),
            ]

        return transforms.Compose(steps)

    def split_dataset(self, **dataset):
        """Split paired samples into train and validation subsets.

        Args:
            **dataset: Must contain ``X`` and ``y``, two equally long sequences.

        Returns:
            A dict with keys ``X_train``, ``X_test``, ``y_train``, ``y_test``.

        Raises:
            KeyError: If ``X`` or ``y`` is missing.
        """
        X = dataset["X"]
        y = dataset["y"]

        # Fixed seed keeps the split reproducible between runs.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=self.split_size, random_state=42
        )

        return {
            "X_train": X_train,
            "X_test": X_test,
            "y_train": y_train,
            "y_test": y_test,
        }

    def features_extractor(self):
        """Read every image in ``<processed_path>/dataset`` and build both views.

        Files without a ``.png``/``.jpg``/``.jpeg`` suffix, and files OpenCV
        cannot decode, are reported on stdout and skipped.

        Returns:
            The train/validation split produced by ``split_dataset`` where
            ``X`` is the plain view and ``y`` the augmented view.
        """
        dataset = os.path.join(config()["path"]["processed_path"], "dataset")

        # Start from a clean slate so a repeated call does not duplicate samples.
        self.X1 = []
        self.X2 = []

        # The pipelines do not depend on the image, so build them once.
        plain_transform = self.transforms(type=None)
        coupled_transform = self.transforms(type="coupled")

        for name in tqdm(os.listdir(dataset)):
            path = os.path.join(dataset, name)

            if not path.endswith((".png", ".jpg", ".jpeg")):
                # Printed verbatim: capitalizing would lowercase the path.
                print(f"Invalid image: {path}")
                continue

            pixels = cv2.imread(filename=path)

            if pixels is None:
                # cv2.imread signals an unreadable file by returning None.
                print(f"Invalid image: {path}")
                continue

            # OpenCV decodes to BGR; PIL and the transforms expect RGB.
            image = Image.fromarray(cv2.cvtColor(pixels, cv2.COLOR_BGR2RGB))

            self.X1.append(plain_transform(image))
            self.X2.append(coupled_transform(image))

        return self.split_dataset(X=self.X1, y=self.X2)

    def unzip_folder(self):
        """Extract the dataset archive into the processed-data directory.

        Raises:
            FileNotFoundError: If the processed-data directory does not exist.
        """
        processed_path = config()["path"]["processed_path"]

        if not os.path.exists(path=processed_path):
            raise FileNotFoundError("File not found - processed data path".capitalize())

        with zipfile.ZipFile(file=self.dataset, mode="r") as zip_file:
            zip_file.extractall(path=processed_path)

    def create_dataloader(self):
        """Build the train/validation dataloaders and pickle them to disk.

        The dataloaders are written to ``train_dataloader.pkl`` and
        ``valid_dataloader.pkl`` inside the processed-data directory.

        Returns:
            ``None``. Any failure is reported on stdout instead of raised,
            which is the method's original best-effort contract.
        """
        try:
            # Use this instance, not a module-level ``loader`` variable.
            dataset = self.features_extractor()

            train_dataloader = DataLoader(
                dataset=list(zip(dataset["X_train"], dataset["y_train"])),
                batch_size=self.batch_size,
                shuffle=True,
            )

            valid_dataloader = DataLoader(
                dataset=list(zip(dataset["X_test"], dataset["y_test"])),
                batch_size=self.batch_size,
                shuffle=True,
            )

            processed_path = config()["path"]["processed_path"]

            for filename, value in [
                ("train_dataloader", train_dataloader),
                ("valid_dataloader", valid_dataloader),
            ]:
                dump(
                    value=value,
                    filename=os.path.join(processed_path, filename + ".pkl"),
                )

            print(
                "Dataloader is stored in the directory of {}".capitalize().format(
                    processed_path
                )
            )

        except Exception as e:
            print(f"Error occurred: {e}")
            return None

    @staticmethod
    def _to_displayable(image):
        """Convert a CHW tensor to an HWC numpy array min-max scaled for imshow."""
        image = image.squeeze().permute(1, 2, 0).numpy()
        return (image - image.min()) / (image.max() - image.min())

    def display_images(self):
        """Plot one validation batch as side-by-side image pairs and save it.

        The figure is written to ``images.jpeg`` in the artifacts directory
        and then shown.
        """
        dataloader = os.path.join(
            config()["path"]["processed_path"], "valid_dataloader.pkl"
        )

        dataloader = load(filename=dataloader)

        X1, X2 = next(iter(dataloader))

        assert (
            X1.size() == X2.size()
        ), "Cannot be possible to display the images in the same order".capitalize()

        num_of_rows = int(math.sqrt(X1.size(0)))
        num_of_columns = X1.size(0) // num_of_rows

        plt.figure(figsize=(40, 15))

        plt.suptitle("Training Images".title())

        for index, image1 in enumerate(X1):
            # Each pair takes two consecutive cells of a grid twice as wide
            # and twice as tall as the batch layout.
            for offset, title, image in (
                (1, "IMG-1", image1),
                (2, "IMG-2", X2[index]),
            ):
                plt.subplot(2 * num_of_rows, 2 * num_of_columns, 2 * index + offset)
                plt.imshow(self._to_displayable(image))
                plt.title(title)
                plt.xticks([])
                plt.yticks([])
                plt.axis("off")

        artifacts_path = config()["path"]["artifacts_path"]

        plt.tight_layout()
        plt.savefig(os.path.join(artifacts_path, "images.jpeg"))
        print("Image stored in " + artifacts_path)
        plt.show()

    @staticmethod
    def dataset_details():
        """Summarize the pickled dataloaders and write the summary as CSV.

        Loads ``train_dataloader.pkl`` and ``valid_dataloader.pkl`` from the
        processed-data directory, counts their samples, and writes
        ``dataset_details.csv`` to the artifacts directory.
        """
        paths = config()["path"]

        train_dataloader = load(
            filename=os.path.join(paths["processed_path"], "train_dataloader.pkl")
        )
        valid_dataloader = load(
            filename=os.path.join(paths["processed_path"], "valid_dataloader.pkl")
        )

        # Summing batch sizes handles a short final batch correctly.
        total_train_dataset = sum(X1.size(0) for X1, _ in train_dataloader)
        total_valid_dataset = sum(X1.size(0) for X1, _ in valid_dataloader)

        total_dataset = total_train_dataset + total_valid_dataset

        train_dataset_dimension, _ = next(iter(train_dataloader))
        valid_dataset_dimension, _ = next(iter(valid_dataloader))

        dataset_details = pd.DataFrame(
            {
                "train_dataset_dimension": str(train_dataset_dimension.size()),
                "valid_dataset_dimension": str(valid_dataset_dimension.size()),
                "total_dataset": total_dataset,
                "train_dataset_size": total_train_dataset,
                "valid_dataset_size": total_valid_dataset,
                "total_dataset_size": total_dataset,
                "percentage_of_train_dataset": str(
                    total_train_dataset / total_dataset * 100
                    if total_train_dataset > 0
                    else 0
                )
                + "%",
                "percentage_of_valid_dataset": str(
                    total_valid_dataset / total_dataset * 100
                    if total_valid_dataset > 0
                    else 0
                )
                + "%",
            },
            index=["Dataset Details"],
        ).T

        dataset_details.to_csv(
            os.path.join(paths["artifacts_path"], "dataset_details.csv")
        )

        print(
            "Dataset details stored in the folder {}".capitalize().format(
                paths["artifacts_path"]
            )
        )


if __name__ == "__main__":
    # End-to-end run of the data pipeline: extract the archive, build and
    # pickle the dataloaders, then write the preview image and the CSV summary.
    loader = Loader(
        dataset="../data/raw/dataset.zip",
        **{
            option: config()["dataloader"][option]
            for option in ("batch_size", "image_size", "split_size")
        },
    )

    loader.unzip_folder()
    loader.create_dataloader()
    loader.display_images()

    Loader.dataset_details()

## Coupled Generators

In [None]:
class CoupledGenerators(nn.Module):
    """Two generators that share their first layers and end in separate heads.

    A latent vector is projected by a linear layer, reshaped to a
    ``constant``-channel feature map of side ``image_size // 4``, upsampled
    twice (x4 overall) by the shared stack, and then decoded by two
    independent branches into two 3-channel images squashed by ``Tanh``.

    Args:
        latent_space: Size of the input noise vector.
        constant: Number of channels in the shared feature maps.
        image_size: Target image side length. The produced side is
            ``4 * (image_size // 4)``, which equals ``image_size`` when it is
            a multiple of 4.
    """

    def __init__(
        self,
        latent_space: int = 100,
        constant: int = 128,
        image_size: int = 32,
    ):
        super(CoupledGenerators, self).__init__()
        self.latent_space = latent_space
        self.constant = constant
        self.image_size = image_size

        self.kernel_size = 3
        self.stride_size = 1
        self.padding_size = 1

        self.negative_slope = 0.2
        self.scale_factor = 2

        # Parenthesized so the width always matches the view() in forward();
        # the unparenthesized form only agreed for multiples of 4.
        self.fullyConnectedLayer = nn.Linear(
            in_features=self.latent_space,
            out_features=self.constant * (self.image_size // 4) ** 2,
        )

        self.sharedConvolution = nn.Sequential(
            nn.BatchNorm2d(self.constant),
            nn.Upsample(scale_factor=self.scale_factor),
            nn.Conv2d(
                in_channels=self.constant,
                out_channels=self.constant,
                kernel_size=self.kernel_size,
                stride=self.stride_size,
                padding=self.padding_size,
            ),
            nn.LeakyReLU(negative_slope=self.negative_slope, inplace=True),
            nn.BatchNorm2d(self.constant),
            nn.Upsample(scale_factor=self.scale_factor),
        )

        # Kept as attributes because the original class exposed these lists.
        self.netG1Layers = self._build_branch()
        self.netG2Layers = self._build_branch()

        self.generator1 = nn.Sequential(*self.netG1Layers)
        self.generator2 = nn.Sequential(*self.netG2Layers)

    def _build_branch(self):
        """Return the layer list for one generator head.

        The layout (conv, nested BatchNorm+LeakyReLU, conv) is unchanged so
        parameter names stay the same. The final ``Tanh`` has no parameters
        and is now actually appended instead of being built and discarded.
        """
        half = self.constant // 2

        return [
            nn.Conv2d(
                in_channels=self.constant,
                out_channels=half,
                kernel_size=self.kernel_size,
                stride=self.stride_size,
                padding=self.padding_size,
            ),
            nn.Sequential(
                nn.BatchNorm2d(num_features=half),
                nn.LeakyReLU(negative_slope=self.negative_slope, inplace=True),
            ),
            nn.Conv2d(
                in_channels=half,
                out_channels=3,
                kernel_size=self.kernel_size,
                stride=self.stride_size,
                padding=self.padding_size,
            ),
            nn.Tanh(),
        ]

    def forward(self, x):
        """Generate one image pair per latent vector.

        Args:
            x: Tensor of shape ``(batch, latent_space)``.

        Returns:
            ``(image1, image2)``, two tensors of identical shape
            ``(batch, 3, H, W)`` with values in ``[-1, 1]``.

        Raises:
            ValueError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise ValueError("Input should be a torch.Tensor".capitalize())

        x = self.fullyConnectedLayer(x)
        x = x.view(
            x.size(0), self.constant, self.image_size // 4, self.image_size // 4
        )

        shared = self.sharedConvolution(x)

        image1 = self.generator1(shared)
        image2 = self.generator2(shared)

        return image1, image2


if __name__ == "__main__":
    # Smoke test: build the generator from the config, run one random batch
    # through it, and try to render the computation graph.
    netG = CoupledGenerators(
        latent_space=config()["netG"]["latent_space"],
        constant=config()["netG"]["constant"],
        image_size=config()["dataloader"]["image_size"],
    )

    image1, image2 = netG(
        torch.randn(
            config()["dataloader"]["batch_size"], config()["netG"]["latent_space"]
        )
    )

    assert (
        image1.size() == image2.size()
    ), "Image1 and Image2 must be the same size".capitalize()

    # Fixed the "Imgae1" typo in the printed label.
    print("Image1 size # {}".format(image1.size()))
    print("Image2 size # {}".format(image2.size()))

    try:
        draw_graph(
            model=netG,
            input_data=torch.randn(
                config()["dataloader"]["batch_size"],
                config()["netG"]["latent_space"],
            ),
        ).visual_graph.render(
            filename=os.path.join("../artifacts/files/", "coupleGenerator"),
            format="png",
        )
        print("Graph saved in ../artifacts/files/coupleGenerator.png")
    except Exception as e:
        # Rendering is best-effort: report the failure and carry on.
        print(f"Error during graph rendering: {e}")

## Coupled Discriminators

In [None]:
class CoupledDiscriminators(nn.Module):
    """Two discriminators sharing one convolutional trunk, with separate heads.

    Each image is passed through the same four stride-2 convolutions and then
    scored by its own linear head: ``discriminator1`` for ``image1``,
    ``discriminator2`` for ``image2``.

    Args:
        channels: Number of channels in the input images.
        image_size: Side length of the (square) input images.
        constant: Stored for interface compatibility. The width of the linear
            heads is derived from the real output shape of the trunk, so this
            value no longer has to match it.
    """

    def __init__(
        self,
        channels: int = 3,
        image_size: int = 32,
        constant: int = 128,
    ):
        super(CoupledDiscriminators, self).__init__()

        # in_channels/out_channels are running values: each loop iteration
        # advances them, so after the loop in_channels is the trunk's depth.
        self.in_channels = channels
        self.out_channels = self.in_channels * 5 + 1
        self.image_size = image_size
        self.constant = constant

        self.kernel_size = 3
        self.stride_size = 2
        self.padding_size = 1

        self.layers = []

        feature_size = self.image_size

        for index in range(4):
            self.layers += [
                nn.Conv2d(
                    in_channels=self.in_channels,
                    out_channels=self.out_channels,
                    kernel_size=self.kernel_size,
                    stride=self.stride_size,
                    padding=self.padding_size,
                ),
            ]

            # No normalization directly on the first convolution's output.
            if index != 0:
                self.layers += [nn.BatchNorm2d(num_features=self.out_channels)]

            self.layers += [nn.LeakyReLU(negative_slope=0.2, inplace=True)]

            self.in_channels = self.out_channels
            self.out_channels = self.in_channels * 2

            # Standard Conv2d output-size formula for this layer.
            feature_size = (
                feature_size + 2 * self.padding_size - self.kernel_size
            ) // self.stride_size + 1

        self.sharedConvolution = nn.Sequential(*self.layers)

        # Real flattened width of the trunk output. For the defaults
        # (3, 32, 128) this is 128 * 2 * 2 = 512, same as the old formula.
        flattened_features = self.in_channels * feature_size**2

        self.discriminator1 = nn.Linear(
            in_features=flattened_features,
            out_features=1,
        )

        self.discriminator2 = nn.Linear(
            in_features=flattened_features,
            out_features=1,
        )

    def forward(self, image1: torch.Tensor, image2: torch.Tensor):
        """Score both images of a pair.

        Args:
            image1: Batch for the first domain, ``(batch, channels, H, W)``.
            image2: Batch for the second domain, same layout.

        Returns:
            ``(validity1, validity2)``, each of shape ``(batch, 1)``.
            ``validity1`` depends only on ``image1`` and ``validity2`` only on
            ``image2``.

        Raises:
            ValueError: If either input is not a ``torch.Tensor``.
        """
        if not (isinstance(image1, torch.Tensor) and isinstance(image2, torch.Tensor)):
            raise ValueError("Both inputs must be PyTorch tensors".capitalize())

        # Each image goes through the shared trunk; previously image2 was
        # ignored and both heads scored image1.
        shared1 = self.sharedConvolution(image1)
        shared2 = self.sharedConvolution(image2)

        shared1 = shared1.view(shared1.size(0), -1)
        shared2 = shared2.view(shared2.size(0), -1)

        validity1 = self.discriminator1(shared1)
        validity2 = self.discriminator2(shared2)

        return validity1, validity2


if __name__ == "__main__":
    # Smoke test: build the discriminator, score one random pair of batches,
    # and try to render the computation graph.
    batch_size = config()["dataloader"]["batch_size"]
    image_size = config()["dataloader"]["image_size"]
    constant = config()["netG"]["constant"]
    channels = 3

    # Build the network from the same settings as the inputs below (as
    # helper() and the unit tests do). With the class defaults the smoke test
    # only worked when the configured image size happened to be 32.
    netD = CoupledDiscriminators(
        channels=channels,
        image_size=image_size,
        constant=constant,
    )
    validity1, validity2 = netD(
        image1=torch.randn((batch_size, channels, image_size, image_size)),
        image2=torch.randn((batch_size, channels, image_size, image_size)),
    )

    assert (
        validity1.size() == validity2.size()
    ), "Validity1 and Validity2 must be the same size".capitalize()

    print("Validity1 size # {}".format(validity1.size()))
    print("Validity2 size # {}".format(validity2.size()))

    try:
        input_data1 = torch.randn((batch_size, channels, image_size, image_size))
        input_data2 = torch.randn((batch_size, channels, image_size, image_size))

        # "../" matches every other path in this file (config, raw data,
        # generator graph); "./" pointed at a different directory.
        draw_graph(
            model=netD, input_data=(input_data1, input_data2)
        ).visual_graph.render(
            filename=os.path.join("../artifacts/files/", "coupleDiscriminator"),
            format="png",
        )
        print("Graph saved in ../artifacts/files/coupleDiscriminator.png")
    except Exception as e:
        # Rendering is best-effort: report the failure and carry on.
        print(f"Error during graph rendering: {e}")


## Adversarial Loss

In [None]:
class AdversarialLoss(nn.Module):
    """Adversarial criterion implemented as a mean-squared error.

    Args:
        name: Human-readable label stored on the instance.
        reduction: Reduction mode forwarded to ``nn.MSELoss``.
    """

    def __init__(
        self, name: str = "Adversarial Loss".capitalize(), reduction: str = "mean"
    ):
        super().__init__()

        self.name, self.reduction = name, reduction
        self.MSEloss = nn.MSELoss(reduction=reduction)

    def forward(self, pred: torch.Tensor, actual: torch.Tensor):
        """Return the MSE between ``pred`` and ``actual``.

        Raises:
            ValueError: If either argument is not a ``torch.Tensor``.
        """
        both_tensors = isinstance(pred, torch.Tensor) and isinstance(
            actual, torch.Tensor
        )

        if not both_tensors:
            raise ValueError(
                "Both 'pred' and 'actual' should be torch.Tensor.".capitalize()
            )

        return self.MSEloss(pred, actual)
            
            
if __name__ == "__main__":
    # Smoke check on a five-element example where exactly one prediction is wrong.
    predicted = torch.Tensor([1.0, 0.0, 1.0, 1.0, 1.0])
    actual = torch.Tensor([1.0, 0.0, 1.0, 1.0, 0.0])

    loss = AdversarialLoss()

    print("Adversarial Loss: {:.4f}".format(loss(predicted, actual)))


## Helper function

In [None]:
def load_dataset():
    """Placeholder with no implementation yet; always returns ``None``."""
    return None


def helper(**kwargs):
    """Assemble the networks, optimizers and loss needed for training.

    Args:
        **kwargs: All of the following keys are required:
            ``adam`` / ``SGD`` (truthy flags; ``adam`` wins when both are set),
            ``beta1`` / ``beta2`` (Adam betas), ``lr`` (learning rate),
            ``momentum`` (SGD momentum), ``reduction`` (loss reduction mode).

    Returns:
        A dict with keys ``netG``, ``netD``, ``optimizerG``, ``optimizerD``
        and ``adversarial_loss``.

    Raises:
        KeyError: If any required key is missing.
        ValueError: If neither ``adam`` nor ``SGD`` is truthy.
    """
    adam, SGD = kwargs["adam"], kwargs["SGD"]
    beta1, beta2 = kwargs["beta1"], kwargs["beta2"]
    lr, momentum = kwargs["lr"], kwargs["momentum"]
    reduction = kwargs["reduction"]

    settings = config()
    image_size = settings["dataloader"]["image_size"]
    constant = settings["netG"]["constant"]

    netG = CoupledGenerators(
        latent_space=settings["netG"]["latent_space"],
        constant=constant,
        image_size=image_size,
    )
    netD = CoupledDiscriminators(
        channels=3,
        image_size=image_size,
        constant=constant,
    )

    def build_optimizer(network):
        """Create the selected optimizer over ``network``'s parameters."""
        if adam:
            return optim.Adam(
                params=network.parameters(), lr=lr, betas=(beta1, beta2)
            )
        return optim.SGD(params=network.parameters(), lr=lr, momentum=momentum)

    # Validated after the networks are built, as before.
    if not adam and not SGD:
        raise ValueError("Invalid optimizer choice".capitalize())

    optimizerG = build_optimizer(netG)
    optimizerD = build_optimizer(netD)

    adversarial_loss = AdversarialLoss(
        name="Adversarial Loss for coupledGAN".title(), reduction=reduction
    )

    return dict(
        netG=netG,
        netD=netD,
        optimizerG=optimizerG,
        optimizerD=optimizerD,
        adversarial_loss=adversarial_loss,
    )


if __name__ == "__main__":
    # Exercise helper() with the SGD configuration and check the concrete
    # type of every component it returns.
    init = helper(
        SGD=True,
        adam=False,
        lr=0.0002,
        momentum=0.0,
        beta1=0.5,
        beta2=0.999,
        reduction="mean",
    )

    assert (
        type(init["netG"]) is CoupledGenerators
    ), "netG should be coupledGenerators".title()
    assert (
        type(init["netD"]) is CoupledDiscriminators
    ), "netD should be coupledDiscriminators".title()

    # With adam=True the two optimizer checks below would compare against
    # optim.Adam instead of optim.SGD.
    assert type(init["optimizerG"]) is optim.SGD, "optimizerG should be SGD".title()
    assert type(init["optimizerD"]) is optim.SGD, "optimizerD should be SGD".title()

    assert (
        type(init["adversarial_loss"]) is AdversarialLoss
    ), "adversarial_loss should be AdversarialLoss".title()


## Unit Tests

In [None]:
class UnitTest(unittest.TestCase):
    """Shape-consistency checks for the coupled generator and discriminator."""

    def setUp(self):
        """Build fresh networks from the config values before each test."""
        dataloader_settings = config()["dataloader"]
        generator_settings = config()["netG"]

        self.image_size = dataloader_settings["image_size"]
        self.batch_size = dataloader_settings["batch_size"]

        self.latent_space = generator_settings["latent_space"]
        self.constant = generator_settings["constant"]

        self.netG = CoupledGenerators(
            latent_space=self.latent_space,
            constant=self.constant,
            image_size=self.image_size,
        )

        self.netD = CoupledDiscriminators(
            channels=3,
            constant=self.constant,
            image_size=self.image_size,
        )

    def _generate_pair(self):
        """Sample a noise batch (kept on ``self.Z``) and return both generated images."""
        self.Z = torch.randn(self.batch_size, self.latent_space)
        return self.netG(self.Z)

    def test_coupleGenerator(self):
        """Both generator outputs must have the same size."""
        image1, image2 = self._generate_pair()

        self.assertEqual(
            image1.size(), image2.size(), "Image1 and Image2 must be the same size"
        )

    def test_coupledDiscriminator(self):
        """Both discriminator outputs must have the same size."""
        image1, image2 = self._generate_pair()

        validity1, validity2 = self.netD(image1=image1, image2=image2)

        self.assertEqual(
            validity1.size(),
            validity2.size(),
            "Validity1 and Validity2 must be the same size",
        )


if __name__ == "__main__":
    # Run every TestCase defined in this module ("__main__" is the default).
    # NOTE(review): inside a notebook kernel unittest.main() parses the
    # kernel's argv and calls sys.exit; if that is how this is run, consider
    # argv=[""] and exit=False. Confirm the intended runner first.
    unittest.main(module="__main__")