# Exercise 7
## A
This was a single $[2\times 1]$ vector per image in the batch. 
Change it such that your Encoder now has two $[2 \times 1]$ vector per image and refer to these vectors as the
mean and standard deviation.

[Reference_1](https://towardsdatascience.com/understanding-variational-autoencoders-vaes-f70510919f73)

[Reference_2](https://chih-sheng-huang821.medium.com/pytorch%E6%89%8B%E6%8A%8A%E6%89%8B%E5%AF%A6%E4%BD%9C-autoencoder-f5a048fcab5b)

[Reference_3](https://avandekleut.github.io/vae/)


In [1]:
import torch
import MNIST_dataloader
from tqdm import tqdm
import matplotlib.pyplot as plt
import os
from torch import nn, optim
from typing import Dict, Optional, Tuple, List
import sys

# %% set torches random seed
torch.random.manual_seed(0)

# File manager
PATH = os.getcwd()
data_loc = os.path.join(PATH,'data')
# make fodler for exercise 7 and 8
data_loc_ex78 = os.path.join(data_loc,'ex78')
# os.mkdir(data_loc_ex78) 
# print(data_loc_ex78)

data_loc_MNSIST = os.path.join(data_loc,os.path.abspath("."),"MNIST")
# print(data_loc_MNSIST)

batch_size = 64
# device fridenly not at all
if  torch.cuda.is_available():
        device = torch.device('cuda')
else:
    device = torch.device('cpu')

train_loader, test_loader = MNIST_dataloader.create_dataloaders(data_loc_MNSIST, batch_size)
x_clean_train = train_loader.dataset.Clean_Images
x_noisy_train = train_loader.dataset.Noisy_Images
labels_train  = train_loader.dataset.Labels

x_clean_test  = test_loader.dataset.Clean_Images
x_noisy_test  = test_loader.dataset.Noisy_Images
labels_test   = test_loader.dataset.Labels
x_clean_example = x_clean_test[0:10,:,:,:]
x_noisy_example = x_noisy_test[0:10,:,:,:]
labels_example = labels_test[0:10]

  from .autonotebook import tqdm as notebook_tqdm


## Create model Auto Encoder
### Variance, mean, standard deviation
From slide,
 - Mean and deviation can be derived from the encoder.
   - $\mu$,$\sigma = \:$ encoder $_{\phi}(x)$. Whereas the x is the minibatch of the input.
 - Variance $\sigma^2$ : logorithm of the variance is $ log( \sigma^2) = 2 log\: \sigma$
 - Deviation $\sigma$: Taking the exponential and half of log-variance. E.g., $exp(\frac{1}{2}log\:\sigma^2)$ 
 - Sample $\epsilon \sim \mathcal{N}(0,I)$
 - Laten variable $z = \mu +\sigma\cdot\epsilon$
 - KL Divergence $\:KL=\sum (\mu^2 -2 log(\sigma)+\sigma^2-1)$


Within the context of the VAE, the latent space C can be interpreted as the space of codes (or vectors) that represents the observed elements from X

Instead of mapping input x to a latent vector $z = e(x)$, we map it to mean vector and standard deviation vector.

## b
the distribution that projects the data onto the latent space)
### custom loss function.

In [130]:
import torch
import torch.nn as nn
X =torch.ones(64,1,32,32)
conv = nn.Sequential(
    nn.Conv2d(1, out_channels = 16, kernel_size=3,padding=1), # Padding = 1 ensures that the final output is the same size as input 
    nn.BatchNorm2d(16),
    nn.ReLU(inplace=True),
    nn.MaxPool2d(2),
    nn.Conv2d(16, out_channels = 16, kernel_size=3,padding=1), # Padding = 1 ensures that the final output is the same size as input 
    nn.BatchNorm2d(16),
    nn.ReLU(inplace=True),
    nn.BatchNorm2d(16),
    nn.MaxPool2d(2),
    nn.Conv2d(16, out_channels = 16, kernel_size=3,padding=1), # Padding = 1 ensures that the final output is the same size as input     
    nn.BatchNorm2d(16),
    nn.ReLU(inplace=True),
    nn.MaxPool2d(2),                      # Downsampling to
    nn.Conv2d(16, out_channels = 16, kernel_size=3,padding=1), # Padding = 1 ensures that the final output is the same size as input 
    nn.BatchNorm2d(16),
    nn.ReLU(inplace=True),
    nn.MaxPool2d(2),                      # Downsampling to
    nn.Flatten() 
)
Emu = nn.Linear(2*2*16,16)
y = conv(X)
print(y.shape)

Mu = Emu(y)
print(Mu.shape)


torch.Size([64, 64])
torch.Size([64, 16])


In [141]:
# %% imports
import torch
import torch.nn as nn
# Build an Encoder for classfication
# %%  Encoder
class Encoder(nn.Module):
    def __init__(self,in_channel=1):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.ConvTranspose2d(in_channels=in_channel, out_channels = 16, kernel_size=3,padding=1), # Padding = 1 ensures that the final output is the same size as input 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.ConvTranspose2d(16, out_channels = 16, kernel_size=3,padding=1), # Padding = 1 ensures that the final output is the same size as input 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.ConvTranspose2d(16, out_channels = 16, kernel_size=3,padding=1), # Padding = 1 ensures that the final output is the same size as input 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),                      # Downsampling to
            nn.ConvTranspose2d(16, out_channels = 16, kernel_size=3,padding=1), # Padding = 1 ensures that the final output is the same size as input 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),                      # Downsampling to
            nn.Flatten() 
        )
        self.Emu = nn.Linear(2*2*16,2)
        self.E_logVar= nn.Linear(2*2*16,2)
        self.N = torch.distributions.Normal(0,1) # normal dis sample
        self.KL = 0 # not KL divergence yet
        
    def forward(self, x):
        # use the created layers here
        x =  self.encoder(x)
        mu = self.Emu(x)
        LogVar = self.E_logVar(x)
        Std = torch.exp(LogVar/2)
        LaVar = mu+Std*self.N.sample(mu.shape)
        # calculate KL
        self.KL = (Std**2 + mu**2 - torch.log(Std)-0.5).sum()
        return mu, Std, LaVar
# %%  Decoder
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.DE = nn.Sequential(
            nn.Linear(2,16),
            nn.ReLU(inplace=True)
        )
        # create layers here
        self.Decoder = nn.Sequential(
            nn.ConvTranspose2d(1 , out_channels = 16, kernel_size=3, padding=1), # Padding = 1 ensures that the final output is the same size as input 
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 2x2x16
            nn.ConvTranspose2d(in_channels = 16 , out_channels = 16, kernel_size=3, padding=1), 
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 4x4x16
            nn.ConvTranspose2d(in_channels = 16 , out_channels = 16, kernel_size=3, padding=1), 
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 8x8x16
            nn.ConvTranspose2d(in_channels = 16 , out_channels = 16, kernel_size=3, padding=1), 
            # nn.ReLU(inplace=True),
            # nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 16x16x16
            # nn.ConvTranspose2d(in_channels = 16 , out_channels = 1, kernel_size=3, padding=1), 
            #nn.ReLU(inplace=True),
            # nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 32x32x1
        )
        # self.sigmoid = nn.Sigmoid()
        
    def forward(self, h):
        DE = self.DE(h)
        DE = DE.reshape(-1,1,4,4)
        r = self.Decoder(DE)
        # return self.Decoder(h) 
        return DE,r

# %%  Autoencoder
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
        
    def forward(self, x):
        mu,Std,LaVar = self.encoder(x)
        DE,r = self.decoder(LaVar)
        return mu,Std,DE,r
    


In [34]:
# %% imports
import torch
import torch.nn as nn
# Build an Encoder for classfication
# %%  Encoder
class Encoder(nn.Module):
    def __init__(self,in_channels):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, out_channels = 16, kernel_size=3, padding=1), # Padding = 1 ensures that the final output is the same size as input 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),                      # Downsampling to 16x16x16

            nn.Conv2d(in_channels = 16, out_channels = 16, kernel_size=3, padding=1), 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),                      # Downsampling to 8x8x16

            nn.Conv2d(in_channels = 16, out_channels = 16, kernel_size=3, padding=1), 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),                      # Downsampling to 4x4x16

            nn.Conv2d(in_channels = 16, out_channels = 16, kernel_size=3, padding=1), 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),                      # Downsampling to 2x2x16
            nn.MaxPool2d(2),
            # nn.MaxPool2d(2),
            nn.Flatten(),                  # Flatten 2x2x16 to 1-dim
        )
        self.Emu = nn.Linear(2*2*16,2)
        self.E_logVar= nn.Linear(2*2*16,2)
        self.N = torch.distributions.Normal(0,1) # normal dis sample
        self.KL = 0 # not KL divergence yet
        
    def forward(self, x):
        # use the created layers here
        X =  self.encoder(x)
        mu = self.Emu(x)
        LogVar = self.E_logVar(x)
        Std = torch.exp(LogVar/2)
        LaVar = mu+Std*self.N.sample(mu.shape)
        # calculate KL
        self.KL = (Std**2 + mu**2 - torch.log(Std)-0.5).sum()
        return mu, Std, LaVar
