#### Utils file

In [None]:
import joblib
import yaml
import torch
import torch.nn as nn


def dump(value=None, filename=None):
    """Persist ``value`` to ``filename`` using joblib.

    Args:
        value: Object to serialize. Must not be ``None``.
        filename: Destination path handed to ``joblib.dump``. Must not be
            ``None``.

    Raises:
        ValueError: If either ``value`` or ``filename`` is ``None``.
    """
    # Guard clause: both arguments are mandatory despite their defaults.
    if value is None or filename is None:
        raise ValueError("value and filename must be provided".capitalize())

    joblib.dump(value=value, filename=filename)


def load(filename=None):
    """Read back an object previously stored with joblib.

    Args:
        filename: Path handed to ``joblib.load``. Must not be ``None``.

    Returns:
        Whatever ``joblib.load`` returns for ``filename``.

    Raises:
        ValueError: If ``filename`` is ``None``.
    """
    # NOTE(review): joblib files are pickle-based; only load files from a
    # trusted source.
    if filename is None:
        raise ValueError("filename should be defined".capitalize())

    return joblib.load(filename=filename)


def device_init(device="cuda"):
    """Resolve a device name to a ``torch.device``, falling back to CPU.

    Args:
        device: ``"cuda"`` or ``"mps"`` to request that accelerator; any
            other value selects the CPU.

    Returns:
        ``torch.device`` for the requested accelerator when it reports itself
        as available, otherwise ``torch.device("cpu")``.
    """
    backend = "cpu"

    if device == "cuda" and torch.cuda.is_available():
        backend = "cuda"
    elif device == "mps" and torch.backends.mps.is_available():
        backend = "mps"

    return torch.device(backend)


def weight_init(m):
    """Initialise convolution and batch-norm layers in place.

    Intended to be passed to ``nn.Module.apply``. Layers are recognised by
    their class name:

    * names containing ``"Conv"``: weights drawn from N(0.0, 0.02);
    * names containing ``"BatchNorm"``: weights drawn from N(1.0, 0.02) and
      biases set to 0.

    Every other module is left untouched.

    Args:
        m: The module to initialise.
    """
    # The class *name* is needed here; the class object itself has no
    # ``find`` method, so ``m.__class__.find`` would raise AttributeError.
    classname = m.__class__.__name__

    # ``apply`` also visits container modules and layers built without affine
    # parameters, so only touch tensors that actually exist.
    weight = getattr(m, "weight", None)
    bias = getattr(m, "bias", None)

    if "Conv" in classname:
        if weight is not None:
            nn.init.normal_(weight.data, 0.0, 0.02)

    elif "BatchNorm" in classname:
        if weight is not None:
            nn.init.normal_(weight.data, 1.0, 0.02)
        if bias is not None:
            nn.init.constant_(bias.data, 0)


def config():
    """Load the project configuration from ``./config.yml``.

    The path is relative to the current working directory.

    Returns:
        The object produced by ``yaml.safe_load`` for the file's contents.
    """
    with open("./config.yml", "r") as stream:
        settings = yaml.safe_load(stream)

    return settings

#### Encoder Block

In [None]:
import torch
import torch.nn as nn

