# Organizing Data #

In [2]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


Don't need to re-run

In [2]:
from sklearn.model_selection import train_test_split
import os
from shutil import copyfile

# Set paths to image folders
class1_dir = '/content/drive/MyDrive/COVID-19_Radiography_Dataset/Viral_Pneumonia/images'
class2_dir = '/content/drive/MyDrive/COVID-19_Radiography_Dataset/Normal/images'
class3_dir = '/content/drive/MyDrive/COVID-19_Radiography_Dataset/Lung_Opacity/images'
class4_dir = '/content/drive/MyDrive/COVID-19_Radiography_Dataset/COVID/images'

# Set paths to output directories
train_dir = '/content/drive/MyDrive/COVID-19_Radiography_Dataset/train'
val_dir = '/content/drive/MyDrive/COVID-19_Radiography_Dataset/val'
test_dir = '/content/drive/MyDrive/COVID-19_Radiography_Dataset/test'

# Create output directories
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# Split images into train, validation, and test sets
for class_dir, class_name in zip([class1_dir, class2_dir, class3_dir, class4_dir], ['class1', 'class2', 'class3', 'class4']):
    image_files = os.listdir(class_dir)
    train_files, test_files = train_test_split(image_files, test_size=0.1, random_state=42)
    train_files, val_files = train_test_split(train_files, test_size=0.25, random_state=42)

    # Copy train images to train folder
    for file_name in train_files:
        src_path = os.path.join(class_dir, file_name)
        dst_path = os.path.join(train_dir, class_name, file_name)
        os.makedirs(os.path.dirname(dst_path), exist_ok=True)
        copyfile(src_path, dst_path)

    # Copy validation images to validation folder
    for file_name in val_files:
        src_path = os.path.join(class_dir, file_name)
        dst_path = os.path.join(val_dir, class_name, file_name)
        os.makedirs(os.path.dirname(dst_path), exist_ok=True)
        copyfile(src_path, dst_path)

    # Copy test images to test folder
    for file_name in test_files:
        src_path = os.path.join(class_dir, file_name)
        dst_path = os.path.join(test_dir, class_name, file_name)
        os.makedirs(os.path.dirname(dst_path), exist_ok=True)
        copyfile(src_path, dst_path)


# Advanced Models: EVAE-Net

In [3]:
# imports

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from torchvision import transforms, utils
import torchvision.models as models
from torch.utils.data.sampler import SubsetRandomSampler

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

import warnings
warnings.filterwarnings("ignore")

In [53]:
class EVAE(nn.Module):
    def __init__(self, latent_dim, num_classes):
        super(EVAE, self).__init__()
        self.latent_dim = latent_dim
        self.num_classes = num_classes #4
        
        # Define ResNet50 Encoder
        resnet = models.resnet18(pretrained=True)
        resnet_layers = list(resnet.children())[:-1]  # Remove last layer (classification head)
        self.resnet_encoder = nn.Sequential(*resnet_layers)
        
        # Define VGG16 Encoder
        vgg16 = models.vgg16(pretrained=True)
        vgg16_layers = list(vgg16.features.children())[:-1]  # Remove last layer (max pooling)
        self.vgg16_encoder = nn.Sequential(*vgg16_layers)

        # Define reparameterization layers
        self.fc0 = nn.Linear(100864, latent_dim)
        self.fc1 = nn.Linear(latent_dim, 512)
        self.fc2 = nn.Linear(512, latent_dim*2)

        # Define classification head
        self.classification_head = nn.Linear(latent_dim, num_classes)
        
        # Define decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(latent_dim, 256, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid()
        )
    
    def decode(self, z):
        x_hat = self.decoder(z.unsqueeze(-1).unsqueeze(-1))

        return x_hat

    def encode(self, x):
        # encode
        resnet_features = self.resnet_encoder(x)
        vgg16_features = self.vgg16_encoder(x)

        # flatten the features and concatenate them
        features = torch.cat([resnet_features.view(x.size(0), -1), 
                              vgg16_features.view(x.size(0), -1)], dim=1)
        
        # apply reparameterization
        x = F.relu(self.fc0(features))
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        mu, log_var = torch.chunk(x, 2, dim=-1)
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        z = mu + eps * std

        return z, mu, log_var

        # def forward(self, x):
    #     mu, log_var = self.encode(x)
    #     std = torch.exp(0.5 * log_var)
    #     eps = torch.randn_like(std)
    #     z = eps * std + mu
    #     x_hat = self.decode(z)
    #     y = self.classification_head(z)
    #     return x_hat, y, mu, log_var

    def forward(self, x):
          print('x:', x.shape)
          z, mu, log_var = self.encode(x)
          x_hat = self.decode(z)
          y = self.classification_head(z)
          print('x_hat:', x_hat.shape)
          print('z:', z.shape)
          print('y:', y.shape)
          return x_hat, y, mu, log_var
    
    def loss_function(self, x_hat, x, y, target, mu, log_var):
        num_pixels = x.shape[1] * x.shape[2] * x.shape[3]
        # Upsample to get x and x_hat pixels matching
        x_hat_upsampled = F.interpolate(x_hat, size=x.shape[2:], mode='bilinear', align_corners=False)
      
        # Compute reconstruction loss
        recons_loss = F.mse_loss(x_hat_upsampled, x, reduction='sum')
        kld_loss = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
        # Lcls = F.cross_entropy(torch.argmax(y, dim=1), torch.argmax(target, dim=1))
        Lcls = F.cross_entropy(y, target.argmax(dim=1))
        
        return recons_loss, kld_loss, Lcls

