In [116]:
import torch
import torchvision
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import pandas as pd
import numpy as np
import torch.nn.functional as F
from torch.utils.data import random_split
import torch.optim as optim

import matplotlib.pyplot as plt
%matplotlib inline
from IPython.display import set_matplotlib_formats
set_matplotlib_formats('svg', 'pdf') # For export
from matplotlib.colors import to_rgb
import matplotlib
matplotlib.rcParams['lines.linewidth'] = 2.0
import seaborn as sns
sns.reset_orig()
sns.set()

  set_matplotlib_formats('svg', 'pdf') # For export


In [117]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [118]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)
print("Device:", device)

Device: cuda


In [119]:
# importing the zipfile module
from zipfile import ZipFile

# loading the temp.zip and creating a zip object
with ZipFile("/content/drive/MyDrive/trafic_32.zip", 'r') as zObject:

    # Extracting specific file in the zip
    # into a specific location.
    zObject.extractall(path="/content")
zObject.close()

In [120]:
transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize((0.5,),(0.5,))])

data_set = datasets.ImageFolder("/content/trafic_32", transform = transform)

In [121]:
data_set[0][0].size()

torch.Size([3, 32, 32])

In [122]:
dataset_size = len(data_set)

train_size = int(0.8 * dataset_size)
test_size = dataset_size - train_size

train_set, test_set = random_split(data_set, [train_size, test_size])

batch_size=256
trainloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=8)
testloader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=8)

In [123]:
class Encoder(nn.Module):
  def __init__(self, input_dim, hidden_dim, latent_dim):
    super().__init__()
    self.conv1 = nn.Conv2d(input_dim, hidden_dim, kernel_size=5, stride=2, padding=1)
    self.conv2 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1)
    self.conv3 = nn.Conv2d(hidden_dim, 2*hidden_dim, kernel_size=5, stride=2, padding=1)
    self.conv4 = nn.Conv2d(2*hidden_dim,2*hidden_dim, kernel_size=3, stride=1, padding=1)
    self.conv5 = nn.Conv2d(2*hidden_dim, 2*hidden_dim, kernel_size=3, stride=2, padding=1)
    self.fc_mean  = nn.Linear(16*2*hidden_dim, latent_dim)
    self.fc_var   = nn.Linear (16*2*hidden_dim, latent_dim)
    self.LeakyReLU = nn.LeakyReLU(0.2)
    self.training = True

  def forward(self, x):
    x = F.relu(self.conv1(x))
    print(x.size())
    x = F.relu(self.conv2(x))
    print(x.size())
    x = F.relu(self.conv3(x))
    print(x.size())
    x = F.relu(self.conv4(x))
    print(x.size())
    x = F.relu(self.conv5(x))
    print(x.size())
    x = torch.flatten(x, 1) # flatten all dimensions except batch
    print(x.size())
    mean     = self.fc_mean(x)
    log_var  = self.fc_var(x)
    return mean, log_var


In [124]:
class Decoder(nn.Module):
  def __init__(self, output_dim, hidden_dim, latent_dim):
    super().__init__()
    self.convT1 = nn.ConvTranspose2d(latent_dim, 2*hidden_dim, kernel_size=3, output_padding=1, padding=1, stride=2)
    self.conv1 = nn.Conv2d(2*hidden_dim, 2*hidden_dim, kernel_size=3, stride=1, padding=1)
    self.convT2 = nn.ConvTranspose2d(2*hidden_dim, hidden_dim, kernel_size=5, output_padding=1, padding=1, stride=2)
    self.conv2 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1)
    self.convT3 = nn.ConvTranspose2d(hidden_dim, output_dim, kernel_size=5, output_padding=1, padding=1, stride=2)

  def forward(self, x):
    print(x.size())
    x = F.relu(self.convT1(x))
    print(x.size())
    x = F.relu(self.conv1(x))
    print(x.size())
    x = F.relu(self.convT2(x))
    print(x.size())
    x = F.relu(self.conv2(x))
    print(x.size())
    x = F.tanh(self.convT3(x))
    print(x.size())
    return x

