## Downloading Chest X-ray Data

In [0]:
# Download the dataset archive from Google Drive (uses the `gdown` command-line tool).
!gdown https://drive.google.com/uc?id=1-491oqElItj4TOTApzBgkfVCNsNH-_HL
# Unpack the archive; without a -d option unzip extracts relative to the current directory.
!unzip /content/A_05_Part_02_Dataset.zip
# Root of the extracted dataset. The Train / Validation / Test sub-folders are read from here.
# NOTE(review): the doubled '/content/content' presumably mirrors the paths stored inside
# the archive -- verify after extraction.
data_dir = '/content/content/A_05_Part_02_Dataset'

## Importing Libraries

In [0]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.autograd import Variable
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy

## DataLoaders

In [0]:
# Define transforms for the training data and for the validation / test data.
# Training pipeline: random rotation within +/-30 degrees, resize to 256, centre-crop
# to 224, random horizontal flip, then tensor conversion and per-channel normalisation.
# NOTE(review): the mean / std values are presumably statistics of this dataset -- confirm.
train_transforms = transforms.Compose([transforms.RandomRotation(30),
                                       transforms.Resize(256),
                                       transforms.CenterCrop(224),
                                       transforms.RandomHorizontalFlip(),
                                       transforms.ToTensor(),
                                       transforms.Normalize([0.4924, 0.4924, 0.4925],
                                                            [0.2491, 0.2491, 0.2491])])

# Evaluation pipeline: same resize / crop / normalisation, but no random augmentation.
test_transforms = transforms.Compose([transforms.Resize(256),
                                      transforms.CenterCrop(224),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.4924, 0.4924, 0.4925],
                                                           [0.2491, 0.2491, 0.2491])])


# One ImageFolder per split; class indices are derived from the sub-folder names.
train_data = datasets.ImageFolder(data_dir + '/Train', transform=train_transforms)
val_data = datasets.ImageFolder(data_dir + '/Validation', transform=test_transforms)
test_data = datasets.ImageFolder(data_dir + '/Test', transform=test_transforms,)

# Data loaders
batch_size = 32
# Only the training split is shuffled.
trainloader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
# Validation and test batches are served in dataset order: shuffling brings no benefit
# for evaluation, and the results-export cell pairs test predictions with
# `testloader.dataset.samples`, which is only valid when the order is preserved.
valloader = torch.utils.data.DataLoader(val_data, batch_size=batch_size, shuffle=False)
testloader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, shuffle=False)



print("Classes: ")
class_names = train_data.classes
print(class_names)

## Model Initialization

In [0]:
def initialize_model(model,freeze):
    """Build an ImageNet-pretrained backbone with a fresh 3-output classification head.

    Args:
        model: backbone name, either 'res18' (torchvision ResNet-18) or 'vgg16'.
        freeze: which pretrained weights to exclude from training:
            'all'     -- every ResNet parameter / every VGG `features` parameter,
            'partial' -- all but the last 17 ResNet parameter tensors / all but the
                         last 6 VGG `features` parameter tensors,
            'none'    -- nothing is frozen.

    Returns:
        The torchvision model whose head (`fc` for ResNet, `classifier` for VGG) has been
        replaced by Linear(in, 150) -> ReLU -> Dropout(0.5) -> Linear(150, 3). The new
        head is created after freezing, so it is always trainable.

    Raises:
        ValueError: if `model` or `freeze` is not one of the supported values.
    """
    # Validate up front: previously an unknown `model` surfaced as an unbound `fc1_in`
    # and an unknown `freeze` was silently treated like 'none'.
    if model not in ('res18', 'vgg16'):
        raise ValueError("Unsupported model {!r}; expected 'res18' or 'vgg16'".format(model))
    if freeze not in ('all', 'partial', 'none'):
        raise ValueError("Unsupported freeze mode {!r}; expected 'all', 'partial' or 'none'".format(freeze))

    if model == 'res18':
        backbone = models.resnet18(pretrained=True)
        fc1_in = backbone.fc.in_features
        freezable = list(backbone.parameters())
        keep_trainable = 17   # trailing parameter tensors left trainable in 'partial' mode
    else:
        backbone = models.vgg16(pretrained=True)
        fc1_in = backbone.classifier[0].in_features
        # Only the convolutional part is ever frozen; the classifier is replaced below.
        freezable = list(backbone.features.parameters())
        keep_trainable = 6

    if freeze == 'all':
        to_freeze = freezable
    elif freeze == 'partial':
        to_freeze = freezable[:-keep_trainable]
    else:
        to_freeze = []
    for param in to_freeze:
        param.requires_grad = False

    fc1_out = 150 #5*10+100
    fc2_in = fc1_out
    fc2_out = 3
    head = nn.Sequential(nn.Linear(fc1_in, fc1_out, bias=True),
                         nn.ReLU(inplace=True),
                         nn.Dropout(p=0.5, inplace=False),
                         nn.Linear(fc2_in, fc2_out, bias=True))

    if model == 'res18':
        backbone.fc = head
    else:
        backbone.classifier = head
    return backbone

