# Predicción de Neumonía con Red Neuronal

In [None]:
import pathlib

path=str(pathlib.Path().resolve())+"/assets/"

In [None]:
#!pip install -q opencv-python

## Obtenemos el mapa de bits de la imagen en escala de 128^2 para las dos imagenes en escala de grises por lo que la 3ª dimensión de la imagen será 1

In [None]:
import os
import numpy as np
import cv2 as cv
import random

def get_data(path):
    images = []  # Lista para las imágenes
    labels = []  # Lista para las etiquetas
    dirs = os.listdir(path)
    
    for dir in dirs:
        for img in os.listdir(path+"/"+dir):
            images.append(cv.cvtColor(cv.resize(cv.imread(path +"/"+  dir + "/" + img), dsize=(128, 128),interpolation=cv.INTER_AREA),cv.COLOR_BGR2GRAY))
            if dir=="NORMAL":
                labels.append(0)
            else:
                labels.append(1)
    
    # Barajar las imágenes y las etiquetas de manera sincronizada
    combined = list(zip(images, labels))
    random.shuffle(combined)
    images, labels = zip(*combined)
    
    print(path + "/" + dirs[1])
    
    # Convertir las listas en arrays NumPy
    
    return np.array(images),np.array(labels) 


Ejecutamos en multi hilo la recolección de cada una de las carpetas

In [None]:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    f1 = executor.submit(get_data, path+"train")
    f2 = executor.submit(get_data, path+"test")
    f3 = executor.submit(get_data, path+"val")

    train = f1.result()
    test = f2.result()
    val = f3.result()


Concatenamos cada uno de los valores en X e Y que luego dividiremos en sus respectivos valores con Train-Split

In [None]:
X=np.concatenate((train[0],test[0],val[0]))
y=np.concatenate((train[1],test[1],val[1]))

## Dividimos los datos en Train,test,val, estos los usaremos en la Red neuronal como valores de entrenamiento, test, validación

In [None]:
from sklearn.model_selection import train_test_split

X_train_full, X_test , y_train_full, y_test = train_test_split(X,y,test_size=0.2,random_state=42)
X_train, X_val , y_train, y_val = train_test_split(X_train_full,y_train_full,test_size=0.2,random_state=42)

In [None]:
X_train

Mostramos de forma Random imagenes que tienen Normal o Pneumonia de X_train e y_train

In [None]:
from matplotlib import pyplot as plt


class_names = ["NORMAL","PNEUMONIA"]

n_rows = 4
n_cols = 10
plt.figure(figsize=(n_cols * 1.2, n_rows * 1.2))
for row in range(n_rows):
    for col in range(n_cols):
        index = n_cols * row + col
        plt.subplot(n_rows, n_cols, index + 1)
        plt.imshow(X_train[index], cmap="binary", interpolation="nearest")
        plt.axis('off')
        plt.title(class_names[y_train[index]], fontsize=12)
plt.subplots_adjust(wspace=0.2, hspace=0.5)
plt.show()

In [None]:
X_train.shape

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from keras.datasets import fashion_mnist
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import SGD
from keras.utils import to_categorical


Estandarizamos las imagenes dividiendo entre 255 todos los valores de cada uno de los datos

In [None]:
X_train = X_train / 255.
X_test = X_test / 255.
X_val = X_val / 255.

In [None]:
#X_val,y_val=X_test[:int(len(X_test)/2)],y_test[:int(len(y_test)/2)]
#X_test,y_test=X_test[int(len(X_test)/2):],y_test[int(len(y_test)/2):]

In [None]:
#X_train=X_train.astype(np.uint8,copy=False)
#X_test=X_test.astype(np.uint8,copy=False)
#X_val=X_val.astype(np.uint8,copy=False)
#X_train

In [None]:
print(X_train.shape)
print(X_test.shape)

In [None]:
X_val

In [None]:
X_train.shape
#(1280, 64, 64, 3)

## Creación del modelo

Creamos un modelo de 9 capas
- 3 Conv2(32,64,128),(3,3)
- 3 MaxPooling2D(2,2)
- 1 Flatten()
- 2 Dense("relu","sigmoid")(128,1)

In [None]:
model = models.Sequential()

# Va hacer 32 Kernels que serán 3X3 es decir
# Recogera de cada 3 pixeles el central para la nueva imagen 
# Podríamos indicar los Strides (1,1) que son las casillas que avanza
# Le decimos que sea 28 x 28 y que tenga un canal
model.add(layers.Conv2D(32,(3,3),activation="relu",input_shape=(X.shape[1],X.shape[2],1)))
#model.add(layers.Conv2D(32,(3,3),activation="relu"))

# Se va reduciendo la imagen por lo que podemos aumentar el número de filtros, donde la imagen se nos va a la mitad
model.add(layers.MaxPooling2D((2,2)))

# Lo normal es aumentar el número de filtros en la imagen
model.add(layers.Conv2D(64,(3,3),activation="relu"))
#model.add(layers.Conv2D(64,(3,3),activation="relu"))
model.add(layers.MaxPooling2D((2,2)))

# Aplana la capa actual
model.add(layers.Conv2D(128,(3,3),activation="relu"))
#model.add(layers.Conv2D(128,(3,3),activation="relu"))
model.add(layers.MaxPooling2D((2,2)))
model.add(layers.Flatten())

model.add(layers.Dense(128,activation="relu"))
model.add(layers.Dense(1,activation="sigmoid"))

model.summary()

In [None]:
keras.utils.plot_model(model, "model.png", show_shapes=True,show_dtype=True,show_layer_names=True,)

Indicamos que usamos el Adam con lr de 0.001, con el binary_crossentropy y la metrica de accuracy

In [None]:
model.compile(loss="binary_crossentropy",  optimizer=keras.optimizers.Adam(learning_rate=0.001),   metrics=["accuracy"])

## Entrenamiento del modelo

Entrenamos el modelo con el EarlyStopping y una paciencia de 10, también indicamos que use CPU ya que disponemos de integrada

In [None]:

import sys

#gpus = tf.config.experimental.list_physical_devices('GPU')
#print(gpus)
#tf.config.set_visible_devices([], 'CPU') # hide the CPU
#tf.config.set_visible_devices(gpus[0], 'GPU') # unhide potentially hidden GPU
#tf.config.get_visible_devices()


cpus = tf.config.experimental.list_physical_devices('CPU')
print(cpus)
tf.config.set_visible_devices([], 'GPU')  # hide the GPU
tf.config.set_visible_devices(cpus[0], 'CPU') # unhide potentially hidden CPU
tf.config.get_visible_devices()

early_stopping_cb = keras.callbacks.EarlyStopping(patience=10,restore_best_weights=True)




history = model.fit(X_train,y_train,epochs=sys.maxsize,validation_data=(X_val,y_val),callbacks=[early_stopping_cb])

In [None]:
X_test.shape

In [None]:
pd.DataFrame(history.history).plot(figsize=(8, 5))
plt.grid(True)
plt.gca().set_ylim(0, 1)
plt.show()

Evaluamos el modelo para ver su rendimiento

In [None]:
model.evaluate(X_test,y_test)
#accuracy: 0.9582 - loss: 0.1282

In [None]:
y_test_pred = model.predict(X_test)
y_test_pred_labels = y_test_pred.round(0)
y_test_true_labels = y_test

## Matriz de confusión para ver como predijo el modelo

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(y_test_true_labels, y_test_pred_labels)

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.ylabel('Verdadero')
plt.xlabel('Predicho')
plt.title('Matriz de confusión')
plt.show()