In [125]:
class VAE(nn.Module):
    def __init__(self, x_dim, hidden_dim, latent_dim):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim
        self.encoder = Encoder(input_dim=x_dim, hidden_dim=hidden_dim, latent_dim=latent_dim)
        self.decoder = Decoder(latent_dim=latent_dim, hidden_dim = hidden_dim, output_dim = x_dim)


    def reparameterization(self, mean, var):
        z = mean
        z = torch.randn_like(mean) * var + mean
        return z


    def forward(self, x):
        mean, log_var = self.encoder(x)
        z = self.reparameterization(mean, torch.exp(0.5 * log_var)) # takes exponential function (log var -> var)
        print("z size: ", z.size())
        x_hat = self.decoder(z)
        return x_hat, mean, log_var

In [126]:
def vae_loss_function(x, x_hat, mean, log_var):
    reproduction_loss = nn.functional.mse_loss(x_hat, x, reduction='sum')
    KLD      = -0.5 * torch.sum(1+ log_var - mean.pow(2) - log_var.exp()) # regularyzacja, wymusza że wszystkie gausiki sumują się do jednego gaussa ze średnią w zerze i std 1

    return reproduction_loss + KLD

In [127]:
vae = VAE(latent_dim=16, hidden_dim=64, x_dim=3)

In [128]:
optimizer = optim.Adam(vae.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.99)

In [129]:
num_epochs = 30
for n in range(num_epochs):
    losses_epoch = []
    for x, _ in iter(trainloader):
        x = x
        out, means, log_var = vae(x)
        loss = vae_loss_function(x, out, means, log_var)
        losses_epoch.append(loss.item())
        loss.backward()               # backward pass (compute parameter updates)
        optimizer.step()              # make the updates for each parameter
        optimizer.zero_grad()
    L1_list = []
#     if n % 10 == 0:
    for x, _ in iter(testloader):
        x  = x
        out, _, _ = vae(x)
        L1_list.append(torch.mean(torch.abs(out-x)).item())
    print(f"Epoch {n} loss {np.mean(np.array(losses_epoch))}, test L1 = {np.mean(L1_list)}")
    scheduler.step()

  self.pid = os.fork()


torch.Size([256, 64, 15, 15])
torch.Size([256, 64, 15, 15])
torch.Size([256, 128, 7, 7])
torch.Size([256, 128, 7, 7])
torch.Size([256, 128, 4, 4])
torch.Size([256, 2048])
z size:  torch.Size([256, 16])
torch.Size([256, 16])


RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv_transpose2d, but got input of size: [256, 16]

In [None]:
def generate_images(model, n_imgs, device):
    # Generate images
    model.eval()
    with torch.no_grad():
        generated_imgs = model.decoder(torch.randn([n_imgs, model.latent_dim]))
    generated_imgs = generated_imgs.cpu()

    grid = torchvision.utils.make_grid(generated_imgs, nrow=4, normalize=False)
    grid = grid.permute(1, 2, 0)
    plt.figure(figsize=(15,10))
    plt.title(f"Generations")
    plt.imshow(grid)
    plt.axis('off')
    plt.show()

In [None]:
generate_images(vae, 16 , device)

In [None]:
# num_epochs = 30
# net = Encoder(3,64,32).to(device)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(net.parameters(), lr=0.0005)


# for epoch in range(num_epochs):  # loop over the dataset multiple times

#     running_loss = 0.0
#     for i, data in enumerate(trainloader, 0):
#         # get the inputs; data is a list of [inputs, labels]
#         inputs, labels = data
#         inputs, labels = inputs.to(device), labels.to(device)

#         # zero the parameter gradients
#         optimizer.zero_grad()

#         # forward + backward + optimize
#         outputs = net(inputs)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()

#         # print statistics
#         running_loss += loss.item()

#     print('[%d/%d] loss: %.3f' %
#           (epoch+1 ,  num_epochs, running_loss / 2000))
#     running_loss = 0.0

# print('Finished Training')