In [None]:
import numpy as np
import csv
import matplotlib.pyplot as plt
import cv2

import torch
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import Dataset
from torch.utils.data.dataset import random_split
from torch.utils.data import DataLoader
from torchvision import models
import torch.nn as nn
import torch.optim as optim

from PIL import Image
import dill

In [None]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [None]:
torch.cuda.empty_cache()

## Assistant Functions

In [None]:
def imshow(image, ax=None, title=None, normalize=True):
    """Imshow for Tensor."""
    if ax is None:
        fig, ax = plt.subplots()
    image = image.numpy().transpose((1, 2, 0))

    if normalize:
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        image = std * image + mean
        image = np.clip(image, 0, 1)

    ax.imshow(image)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['left'].set_visible(False)
    ax.spines['bottom'].set_visible(False)
    ax.tick_params(axis='both', length=0)
    ax.set_xticklabels('')
    ax.set_yticklabels('')

    return ax

In [None]:
def compute_accs(pred_labels, true_labels):
    """
    function that computes accuracy for each input/image.
    
    @params: pred_labels: Tensor, labels that are predicted by fitted model
             true_labels: Tensor, true labels that come with inputs
    @return: Float, accuracy of each input/image
    """
    
    num_classes = pred_labels.shape[1]
    result = torch.round(pred_labels).eq(true_labels).sum(axis=1)
    return result.detach().cpu().numpy() / num_classes

In [None]:
def select_images_labels(images, labels, names, accuracies, threshold, image_holder, label_holder, name_holder):
    
    """
    Function that selects image(s) that meet 
    the threshold and puts them into an image_holder.
    The corresponding labels are put into a label_holder.
    
    @params: images: Tensor, a batch of images
             labels: Tensor, corresponding true labels of images
             accuracies: Tensor, prediction accuracies of each image
             threshold: Float, 1.0 or 0.5 to find best and worst predicted images
             image_holder: List, list to store best and worst images
             label_holder: List, list to store labels of those images
    
    @return: None
    """

    ind = np.where(accuracies==threshold)[0]

    imgs = images[ind]
    img_labels = labels[ind]
    img_names = names[ind]
    
    if len(ind) != 0:
        for img in imgs:
            img = img.detach().cpu().numpy()
            image_holder.append(np.expand_dims(img, 0))
        for img_label in img_labels:
            label_holder.append(img_label)
        for img_name in img_names:
            name_holder.append(img_name)

In [None]:
def create_heatmap(model, image):
    
    """
    Function that creates a heatmap for a specific class of an image.
    
    @params: model: trained model that uses to make predictions.
             image: Tensor, image that is ready to be predicted.
    
    @return: heatmap: Tensor, heatmap shows locations that the model is
                      looking for to make predictions.
    """
    
    # pull the gradients out of the model
    gradients = model.get_activations_gradient()

    # pool the gradients across the channels
    pooled_gradients = torch.mean(gradients, dim=[0, 2, 3])

    # get the activations of the last convolutional layer
    activations = model.get_activations(image).detach()       # detach() stops tracking gradients

    # # weight the channels by corresponding gradients
    for i in range(512):
        activations[:, i, :, :] *= pooled_gradients[i]
    
    # average the channels of the activations
    heatmap = torch.mean(activations, dim=1).squeeze()    # average over 512 channels; squeeze() to (14, 14)

    # relu on top of the heatmap
    # expression (2) in https://arxiv.org/pdf/1610.02391.pdf
    heatmap = np.maximum(heatmap.cpu(), 0)        # ReLU; np.maximum() element-wise max

    # normalize the heatmap
    heatmap /= torch.max(heatmap)

    # draw the heatmap
#     plt.matshow(heatmap.squeeze())
    
    return heatmap

In [None]:
def predict(model, image):
    
    """
    Function that predicts probabilities and classes of an image.
    
    @params: model: trained model that is ready for predictions
             image: Tensor, an image that is to be predicted
    
    @return: pred: Tensor, predicted probabilities for each class
             output_classes: List
    """
    
    # set the evaluation mode
    model.eval()

    # get the positive prediction(s) of the model
    pred = model.forward_train(image)
    
    # get labels of the image
    labels = torch.round(torch.sigmoid(pred)).cpu()

    # get output class(es)
    output_classes = np.where(labels == 1)[1]
    
    return pred, output_classes

