# **IMPORTAZIONE LIBRERIE**

In [None]:
import csv
import shutil
import os
import cv2
from PIL import Image
import numpy as np
import random
import time
import pandas as pd
from glob import glob

!pip install dataframe_image
import dataframe_image as dfi

from keras.utils import normalize, image_dataset_from_directory
import matplotlib.pyplot as plt


import tensorflow as tf
from tensorflow import keras
from keras.models import Model, Sequential
from keras.layers import Input, Dense, Flatten, add
from keras.layers import RandomFlip, RandomRotation, RandomZoom, \
                         RandomTranslation, Rescaling, Resizing
from tensorflow.keras.applications.resnet50 import ResNet50
from keras.applications.vgg16 import VGG16
from keras.optimizers import Adam, SGD
from keras.preprocessing import image
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import img_to_array, load_img
from keras.metrics import Recall, Precision, AUC
from keras.callbacks import EarlyStopping
from keras.wrappers.scikit_learn import KerasClassifier


from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay,roc_curve,\
  auc, accuracy_score, average_precision_score, balanced_accuracy_score, \
  precision_recall_curve, f1_score, make_scorer


from google.colab import drive
drive.mount('/content/drive')

In [None]:
print("GPU is", "available" if tf.config.list_physical_devices('GPU') else "NOT AVAILABLE")

#**PREPARAZIONE DEI DATI**

In [None]:
ISIC_path = 'drive/MyDrive/Colab Notebooks/ISIC_challenge/'
train_dir = ISIC_path + 'datasets/training/'
test_dir = ISIC_path + 'datasets/test/'
csv_dir = ISIC_path + 'csv/'
results_dir = ISIC_path + 'results/'

In [None]:
"""
# DIVIDERE LE IMMAGINI DEL TRAINING E TEST SET IN DUE CLASSI (CARTELLE)
# Leggendo il file csv, sposto le immagini nelle rispettive cartelle (benigno o maligno)
def move_ben_mal(file_csv, dest):
  with open(file_csv) as file_obj:
  reader = csv.reader(file_obj)

  # itero ogni riga del file csv
  for row in reader:
    if  os.path.exists(dest+row[0]+'.jpg'):
      if row[1] == 'benign' or row[1] == "0.0":
        shutil.move(dest+row[0]+'.jpg', dest+'Benigno')
      elif row[1] == 'malignant' or row[1] == "1.0":
        shutil.move(dest+row[0]+'.jpg', dest+'Maligno')
      else:
        print("errore: label non riconosciuta")


move_ben_mal(csv_dir + 'ISBI2016_ISIC_Part3_Training_GroundTruth.csv', train_dir)
move_ben_mal(csv_dir+'ISBI2016_ISIC_Part3_Test_GroundTruth.csv', test_dir)
"""

In [None]:
# Funzione per plottare un istogramma dato il numero di immagini per ogni classe
def plot_histogram(n_class_1, n_class_2, title=""):
    fig, ax = plt.subplots()
    ax.bar([0, 1], [n_class_1, n_class_2], color=['royalblue', 'darkorange'])

    ax.set_xticks([0, 1])
    ax.set_xticklabels(['benign', 'malignant'])
    ax.set_xlabel('Classi')
    ax.set_ylabel('Numero di immagini')
    plt.title(title+ ' Histogram')
    plt.show()

In [None]:
# controllo la grandezza di ogni cartella
# ------ Training set---------
b = len(os.listdir(train_dir+'Benigno'))
m = len(os.listdir(train_dir+'Maligno'))
LEN_TRAIN = b + m
print("Numero di immagini totali contenute nel training set = ", LEN_TRAIN)
print(f"# immagini di nei benigni = {b}")
print(f"# immagini di nei maligni = {m}\n")

plot_histogram(b, m, 'Training set')


# ------ Test set---------
b = len(os.listdir(test_dir+'Benigno'))
m = len(os.listdir(test_dir+'Maligno'))
LEN_TEST = b + m
print("\n\n\nNumero di immagini totali contenute nel test set = ", LEN_TEST)
print(f"# immagini di nei benigni = {b}")
print(f"# immagini di nei maligni = {m}\n")

