In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import tarfile
from torchvision.datasets.utils import download_url
from torch.utils.data import random_split
import os
from torchvision.datasets import ImageFolder
import torch.nn as nn
import torch.nn.functional as F
from itertools import chain
from tqdm.autonotebook import tqdm
import numpy as np
from torch import optim

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Resize(32)])

batch_size = 128

In [3]:
dataset_url = "https://s3.amazonaws.com/fast-ai-imageclas/cifar10.tgz"
download_url(dataset_url, '.')

Using downloaded and verified file: .\cifar10.tgz


In [4]:
# Extract from archive
with tarfile.open('./cifar10.tgz', 'r:gz') as tar:
    tar.extractall(path='./data')

In [5]:
data_dir = './data/cifar10'

print(os.listdir(data_dir))
classes = os.listdir(data_dir + "/train")
print(classes)

['test', 'train']
['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']


In [7]:
trainset = ImageFolder(data_dir+'/train', transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

testset = ImageFolder(data_dir+'/test', transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

In [8]:
# final
class Encoder(nn.Module):
    '''
    simple encoder with a single hidden dense layer (ReLU activation)
    and linear projections to the diag-Gauss parameters
    '''
    def __init__(self, latent_size=256, nc=3):
        super(Encoder, self).__init__()
        self.latent_size = latent_size
        self.nc = nc

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(16, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(16, out_channels=32, kernel_size=3, stride=2, padding=1)
        self.conv4 = nn.Conv2d(32, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(32, out_channels=64, kernel_size=3, stride=2, padding=1)
        self.conv6 = nn.Conv2d(64, out_channels=64, kernel_size=3, stride=1, padding=1)

        self.enc_linear = nn.Sequential(
                nn.Linear(1024, 256),
                nn.LeakyReLU(),
                nn.Linear(256, 512),
                nn.LeakyReLU())
        
        self.fc_mu = nn.Linear(512, latent_size)
        self.fc_var = nn.Linear(512, latent_size)

    def forward(self, x):
        # block 1
        x = self.conv1(x)
        x = F.leaky_relu(x)
        x = self.conv2(x)
        x = F.leaky_relu(x)

        #block 2
        x = self.conv3(x)
        x = F.leaky_relu(x)
        x = self.conv4(x)
        x = F.leaky_relu(x)

        # block 3
        x = self.conv5(x)
        x = F.leaky_relu(x)
        x = self.conv6(x)
        x = F.leaky_relu(x)
        x = x.view(x.shape[0], -1)
        x = self.enc_linear(x)

        # Split the result into mu and var components
        # of the latent Gaussian distribution
        mu = self.fc_mu(x)
        log_var = self.fc_var(x)

        return mu, log_var


In [9]:
# final
class Decoder(nn.Module):
    '''
    simple decoder: single dense hidden layer (ReLU activation) followed by 
    output layer with a sigmoid to squish values
    '''
    def __init__(self, latent_size=256, nc=3):
        super(Decoder, self).__init__()
        self.latent_size = latent_size
        self.nc = nc
        self.decoder_input = nn.Sequential(
            nn.Linear(latent_size, 256),
            nn.LeakyReLU(),
            nn.Linear(256, 512),
            nn.LeakyReLU(),
        )

        self.activation = nn.Sigmoid()

        self.conv1U = nn.ConvTranspose2d(32, out_channels=64, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.conv2U = nn.ConvTranspose2d(64, out_channels=32, kernel_size=3, stride=1, padding=1, output_padding=0)
        self.conv3U = nn.ConvTranspose2d(32, out_channels=32, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.conv4U = nn.ConvTranspose2d(32, out_channels=16, kernel_size=3, stride=1, padding=1, output_padding=0)
        self.conv5U = nn.ConvTranspose2d(16, out_channels=16, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.conv6U = nn.ConvTranspose2d(16, out_channels=3, kernel_size=3, stride=1, padding=1, output_padding=0)
    
    def forward(self, x):
        x = self.decoder_input(z)
        x = x.view(-1, 32, 4, 4)

        # block 1
        x = self.conv1U(x)
        x = F.leaky_relu(x)
        x = self.conv2U(x)
        x = F.leaky_relu(x)

        # block 2
        x = self.conv3U(x)
        x = F.leaky_relu(x)
        x = self.conv4U(x)
        x = F.leaky_relu(x)

        # block 3
        x = self.conv5U(x)
        x = F.leaky_relu(x)
        x = self.conv6U(x)

        return self.activation(x)



In [10]:
def sample(mu, logvar):
    std = torch.exp(0.5*logvar)  # e^(1/2 * log(std^2))
    eps = torch.randn_like(std)  # random ~ N(0, 1)
    return eps.mul(std).add_(mu)

In [None]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"

In [11]:
list_fw1x_enc = ['C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10beta1.weights',
                 'C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10beta16.weights',
                 'C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10beta64.weights',
                 'C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10beta256.weights'
                ]

list_fw1x_dec = ['C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10beta1.weights',
                 'C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10beta16.weights',
                 'C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10beta64.weights',
                 'C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10beta256.weights'
                ]

In [10]:
# build the model and load state
enc = Encoder()
enc.load_state_dict(torch.load('C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x1-width/encCIFAR10beta1_x1.weights', map_location=torch.device('cpu')))

dec = Decoder()
dec.load_state_dict(torch.load('C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x1-width/decCIFAR10beta1_x1.weights', map_location=torch.device('cpu')))

# put model in eval mode
enc = enc.eval() 
dec = dec.eval() 

In [12]:
for i, j in zip(list_fw1x_enc, list_fw1x_dec):
    print(i, j)

C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10beta1.weights C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10beta1.weights
C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10beta16.weights C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10beta16.weights
C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10beta64.weights C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10beta64.weights
C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10beta256.weights C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10beta256.weights


In [13]:
beta = 1
loss_for_beta = []
for i, j in zip(list_fw1x_enc, list_fw1x_dec):
    enc = Encoder()
    enc.load_state_dict(torch.load(i, map_location=torch.device('cpu')))

    dec = Decoder()
    dec.load_state_dict(torch.load(j, map_location=torch.device('cpu')))

    # put model in eval mode
    enc = enc.eval() 
    dec = dec.eval() 
    losses = []
    for i, data in enumerate(testloader, 0):
        inputs, _ = data
    #     inputs, _ = inputs.to(device), _.to(device)

        mu, log_sigma2 = enc(inputs)
        z = sample(mu, log_sigma2)
        outputs = dec(z)

        recon = F.binary_cross_entropy(outputs, inputs, reduction='sum') / inputs.shape[0]

        kl_diverge = 0.5 * torch.mean(torch.pow(mu, 2) + torch.pow(log_sigma2, 2) - torch.log(torch.pow(log_sigma2, 2)) - 1)

        loss = recon + beta*kl_diverge 

        # keep track of the loss and update the stats
        losses.append(loss.item())
    loss_for_beta.append(np.mean(losses))

In [14]:
print(loss_for_beta)

[1724.9300258974486, 1730.2648106828522, 1742.8041945831685, 1770.117901379549]


In [16]:
class FiLMBlock(nn.Module):
    def __init__(self, out_channels = None):
        super(FiLMBlock, self).__init__()
        self.out_channels = out_channels
        self.mu = nn.Linear(256, out_channels)
        self.sigma = nn.Linear(256, out_channels)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        mu = self.mu(x)
        sigma = self.sigma(x)
        return self.activation(mu), self.activation(sigma)

In [17]:
class FiLMImplement(nn.Module):
    def __init__(self, window):
        super(FiLMImplement, self).__init__()
        self.window = window

    def broadcast_2d(self, x):
        return torch.broadcast_to(x, (self.window, self.window))

    def forward(self, x, mu, sigma):
        mu_broadcast = torch.stack(tuple(map(self.broadcast_2d, mu.squeeze(0))), dim=0)
        sigma_broadcast = torch.stack(tuple(map(self.broadcast_2d, sigma.squeeze(0))), dim=0)
        x = sigma_broadcast * x + mu_broadcast
        return x

In [18]:
# final
class Encoder1(nn.Module):
    '''
    simple encoder with a single hidden dense layer (ReLU activation)
    and linear projections to the diag-Gauss parameters
    '''
    def __init__(self, latent_size=256, nc=3):
        super(Encoder1, self).__init__()
        self.latent_size = latent_size
        self.nc = nc

        self.film1 = FiLMBlock(16)
        self.film2 = FiLMBlock(32)
        self.film3 = FiLMBlock(64)
        self.filmimp1 = FiLMImplement(16)
        self.filmimp2 = FiLMImplement(8)
        self.filmimp3 = FiLMImplement(4)

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(16, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(16, out_channels=32, kernel_size=3, stride=2, padding=1)
        self.conv4 = nn.Conv2d(32, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(32, out_channels=64, kernel_size=3, stride=2, padding=1)
        self.conv6 = nn.Conv2d(64, out_channels=64, kernel_size=3, stride=1, padding=1)

        self.enc_linear = nn.Sequential(
                nn.Linear(1024, 256),
                nn.LeakyReLU(),
                nn.Linear(256, 512),
                nn.LeakyReLU())
        
        self.fc_mu = nn.Linear(512, latent_size)
        self.fc_var = nn.Linear(512, latent_size)

    def forward(self, x, helo=None):
        # block 1
        x = self.conv1(x)
        x = F.leaky_relu(x)
        x = self.conv2(x)
        mu, sigma = self.film1( helo )
        x = self.filmimp1(x, mu, sigma)
        x = F.leaky_relu(x)

        #block 2
        x = self.conv3(x)
        x = F.leaky_relu(x)
        x = self.conv4(x)
        mu2, sigma2 = self.film2( helo )
        x = self.filmimp2(x, mu2, sigma2)
        x = F.leaky_relu(x)

        # block 3
        x = self.conv5(x)
        x = F.leaky_relu(x)
        x = self.conv6(x)
        mu3, sigma3 = self.film3( helo )
        x = self.filmimp3(x, mu3, sigma3)
        x = F.leaky_relu(x)
        x = x.view(x.shape[0], -1)
        x = self.enc_linear(x)

        # Split the result into mu and var components
        # of the latent Gaussian distribution
        mu = self.fc_mu(x)
        log_var = self.fc_var(x)

        return mu, log_var


In [19]:
# final
class Decoder1(nn.Module):
    '''
    simple decoder: single dense hidden layer (ReLU activation) followed by 
    output layer with a sigmoid to squish values
    '''
    def __init__(self, latent_size=256, nc=3):
        super(Decoder1, self).__init__()
        self.latent_size = latent_size
        self.nc = nc
        self.decoder_input = nn.Sequential(
            nn.Linear(latent_size, 256),
            nn.LeakyReLU(),
            nn.Linear(256, 512),
            nn.LeakyReLU(),
        )

        self.activation = nn.Sigmoid()

        self.film1U = FiLMBlock(32)
        self.film2U = FiLMBlock(16)
        self.film3U = FiLMBlock(3)
        self.filmimp1U = FiLMImplement(8)
        self.filmimp2U = FiLMImplement(16)
        self.filmimp3U = FiLMImplement(32)

        self.conv1U = nn.ConvTranspose2d(32, out_channels=64, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.conv2U = nn.ConvTranspose2d(64, out_channels=32, kernel_size=3, stride=1, padding=1, output_padding=0)
        self.conv3U = nn.ConvTranspose2d(32, out_channels=32, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.conv4U = nn.ConvTranspose2d(32, out_channels=16, kernel_size=3, stride=1, padding=1, output_padding=0)
        self.conv5U = nn.ConvTranspose2d(16, out_channels=16, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.conv6U = nn.ConvTranspose2d(16, out_channels=3, kernel_size=3, stride=1, padding=1, output_padding=0)
    
    def forward(self, x , helo=None):
        x = self.decoder_input(z)
        x = x.view(-1, 32, 4, 4)

        # block 1
        x = self.conv1U(x)
        x = F.leaky_relu(x)
        x = self.conv2U(x)
        mu11, sigma11 = self.film1U( helo )
        x = self.filmimp1U(x, mu11, sigma11)
        x = F.leaky_relu(x)

        # block 2
        x = self.conv3U(x)
        x = F.leaky_relu(x)
        x = self.conv4U(x)
        mu22, sigma22 = self.film2U( helo )
        x = self.filmimp2U(x, mu22, sigma22)
        x = F.leaky_relu(x)

        # block 3
        x = self.conv5U(x)
        x = F.leaky_relu(x)
        x = self.conv6U(x)
        mu33, sigma33 = self.film3U( helo )
        x = self.filmimp3U(x, mu33, sigma33)
        return self.activation(x)



In [20]:
enc1 = Encoder1()
enc1.load_state_dict(torch.load('C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/encCIFAR10yoto.weights', map_location=torch.device('cpu')))

dec1 = Decoder1()
dec1.load_state_dict(torch.load('C:/Users/Hanu/Downloads/DL_cw/CIFAR10-x2-width/decCIFAR10yoto.weights', map_location=torch.device('cpu')))

# put model in eval mode
enc1 = enc1.eval() 
dec1 = dec1.eval() 

In [21]:
beta_vals = [1,16,64,256]

In [24]:
loss_for_beta_yoto = []
for beta in beta_vals:
    print(beta)
    losses = []
    for i, data in enumerate(testloader, 0):
        inputs, _ = data
    #     inputs, _ = inputs.to(device), _.to(device)
        
        beta2 = torch.tensor([float(beta)], requires_grad=False)
        beta2 = torch.broadcast_to(beta2, (1, 256))
        
        mu, log_sigma2 = enc1(inputs, beta2)
        z = sample(mu, log_sigma2)
        outputs = dec1(z, beta2)

        recon = F.binary_cross_entropy(outputs, inputs, reduction='sum') / inputs.shape[0]

        kl_diverge = 0.5 * torch.mean(torch.pow(mu, 2) + torch.pow(log_sigma2, 2) - torch.log(torch.pow(log_sigma2, 2)) - 1)

        loss = recon + beta*kl_diverge 

        # keep track of the loss and update the stats
        losses.append(loss.item())
    loss_for_beta_yoto.append(np.mean(losses))

1
16
64
256


In [25]:
print(loss_for_beta_yoto)

[1773.4991640501385, 1784.2135318804392, 1802.9030421776108, 1833.0603320930577]