In [None]:
def grad_CAM(model, image, labels, image_path, save_dir, file_type):
    
    """
    Function that performs grad-CAM on an image and saves the superimposed image.
    
    @params: model: trained model that is ready for predictions
             image: Numpy Array, an image that is to be predicted
             labels: Tensor, labels corresponding to the image
             image_path: Numpy String, directory where the original image is saved
             save_dir: String, directory where to save output
             file_type: String, output type
    
    @return: None
    """
    
    img = torch.tensor(image).clone().detach().cuda()
    pred, output_classes = predict(model, img)
    print(pred)
    print(output_classes)
    print(np.array(classes)[output_classes])

    # verify output classes and true classes
#     true_labels = np.where(labels.cpu()==1)[0]
#     print(true_labels == output_classes)

    # backprop
    # get the gradient of the support devices wrt the parameters of the model
    for j in range(len(output_classes)):

        pred, output_classes = predict(model, img)
        pred[:, output_classes[j]].backward()

        # generate heatmap
        heatmap = create_heatmap(model, img)

        # load in original image
        im = cv2.imread(image_path)

        # resize heatmap to match original image
        heatmap = cv2.resize(np.float32(heatmap), (im.shape[1], im.shape[0]))
        heatmap = np.uint8(255 * heatmap)
        heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
        superimposed_img = cv2.addWeighted(im, 0.6, heatmap, 0.4, 0)

        # save superimposed image (original+heatmap)
        patient_name = image_path[29:41]
        class_name = '_' + np.array(classes)[output_classes[j]]
        savePath = save_dir + patient_name + class_name + file_type
        cv2.imwrite(savePath, superimposed_img)

## 1. Create Dataset
We create our dataset from a csv file which includes image names and mask of each image. Each mask includes 14 different types of pathologies. It's possible that each image has more than one pathologies, indicating we use multi-hot encoding in this project.<br><br>
_Mask Type:_
* Blanks (''): the existence of the pathology is unknown.
* Ones (1): the pathology is detected thanks to the image.
* Zeros (0): the pathology can't be detected thanks to the image.
* Uncertain (-1): the pathology may be detected.

_Policies:_
* ones: seeing all the uncertain pathologies as positive
* zeros: seeing all the uncertain pathologies as negative

In [None]:
# Paths to the files with training, and validation sets.
# Each file contains pairs (path to image, output vector)
pathFileTrain = './train_50000.csv'
pathFileValid = './valid.csv'
pathFileTest = './test.csv'

# Neural network parameters:
nnClassCount = 14                   #dimension of the output

# Training settings: batch size
trBatchSize = 64
# Test settings: batch size
tsBatchSize = 32

# Parameters related to image transforms: size of the down-scaled image, cropped image
imgtransResize = (320, 320)
imgtransCrop = 224

# class names
classes = ['No_Finding', 'Enlarged_Cardiomediastinum', 'Cardiomegaly', 'Lung_Opacity', 
           'Lung_Lesion', 'Edema', 'Consolidation', 'Pneumonia', 'Atelectasis', 'Pneumothorax', 
           'Pleural_Effusion', 'Pleural_Other', 'Fracture', 'Support_Devices']

In [None]:
class CheXpertDataSet(Dataset):
    def __init__(self, image_list_file, transform=None, policy="ones"):
        """
        image_list_file: path to the file containing images with corresponding labels.
        transform: optional transform to be applied on a sample.
        Upolicy: name the policy with regard to the uncertain labels
        """
        x = []
        y = []

        with open(image_list_file, "r") as f:
            csvReader = csv.reader(f)
            next(csvReader, None)
            k=0
            for line in csvReader:
                k+=1
                image_name= line[0]
                label = line[5:]
                
                for i in range(14):
                    if label[i]:
                        a = float(label[i])
                        if a == 1:
                            label[i] = 1
                        elif a == -1:
                            if policy == "ones":
                                label[i] = 1
                            elif policy == "zeroes":
                                label[i] = 0
                            else:
                                label[i] = 0
                        else:
                            label[i] = 0
                    else:
                        label[i] = 0
                        
                x.append('../' + image_name)
                y.append(label)

        self.x = x
        self.y = y
        self.transform = transform

    def __getitem__(self, index):
        """Take the index of item and returns the image and its labels"""
        
        image_name = self.x[index]
        image = Image.open(image_name).convert('RGB')
        label = self.y[index]
        if self.transform is not None:
            image = self.transform(image)
        return (image, torch.FloatTensor(label), (image_name,))
    
    def __len__(self):
        return len(self.x)

## 2. Create DataLoaders

First we define a transform model to resize all images and normalize them.

