# Example

Pneumonia detection using a CNN and visualization tools such as Grad-CAM to highlight regions of the image.
With this tool, as with alternatives such as a `Saliency map`, we can highlight the regions that the last layer of the CNN weighs most heavily when classifying the input.

These visualizations help us understand how much influence each part or region of the image has on the predicted class. Heatmaps are used to highlight the pixels that have the greatest effect on that prediction.

In this case the CNN layers behave like an unsupervised model: they localize the relevant regions without any location labels, which saves us the tedious work of manually annotating the huge dataset with bounding boxes. The class activation map (CAM) technique relies on a global average pooling layer added after the final convolutional layer, which collapses the spatial dimensions of the feature maps and reduces the number of parameters, thereby reducing overfitting.
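
To make the role of global average pooling concrete, the NumPy sketch below computes a class activation map for a toy feature tensor. The shapes, the random values, and the names `feature_maps`, `class_weights`, and `class_activation_map` are illustrative assumptions. It also assumes a single dense layer directly after the pooling, which is not the case for the model defined later in this notebook (it has several dense layers).

```python
import numpy as np

def class_activation_map(feature_maps, class_weights, class_idx):
    """CAM for one class: weighted sum of the last conv feature maps.

    feature_maps:  (H, W, K) activations of the final conv layer.
    class_weights: (K, C) weights of a dense layer applied after GAP.
    """
    # Contract the channel axis with the weights of the chosen class.
    return np.tensordot(feature_maps, class_weights[:, class_idx], axes=([2], [0]))

rng = np.random.default_rng(0)            # toy data, purely illustrative
feature_maps = rng.random((4, 4, 8))      # H=4, W=4, K=8
class_weights = rng.normal(size=(8, 2))   # K=8 channels, C=2 classes

cam = class_activation_map(feature_maps, class_weights, class_idx=1)

# GAP followed by the dense layer gives the class score (bias omitted) ...
gap = feature_maps.mean(axis=(0, 1))      # shape (K,)
score = gap @ class_weights[:, 1]

# ... which equals the spatial mean of the CAM.
print(cam.shape, np.isclose(score, cam.mean()))
```

The final check shows why the pooling layer matters: the class score is the spatial average of the map, so each cell of the map can be read as that location's contribution to the score.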


Grad-CAM uses class-specific features to produce localization maps of the significant regions of the image, making black-box models more transparent by showing visualizations that support the output predictions.

In other words, Grad-CAM combines pixel-space gradient visualization with the class-discriminative property.


Grad-CAM is generally used to highlight the regions that, from the CNN model's point of view, matter most for the classification.
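
As a rough illustration of the computation behind a Grad-CAM heatmap, here is a minimal NumPy sketch. It assumes the activations of a convolutional layer and the gradients of the class score with respect to them are already available (in practice they would come from the framework, for example via `tf.GradientTape`). The function name `grad_cam` and the random inputs are hypothetical.

```python
import numpy as np

def grad_cam(activations, gradients):
    """Grad-CAM heatmap from conv activations and class-score gradients.

    activations: (H, W, K) feature maps of the chosen conv layer.
    gradients:   (H, W, K) d(class score) / d(activations).
    """
    # Channel importance: gradients averaged over the spatial axes.
    alphas = gradients.mean(axis=(0, 1))                  # shape (K,)
    # Weighted sum of feature maps, keeping only positive evidence (ReLU).
    weighted = np.tensordot(activations, alphas, axes=([2], [0]))
    heatmap = np.maximum(weighted, 0.0)
    # Scale to [0, 1] for display; guard against an all-zero map.
    peak = heatmap.max()
    return heatmap / peak if peak > 0 else heatmap

rng = np.random.default_rng(0)                            # toy inputs
activations = rng.random((8, 8, 16))
gradients = rng.normal(size=(8, 8, 16))

heatmap = grad_cam(activations, gradients)
print(heatmap.shape, float(heatmap.min()), float(heatmap.max()))
```

The resulting low-resolution map is normally upsampled to the input size before it is drawn over the image.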


![](https://www.researchgate.net/profile/Elias-Ennadifi/publication/345429793/figure/fig4/AS:1030530145464323@1622708941122/Localization-of-symptoms-using-GradCAM-visualization-method-Left-input-images-Middle.ppm)


## How does it work?

![](https://www.researchgate.net/profile/Elias-Ennadifi/publication/345429793/figure/fig2/AS:1030530141274114@1622708940983/Flowchart-of-the-proposed-approach.ppm)


>NOTE: We recommend running the following example on Kaggle and attaching the `chest-xray-pneumonia` dataset.

## Import libraries

In [None]:
import cv2
import datetime
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, recall_score

from tensorflow.keras import backend as K
from tensorflow.keras import layers
from tensorflow.keras import Model, Sequential
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator


print(tf.__version__)

## Install TensorFlow Addons

In [None]:
!pip install tensorflow-addons
import tensorflow_addons as tfa

In [None]:
BATCH_SIZE = 8
EPOCHS = 30
IM_SIZE = 512
AUTOTUNE = tf.data.experimental.AUTOTUNE

tf.random.set_seed(10)

In [None]:
device_name = tf.test.gpu_device_name()
if "GPU" not in device_name:
    print("GPU device not found")
else:
    print('Found GPU at: {}'.format(device_name))

## Explore the directories

In [None]:
for dirname, _, filenames in os.walk('../input/chest-xray-pneumonia/chest_xray/chest_xray/'):
    print(dirname)

## Check some filenames

In [None]:
filenames = tf.io.gfile.glob('../input/chest-xray-pneumonia/chest_xray/chest_xray/train/*/*') # Use the path that matches your environment
# filenames.extend(tf.io.gfile.glob('../input/chest-xray-pneumonia/chest_xray/chest_xray/val/*/*'))
filenames.extend(tf.io.gfile.glob('../input/chest-xray-pneumonia/chest_xray/chest_xray/test/*/*'))

filenames[:3]

## Build a DataFrame from the data

In [None]:
# Build the DataFrame in one step; the class is the name of the parent directory.
data = pd.DataFrame({
    'filename': filenames,
    'class': [path.split('/')[-2] for path in filenames],
})

print(data['class'].value_counts(dropna=False))
data[:10]
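
The class label is taken from the name of the directory that contains each file. The short sketch below shows the same idea with `posixpath`. The helper `label_from_path` and the example file names are hypothetical and only illustrate the directory layout.

```python
import posixpath

def label_from_path(path):
    """Return the name of the directory that directly contains the file."""
    return posixpath.basename(posixpath.dirname(path))

# Hypothetical file names, used only to illustrate the directory layout.
examples = [
    "chest_xray/train/NORMAL/example_0001.jpeg",
    "chest_xray/train/PNEUMONIA/example_0002.jpeg",
]
for p in examples:
    # Same result as splitting on '/' and taking the second-to-last element.
    assert label_from_path(p) == p.split("/")[-2]
    print(label_from_path(p))
```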

## Shuffle Data

In [None]:
data = shuffle(data, random_state=42)
data.reset_index(drop=True, inplace=True)
data[:10]

## Create a function that removes everything that is not an image

In [None]:
indexes=[]

def func(x):
    if x[-5:] != '.jpeg':
        idx = data[data['filename'] == x].index
        indexes.append(idx[0])
        print(idx[0], x)
    return x

data['filename'].map(func)

print(data.shape)
data.drop(index=indexes, axis=0, inplace=True)
data.reset_index(drop=True, inplace=True)
print(data.shape)
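
A possible alternative to the suffix comparison above is to test the file extension case-insensitively. This is only a sketch: the helper `is_image`, the `IMAGE_EXTENSIONS` set, and the example paths are assumptions, whereas the cell above keeps `.jpeg` files only.

```python
import os

# Assumed set of extensions; the notebook itself keeps only ".jpeg".
IMAGE_EXTENSIONS = {".jpeg", ".jpg", ".png"}

def is_image(path):
    """True when the file extension (case-insensitive) is a known image type."""
    return os.path.splitext(path)[1].lower() in IMAGE_EXTENSIONS

# Hypothetical listing that mixes images with a stray hidden file.
paths = ["train/NORMAL/a.jpeg", "train/NORMAL/.DS_Store", "train/PNEUMONIA/b.JPEG"]
kept = [p for p in paths if is_image(p)]
print(kept)
```

With a DataFrame, the same predicate could be applied as `data[data['filename'].map(is_image)]`, which avoids collecting row indexes by hand.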

## Split into `train_data` and `test_data`

In [None]:
train_data, test_data = train_test_split(data, test_size=0.1, random_state=42, stratify=data['class'])
test_data = test_data[ : test_data.shape[0] // BATCH_SIZE * BATCH_SIZE]
print(train_data['class'].value_counts(dropna=False))
print(test_data['class'].value_counts(dropna=False))

## Split into `train_data` and `val_data`

In [None]:
train_data, val_data = train_test_split(train_data, test_size=0.1, random_state=42, stratify=train_data['class'])
print(train_data['class'].value_counts(dropna=False))
print(val_data['class'].value_counts(dropna=False))
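
The two successive 10% splits leave roughly 81% of the samples for training, 9% for validation, and 10% for testing. The sketch below reproduces that arithmetic in plain Python. It assumes the held-out fraction is rounded up and the remainder goes to training. The helper `split_sizes` and the sample count are made up for illustration.

```python
import math

def split_sizes(n_samples, test_size=0.1, val_size=0.1, batch_size=8):
    """Approximate sizes of the train/val/test subsets built above."""
    # Assumption: the held-out fraction is rounded up, the rest goes to training.
    n_test = math.ceil(test_size * n_samples)
    n_train_val = n_samples - n_test
    n_val = math.ceil(val_size * n_train_val)
    n_train = n_train_val - n_val
    # The notebook then trims the test set to a whole number of batches.
    n_test_trimmed = n_test // batch_size * batch_size
    return n_train, n_val, n_test, n_test_trimmed

# 1234 is a made-up sample count, not the size of the dataset.
n_train, n_val, n_test, n_test_trimmed = split_sizes(1234)
print(n_train, n_val, n_test, n_test_trimmed)
```

Trimming the test set to a multiple of `BATCH_SIZE` keeps the number of predictions equal to the number of labels when evaluation runs for a whole number of steps.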

## Define an `ImageDataGenerator` with augmentation to expand our dataset (training only!)

In [None]:
datagen = ImageDataGenerator(rescale = 1./255,
                             zoom_range=0.1, # 0.05
                             brightness_range=[0.9, 1.0],
                             height_shift_range=0.05, 
                             width_shift_range=0.05,
                             rotation_range=10, 
                            )



train_gen = datagen.flow_from_dataframe(train_data,
                                        x_col="filename",
                                        y_col="class",
                                        target_size=(IM_SIZE, IM_SIZE),
                                        color_mode='grayscale',
                                        batch_size=BATCH_SIZE,
                                        class_mode='categorical',
                                        shuffle=True)

In [None]:
test_datagen = ImageDataGenerator(rescale = 1./255)

val_gen = test_datagen.flow_from_dataframe(val_data,
                                        x_col="filename",
                                        y_col="class",
                                        target_size=(IM_SIZE, IM_SIZE),
                                        color_mode='grayscale',
                                        batch_size=BATCH_SIZE,
                                        class_mode='categorical',
                                        shuffle=False)

test_gen = test_datagen.flow_from_dataframe(test_data,
                                        x_col="filename",
                                        y_col="class",
                                        target_size=(IM_SIZE, IM_SIZE),
                                        color_mode='grayscale',
                                        batch_size=BATCH_SIZE,
                                        class_mode='categorical',
                                        shuffle=False)

## Define our CNN architecture (a pre-trained network can be used instead)

In [None]:
# Define CNN model
def create_model():
    with tf.device('/gpu:0'):        
    
        # Model input
        input_layer = layers.Input(shape=(IM_SIZE, IM_SIZE, 1), name='input')  
        
        # First block
        x = layers.Conv2D(filters=128, kernel_size=3, 
                          activation='relu', padding='same', 
                          name='conv2d_1')(input_layer)
        x = layers.MaxPool2D(pool_size=2, name='maxpool2d_1')(x)
        x = layers.Dropout(0.1, name='dropout_1')(x)

        # Second block
        x = layers.Conv2D(filters=128, kernel_size=3, 
                          activation='relu', padding='same', 
                          name='conv2d_2')(x)
        x = layers.MaxPool2D(pool_size=2, name='maxpool2d_2')(x)
        x = layers.Dropout(0.1, name='dropout_2')(x)

        # Third block
        x = layers.Conv2D(filters=128, kernel_size=3, 
                          activation='relu', padding='same', 
                          name='conv2d_3')(x)
        x = layers.MaxPool2D(pool_size=2, name='maxpool2d_3')(x)
        x = layers.Dropout(0.1, name='dropout_3')(x)

        # Fourth block
        x = layers.Conv2D(filters=256, kernel_size=3, 
                          activation='relu', padding='same', 
                          name='conv2d_4')(x)
        x = layers.MaxPool2D(pool_size=2, name='maxpool2d_4')(x)
        x = layers.Dropout(0.1, name='dropout_4')(x)

        # Fifth block
        x = layers.Conv2D(filters=256, kernel_size=3, 
                          activation='relu', padding='same', 
                          name='conv2d_5')(x)
        x = layers.MaxPool2D(pool_size=2, name='maxpool2d_5')(x)
        x = layers.Dropout(0.1, name='dropout_5')(x)

        # Sixth block
        x = layers.Conv2D(filters=512, kernel_size=3, 
                          activation='relu', padding='same', 
                          name='conv2d_6')(x)
        x = layers.MaxPool2D(pool_size=2, name='maxpool2d_6')(x)
        x = layers.Dropout(0.1, name='dropout_6')(x)

        # Seventh block
        x = layers.Conv2D(filters=512, kernel_size=3, 
                          activation='relu', padding='same', 
                          name='conv2d_7')(x)
        x = layers.MaxPool2D(pool_size=2, name='maxpool2d_7')(x)
        x = layers.Dropout(0.1, name='dropout_7')(x)
        
        # GlobalAveragePooling
        x = layers.GlobalAveragePooling2D(name='global_average_pooling2d')(x)   
        x = layers.Flatten()(x)
        
        # Head
        x = layers.Dense(1024,activation='relu')(x)
        x = layers.Dropout(0.1, name='dropout_head_2')(x)
        x = layers.Dense(128,activation='relu')(x)
        
        # Output
        output = layers.Dense(units=2, 
                              activation='softmax', 
                              name='output')(x)


        model = Model(input_layer, output)   

        F_1_macro = [tfa.metrics.f_scores.F1Score(num_classes=2, average="macro")] 
        
        model.compile(optimizer='adam', 
                      loss='categorical_crossentropy', 
                      metrics=F_1_macro)

    return model

model = create_model()

In [None]:
model.summary()
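
The spatial size of the feature maps in `create_model` can be worked out by hand: the `padding='same'` convolutions keep the size and each `MaxPool2D(pool_size=2)` halves it. The helper `spatial_sizes` below is a hypothetical illustration of that arithmetic.

```python
def spatial_sizes(input_size=512, n_blocks=7, pool=2):
    """Side length of the feature map after each conv + max-pool block."""
    sizes = []
    size = input_size
    for _ in range(n_blocks):
        # padding='same' convolutions keep the size; MaxPool2D(2) halves it.
        size = size // pool
        sizes.append(size)
    return sizes

print(spatial_sizes())  # [256, 128, 64, 32, 16, 8, 4]
```

Under this reading, the last convolution (`conv2d_7`) works on an 8x8 grid, so a heatmap taken from that layer would be quite coarse before being upsampled to the 512x512 input.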

In [None]:
def feed_data(dataset):
    return dataset.prefetch(buffer_size=AUTOTUNE)  

## Train the model

In [None]:
init_time = datetime.datetime.now()


train_steps = train_gen.samples // BATCH_SIZE
valid_steps = val_gen.samples // BATCH_SIZE

early_stopping = EarlyStopping(monitor="val_loss", patience=8, mode="min")
checkpoint = ModelCheckpoint("acc-{val_loss:.4f}.h5", monitor="val_loss", verbose=0, 
                             save_best_only=True, save_weights_only=True, mode="min")
learning_rate_reduction = ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=3, 
                                            min_lr=1e-7, verbose=1, mode="min")


history = model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=EPOCHS,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    callbacks=[
                checkpoint, 
                early_stopping, 
                learning_rate_reduction],
    verbose=1,
    )

required_time = datetime.datetime.now() - init_time
print(f'\nRequired time:  {str(required_time)}\n')
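
To see what `factor=0.1` and `min_lr=1e-7` imply for `ReduceLROnPlateau`, the sketch below lists the learning rate after successive reductions. It assumes training starts from Adam's default learning rate of 0.001. The helper `plateau_schedule` is hypothetical and does not model when a plateau is detected.

```python
def plateau_schedule(initial_lr=1e-3, factor=0.1, min_lr=1e-7, n_reductions=6):
    """Learning rates after successive plateau-triggered reductions."""
    lrs = [initial_lr]
    for _ in range(n_reductions):
        # Each reduction multiplies the rate by `factor`, never going below min_lr.
        lrs.append(max(lrs[-1] * factor, min_lr))
    return lrs

lrs = plateau_schedule()
for step, lr in enumerate(lrs):
    print(f"after {step} reductions: {lr:.0e}")
```

With these settings the floor is reached after four reductions, so later plateaus no longer change the learning rate.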

In [None]:
history_df = pd.DataFrame(history.history)
history_df.loc[0:, ['loss', 'val_loss']].plot()
print("Minimum Validation Loss: {:0.4f}".format(history_df['val_loss'].min()));

In [None]:
test_steps = test_gen.samples // BATCH_SIZE

# The model is compiled with macro F1 as its only metric, so evaluate() returns [loss, F1].
test_loss, test_f1 = model.evaluate(test_gen, steps=test_steps)
print('\nF1 (macro):', test_f1, 'loss: ', test_loss)

In [None]:
predict = model.predict(test_gen, steps=test_steps)
y_hat = np.argmax(predict, axis=1)
y_hat[:20]

In [None]:
test_labels_df = pd.DataFrame()
test_labels_df[['class']] = test_data[['class']]
test_labels_df['class'] = test_labels_df['class'].map({'NORMAL':0, 'PNEUMONIA':1})

y_test = np.array(test_labels_df['class'])
y_test[:20]

## Classification report

In [None]:
print(classification_report(y_test, y_hat), '\n')
cm = confusion_matrix(y_test, y_hat)
sns.heatmap(cm, annot=True, cmap="Blues", fmt='.0f', cbar=False)
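
The numbers in the classification report can be derived directly from the confusion matrix. The plain-Python sketch below does so for a toy matrix. The counts are made up and are not results from this notebook, and `per_class_metrics` is a hypothetical helper.

```python
def per_class_metrics(cm):
    """Precision, recall and F1 per class from a square confusion matrix.

    cm[i][j] counts samples of true class i predicted as class j,
    the layout used by sklearn.metrics.confusion_matrix.
    """
    n = len(cm)
    metrics = []
    for k in range(n):
        tp = cm[k][k]
        predicted_k = sum(cm[i][k] for i in range(n))   # column sum
        actual_k = sum(cm[k])                           # row sum
        precision = tp / predicted_k if predicted_k else 0.0
        recall = tp / actual_k if actual_k else 0.0
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        metrics.append((precision, recall, f1))
    return metrics

# Made-up counts, NOT results from this notebook: rows are true NORMAL / PNEUMONIA.
toy_cm = [[40, 10],
          [5, 45]]
metrics = per_class_metrics(toy_cm)
macro_f1 = sum(f1 for _, _, f1 in metrics) / len(metrics)
for name, (p, r, f1) in zip(["NORMAL", "PNEUMONIA"], metrics):
    print(f"{name}: precision={p:.3f} recall={r:.3f} f1={f1:.3f}")
print(f"macro F1={macro_f1:.3f}")
```

Macro F1, the metric the model is compiled with, is the unweighted mean of the per-class F1 scores, so the minority class counts as much as the majority class.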

In [None]:
model.save('/kaggle/working/model/')

## Feature Visualization

In [None]:
!pip install tf_keras_vis

In [None]:
import tf_keras_vis 
from matplotlib import cm
tf_keras_vis.__version__

In [None]:
from tf_keras_vis.gradcam import Gradcam
from tf_keras_vis.utils.scores import CategoricalScore
from tf_keras_vis.utils.model_modifiers import ReplaceToLinear
from tf_keras_vis.gradcam_plus_plus import GradcamPlusPlus
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

In [None]:
def model_modifier_function(cloned_model):
    '''modify model activation'''
    cloned_model.layers[-1].activation = tf.keras.activations.linear


def get_gradcam_plus(img,
                    score,
                    model=model,
                    model_modifier=ReplaceToLinear()):    
    gradcampls = GradcamPlusPlus(model,
                          model_modifier=model_modifier,
                          clone=True)
    heatmap = gradcampls(score, img)
    heatmap = np.uint8(cm.jet(heatmap[0])[..., :3] * 255)
    return heatmap
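
The colored heatmap is later drawn over the X-ray with a low opacity. As a rough sketch of what that blending amounts to, the NumPy function below mixes an RGB heatmap into a grayscale image. The name `overlay_heatmap` and the toy arrays are assumptions, and the sketch ignores the colormap that matplotlib applies to the base image.

```python
import numpy as np

def overlay_heatmap(gray, heat_rgb, alpha=0.2):
    """Alpha-blend an RGB heatmap over a grayscale image.

    gray:     (H, W) floats in [0, 1].
    heat_rgb: (H, W, 3) uint8 colors, e.g. a colormapped Grad-CAM map.
    """
    base = np.repeat(gray[..., None], 3, axis=-1)     # grayscale -> RGB
    heat = heat_rgb.astype(np.float32) / 255.0        # uint8 -> [0, 1]
    blended = (1.0 - alpha) * base + alpha * heat
    return np.clip(blended, 0.0, 1.0)

gray = np.full((4, 4), 0.5, dtype=np.float32)         # toy mid-gray image
heat_rgb = np.zeros((4, 4, 3), dtype=np.uint8)
heat_rgb[..., 0] = 255                                # pure red heatmap

out = overlay_heatmap(gray, heat_rgb)
print(out.shape, out[0, 0])
```

A small `alpha` keeps the anatomy visible while still tinting the regions the model attends to.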

In [None]:
iterator = iter(test_gen)
# Skip the first two batches and take the third one.
next(iterator)
next(iterator)

imgs, labs = next(iterator)
real_labs = list(np.argmax(labs, axis=1))
print(real_labs)

In [None]:
class_dict = {0: 'NORMAL', 
              1: 'PNEUMONIA',} 

In [None]:
plt.rcParams['font.size'] = '20'
plt.subplots(8,2,figsize=(20,100))

idx=1
for i,img in enumerate(imgs[:8]):
#     print(f'Image {i}')    
    img_4d = tf.cast(tf.reshape(img, [1, IM_SIZE, IM_SIZE, 1]), tf.float32)
    predict = model.predict(img_4d)
    prd = np.argmax(predict)
#     print(f'class: {class_dict[prd]}')
    score1 = CategoricalScore(prd)
    original_lab = real_labs[i]
    
    plt.subplot(8,2,idx)
    plt.title(f'original {class_dict[original_lab]}')
    plt.axis('off')
    plt.imshow(img, cmap=plt.cm.binary)
    idx+=1
    
    plt.subplot(8,2,idx)
    gdcam_pls = get_gradcam_plus(img, score1)
    plt.imshow(img, cmap=plt.cm.binary)
    if prd:
        plt.imshow(gdcam_pls, alpha=0.2, cmap='jet')
        proba = round(float(predict[0][1]), 4)
    else:
        proba = round(float(predict[0][0]), 4)
    plt.title(f'predicted {class_dict[prd]}  {proba} probability')
    plt.axis('off')
    idx+=1
    if idx>20:
        break

plt.tight_layout()
plt.show()

## Source: https://www.kaggle.com/code/artyomkolas/pneumonia-cnn-feature-visualization
- Updated and tweaked by Alejandro Casas