In [1]:
#from google.colab import drive
#drive.mount('/content/drive')

In [2]:
#!unzip -n drive/MyDrive/ssne/trafic_32.zip

In [3]:
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import numpy as np
import random

from matplotlib import pyplot as plt
from inception import InceptionV3
from scipy import linalg

In [4]:
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

Device: cuda:0


In [5]:
seed = 21
torch.cuda.manual_seed_all(seed)
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [6]:
transform = transforms.Compose([transforms.ToTensor()])

batch_size = 64
train_val_ratio = 0.8

dataset = datasets.ImageFolder("trafic_32", transform=transform)

classes = dataset.classes

train_size = int(train_val_ratio * len(dataset))
val_size = len(dataset) - train_size
train_set, val_set = torch.utils.data.random_split(dataset, [train_size, val_size])

train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=1, pin_memory=True)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=val_size, shuffle=False)

In [7]:
def get_train_images(num):
    return torch.stack([val_set[i][0] for i in range(10,10+num)], dim=0)

Could not connect to 127.0.0.1: 60052
Traceback (most recent call last):
  File "C:\Users\kubar\AppData\Local\JetBrains\Toolbox\apps\apps\DataSpell\ch-0\213.5744.251\plugins\python-ce\helpers\pydev\_pydevd_bundle\pydevd_comm.py", line 458, in start_client
    s.connect((host, port))
ConnectionRefusedError: [WinError 10061] Nie można nawiązać połączenia, ponieważ komputer docelowy aktywnie go odmawia
Traceback (most recent call last):
  File "C:\Users\kubar\AppData\Local\JetBrains\Toolbox\apps\apps\DataSpell\ch-0\213.5744.251\plugins\python-ce\helpers-pro\jupyter_debug\pydev_jupyter_utils.py", line 81, in attach_to_debugger
    debugger.connect(pydev_localhost.get_localhost(), debugger_port)
  File "C:\Users\kubar\AppData\Local\JetBrains\Toolbox\apps\apps\DataSpell\ch-0\213.5744.251\plugins\python-ce\helpers\pydev\pydevd.py", line 655, in connect
    s = start_client(host, port)
  File "C:\Users\kubar\AppData\Local\JetBrains\Toolbox\apps\apps\DataSpell\ch-0\213.5744.251\plugins\python-c