In [None]:
#TRANSFORM DATA
normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
transformList = []
transformList.append(transforms.Resize(imgtransResize))
transformList.append(transforms.RandomResizedCrop(imgtransCrop))
transformList.append(transforms.RandomHorizontalFlip())
transformList.append(transforms.ToTensor())
transformList.append(normalize)      
transformSequence = transforms.Compose(transformList)

Then we build train, validation and test data loaders. 

In [None]:
#LOAD DATASET
train_ds = CheXpertDataSet(pathFileTrain ,transformSequence, policy="ones")
test_ds, train_ds = random_split(train_ds, [2000, len(train_ds) - 2000])
valid_ds = CheXpertDataSet(pathFileValid, transformSequence)

# Create dataloaders
dataloaderTrain = DataLoader(dataset=train_ds, batch_size=trBatchSize, shuffle=True)
dataloaderValid = DataLoader(dataset=valid_ds, batch_size=trBatchSize, shuffle=False)
dataloaderTest = DataLoader(dataset=test_ds, batch_size=trBatchSize, shuffle=False)

Plot four sample images from the training set.

In [None]:
# load some random training images
images, labels = iter(dataloaderTrain).next()

In [None]:
fig, axes = plt.subplots(figsize=(12,9), ncols=4)
for ii in range(4):
    ax = axes[ii]
    imshow(images[ii], ax=ax)

## 3. Create Architecture and Train Model

Create a customized vgg16_bn architecture, find the best learning rate, and train the model using best slice learning rates.

* Find the last convolution layer, which is the 42nd layer of the vgg16bn model
* Perform the first 42nd features and hook the gradients on the last convolution layer
* Perform the remaining features and the classifier
* Create two methods to extract the gradients and the activation map


