In [1]:
import jupyter_black

import sys
import numpy as np
import matplotlib.pyplot as plt

# Turn on jupyter_black for this notebook session.
jupyter_black.load()


# Register the project source root with the import system (presumably so the
# custom modules imported further down resolve). Counting occurrences first
# keeps sys.path free of duplicates when the cell is executed again.
if sys.path.count("/usr/src") == 0:
    sys.path.append("/usr/src")

In [2]:
# Import torch modules

import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torchvision.utils import make_grid

import lightning.pytorch as pl

In [3]:
# Import custom modules

import helper_plots as hplt
import data.dataset, data.transform

# from model.vae import Encoder, Decoder
from models.helper_train import train

## Settings

In [4]:
# Hyperparameters
RANDOM_SEED = 111
LEARNING_RATE = 0.0005
NUM_EPOCHS = 200  # NOTE(review): not referenced by the visible training cell
BATCH_SIZE = 128
IMAGE_SIZE = 64  # side length, in pixels, of the square images fed to the model
LATENT_SIZE = 4  # number of latent dimensions of the VAE
RECONSTRUCTION_TERM_WEIGHT = 1  # NOTE(review): not referenced in the visible code

# Apply the seed: without this call RANDOM_SEED is never used and the
# train/test split, weight initialisation and batch shuffling differ between runs.
torch.manual_seed(RANDOM_SEED)

# Use the first CUDA device when one is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

## Dataset

In [5]:
# Folder that holds the source images
cells_dir = "/usr/src/data/separated_bacterias"

# Per-sample preprocessing, applied in order: convert to a PIL image, run the
# project's CellPadResize with IMAGE_SIZE (presumably pads/resizes to a square
# of that side - see data.transform), then convert to a tensor.
transform = transforms.Compose(
    [
        transforms.ToPILImage(),
        data.transform.CellPadResize(IMAGE_SIZE),
        transforms.ToTensor(),
    ]
)
dataset = data.dataset.CellsImageDataset(cells_dir, transform=transform)

# 80/20 partition: the training share is whatever remains after truncating
# 20% of the sample count, so the two sizes always add up to len(dataset).
train_size = len(dataset) - int(0.2 * len(dataset))
test_size = len(dataset) - train_size

print("Train sample's size =", train_size)
print("Test sample's size =", test_size)

Train sample's size = 1528
Test sample's size = 382


In [7]:
# Draw the split from an explicitly seeded generator so the same samples end up
# in the train and test subsets on every run (RANDOM_SEED comes from the
# settings cell).
split_generator = torch.Generator().manual_seed(RANDOM_SEED)
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size], generator=split_generator
)

# Training batches are reshuffled every epoch.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# The held-out subset is only inspected/evaluated, so its order is kept fixed
# to make results comparable between runs.
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [13]:
# Sanity check: report the shape of the first batch of each loader.
# A batch is used directly as an image tensor (no label is unpacked).
for heading, loader in (
    ("Training Set:", train_loader),
    ("\nTesting Set:", test_loader),
):
    print(heading)
    for images in loader:
        print("Image batch dimensions:", images.size())
        break  # the first batch is enough

Training Set:
Image batch dimensions: torch.Size([128, 1, 64, 64])

Testing Set:
Image batch dimensions: torch.Size([128, 1, 64, 64])


In [14]:
class Trim(nn.Module):
    """Crop dimensions 2 and 3 of a tensor to at most ``image_size`` entries each."""

    def __init__(self, image_size):
        super().__init__()
        self.size = image_size

    def forward(self, x):
        """Return the leading ``size`` x ``size`` window of ``x``.

        Dimensions 0 and 1 are passed through untouched; inputs that are
        already smaller than ``size`` are returned at their own extent.
        """
        window = slice(None, self.size)
        return x[:, :, window, window]