plot_histogram(b, m, 'Test set')

In [None]:
#stampo un esempio  di un neo maligno preso casualmente dal trainingset
l = os.listdir(train_dir+'Maligno')
m=random.randint(0, len(l))
print("Un esempio di neo maligno è:")
plt.figure(figsize=(5,7))
plt.axis('off')
plt.imshow(load_img(train_dir + 'Maligno/' + l[m]))
plt.show()

#stampo un esempio  di un neo benigno preso casualmente dal trainingset
l = os.listdir(train_dir+'Benigno')
m=random.randint(0, len(l))
print("\n\n\nUn esempio di neo benigno è:")
plt.figure(figsize=(5,7))
plt.axis('off')
plt.imshow(load_img(train_dir + 'Benigno/' + l[m]))
plt.show()

# **NO AUGMENTATION**

In [None]:
IMAGE_SIZE = [224,224]

#-----------train_dataset_generator-------------
train_generator = image_dataset_from_directory(
    train_dir,
    color_mode='rgb',
    image_size=IMAGE_SIZE,
    batch_size=LEN_TRAIN,
    label_mode='binary',
)


#----------test_dataset_generator------------
test_generator = image_dataset_from_directory(
    test_dir,
    image_size=IMAGE_SIZE,
    batch_size=LEN_TEST,
    label_mode='binary'
)

print("\nLe classi sono: ", test_generator.class_names)

In [None]:
# funzione che prende in input un dataset e ritorna due array, uno con
# le immagini sottoforma di array di float, uno con le label associate
################################################################################
def from_data_to_array(data_generator):
  data = []
  target = []
  for x,y in data_generator:
    data.append(x)
    target.append(y)

  data = np.squeeze(data)
  # crea un numpy array che contiene tutte le immagini
  data = np.array(data)
  # Normalizza i valori dei pixel nell'intervallo [0, 1]
  data = data.astype('float32') / 255.0

  target = np.squeeze(target)
  # crea un numpy array che contiene tutte le label
  target = np.array(target)
  return data, target


train_data,train_target = from_data_to_array(train_generator)
print('train_data shape = ',np.shape(train_data))
print('train_target shape = ',train_target.shape)
test_data,test_target = from_data_to_array(test_generator)
print('test_data shape = ',np.shape(test_data))
print('test_target shape = ',test_target.shape)

# **DATA AUGMENTATION**

In [None]:
IMAGE_SIZE = [224,224]


# ImageDataGenerator per il data augmentation
datagen = ImageDataGenerator(rotation_range=90,
                             shear_range=15,
                             zoom_range=0.2,
                             fill_mode='reflect',
                             vertical_flip=True,
                             horizontal_flip=True,
                             )


train_data = []
train_target = []

for sub_dir in os.listdir(train_dir):
  if sub_dir == 'Maligno':
    label = 1.
    # numero di immagini generate per ogni immagine originale
    num_augmented_images = 4
  else:
    label = 0.
    num_augmented_images = 1

  for filename in os.listdir(os.path.join(train_dir, sub_dir)):
    img = load_img(os.path.join(train_dir, sub_dir, filename), target_size=IMAGE_SIZE)
    img_array = img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    # Aggiungo l'immagine originale
    train_data.append(img_array[0])
    train_target.append(label)

    # Aggiungo le immagini modificate
    i = 0
    for x_batch, y_batch in datagen.flow(img_array, [0], batch_size=1):
      if i >= num_augmented_images:
        break
      train_data.append(x_batch[0])
      train_target.append(label)
      i += 1


# crea un numpy array che contiene tutte le immagini
train_data = np.array(train_data)

# normalizza i valori dei pixel tra 0 e 1
train_data = train_data.astype('float32') / 255.0

# crea un numpy array che contiene tutte le label
train_target = np.array(train_target)

b =  np.sum(train_target == 0)
m = np.sum(train_target == 1)
print('Numero totale (dopo l\'augmentation) di immagini per il training set:', train_data.shape[0])
print('Numero di immagini per la classe \'Benigno\':', b)
print('Numero di immagini per la classe \'Maligno\':', m)
print('\n\n\n')
plot_histogram(b, m, 'Augmented Training set')