### Training

In [5]:
from torchvision import datasets
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision import transforms # need to adapt image format


# Access train data
train_dir = '/content/drive/MyDrive/COVID-19_Radiography_Dataset/train'
transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225])])
train_data = datasets.ImageFolder(train_dir, transform=transform)

In [6]:
# Dataloader
train_loader = torch.utils.data.DataLoader(train_data, batch_size=4, shuffle=True)

In [51]:
# Training loop

def train(model, optimizer, train_loader, device):
    model.train()
    train_loss = 0
    
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        
        recon_batch, y, mu, log_var = model(data)
        target_onehot = F.one_hot(target, num_classes=4).float() 
        BCE, KLD, Lcls = model.loss_function(recon_batch, data, y, target_onehot, mu, log_var)
        loss = BCE + KLD + Lcls
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
        
        if batch_idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader),
                loss.item() / len(data)))

    print('====> Epoch: {} Average loss: {:.4f}'.format(
          epoch, train_loss / len(train_loader.dataset)))


In [54]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = EVAE(latent_dim=256, num_classes=4).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 2

for epoch in range(epochs):
    train(model, optimizer, train_loader, device)

x: torch.Size([4, 3, 224, 224])
x_hat: torch.Size([4, 3, 16, 16])
z: torch.Size([4, 256])
y: torch.Size([4, 4])
tensor([[0., 1., 0., 0.],
        [0., 1., 0., 0.],
        [0., 0., 0., 1.],
        [0., 0., 0., 1.]])
tensor([[-0.0995, -0.7553, -0.0288,  0.6865],
        [ 0.5592,  0.4866, -0.5601,  0.5061],
        [-0.0401, -0.0015,  0.3525, -0.1635],
        [-0.1565, -0.5974, -0.0158, -0.2127]], grad_fn=<AddmmBackward0>)
target torch.Size([4, 4])
x: torch.Size([4, 3, 224, 224])
x_hat: torch.Size([4, 3, 16, 16])
z: torch.Size([4, 256])
y: torch.Size([4, 4])
tensor([[0., 0., 1., 0.],
        [0., 1., 0., 0.],
        [1., 0., 0., 0.],
        [0., 1., 0., 0.]])
tensor([[ 0.2599,  1.7462, -0.2203, -0.4450],
        [ 0.7800, -0.5635,  0.3680,  0.2479],
        [ 0.4095,  1.1414, -1.1106, -0.9730],
        [-0.2493, -0.0679, -0.3846,  1.1143]], grad_fn=<AddmmBackward0>)
target torch.Size([4, 4])
x: torch.Size([4, 3, 224, 224])
x_hat: torch.Size([4, 3, 16, 16])
z: torch.Size([4, 256])
y:

KeyboardInterrupt: ignored

# ----BACK UP------

### Vanilla VAE from Git Repo

