In [1]:
import copy
import glob
import os
from typing import Dict, List, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.utils.data as data
import torchvision
import torchvision.transforms as transforms
from PIL import Image
from torch.quantization import DeQuantStub, QuantStub
from torchvision.models import alexnet

In [2]:
from google.colab import drive
drive.mount('/content/drive')

%cd /content/drive/My\ Drive/

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/My Drive


In [3]:
import random
class ImageLoader(data.Dataset):
  '''
  Class for data loading
  '''

  train_folder = 'COVID' # class attributes: train and test folder names
  test_folder = 'COVID'

  def __init__(self,
               root_dir: str,
               split: str = 'train',
               transform: torchvision.transforms.Compose = None):
    '''
    Init function for the class.

    Args:
    - root_dir: the dir path which contains the train and test folder
    - split: 'test' or 'train' split
    - transforms: the transforms to be applied to the data
    '''
    #self.root = os.path.expanduser(root_dir)
    self.transform = transform
    self.split = split

    if split == 'train':
      self.curr_folder = os.path.join(self.train_folder)
    elif split == 'test':
      self.curr_folder = os.path.join(self.test_folder)

    self.class_dict = self.get_classes()
    self.dataset = self.load_imagepaths_with_labels(self.class_dict)
    random.shuffle(self.dataset)
    # change here
    self.train_dataset = self.dataset[:2000]
    self.test_dataset = self.dataset[2000:2200]
    if split == 'train':
      self.dataset = self.train_dataset
    elif split == 'test':
      self.dataset = self.test_dataset

    print(f"Train {len(self.train_dataset)}")
    print(f"Test {len(self.test_dataset)}")
    print(f"Full {len(self.dataset)}")
    
  def load_imagepaths_with_labels(self,
                                  class_labels: Dict[str, int]
                                  ) -> List[Tuple[str, int]]:
    '''
    Fetches all image paths along with labels

    Args:
    -   class_labels: the class labels dictionary, with keys being the classes
                      in this dataset and the values being the class index.
    Returns:
    -   list[(filepath, int)]: a list of filepaths and their class indices
    '''
    img_paths = []  # a list of (filename, class index)
    for class_name, class_idx in class_labels.items():
      img_dir = os.path.join(self.curr_folder, class_name, 'images/*.png')
      files = glob.glob(img_dir)
      img_paths += [(f, class_idx) for f in files]
    return img_paths

  def get_classes(self) -> Dict[str, int]:
    '''
    Get the classes (which are folder names in self.curr_folder) along with
    their associated integer index.

    Returns:
    -   Dict of class names (string) to integer labels
    '''

    classes = dict()
    classes_list = [d.name for d in os.scandir(self.curr_folder) if d.is_dir()]
    classes = {classes_list[i]: i for i in range(len(classes_list))}
    return classes

  def load_img_from_path(self, path: str) -> Image:
    ''' 
    Loads the image as grayscale (using Pillow)

    Args:
    -   path: the path of the image
    Returns:
    -   image: grayscale image loaded using pillow (Use 'L' flag while converting using Pillow's function)
    '''

    img = None
    with open(path, 'rb') as f:
      img = Image.open(f)
      img = img.convert('L')
    return img

  def __getitem__(self, index: int) -> Tuple[torch.tensor, int]:
    '''
    Fetches the item (image, label) at a given index

    Args:
        index (int): Index
    Returns:
        tuple: (image, target) where target is index of the target class.
    '''
    img = None
    class_idx = None
    filename, class_idx = self.dataset[index]
    # load the image and apply the transforms
    img = self.load_img_from_path(filename)
    if self.transform is not None:
      img = self.transform(img)
    return img, class_idx

  def __len__(self) -> int:
    """
    Returns the number of items in the dataset

    Returns:
        int: length of the dataset
    """
    l = len(self.dataset)
    return l

In [4]:
image_loader = ImageLoader("./", split='train',)

Train 2000
Test 200
Full 2000


In [5]:
print("Length of dataset: ", len(image_loader))

Length of dataset:  2000


In [6]:
print("Classes: ", image_loader.get_classes())

Classes:  {'Lung_Opacity': 0, 'Viral Pneumonia': 1, 'Normal': 2, 'COVID': 3}