In [None]:
# ---------------shuffle del train_data-----------------------------------------
indices = np.arange(len(train_data))

# mescolamento degli indici in modo randomico
np.random.shuffle(indices)

train_data = train_data[indices]
train_target = train_target[indices]

In [None]:
# faccio la stessa cosa per il test set, ma senza effettuare augmentation

test_data = []
test_target = []

for sub_dir in os.listdir(test_dir):
  label = 1 if sub_dir == 'Maligno' else 0

  for filename in os.listdir(os.path.join(test_dir, sub_dir)):
    img = load_img(os.path.join(test_dir, sub_dir, filename), target_size=IMAGE_SIZE)
    x = img_to_array(img)
    test_data.append(x)
    test_target.append(label)


# Crea un array numpy che contiene tutte le immagini
test_data = np.array(test_data)

# Normalizza i valori dei pixel nell'intervallo [0, 1]
test_data = test_data.astype('float32') / 255.0

# Crea un array numpy che contiene tutte le label
test_target = np.array(test_target)

# Stampa il numero totale di immagini e il numero di immagini per ogni classe
print('Numero totale di immagini per il test set:', test_data.shape[0])
print('Numero di immagini per la classe \'Benigno\':', np.sum(test_target == 0))
print('Numero di immagini per la classe \'Maligno\':', np.sum(test_target == 1))

# **HYPERPARAMETER OPTIMIZATION**

In [None]:
class_weights = compute_class_weight('balanced', classes=np.unique(train_target), y=train_target)
class_weights = {0: class_weights[0], 1: class_weights[1]}
print(class_weights)


In [None]:
# definire il modello di base
def create_model():
  model = keras.Sequential()
  model.add(ResNet50(input_shape=IMAGE_SIZE+[3], include_top=False))
  model.add(Flatten())
  model.add(Dense(1, activation='sigmoid'))
  model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate=0.0001),
    metrics=['accuracy']
  )
  return model


model = KerasClassifier(build_fn=create_model)

# parametri della griglia di ricerca
epochs = [25, 30, 35, 40]
batch_size = [16, 32, 64]
param_grid = {'epochs': epochs, 'batch_size': batch_size}


scorer = make_scorer(balanced_accuracy_score)

grid = GridSearchCV(estimator=model, param_grid=param_grid, scoring=scorer, verbose=1)
grid_result = grid.fit(train_data, train_target, class_weight=class_weights, shuffle=True)


# stampa dei risultati
print(grid_result)
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))

In [None]:
# definire il modello di base
def create_model(optimizer, lr):
  model = keras.Sequential()
  model.add(ResNet50(input_shape=IMAGE_SIZE+[3], include_top=False))
  model.add(Flatten())
  model.add(Dense(1, activation='sigmoid'))
  if optimizer == 'adam':
      opt = Adam(learning_rate=lr)
  elif optimizer == 'sgd':
      opt = SGD(learning_rate=lr)
  else:
      raise ValueError('Optimizer non valido:', optimizer)

  model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
  return model



def hp_tuning(train_data, train_target):
  # dizionario con tutti i parametri da ottimizzare
  params = {'optimizer': ['adam', 'sgd'], 'learning_rate': [0.0001, 0.001, 0.01]}

  kc = KerasClassifier(build_fn=create_model, epochs=35)

  scorer = make_scorer(balanced_accuracy_score)

  grid = GridSearchCV(kc, param_grid=params, scoring=scorer, verbose=1)
  grid_res = grid.fit(train_data, train_target, class_weight=class_weights, shuffle=True)

  # Risultati
  print("Best: %f using %s" % (grid_res.best_score_, grid_res.best_params_))
  return grid_res.best_params_

In [None]:
hp_tuning(train_data, train_target)

# **CREAZIONE MODELLO: VGG16**

In [None]:
LEARNING_RATE = 1e-4
pretrained_mod = 'VGG16/'

