# Taller teórico - práctico sobre redes neuronales

Dr. Héctor Henríquez Leighton



In [None]:
!pip3 install keras-visualizer
!pip install lime

In [None]:
## Herramientas generales
import numpy as np
import matplotlib.pyplot as plt
from skimage.segmentation import mark_boundaries
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import cv2
import os
import pandas as pd
from collections import Counter
import random

## Construcción de modelos
import tensorflow as tf
from keras import backend as K
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.layers import Dense, Flatten, Dropout, Input, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.metrics import AUC
from lime import lime_image

import warnings
warnings.filterwarnings('ignore')

In [None]:
## Recorre los directorios y extrae imágenes y sus respectivas etiquetas
def read_images(path_dir):
  images = []
  labels = []
  for modality in os.listdir(path_dir):
    modality_dir = os.path.join(path_dir, modality)
    for exam in os.listdir(modality_dir):
      image = cv2.imread(os.path.join(modality_dir, exam))
      #image = cv2.resize(image, (32, 32),interpolation = cv2.INTER_LINEAR)
      images.append(image)
      labels.append(modality)
  images = np.array(images)
  labels = np.array(labels)
  np.random.seed(10)
  np.random.shuffle(images)
  np.random.seed(10)
  np.random.shuffle(labels)
  return(images, labels)


In [None]:
!wget -N "https://cainvas-static.s3.amazonaws.com/media/user_data/cainvas-admin/MedNIST.zip"
!unzip -qo "MedNIST.zip"
!rm "MedNIST.zip"
train_dir = '/content/Medical/Medical_train'
test_dir =  '/content/Medical/Medical_test'

classes = sorted(os.listdir(train_dir))
classes = np.array(classes)
print("Clases de estudios:", classes)

In [None]:
for modality in os.listdir(train_dir):
  modality_dir = os.path.join(train_dir, modality)
  print("Numero de estudios para la clase {}: {}".format(modality, len(os.listdir(modality_dir))))

## Lectura y procesamiento de las imágenes

In [None]:
### Se leen las imágenes y se asignan a los grupos de entrenamiento y test
X_train, y_train = read_images(train_dir)
X_test, y_test = read_images(test_dir)

### El formato corresponde a imágenes en color de 8 bit
### Se normalizan dividiendo por 255, lo que deja valores de pixeles entre 0 y 1
### Se cambian a tipo flotante de 32 bits
X_train = (X_train/255).astype('float32')
X_test = (X_test/255).astype('float32')

print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

In [None]:
## Vsualización de casos aleatorios
ROWS = 8
fig, axes1 = plt.subplots(ROWS,ROWS,figsize=(10,10))
for j in range(ROWS):
    for k in range(ROWS):
        i = np.random.randint(0, X_train.shape[0])
        axes1[j][k].set_axis_off()
        axes1[j][k].imshow(X_train[i,:,:,:], cmap='gray', vmin=0, vmax=255)
        axes1[j][k].set_title(y_train[i], size=8)

In [None]:
## Visualización por modalidad
#'Hand' 'BreastMRI' 'ChestCT' 'HeadCT' 'AbdomenCT' 'CXR'

modality = 'CXR'

ROWS = 4
fig, axes1 = plt.subplots(ROWS,ROWS,figsize=(8,8))
for j in range(ROWS):
    for k in range(ROWS):
        i = np.random.randint(0, X_train[np.where(y_train == modality)].shape[0])
        axes1[j][k].set_axis_off()
        axes1[j][k].imshow(X_train[np.where(y_train == modality)][i,:,:,:], cmap='gray', vmin=0, vmax=255)
        axes1[j][k].set_title(modality, size=8)


## Procesamiento de las etiquetas

In [None]:
## Visualización de etiquetas:
print(y_train[0:6]) # vemos que las etiquetas son texto

In [None]:
### Preprocesamiento de etiquetas

train_labels = np.zeros((y_train.shape[0], len(classes)))
test_labels = np.zeros((y_test.shape[0], len(classes)))

for i in range(train_labels.shape[0]):
  train_labels[i, np.where(classes == y_train[i])] = 1

for i in range(test_labels.shape[0]):
  test_labels[i, np.where(classes == y_test[i])] = 1