# %%  Decoder
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.DE = nn.Sequential(
            nn.Linear(2,16),
            nn.ReLU(inplace=True)
        )
        # create layers here
        self.Decoder = nn.Sequential(
            nn.ConvTranspose2d(10 , out_channels = 16, kernel_size=3, padding=1), # Padding = 1 ensures that the final output is the same size as input 
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=(2,1), mode='bilinear'),                     # Upsampling to 2x2x16
            nn.ConvTranspose2d(in_channels = 16 , out_channels = 16, kernel_size=3, padding=1), 
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 4x4x16
            nn.ConvTranspose2d(in_channels = 16 , out_channels = 16, kernel_size=3, padding=1), 
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 8x8x16
            nn.ConvTranspose2d(in_channels = 16 , out_channels = 16, kernel_size=3, padding=1), 
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 16x16x16
            nn.ConvTranspose2d(in_channels = 16 , out_channels = 1, kernel_size=3, padding=1), 
            #nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear'),                     # Upsampling to 32x32x1
        )
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, h):
        DE = self.DE(h)
        DE = DE.reshape(-1,1,4,4)
        r = self.sigmoid(self.Decoder(DE))
        # return self.Decoder(h) 
        return DE,r

# %%  Autoencoder
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
        
    def forward(self, x):
        mu,Std,LaVar = self.encoder(x)
        DE,r = self.decoder(LaVar)
        return mu,Std,DE,r
    