# creo il modello
vgg = VGG16(input_shape=IMAGE_SIZE+[3], include_top=False)
x = Flatten()(vgg.output)
prediction = Dense(1, activation='sigmoid')(x)  #nell'output layer avrò un solo nodo (output binario)
model = Model(inputs=vgg.input, outputs=prediction)

# compilo il modello
model.compile(
  loss='binary_crossentropy',
  optimizer=Adam(learning_rate=LEARNING_RATE),
  metrics=['accuracy', AUC(name='auc')]
)

In [None]:
from keras.utils.vis_utils import plot_model
plot_model(model,show_shapes=True, to_file=results_dir+'VGG16/model_plot.png', show_layer_names=True)

# **CREAZIONE MODELLO: Resnet50**

In [None]:
# creo il modello
resnet = ResNet50(include_top=False, input_shape=(224,224,3))
x = Flatten()(resnet.output)
prediction = Dense(1, activation='sigmoid')(x)
model = Model(inputs=resnet.input, outputs=prediction)


# compilo il modello
model.compile(
  loss='binary_crossentropy',
  optimizer=Adam(learning_rate=1e-4),
  metrics=['accuracy', AUC(name='auc')]
)

In [None]:
from keras.utils.vis_utils import plot_model
plot_model(model, to_file=results_dir+'Resnet50/model_plot.png', show_shapes=True, show_layer_names=True)

# **MODEL FITTING**

In [None]:
# ----------------CALLBACKS------------------------------------
early_stop = EarlyStopping(
    monitor='loss',
    patience=5,
    verbose=1,
    restore_best_weights=True
)

In [None]:
EPOCHS = 30

# Dizionario per plottare l'andamento di loss, accuracy e auc durante il training del modello
hist = {'loss':[], 'accuracy':[], 'auc':[], 'val_loss':[], 'val_accuracy':[], 'val_auc':[]}

yt = []
yp = []
fold = 1

# Timer per misurare il tempo totale di training
t0 = time.time()

for train,val in StratifiedKFold(8, shuffle=True).split(train_data,train_target):
  print(f'\n\n\n\n------------- Fold #{fold} as validation set ---------------')

  # divido il dataset in training e validation folds
  x_train, x_val, y_train, y_val = train_data[train], train_data[val], \
                                  train_target[train], train_target[val],

  history = model.fit(
    x_train,
    y_train,
    validation_data=(x_val, y_val),
    epochs=EPOCHS,
    class_weight=class_weights,
    verbose=1,
    shuffle=True,
    callbacks=[early_stop]
  )

  hist['loss'].append(np.mean(history.history['loss']))
  hist['accuracy'].append(np.mean(history.history['accuracy']))
  hist['auc'].append(np.mean(history.history['auc']))
  hist['val_loss'].append(np.mean(history.history['val_loss']))
  hist['val_accuracy'].append(np.mean(history.history['val_accuracy']))
  hist['val_auc'].append(np.mean(history.history['val_auc']))

  y_pred = model.predict(x_val)

  yt.append(y_val)
  yp.append(y_pred)
  fold += 1

t1 = time.time()
training_time = int(t1-t0)
print('Tempo di training: ', training_time)

yt = np.concatenate(yt)
yp = np.concatenate(yp)

In [None]:
res_fold = results_dir + f'Resnet50/4mal_1ben_AUG_{EPOCHS}epochs_tt{training_time}/'
os.mkdir(res_fold)

In [None]:
df = pd.DataFrame(hist)
print(df)
dfi.export(df,res_fold+'train_df',table_conversion = 'matplotlib')

In [None]:
# plot della accuracy
plt.title('Training and Validation Accuracy')
plt.plot(hist['accuracy'], label='train accuracy')
plt.plot(hist['val_accuracy'], label='val accuracy')
plt.legend(loc='lower right')
plt.ylabel('accuracy')
plt.xlabel('Fold iterations')
plt.savefig(res_fold+'train_acc')
plt.show()

print('\n\n\n')