# Backbone name handed to initialize_model ('res18' or 'vgg16' are the names it handles).
net = 'res18'
# Freezing policy handed to initialize_model ('all', 'partial' or 'none').
freeze = 'none'
model = initialize_model(net,freeze)
# Optional warm start: uncomment to load the 'state_dict' entry of a saved checkpoint.
# pretrained_weights = '/content/drive/My Drive/focal_trained/res18_focal_loss.pth'
# model.load_state_dict(torch.load(pretrained_weights)['state_dict'])



## Focal Loss

In [0]:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

class FocalLoss2d(nn.modules.loss._WeightedLoss):
    """Class-weighted focal loss on top of per-element binary cross-entropy with logits.

    For every element, ``pt = exp(-BCE(logit, target))`` and the loss is
    ``alpha * (1 - pt) ** gamma * BCE``; the result is averaged over all elements.

    Args:
        gamma: focusing exponent; a Python number or a tensor broadcastable against
            the input (a tensor with ``requires_grad=True`` is supported).
        alpha: weights multiplied into the element-wise loss; must be broadcastable
            against the input (one weight per column with the default).
    """

    def __init__(self, gamma=2,alpha=torch.Tensor([0.8764,0.0438,0.0797])):
        super(FocalLoss2d, self).__init__()
        self.gamma = gamma
        self.alpha = alpha

    @staticmethod
    def _on_device_of(value, reference):
        """Return `value` on `reference`'s device; non-tensors pass through untouched."""
        # `.to` is differentiable and is skipped when the devices already match, so a
        # learnable alpha / gamma keeps receiving gradients.
        if torch.is_tensor(value) and value.device != reference.device:
            return value.to(reference.device)
        return value

    def forward(self, inpt, target):
        """Compute the mean balanced focal loss.

        Args:
            inpt: raw logits, at least 2-D.
            target: targets with the same rank as `inpt` and matching sizes along
                dimensions 0 and 1.

        Returns:
            A scalar tensor holding the mean of ``alpha * focal_loss``.
        """
        assert len(inpt.shape) == len(target.shape), "input and target must have the same rank"
        assert inpt.size(0) == target.size(0), "input and target batch sizes differ"
        assert inpt.size(1) == target.size(1), "input and target column counts differ"

        # Functional form: avoids constructing a new loss module on every call.
        logpt = -F.binary_cross_entropy_with_logits(inpt, target, reduction='none')
        pt = torch.exp(logpt)

        # The default alpha lives on the CPU; move the weights next to the logits so
        # the loss also works when the model runs on an accelerator.
        gamma = self._on_device_of(self.gamma, inpt)
        alpha = self._on_device_of(self.alpha, inpt)

        focal_loss = -((1 - pt).pow(gamma)) * logpt
        balanced_focal_loss = alpha * focal_loss
        return torch.mean(balanced_focal_loss)



## Multilabel One-Hot Encoding

In [0]:
# # ['covid-19', 'normal', 'pneumonia']
def one_hot_encode(label):
  """Turn integer class indices into 3-column multi-label target rows.

  Index 0 maps to [1, 0, 1], index 1 to [0, 1, 0] and index 2 to [0, 0, 1]; i.e. an
  identity matrix whose first row additionally activates the third column.

  Args:
      label: tensor of class indices (cast to long before the lookup).

  Returns:
      A float tensor with one target row per entry of `label`.
  """
  # Row k is the target vector for class index k.
  lookup = torch.tensor([[1., 0., 1.],
                         [0., 1., 0.],
                         [0., 0., 1.]])
  indices = label.long()
  return lookup[indices]