class Encoder(nn.Module):
    """Convolutional VAE encoder for single-channel square images.

    Args:
        image_size: side length (int, pixels) of the square input images.
        latent_size: number of latent dimensions.

    ``forward`` returns ``(z, z_mean, z_log_var)`` where ``z`` is a sample
    drawn with the reparameterisation trick.
    """

    def __init__(self, image_size, latent_size):
        super().__init__()
        self.image_size = image_size
        self.latent_size = latent_size

        self.convolution_seria = nn.Sequential(
            nn.Conv2d(1, 32, stride=(1, 1), kernel_size=(3, 3), padding=1),
            nn.LeakyReLU(0.01),
            nn.Conv2d(32, 64, stride=(2, 2), kernel_size=(3, 3), padding=1),
            nn.LeakyReLU(0.01),
            nn.Conv2d(64, 64, stride=(2, 2), kernel_size=(3, 3), padding=1),
            nn.LeakyReLU(0.01),
            nn.Conv2d(64, 64, stride=(2, 2), kernel_size=(3, 3), padding=1),
            nn.LeakyReLU(0.01),
            nn.Conv2d(64, 64, stride=(1, 1), kernel_size=(3, 3), padding=1),
            nn.Flatten(),  # (N, 64, s, s) -> (N, 64 * s * s); s = 8 for 64x64 input
        )

        # Size of the flattened feature map, derived from image_size instead of
        # being fixed at 4096: each of the three stride-2 convolutions
        # (kernel 3, padding 1) maps a side n to ceil(n / 2), while the
        # stride-1 layers preserve it.
        feature_side = image_size
        for _ in range(3):
            feature_side = (feature_side + 1) // 2
        flat_features = 64 * feature_side * feature_side

        self.z_mean = torch.nn.Linear(flat_features, self.latent_size)
        self.z_log_var = torch.nn.Linear(flat_features, self.latent_size)

    def forward(self, x):
        """Encode ``x`` and sample a latent vector.

        Returns:
            tuple ``(z, z_mean, z_log_var)``, each of shape (N, latent_size).
        """
        x = self.convolution_seria(x)
        z_mean = self.z_mean(x)
        z_log_var = self.z_log_var(x)
        # Draw the noise on the same device/dtype as z_mean. Choosing the
        # device from torch.cuda.is_available() breaks whenever the module
        # lives on the CPU of a CUDA-capable machine (or on another GPU).
        eps = torch.randn_like(z_mean)
        # Reparameterisation trick: std = exp(log_var / 2)
        z = z_mean + eps * torch.exp(z_log_var / 2.0)
        return z, z_mean, z_log_var


