In [1]:
import os
import torch
import pandas as pd
from torch import nn 

Set working directory, so we don't download the same dataset twice

In [2]:
working_dir = os.getcwd()
working_dir

'c:\\Users\\Studen\\Documents\\vscode_projects\\kaggleCompetitions\\Digit_Recognizer\\VAE'

In [15]:
new_working_dir = os.path.dirname(working_dir)
os.chdir(os.path.join(new_working_dir, "scripts"))
os.getcwd()

'c:\\Users\\Studen\\Documents\\vscode_projects\\kaggleCompetitions\\Digit_Recognizer\\scripts'

Load train set

In [16]:
df_train = pd.read_csv("C:/Users/Studen/Documents/vscode_projects/kaggleCompetitions/Digit_Recognizer/Dataset/train.csv")

In [65]:
print(df_train.shape)
df_train.head(5)

(42000, 785)


Unnamed: 0,label,pixel0,pixel1,pixel2,pixel3,pixel4,pixel5,pixel6,pixel7,pixel8,...,pixel774,pixel775,pixel776,pixel777,pixel778,pixel779,pixel780,pixel781,pixel782,pixel783
0,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,4,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


Model

In [21]:
# Input img -> Hidden dim -> mean, std -> Paramertication Trick -> Decoder -> Output img
class VariationalAutoEncoder(nn.Module):
    def __init__(self, input_dim, h_dim=200, z_dim=20) -> None:
        super().__init__()
        # Encoder
        self.img_2hid = nn.Linear(input_dim, h_dim)
        self.hid_2mu = nn.Linear(h_dim, z_dim)
        self.hid_2sigma = nn.Linear(h_dim, z_dim)

        # Decoder
        self.z_2hid = nn.Linear(z_dim, h_dim)
        self.hid_2img = nn.Linear(h_dim, input_dim)

        # Activation
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        # self.tanh = nn.Tanh()

    def encode(self, x):
        # q_phi(z|x)
        h = self.relu(self.img_2hid(x))
        mu = self.hid_2mu(h)
        sigma = self.hid_2sigma(h)
        return mu, sigma

    def decode(self, z):
        # p_theta(x|z)
        h = self.relu(self.z_2hid(z))
        img = self.sigmoid(self.hid_2img(h))
        return img

    def forward(self, x):
        mu, sigma = self.encode(x)
        epsilon = torch.randn_like(sigma)
        z_reparametrized = mu + sigma*epsilon
        x_reconstructed = self.decode(z_reparametrized)
        return x_reconstructed, mu, sigma

Check the shape

In [22]:
x = torch.randn(4, 28*28)
vae = VariationalAutoEncoder(input_dim=784)
x_reconstructed, mu, sigma = vae(x)
print(x_reconstructed.shape)
print(mu.shape)
print(sigma.shape)

torch.Size([4, 784])
torch.Size([4, 20])
torch.Size([4, 20])


Training

In [23]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision.utils import save_image
from torchvision import datasets, transforms
from tqdm import tqdm
from pathlib import Path
from scripts.globals import *
from scripts.dataset_loader import *

In [24]:
# Configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[INFO] Current working device: {device}")
INPUT_DIM = 784
H_DIM = 200
Z_DIM = 20
NUM_EPOCH = 30 # 100 
BATCH_SIZE = 64
LR_RATE = 3e-4 # Karpathy constant

[INFO] Current working device: cuda


In [26]:
dataset = DigitDataset(data_path="C:/Users/Studen/Documents/vscode_projects/kaggleCompetitions/Digit_Recognizer/Dataset/train.csv", type="train")
train_loader = DataLoader(dataset=dataset, batch_size=BATCH_SIZE, shuffle=True)

In [27]:
dataiter = iter(train_loader)
images, labels = dataiter.next()
print(f"[INFO] Minimum data value: {torch.min(images)}, Maximum data value: {torch.max(images)}")

[INFO] Minimum data value: 0.0, Maximum data value: 1.0


In [28]:
# Model params
model = VariationalAutoEncoder(INPUT_DIM, H_DIM, Z_DIM).to(device)
optimizer = optim.Adam(model.parameters(), lr=LR_RATE)
loss_fn = nn.BCELoss(reduction="sum")

Traning loop

In [29]:
best_loss = None
for epoch in range(NUM_EPOCH):
    loop = tqdm(enumerate(train_loader))
    for i, (x, _) in loop:
        # Forward pass
        x = x.to(device).view(x.shape[0], INPUT_DIM)
        x_reconstructed, mu, sigma = model(x)

        # Compute loss
        reconstructed_loss = loss_fn(x_reconstructed, x)
        kl_div = -torch.sum(1 + torch.log(sigma.pow(2)) - mu.pow(2) - sigma.pow(2))

        # Backprop
        loss = reconstructed_loss + kl_div
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        loop.set_postfix(loss=loss.item())
        if best_loss is None or loss.item() < best_loss:
            best_loss = loss.intem()
            torch.save(
                model.state_dict(), f"C:/Users/Studen/Documents/vscode_projects/kaggleCompetitions/Digit_Recognizer/VAE/best_weights.pth"
            )
            tqdm.write(f"    Best accuracy in {epoch} epoch saved. ✅")


