<a href="https://colab.research.google.com/github/VadimFarutin/deep-unsupervised-learning/blob/hw03/hw03_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HW03

## 2 High-dimensional data

### Imports and common code

In [0]:
%%capture
!pip install graphql-core==2.0
!pip install wandb -q

In [2]:
import wandb
!wandb login

[34m[1mwandb[0m: (1) Create a W&B account
[34m[1mwandb[0m: (2) Use an existing W&B account
[34m[1mwandb[0m: (3) Don't visualize my results
[34m[1mwandb[0m: Enter your choice: 2
[34m[1mwandb[0m: You chose 'Use an existing W&B account'
[34m[1mwandb[0m: You can find your API key in your browser here: https://app.wandb.ai/authorize
[34m[1mwandb[0m: Paste an API key from your profile and hit enter: 8c292ca69e334e8a562d4a4c6570fdd3ad29c825
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[32mSuccessfully logged in to Weights & Biases![0m


In [0]:
from collections import defaultdict
from tqdm import tnrange, tqdm_notebook
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
import numpy as np
from matplotlib import pyplot as plt
import matplotlib.gridspec as gridspec
import random
import pickle

import torch
import torch.nn as nn
import torch.nn.utils as utils
from torch.nn.modules import loss
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data.dataset import Dataset
from torch.nn.utils.rnn import pad_sequence
from torch.distributions import Normal, Uniform, MultivariateNormal

In [0]:
SEED = 42
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
random.seed(SEED)
np.random.seed(SEED)

In [0]:
EPS = 1e-9

In [7]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# DEVICE = torch.device('cpu')
print(DEVICE)

cpu


In [8]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
def read_data():
    path = 'drive/My Drive/hw3-q2.pkl'

    with open(path, 'rb') as file:
        dataset = pickle.load(file)
    
    return dataset['train'].transpose(0, 3, 1, 2), dataset['valid'].transpose(0, 3, 1, 2), dataset['test'].transpose(0, 3, 1, 2)

In [19]:
train, val, test = read_data()
print(train.shape, val.shape, test.shape)

(65931, 3, 32, 32) (7326, 3, 32, 32) (26032, 3, 32, 32)


In [0]:
def MyVAELosses(x, mu_x, var_x, mu_z, var_z):
    nll = -torch.sum(-0.5 * (torch.log(2 * np.pi * var_x) + torch.pow(x - mu_x, 2) / var_x), dim=1, keepdim=True)
    kl_div = -0.5 * torch.sum(1.0 + torch.log(var_z) - torch.pow(mu_z, 2) - var_z, dim=1, keepdim=True)

    return torch.mean(kl_div + nll), torch.mean(kl_div), torch.mean(nll) / 2.0

In [0]:
def fit(model, train, val, test, optimizer, loss_function, epoch_cnt, batch_size):
    train_loader = torch.utils.data.DataLoader(torch.from_numpy(train), batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(torch.from_numpy(val), batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(torch.from_numpy(test), batch_size=batch_size)
    train_loss_values = []
    val_loss_values = []
            
    for epoch in tnrange(epoch_cnt, desc='Epoch'):
        model.train()
        for batch_data in train_loader:
            x = batch_data.float().to(DEVICE)
            optimizer.zero_grad()
            mu_x, var_x, mu_z, var_z = model(x)
            loss, kl_div, nll = loss_function(x, mu_x, var_x, mu_z, var_z)
            loss.backward()
            optimizer.step()
            train_loss_values.append([loss.item(), kl_div.item(), nll.item()])

        loss_values = []
        model.eval()
        for batch_data in val_loader:
            x = batch_data.float().to(DEVICE)
            mu_x, var_x, mu_z, var_z = model(x)
            loss, kl_div, nll = loss_function(x, mu_x, var_x, mu_z, var_z)
            loss_values.append([loss.item(), kl_div.item(), nll.item()])
        val_loss_values.append(np.mean(np.array(loss_values), axis=0))
    
    loss_values = []
    model.eval()
    for batch_data in test_loader:
        x = batch_data.float().to(DEVICE)
        mu_x, var_x, mu_z, var_z = model(x)
        loss, kl_div, nll = loss_function(x, mu_x, var_x, mu_z, var_z)
        loss_values.append([loss.item(), kl_div.item(), nll.item()])

    print("Test set loss: ", np.mean(np.array(loss_values), axis=0))

    return np.array(train_loss_values), np.array(val_loss_values) 


In [0]:
def plot_loss_values(train_loss_values, val_loss_values, ax, title):
    ax.plot(np.arange(len(train_loss_values)), train_loss_values, color='blue', label='train')
    ax.plot(np.arange(0, len(train_loss_values), len(train_loss_values) / epochs), val_loss_values, color='red', label='validation')
    ax.legend()
    ax.set_title(title)
    ax.set_xlabel("iteration")
    ax.set_ylabel(title)

def plot_samples(decoder, n, ax):
    z = torch.randn([n, 2]).to(DEVICE)
    mu_x, var_x = decoder(z)
    x = mu_x + torch.randn([n, 2]).to(DEVICE) * torch.sqrt(var_x)

    x = x.cpu().data.numpy()
    mu_x = mu_x.cpu().data.numpy()
    
    ax.scatter(x[:, 0], x[:, 1], marker='.', label='Full generation path')
    ax.scatter(mu_x[:, 0], mu_x[:, 1], marker='.', label='Without decoder noise')

def plot_losses_and_samples(train_loss_values, val_loss_values, decoder, n):
    fig = plt.figure(figsize=(7, 7))
    gs = gridspec.GridSpec(2, 2)

    plot_loss_values(train_loss_values[:, 0], val_loss_values[:, 0], plt.subplot(gs[0]), "Loss")
    plot_loss_values(train_loss_values[:, 1], val_loss_values[:, 1], plt.subplot(gs[1]), "KL")
    plot_loss_values(train_loss_values[:, 2], val_loss_values[:, 2], plt.subplot(gs[2]), "Decoder")
    plot_samples(decoder, n, plt.subplot(gs[3]))


## 2.1 Part A

In [0]:
class GatedShortcutConnection(nn.Module):
    def __init__(self, hidden_size):
        super(GatedShortcutConnection, self).__init__()
                 
        self.conv1 = nn.Conv2d(hidden_size, hidden_size, kernel_size=1)
        self.conv2 = nn.Conv2d(hidden_size, hidden_size, kernel_size=1)
        self.sigmoid = nn.Sigmoid()
                 
    def forward(self, x):
        return self.conv1(x) * self.sigmoid(self.conv2(x))

In [0]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, hidden_size):
        super(ResidualBlock, self).__init__()

        self.layers = nn.Sequential(nn.ReLU(),
                                    nn.Conv2d(in_channels, hidden_size, kernel_size=3, padding=1),
                                    nn.ReLU(),
                                    nn.Conv2d(hidden_size, hidden_size * 2, kernel_size=3, padding=1),
                                    GatedShortcutConnection(hidden_size * 2))
                 
    def forward(self, x):
        return self.layers(x)

In [0]:
class ResidualStack(nn.Module):
    def __init__(self, in_channels, hidden_size):
        super(ResidualStack, self).__init__()

        self.layers = nn.Sequential(ResidualBlock(in_channels, hidden_size),
                                    ResidualBlock(hidden_size * 2, hidden_size),
                                    ResidualBlock(hidden_size * 2, hidden_size),
                                    ResidualBlock(hidden_size * 2, hidden_size),
                                    ResidualBlock(hidden_size * 2, hidden_size),
                                    nn.ReLU())
                 
    def forward(self, x):
        return self.layers(x)

In [0]:
class Encoder(nn.Module):
    def __init__(self, in_channels, hidden_size):
        super(Encoder, self).__init__()
        
        self.hidden_size = hidden_size
        
        self.layers = nn.Sequential(nn.Conv2d(in_channels, hidden_size, kernel_size=4, stride=2, padding=1),
                                    nn.ReLU(),
                                    nn.Conv2d(hidden_size, hidden_size * 2, kernel_size=4, stride=2, padding=1),
                                    nn.ReLU(),
                                    nn.Conv2d(hidden_size * 2, hidden_size * 4, kernel_size=3, padding=1),
                                    nn.ReLU(),
                                    ResidualStack(hidden_size * 4, hidden_size))
                 
    def forward(self, x):
        out = self.layers(x)
        out = out.reshape(-1, 2, 2 * self.hidden_size, 8, 8)
        mu, log_var = out[:, 0], out[:, 1]

        return mu, torch.exp(log_var)

In [0]:
class Decoder(nn.Module):
    def __init__(self, in_channels, hidden_size, out_channels):
        super(Decoder, self).__init__()
        
        self.out_channels = out_channels
        
        self.layers = nn.Sequential(nn.Conv2d(in_channels, hidden_size * 4, kernel_size=3, padding=1),
                                    ResidualStack(hidden_size * 4, hidden_size),
                                    nn.ConvTranspose2d(hidden_size * 2, hidden_size * 2, kernel_size=4, stride=2, padding=1),
                                    nn.ReLU(),
                                    nn.ConvTranspose2d(hidden_size * 2, out_channels * 2, kernel_size=4, stride=2, padding=1))
                 
    def forward(self, x):
        out = self.layers(x)
        out = out.reshape(-1, 2, self.out_channels, 32, 32)
        mu, log_var = out[:, 0], out[:, 1]

        return mu, torch.exp(log_var)

In [0]:
class VAE(nn.Module):
    def __init__(self, encoder, decoder):
        super(VAE, self).__init__()
       
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x):
        mu_z, var_z = self.encoder(x)
        z = mu_z + torch.randn(x.shape).to(DEVICE) * torch.sqrt(var_z)
        mu_x, var_x = self.decoder(z)

        return mu_x, var_x, mu_z, var_z

In [0]:
in_channels = 3
out_channels = 3
hidden_size = 128
lr = 1e-3
batch_size = 128
epochs = 10
n_samples = 10000

In [0]:
encoder = Encoder(in_channels=in_channels, hidden_size=hidden_size)
decoder = Decoder(in_channels=in_channels, hidden_size=hidden_size, out_channels=out_channels)
model = VAE(encoder, decoder).float().to(DEVICE)

loss_function = MyVAELosses
optimizer = optim.Adam(encoder.parameters(), lr=lr)

train_loss_values, val_loss_values =\
    fit(model, train, val, test, optimizer, loss_function, epochs, batch_size)

HBox(children=(IntProgress(value=0, description='Epoch', max=10, style=ProgressStyle(description_width='initia…


Test set loss:  [4.44404249e+00 1.54236319e-03 2.22125006e+00]


In [0]:
plot_losses_and_samples(train_loss_values, val_loss_values, decoder, n_samples)

## 2.2 Part B