## Training

In [0]:
from torch.autograd import Variable

# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

# Training hyper-parameters.
Epochs = 50
lr = 1e-5

# "focal" selects FocalLoss2d; any other value selects plain BCE-with-logits.
loss_type = "focal"

if loss_type != "focal":
  criterion = torch.nn.BCEWithLogitsLoss(reduction='mean')
  optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
else:
  # gamma and alpha are leaf tensors that require grad: they are handed to the
  # optimizer below and are therefore updated together with the network weights.
  gamma = torch.tensor([2.0], device=device, requires_grad=True)
  alpha = torch.tensor([0.25, 0.25, 0.25], device=device, requires_grad=True)

  criterion = FocalLoss2d(gamma, alpha)

  trainable = list(model.parameters()) + [alpha, gamma]
  optimizer = optim.SGD(trainable, lr=lr, momentum=0.9)

# criterion(torch.Tensor([[0.7,0.6,0.9],[0.3,0.8,0.4]]).to(device),torch.Tensor([[1,0,1],[1,1,0]]).to(device))

In [0]:
# Directory that receives the checkpoints written by the training loop.
path = '/content/trained_models'
# exist_ok=True keeps the cell re-runnable: a plain os.mkdir raises FileExistsError
# the second time the cell is executed.
os.makedirs(path, exist_ok=True)


In [0]:

from tqdm import tqdm


# Best (lowest) summed validation loss seen so far; a checkpoint is saved when it improves.
val_loss_min = 1e6
# Validation accuracy of the previous epoch, used by the learning-rate rule below.
val_acc_prev = 0.0

# Per-epoch history (plain Python floats, so it can be plotted regardless of device).
epoch_lr = []
epoch_tacc = []
epoch_tloss = []
epoch_vacc = []
epoch_vloss = []


