In [None]:
import sys  
sys.path.insert(0, '../code')

import glob
from PIL import Image
import numpy as np
import pandas
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from random import randint
from skimage.transform import resize
from sklearn.manifold import TSNE
from sklearn.metrics import balanced_accuracy_score, top_k_accuracy_score
from os import listdir
from os.path import isfile, join
from sklearn.utils import shuffle
from pytorch_grad_cam import GradCAM, ScoreCAM, GradCAMPlusPlus, AblationCAM, XGradCAM, EigenCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
import ttach as tta

In [None]:
from sklearn.metrics import confusion_matrix , classification_report, balanced_accuracy_score

In [None]:
from tqdm import tqdm

def compute_integrated_gradient(batch_x, batch_blank, model, idx, n = 100):
    """Approximate integrated gradients of one model output w.r.t. the input.

    The gradient of ``model(x)[0, idx]`` is averaged over ``n`` points placed
    on the straight line from ``batch_blank`` to ``batch_x`` and multiplied by
    ``batch_x - batch_blank``.

    Args:
        batch_x: input tensor to explain.
        batch_blank: baseline tensor, broadcastable against ``batch_x``.
        model: callable whose output is indexed as ``[0, idx]``; only the
            first sample of the batch therefore contributes to the gradient.
        idx: index of the output unit to attribute.
        n: number of interpolation steps (default 100, the value that was
            previously hard-coded).

    Returns:
        Tensor with the shape of ``batch_x - batch_blank`` holding the
        attributions.

    Raises:
        ValueError: if ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError("n must be at least 1")

    # The difference is loop-invariant, so compute it once.
    delta = batch_x - batch_blank
    mean_grad = torch.zeros_like(delta)

    for i in tqdm(range(1, n + 1)):
        # detach() guarantees a leaf tensor, so enabling gradients cannot fail
        # even when the inputs are themselves attached to a graph.
        x = (batch_blank + i / n * delta).detach().requires_grad_(True)
        y = model(x)[0, idx]
        (grad,) = torch.autograd.grad(y, x)
        mean_grad += grad / n

    return delta * mean_grad

def limToOne(image):
    """Clamp every element of ``image`` to the range [0, 1], in place.

    Values above 1 become 1 and values below 0 become 0; everything else
    (including NaN) is left untouched. Boolean-mask assignment is used instead
    of a per-element Python loop, which is far faster on full-size images.

    Args:
        image: array-like supporting comparison and boolean-mask assignment
            (e.g. a numpy array or a torch tensor). It is modified in place.

    Returns:
        The same ``image`` object, after clamping.
    """
    image[image > 1] = 1.
    image[image < 0] = 0.
    return image

In [None]:
# Number of samples per batch used by the DataLoaders built later in the notebook.
BATCH_SIZE = 30
# Fraction of every class kept at each split step: it is applied once to
# separate the test set and a second time to separate the validation set.
trainSize = 0.9

In [None]:
def verifyAccuracy(model, dataloader, test = True, printcm = False, name = None, save = True):
    """Evaluate ``model`` on ``dataloader`` and print accuracy statistics.

    Prints the plain accuracy, the balanced accuracy, a classification report
    and the per-class accuracy for the first 8 entries of ``labelName``.

    Args:
        model: network called as ``model(images)``; the arg-max over
            dimension 1 of its output is taken as the predicted class.
        dataloader: iterable yielding ``(images, labels)`` batches.
        test: with ``save`` enabled, ``True`` stores the score in
            ``model.testAcc``; ``False`` raises ``model.maxValAcc`` to the
            score when it is higher (the attribute must already exist).
        printcm: when true, the confusion matrix is drawn through
            ``print_confusion_matrix``.
        name: label used in the printed messages; defaults to ``model.name``.
        save: when false, no attribute of ``model`` is written.

    Returns:
        The balanced accuracy as a fraction (not a percentage).

    Raises:
        ValueError: if ``dataloader`` yields no samples.
    """
    n_classes = 8
    class_ids = list(range(n_classes))
    class_names = labelName[:n_classes]

    if name is None:
        name = model.name

    predictions = []
    truth = []
    with torch.no_grad():
        model.to(device)
        for images, labels in dataloader:
            outputs = model(images.to(device))
            # max returns (value, index)
            _, predicted = torch.max(outputs, 1)
            # Move whole batches to the host at once instead of one sample at a time.
            predictions.extend(predicted.cpu().tolist())
            truth.extend(labels.cpu().tolist())

    truth = np.array(truth, dtype=int)
    predictions = np.array(predictions, dtype=int)
    if truth.size == 0:
        raise ValueError("verifyAccuracy received an empty dataloader")

    hits = truth == predictions
    acc = 100.0 * int(hits.sum()) / int(truth.size)
    print(f'Accuracy of the network {name}: {acc} %')

    balAcc = balanced_accuracy_score(truth, predictions)
    # balAcc is a fraction; scale it so that the printed '%' is truthful.
    print(f'Balanced accuracy of the network {name}: {100.0 * balAcc} %')

    if save:
        if test:
            model.testAcc = balAcc
        elif model.maxValAcc < balAcc:
            model.maxValAcc = balAcc

    if printcm:
        # Explicit labels keep the matrix 8x8 even when a class is absent.
        cm = confusion_matrix(truth, predictions, labels=class_ids)
        print_confusion_matrix(cm, class_names, name = name)

    # Explicit labels avoid a mismatch with target_names when a class does not
    # occur in this split.
    print(classification_report(truth, predictions, labels=class_ids,
                                target_names=class_names, zero_division=0))

    n_class_samples = np.bincount(truth, minlength=n_classes)
    n_class_correct = np.bincount(truth[hits], minlength=n_classes)
    for i in range(n_classes):
        if n_class_samples[i] == 0:
            # No sample of this class: the ratio is undefined.
            print(f'Accuracy of {labelName[i]}: n/a (no samples)')
            continue
        acc = 100.0 * int(n_class_correct[i]) / int(n_class_samples[i])
        print(f'Accuracy of {labelName[i]}: {acc} %')

    return balAcc

In [None]:
skinDataset = []
labelName = ["MEL", "NV", "BCC", "AK", "BKL", "DF", "VASC", "SCC", "UNK"]

i = 0
# Read the label table and shuffle it with a fixed seed
df = pandas.read_csv("../label.csv")
df = shuffle(df, random_state = 1234)
df = df.reset_index(drop=True)


def _perClassHead(frame, fraction):
    """Return, class by class, the first ``fraction`` of the rows flagged 1 for that class."""
    parts = []
    for className in labelName:
        classRows = frame[frame[className]==1.]
        parts.append(classRows.iloc[:round(fraction*len(classRows))])
    return pandas.concat(parts)


# First split: the kept rows become train+validation, the remainder is the test set.
dfTrainVal = _perClassHead(df, trainSize).reset_index(drop=True)
# Rows occurring in both frames appear twice in the concatenation and are dropped.
dfTest = pandas.concat([df, dfTrainVal]).drop_duplicates(keep=False)

# Second split: the same procedure separates validation from training.
dfTrain = _perClassHead(dfTrainVal, trainSize)
dfVal = pandas.concat([dfTrainVal, dfTrain]).drop_duplicates(keep=False)

dfTest = dfTest.reset_index(drop=True)
dfVal = dfVal.reset_index(drop=True)
dfTrain = dfTrain.reset_index(drop=True)

In [None]:
# Used to check how well balanced the dataset is
def isBalanced(df):
    """Print the number of rows of ``df`` flagged 1 for every class column.

    Args:
        df: DataFrame with one column per class name
            (MEL, NV, BCC, AK, BKL, DF, VASC, SCC, UNK).
    """
    classNames = ("MEL", "NV", "BCC", "AK", "BKL", "DF", "VASC", "SCC", "UNK")

    # Count everything first so a missing column fails before anything is printed.
    counts = {className: len(df[df[className]==1.]) for className in classNames}

    # Looping over the names guarantees no class is forgotten (AK used to be
    # counted but never printed).
    for className in classNames:
        print("Casi di " + className + ": " + str(counts[className]))

# Report how many rows ended up in the training and test splits.
print(f"Le dimensioni del dataset di training sono : {dfTrain.shape[0]} , mentre le dimensioni del dataset di test sono {dfTest.shape[0]}")

In [None]:
def BalanceVector(df):
    """Return one weight per class: total sample count divided by the class count.

    Every name in ``labelName`` except the last one is considered. A class
    with no rows in ``df`` produces a division by zero in the numpy array
    division.

    Args:
        df: DataFrame with one 0/1 column per class name.

    Returns:
        numpy array of weights, ordered like ``labelName[:-1]``.
    """
    counts = np.array([len(df[df[className]==1.]) for className in labelName[:-1]])
    return counts.sum() / counts

In [None]:
# Print the per-class sample counts of the training split.
isBalanced(dfTrain)
# Per-class weights (total count / class count); passed to the loss as class weights further down.
w = BalanceVector(dfTrain)

In [None]:
def showImage(image, isTensor = False):
    """Display ``image`` with matplotlib.

    Args:
        image: picture to draw.
        isTensor: when true, ``image`` is permuted with ``permute(1, 2, 0)``
            before drawing.
    """
    pixels = image.permute(1, 2, 0) if isTensor else image
    plt.imshow(pixels, interpolation='nearest', aspect='equal')
    plt.show()
    
def showLabel(label, prediction = False):
    """Print the class name for ``label``.

    Args:
        label: index into ``labelName``.
        prediction: when true, the message is prefixed to mark it as the
            network output.
    """
    prefix = "(Output della rete) La malattia è: " if prediction else "La malattia è: "
    print(prefix + labelName[label])
    
def showExample(example, isTensor = False):
    """Print the label of an ``(image, label)`` pair and then draw its image.

    Args:
        example: indexable whose element 0 is the image and element 1 the label.
        isTensor: forwarded to ``showImage``.
    """
    target = example[1]
    showLabel(target)
    picture = example[0]
    showImage(picture, isTensor)
    
def showLatent(label):
    """Print the ``size`` attribute of ``label``."""
    size = label.size
    print(size)

In [None]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch
import torchvision.models as models

In [None]:
class TotalDataset(Dataset):
    """Dataset pairing each row of a label table with its image file.

    Args:
        label: DataFrame whose ``image`` column holds the file name without
            extension; the columns from position 1 up to (excluding) the last
            one are read as class flags.
        image_size: images are resized to ``(image_size, image_size)``.
        aug: when true, ``randomTransform`` is applied to every image.
            NOTE(review): ``randomTransform`` is not defined in this part of
            the notebook; confirm it exists before enabling.
        image_dir: directory containing the ``.jpg`` files (defaults to the
            location that was previously hard-coded).
    """
    def __init__(self, label, image_size = 224, aug = False, image_dir = '../../ISIC_2019_Training_Input/'):
        self.label = label
        self.lenght = self.label.shape[0]
        self.image_dir = image_dir
        self.imageTransform = transforms.Compose([
             transforms.Resize((image_size, image_size)),
                transforms.ToTensor()
            ])
        self.aug = aug

    def __getitem__(self, index):
        """Return ``(image_tensor, label_tensor)`` for the row at position ``index``."""
        # Positional lookup: correct even when the frame index was not reset.
        row = self.label.iloc[index]
        pathImage = join(self.image_dir, row['image'] + '.jpg')
        # Arg-max over the class flags, skipping the name column and the last column.
        label = np.argmax(np.array(row.iloc[1:-1], dtype = 'float32'))
        # The context manager closes the file as soon as the tensor has been built.
        with Image.open(pathImage) as rawImage:
            image = self.imageTransform(rawImage)
        if self.aug:
            image = randomTransform(image)
        return (image, torch.tensor(label))

    def __len__(self):
        """Number of rows in the label table."""
        return self.lenght

In [None]:
def getImage(pathImage, image_size = 224):
    """Load one image file as a batch of a single resized tensor.

    Args:
        pathImage: path of the image file.
        image_size: the image is resized to ``(image_size, image_size)``.

    Returns:
        The transformed image with an extra leading dimension added by
        ``unsqueeze(0)``.
    """
    imageTransform = transforms.Compose([
             transforms.Resize((image_size, image_size)),
                transforms.ToTensor()
            ])
    # Close the file deterministically once the tensor has been built.
    with Image.open(pathImage) as rawImage:
        image = imageTransform(rawImage)
    return image.unsqueeze(0)

In [None]:
import pytorch_lightning as pl
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torch.utils.tensorboard import SummaryWriter
from efficientnet_pytorch import EfficientNet
from PLModel import PLModel
from util import print_confusion_matrix

In [None]:
def testAccuracy(model):
    """Classify every sample of ``datasetTest`` and plot a t-SNE map of the pooled features.

    For each sample the function runs ``model``, records whether the
    prediction is right and stores the activation captured under
    ``activation['avgpool']`` by the forward hook. The stored activations are
    then embedded in two dimensions with t-SNE and drawn with one marker
    style per class.

    Args:
        model: network with a ``name`` attribute and a forward hook that
            fills ``activation['avgpool']``.

    Returns:
        List of the ``datasetTest`` indices that were classified correctly.
    """
    # One matplotlib format string (colour + marker) per class index.
    classMarkers = ['bo', 'go', 'ro', 'yo', 'kd', 'ch', 'm*', 'bs']

    PCAVector = []
    truth = []
    correct = []
    correctPred = 0
    model.to(device)
    nSamples = len(datasetTest)

    # Inference only: skip building autograd graphs.
    with torch.no_grad():
        for i in range(nSamples):
            # Fetch the sample once; every dataset access re-reads the image file.
            image, label = datasetTest[i]
            label = int(label)
            output = model(image.unsqueeze(0).to(device))
            output = np.array(output.detach().to('cpu'))
            if label == np.argmax(output[0]):
                correctPred += 1
                correct.append(i)
            PCAVector.append(np.array(activation['avgpool'].to('cpu')).reshape(-1))
            truth.append(label)
            print("{:.2f} % ({:d} su {:d}) acc = {:.2f}".format(100*i/nSamples, i, nSamples, 100 * correctPred / (i + 1)), end="\r")
    print("Accuracy of prediction ("+ model.name+ ") "+str(correctPred/len(datasetTest)))

    tsne = TSNE(n_components=2)
    embedded = tsne.fit_transform(np.array(PCAVector))
    truth = np.array(truth, dtype=int)

    plt.figure(figsize=(10, 7))

    # One plot call per class: all of its points share a style and a single
    # legend entry. Legend entries therefore follow the class order.
    for classId, marker in enumerate(classMarkers):
        members = truth == classId
        if members.any():
            plt.plot(embedded[members, 0], embedded[members, 1], marker, label=labelName[classId])

    # NOTE(review): the axes hold t-SNE components, and the first component is
    # drawn on the x axis; confirm whether these labels should be changed.
    plt.ylabel('PC1')
    plt.xlabel('PC2')
    plt.legend(loc="upper right")
    plt.show()

    return correct

In [None]:
# Optimizer name forwarded to PLModel for the ResNet-style models below.
# NOTE(review): the empty string presumably selects PLModel's default; confirm in PLModel.
optimName = ''

In [None]:
# Cross-entropy loss weighted per class with the vector computed by BalanceVector.
classWeights = torch.tensor(w, dtype = torch.float32)
loss = nn.CrossEntropyLoss(weight = classWeights)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Filled by the following cells with one PLModel wrapper per architecture.
Models = []

In [None]:
# Pretrained ResNeXt-50 (32x4d) from torch.hub; its final fully connected
# layer is replaced by a new linear layer with 8 outputs.
resnext = torch.hub.load('pytorch/vision:v0.9.0', 'resnext50_32x4d', pretrained=True)
num_f = resnext.fc.in_features
resnext.fc = nn.Linear(num_f, 8)
Models.append(PLModel('Resnext50', resnext, loss = loss, optimName = optimName))

In [None]:
# Pretrained ResNet-152 from torch.hub with an 8-output final layer.
# NOTE: the variable is still called `resnext` although it now holds a
# ResNet-152, and it overwrites the ResNeXt-50 reference from the previous cell.
resnext = torch.hub.load('pytorch/vision:v0.9.0', 'resnet152', pretrained=True)
num_f = resnext.fc.in_features
resnext.fc = nn.Linear(num_f, 8)
Models.append(PLModel('Resnet152', resnext, loss = loss, optimName = optimName))

In [None]:
# Pretrained EfficientNet-B4 with 8 output classes, registered under the name 'EfficientNet'.
# NOTE(review): unlike the ResNet cells, no `loss`/`optimName` is passed here;
# confirm that PLModel's defaults are the intended ones.
model = PLModel('EfficientNet', EfficientNet.from_pretrained('efficientnet-b4', num_classes=8))
Models.append(model)

In [None]:
# Pretrained EfficientNet-B5 with 8 output classes.
# NOTE(review): no `loss`/`optimName` is passed; confirm PLModel's defaults are intended.
model = PLModel('EfficientNetB5', EfficientNet.from_pretrained('efficientnet-b5', num_classes=8))
Models.append(model)

In [None]:
# Pretrained EfficientNet-B6 with 8 output classes.
# NOTE(review): no `loss`/`optimName` is passed; confirm PLModel's defaults are intended.
model = PLModel('EfficientNetB6', EfficientNet.from_pretrained('efficientnet-b6', num_classes=8))
Models.append(model)

In [None]:
# Pretrained EfficientNet-B7 with 8 output classes.
# NOTE(review): no `loss`/`optimName` is passed; confirm PLModel's defaults are intended.
model = PLModel('EfficientNetB7', EfficientNet.from_pretrained('efficientnet-b7', num_classes=8))
Models.append(model)

In [None]:
# Shared store: each hook writes the latest output of its module under its own key.
activation = {}
def get_activation(name):
    """Build a forward hook that saves the detached module output in ``activation[name]``."""
    def _capture(module, inputs, outputs):
        activation[name] = outputs.detach()
    return _capture

# Attach the capture hook to the layer selected by architecture name.
# Every model writes to the same 'avgpool' key.
for model in Models:
    if "EfficientNet" in model.name:
        hookedLayer = model.model._avg_pooling
    elif "Densenet" in model.name:
        hookedLayer = model.model.features.norm5
    else:
        hookedLayer = model.model.avgpool
    print(hookedLayer)
    hookedLayer.register_forward_hook(get_activation('avgpool'))

In [None]:
# NOTE(review): absolute, machine-specific path; consider moving it to a
# configuration cell.
cartella = '/home/cino/Documents/BestModel/'
bestModel = []

# Checkpoint sub-folder and input resolution for every model name.
checkpointConfig = {
    'EfficientNetB7': ('EffNetB7', 600),
    'EfficientNetB6': ('EffNetB6', 528),
    'EfficientNetB5': ('EffNetB5', 456),
    'EfficientNet': ('EffNetB4', 380),
    'Resnext50': ('ResNext50/600px', 600),
    'Resnet152': ('ResNet152/600px', 600),
}


def _loadCheckpoint(model, checkpointPath):
    """Load ``checkpointPath`` into ``model``, trying the known layouts in turn.

    The file is read once. Layouts tried, in order: a ``state_dict`` entry
    without ``loss.weight``, the same entry with ``loss.weight``, and finally
    the whole file as a state dict for ``model.model``.

    Returns:
        True when one layout loaded, False otherwise.
    """
    try:
        checkpoint = torch.load(checkpointPath, map_location='cpu')
    except Exception as e:
        print("Impossibile recuperare " + checkpointPath + ' ' + str(e))
        return False

    wrappedState = checkpoint.get('state_dict') if isinstance(checkpoint, dict) else None
    if wrappedState is not None:
        # Layout 1: wrapped state dict, without the stored loss weights.
        lossWeight = wrappedState.pop('loss.weight', None)
        try:
            model.load_state_dict(wrappedState)
            print("Versione precedente trovata")
            return True
        except Exception as e:
            print('Caricamento pytorch lig fallito, provo senza cancellare loss.weight' + str(e))

        # Layout 2: the same state dict with the loss weights put back.
        if lossWeight is not None:
            wrappedState['loss.weight'] = lossWeight
            try:
                model.load_state_dict(wrappedState)
                print("Versione precedente trovata")
                return True
            except Exception as e:
                print('Caricamento pytorch lig fallito, provo pytorch' + str(e))

    # Layout 3: the file is a plain state dict of the wrapped network.
    try:
        model.model.load_state_dict(checkpoint)
        print("Versione precedente trovata")
        return True
    except Exception as e:
        print("Impossibile recuperare " + checkpointPath)
        return False


for model in Models:
    if model.name not in checkpointConfig:
        # Fail loudly instead of silently reusing the previous model's settings.
        raise ValueError("No checkpoint configuration for model " + model.name)
    subcartella, image_size = checkpointConfig[model.name]
    mypath = cartella + subcartella

    datasetTest = TotalDataset(dfTest, image_size)
    datasetVal = TotalDataset(dfVal, image_size)

    dataloaderTest = DataLoader(dataset=datasetTest, batch_size=BATCH_SIZE , num_workers=2 )
    dataloaderVal = DataLoader(dataset=datasetVal, batch_size=BATCH_SIZE , num_workers=2 )

    # Sorted so that the evaluation order does not depend on the file system.
    weights = sorted(f for f in listdir(mypath) if isfile(join(mypath, f)))

    bestAcc = 0
    bestValAcc = 0
    bestTestAcc = 0
    bestState = None

    for weight in weights:
        if not _loadCheckpoint(model, join(mypath, weight)):
            continue
        model.eval()

        valAcc = verifyAccuracy(model, dataloaderVal, test = False)
        testAcc = verifyAccuracy(model, dataloaderTest, test = True)
        # NOTE(review): the test score takes part in checkpoint selection, so
        # the reported test accuracy is optimistic; consider using valAcc only.
        acc = valAcc + testAcc
        if acc > bestAcc:
            bestAcc = acc
            bestValAcc = valAcc
            bestTestAcc = testAcc
            # Snapshot the weights: `model` is reused for the next checkpoint,
            # so keeping a reference to it would not preserve the best one.
            bestState = {key: value.detach().cpu().clone() for key, value in model.state_dict().items()}

    if bestState is not None:
        # Restore the best checkpoint before any further evaluation.
        model.load_state_dict(bestState)
        model.testAcc = bestTestAcc
    model.eval()
    bestM = model

    correct = testAccuracy(bestM)
    tta_model = tta.ClassificationTTAWrapper(bestM, tta.aliases.d4_transform(), merge_mode='mean')

    valAccTTA = verifyAccuracy(tta_model, dataloaderVal, test = False, name = model.name, save = False)
    testAccTTA = verifyAccuracy(tta_model, dataloaderTest, test = True, name = model.name, save = False)

    print(model.name, bestValAcc, bestTestAcc, valAccTTA, testAccTTA)
    bestModel.append((bestM, model.name, bestValAcc, bestTestAcc, valAccTTA, testAccTTA, correct))

In [None]:
# Grad-CAM++ maps of one random test image, for every selected model.
for i in range(1):
    example = randint(0, len(datasetTest)-1)
    # Load the sample once: every datasetTest[...] access re-reads and
    # re-transforms the image file, and the sample is the same for all models.
    image, label = datasetTest[example]
    batch_x = image.unsqueeze(0)
    for model, name, val, test, valTTA, testTTA, correct in bestModel:
        model.to('cpu')
        pred = torch.argmax(model(batch_x))
        print('Il modello '+model.name+' ha predetto '+labelName[pred]+'('+labelName[label]+')')

        if model.name.find('EfficientNet') > -1:
            target_layer = model.model._blocks[-1]
        else:
            target_layer = model.model.layer4[-1]
        # Construct the CAM object once, and then re-use it on many images:
        cam = GradCAMPlusPlus(model=model, target_layer=target_layer)

        # If target_category is None, the highest scoring category
        # will be used for every image in the batch.
        # target_category can also be an integer, or a list of different integers
        # for every image in the batch.
        # Here the true label is explained, not the predicted one.
        target_category = label.unsqueeze(0)

        # You can also pass aug_smooth=True and eigen_smooth=True, to apply smoothing.
        grayscale_cam = cam(input_tensor=batch_x, target_category=target_category, aug_smooth=True)

        # In this example grayscale_cam has only one image in the batch:
        grayscale_cam = grayscale_cam[0, :]
        visualization = show_cam_on_image(np.array(image.permute(1, 2, 0)), grayscale_cam, use_rgb=True)
        f, axarr = plt.subplots(1,2, figsize=(12, 12))
        axarr[0].imshow(image.permute(1, 2, 0), interpolation='nearest', aspect='equal')
        axarr[1].imshow(visualization, interpolation='nearest', aspect='equal')
        plt.show()

# Integrated-gradient maps of three random test images, for every model.
for i in range(3):
    example = randint(0, len(datasetTest)-1)
    # Load the sample once; it is the same for every model.
    image, label = datasetTest[example]
    for model in Models:
        batch_x = image.unsqueeze(0).to(device)
        # The baseline must match the input: datasetTest images are not
        # necessarily 224x224, so a fixed-size blank would not broadcast.
        batch_blank = torch.zeros_like(batch_x)
        model.to(device)
        pred = torch.argmax(model(batch_x))
        print('Il modello '+model.name+' ha predetto '+labelName[pred]+'('+labelName[label]+')')
        # NOTE(review): attributions are always computed for output 0, not for
        # the predicted or the true class; confirm this is intended.
        integrated_gradients = compute_integrated_gradient(batch_x, batch_blank, model, 0)[0, :, :, :].to('cpu')
        f, axarr = plt.subplots(1,2, figsize=(12, 12))
        axarr[0].imshow(image.permute(1, 2, 0), interpolation='nearest', aspect='equal')
        # limToOne clamps in place; clone so that the overlay below still
        # receives the unclamped attributions.
        axarr[1].imshow(limToOne(integrated_gradients.permute(1, 2, 0).clone()), interpolation='nearest', aspect='equal')
        plt.show()
        showImage(limToOne(image + integrated_gradients), isTensor = True)
        print()

In [None]:
# Validation balanced accuracy (in percent) of every selected model.
x = []
y = []

plt.figure(1, figsize = (11,11))

for model, name, val, test, valTTA, testTTA, correct in bestModel:
    y.append(val*100)
    # The B4 network is registered simply as 'EfficientNet'; label it by name
    # rather than by its position in the list.
    x.append(name + "B4" if name == 'EfficientNet' else name)

# One bar per model: a bar chart keeps each bar aligned with its label.
plt.bar(x, y, label="Data")
plt.legend(loc="upper left")
plt.ylabel('Accuracy')
plt.title("Histogram");
plt.grid()
plt.show()

In [None]:
plt.figure(1, figsize = (11,11))

# Test balanced accuracy (in percent) per model. The values are hard-coded
# instead of being read from bestModel (that would be test*100 per entry).
# NOTE(review): presumably recorded from an earlier run; confirm they are current.
x = ['Resnext50', 'Resnet152', 'EfficientNetB7', 'EfficientNetB6', 'EfficientNetB5', 'EfficientNetB4']
y = [87.06392931234282, 86.56985572727665, 86.0222951692196, 84.99923862320729, 85.3735109502255, 84.65377269774773]

# One bar per model: a bar chart keeps each bar aligned with its label.
plt.bar(x, y, label="Test", color="green")
plt.legend(loc="upper left")
plt.ylabel('Accuracy')
plt.title("Histogram")
plt.grid()
plt.show()
print(x)
print(y)

In [None]:
# Validation balanced accuracy (in percent) with test-time augmentation.
x = []
y = []

plt.figure(1, figsize = (11,11))

for model, name, val, test, valTTA, testTTA, correct in bestModel:
    y.append(valTTA*100)
    # The B4 network is registered simply as 'EfficientNet'; label it by name
    # rather than by its position in the list.
    x.append(name + "B4" if name == 'EfficientNet' else name)

# One bar per model: a bar chart keeps each bar aligned with its label.
plt.bar(x, y, label="Validation TTA")
plt.legend(loc="upper left")
plt.ylabel('Accuracy')
plt.title("Histogram")
plt.grid()
plt.show()

In [None]:
# Test balanced accuracy (in percent) with test-time augmentation.
x = []
y = []

plt.figure(1, figsize = (11,11))

for model, name, val, test, valTTA, testTTA, correct in bestModel:
    y.append(testTTA*100)
    # The B4 network is registered simply as 'EfficientNet'; label it by name
    # rather than by its position in the list.
    x.append(name + "B4" if name == 'EfficientNet' else name)

# One bar per model: a bar chart keeps each bar aligned with its label.
plt.bar(x, y, label="Test TTA", color = "green")
plt.legend(loc="upper left")
# Absolute accuracies are plotted here, not gains, so the labels say so.
plt.ylabel('Accuracy')
plt.title("Histogram")
plt.grid()
plt.show()

In [None]:
# Gain in validation balanced accuracy (percentage points) obtained with TTA.
x = []
y = []

plt.figure(1, figsize = (11,11))

for model, name, val, test, valTTA, testTTA, correct in bestModel:
    y.append((valTTA - val)*100)
    # The B4 network is registered simply as 'EfficientNet'; label it by name
    # rather than by its position in the list.
    x.append(name + "B4" if name == 'EfficientNet' else name)

# One bar per model: a bar chart keeps each bar aligned with its label.
plt.bar(x, y, label="Validation TTA difference", color = "red")
plt.legend(loc="upper left")
plt.ylabel('Accuracy gain')
plt.title("Histogram of TTA gains")
plt.grid()
plt.show()

In [None]:
# Gain in test balanced accuracy (percentage points) obtained with TTA.
x = []
y = []

plt.figure(1, figsize = (11,11))

for model, name, val, test, valTTA, testTTA, correct in bestModel:
    y.append((testTTA - test)*100)
    # The B4 network is registered simply as 'EfficientNet'; label it by name
    # rather than by its position in the list.
    x.append(name + "B4" if name == 'EfficientNet' else name)

# One bar per model: a bar chart keeps each bar aligned with its label.
plt.bar(x, y, label="Test TTA difference", color = "red")
plt.legend(loc="upper left")
plt.ylabel('Accuracy gain')
plt.title("Histogram of TTA gains Test" )
plt.grid()
plt.show()

In [None]:
# Test indices misclassified by every selected model, and their class histogram.
nTest = len(datasetTest)
wrongTot = list(range(nTest))
# Smallest number of errors made by any single model.
minError = nTest
class_wrong = [0 for i in range(8)]

for model, name, val, test, valTTA, testTTA, correct in bestModel:
    # Set membership keeps the filtering linear in the number of samples.
    correctSet = set(correct)
    # `correct` lists the right answers, so the errors are the complement.
    minError = min(minError, nTest - len(correctSet))

    wrongTot = [i for i in wrongTot if i not in correctSet]

for i in wrongTot:
    # Read the label once per sample: each dataset access reloads the image.
    wrongLabel = int(datasetTest[i][1])
    class_wrong[wrongLabel] = class_wrong[wrongLabel] + 1

# Share of the best model's errors that every model makes; 0 when a model
# makes no error at all (wrongTot is then empty as well).
sharedPercent = 100*len(wrongTot)/minError if minError > 0 else 0.0
print("Il "+str(sharedPercent)+" è sbagliato da tutte le reti")
print(class_wrong)

In [None]:
# Display the test-set indices that none of the selected models classified correctly.
wrongTot

In [None]:
# One summary line per model: name, validation, test, validation-TTA and test-TTA scores.
for entry in bestModel:
    print(*entry[1:6])

In [None]:
# Grad-CAM++ maps for three random images that every model misclassified.
for i in range(3):
    if len(wrongTot) == 0:
        # randint(0, -1) would raise: there is nothing to show.
        print("Nessun esempio sbagliato da tutte le reti")
        break
    example = randint(0, len(wrongTot)-1)
    print(example)
    # Load the sample once: every datasetTest[...] access re-reads and
    # re-transforms the image file, and the sample is the same for all models.
    image, label = datasetTest[wrongTot[example]]
    batch_x = image.unsqueeze(0)
    for model, name, val, test, valTTA, testTTA, correct in bestModel:
        model.to('cpu')
        pred = torch.argmax(model(batch_x))
        print('Il modello '+model.name+' ha predetto '+labelName[pred]+'('+labelName[label]+')')

        if model.name.find('EfficientNet') > -1:
            target_layer = model.model._blocks[-1]
        else:
            target_layer = model.model.layer4[-1]
        # Construct the CAM object once, and then re-use it on many images:
        cam = GradCAMPlusPlus(model=model, target_layer=target_layer)

        # If target_category is None, the highest scoring category
        # will be used for every image in the batch.
        # target_category can also be an integer, or a list of different integers
        # for every image in the batch.
        # Here the true label is explained, not the predicted one.
        target_category = label.unsqueeze(0)

        # You can also pass aug_smooth=True and eigen_smooth=True, to apply smoothing.
        grayscale_cam = cam(input_tensor=batch_x, target_category=target_category, aug_smooth=True)

        # In this example grayscale_cam has only one image in the batch:
        grayscale_cam = grayscale_cam[0, :]
        visualization = show_cam_on_image(np.array(image.permute(1, 2, 0)), grayscale_cam, use_rgb=True)
        f, axarr = plt.subplots(1,2, figsize=(12, 12))
        axarr[0].imshow(image.permute(1, 2, 0), interpolation='nearest', aspect='equal')
        axarr[1].imshow(visualization, interpolation='nearest', aspect='equal')
        plt.show()

In [None]:
# Show every test sample that all models misclassified. wrongTot holds
# dataset indices, so iterate over its values rather than over 0..len-1.
for i in wrongTot:
    showExample(datasetTest[i], isTensor = True)