In [7]:
class AlexNetwork(nn.Module):
  def __init__(self,aux_logits = False):
      super(AlexNetwork, self).__init__()
      self.cnn = nn.Sequential(
        nn.Conv2d(3, 96, kernel_size=11, stride=4),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.LocalResponseNorm(96),
        
        nn.Conv2d(96, 384, kernel_size=5, stride = 2,padding = 2),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.LocalResponseNorm(384),
        
        nn.Conv2d(384, 384, kernel_size=3, stride=1,padding = 1),
        nn.ReLU(inplace=True),
        nn.BatchNorm2d(384),
        
        nn.Conv2d(384, 384, kernel_size=3, stride=1,padding = 1),
        nn.ReLU(inplace=True),
        nn.BatchNorm2d(384),
        
        nn.Conv2d(384, 256, kernel_size=3, stride=1,padding = 1),
        nn.ReLU(inplace=True),
        nn.BatchNorm2d(256),
        nn.MaxPool2d(kernel_size=3, stride=2,padding = 1),
      )
      self.fc6 = nn.Sequential(
        nn.Linear(256,4096),
        nn.ReLU(inplace=True),
        nn.BatchNorm1d(4096),
      )
      self.fc = nn.Sequential(
        nn.Linear(2*4096,4096),
        nn.ReLU(inplace=True),

        nn.Linear(4096, 4096),
        nn.ReLU(inplace=True),

        nn.Linear(4096, 8)
      )

  def forward_once(self, x):
    output= self.cnn(x)
    output = output.view(output.size()[0], -1)
    output = self.fc6(output)
    return output


  def forward(self, uniform_patch, random_patch):
    output_fc6_uniform = self.forward_once(uniform_patch)
    output_fc6_random = self.forward_once(random_patch)
    output = torch.cat((output_fc6_uniform,output_fc6_random), 1)
    output = self.fc(output)
    return output, output_fc6_uniform, output_fc6_random

In [8]:
model = torch.load("alexnet_kaggle_context_pred.pt")


In [9]:
class AlexNetwork(nn.Module):
  def __init__(self,aux_logits = False):
      super(AlexNetwork, self).__init__()
      self.cnn = nn.Sequential(
        nn.Conv2d(3, 96, kernel_size=11, stride=4),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.LocalResponseNorm(96),
        
        nn.Conv2d(96, 384, kernel_size=5, stride = 2,padding = 2),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.LocalResponseNorm(384),
        
        nn.Conv2d(384, 384, kernel_size=3, stride=1,padding = 1),
        nn.ReLU(inplace=True),
        nn.BatchNorm2d(384),
        
        nn.Conv2d(384, 384, kernel_size=3, stride=1,padding = 1),
        nn.ReLU(inplace=True),
        nn.BatchNorm2d(384),
        
        nn.Conv2d(384, 256, kernel_size=3, stride=1,padding = 1),
        nn.ReLU(inplace=True),
        nn.BatchNorm2d(256),
        nn.MaxPool2d(kernel_size=3, stride=2,padding = 1),
      )
      self.fc6 = nn.Sequential(
        nn.Linear(256,4096),
        nn.ReLU(inplace=True),
        nn.BatchNorm1d(4096),
      )
      self.fc = nn.Sequential(
        nn.Linear(2*4096,4096),
        nn.ReLU(inplace=True),

        nn.Linear(4096, 4096),
        nn.ReLU(inplace=True),

        nn.Linear(4096, 8)
      )

  def forward_once(self, x):
    output= self.cnn(x)
    output = output.view(output.size()[0], -1)
    output = self.fc6(output)
    return output


  def forward(self, uniform_patch, random_patch):
    output_fc6_uniform = self.forward_once(uniform_patch)
    output_fc6_random = self.forward_once(random_patch)
    output = torch.cat((output_fc6_uniform,output_fc6_random), 1)
    output = self.fc(output)
    return output, output_fc6_uniform, output_fc6_random

In [10]:
my_alexnet = AlexNetwork()

In [11]:
print(model.keys())
print(model["model_state_dict"].keys())