for epoch in range(Epochs):  # loop over the dataset multiple times
    val_accuracy = 0.0
    train_accuracy = 0.0

    epoch_lr.append(lr)
    val_loss = 0.0
    running_loss = 0.0

    # ---------------------------- training pass ----------------------------
    model.train()
    pbar = tqdm(enumerate(trainloader), total=len(trainloader), position=0, leave=True)
    for i, data in pbar:
        inputs, labels = data
        inputs = inputs.to(device)
        optimizer.zero_grad()

        outputs = model(inputs)               #----> forward pass
        labels = one_hot_encode(labels).to(device)
        loss = criterion(outputs, labels)

        loss.backward()                     #----> backward pass
        optimizer.step()                    #----> weights update

        running_loss += loss.item()

        # Element-wise multi-label accuracy: a logit counts as "on" when its sigmoid
        # exceeds 0.5. Computed without autograd and stored as a float.
        with torch.no_grad():
            probs = torch.sigmoid(outputs)
            correct = ((probs > 0.5) == labels).float().sum()
        accuracy = (correct * 100 / (probs.shape[0] * probs.shape[1])).item()
        train_accuracy += accuracy

        pbar.set_description(
            'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tAccuracy: {:.1f}%'.format(
                epoch, i * len(inputs), len(trainloader.dataset),
                100. * i / len(trainloader),
                loss.item(), accuracy), refresh=False)

    # running_loss is the SUM of the per-batch losses, not their mean.
    print("\nTraining Loss of Epoch ", epoch, " is :", running_loss)

    train_accuracy = train_accuracy / len(trainloader)
    print("Training Accuracy of Epoch ", epoch, " is :", train_accuracy, "\n\n")

    epoch_tacc.append(train_accuracy)
    epoch_tloss.append(running_loss)

    # --------------------------- validation pass ---------------------------
    model.eval()
    pbar = tqdm(enumerate(valloader), total=len(valloader), position=0, leave=True)
    # No gradients are needed for validation; this avoids building the autograd graph.
    with torch.no_grad():
        for i, data in pbar:
            inputs, labels = data
            inputs = inputs.to(device)
            labels = one_hot_encode(labels).to(device)
            outputs = model(inputs)               #----> forward pass
            loss = criterion(outputs, labels)   #----> compute loss

            val_loss += loss.item()

            probs = torch.sigmoid(outputs)
            correct = ((probs > 0.5) == labels).float().sum()
            accuracy = (correct * 100 / (probs.shape[0] * probs.shape[1])).item()
            val_accuracy += accuracy
            pbar.set_description(
                'Validation Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tAccuracy: {:.1f}%'.format(
                    epoch, i * len(inputs), len(valloader.dataset),
                    100. * i / len(valloader),
                    loss.item(), accuracy), refresh=False)

    print("\nValidation Loss of Epoch ", epoch, " is :", val_loss)
    val_accuracy = val_accuracy / len(valloader)
    print("Validation Accuracy of Epoch ", epoch, " is :", val_accuracy, "\n\n")

    epoch_vloss.append(val_loss)
    epoch_vacc.append(val_accuracy)

    # Keep a checkpoint every time the summed validation loss improves.
    if val_loss < val_loss_min:
        checkpoint = {
                'epoch': epoch + 1,
                'valid_loss_min': val_loss,
                'state_dict': model.state_dict(),
                'optimizer': optimizer.state_dict(),
            }

        val_loss_min = val_loss

        # NOTE(review): the file name always starts with 'vgg16_ft' even when another
        # backbone is being trained -- confirm whether that prefix is intended.
        torch.save(checkpoint, os.path.join(path,'vgg16_ft_{:.2f}_{:.2f}_{}.pth'.format(val_accuracy,val_loss,epoch)))

    delta = abs(val_accuracy - val_acc_prev)
    val_acc_prev = val_accuracy

    # When validation accuracy moved by less than 0.4 points, multiply the learning rate
    # by 10 (only while it is still below 0.1).
    # NOTE(review): increasing the rate on a plateau is unusual -- confirm it is intended.
    if (delta < 0.4) and (lr < 0.1):
        lr = lr * 10
        # Update the existing optimizer in place. Re-creating it from model.parameters()
        # would drop the extra focal-loss parameters registered with the optimizer and
        # discard the accumulated momentum buffers.
        for group in optimizer.param_groups:
            group['lr'] = lr

print('Finished Training')

## Loss and Accuracy Curves

In [0]:
save_path= '/content/drive/My Drive/vgg_results'
import os.path as osp

# Plot as many epochs as were actually recorded instead of a hard-coded count: the old
# `Epochs=21` made reshape() fail whenever the history had any other length, and it
# overwrote the training configuration value.
n_epochs = min(len(epoch_tloss), len(epoch_vloss), len(epoch_tacc), len(epoch_vacc))
epochs_axis = range(n_epochs)

# float() also unwraps 0-dim tensors (including ones stored on the GPU), which
# matplotlib cannot convert by itself.
train_loss_hist = [float(v) for v in epoch_tloss[:n_epochs]]
val_loss_hist = [float(v) for v in epoch_vloss[:n_epochs]]
train_acc_hist = [float(v) for v in epoch_tacc[:n_epochs]]
val_acc_hist = [float(v) for v in epoch_vacc[:n_epochs]]

## LOSS AND ACCURACY CURVES ##
plt.figure()
plt.plot(epochs_axis, np.array(train_loss_hist), color='k', label='Train')
plt.plot(epochs_axis, np.array(val_loss_hist), color='b', label='Validation')


plt.title('Loss Curves')
plt.xlabel('epochs')
plt.ylabel('Loss')
plt.ylim(0,100)
plt.xlim(1,50)
plt.legend()

# plt.savefig(osp.join(save_path,'Loss Curves.png'),bbox_inches='tight')
################ ACCURACY##################

plt.figure()
plt.plot(epochs_axis, train_acc_hist, color='k', label='Train')
plt.plot(epochs_axis, val_acc_hist, color='b', label='Validation')
plt.ylim(0,100)
plt.xlim(1,50)

plt.title('Accuracy Curves')
plt.xlabel('epochs')
plt.ylabel('Accuracy')
plt.legend()

