In [None]:
# Reload imported modules before each cell runs (mode 2 = all modules),
# so edits to local .py helpers are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
# Registers the %tensorboard magic used later in this notebook.
%load_ext tensorboard

In [None]:
import numpy as np
import torch
from torch import nn
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [None]:
from torchvision.datasets import MNIST, FashionMNIST
from torchvision.transforms import Compose, ToTensor, Normalize
from torch.utils.data import DataLoader

class Binarize:
    """Transform that draws a binary tensor from its input.

    Each element of ``sample`` is used as the probability of a Bernoulli draw,
    so the input values must lie in [0, 1]. The output has the same shape as
    the input and contains only 0s and 1s.
    """

    def __call__(self, sample):
        binary = torch.bernoulli(sample)
        return binary

# Only tensor conversion is applied to the images (no binarisation or normalisation).
transform = Compose([ToTensor()])

# Train split first, then test split; both are downloaded into ./data when missing.
dataset_train, dataset_test = (
    FashionMNIST('./data', train=is_train, transform=transform, download=True)
    for is_train in (True, False)
)

# Settings shared by both loaders; only the training loader shuffles.
loader_kwargs = dict(batch_size=64, pin_memory=True, num_workers=4)
loader_train = DataLoader(dataset_train, shuffle=True, **loader_kwargs)
loader_test = DataLoader(dataset_test, shuffle=False, **loader_kwargs)

# Phase name -> loader, as looked up by run_epoch via loaders[phase].
loaders = dict(train=loader_train, test=loader_test)