In [8]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(Encoder, self).__init__()

        self.fc_1 = nn.Linear(input_dim, hidden_dim)
        self.fc_2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc_mean  = nn.Linear(hidden_dim, latent_dim)
        self.fc_var   = nn.Linear (hidden_dim, latent_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
        self.training = True
        
    def forward(self, x):
        x = torch.flatten(x, 1)
        x = self.LeakyReLU(self.fc_1(x))
        x = self.LeakyReLU(self.fc_2(x))
        mean = self.fc_mean(x)
        log_var = self.fc_var(x)        
        return mean, log_var

In [9]:
img_shape = (3, 32, 32)
class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super(Decoder, self).__init__()
        self.fc_1 = nn.Linear(latent_dim, hidden_dim)
        self.fc_2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc_3 = nn.Linear(hidden_dim, output_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
    def forward(self, x):
        h = self.LeakyReLU(self.fc_1(x))
        h = self.LeakyReLU(self.fc_2(h))
        
        x_hat = torch.sigmoid(self.fc_3(h))
        x_hat = x_hat.view(x_hat.size(0), *img_shape)
        return x_hat

In [10]:
class VAE(nn.Module):
    def __init__(self, x_dim, hidden_dim, latent_dim):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim
        self.encoder = Encoder(input_dim=x_dim, hidden_dim=hidden_dim, latent_dim=latent_dim)
        self.decoder = Decoder(latent_dim=latent_dim, hidden_dim = hidden_dim, output_dim = x_dim)

    def reparameterization(self, mean, var):
        z = torch.randn_like(mean) * var + mean
        return z
                 
    def forward(self, x):
        mean, log_var = self.encoder(x)
        z = self.reparameterization(mean, torch.exp(0.5 * log_var)) # takes exponential function (log var -> var)
        x_hat = self.decoder(z)
        return x_hat, mean, log_var

In [11]:
vae = VAE(latent_dim=32, hidden_dim=1024, x_dim=3072).to(device)

optimizer = optim.Adam(vae.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.99)
def vae_loss_function(x, x_hat, mean, log_var):
    reproduction_loss = nn.functional.mse_loss(x_hat, x, reduction='sum')
    KLD = -0.5 * torch.sum(1+ log_var - mean.pow(2) - log_var.exp())
    return reproduction_loss + KLD

In [12]:
def generate_images(model, n_imgs, device, show=False):
    model.eval()
    with torch.no_grad():
        generated_imgs = model.decoder(torch.randn([n_imgs, model.latent_dim]).to(device))
    generated_imgs = generated_imgs.cpu()
    if n_imgs <= 16 and show:
        grid = torchvision.utils.make_grid(generated_imgs, nrow=4, normalize=False, range=(-1,1))
        grid = grid.permute(1, 2, 0)
        plt.figure(figsize=(15,10))
        plt.title(f"Generations")
        plt.imshow(grid)
        plt.axis('off')
        plt.show()
    return generated_imgs

In [13]:
def calculate_frechet_distance(distribution_1, distribution_2, eps=1e-6):
    mu1 = np.mean(distribution_1, axis=0)
    sigma1 = np.cov(distribution_1, rowvar=False)

    mu2 = np.mean(distribution_2, axis=0)
    sigma2 = np.cov(distribution_2, rowvar=False)

    mu1 = np.atleast_1d(mu1)
    mu2 = np.atleast_1d(mu2)

    sigma1 = np.atleast_2d(sigma1)
    sigma2 = np.atleast_2d(sigma2)

    assert mu1.shape == mu2.shape
    assert sigma1.shape == sigma2.shape

    diff = mu1 - mu2

    # Product might be almost singular
    covmean, _ = linalg.sqrtm(sigma1.dot(sigma2), disp=False)
    if not np.isfinite(covmean).all():
        msg = ('fid calculation produces singular product; '
               'adding %s to diagonal of cov estimates') % eps
        print(msg)
        offset = np.eye(sigma1.shape[0]) * eps
        covmean = linalg.sqrtm((sigma1 + offset).dot(sigma2 + offset))

    # Numerical error might give slight imaginary component
    if np.iscomplexobj(covmean):
        if not np.allclose(np.diagonal(covmean).imag, 0, atol=1e-3):
            m = np.max(np.abs(covmean.imag))
            raise ValueError('Imaginary component {}'.format(m))
        covmean = covmean.real

    tr_covmean = np.trace(covmean)

    return diff.dot(diff) + np.trace(sigma1) + np.trace(sigma2) - 2 * tr_covmean

In [14]:
def count_fid(generated_imgs, input_imgs):
    model = InceptionV3([InceptionV3.BLOCK_INDEX_BY_DIM[2048]])
    generated_dist = model(generated_imgs)[0] #Zakładam że wygenerowane obrazki są już zdenormalizowane
    orig_dist = model(input_imgs/ 2 + 0.5)[0] #Proszę pamiętać o denormalizacji obrazków treningowych
    fid = calculate_frechet_distance(orig_dist.squeeze().numpy(), generated_dist.squeeze().numpy()) #Funkcja calculate_frechet_distance jest w notebooku z poprzednich zajęć
    return fid

In [18]:
a = [val_set[i][0] for i in range(len(val_set))]

KeyboardInterrupt: 

In [None]:
num_epochs = 20
for n in range(num_epochs+1):
    losses_epoch = []
    for x, _ in iter(train_loader):
        x = x.to(device)
        out, means, log_var = vae(x)
        loss = vae_loss_function(x, out, means, log_var)
        losses_epoch.append(loss.item())
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    L1_list = []
    for x, _ in iter(val_loader):
        x  = x.to(device)
        out, _, _ = vae(x)
        L1_list.append(torch.mean(torch.abs(out-x)).item())
    if n % 5 == 0:
        generated_imgs = generate_images(vae, 32, device)
        input_imgs = get_train_images(32)
        fid = count_fid(generated_imgs, input_imgs)
        print(f"Epoch {n} loss {np.mean(np.array(losses_epoch))}, test L1 = {np.mean(L1_list)}, fid = {fid}")
    else:
        print(f"Epoch {n} loss {np.mean(np.array(losses_epoch))}, test L1 = {np.mean(L1_list)}")

Loading cached inception model for validation


ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



In [None]:
def visualize_reconstructions(model, input_imgs, device):
    # Reconstruct images
    model.eval()
    with torch.no_grad():
        reconst_imgs, means, log_var = model(input_imgs.to(device))
    reconst_imgs = reconst_imgs.cpu()

    # Plotting
    imgs = torch.stack([input_imgs, reconst_imgs], dim=1).flatten(0,1)
    grid = torchvision.utils.make_grid(imgs, nrow=4, normalize=False, range=(-1,1))
    grid = grid.permute(1, 2, 0)
    if len(input_imgs) == 4:
        plt.figure(figsize=(10,10))
    else:
        plt.figure(figsize=(15,10))
    plt.title(f"Reconstructions")
    plt.imshow(grid)
    plt.axis('off')
    plt.show()

In [None]:
#visualize_reconstructions(vae, input_imgs, device)

In [None]:
generated_imgs = generate_images(vae, 1000, device)
torch.save(generated_imgs.detach(),"piatek_Pak_Robaczewski.pt")