In [None]:
print("clases a predecir:", classes)
print("ejemplo primeros 6 casos en formato OneHotEncodding:\n", train_labels[0:6])
df_visualizacion_clases = pd.DataFrame(train_labels, columns= classes)
df_visualizacion_clases.head(6)

### Algoritmo de data Augmentation y Generador de imágenes

In [None]:
### Definición de generador de data augmentation:
datagen = ImageDataGenerator(
    rotation_range=20,         # Rotación aleatoria de 20 grados
    width_shift_range=0.1,     # Desplazamiento horizontal del 10%
    height_shift_range=0.1,    # Desplazamiento vertical del 10%
    zoom_range=0.2,            # Zoom aleatorio del 20%
    horizontal_flip=True,      # Volteo horizontal aleatorio
    vertical_flip=True)

### Aplicar el generador de data augmentation sobre los datos de entrenamiento:
### Nunca se aplican sobre los datos de testeo.
### batch_size: cantidad de imágnes que entran

train_gen = datagen.flow(X_train, train_labels, batch_size=32)

### Visualizar las transformaciones para data augmentation

In [None]:
## Se genera una muestra de imágenes para transformar:

original_images = []
augmented_images = []

sample_size = 4

for i in range(sample_size):
  index = np.random.randint(X_train.shape[0])
  augmented_image = next(datagen.flow(X_train[index:index+1,:,:,:], batch_size = 1))
  original_images.append(X_train[index,:,:,:])
  augmented_images.append(augmented_image.reshape(64,64,3))

original_images = np.array(original_images)
augmented_images = np.array(augmented_images)


plt.figure(figsize=(8,8))
for i in range(sample_size):
        ax = plt.subplot(2, sample_size, i + 1)
        plt.imshow(original_images[i], cmap="gray", vmin=0, vmax=255)
        plt.title("Original")
        plt.axis("off")

        ax = plt.subplot(2, sample_size, i + 1 + sample_size)
        plt.imshow(augmented_images[i], cmap="gray", vmin=0, vmax=255)
        plt.title("Augmented")
        plt.axis("off")
plt.show()


### Construcción Modelo de red neuronal convolucional:
* ResNet50:
He, K., Zhang, X., Ren, S., & Sun, J. (2016). Deep residual learning for image recognition. Proceedings of the IEEE Conference on Computer Vision and Pattern Recognition (CVPR), 770-778. https://doi.org/10.1109/CVPR.2016.90

In [None]:
## Cargar el modelo ResNet50 preentrenado en ImageNet, sin las capas superiores
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(64, 64, 3))

## Se congelan las capas con los pesos de imagenet
base_model.trainable = False

## Se agrega una capa superior para la clasificación:
x = Flatten()(base_model.output)
x = Dense(1024, activation='relu')(x)
output = Dense(len(classes), activation='softmax')(x)

## Se instancia el modelo
model = Model(inputs=base_model.input, outputs=output)

#model.summary()

In [None]:
EPOCHS = 5
BATCH_SIZE = 32
LR = 0.005

## Reduce la tasa de aprendizaje al avanzar entrenamiento:
def scheduler(epoch, learning_rate):
  if epoch < 2:
    return learning_rate
  else:
    return learning_rate * np.exp(-0.05)

lr_scheduler = LearningRateScheduler(scheduler)

### Se guardarán los pesos del modelo que tenga mejor rendimiento
### en grupo de test. Se usará val_accuracy
checkpoint_filepath = './checkpoint.weights.h5'
model_checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True)

## Compilado y entrenamiento
model.compile(optimizer=Adam(learning_rate=LR), loss = 'categorical_crossentropy', metrics=['accuracy', AUC(multi_label=True)])
history = model.fit(train_gen, epochs= EPOCHS, verbose= 1, batch_size= BATCH_SIZE,
                validation_data=(X_test, test_labels), shuffle= True,
                callbacks=[model_checkpoint_callback,lr_scheduler])

model.load_weights(checkpoint_filepath)

In [None]:
accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(loss))

plt.plot(epochs, accuracy, 'bo', label='Training accuracy', color='salmon')
plt.plot(epochs, val_accuracy, 'b', label='Validation accuracy', color='dodgerblue')
plt.title('Training and validation accuracy')
plt.legend()
plt.show()

