<a href="https://colab.research.google.com/github/Igor-Tukh/deep-unsupervised-learning-hse/blob/hw03/hw03_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Deep Unsupervised Learning course, HSE, fall-winter 2019**
## **HW 03.2**
#### **Student: Igor Tukh**

In [0]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import os
import pickle

In [0]:
from sklearn.model_selection import train_test_split
from tqdm import tqdm_notebook as tqdm

In [3]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
RESOURCES_PATH = 'gdrive/My Drive/Colab Notebooks/resources'
PKL_PATH = os.path.join(RESOURCES_PATH, 'hw3-q2.pkl')

In [0]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [15]:
print(device)

cuda:0


In [16]:
with open(PKL_PATH, 'rb') as file:
  data = pickle.load(file)

print(f'Data keys: {data.keys()}')

train = data['train']
test = data['test']
valid = data['valid']

print(f'Train shape: {train.shape}, valid shape: {valid.shape}, test shape: {test.shape}')

data_train = torch.from_numpy(train)
data_test = torch.from_numpy(test)
data_valid = torch.from_numpy(valid)

Data keys: dict_keys(['train', 'valid', 'test'])
Train shape: (65931, 32, 32, 3), valid shape: (7326, 32, 32, 3), test shape: (26032, 32, 32, 3)


**a)**

In [0]:
class GatedShortcutConnection(nn.Module):
  def __init__(self):
    super(GatedShortcutConnection, self).__init__()

    self.sigmoid = nn.Sigmoid()
    self.conv1 = nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1))
    self.conv2 = nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1))
    
  def forward(self, x):
    return self.conv1(x) * self.sigmoid(self.conv2(x))

class ResidualStack(nn.Module):
  def __init__(self):
    super(ResidualStack, self).__init__()
    layers = [nn.ReLU(), 
              nn.Conv2d(256, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
              nn.ReLU(),
              nn.Conv2d(64, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
              GatedShortcutConnection()]

    all_layers = nn.ModuleList()
    for _ in range(5):
      all_layers.extend(layers)
    all_layers.append(nn.ReLU())

    self.layers = nn.Sequential(*all_layers)

  def forward(self, x):
    return self.layers(x)

class Encoder(nn.Module):
  def __init__(self):
    super(Encoder, self).__init__()

    self.layers = nn.Sequential(nn.Conv2d(3, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1)),
                                nn.ReLU(),
                                nn.Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1)),
                                nn.ReLU(),
                                nn.Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                ResidualStack())

  def forward(self, x):
    return self.layers(x)

class Decoder(nn.Module):
  def __init__(self):
    super(Decoder, self).__init__()

    self.layers = nn.Sequential(nn.Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                ResidualStack(),
                                nn.ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1)),
                                nn.ReLU(),
                                nn.ConvTranspose2d(128, 6, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1)))

  def forward(self, x):
    return self.layers(x)


class VAE(nn.Module):
  def __init__(self):
    super(VAE, self).__init__()
    self.encoder = Encoder()
    self.decoder = Decoder()

  def encode(self, x):
    x = self.encoder(x)
    mu, sigma = torch.chunk(x, 2, dim=1)
    return mu, sigma

  def decode(self, x):
    x = self.decoder(x)
    mu, sigma = torch.chunk(x, 2, dim=1)
    return mu, sigma

  def sample(self, mu, sigma):
    return mu.to(device) + torch.randn(size=mu.shape).to(device) * torch.sqrt(sigma).to(device)

  def forward(self, x):
    mu, sigma = self.encode(x)
    z = self.sample(mu, sigma)
    mu_x, sigma_x = self.decode(z)
    return mu, sigma, mu_x, sigma_x

In [0]:
loss = nn.MSELoss()

def get_loss(x, output):
  mu, sigma, mu_x, sigma_x = output
  pl = -0.5 * (torch.log(np.pi * 2 * sigma_x) + torch.pow(x - mu_x, 2) / sigma_x)
  # pl = -pl.sum(dim=1)
  pl = pl.mean()
  # pl = loss(output, x)
  kl = -0.5 * (1. + torch.log(sigma) - mu**2 - sigma)
  kl = kl.sum(dim=1)
  kl = kl.mean()
  return pl, kl

In [0]:
def plot_losses(train_losses, val_losses, title='Losses'):
  plt.title(title)
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  epoches = range(0, len(train_losses))
  plt.plot(epoches, train_losses, label='train loss')
  plt.plot(epoches, val_losses, label='val loss')
  plt.legend()
  plt.show()

In [0]:
def train(model, train_batches, val_batches, lossf=get_loss, epoches_number=20, lr=1e-2, batch_size=100, epoch_print=1):
  optimizer = torch.optim.Adam(model.parameters(), lr=lr)

  train_losses = []
  val_losses = []

  for epoch in tqdm(range(epoches_number)):
    losses = []
    for batch in train_batches:
      batch = batch.to(device)
      optimizer.zero_grad()
      output = model(batch)
      pl, kl = lossf(batch, output)
      # print(pl, kl)
      loss = pl + kl
      loss.backward()
      optimizer.step()
      losses.append([pl.detach().cpu().numpy(), kl.detach().cpu().numpy()])
    losses = np.array(losses)
    train_losses.append([losses[:,0].mean(), losses[:,1].mean()])

    with torch.no_grad():
      losses = []
      for batch in val_batches:
        batch = batch.to(device)
        output = model(batch)
        pl, kl = lossf(batch, output)
        loss = pl + kl
        losses.append((pl.detach().cpu().numpy(), kl.detach().cpu().numpy()))
    losses = np.array(losses)
    val_losses.append([losses[:,0].mean(), losses[:,1].mean()])
    
    if epoch % epoch_print == 0 or epoch == epoches_number - 1:
      train_loss = np.mean(train_losses[-1])
      val_loss = np.mean(val_losses[-1])
      print(f'Epoch: {epoch}, train loss: {train_loss}, val loss: {val_loss}')
    
  train_losses = np.array(train_losses)
  val_losses = np.array(val_losses)

  plot_losses(train_losses[:,0], val_losses[:, 0], title='Prob losses')
  plot_losses(train_losses[:,1], val_losses[:, 1], title='KL losses')
  plot_losses(train_losses.sum(axis=1), val_losses.sum(axis=1), title='Losses')

In [0]:
batch_size = 200

dt = 1. * data_train.permute(0, 3, 1, 2) / 255
dv = 1. * data_valid.permute(0, 3, 1, 2) / 255

train_batches = torch.utils.data.DataLoader(dt, batch_size=batch_size)
validate_batches = torch.utils.data.DataLoader(dv, batch_size=batch_size)

In [54]:
model = VAE().to(device)
train(model, train_batches, validate_batches)

HBox(children=(IntProgress(value=0, max=20), HTML(value='')))

Epoch: 0, train loss: nan, val loss: nan


KeyboardInterrupt: ignored