In [None]:
class AutoEncoder(nn.Module):
    """Chains an encoder and a decoder.

    Args:
        encoder: module mapping an input batch to its latent code.
        decoder: module mapping a latent code back to input space.

    ``forward`` returns ``(reconstruction, latent_code)`` in that order.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder

    def forward(self, x):
        latent = self.encoder(x)
        return self.decoder(latent), latent
    
    
class Encoder(nn.Module):
    """MLP encoder mapping an image batch to a 128-dimensional code.

    Pipeline: BatchNorm2d over the image channels -> optional corruption
    (``noise``) -> flatten -> Linear/BatchNorm1d/ReLU stack (512 -> 256 -> 128).

    Args:
        img_dims: per-sample image shape; ``img_dims[0]`` is used as the number
            of channels for the input BatchNorm2d and the product of all
            entries as the flattened input size.
        noise: callable applied to the normalised input before flattening
            (e.g. ``nn.Dropout``). ``None`` (default) means no corruption.

    NOTE(review): ``noise`` is called unconditionally in ``forward``. A noise
    object that is an ``nn.Module`` (such as ``nn.Dropout``) follows
    ``model.train()`` / ``model.eval()``; a plain callable is applied in both
    modes.
    """

    def __init__(self, img_dims, noise=None):
        super().__init__()
        # int(...) so the layer size is a plain Python int rather than a numpy scalar.
        flatten_img_dims = int(np.prod(img_dims))
        # Build the identity per instance instead of sharing one default module
        # object (created at function-definition time) across every Encoder.
        self.noise = nn.Identity() if noise is None else noise
        # Channel count comes from img_dims so multi-channel images work too.
        self.normalize = nn.BatchNorm2d(img_dims[0])
        self.encoder = nn.Sequential(nn.Linear(flatten_img_dims, 512), nn.BatchNorm1d(512), nn.ReLU(),
                                     nn.Linear(512, 256), nn.BatchNorm1d(256), nn.ReLU(),
                                     nn.Linear(256, 128))

    def forward(self, x):
        x = self.normalize(x)
        x = self.noise(x)
        # Keep the batch dimension, merge everything else into one feature axis.
        x = x.flatten(start_dim=1)
        x = self.encoder(x)
        return x
    
    
class Decoder(nn.Module):
    """MLP decoder mapping a 128-dimensional code back to an image batch.

    Args:
        img_dims: per-sample output shape; the final linear layer emits
            ``prod(img_dims)`` values which are reshaped to
            ``(batch, *img_dims)``.

    A sigmoid is applied to the output only when ``img_dims[0] == 1``;
    otherwise the raw linear output is returned.
    """

    def __init__(self, img_dims):
        super().__init__()
        self.img_dims = img_dims
        n_outputs = np.prod(img_dims)
        layers = [
            nn.Linear(128, 256), nn.BatchNorm1d(256), nn.ReLU(),
            nn.Linear(256, 512), nn.BatchNorm1d(512), nn.ReLU(),
            nn.Linear(512, n_outputs),
        ]
        self.decoder = nn.Sequential(*layers)

    def forward(self, x):
        flat = self.decoder(x)
        img = flat.view(flat.size(0), *self.img_dims)
        single_channel = self.img_dims[0] == 1
        return torch.sigmoid(img) if single_channel else img

In [None]:
import datetime
from tqdm.auto import tqdm
from tensorboard_pytorch import TensorboardPyTorch

def run_epoch(model, loaders, criterion, optim, writer, epoch, phase):
    """Run one pass over ``loaders[phase]`` and log the mean reconstruction loss.

    Args:
        model: callable returning ``(reconstruction, code)`` for an input batch.
        loaders: mapping from phase name to an iterable of ``(input, label)``
            batches; labels are ignored.
        criterion: loss called as ``criterion(reconstruction, input)``.
        optim: optimizer; stepped only when ``'train'`` occurs in ``phase``.
        writer: logger exposing ``log_scalar`` and
            ``log_reconstructions_visualize``.
        epoch: zero-based epoch index.
        phase: key into ``loaders``. A phase containing ``'train'`` updates the
            model; a phase containing ``'test'`` also logs reconstructions.

    The logged loss is the per-sample mean over the samples actually iterated,
    which stays correct when the loader drops or sub-samples data. An empty
    loader logs ``0.0``.

    Switching the model between train and eval mode is left to the caller.
    """
    is_train = 'train' in phase
    loader = loaders[phase]

    running_loss = 0.0
    n_seen = 0
    for x_true, _ in loader:
        x_true = x_true.to(device)
        # Track gradients only when a backward pass will follow, so evaluation
        # does not depend on the caller wrapping this call in torch.no_grad().
        with torch.set_grad_enabled(is_train):
            x_rec, _ = model(x_true)
            loss = criterion(x_rec, x_true)
        if is_train:
            optim.zero_grad()
            loss.backward()
            optim.step()
        # criterion is assumed to return a batch mean, so re-weight by the batch
        # size to get a correct average when the last batch is smaller.
        batch_size = x_true.size(0)
        running_loss += loss.item() * batch_size
        n_seen += batch_size

    epoch_loss = running_loss / max(n_seen, 1)
    writer.log_scalar(f'Loss/{phase}', round(epoch_loss, 4), epoch + 1)
    if 'test' in phase:
        # NOTE(review): the scalar is logged at step ``epoch + 1`` but this call
        # receives ``epoch``; confirm against the writer whether that is intended.
        writer.log_reconstructions_visualize(model, loader, epoch)
    
    
def simple_trainer(model, loaders, criterion, optim, writer, epoch_start, epoch_end):
    """Alternate one training pass and one test pass per epoch.

    Epochs run over ``range(epoch_start, epoch_end)`` with a tqdm progress bar.
    The training pass runs with the model in train mode. The test pass runs in
    eval mode with gradient tracking disabled.
    """
    shared_args = (model, loaders, criterion, optim, writer)
    epochs = range(epoch_start, epoch_end)
    for epoch in tqdm(epochs):
        model.train()
        run_epoch(*shared_args, epoch, phase='train')

        model.eval()
        with torch.no_grad():
            run_epoch(*shared_args, epoch, phase='test')

# Baseline Autoencoder: Settings

In [None]:
EPOCHS, IMG_DIMS = 100, (1, 28, 28)

# Plain autoencoder: the encoder keeps its default noise (no input corruption).
encoder = Encoder(IMG_DIMS)
decoder = Decoder(IMG_DIMS)
model = AutoEncoder(encoder, decoder)
model = model.to(device)

criterion = nn.BCELoss().to(device)
optim = torch.optim.SGD(
    model.parameters(),
    lr=0.01,
    weight_decay=0.001,
)

# One timestamped log directory per run so repeated runs do not overwrite each other.
date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
run_dir = f'tensorboard/autoencoder/fmnist/loss:bce_optim:sgd_lr:{0.01}_wd:{0.001}_epochs_nobn1d:{EPOCHS}/{date}'
writer = TensorboardPyTorch(run_dir, device)

In [None]:
# Train and evaluate the baseline autoencoder for EPOCHS epochs.
simple_trainer(
    model=model,
    loaders=loaders,
    criterion=criterion,
    optim=optim,
    writer=writer,
    epoch_start=0,
    epoch_end=EPOCHS,
)

In [None]:
# Open the TensorBoard UI inline, reading the runs logged under ./tensorboard.
%tensorboard --logdir=tensorboard

# Denoising Autoencoders (DAE): Dropout or Gaussian Noise

## Dropout

In [None]:
EPOCHS, IMG_DIMS = 100, (1, 28, 28)

# Denoising variant: nn.Dropout(0.5) is handed to the encoder as its noise module.
encoder = Encoder(IMG_DIMS, nn.Dropout(0.5))
decoder = Decoder(IMG_DIMS)
model = AutoEncoder(encoder, decoder)
model = model.to(device)

criterion = nn.BCELoss().to(device)
optim = torch.optim.SGD(
    model.parameters(),
    lr=0.01,
    weight_decay=0.001,
)

# One timestamped log directory per run so repeated runs do not overwrite each other.
date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
run_dir = f'tensorboard/autoencoder/fmnist_noise:dropout:{0.5}/loss:bce_optim:sgd_lr:{0.01}_wd:{0.001}_epochs:{EPOCHS}/{date}'
writer = TensorboardPyTorch(run_dir, device)

In [None]:
# Train and evaluate the dropout-corrupted autoencoder for EPOCHS epochs.
simple_trainer(
    model=model,
    loaders=loaders,
    criterion=criterion,
    optim=optim,
    writer=writer,
    epoch_start=0,
    epoch_end=EPOCHS,
)

## Gaussian Noise

### Static std

In [None]:
class GaussianNoise:
    """Callable that adds Gaussian noise with a fixed mean and std to a tensor.

    Args:
        mean: constant offset added to every element.
        std: scale applied to standard-normal samples drawn with the same
            shape, dtype and device as the input.
    """

    def __init__(self, mean=0., std=1.):
        self.mean, self.std = mean, std

    def __call__(self, tensor):
        noise = torch.randn_like(tensor)
        return tensor + noise * self.std + self.mean

    def __repr__(self):
        return f'{type(self).__name__}(mean={self.mean}, std={self.std})'

In [None]:
EPOCHS = 100
IMG_DIMS = (1, 28, 28)
# Single source of truth for the noise level: it configures the model AND names
# the run directory, so the TensorBoard label cannot drift from what is trained.
NOISE_STD = 0.5

# Denoising variant: additive Gaussian noise is handed to the encoder as its noise callable.
encoder = Encoder(IMG_DIMS, GaussianNoise(std=NOISE_STD))
decoder = Decoder(IMG_DIMS)
model = AutoEncoder(encoder, decoder).to(device)
criterion = nn.BCELoss().to(device)
optim = torch.optim.SGD(model.parameters(), lr=0.01, weight_decay=0.001)

# One timestamped log directory per run so repeated runs do not overwrite each other.
date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
writer = TensorboardPyTorch(f'tensorboard/autoencoder/fmnist_noise:gaussian:{NOISE_STD}/loss:bce_optim:sgd_lr:{0.01}_wd:{0.001}_epochs:{EPOCHS}/{date}', device)

In [None]:
# Train and evaluate the Gaussian-noise autoencoder for EPOCHS epochs.
simple_trainer(
    model=model,
    loaders=loaders,
    criterion=criterion,
    optim=optim,
    writer=writer,
    epoch_start=0,
    epoch_end=EPOCHS,
)

### Dynamic std

In [None]:
class GaussianNoiseDynamic(object):
    """Callable that adds Gaussian noise whose std is itself drawn at random.

    The std is sampled once, at construction, as
    ``(z * mean_std + mean_std) ** 2`` with ``z`` a standard-normal draw, so it
    is always non-negative. Every later call reuses that same std; it is not
    resampled per call.

    Args:
        mean: constant offset added to every element.
        mean_std: location and scale of the normal draw that is squared to
            obtain the std.
    """

    def __init__(self, mean=0., mean_std=1.):
        # Sample on the CPU: an unconditional .cuda() here would crash on
        # machines without a GPU. The value follows the input's device on call.
        self.std = (torch.randn(1) * mean_std + mean_std) ** 2
        self.mean = mean

    def __call__(self, tensor):
        # Move the 1-element std next to the input once and cache it there, so
        # later calls on the same device do no extra transfer.
        if self.std.device != tensor.device:
            self.std = self.std.to(tensor.device)
        return tensor + torch.randn_like(tensor) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

In [None]:
EPOCHS, IMG_DIMS = 100, (1, 28, 28)

# Denoising variant: the noise std is drawn inside GaussianNoiseDynamic from mean_std.
encoder = Encoder(IMG_DIMS, GaussianNoiseDynamic(mean_std=0.5))
decoder = Decoder(IMG_DIMS)
model = AutoEncoder(encoder, decoder)
model = model.to(device)

criterion = nn.BCELoss().to(device)
optim = torch.optim.SGD(
    model.parameters(),
    lr=0.01,
    weight_decay=0.001,
)

# One timestamped log directory per run so repeated runs do not overwrite each other.
date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
run_dir = f'tensorboard/autoencoder/fmnist_noise:gaussian_dynamic:{0.5}/loss:bce_optim:sgd_lr:{0.01}_wd:{0.001}_epochs:{EPOCHS}/{date}'
writer = TensorboardPyTorch(run_dir, device)

In [None]:
# Train and evaluate the randomly-scaled Gaussian-noise autoencoder for EPOCHS epochs.
simple_trainer(
    model=model,
    loaders=loaders,
    criterion=criterion,
    optim=optim,
    writer=writer,
    epoch_start=0,
    epoch_end=EPOCHS,
)