In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import random
from PIL import Image

import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import transforms, models
from torch.utils.data import DataLoader, Dataset

In [None]:
class ChestXRayDataset(Dataset):
    def __init__(self, image_dirs, transform, class_names):
        '''
        Parameters
            :param image_dirs: <dict object>, key = class_names, data = directory path for each class.
            :param transform: transform to apply to images
            :param class_names: <list object> class names as string
        '''
        self.image_dirs = image_dirs
        self.transform = transform
        self.class_names = class_names

        # images = {'covid': [], 'noraml': [], 'viral': []}
        self.images = {}
        
        for class_name in class_names:
            image_list = [x for x in os.listdir(image_dirs[class_name]) if x.lower().endswith('.png')]
            print(f'Found {len(image_list)}images for class \'{class_name}\'')
            self.images[class_name] = image_list
    
    def __len__(self):
        return sum([len(self.images[class_name]) for class_name in self.class_names])

    def __getitem__(self, index):
        # Randomly select a class
        class_name = random.choice(self.class_names)

        # Make index within range
        index = index % len(self.images[class_name])

        # Get the path of the image and open file
        image_name = self.images[class_name][index]
        image_path = os.path.join(self.image_dirs[class_name], image_name)
        image = Image.open(image_path).convert('RGB')

        return self.transform(image), self.class_names.index(class_name)

In [None]:
# Image transformation
image_transforms = { 
    'train': transforms.Compose([
        transforms.Resize(size=256),
        transforms.RandomRotation(degrees=15),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(size=224),  # image size for resnet50: (224, 224)
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ]),
    'valid_test': transforms.Compose([
        transforms.Resize(size=256),
        transforms.CenterCrop(size=224),   # image size for resnet50: (224, 224)
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])
}

In [None]:
# defining image directories for custom dataset
root_dir = 'drive/MyDrive/COVID-19 Radiography Database'
class_names = ['covid', 'normal', 'viral']

train_image_dirs = {
    'covid': os.path.join(root_dir, 'train', 'covid'),
    'normal': os.path.join(root_dir, 'train', 'normal'),
    'viral': os.path.join(root_dir, 'train', 'viral')
}
valid_image_dirs = {
    'covid': os.path.join(root_dir, 'valid', 'covid'),
    'normal': os.path.join(root_dir, 'valid', 'normal'),
    'viral': os.path.join(root_dir, 'valid', 'viral')
}
test_image_dirs = {
    'covid': os.path.join(root_dir, 'test', 'covid'),
    'normal': os.path.join(root_dir, 'test', 'normal'),
    'viral': os.path.join(root_dir, 'test', 'viral')
}

In [None]:
# Create dataset
train_dataset = ChestXRayDataset(
    image_dirs=train_image_dirs, 
    transform=image_transforms['train'], 
    class_names=class_names
)

valid_dataset = ChestXRayDataset(
    image_dirs=valid_image_dirs, 
    transform=image_transforms['valid_test'], 
    class_names=class_names
)

test_dataset = ChestXRayDataset(
    image_dirs=test_image_dirs, 
    transform=image_transforms['valid_test'], 
    class_names=class_names
)

Found 1160images for class 'covid'
Found 1301images for class 'normal'
Found 1305images for class 'viral'
Found 10images for class 'covid'
Found 10images for class 'normal'
Found 10images for class 'viral'
Found 30images for class 'covid'
Found 30images for class 'normal'
Found 30images for class 'viral'


In [None]:
batch_size = 50

In [None]:
# DataLoader for each Dataset
data = {
    'train_dataloader': DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=True
    ),

    'valid_dataloader': DataLoader(
        dataset=valid_dataset,
        batch_size=batch_size,
        shuffle=True
    ),

    'test_dataloader': DataLoader(
        dataset=test_dataset,
        batch_size=batch_size,
        shuffle=True
    )
}

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
def train_model(model, loss_func, optimizer):
  '''
  Parmaeters
      :param model: Model to train and validate
      :param loss_func: Loss function to minimize
      :param optimizer: Optimizer for computing gradients
      :param epochs: Number of epochs
  Returns
      model: Trained model
      best_epoch: returns the index of the epoch with best accuracy
      history: dict object, Training loss, accuracy and validation loss, accuracy
  '''

  # Get Data Loaders
  train_dataloader = data['train_dataloader']
  valid_dataloader = data['valid_dataloader']

  # Epoch: Train + Validation
  # Train: Forward pass -> Back propagation(get gradient) -> Update parameters -> Loss, Accuracy
  # Validation: Forward pass -> Loss, Accuracy

  # set the model to training mode
  model.train()

  # Loss and accurarcy for this epoch
  train_loss = 0
  train_acc = 0
  valid_loss = 0
  valid_acc = 0

  # Train
  for i, (inputs, labels) in enumerate(train_dataloader):
    # inputs: 4D tensor (batch_size x 3 x width x height)
    inputs = inputs.to(device)
    # labels: 1D tensor (batch_size)
    labels = labels.to(device)

    # Clear existing gradients
    optimizer.zero_grad()

    # Forward pass
    # outputs: 2D tensor (batch_size x number_of_classes)
    outputs = model(inputs)

    # Loss
    loss = loss_func(outputs, labels)

    # Backward pass: calculate gradients for parameters
    loss.backward()

    # Update parameters
    optimizer.step()

    # Calculate loss and accuracy
    batch_size = inputs.size(0)
    train_loss += loss.item() * batch_size
    
    # predictions: 1D tensor (batch_size), class index with the largest probablility of every image in the batch
    ret, predictions = torch.max(outputs.data, 1)

    correct_counts = predictions.eq(labels.data.view_as(predictions))
    acc = torch.mean(correct_counts.type(torch.FloatTensor))
    train_acc += acc.item() * batch_size

  # Validation: No gradient checking needed
  with torch.no_grad():
    # set to evaluation mode
    model.eval()

    for j, (inputs, labels) in enumerate(valid_dataloader):
      # inputs: 4D tensor (bs x 3 x width x height)
      inputs = inputs.to(device)
      # labels: 1D tensor (bs)
      labels = labels.to(device)

      # Forward pass
      # outputs: 2D tensor (batch_size x number_of_classes)
      outputs = model(inputs)

      # Loss
      loss = loss_func(outputs, labels)

      # Calculate loss and accuracy
      batch_size = inputs.size(0)
      valid_loss += loss.item() * batch_size

      ret, predictions = torch.max(outputs.data, dim=1)
      # view(shape of output), view_as(tensor whose shape is to be mimicked)
      correct_counts = predictions.eq(labels.data.view_as(predictions))
      acc = torch.mean(correct_counts.type(torch.FloatTensor))
      valid_acc += acc.item() * batch_size
    
  # Average loss and accuracy of this epoch
  # i+1: number of batches in train set, j+1: number of batches in valid set
  avg_train_loss = train_loss / len(train_dataloader.dataset)
  avg_train_acc = train_acc / len(train_dataloader.dataset)
  avg_valid_loss = valid_loss / len(valid_dataloader.dataset)
  avg_valid_acc = valid_acc / len(valid_dataloader.dataset)

  print("training loss {:.4f}, training accuracy {:.4f}%\n\tvalidation loss {:.4f}, validation accuracy {:.4f}%".format(avg_train_loss, avg_train_acc*100, avg_valid_loss, avg_valid_acc*100))
        
  return model

