In [8]:
import os
import zipfile
import joblib as pickle
import torch.nn as nn
import torch
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from collections import OrderedDict

In [2]:
# In utils.py
def create_pickle(data = None, filename = None):
    if data is not None and filename is not None:
        pickle.dump(value=data, filename=filename)
    else:
        raise ValueError("No data provided".capitalize())

In [28]:
class Loader:
    def __init__(self, image_path = None, batch_size = 64, image_size = 64):
        self.image_path = image_path
        self.batch_size = batch_size
        self.image_size = image_size
        self.to_extract = "../data/raw/"
        self.to_processed = "../data/processed/"

    def unzip_folder(self):
        with zipfile.ZipFile(self.image_path, 'r') as zip_ref:
            if os.path.exists(self.to_extract):
                zip_ref.extractall(path=self.to_extract)
            else:
                print("There is no path to extract the dataset".capitalize())
                os.makedirs(self.to_extract)
                zip_ref.extractall(path=self.to_extract)

    def _do_normalization(self, transform = False):
        if transform:
            return transforms.Compose(
                [
                    transforms.Resize((self.image_size)),
                    transforms.CenterCrop(self.image_size),
                    transforms.ToTensor(),
                    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
                ]
            )
        else:
            raise Exception("Error while doing the normalization".capitalize())

    def create_dataloader(self):
        if os.path.exists(self.to_extract):
            dataset = ImageFolder(root = self.to_extract, transform=self._do_normalization(transform=True))
            print(self.batch_size)
            dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

            try:
                if os.path.exists(self.to_processed):
                    create_pickle(
                        data=dataloader,
                        filename=os.path.join(self.to_processed, "dataloader.pkl"),
                    )
                else:
                    print("There is no data path named processed & creating the data path...".capitalize())
                    os.makedirs(self.to_processed)
                    create_pickle(
                        data=dataloader,
                        filename=os.path.join(self.to_processed, "dataloader.pkl"),
                    )

            except Exception as e:
                print("Error caught in the section # {}".format(e))
        else:
            raise Exception("There is no path to create the dataloader".capitalize())    


if __name__ == "__main__":
    loader = Loader(
        image_path="/Users/shahmuhammadraditrahman/Desktop/alzheimer_dataset.zip",
        batch_size=64,
        image_size=64
    )
    loader.unzip_folder()
    loader.create_dataloader()

64


In [29]:
# unittest
data = pickle.load("../data/processed/dataloader.pkl")
images, label = next(iter(data))

def total_data(data = None):
    return sum(image.shape[0] for image, _ in data) if data is not None else "Data is empty".capitalize()

if __name__ == "__main__":
    print("Quantity of the dataset # {} ".format(total_data(data = data)))
    print(images.shape)

Quantity of the dataset # 11519 
torch.Size([64, 3, 64, 64])


In [24]:
for images, labels in data:
    # Here, `images` and `labels` are batches of the specified size
    print(
        images.shape
    )  # This should show a torch.Size([batch_size, channels, image_size, image_size])

torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 64, 64])
torch.Size([3, 6

In [5]:
# utils.py

def total_params(model = None):
    if model is not None:
        return sum(params.numel() for params in model.parameters())
    else:
        raise Exception("Model is empty".capitalize())
    

#### Create Generator

In [21]:
class Generator(nn.Module):
    def __init__(self, latent_space=50, image_size=64, in_channels=1):
        self.latent_space = latent_space
        self.image_size = image_size
        self.in_channels = in_channels
        self.layers_config = [
            (self.latent_space, image_size * 8, 4, 1, 0, True),
            (self.image_size * 8, image_size * 4, 4, 2, 1, True),
            (self.image_size * 4, image_size * 2, 4, 2, 1, True),
            (self.image_size * 2, image_size, 4, 2, 1, True),
            (self.image_size, self.in_channels, 4, 2, 1),
        ]
        super(Generator, self).__init__()

        self.model = self.connected_layer(layers_config=self.layers_config)

    def connected_layer(self, layers_config=None):
        layers = OrderedDict()
        if layers_config is not None:
            for index, (
                in_channels,
                out_channels,
                kernel_size,
                stride,
                padding,
                batch_norm,
            ) in enumerate(self.layers_config[:-1]):
                layers[f"{index + 1}_convTrans"] = nn.ConvTranspose2d(
                    in_channels, out_channels, kernel_size, stride, padding
                )
                if batch_norm:
                    layers[f"{index + 1}_batchNorm"] = nn.BatchNorm2d(out_channels)

                layers[f"{index + 1}_relu"] = nn.ReLU()

            (in_channels, out_channels, kernel_size, stride, padding) = (
                self.layers_config[-1]
            )
            layers["out_convTrans"] = nn.ConvTranspose2d(
                in_channels, out_channels, kernel_size, stride, padding
            )
            layers["out_tanh"] = nn.Tanh()

            return nn.Sequential(layers)
        else:
            raise ValueError("Layers config is empty".capitalize())

    def forward(self, x):
        return self.model(x) if x is not None else "ERROR"

#### Create the Discriminator

In [17]:
class Discriminator(nn.Module):
    def __init__(self, image_size = 64, input_channels = 3):
        self.image_size = image_size
        self.input_channels = input_channels
        self.layers_config = [
            (self.input_channels, self.image_size, 4, 2, 1, 0.2, False),
            (self.image_size, self.image_size * 2, 4, 2, 1, 0.2, True),
            (self.image_size * 2, self.image_size * 4, 4, 2, 1, 0.2, True),
            (self.image_size * 4, self.image_size * 8, 4, 2, 1, 0.2, True),
            (self.image_size * 8, 1, 4, 1, 0)
        ]
        super(Discriminator, self).__init__()

        self.model = self.connected_layer(layers_config=self.layers_config)

    def connected_layer(self, layers_config = None):
        layers = OrderedDict()
        if layers_config is not None:
            for index, (in_channels, out_channels, kernel_size, stride, padding, slope, batch_norm) in enumerate(layers_config[:-1]):
                layers[f"{index + 1}_conv"] = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding)
                if batch_norm:
                    layers[f"{index + 1}_batch_norm"] = nn.BatchNorm2d(out_channels)
                    
                layers[f"{index + 1}_leaky_relu"] = nn.LeakyReLU(slope)
                
            
            (in_channels, out_channels, kernel_size, stride, padding) = layers_config[-1]
            layers[f"out_conv"]= nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding)
            
            return nn.Sequential(layers)
        else:
            raise ValueError("layers_config is empty".capitalize())

    def forward(self, x):
        return self.model(x) if x is not None else "ERROR"

In [7]:
# unittest - discriminator
net_D = Discriminator(
        image_size = 64,
        input_channels = 3,
    )

noise_data = torch.randn(64, 3, 64, 64)

print("Shape # {}".format(net_D(noise_data).shape))
print("Total params # {}".format(total_params(model = net_D)))

Shape # torch.Size([64, 1, 1, 1])
Total params # 2766529


In [20]:
# unittest - Generator
net_G = Generator(
    latent_space=50,
    image_size=64,
    in_channels=3,
)

noise_data = torch.randn(64, 50, 1, 1)

print("Shape # {}".format(net_G(noise_data).shape))
print("Total params # {}".format(total_params(model=net_G)))

Shape # torch.Size([64, 3, 64, 64])
Total params # 3168067
