In [24]:
import torch
from models.base import BaseVAE
from torch import nn
from torch.nn import functional as F
from models.types_ import *


class VanillaVAE(BaseVAE):

    def __init__(self,
                 in_channels: int,
                 in_size: int, 
                 latent_dim: int,
                 hidden_dims: List = None,
                 **kwargs) -> None:
        super(VanillaVAE, self).__init__()

        self.latent_dim = latent_dim
        self.out_channels = in_channels
        self.nb_last_channels = hidden_dims[-1]

        if hidden_dims is None:
            hidden_dims = [32, 64, 128, 256, 512]
            div = 2**5  # Model reduces size by this factor           
        else:
            div = 2**len(hidden_dims)   # Model reduces size by this factor  
          
        # Make sure input dimension is usable     
        if in_size%div==0:
            self.smallest_size = int(in_size/div)
            
        else: raise ValueError('Input size not compatible with number of hidden layers.')
            
        self.leaky_relu = nn.LeakyReLU()
        self.tanh = nn.Tanh()
        
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=2, padding=1) 
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=2, padding=1) 
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2, padding=1) 
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, padding=1) 
       
        self.norm32 = nn.BatchNorm2d(32)
        self.norm64 = nn.BatchNorm2d(64)
        self.norm128 = nn.BatchNorm2d(128)
        self.norm256 = nn.BatchNorm2d(256)
        
        self.upconv1 = nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=3, stride=2, padding=1,  output_padding=1)
        self.upconv2 = nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=3, stride=2, padding=1,  output_padding=1)
        self.upconv3 = nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=3, stride=2, padding=1,  output_padding=1)
        self.upconv4 = nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=3, stride=2, padding=1,  output_padding=1)
        self.upconv5 = nn.Conv2d(in_channels=32, out_channels=3, kernel_size= 3, stride=1, padding= 1)
        
        
        self.fc_mu = nn.Linear(hidden_dims[-1]* self.smallest_size**2, latent_dim)
        self.fc_var = nn.Linear(hidden_dims[-1]* self.smallest_size**2, latent_dim)


        self.decoder_input = nn.Linear(latent_dim, hidden_dims[-1]* self.smallest_size**2)
        
   
    def encode(self, input: Tensor) -> List[Tensor]:
        """
        Encodes the input by passing through the encoder network
        and returns the latent codes.
        :param input: (Tensor) Input tensor to encoder [N x C x H x W]
        :return: (Tensor) List of latent codes
        """
        
        out = self.conv1(input)
        out = self.norm32(out)
        out = self.leaky_relu(out)
        
        out = self.conv2(out)
        out = self.norm64(out)
        out = self.leaky_relu(out)
        
        out = self.conv3(out)
        out = self.norm128(out)
        out = self.leaky_relu(out)
        
        out = self.conv4(out)
        out = self.norm256(out)
        out = self.leaky_relu(out)
                        
        out = torch.flatten(out, start_dim=1)


        # Split the result into mu and var components
        # of the latent Gaussian distribution
        mu = self.fc_mu(out)
        log_var = self.fc_var(out)

        return [mu, log_var]

    
    
    def decode(self, z: Tensor) -> Tensor:
        """
        Maps the given latent codes
        onto the image space.
        :param z: (Tensor) [B x D]
        :return: (Tensor) [B x C x H x W]
        """
        out = self.decoder_input(z)
        out = out.view(-1, 256, self.smallest_size, self.smallest_size)
                            
        out = self.upconv1(out)
        out = self.norm128(out)
        out = self.leaky_relu(out)
        
        out = self.upconv2(out)
        out = self.norm64(out)
        out = self.leaky_relu(out)
    
        out = self.upconv3(out)
        out = self.norm32(out)
        out = self.leaky_relu(out)
        
        out = self.upconv4(out)
        out = self.norm32(out)
        out = self.leaky_relu(out)
              
        out = self.upconv5(out)     
                                 
        out = self.tanh(out)                         
                                 
        return out

    def reparameterize(self, mu: Tensor, logvar: Tensor) -> Tensor:
        """
        Reparameterization trick to sample from N(mu, var) from
        N(0,1).
        :param mu: (Tensor) Mean of the latent Gaussian [B x D]
        :param logvar: (Tensor) Standard deviation of the latent Gaussian [B x D]
        :return: (Tensor) [B x D]
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def forward(self, input: Tensor, **kwargs) -> List[Tensor]:
        mu, log_var = self.encode(input)
        z = self.reparameterize(mu, log_var)
        return  [self.decode(z), input, mu, log_var]

    def loss_function(self,
                      *args,
                      **kwargs) -> dict:
        """
        Computes the VAE loss function.
        KL(N(\mu, \sigma), N(0, 1)) = \log \frac{1}{\sigma} + \frac{\sigma^2 + \mu^2}{2} - \frac{1}{2}
        :param args:
        :param kwargs:
        :return:
        """
        recons = args[0]
        input = args[1]
        mu = args[2]
        log_var = args[3]

        kld_weight = kwargs['M_N'] # Account for the minibatch samples from the dataset
        recons_loss =F.mse_loss(recons, input)


        kld_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim = 1), dim = 0)

        loss = recons_loss + kld_weight * kld_loss
        return {'loss': loss, 'Reconstruction_Loss':recons_loss, 'KLD':-kld_loss}

    def sample(self,
               num_samples:int,
               current_device: int, **kwargs) -> Tensor:
        """
        Samples from the latent space and return the corresponding
        image space map.
        :param num_samples: (Int) Number of samples
        :param current_device: (Int) Device to run the model
        :return: (Tensor)
        """
        z = torch.randn(num_samples,
                        self.latent_dim)

        z = z.to(current_device)

        samples = self.decode(z)
        return samples

    def generate(self, x: Tensor, **kwargs) -> Tensor:
        """
        Given an input image x, returns the reconstructed image
        :param x: (Tensor) [B x C x H x W]
        :return: (Tensor) [B x C x H x W]
        """

        return self.forward(x)[0]

In [25]:
from torchvision.utils import save_image

# Achtung: Momentan wird im Trainer nur Reconstruction Loss verwendet
class Trainer(object):
    def __init__(self, model,
                 optimizer, loss_function,
                 loader_train, loader_val,
                 dtype, device, **in_params):
        """
        :param model: PyTorch model of the neural network

        :param optimizer: PyTorch optimizer

        :param print_every: How often should we print the loss during training
        """
        # Create attributes:
        self.device = device
        self.model = model.to(device=self.device)  # move the model parameters to CPU/GPU
        self.optimizer = optimizer
        self.loss_function = loss_function
        self.loader_train = loader_train
        self.loader_val = loader_val
        self.print_every = in_params["print_every"]
        self.dtype = dtype
        self.batch_size = in_params["batch_size"]
        self.input_size = in_params["input_size"]
        self.path = in_params["path"]


    def train_model(self, epoch):
        """
        - epoch: An integer giving the epoch
        """
        train_loss = 0
        self.model.train()  # put model to training mode
        for t, (input,_) in enumerate(self.loader_train):
            input = input.to(device=self.device, dtype=self.dtype)  # move to device, e.g. GPU
            
            # do a step in training
            args = self.model(input)
            loss = self.loss_function(*args,**{'M_N':self.batch_size/len(self.loader_train.dataset)})['Reconstruction_Loss']
            self.optimizer.zero_grad()
            loss.backward() 
            train_loss += loss.item() # accumulate for average loss
            self.optimizer.step()

            # print loss
            if t % self.print_every == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, t * len(args[1]), len(self.loader_train.dataset),
                    100. * t / len(self.loader_train),
                    loss.item() / len(args[1])))
        # print average loss
        print('====> Epoch: {} Average loss: {:.6f}'.format(
              epoch, train_loss / len(self.loader_train.dataset)))

    def test_model(self, epoch):
        self.model.eval() # Put model to evaluation mode
        test_loss = 0.

        with torch.no_grad():
            # During validation, we accumulate these values across the whole dataset and then average at the end:
            for i, (input,_) in enumerate(self.loader_val):
                input = input.to(device=self.device, dtype=self.dtype)  # move to device, e.g. GPU
       
                # compute loss and accumulate
                args = self.model(input)
                test_loss += self.loss_function(*args,**{'M_N':self.batch_size/len(self.loader_val.dataset)})['Reconstruction_Loss'].item()
                if i == 0:
                    n = min(args[1].size(0), 8)
                    comparison = torch.cat([args[1][:n],
                                          args[0].view(self.batch_size, self.model.out_channels, self.input_size, self.input_size)[:n]])
                    save_image(comparison.cpu(),
                             self.path + '/reconstruction_' + str(epoch) + '.png', nrow=n)

        # print average loss
        test_loss /= len(self.loader_val.dataset)
        print('====> Test set loss: {:.6f}'.format(test_loss))
        
    def train_and_test(self, epochs):
        for e in range(1,epochs+1):
            self.train_model(e)
            self.test_model(e)
            with torch.no_grad():
                sample = self.model.sample(64,device)
                save_image(sample.view(64, self.model.out_channels, self.input_size, self.input_size),
                           self.path + '/sample_' + str(e) + '.png')
        

In [26]:
from torchvision import datasets, transforms

in_params = {"batch_size": 32,
        "epochs": 150,
        "no_cuda": False,
        "seed": 1,
        "print_every": 10,
        "input_size": 64,
        "path": 'results_reformulated_original'
        }
in_params["cuda"] = not in_params["no_cuda"] and torch.cuda.is_available()
torch.manual_seed(in_params["seed"])

device = torch.device("cuda" if in_params["cuda"] else "cpu")

kwargs = {'num_workers': 1, 'pin_memory': True} if in_params["cuda"] else {}


model = VanillaVAE(in_channels=3,
                   in_size=in_params["input_size"],
                   latent_dim=250,
                   hidden_dims=[32, 64, 128, 256])

# transformations of input images before feeding into nn
transformations = transforms.Compose([
    transforms.Resize(128),
    transforms.RandomCrop(in_params["input_size"]),
    transforms.ToTensor()])

#load data into DataLoader
train_dataset = datasets.ImageFolder('data_to_try/full_data/training', 
                                 transform=transformations
                                   )
test_dataset = datasets.ImageFolder('data_to_try/full_data/test', 
                                 transform=transformations
                                   )



train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=in_params["batch_size"], 
                                           shuffle=True,
                                           drop_last=True,
                                           **kwargs
                                           )
test_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=in_params["batch_size"], 
                                          shuffle=True,
                                          drop_last=True,
                                          **kwargs
                                          )

# Build the optimizer:
params = model.parameters()
learning_rate = 1e-4
optimizer = torch.optim.AdamW(params, lr=learning_rate)


# Build the trainer with the Soresen-Dice loss you implemented:
trainer = Trainer(model, optimizer, model.loss_function,
        train_loader, test_loader, torch.float32, device,**in_params )

# Start training:
trainer.train_and_test(in_params["epochs"])

====> Epoch: 1 Average loss: 0.004348
====> Test set loss: 0.002984
====> Epoch: 2 Average loss: 0.000964
====> Test set loss: 0.002113
====> Epoch: 3 Average loss: 0.000736
====> Test set loss: 0.001949
====> Epoch: 4 Average loss: 0.000603
====> Test set loss: 0.002149
====> Epoch: 5 Average loss: 0.000519
====> Test set loss: 0.002452
====> Epoch: 6 Average loss: 0.000468
====> Test set loss: 0.002102
====> Epoch: 7 Average loss: 0.000430
====> Test set loss: 0.001919
====> Epoch: 8 Average loss: 0.000407
====> Test set loss: 0.002170
====> Epoch: 9 Average loss: 0.000385
====> Test set loss: 0.001800
====> Epoch: 10 Average loss: 0.000361
====> Test set loss: 0.001774
====> Epoch: 11 Average loss: 0.000340
====> Test set loss: 0.001648
====> Epoch: 12 Average loss: 0.000318
====> Test set loss: 0.001665
====> Epoch: 13 Average loss: 0.000310
====> Test set loss: 0.001572
====> Epoch: 14 Average loss: 0.000300
====> Test set loss: 0.001662
====> Epoch: 15 Average loss: 0.000280
====

KeyboardInterrupt: 

In [27]:
from torchvision import models
print(model)

VanillaVAE(
  (leaky_relu): LeakyReLU(negative_slope=0.01)
  (tanh): Tanh()
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv4): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (norm32): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (norm64): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (norm128): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (norm256): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (upconv1): ConvTranspose2d(256, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
  (upconv2): ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
  (upconv3): Conv

In [88]:
# For MNIST dataset

in_params = {"batch_size": 128,
        "epochs": 15,
        "no_cuda": False,
        "seed": 1,
        "print_every": 100,
        "input_size": 28,
        "path": 'results_mnist'
        }
in_params["cuda"] = not in_params["no_cuda"] and torch.cuda.is_available()
torch.manual_seed(in_params["seed"])

device = torch.device("cuda" if in_params["cuda"] else "cpu")

kwargs = {'num_workers': 1, 'pin_memory': True} if in_params["cuda"] else {}


model = VanillaVAE(in_channels=1,
                   in_size=in_params["input_size"],
                   latent_dim=100,
                   hidden_dims=[64, 128])


train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.ToTensor()),
    batch_size=in_params["batch_size"], shuffle=True,drop_last=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, transform=transforms.ToTensor()),
    batch_size=in_params["batch_size"], shuffle=True,drop_last=True, **kwargs)

# Build the optimizer:
params = model.parameters()
learning_rate = 1e-4
optimizer = torch.optim.AdamW(params, lr=learning_rate)


# Build the trainer with the Soresen-Dice loss you implemented:
trainer = Trainer(model, optimizer, model.loss_function,
        train_loader, test_loader, torch.float32, device,**in_params )

# Start training:
trainer.train_and_test(in_params["epochs"])

ValueError: Input size not compatible with number of hidden layers.

In [None]:
import matplotlib.pyplot as plt
with torch.no_grad():
    sample = torch.squeeze(model.sample(1,'cpu'))
plt.imshow(sample,cmap='gray')

In [None]:

mat = torch.arange(10,1,-1)
print(mat)