# plot della loss
plt.title('Training and Validation Loss')
plt.plot(hist['loss'], label='train loss')
plt.plot(hist['val_loss'], label='val loss')
plt.legend(loc='upper right')
plt.ylabel('Loss')
plt.xlabel('Fold iterations')
plt.savefig(res_fold+'train_loss',format='png')
plt.show()

print('\n\n\n')

# plot della auc
plt.title('Training and Validation AUC')
plt.plot(hist['auc'], label='train AUC')
plt.plot(hist['val_auc'], label='val AUC')
plt.legend(loc='lower right')
plt.ylabel('AUC')
plt.xlabel('Fold iterations')
plt.savefig(res_fold+'train_auc',format='png')
plt.show()

In [None]:
# CONFUSION MATRIX
def plot_confusion_matrix(labels, predictions, title='', filename='confusion_matrix'):
  cm = confusion_matrix(labels, np.round(predictions))
  ConfusionMatrixDisplay.from_predictions(labels, np.round(predictions), cmap=plt.cm.OrRd, display_labels=['Benigno','Maligno'])
  plt.title(title + ' Confusion Matrix')
  plt.savefig(res_fold+filename,format='png')
  plt.show()
  return cm


plot_confusion_matrix(yt, yp,'Training','train_cm')

# **ANALISI DEI RISULTATI**

In [None]:
y_pred = model.predict(test_data)

# ROC CURVE
fpr,tpr,_ = roc_curve(test_target, y_pred)
plt.plot(fpr,tpr, marker='.', label='ROC Curve')
plt.plot([0,1],[0,1], 'y--', label='Random Classifier')
plt.grid(True)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend(loc='lower right')
plt.title('ROC Curve')
plt.savefig(res_fold+'test_ROC',format='png')
plt.show()

print('\n\n\n')


#PRECISON RECALL CURVE
precision, recall,_ = precision_recall_curve(test_target, y_pred)
plt.plot(recall, precision, marker='.')
plt.grid(True)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.savefig(res_fold+'test_prec_recall',format='png')
plt.show()

print('\n\n\n')


#CONFUSION MATRIX
cm = plot_confusion_matrix(test_target, y_pred, 'Test', 'test_cm')

In [None]:
TN = cm[0,0]
FP = cm[0,1]
FN = cm[1,0]
TP = cm[1,1]


bal_acc = balanced_accuracy_score(test_target, np.round(y_pred))
print('Balanced accuracy = ',bal_acc)

############## Integral Metrics #########################
auc_value = auc(fpr, tpr)
print('\nArea Under the Curve (AUC) = ', auc_value)


selected_points = np.where(tpr > 0.8) # seleziona i punti della curva ROC in cui la sensitivity è maggiore dell'80%
auc_sens80 = auc(fpr[selected_points], tpr[selected_points])
print('AUC, Sens. > 80% = ', auc_sens80)


avg_prec_score = average_precision_score(test_target, y_pred)
print('Average Precision Score = ',avg_prec_score)

############### Threshold Metrics #########################
accuracy = accuracy_score(test_target, np.round(y_pred))
print('\nAccuracy = ', accuracy)

sensitivity = TP / (TP+FN)
print('Sensitivity = ', sensitivity)    # TPR / recall

specificity = TN / (TN+FP)
print('Specificity = ', specificity)

f1 = f1_score(test_target, np.round(y_pred))
print('Dice Coefficient = ', f1)

PPV = TP/(TP+FP)
print('PPV = ', PPV)    # Positive predictive value or precision

NPV = TN/(TN+FN)
print('NPV = ', NPV)    # Negative predictive value



# salvo i risultati come un immagine
dict = {'Balanced accuracy': bal_acc,
        'AUC': auc_value,
        'AUC, Sens>80%': auc_sens80,
        'Average Precision': avg_prec_score,
        'Accuracy': accuracy,
        'Sensitivity': sensitivity,
        'Specificity': specificity,
        'Dice Coefficient': f1,
        'PPV': PPV,
        'NPV': NPV
        }

df = pd.DataFrame.from_dict(dict, orient='index',columns=['value'])
dfi.export(df,res_fold+'results',table_conversion = 'matplotlib')