In [11]:
class VanillaVAE():


    def __init__(self,
                 in_channels: int,
                 latent_dim: int,
                 hidden_dims: List = None,
                 **kwargs) -> None:
        super(VanillaVAE, self).__init__()

        self.latent_dim = latent_dim

        modules = []
        if hidden_dims is None:
            hidden_dims = [32, 64, 128, 256, 512]

        # Build Encoder
        for h_dim in hidden_dims:
            modules.append(
                nn.Sequential(
                    nn.Conv2d(in_channels, out_channels=h_dim,
                              kernel_size= 3, stride= 2, padding  = 1),
                    nn.BatchNorm2d(h_dim),
                    nn.LeakyReLU())
            )
            in_channels = h_dim

        self.encoder = nn.Sequential(*modules)
        self.fc_mu = nn.Linear(hidden_dims[-1]*4, latent_dim)
        self.fc_var = nn.Linear(hidden_dims[-1]*4, latent_dim)


        # Build Decoder
        modules = []

        self.decoder_input = nn.Linear(latent_dim, hidden_dims[-1] * 4)

        hidden_dims.reverse()

        for i in range(len(hidden_dims) - 1):
            modules.append(
                nn.Sequential(
                    nn.ConvTranspose2d(hidden_dims[i],
                                       hidden_dims[i + 1],
                                       kernel_size=3,
                                       stride = 2,
                                       padding=1,
                                       output_padding=1),
                    nn.BatchNorm2d(hidden_dims[i + 1]),
                    nn.LeakyReLU())
            )



        self.decoder = nn.Sequential(*modules)

        self.final_layer = nn.Sequential(
                            nn.ConvTranspose2d(hidden_dims[-1],
                                               hidden_dims[-1],
                                               kernel_size=3,
                                               stride=2,
                                               padding=1,
                                               output_padding=1),
                            nn.BatchNorm2d(hidden_dims[-1]),
                            nn.LeakyReLU(),
                            nn.Conv2d(hidden_dims[-1], out_channels= 3,
                                      kernel_size= 3, padding= 1),
                            nn.Tanh())

    def encode(self, input: Tensor) -> List[Tensor]:
        """
        Encodes the input by passing through the encoder network
        and returns the latent codes.
        :param input: (Tensor) Input tensor to encoder [N x C x H x W]
        :return: (Tensor) List of latent codes
        """
        result = self.encoder(input)
        result = torch.flatten(result, start_dim=1)

        # Split the result into mu and var components
        # of the latent Gaussian distribution
        mu = self.fc_mu(result)
        log_var = self.fc_var(result)

        return [mu, log_var]

    def decode(self, z: Tensor) -> Tensor:
        """
        Maps the given latent codes
        onto the image space.
        :param z: (Tensor) [B x D]
        :return: (Tensor) [B x C x H x W]
        """
        result = self.decoder_input(z)
        result = result.view(-1, 512, 2, 2)
        result = self.decoder(result)
        result = self.final_layer(result)
        return result

    def reparameterize(self, mu: Tensor, logvar: Tensor) -> Tensor:
        """
        Reparameterization trick to sample from N(mu, var) from
        N(0,1).
        :param mu: (Tensor) Mean of the latent Gaussian [B x D]
        :param logvar: (Tensor) Standard deviation of the latent Gaussian [B x D]
        :return: (Tensor) [B x D]
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def forward(self, input: Tensor, **kwargs) -> List[Tensor]:
        mu, log_var = self.encode(input)
        z = self.reparameterize(mu, log_var)
        return  [self.decode(z), input, mu, log_var]

    def loss_function(self,
                      *args,
                      **kwargs) -> dict:
        """
        Computes the VAE loss function.
        KL(N(\mu, \sigma), N(0, 1)) = \log \frac{1}{\sigma} + \frac{\sigma^2 + \mu^2}{2} - \frac{1}{2}
        :param args:
        :param kwargs:
        :return:
        """
        recons = args[0]
        input = args[1]
        mu = args[2]
        log_var = args[3]

        kld_weight = kwargs['M_N'] # Account for the minibatch samples from the dataset
        recons_loss =F.mse_loss(recons, input)


        kld_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim = 1), dim = 0)

        loss = recons_loss + kld_weight * kld_loss
        return {'loss': loss, 'Reconstruction_Loss':recons_loss.detach(), 'KLD':-kld_loss.detach()}

    def sample(self,
               num_samples:int,
               current_device: int, **kwargs) -> Tensor:
        """
        Samples from the latent space and return the corresponding
        image space map.
        :param num_samples: (Int) Number of samples
        :param current_device: (Int) Device to run the model
        :return: (Tensor)
        """
        z = torch.randn(num_samples,
                        self.latent_dim)

        z = z.to(current_device)

        samples = self.decode(z)
        return samples

    def generate(self, x: Tensor, **kwargs) -> Tensor:
        """
        Given an input image x, returns the reconstructed image
        :param x: (Tensor) [B x C x H x W]
        :return: (Tensor) [B x C x H x W]
        """

        return self.forward(x)[0]

NameError: ignored

# VAE Classifier

In [6]:
class VAEClassifier(BaseVAE):
    def __init__(self,
                 in_channels: int,
                 num_classes: int,
                 hidden_dims: List = None,
                 **kwargs) -> None:
        super(VAEClassifier, self).__init__()

        self.num_classes = num_classes

        if hidden_dims is None:
            hidden_dims = [32, 64, 128, 256, 512]

        # Build Encoder
        modules = []
        for h_dim in hidden_dims:
            modules.append(
                nn.Sequential(
                    nn.Conv2d(in_channels, out_channels=h_dim,
                              kernel_size=3, stride=2, padding=1),
                    nn.BatchNorm2d(h_dim),
                    nn.LeakyReLU())
            )
            in_channels = h_dim

        self.encoder = nn.Sequential(*modules)
        self.fc_logits = nn.Linear(hidden_dims[-1]*4, num_classes)


        # Build Decoder
        modules = []

        self.decoder_input = nn.Linear(num_classes, hidden_dims[-1] * 4)

        hidden_dims.reverse()

        for i in range(len(hidden_dims) - 1):
            modules.append(
                nn.Sequential(
                    nn.ConvTranspose2d(hidden_dims[i],
                                       hidden_dims[i + 1],
                                       kernel_size=3,
                                       stride=2,
                                       padding=1,
                                       output_padding=1),
                    nn.BatchNorm2d(hidden_dims[i + 1]),
                    nn.LeakyReLU())
            )

        self.decoder = nn.Sequential(*modules)

        self.final_layer = nn.Sequential(
            nn.ConvTranspose2d(hidden_dims[-1],
                               hidden_dims[-1],
                               kernel_size=3,
                               stride=2,
                               padding=1,
                               output_padding=1),
            nn.BatchNorm2d(hidden_dims[-1]),
            nn.LeakyReLU(),
            nn.Conv2d(hidden_dims[-1], out_channels=3,
                      kernel_size=3, padding=1),
            nn.Tanh())

    def encode(self, input: Tensor) -> List[Tensor]:
        result = self.encoder(input)
        result = torch.flatten(result, start_dim=1)
        logits = self.fc_logits(result)
        return [logits, F.softmax(logits, dim=1)]

    def decode(self, z: Tensor) -> Tensor:
        result = self.decoder_input(z)
        result = result.view(-1, 512, 2, 2)
        result = self.decoder(result)
        result = self.final_layer(result)
        return result

    def reparameterize(self, logits: Tensor) -> Tensor:
        std = torch.ones_like(logits)
        eps = torch.randn_like(logits)
        return eps * std + logits

    def forward(self, input: Tensor, **kwargs) -> List[Tensor]:
        logits, probs = self.encode(input)
        z = self.reparameterize(logits)
        return [self.decode(z), input, probs]


In [5]:
from abc import abstractmethod
import torch
from torch import nn
from torch.nn import functional as F

from typing import List, Tuple, Dict, Any
from torch import Tensor

class BaseVAE(nn.Module):
    
    def __init__(self) -> None:
        super(BaseVAE, self).__init__()

    def encode(self, input: Tensor) -> List[Tensor]:
        raise NotImplementedError

    def decode(self, input: Tensor) -> Any:
        raise NotImplementedError

    def sample(self, batch_size:int, current_device: int, **kwargs) -> Tensor:
        raise NotImplementedError

    def generate(self, x: Tensor, **kwargs) -> Tensor:
        raise NotImplementedError

    @abstractmethod
    def forward(self, *inputs: Tensor) -> Tensor:
        pass

    @abstractmethod
    def loss_function(self, *inputs: Any, **kwargs) -> Tensor:
        pass