# plt.savefig(osp.join(save_path,'Accuracy Curves.png'),bbox_inches='tight')

## Confusion Matrices

In [0]:
from sklearn.metrics import multilabel_confusion_matrix
import itertools

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
# Evaluate in inference mode so dropout / batch-norm layers do not behave as in training.
model.eval()


correct = 0
predicted = []
gt = []
# One 2x2 confusion matrix per label. This must be a NumPy array: with a Python list,
# `cfm += ndarray` EXTENDS the list instead of adding element-wise, and the later
# `cfm[0][1,1]` lookup then fails.
cfm = np.zeros((3, 2, 2), dtype=np.int64)
with torch.no_grad():
  for data in trainloader:
        images, labels = data
        images = images.to(device)
        labels = one_hot_encode(labels).to(device)

        outputs = model(images)
        outputs = torch.sigmoid(outputs)
        pred_labels = outputs > 0.5

        correct += (pred_labels == labels).float().sum().item()

        # Accumulate the per-label [[TN, FP], [FN, TP]] counts of this batch.
        cfm += multilabel_confusion_matrix(labels.cpu().int().numpy(),
                                           pred_labels.cpu().int().numpy())

# NOTE(review): the message says "test images" but the loop iterates over `trainloader`
# -- confirm which split is meant to be reported.
print('Accuracy of the network on test images: %d %%' % (
    100 * correct /(len(trainloader.dataset)*3 )))


# Precision / recall / F1 of label 0 only (row = true class, column = predicted class).
recall = cfm[0][1,1]/cfm[0][1,:].sum()
precision = cfm[0][1,1]/cfm[0][:,1].sum()
f1 = (2*precision*recall)/(precision+recall)
print("F1 SCORE: ",f1)


def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Draw a confusion matrix on the current matplotlib figure.

    Args:
        cm: square confusion matrix (rows = true labels, columns = predicted labels).
        classes: tick labels, one per row / column of `cm`.
        normalize: when True, every row is divided by its sum before drawing, so both
            the colours and the printed numbers show per-true-class fractions.
        title: figure title.
        cmap: matplotlib colormap used for the cells.
    """
    cm = np.asarray(cm)

    # Normalise BEFORE drawing: previously the image was rendered from the raw counts
    # while the overlaid numbers were normalised, so colours and text disagreed.
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Cells darker than half of the maximum get white text so the number stays readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        # Fractions are shortened to two decimals; raw counts are printed unchanged.
        cell_text = format(cm[i, j], '.2f') if normalize else cm[i, j]
        plt.text(j, i, cell_text,
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
# One figure per label: each 2x2 matrix contrasts that label against everything else.
for label_idx, label_name in enumerate(['covid-19', 'normal', 'pneumonia']):
    plt.figure()
    plot_confusion_matrix(cfm[label_idx], classes=['rest', label_name])
# plt.savefig(osp.join(save_path,'cfm_train.png'),bbox_inches='tight')



        


## Saving Results

In [0]:
import torch
import pandas as pd
import numpy as np

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()

# Iterate the test set in dataset order. The file names below come from
# `testloader.dataset.samples`, so predictions produced by a shuffling loader would be
# written next to the wrong file.
ordered_testloader = torch.utils.data.DataLoader(
    testloader.dataset, batch_size=testloader.batch_size, shuffle=False)

predictions = []
# Pure inference: no autograd graph is needed.
with torch.no_grad():
    for images, _ in ordered_testloader:
        images = images.to(device)

        outputs = torch.sigmoid(model(images))

        preds = (outputs > 0.5).int().cpu().numpy()
        # Swap the 2nd and 3rd output columns before export.
        # NOTE(review): presumably the expected submission order differs from the
        # model's class order -- confirm.
        preds[:, [1, 2]] = preds[:, [2, 1]]
        predictions.extend(preds)


df = pd.DataFrame(predictions)
# Column -1 holds the image file name (last path component) of every test sample.
df[-1] = pd.Series([os.path.basename(sample_path) for sample_path, _ in testloader.dataset.samples])
# df[-1] = pd.Series(['1','2','3','4','5','6'])
df = df[[-1, 0, 1, 2]]
df.to_csv('results.csv', header=False)