In [142]:

# Sanity check
sanity_check = torch.ones((64,1,32,32))
encoder_check = Encoder()
mu,sigma,z = encoder_check.forward(sanity_check)
print(f"Encoder check:{mu.shape},{sigma.shape},{z.shape}")
decoder_check = Decoder()
fc1,r = decoder_check.forward(z)
print(f"Decoder check:{fc1.shape},{r.shape}")
VAE_check = VAE()
mu, log_std,fc1,r = VAE_check.forward(sanity_check)
print(f"VAE check:{r.shape}")
latent_check = torch.ones((64,2))
fc1,output_check = VAE_check.decoder(latent_check)
print(f"VAE.decoder check:{output_check.shape}")

Encoder check:torch.Size([64, 2]),torch.Size([64, 2]),torch.Size([64, 2])
Decoder check:torch.Size([64, 1, 4, 4]),torch.Size([64, 16, 32, 32])
VAE check:torch.Size([64, 16, 32, 32])
VAE.decoder check:torch.Size([64, 16, 32, 32])


In [124]:
model_VAE = VAE().to(device)
learning_rate = 3e-4
no_epochs = 2
optimizer_VAE = optim.Adam(model_VAE.parameters(),lr = learning_rate)
loss_iter = []
loss_epoch = []
loss_iter_test = []
loss_epoch_test = []
torch.random.manual_seed(0)

