## Convolutional VAE
We will work on the sketch dataset.

### Libraries

In [4]:
# Pytorch libraries
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torch.nn.functional as F

# Torchvision libraries
import torchvision.transforms as TF

# open cv
import cv2

# pandas library
import pandas as pd

# Matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

# os library
import os


### Dataset
We will create our own custom dataset.

In [5]:
class Sketches(Dataset):
    """Image dataset driven by a table of file names and an image directory.

    Column 0 of ``annotations_file`` holds the image file names. The same
    value is used both to locate the image on disk and as the item's label.

    Args:
        annotations_file: Table indexed with ``.iloc`` whose first column
            contains the image file names (one row per image).
        img_dir: Directory that the file names are relative to.
        mode: Stored on the instance as ``self.mode``; not otherwise used
            by this class.
        transform: Optional callable applied to the loaded image.
        target_transform: Optional callable applied to the label.
    """

    def __init__(self, annotations_file, img_dir, mode = 'train', transform = None, target_transform = None):
        self.img_labels = annotations_file        # Table of file names (also used as labels)
        self.img_dir = img_dir                    # Directory containing the images
        self.transform = transform
        self.target_transform = target_transform
        self.mode = mode

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Load the image at row ``idx`` and return ``(image, label)``.

        Raises:
            OSError: If OpenCV cannot read the file (``cv2.imread`` signals
                this by returning ``None`` instead of raising).
        """
        file_name = self.img_labels.iloc[idx, 0]
        img_path = os.path.join(self.img_dir, file_name)

        image = cv2.imread(img_path)
        if image is None:
            # Fail here with the offending path rather than later inside a
            # transform with an unrelated-looking error.
            raise OSError(f'Could not read image file: {img_path}')
        # OpenCV loads colour images as BGR; convert to RGB, the channel order
        # matplotlib expects when the images are displayed or saved.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # The file name doubles as the label
        label = file_name

        # Optional transforms, e.g. conversion to tensors
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

An instance of the dataset class is created further below, once the hyperparameters have been defined.

### Hyperparameters

In [6]:
# Location of the sketch images
directory = r'/kaggle/input/realsketches'
print(f'Dataset directory: {directory}')

# Images per mini-batch
batch_size = 16
print(f'Batch size: {batch_size}')

# Stand-in for an annotations file: a one-column table of the directory's entries
imgs = os.listdir(directory)
frame = pd.DataFrame(data = imgs)

# Preprocessing: to tensor, resize to 256x256, then a random resized crop of size 256
img_transform = TF.Compose([TF.ToTensor(), TF.Resize((256, 256)), TF.RandomResizedCrop(256)])
print(f'Transform : {img_transform}')

# Use the GPU when one is available, otherwise the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Device: {device}')

# Optimizer step size
lr = 3e-4
print(f'learning rate: {lr}')

# Optimizer weight decay
wd = 1e-7
print(f'Weight Decay: {wd}')

# Size of the bottleneck
bottleneck_size = 16
print(f'bottleneck width: {bottleneck_size}')

# Worker processes for data loading
n_cpu = 2
print(f'No. of workers: {n_cpu}')

# Passes over the dataset
epochs = 40
print(f'No. of epochs: {epochs}')

Dataset directory: /kaggle/input/realsketches
Batch size: 16
Transform : Compose(
    ToTensor()
    Resize(size=(256, 256), interpolation=bilinear, max_size=None, antialias=None)
    RandomResizedCrop(size=(256, 256), scale=(0.08, 1.0), ratio=(0.75, 1.3333), interpolation=bilinear), antialias=None)
)
Device: cpu
learning rate: 0.0003
Weight Decay: 1e-07
bottleneck width: 16
No. of workers: 2
No. of epochs: 40


In [7]:
# Dataset over every entry of `frame`, read from `directory` and passed through `img_transform`
data = Sketches(frame, directory, 'train', img_transform)

### DataLoader

In [8]:
# Shuffled mini-batch iterator over the dataset
loader = DataLoader(data, batch_size = batch_size, shuffle = True, num_workers = n_cpu)

# No. of batches:
size = len(loader)
print(f'No. of batches: {size}')

# No. of images (entries in the dataset directory)
count = len(os.listdir(directory))
print(f'No. of images: {count}')

No. of batches: 40
No. of images: 632


### Additional Hyperparameters

In [9]:
# Only the first batch is needed to read off the tensor layout; the loop stops right after it
for batch, (X, _) in enumerate(loader):
    # Image size: every dimension after the first two of the batch tensor
    img_size = X.shape[2:]
    print(f'image size: {img_size}')

    # Channels: second dimension of the batch tensor
    channels = X.size(1)
    print(f'No. of channels: {channels}')
    break

image size: torch.Size([256, 256])
No. of channels: 3


### Model
Now that we have our dataset and dataloader, we can move on to creating our model.

In [10]:
class CVAE(nn.Module):
    """Convolutional variational autoencoder with a feature-map latent.

    The encoder contains one stride-2 convolution and one 2x2 max-pool, so for
    inputs whose height and width are divisible by 4 it reduces each spatial
    dimension by a factor of 4. Two parallel convolutions then produce the
    ``mu`` and ``sigma`` maps, each with ``z_dim`` channels. The decoder
    upsamples by 2 twice and ends in a sigmoid, so reconstructions lie in
    [0, 1].

    Args:
        img_size: Accepted for interface compatibility; no layer uses it.
        channels: Number of channels of the input and of the reconstruction.
        h_dim: Channel width of the widest hidden convolutions; the narrower
            ones use ``h_dim // 2``.
        z_dim: Number of channels of the latent maps ``mu`` and ``sigma``.
    """

    def __init__(self, img_size = img_size, channels = channels, h_dim = 64, z_dim = bottleneck_size ):
        super(CVAE, self).__init__()

        half_dim = h_dim // 2

        # used in encoder
        self.ENconv_block = nn.Sequential(
            nn.Conv2d(
                in_channels = channels,
                out_channels = h_dim,
                kernel_size = 3,
                stride = 2,
                padding = 1
            ),
            nn.ReLU(),
            nn.BatchNorm2d(h_dim),
            nn.Conv2d(
                in_channels = h_dim,
                out_channels = half_dim,
                kernel_size = 3,
                stride = 1,
                padding = 1
            ),
            nn.MaxPool2d(2),
            nn.ReLU(),
            nn.BatchNorm2d(half_dim)
        )

        # Used in Decoder.
        # The latent width comes from the z_dim argument (not a module-level
        # constant) so that a non-default z_dim builds a consistent network.
        self.DEconv_block = nn.Sequential(
            nn.Conv2d(
                in_channels = z_dim,
                out_channels = half_dim,
                stride = 1,
                kernel_size = 3,
                padding = 1
            ),
            nn.ReLU(),
            nn.Upsample(scale_factor = 2),
            nn.Conv2d(
                in_channels = half_dim,
                out_channels = h_dim,
                stride = 1,
                kernel_size = 3,
                padding = 1
            ),
            nn.ReLU(),
            nn.Upsample(scale_factor = 2),
            nn.Conv2d(
                in_channels = h_dim,
                out_channels = channels,
                stride = 1,
                kernel_size = 3,
                padding = 1
            ),
            nn.Sigmoid()
        )

        # Mu and Sigma heads: same-size convolutions on the encoder features
        self.Mu_block = nn.Conv2d(
                in_channels = half_dim,
                out_channels = z_dim,
                stride = 1,
                kernel_size = 3,
                padding = 1
            )

        self.Sigma_block = nn.Conv2d(
                in_channels = half_dim,
                out_channels = z_dim,
                stride = 1,
                kernel_size = 3,
                padding = 1
            )

    def encoder(self, x):
        """Encode ``x`` and return the pair ``(mu, sigma)`` of latent maps."""
        features = self.ENconv_block(x)
        mu, sigma = self.Mu_block(features), self.Sigma_block(features)
        return mu, sigma

    def decoder(self, z):
        """Decode the latent map ``z`` into a reconstruction in [0, 1]."""
        x_reconstructed = self.DEconv_block(z)
        return x_reconstructed

    def forward(self, data):
        """Encode, sample a latent with the reparameterisation trick, decode.

        Returns:
            Tuple ``(x_reconstructed, mu, sigma)``.
        """
        # Encoding
        mu, sigma = self.encoder(data)

        # Reparameterization: z = mu + sigma * eps with eps ~ N(0, 1).
        # randn_like draws standard-normal noise; rand_like would draw from
        # uniform [0, 1), which has a non-zero mean and is not the Gaussian
        # the reparameterisation trick assumes.
        # sigma is the raw Sigma_block output (no activation), so it is not
        # constrained to be positive.
        eps = torch.randn_like(sigma)
        z_reparameterized = mu + ( sigma * eps )

        # Decoding
        x_reconstructed = self.decoder(z_reparameterized)

        return x_reconstructed, mu, sigma

# Build the network with its default arguments and place it on the selected device
model = CVAE().to(device)
print(model)

CVAE(
  (ENconv_block): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(64, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): ReLU()
    (6): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (DEconv_block): Sequential(
    (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Upsample(scale_factor=2.0, mode=nearest)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): Upsample(scale_factor=2.0, mode=nearest)
    (6): Conv2d(64, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): Sigmoid()
  )
  (Mu_block): Conv2d(32, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (Sigma_block): Conv2d(32, 

### Loss and optimizer functions

In [11]:
# Loss Function
loss_fn = nn.BCELoss()
print(loss_fn)

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr = lr, weight_decay = wd)
print(optimizer)

BCELoss()
Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: False
    lr: 0.0003
    maximize: False
    weight_decay: 1e-07
)


### Train Function

In [12]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    For every batch the model output is compared with the input using
    ``loss_fn`` on flattened tensors, and one optimizer step is taken.
    Progress is printed every fourth batch.

    Args:
        dataloader: Iterable of ``(X, label)`` batches exposing a sized
            ``.dataset``; the labels are ignored.
        model: Module returning ``(X_reconstructed, mu, sigma)``.
        loss_fn: Callable taking ``(reconstruction, target)`` and returning a
            scalar loss.
        optimizer: Optimizer holding the model's parameters.

    Note:
        Only the reconstruction loss is optimised; ``mu`` and ``sigma`` do not
        enter the loss (no KL-divergence term).
    """
    model.train()

    # Take the device from the model itself rather than from a notebook global,
    # so inputs always end up wherever the parameters live.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    total = len(dataloader.dataset)   # Denominator of the progress read-out
    seen = 0                          # Samples processed so far

    for batch, (X, _) in enumerate(dataloader):
        # Transforms
        X = X.to(device)

        # Forwardprop
        X_reconstructed, mu, sigma = model(X)

        # Flattening: use the real size of this batch. The last batch can be
        # smaller than the configured batch size, and reshaping with a fixed
        # size would then mix pixels of different samples (or fail outright).
        n_samples = X.size(0)
        flatten_X = X.reshape(n_samples, -1)
        flatten_X_reconstructed = X_reconstructed.reshape(n_samples, -1)

        # Calculating loss
        loss = loss_fn(flatten_X_reconstructed, flatten_X)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Steps
        seen += n_samples
        if batch % 4 == 0:
            loss, current = loss.item(), seen
            print(f"loss: {loss:>7f}  [{current:>5d}/{total:>5d}]")

