In [2]:
import torch
import pandas as pd
import numpy as np
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torch.nn as nn
from model import CNN, IDNGenerator
from dataset import MNISTDataset
import torch.optim as optim
import torch.nn.functional as F

In [5]:
data = pd.read_csv("MNIST/train.csv").values
dataset = MNISTDataset(csv_file = "MNIST/train.csv", transform=transforms.ToTensor())
X = torch.tensor(data[:,1:].reshape(-1,1,28,28), dtype=torch.float32)
y = torch.tensor(data[:,0], dtype=torch.int64)
model = CNN(1, 10)

# Hyperparameters
lr = 0.001
p = 0.1
epochs = 5
batch_size = 50

optimizer = optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.CrossEntropyLoss()
train_loader = DataLoader(dataset=dataset, batch_size=50, shuffle=False)

In [4]:
idn = IDNGenerator(X, y, p, epochs, model, optimizer, loss_fn, train_loader)

In [5]:
new_X, new_y = idn.generate()

100%|██████████| 840/840 [00:17<00:00, 47.02it/s, loss=0.0199] 


Loss: 0.3910123829638386, Accuracy: 93.07142857142857


100%|██████████| 840/840 [00:18<00:00, 45.36it/s, loss=0.00482] 


Loss: 0.07960443613909385, Accuracy: 97.5142857142857


100%|██████████| 840/840 [00:19<00:00, 42.74it/s, loss=0.0014]  


Loss: 0.06311340421518599, Accuracy: 98.11904761904762


100%|██████████| 840/840 [00:18<00:00, 44.45it/s, loss=0.00102] 


Loss: 0.053512110707673685, Accuracy: 98.34285714285714


100%|██████████| 840/840 [00:19<00:00, 43.17it/s, loss=0.00404] 


Loss: 0.04736038633342278, Accuracy: 98.57619047619048


In [14]:
new_X.shape

torch.Size([42000, 1, 28, 28])

In [15]:
new_y.shape

torch.Size([42000])

In [16]:
torch.save(new_X, 'X_10.pt')
torch.save(new_y, 'y_10.pt')

In [63]:
sample = X[:1,:,:]

In [71]:
def idx2onehot(idx, n):

    assert torch.max(idx).item() < n

    if idx.dim() == 1:
        idx = idx.unsqueeze(1)
    onehot = torch.zeros(idx.size(0), n).to(idx.device)
    onehot.scatter_(1, idx, 1)
    
    return onehot

In [75]:
class VAE(nn.Module):

    def __init__(self, encoder_layer_sizes, latent_size, decoder_layer_sizes,
                 conditional=False, num_labels=0):

        super(VAE, self).__init__()

        self.latent_size = latent_size

        self.encoder = Encoder(
            encoder_layer_sizes, latent_size, conditional, num_labels)
        self.decoder = Decoder(
            decoder_layer_sizes, latent_size, conditional, num_labels)

    def forward(self, x, c=None):

        if x.dim() > 2:
            x = x.view(-1, 28*28)

        means, log_var = self.encoder(x, c)
        z = self.reparameterize(means, log_var)
        recon_x = self.decoder(z, c)

        return recon_x, means, log_var, z

    def reparameterize(self, mu, log_var):

        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)

        return mu + eps * std

    def inference(self, z, c=None):

        recon_x = self.decoder(z, c)

        return recon_x


class Encoder(nn.Module):

    def __init__(self, layer_sizes, latent_size, conditional, num_labels):

        super().__init__()

        self.conditional = conditional
        if self.conditional:
            layer_sizes[0] += num_labels

        self.MLP = nn.Sequential()

        for i, (in_size, out_size) in enumerate(zip(layer_sizes[:-1], layer_sizes[1:])):
            self.MLP.add_module(
                name="L{:d}".format(i), module=nn.Linear(in_size, out_size))
            self.MLP.add_module(name="A{:d}".format(i), module=nn.ReLU())

        self.linear_means = nn.Linear(layer_sizes[-1], latent_size)
        self.linear_log_var = nn.Linear(layer_sizes[-1], latent_size)

    def forward(self, x, c=None):

        if self.conditional:
            c = idx2onehot(c, n=10)
            x = torch.cat((x, c), dim=-1)

        x = self.MLP(x)

        means = self.linear_means(x)
        log_vars = self.linear_log_var(x)

        return means, log_vars


class Decoder(nn.Module):

    def __init__(self, layer_sizes, latent_size, conditional, num_labels):

        super().__init__()

        self.MLP = nn.Sequential()

        self.conditional = conditional
        if self.conditional:
            input_size = latent_size + num_labels
        else:
            input_size = latent_size

        for i, (in_size, out_size) in enumerate(zip([input_size]+layer_sizes[:-1], layer_sizes)):
            self.MLP.add_module(
                name="L{:d}".format(i), module=nn.Linear(in_size, out_size))
            if i+1 < len(layer_sizes):
                self.MLP.add_module(name="A{:d}".format(i), module=nn.ReLU())
            else:
                self.MLP.add_module(name="sigmoid", module=nn.Sigmoid())

    def forward(self, z, c):

        if self.conditional:
            c = idx2onehot(c, n=10)
            z = torch.cat((z, c), dim=-1)

        x = self.MLP(z)

        return x

In [84]:
import matplotlib.pyplot as plt
import cv2

In [16]:
X_10 = torch.load("noisyData/X_10.pt").numpy()
y_10 = torch.load("noisyData/y_10.pt").numpy()

In [17]:
y[y.numpy() != y_10]

tensor([4, 0, 8,  ..., 4, 7, 8])

In [93]:
sample_x = X_10[0]
sample_y = y_10[0]
title = f'Label is {sample_y}'

cv2.imshow(title, sample_x.numpy())
cv2.waitKey(0)
cv2.destroyAllWindows()

error: OpenCV(4.5.5) /Users/runner/work/opencv-python/opencv-python/opencv/modules/imgproc/src/color.simd_helpers.hpp:92: error: (-2:Unspecified error) in function 'cv::impl::(anonymous namespace)::CvtHelper<cv::impl::(anonymous namespace)::Set<3, 4, -1>, cv::impl::(anonymous namespace)::Set<3, 4, -1>, cv::impl::(anonymous namespace)::Set<0, 2, 5>, cv::impl::(anonymous namespace)::NONE>::CvtHelper(cv::InputArray, cv::OutputArray, int) [VScn = cv::impl::(anonymous namespace)::Set<3, 4, -1>, VDcn = cv::impl::(anonymous namespace)::Set<3, 4, -1>, VDepth = cv::impl::(anonymous namespace)::Set<0, 2, 5>, sizePolicy = cv::impl::(anonymous namespace)::NONE]'
> Invalid number of channels in input image:
>     'VScn::contains(scn)'
> where
>     'scn' is 28


In [92]:
sample_x.numpy().shape

(1, 28, 28)