<torch._C.Generator at 0x1038f3ad0>

In [125]:
import torch.nn.functional as F
for epoch in range(no_epochs):
    print(f"Training on {device}. Epoch {epoch+1}:")
    # go over all minibatches
    # sys.stdout.flush()
    total_loss=0
    model_VAE.train()
    for batch_idx,(x_clean, x_noisy, label) in enumerate(train_loader):
        # fill in how to train your network using only the clean images
        optimizer_VAE.zero_grad()
        inputs = x_clean.to(device=device)
        mu,sigma,fc1,outputs= model_VAE(inputs)
        # print(f"Output size",outputs.shape)
        # print(f"Inputsize",inputs.shape)
        # recst_loss = F.mse_loss(outputs,inputs)
        # kl_div = -0.5*torch.sum(log_var+1-mu.pow(2)-log_var.exp()) / batch_size
      # kl_div = -0.5*torch.sum(log_var+1-mu.pow(2)-log_var.exp()) / batch_size
        loss = F.mse_loss(outputs,inputs,reduction='sum') + 10*model_VAE.encoder.KL
        # loss = torch.sum((outputs-inputs)**2)+kl_div
        loss.backward()
        optimizer_VAE.step()
        loss_iter.append(loss.data)
        total_loss += loss.data
    loss_epoch.append(total_loss/len(train_loader))
    print(f'Epoch[{epoch+1}/{no_epochs}],Loss(Train):{total_loss/len(train_loader)}')
    # sys.stdout.flush()
    total_loss = 0
    model_VAE.eval()
    for batch_idx,(x_clean, x_noisy, label) in enumerate(test_loader):
        inputs = x_clean.to(device=device)
        mu,sigma,fc1,outputs= model_VAE(inputs)
        # recst_loss = F.mse_loss(outputs,inputs)
        # kl_div = -0.5*torch.sum(log_var+1-mu.pow(2)-log_var.exp()) / batch_size
        # loss_test = recst_loss + kl_div
        loss_test = ((outputs-inputs)**2).sum() + model_VAE.encoder.KL
        loss_iter_test.append(loss_test.data)
        total_loss += loss_test.data
    loss_epoch_test.append(total_loss/len(test_loader))
    print(f'Epoch[{epoch+1}/{no_epochs}],Loss(Test):{total_loss/len(test_loader)}')


# torch.save(model_VAE,os.path.join(model_loc,'exercise_VAE.ptg'))

Training on cpu. Epoch 1:


  loss = F.mse_loss(outputs,inputs,reduction='sum') + 10*model_VAE.encoder.KL
  loss = F.mse_loss(outputs,inputs,reduction='sum') + 10*model_VAE.encoder.KL


Epoch[1/2],Loss(Train):288590.0625
Epoch[1/2],Loss(Test):213890.796875
Training on cpu. Epoch 2:


KeyboardInterrupt: 

In [25]:
model = VAE().to(device)
vae = train(model, data)

VAE(
  (encoder): Encoder(
    (encoder): Sequential(
      (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU(inplace=True)
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (3): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (4): ReLU(inplace=True)
      (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (6): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (7): ReLU(inplace=True)
      (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (9): Flatten(start_dim=1, end_dim=-1)
    )
    (Emu): Linear(in_features=64, out_features=2, bias=True)
    (E_logVar): Linear(in_features=64, out_features=2, bias=True)
  )
  (decoder): Decoder(
    (DE): Sequential(
      (0): Linear(in_features=2, out_features=16, bias=True)
      (1): ReLU(inplace=True)
    )
    (Decoder): Sequential(
      (0): ConvTran