In [None]:
class EncoderBlock(nn.Module):
    """Convolution stage: Conv2d, optional BatchNorm2d, then LeakyReLU(0.2).

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution.
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Convolution padding.
        use_norm: When true, a ``BatchNorm2d`` layer is inserted between the
            convolution and the activation.
    """

    def __init__(self, in_channels = 3, out_channels = 64, kernel_size = 4, stride = 2, padding = 1, use_norm = True):
        super(EncoderBlock, self).__init__()

        # Keep the hyper-parameters on the instance; ``block`` reads them.
        self.in_channels, self.out_channels = in_channels, out_channels
        self.kernel_size, self.stride, self.padding = kernel_size, stride, padding
        self.is_normalization = use_norm

        self.encoder_block = self.block()

    def block(self):
        """Build the layer stack, store it in ``self.layers`` and return it as a Sequential."""
        stack = [
            nn.Conv2d(
                in_channels=self.in_channels,
                out_channels=self.out_channels,
                kernel_size=self.kernel_size,
                stride=self.stride,
                padding=self.padding,
            )
        ]

        if self.is_normalization:
            stack.append(nn.BatchNorm2d(num_features=self.out_channels))

        stack.append(nn.LeakyReLU(negative_slope=0.2, inplace=True))

        self.layers = stack

        return nn.Sequential(*self.layers)

    def forward(self, x):
        """Run ``x`` through the block.

        Raises:
            ValueError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise ValueError("Input should be in the format of the tensor".capitalize())

        return self.encoder_block(x)


if __name__ == "__main__":
    # Smoke test: assemble the encoder half by hand and check its output shape.
    in_channels, out_channels = 3, 64
    kernel_size, stride, padding = 4, 2, 1

    layers = []

    # Two stages without normalisation; the width stays at ``out_channels``.
    for _ in range(2):
        layers.append(
            EncoderBlock(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
                use_norm=False,
            )
        )
        in_channels = out_channels

    # Three normalised stages, each doubling the channel count.
    for _ in range(3):
        out_channels = out_channels * 2
        layers.append(
            EncoderBlock(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
                use_norm=True,
            )
        )
        in_channels = out_channels

    # 1x1 convolution projecting to the 4000-channel bottleneck.
    layers.append(
        nn.Conv2d(in_channels=in_channels, out_channels=4000, kernel_size=1, stride=1, padding=0)
    )

    model = nn.Sequential(*layers)

    assert model(torch.randn(1, 3, 128, 128)).size() == (1, 4000, 4, 4)

#### Decoder Block

In [None]:
class DecoderBlock(nn.Module):
    """Upsampling stage: ConvTranspose2d, BatchNorm2d, then ReLU.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the transposed convolution.
        kernel_size: Transposed-convolution kernel size.
        stride: Transposed-convolution stride.
        padding: Transposed-convolution padding.
    """

    def __init__(
        self, in_channels=4000, out_channels=512, kernel_size=4, stride=2, padding=1
    ):
        super(DecoderBlock, self).__init__()

        # Keep the hyper-parameters on the instance; ``block`` reads them.
        self.in_channels, self.out_channels = in_channels, out_channels
        self.kernel_size, self.stride, self.padding = kernel_size, stride, padding

        self.decoder_block = self.block()

    def block(self):
        """Build the layer stack, store it in ``self.layers`` and return it as a Sequential."""
        upsample = nn.ConvTranspose2d(
            in_channels=self.in_channels,
            out_channels=self.out_channels,
            kernel_size=self.kernel_size,
            stride=self.stride,
            padding=self.padding,
        )
        norm = nn.BatchNorm2d(num_features=self.out_channels)
        activation = nn.ReLU(inplace=True)

        self.layers = [upsample, norm, activation]

        return nn.Sequential(*self.layers)

    def forward(self, x):
        """Run ``x`` through the block.

        Raises:
            ValueError: If ``x`` is not a ``torch.Tensor``.
        """
        if not isinstance(x, torch.Tensor):
            raise ValueError("Input should be in the format of the tensor".capitalize())

        return self.decoder_block(x)


if __name__ == "__main__":
    # Smoke test: assemble the decoder half by hand and print its output size.
    layers = []

    in_channels, out_channels = 4000, 512
    kernel_size, stride, padding = 4, 2, 1

    # Four upsampling stages; the width halves after every stage.
    for _ in range(4):
        stage = DecoderBlock(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )
        layers.append(stage)
        in_channels, out_channels = out_channels, out_channels // 2

    # Final 3-channel projection: kernel one smaller than the stages, stride 1.
    layers.append(
        nn.ConvTranspose2d(
            in_channels=in_channels,
            out_channels=3,
            kernel_size=kernel_size - 1,
            stride=stride // stride,
            padding=padding,
        )
    )

    model = nn.Sequential(*layers)

    output = model(torch.randn(1, 4000, 4, 4))
    print(output.size())

#### Generator

In [None]:
class Generator(nn.Module):
    """Encoder-decoder generator assembled from EncoderBlock / DecoderBlock.

    Layer order:

    1. two ``EncoderBlock`` stages without normalisation,
    2. three ``EncoderBlock`` stages with normalisation, each doubling the
       channel count,
    3. a 1x1 ``Conv2d`` projecting to a 4000-channel bottleneck,
    4. four ``DecoderBlock`` stages, the width halving after each one,
    5. a 3x3 ``ConvTranspose2d`` producing 3 output channels.

    Args:
        in_channels: Channels of the input image.
        out_channels: Width of the first encoder stage.
    """

    def __init__(self, in_channels=3, out_channels=64):
        super(Generator, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels

        self.kernel_size = 4
        self.stride = 2
        self.padding = 1

        self.layers = []

        # The local ``in_channels`` / ``out_channels`` track the running width
        # while the stack is built; the attributes above keep the originals.
        for _ in range(2):
            self.layers.append(
                EncoderBlock(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    kernel_size=self.kernel_size,
                    stride=self.stride,
                    padding=self.padding,
                    use_norm=False,
                )
            )

            in_channels = out_channels

        for _ in range(3):
            self.layers.append(
                EncoderBlock(
                    in_channels=in_channels,
                    out_channels=out_channels * 2,
                    kernel_size=self.kernel_size,
                    stride=self.stride,
                    padding=self.padding,
                    use_norm=True,
                )
            )

            in_channels = out_channels * 2
            out_channels = in_channels

        # Bottleneck: ``x // x`` evaluates to 1, i.e. a 1x1 conv with stride 1.
        self.layers.append(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=4000,
                kernel_size=self.kernel_size // self.kernel_size,
                stride=self.stride // self.stride,
                padding=0,
            )
        )

        in_channels = 4000

        # The decoder starts at the width the encoder finished with.
        for _ in range(4):
            self.layers.append(
                DecoderBlock(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    kernel_size=self.kernel_size,
                    stride=self.stride,
                    padding=self.padding,
                )
            )
            in_channels = out_channels
            out_channels = in_channels // 2

        # Output head: 3x3 transposed conv with stride 1.
        self.layers.append(
            nn.ConvTranspose2d(
                in_channels=in_channels,
                out_channels=3,
                kernel_size=self.kernel_size - 1,
                stride=self.stride // self.stride,
                padding=self.padding,
            )
        )

        self.model = nn.Sequential(*self.layers)

    def forward(self, x):
        """Run ``x`` through the full encoder-decoder stack.

        Raises:
            ValueError: If ``x`` is not a ``torch.Tensor``.
        """
        if isinstance(x, torch.Tensor):
            return self.model(x)
        else:
            raise ValueError("Input should be in the format of the tensor".capitalize())

    def total_params(self):
        """Return the total number of parameter elements in the module.

        Works both as ``netG.total_params()`` and as
        ``Generator.total_params(model)``; in the second form ``model`` may be
        any object exposing ``parameters()``.
        """
        return sum(param.numel() for param in self.parameters())


if __name__ == "__main__":
    # Smoke test: push one random 128x128 RGB image through the generator.
    netG = Generator()

    generated = netG(torch.randn(1, 3, 128, 128))
    print(generated.size())

#### Discriminator Block

In [None]:
class DiscriminatorBlock(nn.Module):
    """Discriminator stage: 3x3 Conv2d, optional InstanceNorm2d, LeakyReLU(0.2).

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution.
        momentum: Value forwarded to ``InstanceNorm2d(momentum=...)``.
        use_normalization: When true, an ``InstanceNorm2d`` layer is inserted
            between the convolution and the activation.
        stride: Convolution stride.
    """

    def __init__(
        self,
        in_channels=3,
        out_channels=64,
        momentum=0.8,
        use_normalization=True,
        stride=2,
    ):
        super(DiscriminatorBlock, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels

        self.kernel_size = 3
        self.stride = stride
        self.padding = 1
        self.slope = 0.2

        self.momentum = momentum

        self.is_normalization = use_normalization

        # ``block`` registers the stack as ``self.model``; that name is kept so
        # state_dict keys stay ``model.*``.
        self.block()

    @property
    def discriminator_block(self):
        """Alias for ``self.model``, matching the naming of the other blocks.

        Implemented as a property rather than a second registered submodule
        so the parameters are not listed twice in ``state_dict``.
        """
        return self.model

    def block(self):
        """Build the layer stack, register it as ``self.model`` and return it."""
        self.layers = []

        self.layers.append(
            nn.Conv2d(
                in_channels=self.in_channels,
                out_channels=self.out_channels,
                kernel_size=self.kernel_size,
                stride=self.stride,
                padding=self.padding,
            )
        )

        if self.is_normalization:
            # NOTE(review): InstanceNorm2d is created with its default
            # running-statistics setting; PyTorch documents momentum as only
            # affecting running statistics - confirm this is intended.
            self.layers.append(
                nn.InstanceNorm2d(
                    num_features=self.out_channels, momentum=self.momentum
                )
            )

        self.layers.append(nn.LeakyReLU(negative_slope=self.slope, inplace=True))

        self.model = nn.Sequential(*self.layers)

        return self.model

    def forward(self, x):
        """Run ``x`` through the block.

        Raises:
            ValueError: If ``x`` is not a ``torch.Tensor``.
        """
        if isinstance(x, torch.Tensor):
            return self.model(x)

        else:
            raise ValueError("Input should be in the format of the tensor".capitalize())


if __name__ == "__main__":
    # Smoke test: assemble the discriminator by hand from DiscriminatorBlock
    # stages and check the shape of its output map.

    layers = []

    in_channels = 3
    # First-stage width; 64 matches the value the Discriminator class uses.
    out_channels = 64
    kernel_size = 3
    stride = 2
    padding = 1

    for idx in range(4):
        layers.append(
            DiscriminatorBlock(
                in_channels=in_channels,
                out_channels=out_channels,
                # Only the last stage keeps the spatial size (stride 1).
                stride=1 if (idx + 1) == 4 else 2,
                # The first stage is not normalised.
                use_normalization=False if idx == 0 else True,
            )
        )
        in_channels = out_channels
        out_channels *= 2

    # Output head: ``x // x`` evaluates to 1, i.e. one output channel, stride 1.
    layers.append(
        nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=in_channels // in_channels,
                kernel_size=kernel_size,
                stride=stride // stride,
                padding=padding,
            )
        )
    )

    model = nn.Sequential(*layers)

    assert model(torch.randn(1, 3, 64, 64)).size() == (1, 1, 8, 8)

#### Discriminator

In [None]:
class Discriminator(nn.Module):
    """Convolutional discriminator built from four DiscriminatorBlock stages.

    The first stage is not normalised, the first three use stride 2 and the
    fourth stride 1. A final 3x3 ``Conv2d`` with stride 1 reduces the result
    to a single-channel map.

    Args:
        in_channels: Channels of the input image.
    """

    def __init__(self, in_channels=3):
        super(Discriminator, self).__init__()

        self.in_channels = in_channels

        self.out_channels = 64
        self.kernel_size = 3
        self.stride = 2
        self.padding = 1

        self.layers = []

        # NOTE: ``self.in_channels`` / ``self.out_channels`` are advanced while
        # the stack is built, so after construction they describe the last
        # stage rather than the constructor arguments.
        for idx in range(4):
            self.layers.append(
                DiscriminatorBlock(
                    in_channels=self.in_channels,
                    out_channels=self.out_channels,
                    stride=1 if (idx + 1) == 4 else 2,
                    use_normalization=False if idx == 0 else True,
                )
            )
            self.in_channels = self.out_channels
            self.out_channels *= 2

        # Output head: ``x // x`` evaluates to 1, i.e. one output channel, stride 1.
        self.layers.append(
            nn.Sequential(
                nn.Conv2d(
                    in_channels=self.in_channels,
                    out_channels=self.in_channels // self.in_channels,
                    kernel_size=self.kernel_size,
                    stride=self.stride // self.stride,
                    padding=self.padding,
                )
            )
        )

        self.model = nn.Sequential(*self.layers)

    def forward(self, x):
        """Run ``x`` through the discriminator.

        Raises:
            ValueError: If ``x`` is not a ``torch.Tensor``.
        """
        if isinstance(x, torch.Tensor):
            return self.model(x)

        else:
            raise ValueError("Input should be in the format of the tensor".capitalize())

    def total_params(self):
        """Return the total number of parameter elements in the module.

        Works both as ``netD.total_params()`` and as
        ``Discriminator.total_params(model)``; in the second form ``model``
        may be any object exposing ``parameters()``.
        """
        return sum(param.numel() for param in self.parameters())


if __name__ == "__main__":
    # Build a discriminator with default arguments and print its module tree.
    netD = Discriminator()

    print(netD)

#### Loss

In [None]:
import torch
import torch.nn as nn


class AdversarialLoss(nn.Module):
    """Mean-squared-error adversarial loss.

    Args:
        reduction: Reduction mode forwarded to ``nn.MSELoss``.
    """

    def __init__(self, reduction="mean"):
        super(AdversarialLoss, self).__init__()

        self.reduction = reduction

        # Built once here instead of on every forward pass.
        self.loss = nn.MSELoss(reduction=self.reduction)

    def forward(self, pred, actual):
        """Return the MSE between ``pred`` and ``actual``.

        Args:
            pred: Predicted values.
            actual: Target values.

        Raises:
            ValueError: If either argument is not a ``torch.Tensor``.
        """
        if (isinstance(pred, torch.Tensor)) and (isinstance(actual, torch.Tensor)):
            # Use the ``pred`` argument; the name ``predicted`` is not defined
            # in this scope.
            return self.loss(pred, actual)

        else:
            raise ValueError(
                "Pred and actual should be in the tensor format".capitalize()
            )


if __name__ == "__main__":

    # Sanity check: identical prediction and target tensors give a loss of 0.
    loss = AdversarialLoss()

    # NOTE(review): `predicted` and `actual` are bound at module level here, so
    # they are visible to any other code sharing this namespace.
    predicted = torch.tensor([1.0, 0.0, 1.0, 0.0])
    actual = torch.tensor([1.0, 0.0, 1.0, 0.0])

    assert loss(predicted, actual) == (0.0)

In [None]:
import torch
import torch.nn as nn


class PixelLoss(nn.Module):
    """Pixel-wise L1 (mean absolute error) loss.

    Args:
        reduction: Reduction mode forwarded to ``nn.L1Loss``.
    """

    def __init__(self, reduction="mean"):
        super(PixelLoss, self).__init__()

        self.reduction = reduction

    def forward(self, pred, actual):
        """Return the L1 loss between ``pred`` and ``actual``.

        Raises:
            ValueError: If either argument is not a ``torch.Tensor``.
        """
        both_tensors = isinstance(pred, torch.Tensor) and isinstance(
            actual, torch.Tensor
        )

        if not both_tensors:
            raise ValueError(
                "Pred and actual should be in the tensor format".capitalize()
            )

        # The criterion is rebuilt on every call from the current ``reduction``.
        self.loss = nn.L1Loss(reduction=self.reduction)

        return self.loss(pred, actual)


if __name__ == "__main__":
    # Sanity check: identical prediction and target tensors give a loss of 0.
    loss = PixelLoss()

    predicted = torch.tensor([1.0, 0.0, 1.0, 0.0])
    actual = torch.tensor([1.0, 0.0, 1.0, 0.0])

    assert loss(predicted, actual) == (0.0)

#### Helper

In [None]:
import traceback
import torch.optim as optim


def helpers(**kwargs):
    """Create the networks, optimizers and loss functions used for training.

    Keyword Args:
        adam: Truthy to use ``optim.Adam``.
        SGD: Truthy to use ``optim.SGD``. Takes precedence when both flags
            are set.
        beta1: First Adam beta coefficient.
        beta2: Second Adam beta coefficient.
        momentum: SGD momentum.
        lr: Learning rate for both optimizers.

    Returns:
        dict with the keys ``netG``, ``netD``, ``optimizerG``,
        ``optimizerD``, ``adversarial_loss`` and ``pixelwise_loss``.

    Raises:
        KeyError: If any of the keyword arguments above is missing.
        ValueError: If neither ``adam`` nor ``SGD`` is truthy.
        Exception: Any error raised while constructing the networks is
            reported and re-raised.
    """
    adam = kwargs["adam"]
    SGD = kwargs["SGD"]
    beta1 = kwargs["beta1"]
    beta2 = kwargs["beta2"]
    momentum = kwargs["momentum"]
    lr = kwargs["lr"]

    # Fail fast: without this check the optimizers would be unbound at the
    # return statement below.
    if not (adam or SGD):
        raise ValueError("Either adam or SGD must be enabled to build the optimizers")

    try:
        netG = Generator(in_channels=3, out_channels=64)
        netD = Discriminator(in_channels=3)
    except Exception as e:
        # Report the failure, then let it propagate instead of continuing with
        # a network that was never created.
        print("An error is occured {}".format(e))
        raise

    if SGD:
        optimizerG = optim.SGD(params=netG.parameters(), lr=lr, momentum=momentum)
        optimizerD = optim.SGD(params=netD.parameters(), lr=lr, momentum=momentum)
    else:
        optimizerG = optim.Adam(params=netG.parameters(), lr=lr, betas=(beta1, beta2))
        optimizerD = optim.Adam(params=netD.parameters(), lr=lr, betas=(beta1, beta2))

    adversarial_loss = AdversarialLoss(reduction="mean")
    pixelwise_loss = PixelLoss(reduction="mean")

    return {
        "netG": netG,
        "netD": netD,
        "optimizerG": optimizerG,
        "optimizerD": optimizerD,
        "adversarial_loss": adversarial_loss,
        "pixelwise_loss": pixelwise_loss,
    }


if __name__ == "__main__":
    # Smoke test: every entry returned by ``helpers`` has the expected class.
    init = helpers(
        adam=True, SGD=False, beta1=0.5, beta2=0.999, momentum=0.9, lr=0.0002
    )

    expected_classes = {
        "netG": "Generator",
        "netD": "Discriminator",
        "optimizerG": "Adam",
        "optimizerD": "Adam",
        "adversarial_loss": "AdversarialLoss",
        "pixelwise_loss": "PixelLoss",
    }

    for key, class_name in expected_classes.items():
        assert init[key].__class__.__name__ == class_name

#### Unittest

In [None]:
import sys
import torch
import torch.optim as optim
import torch.nn as nn
import unittest

class UnitTest(unittest.TestCase):
    """Shape, parameter-count and wiring checks for the GAN components."""

    def setUp(self):
        """Create fresh instances of every component before each test."""
        self.encoder = EncoderBlock()
        self.decoder = DecoderBlock()
        self.netG = Generator()
        self.netD = Discriminator()

        options = {
            "adam": True,
            "SGD": False,
            "beta1": 0.5,
            "beta2": 0.999,
            "momentum": 0.9,
            "lr": 0.0002,
        }
        self.init = helpers(**options)

    def test_encoder_block(self):
        """A hand-built encoder maps (1, 3, 128, 128) to (1, 4000, 4, 4)."""
        kernel_size, stride, padding = 4, 2, 1

        # (in_channels, out_channels, use_norm) for each encoder stage.
        stages = [
            (3, 64, False),
            (64, 64, False),
            (64, 128, True),
            (128, 256, True),
            (256, 512, True),
        ]

        layers = [
            EncoderBlock(
                in_channels=width_in,
                out_channels=width_out,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
                use_norm=normalise,
            )
            for width_in, width_out, normalise in stages
        ]

        # 1x1 bottleneck projection.
        layers.append(
            nn.Conv2d(
                in_channels=512, out_channels=4000, kernel_size=1, stride=1, padding=0
            )
        )

        model = nn.Sequential(*layers)
        output = model(torch.randn(1, 3, 128, 128))

        self.assertEqual(output.size(), torch.Size([1, 4000, 4, 4]))

    def test_decoder_block(self):
        """A hand-built decoder maps (1, 4000, 4, 4) to (1, 3, 64, 64)."""
        kernel_size, stride, padding = 4, 2, 1

        # (in_channels, out_channels) for each decoder stage.
        stages = [(4000, 512), (512, 256), (256, 128), (128, 64)]

        layers = [
            DecoderBlock(
                in_channels=width_in,
                out_channels=width_out,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
            )
            for width_in, width_out in stages
        ]

        # Output head: 3x3 transposed conv with stride 1.
        layers.append(
            nn.ConvTranspose2d(
                in_channels=64, out_channels=3, kernel_size=3, stride=1, padding=padding
            )
        )

        model = nn.Sequential(*layers)
        output = model(torch.randn(1, 4000, 4, 4))

        self.assertEqual(output.size(), torch.Size([1, 3, 64, 64]))

    def test_netG_size(self):
        """The generator turns a 128x128 image into a 64x64 one."""
        output = self.netG(torch.randn(1, 3, 128, 128))

        self.assertEqual(output.size(), torch.Size([1, 3, 64, 64]))

    def test_netD_size(self):
        """The discriminator turns a 64x64 image into an 8x8 map."""
        output = self.netD(torch.randn(1, 3, 64, 64))

        self.assertEqual(output.size(), torch.Size([1, 1, 8, 8]))

    def test_netG_total_params(self):
        """The generator has the expected number of parameters."""
        self.assertEqual(Generator.total_params(self.netG), 40401059)

    def test_netD_total_params(self):
        """The discriminator has the expected number of parameters."""
        self.assertEqual(Discriminator.total_params(self.netD), 1555585)

    def test_helpers(self):
        """``helpers`` returns objects of the expected types under each key."""
        expected_types = {
            "netG": Generator,
            "netD": Discriminator,
            "optimizerG": optim.Adam,
            "optimizerD": optim.Adam,
            "adversarial_loss": AdversarialLoss,
            "pixelwise_loss": PixelLoss,
        }

        for key, expected in expected_types.items():
            self.assertIsInstance(self.init[key], expected)


if __name__ == "__main__":
    # Run the test cases defined in this module.
    # NOTE(review): unittest.main() reads the process command-line arguments
    # and exits when finished - confirm this suits how the cell is executed.
    unittest.main()