### Prediction

In [13]:
def Prediction(model, dataloader):
    """Reconstruct the first batch of ``dataloader`` and save each image.

    The batch is passed through the model in evaluation mode with gradients
    disabled. Each reconstruction is written to the working directory as
    ``'Reconstructed {j} sketch.png'``, with ``j`` counting from 1.

    Args:
        model: Module returning ``(X_reconstructed, mu, sigma)`` for a batch
            of images laid out as (N, C, H, W).
        dataloader: Iterable of ``(X, label)`` batches; only the first batch
            is used and its labels are ignored.
    """
    model.eval()

    with torch.no_grad():
        X, _ = next(iter(dataloader))

        # Send the batch to wherever the model's parameters live
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')
        X = X.to(device)

        # The model is convolutional, so it receives the image batch as-is
        # (flattening it first would remove the spatial axes it needs).
        X_reconstructed, mu, sigma = model(X)

        # (N, C, H, W) -> (N, H, W, C) on the CPU as NumPy, the layout imsave expects
        images = X_reconstructed.permute(0, 2, 3, 1).cpu().numpy()

    # A trailing axis of length 1 is dropped so single-channel images are plain 2-D arrays
    if images.shape[-1] == 1:
        images = images[..., 0]

    # Save every image of the batch (numbered from 1) with an explicit extension
    for j, image in enumerate(images, start = 1):
        plt.imsave(f'Reconstructed {j} sketch.png', image)
            

### Model iteration

In [14]:
# Call train() once per epoch on the same loader, model, loss and optimizer,
# printing a 1-based epoch header before each pass.
for i in range(epochs):
    print(f'Epoch: {i+1} -----------------------')
    train(loader, model, loss_fn, optimizer)
print('Done!')
# After the last epoch, call Prediction with the model and the loader used for training
Prediction(model, loader)

Epoch: 1 -----------------------
loss: 0.666950  [   16/  632]


KeyboardInterrupt: 