dict_keys(['epoch', 'model_state_dict', 'optimizer_state_dict', 'loss', 'global_trnloss', 'global_valloss'])
odict_keys(['cnn.0.weight', 'cnn.0.bias', 'cnn.4.weight', 'cnn.4.bias', 'cnn.8.weight', 'cnn.8.bias', 'cnn.10.weight', 'cnn.10.bias', 'cnn.10.running_mean', 'cnn.10.running_var', 'cnn.10.num_batches_tracked', 'cnn.11.weight', 'cnn.11.bias', 'cnn.13.weight', 'cnn.13.bias', 'cnn.13.running_mean', 'cnn.13.running_var', 'cnn.13.num_batches_tracked', 'cnn.14.weight', 'cnn.14.bias', 'cnn.16.weight', 'cnn.16.bias', 'cnn.16.running_mean', 'cnn.16.running_var', 'cnn.16.num_batches_tracked', 'fc6.0.weight', 'fc6.0.bias', 'fc6.2.weight', 'fc6.2.bias', 'fc6.2.running_mean', 'fc6.2.running_var', 'fc6.2.num_batches_tracked', 'fc.0.weight', 'fc.0.bias', 'fc.2.weight', 'fc.2.bias', 'fc.4.weight', 'fc.4.bias'])


In [12]:
my_alexnet.load_state_dict(model["model_state_dict"], strict=False)

<All keys matched successfully>

In [13]:
print(my_alexnet.cnn)