class Decoder(nn.Module):
    """Transposed-convolution VAE decoder producing single-channel square images.

    Args:
        image_size: side length (int, pixels) of the images to generate.
        latent_size: number of latent dimensions expected as input.

    ``forward`` maps latent vectors of shape (N, latent_size) to images of
    shape (N, 1, image_size, image_size) with values in [0, 1].
    """

    def __init__(self, image_size, latent_size):
        super().__init__()
        self.image_size = image_size
        self.latent_size = latent_size

        # Side of the seed feature map: image_size halved (rounding up) three
        # times, i.e. 8 for 64x64 images. Deriving it here, instead of fixing
        # it at 8, lets the decoder follow the image_size argument.
        feature_side = image_size
        for _ in range(3):
            feature_side = (feature_side + 1) // 2

        self.convolution_transpose_seria = nn.Sequential(
            torch.nn.Linear(self.latent_size, 64 * feature_side * feature_side),
            nn.Unflatten(1, (64, feature_side, feature_side)),
            nn.ConvTranspose2d(64, 64, stride=(1, 1), kernel_size=(3, 3), padding=1),
            nn.LeakyReLU(0.01),
            nn.ConvTranspose2d(64, 64, stride=(2, 2), kernel_size=(3, 3), padding=1),
            nn.LeakyReLU(0.01),
            nn.ConvTranspose2d(64, 64, stride=(2, 2), kernel_size=(3, 3), padding=0),
            nn.LeakyReLU(0.01),
            nn.ConvTranspose2d(64, 32, stride=(2, 2), kernel_size=(3, 3), padding=0),
            nn.LeakyReLU(0.01),
            nn.ConvTranspose2d(32, 1, stride=(1, 1), kernel_size=(3, 3), padding=0),
            # The stack yields a side of 8 * feature_side + 1 (65 for 64x64
            # images), which is always larger than image_size, so crop it.
            Trim(self.image_size),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Decode a batch of latent vectors into images."""
        return self.convolution_transpose_seria(x)

In [15]:
class VariationalAutoEncoder(pl.LightningModule):
    """Lightning module that trains an encoder/decoder pair as a VAE.

    Args:
        encoder: module returning ``(z, z_mean, z_log_var)`` for an image
            batch; it must expose a ``latent_size`` attribute.
        decoder: module mapping latent vectors ``z`` back to images.
        learning_rate: step size passed to the Adam optimizer.
    """

    def __init__(self, encoder, decoder, learning_rate=0.001):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.latent_size = encoder.latent_size
        self.lr = learning_rate

    def training_step(self, batch, batch_idx):
        """Compute the VAE loss (reconstruction + KL divergence) for one batch.

        The batch is used directly as the image tensor; no labels are unpacked.
        Returns the scalar loss to optimise.
        """
        # training_step defines the train loop.
        # it is independent of forward
        x = batch

        z, z_mean, z_log_var = self.encoder(x)
        x_hat = self.decoder(z)

        # Reconstruction term: squared error summed over all pixels of an
        # image, then averaged over the batch.
        pixelwise = torch.nn.functional.mse_loss(x_hat, x, reduction="none")
        pixelwise = pixelwise.view(batch.size(0), -1).sum(dim=1)
        pixelwise = pixelwise.mean()

        # KL divergence between N(z_mean, exp(z_log_var)) and N(0, I),
        # summed over the latent dimension.
        kl_div = -0.5 * torch.sum(
            1 + z_log_var - z_mean**2 - torch.exp(z_log_var), dim=1
        )
        kl_div = kl_div.mean()  # average over batch dimension

        loss = pixelwise + kl_div

        # Report the individual terms through Lightning's logger
        self.log("train_combined_loss", loss)
        self.log("pixelwise_loss", pixelwise)
        self.log("kl_div", kl_div)

        return loss

    def configure_optimizers(self):
        """Build the Adam optimizer with the learning rate given at construction."""
        # Use self.lr: a hard-coded 0.001 here silently ignored the
        # learning_rate argument passed to __init__.
        return torch.optim.Adam(self.parameters(), lr=self.lr)

In [57]:
# Build the two halves of the network with the sizes from the settings cell,
# then wrap them in the Lightning module together with the learning rate.
encoder = Encoder(image_size=IMAGE_SIZE, latent_size=LATENT_SIZE)
decoder = Decoder(image_size=IMAGE_SIZE, latent_size=LATENT_SIZE)

vae = VariationalAutoEncoder(
    encoder=encoder,
    decoder=decoder,
    learning_rate=LEARNING_RATE,
)

In [58]:
# Train on the training loader only; no validation loader is supplied.
# NOTE(review): the epoch budget is fixed at 100 here, while the settings cell
# declares NUM_EPOCHS = 200 - confirm which value is intended.
trainer = pl.Trainer(log_every_n_steps=4, max_epochs=100)
trainer.fit(model=vae, train_dataloaders=train_loader)

INFO: GPU available: True (cuda), used: True
INFO:lightning.pytorch.utilities.rank_zero:GPU available: True (cuda), used: True
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: IPU available: False, using: 0 IPUs
INFO:lightning.pytorch.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO: You are using a CUDA device ('NVIDIA GeForce RTX 3090') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
INFO:lightning.pytorch.utilities.rank_zero:You are using a CUDA device ('NVIDIA GeForce RTX 3090') that has Tensor Cores. To prope

Training: 0it [00:00, ?it/s]

INFO: `Trainer.fit` stopped: `max_epochs=100` reached.
INFO:lightning.pytorch.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=100` reached.


In [54]:
def generate_and_plot(model, n_images, device, image_name=""):
    """Decode random latent vectors and show the resulting images in a grid.

    Args:
        model: object exposing ``latent_size`` and a ``decoder`` callable.
        n_images: number of images to generate; must be at least 1.
        device: device on which the latent vectors are placed; it has to match
            the device of ``model.decoder``.
        image_name: optional output path; when non-empty the grid is also
            written to this file.

    Returns:
        The batch of generated images exactly as returned by ``model.decoder``.

    Raises:
        ValueError: if ``n_images`` is smaller than 1.
    """
    # An empty batch would otherwise fail inside make_grid with an opaque
    # division-by-zero error.
    if n_images < 1:
        raise ValueError(f"n_images must be a positive integer, got {n_images}")

    latent_size = model.latent_size
    with torch.no_grad():
        # Latent vectors are sampled from a standard normal distribution
        rand_features = torch.randn(n_images, latent_size).to(device)
        generated_images = model.decoder(rand_features)

    # Roughly square layout: floor(sqrt(n_images)) images per grid row
    grid = make_grid(generated_images, nrow=int(np.sqrt(n_images)))
    grid = grid.detach().to(torch.device("cpu"))
    grid = np.transpose(grid.numpy(), (1, 2, 0))  # (C, H, W) -> (H, W, C)
    # Intensities are assumed to lie in [0, 1] (the Decoder defined in this
    # notebook ends with a Sigmoid), so the display range is 0..1, not 0..255.
    plt.imshow(grid, cmap="gray", vmin=0, vmax=1)
    plt.xticks([])
    plt.yticks([])

    if image_name:
        plt.imsave(image_name, grid, cmap="gray")

    return generated_images

In [None]:
from math import floor


def out_convolution_shape(h_in, w_in, kernel_size, stride=(1, 1), padding=(0, 0)):
    """Return ``(h_out, w_out)`` of a 2-D convolution (dilation not modelled).

    ``kernel_size``, ``stride`` and ``padding`` are indexable pairs ordered
    (height, width).
    """

    def _extent(size, axis):
        # floor((size + 2 * padding - kernel) / stride + 1) along one axis
        padded_span = size + 2 * padding[axis] - kernel_size[axis]
        return floor(padded_span / stride[axis] + 1)

    return _extent(h_in, 0), _extent(w_in, 1)


def out_convtranspose_shape(h_in, w_in, kernel_size, stride=(1, 1), padding=(0, 0)):
    """Return ``(h_out, w_out)`` of a 2-D transposed convolution.

    Dilation and output padding are not modelled. ``kernel_size``, ``stride``
    and ``padding`` are indexable pairs ordered (height, width).
    """

    def _extent(size, axis):
        # (size - 1) * stride - 2 * padding + (kernel - 1) + 1 along one axis
        upsampled = (size - 1) * stride[axis]
        cropped = upsampled - 2 * padding[axis]
        return cropped + (kernel_size[axis] - 1) + 1

    return _extent(h_in, 0), _extent(w_in, 1)