In [None]:
# customized model
class VGG(nn.Module):
    def __init__(self):
        super(VGG, self).__init__()
        
        # get the pretrained vgg16_bn network
        self.vgg = models.vgg16_bn(pretrained=True)
        
        # disect the network to access its last convolutional layer
        self.features_conv = self.vgg.features[:43]
        
        # get the avg pool
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        
        # get the max pool of the features stem
        self.max_pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
        
        # get the classifier of the vgg16_bn
        self.classifier = nn.Sequential(nn.Linear(25088, 4096),
                          nn.ReLU(True),
                          nn.Dropout(0.5),
                          nn.Linear(4096, 4096),
                          nn.ReLU(True),
                          nn.Dropout(0.5),
                          nn.Linear(4096, nnClassCount))
        
        # placeholder for the gradients
        self.gradients = None
    
    # hook for the gradients of the activations
    def activations_hook(self, grad):
        self.gradients = grad
        
    def forward_train(self, x):
        x = self.features_conv(x)
        
        # register the hook
        h = x.register_hook(self.activations_hook)
        
        # apply the remaining pooling
        x = self.max_pool(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x
    
    def forward_test(self, x):
        # only register the hook in training
        x = self.features_conv(x)
        x = self.max_pool(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x
    
    # method for the gradient extraction
    def get_activations_gradient(self):
        return self.gradients
    
    # method for the activation exctraction
    def get_activations(self, x):
        return self.features_conv(x)

Replace the classifier of the pretrained model to the customized one matching our number of classes.

Define loss function and optimizer.

In [None]:
# Use GPU if it's available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# initialize the VGG model
model = VGG()
    
# loss function
criterion = nn.MultiLabelSoftMarginLoss()

# optimizer
optimizer = optim.Adam(model.classifier.parameters(), lr=1e-5)

model.to(device)

Train the VGG model.

In [None]:
epochs = 1
steps = 0
running_loss = 0
print_every = 5

# metrics to save the best model
best_acc = 0

for epoch in range(epochs):
    for inputs, labels, _ in dataloaderTrain:
        steps += 1
        # Move input and label tensors to the default device
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        
        logps = model.forward_train(inputs)
        loss = criterion(logps, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        
        if steps % print_every == 0:
            test_loss = 0
            accuracy = 0
            model.eval()
            with torch.no_grad():
                for inputs, labels, _ in dataloaderValid:
                    inputs, labels = inputs.to(device), labels.to(device)
                    logps = model.forward_test(inputs)
                    batch_loss = criterion(logps, labels)
                    
                    test_loss += batch_loss.item()
                    
                    # Calculate scores
                    pred_labels = torch.round(torch.sigmoid(logps))
                    accs = compute_accs(pred_labels, labels)
                    accuracy += np.mean(accs)
            
            # save the best model
            if accuracy/len(dataloaderValid) > best_acc:
                best_model = model
                
            print(f"Epoch {epoch+1}/{epochs}.. "
                  f"Train loss: {running_loss/print_every:.3f}.. "
                  f"Test loss: {test_loss/len(dataloaderValid):.3f}.. "
                  f"Acc: {accuracy/len(dataloaderValid):.3f}.. ")
            
            running_loss = 0
            model.train()
#         else:
#             print('.')
print('Done!')

Save the best model.

In [None]:
# torch.save(best_model.state_dict(), 'best_model_vgg16bn.pth')

## 4. Predict Test Data Using Best Model
* Predict test data using the best model saved in the training session.
* Extract 10 images that have an accuracy of 100% and 10 images that have an accuracy not greater than 50%.
* Perform Grad-CAM on these 20 images.

Create test dataset and test dataloader using a smaller batch size to save memory.

In [None]:
#LOAD DATASET
test_ds = CheXpertDataSet(pathFileTest, transformSequence)

# Create dataloaders
dataloaderTest = DataLoader(dataset=test_ds, batch_size=tsBatchSize, shuffle=False)

In [None]:
print('num test batches:', len(dataloaderTest))

Create four empty lists `best_images`, `worst_images`, `best_labels` and `worst_labels`. `best_images` stores images that have an accuracy of 100%, while `worst_images` stores those that have an accuracy of <= 50. The other two lists store corresponding labels.

In [None]:
best_images = []
worst_images = []
best_labels = []
worst_labels = []
best_names = []
worst_names = []

Predict outcomes for each batch of test data.

In [None]:
# set the evaluation mode
best_model.eval()

for images, labels, image_names in dataloaderTest:
    image_names = np.asarray(image_names)[0]
    images, labels = images.to(device), labels.to(device)
    pred = best_model.forward_test(images)
    
    # get labels of the image
    pred_labels = torch.round(torch.sigmoid(pred))

    # get accuracy of each image
    accs_test = compute_accs(pred_labels, labels)
    print('acc:', np.mean(accs_test))
    
    select_images_labels(images, labels, image_names, accs_test, 1.0, best_images, best_labels, best_names)
    select_images_labels(images, labels, image_names, accs_test, 0.5, worst_images, worst_labels, worst_names)

In [None]:
print('num best images:', len(best_images))
print('num worst images:', len(worst_images))

Check the number of images that are labeled all 0's and get their indices. Get rid of these images from `best_images`.

In [None]:
invalid_lst = []
for k in range(len(best_labels)):
    if sum(best_labels[k]) == 0:
        invalid_lst.append(k)
        
print(invalid_lst)

In [None]:
full_ind_lst = set(np.arange(len(best_images)))
valid_lst = list(full_ind_lst - set(invalid_lst))
print(valid_lst)

In [None]:
best_images_new = np.array(best_images)[valid_lst]
best_labels_new = np.array(best_labels)[valid_lst]
best_names_new = np.array(best_names)[valid_lst]
print(best_images_new.shape)
print(best_labels_new.shape)
print(best_names_new.shape)

Save the session, in case the model needs to be re-trained.

In [None]:
# dill.dump_session('vgg16bn_images_selected.db')

In [None]:
# dill.load_session('vgg16bn_images_selected.db')

## 4. Perform Grad-CAM on Each Selected Image from Valid Set
* Predict logits for each image regarding to the 14 labels
* Apply `torch.sigmoid()` to convert logits to binary labels
* Specify output class and call `backward()` to get the gradients in terms of the class
* Perform formulas in the paper (https://arxiv.org/pdf/1610.02391.pdf) for Grad-CAM coarse discriminative heatmap
* Resize the heatmap to match the original image and combine the two of them

Perform grad-CAM on best images.

In [None]:
for i in range(10):
    print(best_names_new[i][29:41], ':')
    grad_CAM(best_model, best_images_new[i], best_labels_new[i], best_names_new[i], save_dir='./grad-cam_results/best/', file_type='.jpg')
    print()

Perform grad-CAM on worst images.

In [None]:
# perform Grad-CAM on all worst images
# because some of the predictions are all 0's
# where the true labels have seven 1's
# these images have no Grad-CAM outputs
for i in range(14):
    print(worst_names[i][29:41], ':')
    grad_CAM(best_model, worst_images[i], worst_labels[i], worst_names[i], save_dir='./grad-cam_results/worst/', file_type='.jpg')
    print()