In [None]:
def test_model(model, loss_func):
  '''
  Function to compute the accuracy on the test set
  Paramters
      :param model: Model to test
      :parar loss_func: 
      :param optimizer: Optimizer for computing gradients
  '''

  # Get DataLoader
  test_data_loader = data['test_dataloader']

  test_acc = 0.0
  test_loss = 0.0

  with torch.no_grad():
    # set to evaluation mode
    model.eval()

    for j, (inputs, labels) in enumerate(test_data_loader):
      # inputs: 4D tensor (bs x 3 x width x height)
      inputs = inputs.to(device)
      # labels: 1D tensor (bs)
      labels = labels.to(device)

      # Forward pass
      # outputs: 2D tensor (batch_size x number_of_classes)
      outputs = model(inputs)

      # Loss
      loss = loss_func(outputs, labels)

      # Calculate loss and accuracy
      batch_size = inputs.size(0)
      test_loss += loss.item() * batch_size

      ret, predictions = torch.max(outputs.data, dim=1)
      correct_counts = predictions.eq(labels.data.view_as(predictions))
      acc = torch.mean(correct_counts.type(torch.FloatTensor))
      test_acc += acc.item() * batch_size

      print("Test Batch number: {:03d}, Loss: {:.4f}, Accuracy: {:.4f}".format(j, loss.item(), acc.item()))
  
  # Average loss and accuracy
  avg_test_loss = test_loss / len(test_data_loader.dataset)
  avg_test_acc = test_acc / len(test_data_loader.dataset)

  print("Test accuracy: {:.4f}%".format(avg_test_acc*100))

In [None]:
# Use the pre-trained net as a feature extractor
def tl_feature_extractor(epochs=3):
  # load pre-trained model
  model = models.resnet50(pretrained=True)

  # freeze the exisiting parameters(exclude them from back propagation)
  for param in model.parameters():
    param.requires_grad = False
  
  # Activation: ReLU
  # Regularization: Dropuout
  # Final layer: Softmax
  num_features = model.fc.in_features 
  model.fc = nn.Sequential(
      nn.Linear(in_features=num_features, out_features=256),
      nn.ReLU(), 
      nn.Dropout(p=0.4),
      nn.Linear(in_features=256, out_features=10),
      nn.LogSoftmax(dim=1) #dim=1 -> calcualte probability along row
  )

  # transfer to GPU
  model = model.to(device)

  loss_func = nn.NLLLoss()
  # only the parameters of the fully connected layer if being updated by the optimizer
  optimizer = optim.Adam(model.fc.parameters())

  # train and validate
  for epoch in range(epochs):
    print('Epoch {}/{}'.format(epoch + 1, epochs))
    train_model(model, loss_func, optimizer)
  
  test_model(model, loss_func)

In [None]:
tl_feature_extractor(10)

Epoch 1/10
training loss 0.4287, training accuracy 83.1121%
	validation loss 0.3290, validation accuracy 83.3333%
Epoch 2/10
training loss 0.2300, training accuracy 92.1668%
	validation loss 0.1661, validation accuracy 93.3333%
Epoch 3/10
training loss 0.1941, training accuracy 93.1227%
	validation loss 0.1909, validation accuracy 93.3333%
Epoch 4/10
training loss 0.2068, training accuracy 93.0430%
	validation loss 0.1398, validation accuracy 96.6667%
Epoch 5/10
training loss 0.2276, training accuracy 91.2639%
	validation loss 0.1604, validation accuracy 96.6667%
Epoch 6/10
training loss 0.1966, training accuracy 92.9368%
	validation loss 0.3269, validation accuracy 83.3333%
Epoch 7/10
training loss 0.2168, training accuracy 92.2995%
	validation loss 0.1659, validation accuracy 90.0000%
Epoch 8/10
training loss 0.1964, training accuracy 93.1227%
	validation loss 0.3566, validation accuracy 83.3333%
Epoch 9/10
training loss 0.1827, training accuracy 93.2289%
	validation loss 0.3372, val