657it [00:07, 82.46it/s, loss=3.06e+3] 
657it [00:07, 92.59it/s, loss=2.88e+3] 
657it [00:07, 84.89it/s, loss=2.52e+3]
657it [00:07, 83.10it/s, loss=2.25e+3]
657it [00:07, 83.09it/s, loss=2.07e+3]
657it [00:07, 83.21it/s, loss=2.3e+3] 
657it [00:07, 82.85it/s, loss=2.29e+3]
657it [00:07, 83.12it/s, loss=2.38e+3]
657it [00:07, 83.12it/s, loss=2.56e+3]
657it [00:07, 83.14it/s, loss=2.15e+3]
657it [00:07, 83.02it/s, loss=2.23e+3]
657it [00:07, 83.27it/s, loss=2.35e+3]
657it [00:07, 83.06it/s, loss=2.47e+3]
657it [00:07, 82.69it/s, loss=2.19e+3]
657it [00:07, 83.16it/s, loss=2.1e+3] 
657it [00:07, 83.04it/s, loss=2.16e+3]
657it [00:07, 82.90it/s, loss=2.32e+3]
657it [00:07, 82.40it/s, loss=2.05e+3]
657it [00:07, 82.63it/s, loss=2.24e+3]
657it [00:07, 83.29it/s, loss=2.32e+3]
657it [00:07, 83.23it/s, loss=1.86e+3]
657it [00:07, 83.05it/s, loss=2.14e+3]
657it [00:07, 82.91it/s, loss=2.26e+3]
657it [00:07, 83.23it/s, loss=2.06e+3]
657it [00:07, 82.89it/s, loss=2.08e+3]
657it [00:07, 82.93it/s

In [32]:
dataset.__len__()

42000

In [35]:
for index in range(dataset.__len__()):
    data, label = dataset.__getitem__(index)
    print(data.shape, label)
    break

torch.Size([1, 28, 28]) 1


In [63]:
model = model.to("cpu")
def create_new_data(num_examples=1, out_path:Path=Path("C:/Users/Studen/Documents/vscode_projects/kaggleCompetitions/Digit_Recognizer/Dataset")):
    df_vae = pd.DataFrame(columns=df_train.columns)
    print(f"[INFO] Shape of one row from existing data: {df_train.iloc[0].shape}")
    encoding_digit = []
    out_path.mkdir(parents=True, exist_ok=True)
    for index in range(dataset.__len__()):
        data, label = dataset.__getitem__(index)
        # print(data.shape, label)
        with torch.no_grad():
            mu, sigma = model.encode(data.view(1, 784))
    #     encoding_digit.append((mu, sigma))
    
    # mu, sigma = encoding_digit[digit]
        for example in range(num_examples):
            epsilon = torch.randn_like(sigma)
            z = mu + sigma * epsilon
            out = model.decode(z)
            out = out.detach().numpy()
            scaled_out = (out * 255).astype(np.uint8)
            new_arr = np.insert(scaled_out, 0, label, axis=1)
            # print(f"[INFO] Shape of output: {out.shape}")
            # print(f"[INFO] Shape of extended with label: {new_arr.shape}")
            # print(f"[INFO] Real label: {label}")
            # print(f"[INFO] Added label: {new_arr[0][0]}")
            # print(f"[INFO] Min value: {np.min(new_arr)}, max value: {np.max(new_arr)}")
            new_arr = new_arr.squeeze(axis=0)
            # print(f"[INFO] Shape of extended with label after squeeze: {new_arr.shape}")
            df_vae.loc[len(df_vae)] = new_arr
        # print(df_vae.shape)
    return df_vae

In [64]:
df_vae = create_new_data()

[INFO] Shape of one row from existing data: (785,)
[INFO] Shape of output: (1, 784)
[INFO] Shape of extended with label: (1, 785)
[INFO] Real label: 1
[INFO] Added label: 1
[INFO] Min value: 0, max value: 252
[INFO] Shape of extended with label after squeeze: (785,)
(1, 785)


In [66]:
model = model.to("cpu")
def inference(digit, num_examples=1, out_path:Path=Path("./VAE_gen_examples")):
    """
    Generates (num_exmaples) of a particular digit.
    Specifically we extract an exmaple of eaxh digit, 
    then after we have mu, sigma representation for 
    each digit we can sample from that.

    After we sample we can run the decoder part of the VAE and generate examples.
    """
    out_path.mkdir(parents=True, exist_ok=True)
    images = []
    idx = 0
    for x, y in dataset:
        if y == idx:
            images.append(x)
            idx += 1
        if idx == 10:
            break

    encoding_digit = []
    for d in range(10):
        with torch.no_grad():
            mu, sigma = model.encode(images[d]. view(1, 784))
        encoding_digit.append((mu, sigma))
    
    mu, sigma = encoding_digit[digit]
    for example in range(num_examples):
        epsilon = torch.randn_like(sigma)
        z = mu + sigma * epsilon
        out = model.decode(z)
        out = out.view(-1, 1, 28, 28)
        save_image(out, out_path / f"generated_{digit}_ex{example}.png")

In [67]:
for idx in range(10):
    inference(idx, num_examples=5)