plt.plot(epochs, loss, 'bo', label='Training loss', color='salmon')
plt.plot(epochs, val_loss, 'b', label='Validation loss', color='dodgerblue')
plt.title('Training and validation loss')
plt.legend()
plt.show()

### Probando el modelo

In [None]:
pred_scores = model.predict(X_test)
y_pred = np.where(pred_scores > 0.5, 1, 0)
print(classification_report(test_labels, y_pred, target_names = classes))

## Herramienta de explicabilidad

In [None]:
## Tomaremos un ejemplo aleatorio del grupo test

index= np.random.randint(0,X_test.shape[0])

img_to_explain = X_test[index,:,:,:]

## Debe tener geometría de tensor de 4 dimensiones:
img_to_array = np.expand_dims(img_to_explain, axis=0)

pred_scores = model.predict(img_to_array)

print("Predicción Modelo:", classes[np.argmax(pred_scores)])
print("Respuesta correcta:", y_test[index])

In [None]:
# Lime Explainer
explainer = lime_image.LimeImageExplainer()

# Explicacion
explanation = explainer.explain_instance(img_to_explain, model.predict, top_labels=1, hide_color=0, num_samples=100)

## visualización
temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], positive_only=True, num_features=5, hide_rest=True)

plt.imshow(mark_boundaries(temp, mask))
plt.axis('off')
plt.title("LIME Explanation")
plt.show()

### Impacto en el desbalance de las clases

In [None]:
### Proporción de cada clase
proportions = {
    'CXR': 0.1,
    'AbdomenCT': 0.9,
    'ChestCT': 0.9,
    'HeadCT': 0.8,
    'Hand': 0.9,
    'BreastMRI': 0.9
    }

X_filtered = []
y_filtered = []

for label in classes:
  indices = np.where(y_train == label)[0]
  num_examples = int(proportions[label] * len(indices))
  selected_indices = random.sample(list(indices), num_examples)
  X_filtered.extend(X_train[selected_indices])
  y_filtered.extend(y_train[selected_indices])

# Convertir las listas de nuevo a arrays numpy
X_filtered = np.array(X_filtered)
y_filtered = np.array(y_filtered)

print(X_filtered.shape)
print(y_filtered.shape)

## OneHotEncodding de las nuevas etiquetas
train_labels_bal = np.zeros((y_filtered.shape[0], len(classes)))

for i in range(train_labels_bal.shape[0]):
  train_labels_bal[i, np.where(classes == y_filtered[i])] = 1

# Imprimir el nuevo tamaño del dataset filtrado
print("Tamaño de X_filtered:", X_filtered.shape)
print("Tamaño de y_filtered:", y_filtered.shape)

count_check = dict(Counter(y_filtered))
plt.figure(figsize=(8, 6))  # Tamaño de la figura
plt.bar(count_check.keys(), count_check.values(), color='skyblue')  # Crear las barras con etiquetas
plt.title('Proporción del nuevo set de datos desbalanceado')
plt.xlabel('Modalidad')
plt.ylabel('Cantidad')
plt.show()

In [None]:
## Se instancia un nuevo modelo
tf.keras.backend.clear_session()
base_model2 = ResNet50(weights='imagenet', include_top=False, input_shape=(64, 64, 3))

## Probar manteniendo transferlearning de imagenet versus de novo
base_model2.trainable = False

x = Flatten()(base_model2.output)
x = Dense(1024, activation='relu')(x)
output2 = Dense(len(classes), activation='softmax')(x)

model_unbal = Model(inputs=base_model2.input, outputs=output2)

EPOCHS = 10
BATCH_SIZE = 32

model_checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True)

## Compilado y entrenamiento
model_unbal.compile(optimizer=Adam(learning_rate=LR), loss = 'categorical_crossentropy', metrics=['accuracy', AUC(multi_label=True)])

history = model_unbal.fit(X_filtered, train_labels_bal, epochs= EPOCHS, verbose= 1, batch_size= BATCH_SIZE,
                validation_data = [X_test, test_labels], shuffle= True,
                callbacks=[model_checkpoint_callback,lr_scheduler])

model_unbal.load_weights(checkpoint_filepath)


In [None]:
pred_scores = model_unbal.predict(X_test)
y_pred = np.where(pred_scores > 0.5, 1, 0)
print(classification_report(test_labels, y_pred, target_names = classes))