In [None]:
from __future__ import print_function, division

import os
import time
import copy
import cv2
import matplotlib
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from itertools import cycle
from get_metrics import *
plt.ion()   # interactive mode

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchnet import meter
from torchnet.meter import aucmeter
from torch.optim import lr_scheduler
from ignite.contrib.metrics import roc_auc
from torchvision import datasets, models, transforms
from torchvision.datasets.folder import default_loader

In [None]:
# Dataset layout on disk: <ROOT>/mura/{images,train,val}
ROOT = 'mura_clahe'
data_dir = os.path.join(ROOT, 'mura')
images_dir, train_dir, val_dir = (
    os.path.join(data_dir, split) for split in ('images', 'train', 'val')
)

In [None]:
# --- Preprocessing and data-loading configuration ---------------------------
pretrained_size = 320    # every image is resized to 320 x 320 before ToTensor

# Per-channel statistics handed to transforms.Normalize.
# NOTE(review): presumably the ImageNet statistics expected by the torchvision
# backbones used below -- confirm.
pretrained_means = [0.485, 0.456, 0.406]
pretrained_stds = [0.229, 0.224, 0.225]

batch_size = 8

data_transforms = {
    # Training split: light augmentation (random flip, rotation of up to 5 degrees).
    'train': transforms.Compose([
        transforms.Resize((pretrained_size, pretrained_size)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(5),
        transforms.ToTensor(),
        transforms.Normalize(mean=pretrained_means, std=pretrained_stds),
    ]),
    # Validation split: deterministic resize + normalisation only.
    'val': transforms.Compose([
        transforms.Resize((pretrained_size, pretrained_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=pretrained_means, std=pretrained_stds),
    ]),
}

print("Initializing Datasets and Dataloaders...\n")

# Fall back to the CPU when no CUDA device is present, so the notebook still
# runs on a machine without a GPU instead of failing at the first .to(device).
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Create training and validation datasets (one sub-folder per class).
image_datasets = {
    x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
    for x in ['train', 'val']
}

# Create training and validation dataloaders.
# - Only the training split is shuffled; evaluation is iterated in a fixed
#   order so repeated passes over 'val' see the samples in the same sequence.
# - Pinned host memory is only useful (and only warning-free) with CUDA.
dataloaders = {
    x: torch.utils.data.DataLoader(
        image_datasets[x],
        batch_size=batch_size,
        shuffle=(x == 'train'),
        num_workers=4,
        pin_memory=(device.type == 'cuda'),
    )
    for x in ['train', 'val']
}

dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

In [None]:
# Report the label mapping and the size of each split.
class_names = image_datasets['train'].classes
print(">>Class Names: {}\n".format(class_names))
print(">>Class Index: {}\n".format(image_datasets['train'].class_to_idx))
for template, value in (
    (">>Number of images in training={}\n", dataset_sizes['train']),
    (">>Number of images in test={}\n", dataset_sizes['val']),
    ("    Number of steps for training set={}\n", len(dataloaders['train'])),
    ("    Number of steps for test set={}\n", len(dataloaders['val'])),
):
    print(template.format(value))
# Label convention noted by the author: 1 = positive, 0 = negative
# (verify against the "Class Index" mapping printed above).

In [None]:
# Backbones of the four "student" networks, each created with torchvision's
# pretrained weights. Their classification layers are replaced and their
# weights are reloaded from local checkpoints further down in this notebook.

# Students for class 1 (author's grouping)
student_0 = models.resnet34(pretrained=True)
student_1 = models.densenet201(pretrained=True)

# Students for class 0 (author's grouping)
student_3 = models.densenet169(pretrained=True)
student_4 = models.resnext50_32x4d(pretrained=True)

In [None]:
class SpinalNet_ResNet(nn.Module):
    """Four-stage "spinal" fully-connected head (used as ``student_0.fc``).

    The input feature vector is split into two halves. Each stage sees one
    half concatenated with the previous stage's output, alternating halves
    (first, second, first, second). The four stage outputs are concatenated
    and mapped to ``num_classes`` units, each squashed independently by a
    Sigmoid (so the outputs do not sum to 1).

    Args:
        half_in_size: Width of each input half. ``forward`` reads the first
            ``2 * half_in_size`` features of ``x``; any further features are
            ignored. Default 256.
        layer_width: Output width of every spinal stage. Default 256.
        num_classes: Number of output units. Default 2.

    The defaults reproduce the original hard-coded layout, so existing
    checkpoints (same sub-module names and tensor shapes) still load.
    """

    def __init__(self, half_in_size=256, layer_width=256, num_classes=2):
        super(SpinalNet_ResNet, self).__init__()
        self.half_in_size = half_in_size
        self.layer_width = layer_width

        def spinal_layer(in_features):
            # Index order (0: Linear, 1: ReLU) fixes the state_dict key names.
            return nn.Sequential(
                nn.Linear(in_features, layer_width),
                nn.ReLU(inplace=True),
            )

        # Stage 1 only sees an input half; later stages also see the
        # previous stage's output.
        self.fc_spinal_layer1 = spinal_layer(half_in_size)
        self.fc_spinal_layer2 = spinal_layer(half_in_size + layer_width)
        self.fc_spinal_layer3 = spinal_layer(half_in_size + layer_width)
        self.fc_spinal_layer4 = spinal_layer(half_in_size + layer_width)
        self.fc_out = nn.Sequential(
            nn.Linear(layer_width * 4, num_classes),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return per-class sigmoid scores of shape ``(batch, num_classes)``.

        ``x`` is indexed as ``x[:, a:b]``, i.e. it must be 2-D with at least
        ``2 * half_in_size`` columns.
        """
        half = self.half_in_size
        first_half = x[:, 0:half]
        second_half = x[:, half:2 * half]

        x1 = self.fc_spinal_layer1(first_half)
        x2 = self.fc_spinal_layer2(torch.cat([second_half, x1], dim=1))
        x3 = self.fc_spinal_layer3(torch.cat([first_half, x2], dim=1))
        x4 = self.fc_spinal_layer4(torch.cat([second_half, x3], dim=1))

        # Concatenate the four stage outputs in order before the final layer.
        return self.fc_out(torch.cat([x1, x2, x3, x4], dim=1))

In [None]:
class SpinalNet_Dense(nn.Module):
    """Four-stage "spinal" head with dropout and batch-norm (used as ``student_1.classifier``).

    The input feature vector is split into two halves. Each stage sees one
    half concatenated with the previous stage's output, alternating halves
    (first, second, first, second). Every stage is
    Dropout -> Linear -> BatchNorm1d -> ReLU. The four stage outputs are
    concatenated and passed through Dropout -> Linear -> Sigmoid, so each
    output unit is squashed independently.

    Args:
        half_in_size: Width of each input half. ``forward`` reads the first
            ``2 * half_in_size`` features of ``x``; any further features are
            ignored. Default 960.
        layer_width: Output width of every spinal stage. Default 240.
        num_classes: Number of output units. Default 2.
        dropout: Drop probability used by every Dropout layer. Default 0.5.

    The defaults reproduce the original hard-coded layout, so existing
    checkpoints (same sub-module names and tensor shapes) still load.
    """

    def __init__(self, half_in_size=960, layer_width=240, num_classes=2, dropout=0.5):
        super(SpinalNet_Dense, self).__init__()
        self.half_in_size = half_in_size
        self.layer_width = layer_width

        def spinal_layer(in_features):
            # Index order (0: Dropout, 1: Linear, 2: BatchNorm1d, 3: ReLU)
            # fixes the state_dict key names.
            return nn.Sequential(
                nn.Dropout(p=dropout),
                nn.Linear(in_features, layer_width),
                nn.BatchNorm1d(layer_width),
                nn.ReLU(inplace=True),
            )

        # Stage 1 only sees an input half; later stages also see the
        # previous stage's output.
        self.fc_spinal_layer1 = spinal_layer(half_in_size)
        self.fc_spinal_layer2 = spinal_layer(half_in_size + layer_width)
        self.fc_spinal_layer3 = spinal_layer(half_in_size + layer_width)
        self.fc_spinal_layer4 = spinal_layer(half_in_size + layer_width)
        self.fc_out = nn.Sequential(
            nn.Dropout(p=dropout),
            nn.Linear(layer_width * 4, num_classes),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return per-class sigmoid scores of shape ``(batch, num_classes)``.

        ``x`` is indexed as ``x[:, a:b]``, i.e. it must be 2-D with at least
        ``2 * half_in_size`` columns.
        """
        half = self.half_in_size
        first_half = x[:, 0:half]
        second_half = x[:, half:2 * half]

        x1 = self.fc_spinal_layer1(first_half)
        x2 = self.fc_spinal_layer2(torch.cat([second_half, x1], dim=1))
        x3 = self.fc_spinal_layer3(torch.cat([first_half, x2], dim=1))
        x4 = self.fc_spinal_layer4(torch.cat([second_half, x3], dim=1))

        # Concatenate the four stage outputs in order before the final layer.
        return self.fc_out(torch.cat([x1, x2, x3, x4], dim=1))
In [None]:
class SpinalNet_ResNext(nn.Module):
    """Four-stage "spinal" fully-connected head (used as ``student_4.fc``).

    The input feature vector is split into two halves. Each stage sees one
    half concatenated with the previous stage's output, alternating halves
    (first, second, first, second). The four stage outputs are concatenated
    and mapped to ``num_classes`` units, each squashed independently by a
    Sigmoid (so the outputs do not sum to 1).

    Args:
        half_in_size: Width of each input half. ``forward`` reads the first
            ``2 * half_in_size`` features of ``x``; any further features are
            ignored. Default 1024.
        layer_width: Output width of every spinal stage. Default 20.
        num_classes: Number of output units. Default 2.

    The defaults reproduce the original hard-coded layout, so existing
    checkpoints (same sub-module names and tensor shapes) still load.
    """

    def __init__(self, half_in_size=1024, layer_width=20, num_classes=2):
        super(SpinalNet_ResNext, self).__init__()
        self.half_in_size = half_in_size
        self.layer_width = layer_width

        def spinal_layer(in_features):
            # Index order (0: Linear, 1: ReLU) fixes the state_dict key names.
            return nn.Sequential(
                nn.Linear(in_features, layer_width),
                nn.ReLU(inplace=True),
            )

        # Stage 1 only sees an input half; later stages also see the
        # previous stage's output.
        self.fc_spinal_layer1 = spinal_layer(half_in_size)
        self.fc_spinal_layer2 = spinal_layer(half_in_size + layer_width)
        self.fc_spinal_layer3 = spinal_layer(half_in_size + layer_width)
        self.fc_spinal_layer4 = spinal_layer(half_in_size + layer_width)
        self.fc_out = nn.Sequential(
            nn.Linear(layer_width * 4, num_classes),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return per-class sigmoid scores of shape ``(batch, num_classes)``.

        ``x`` is indexed as ``x[:, a:b]``, i.e. it must be 2-D with at least
        ``2 * half_in_size`` columns.
        """
        half = self.half_in_size
        first_half = x[:, 0:half]
        second_half = x[:, half:2 * half]

        x1 = self.fc_spinal_layer1(first_half)
        x2 = self.fc_spinal_layer2(torch.cat([second_half, x1], dim=1))
        x3 = self.fc_spinal_layer3(torch.cat([first_half, x2], dim=1))
        x4 = self.fc_spinal_layer4(torch.cat([second_half, x3], dim=1))

        # Concatenate the four stage outputs in order before the final layer.
        return self.fc_out(torch.cat([x1, x2, x3, x4], dim=1))

In [None]:
# Replace each backbone's final classification layer with a 2-output head.
student_0.fc = SpinalNet_ResNet()
student_1.classifier = SpinalNet_Dense()
# Plain linear head for the DenseNet-169 student.
# NOTE(review): 1664 is presumably densenet169's classifier input width -- confirm.
student_3.classifier = nn.Sequential(
    nn.Linear(1664, 2),
    nn.Sigmoid(),
)
student_4.fc = SpinalNet_ResNext()

In [None]:
# Restore the trained weights of every student from its local checkpoint.
# map_location="cpu" remaps the stored tensors to host memory, so loading no
# longer depends on the device the checkpoint was saved from (e.g. a GPU that
# is absent on this machine). The models are still on the CPU at this point.
student_0.load_state_dict(torch.load("./ResNet_34_SP_FC_Clahe_Edge.pth", map_location="cpu"))
student_1.load_state_dict(torch.load("./DenseNet_201_SP_FC_Edge_Clahe.pth", map_location="cpu"))
student_3.load_state_dict(torch.load("./DenseNet_169_FC_Edge_Clahe.pth", map_location="cpu"))
student_4.load_state_dict(torch.load("./ResNext_50_SP_FC_Clahe_Edge.pth", map_location="cpu"))

In [None]:
# Move every student network onto the evaluation device.
# A loop replaces four copy-pasted calls and keeps the cell from echoing the
# full module repr of the last model as its output.
for student in (student_0, student_1, student_3, student_4):
    student.to(device)

In [None]:
# Put every student network into evaluation mode (nn.Module.eval()).
# A loop replaces four copy-pasted calls and keeps the cell from echoing the
# full module repr of the last model as its output.
for student in (student_0, student_1, student_3, student_4):
    student.eval()

In [None]:
class rose(nn.Module):
    """Stacked ensemble over the four student networks.

    Every student is run on the same input batch; their outputs are
    concatenated along the feature axis and fed to one linear layer.

    The students are taken from the notebook-level variables ``student_0``,
    ``student_1``, ``student_3`` and ``student_4``. They are registered as
    sub-modules by reference (no copy), so they appear in this module's
    ``state_dict`` and are affected by ``.to()``, ``.eval()`` and
    ``load_state_dict()`` called on the ensemble.

    Args:
        nb_classes: Number of output units of the final linear layer.
            Default 2. (Previously this argument was accepted but ignored;
            the default keeps the original ``Linear(8, 2)`` layout.)
    """

    def __init__(self, nb_classes=2):
        super(rose, self).__init__()

        self.modelA = student_0
        self.modelB = student_1
        self.modelC = student_3
        self.modelE = student_4

        # 4 students x 2 outputs each (as configured in this notebook) = 8 inputs.
        self.classifier = nn.Linear(8, nb_classes)

    def forward(self, x):
        """Return the ensemble scores of shape ``(batch, nb_classes)``.

        The output of the final linear layer is returned as-is (no
        activation is applied here).
        """
        outputs_0 = self.modelA(x)
        outputs_1 = self.modelB(x)
        outputs_3 = self.modelC(x)
        outputs_4 = self.modelE(x)

        # The per-student argmax predictions computed by the previous version
        # were never used, so they are no longer computed.
        stacked = torch.cat((outputs_0, outputs_1, outputs_3, outputs_4), dim=1)
        return self.classifier(stacked)

In [None]:
# Build the stacked ensemble, move it to the evaluation device and restore
# its trained weights.
sub_ensemble = rose()
sub_ensemble.to(device)
# map_location=device remaps the stored tensors onto the device in use, so the
# load does not depend on the device the checkpoint was saved from.
# NOTE(review): rose registers student_0/1/3/4 as sub-modules, so (assuming
# the checkpoint was saved from the same layout) this load also overwrites
# the weights previously loaded into those student objects -- confirm this
# is intended.
sub_ensemble.load_state_dict(torch.load("./sub_ensemble.pth", map_location=device))
sub_ensemble.eval()

In [None]:
# Run the `el2` evaluation helper on the validation loader.
# NOTE(review): `get_metric` is not defined in this notebook; presumably it is
# exported by `from get_metrics import *` -- confirm.
# NOTE(review): student_4 is not passed directly; it is only reachable through
# sub_ensemble.modelE.
get_metric.test_model_el2(student_0,student_1,sub_ensemble,student_3,device,dataloaders['val'])

In [None]:
# Collect two results over the validation loader, named `actuals` and
# `predictions` here; they feed the report cells below.
# NOTE(review): the exact form of both return values is defined in the
# external get_metrics module -- confirm before reusing them elsewhere.
actuals, predictions = get_metric.test_label_predictions_el2(student_0,student_1,sub_ensemble,student_3,device,dataloaders['val'])

In [None]:
# Classification report for the collected labels/predictions
# (implemented in the external get_metrics module).
get_metric.get_classification_report(actuals, predictions)

In [None]:
# Confusion matrix for the collected labels/predictions
# (implemented in the external get_metrics module).
get_metric.get_confusion_matrix(actuals, predictions)

In [None]:
# Cohen's kappa for the collected labels/predictions
# (implemented in the external get_metrics module).
get_metric.get_cohen_kappa(actuals, predictions)

In [None]:
# ROC curves for the `el2` model combination on the validation loader.
# NOTE(review): this helper receives the models and the loader rather than
# `actuals`/`predictions`, so it presumably runs its own pass over the data
# -- confirm in get_metrics.
get_metric.get_roc_curves_el2(student_0,student_1,sub_ensemble,student_3, device,  dataloaders['val'])