Sequential(
  (0): Conv2d(3, 96, kernel_size=(11, 11), stride=(4, 4))
  (1): ReLU(inplace=True)
  (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (3): LocalResponseNorm(96, alpha=0.0001, beta=0.75, k=1.0)
  (4): Conv2d(96, 384, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
  (5): ReLU(inplace=True)
  (6): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (7): LocalResponseNorm(384, alpha=0.0001, beta=0.75, k=1.0)
  (8): Conv2d(384, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (9): ReLU(inplace=True)
  (10): BatchNorm2d(384, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (11): Conv2d(384, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (12): ReLU(inplace=True)
  (13): BatchNorm2d(384, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (14): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (15): ReLU(inplace=True)
  (16): BatchNorm2d(256, eps=1e-

In [14]:
class CNNclassifier(nn.Module):
  def __init__(self, pretrained_cnn):
      super(CNNclassifier, self).__init__()
      self.loss_criterion = nn.CrossEntropyLoss(reduction='sum')
      self.cnn = pretrained_cnn
      self.fc = nn.Sequential(
        nn.Linear(4096,4),)
      
  def forward(self, x):
    new_x = x.repeat(1, 3, 1, 1)
    output= self.cnn(new_x)
    output = output.view(output.size()[0], -1)
    output = self.fc(output)
    return output

In [15]:
cnn_model = CNNclassifier(my_alexnet.cnn)

In [16]:
import os
from torch.optim import SGD
import matplotlib.pyplot as plt
import torch.utils
from torch.autograd import Variable

optimizer_config = {
  "optimizer_type": "adam",
  "lr": 0.0001,
  "weight_decay": 1,
}

def get_optimizer(model: torch.nn.Module,
                  config: dict) -> torch.optim.Optimizer:
  '''
  Returns the optimizer for the model params, initialized according to the config.

  Note: config has a minimum of three entries. Feel free to add more entries if you want.
  But do not change the name of the three existing entries. 
  Of course, the optimizer should be operating on the model.


  Args:
  - model: the model to optimize for
  - config: a dictionary containing parameters for the config
  Returns:
  - optimizer: the optimizer
  '''

  optimizer = None

  optimizer_type = config["optimizer_type"]
  learning_rate = config["lr"]
  weight_decay = config["weight_decay"]

  ############################################################################
  # Student code begin
  ############################################################################
  optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay=weight_decay)


  ############################################################################
  # Student code end
  ############################################################################

  return optimizer

def compute_loss(model: torch.nn.Module,
                 model_output: torch.tensor,
                 target_labels: torch.tensor,
                 is_normalize: bool = True) -> torch.tensor:
  '''
  Computes the loss between the model output and the target labels

  Note: we have initialized the loss_criterion in the model with the sum
  reduction.

  Args:
  -   model: model (which inherits from nn.Module), and contains loss_criterion
  -   model_output: the raw scores output by the net [Dim: (N, 15)]
  -   target_labels: the ground truth class labels [Dim: (N, )]
  -   is_normalize: bool flag indicating that loss should be divided by the
                    batch size
  Returns:
  -   the loss value of the input model
  '''

  ############################################################################
  # Student code begin
  ############################################################################
  # obtained_labels = predict_labels(model_output)
  # obtained_labels = nn.functional.softmax(obtained_labels, dim=1)
  kld = model.loss_criterion
  loss = kld(model_output,target_labels)
  if is_normalize:
    loss = loss/len(target_labels)

  ############################################################################
  # Student code end
  ############################################################################

  return loss

def predict_labels(model_output: torch.tensor) -> torch.tensor:
  '''
  Predicts the labels from the output of the model.

  Args:
  -   model_output: the model output [Dim: (N, 15)]
  Returns:
  -   predicted_labels: the output labels [Dim: (N,)]
  '''

  ############################################################################
  # Student code begin
  ############################################################################
  _, predicted_labels = torch.max(model_output,dim=1)

    
  ############################################################################
  # Student code end
  ############################################################################

  return predicted_labels


optimizer = get_optimizer(cnn_model, optimizer_config)

input_size = (64,64)

def get_fundamental_transforms() -> transforms.Compose:
  '''
  Returns the core transforms needed to feed the images to our model

  Suggestions: Resize(), ToTensor(), Normalize() from torchvision.transforms

  Args:
  - inp_size: tuple denoting the dimensions for input to the model
  - pixel_mean: the mean of the raw dataset [Shape=(1,)]
  - pixel_std: the standard deviation of the raw dataset [Shape=(1,)]
  Returns:
  - fundamental_transforms: transforms.Compose with the fundamental transforms
  '''

  return transforms.Compose([
      ############################################################################
      # Student code begin
      ############################################################################
    #transforms.Resize(size=inp_size),
    transforms.ToTensor(),
    #transforms.Normalize(mean=pixel_mean,std=pixel_std)

      ############################################################################
      # Student code end
      ############################################################################
  ])


class Trainer():
  '''
  This class makes training the model easier
  '''

  def __init__(self,
               data_dir,
               model,
               optimizer,
               model_dir,
               train_data_transforms,
               test_data_transforms,
               batch_size=100,
               load_from_disk=True,
               cuda=False
               ):
    self.model_dir = model_dir

    self.model = model

    self.cuda = cuda
    if cuda:
      self.model.cuda()

    dataloader_args = {'num_workers': 1, 'pin_memory': True} if cuda else {}

    self.train_dataset = ImageLoader(
        data_dir, split='train', transform=train_data_transforms)
    self.train_loader = torch.utils.data.DataLoader(self.train_dataset, 
                                                    batch_size=batch_size, 
                                                    shuffle=True,
                                                    **dataloader_args)

    self.test_dataset = ImageLoader(
        data_dir, split='test', transform=test_data_transforms)
    self.test_loader = torch.utils.data.DataLoader(self.test_dataset, 
                                                   batch_size=batch_size, 
                                                   shuffle=True,
                                                   **dataloader_args)

    self.optimizer = optimizer

    self.train_loss_history = []
    self.validation_loss_history = []
    self.train_accuracy_history = []
    self.validation_accuracy_history = []

    # load the model from the disk if it exists
    if os.path.exists(model_dir) and load_from_disk:
      checkpoint = torch.load(os.path.join(self.model_dir, 'checkpoint.pt'))
      self.model.load_state_dict(checkpoint['model_state_dict'])
      self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    self.model.train()

  def save_model(self):
    '''
    Saves the model state and optimizer state on the dict
    '''
    torch.save({
        'model_state_dict': self.model.state_dict(),
        'optimizer_state_dict': self.optimizer.state_dict()
    }, os.path.join(self.model_dir, 'checkpoint.pt'))

  def train(self, num_epochs):
    '''
    The main train loop
    '''
    self.model.train()
    #print("test")

    train_loss, train_acc = self.evaluate(split='train')
    #print("test")
    val_loss, val_acc = self.evaluate(split='test')
    #print("test")

    self.train_loss_history.append(train_loss)
    self.train_accuracy_history.append(train_acc)
    self.validation_loss_history.append(val_loss)
    self.validation_accuracy_history.append(val_acc)

    print('Epoch:{}, Training Loss:{:.4f}, Validation Loss:{:.4f}'.format(
      0, self.train_loss_history[-1], self.validation_loss_history[-1])
    )

    for epoch_idx in range(num_epochs):
      self.model.train()
      for _, batch in enumerate(self.train_loader):
        if self.cuda:
          input_data, target_data = Variable(
              batch[0]).cuda(), Variable(batch[1]).cuda()
        else:
          input_data, target_data = Variable(batch[0]), Variable(batch[1])

        #print(input_data.shape)
        #print(target_data.shape)

        output_data = self.model(input_data)
        loss = compute_loss(self.model, output_data, target_data)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

      train_loss, train_acc = self.evaluate(split='train')
      val_loss, val_acc = self.evaluate(split='test')

      self.train_loss_history.append(train_loss)
      self.train_accuracy_history.append(train_acc)
      self.validation_loss_history.append(val_loss)
      self.validation_accuracy_history.append(val_acc)

      print('Epoch:{}, Training Loss:{:.4f}, Validation Loss:{:.4f}'.format(
        epoch_idx+1, self.train_loss_history[-1], self.validation_loss_history[-1])
      )

    self.save_model()


  def evaluate(self, split='test'):
    '''
    Get the loss and accuracy on the test/train dataset
    '''
    self.model.eval()

    num_examples = 0
    num_correct = 0
    loss = 0

    for _, batch in enumerate(self.test_loader if split == 'test' else self.train_loader):
      #print("here")
      if self.cuda:
        input_data, target_data = Variable(
            batch[0]).cuda(), Variable(batch[1]).cuda()
      else:
        input_data, target_data = Variable(batch[0]), Variable(batch[1])

      output_data = self.model(input_data)

      num_examples += input_data.shape[0]
      loss += float(compute_loss(self.model,
                          output_data, target_data, is_normalize=False))
      predicted_labels = predict_labels(output_data)
      num_correct += torch.sum(predicted_labels == target_data).cpu().item()

    self.model.train()

    return loss/float(num_examples), float(num_correct)/float(num_examples)

  def plot_loss_history(self):
    '''
    Plots the loss history
    '''
    plt.figure()
    ep = range(len(self.train_loss_history))

    plt.plot(ep, self.train_loss_history, '-b', label = 'training')
    plt.plot(ep, self.validation_loss_history, '-r', label = 'validation')
    plt.title("Loss history")
    plt.legend()
    plt.ylabel("Loss")
    plt.xlabel("Epochs")
    plt.show()

  def plot_accuracy(self):
    '''
    Plots the accuracy history
    '''
    plt.figure()
    ep = range(len(self.train_accuracy_history))
    plt.plot(ep, self.train_accuracy_history, '-b', label = 'training')
    plt.plot(ep, self.validation_accuracy_history, '-r', label = 'validation')
    plt.title("Accuracy history")
    plt.legend()
    plt.ylabel("Accuracy")
    plt.xlabel("Epochs")
    plt.show()


trainer = Trainer(data_dir="./", 
                  model = cnn_model,
                  optimizer = optimizer,
                  model_dir = os.path.join("./", 'cnn_net'),
                  train_data_transforms = get_fundamental_transforms(),
                  test_data_transforms = get_fundamental_transforms(),
                  batch_size = 32,
                  load_from_disk = False,
                  cuda = True
                 )

Train 2000
Test 200
Full 2000
Train 2000
Test 200
Full 200


In [17]:
trainer.train(num_epochs=5)

Epoch:0, Training Loss:2460.0835, Validation Loss:769.0714
Epoch:1, Training Loss:1.1684, Validation Loss:1.2310
Epoch:2, Training Loss:1.1076, Validation Loss:1.1768
Epoch:3, Training Loss:1.0857, Validation Loss:1.1373
Epoch:4, Training Loss:1.0567, Validation Loss:1.0923
Epoch:5, Training Loss:1.0512, Validation Loss:1.0839


FileNotFoundError: ignored

In [18]:
train_accuracy = trainer.train_accuracy_history[-1]
validation_accuracy = trainer.validation_accuracy_history[-1]

print('Train Accuracy = {:.4f}; Validation Accuracy = {:.4f}'.format(train_accuracy, validation_accuracy))


Train Accuracy = 0.5835; Validation Accuracy = 0.5400
