In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory. The file-path listing is commented out, so
# each iteration only prints an empty string without a newline (no visible output).
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print("", end="")# os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

# Bibliotecas usadas

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import keras
from keras.models import Sequential
from keras.layers import Dense, Conv2D , MaxPool2D , Flatten , Dropout , BatchNormalization
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report,confusion_matrix, ConfusionMatrixDisplay
from keras.callbacks import ReduceLROnPlateau
import tensorflow as tf
import tensorflow.keras.backend as K

import cv2
import os

# Sobre o Dataset...

O Dataset pode ser acessado na plataforma **Kaggle** e contém imagens de raio-x de pulmões humanos. No total, existem 3 pastas: *test*, *train* e *val*; dentro de cada uma há uma pasta com imagens de pulmões com pneumonia e outra com imagens de pulmões saudáveis. Saiba mais em [Chest X-Ray Images (Pneumonia)](https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia/data)

In [None]:
# Class names; the position in this list is the integer class index.
sit_possiveis = ['PNEUMONIA', 'NORMAL']
# Side length (in pixels) every image is resized to.
img_size = 150
def processa_dados(diretorio_dados: str) -> np.ndarray:
    '''
    Load every image found in ``diretorio_dados/<class name>`` as a resized
    grayscale array paired with its integer class index.

    Parameters
    ----------
    diretorio_dados : str
        Directory holding one sub-folder per entry of ``sit_possiveis``.

    Returns
    -------
    np.ndarray
        Object array of shape ``(n_images, 2)``: column 0 is the
        ``img_size`` x ``img_size`` grayscale image, column 1 is the class
        index (position of the folder name in ``sit_possiveis``).

    Files that cannot be read or resized are reported and skipped.
    '''
    raw = []

    for classe, sit in enumerate(sit_possiveis):
        caminho = os.path.join(diretorio_dados, sit)
        for img in os.listdir(caminho):
            caminho_img = os.path.join(caminho, img)
            try:
                imagem_input = cv2.imread(caminho_img, cv2.IMREAD_GRAYSCALE)
                if imagem_input is None:
                    # cv2.imread returns None (it does not raise) for files it cannot decode.
                    print(f"Skipping unreadable image: {caminho_img}")
                    continue
                redimensionada = cv2.resize(imagem_input, (img_size, img_size))

                raw.append((redimensionada, classe))  # (image, class index)
            except Exception as e:
                print(e)

    # Fill an explicit object array: np.array() on ragged (image, int) pairs
    # raises ValueError on recent NumPy versions instead of creating one.
    dados = np.empty((len(raw), 2), dtype=object)
    for posicao, (imagem, classe) in enumerate(raw):
        dados[posicao, 0] = imagem
        dados[posicao, 1] = classe
    return dados

## Constantes

In [None]:
EPOCAS = 16  # number of epochs passed to every training call and used for the x-axis of the history plots
otimizador = 'adam'  # optimizer identifier handed to model.compile in each model builder

# Loading the Dataset

In [None]:
# Load the three dataset partitions in the order train, test, val.
train, test, val = map(
    processa_dados,
    (
        '../input/chest-xray-pneumonia/chest_xray/chest_xray/train',
        '../input/chest-xray-pneumonia/chest_xray/chest_xray/test',
        '../input/chest-xray-pneumonia/chest_xray/chest_xray/val',
    ),
)

# Análise do _dataset_

In [None]:
# Dataset statistics: number of images per class in each partition.

estagios = ['TRAIN', 'TEST', 'VALIDATION']
particoes = [train, test, val]

# Element 1 of every sample is the class index: 0 = PNEUMONIA, 1 = NORMAL.
com_pneumonia = [sum(1 for el in particao if el[1] == 0) for particao in particoes]
sem_pneumonia = [sum(1 for el in particao if el[1] == 1) for particao in particoes]


stages_viz = pd.DataFrame(
    {
        "PNEUMONIA": com_pneumonia,
        "NORMAL": sem_pneumonia,
        "ESTAGIOS": estagios,
    }
)

# Each class gets its own axes (sharing the y scale). Drawing both bar plots on
# the same axes made the bars overlap and overwrote the y-axis label.
fig, (ax_pneumonia, ax_normal) = plt.subplots(1, 2, figsize=(12, 5), sharey=True)

s1 = sns.barplot(x="ESTAGIOS", y="PNEUMONIA", data=stages_viz, color="red", ax=ax_pneumonia)
s1.set_title("PNEUMONIA")

s2 = sns.barplot(x="ESTAGIOS", y="NORMAL", data=stages_viz, color="blue", ax=ax_normal)
s2.set_title("NORMAL")

plt.show()

In [None]:
# Preview the first nine training samples in a 3x3 grid, each titled with its class name.
fig, ax = plt.subplots(3, 3, figsize=(8, 8))
c = 0

for linha in ax:
    for painel in linha:
        dcm, label = train[c]
        painel.imshow(dcm, cmap="bone")
        painel.set_title(sit_possiveis[label])
        c += 1

plt.subplots_adjust(wspace=0.5, hspace=0.5)

In [None]:
def _separa_imagens_e_labels(amostras):
    """Split a sequence of (image, label) pairs into an image array and a label array."""
    imagens = np.array([imagem for imagem, _ in amostras])
    rotulos = np.array([rotulo for _, rotulo in amostras])
    return imagens, rotulos


x_train, y_train = _separa_imagens_e_labels(train)

x_val, y_val = _separa_imagens_e_labels(val)

x_test, y_test = _separa_imagens_e_labels(test)

In [None]:
# Normalize the data: divide every pixel value by 255.
# Re-running this cell divides again, because the inputs are overwritten.
x_train, x_val, x_test = (pixels / 255 for pixels in (x_train, x_val, x_test))

In [None]:
# Reshape to (samples, img_size, img_size, 1): the trailing axis is the single channel
# declared in the models' input shape.
forma_entrada = (-1, img_size, img_size, 1)
x_train = x_train.reshape(forma_entrada)
x_val = x_val.reshape(forma_entrada)
x_test = x_test.reshape(forma_entrada)

In [None]:
# Data augmentation, used to reduce overfitting and to help with the class imbalance in the dataset.
# All centering / std-normalization / whitening options are disabled; only geometric
# transforms (rotation, shifts, zoom, horizontal flip) are enabled.

datagen = ImageDataGenerator(
        featurewise_center=False,
        samplewise_center=False,
        featurewise_std_normalization=False,
        samplewise_std_normalization=False,
        rotation_range=30,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.2,
        horizontal_flip=True,
        vertical_flip=False, # keep an eye on this
        zca_whitening=False,
)



# NOTE(review): fit() presumably only matters when featurewise_* or zca_whitening
# are enabled, and all of them are False here. Confirm whether this call can be dropped.
datagen.fit(x_train)

# Treinando o Modelo Convolucional

In [None]:
def create_and_fit(learning_rate, epochs,  batch_size, x_train, y_train, x_val, y_val):
    """
    Build, compile and train the baseline convolutional classifier.

    Parameters
    ----------
    learning_rate
        A Keras callback (despite the name); it is passed to ``fit`` as the
        only entry of ``callbacks``.
    epochs : int
        Number of training epochs.
    batch_size : int
        Batch size used by the augmenting training generator.
    x_train, y_train
        Training images, shaped ``(n, img_size, img_size, 1)``, and their labels.
    x_val, y_val
        Validation images and labels.

    Returns
    -------
    tuple
        ``(history, model)``: the object returned by ``model.fit`` and the
        trained model.

    Relies on the module-level ``datagen``, ``otimizador`` and ``img_size``.
    """
    # defining model
    model = Sequential()
    # Input shape follows img_size rather than a hard-coded 150.
    model.add(Conv2D(32 , (3,3) , strides = 1 , padding = 'same' , activation = 'relu' , input_shape = (img_size, img_size, 1)))
    model.add(BatchNormalization()) # normalizes the previous layer's output
    model.add(MaxPool2D((2,2) , strides = 2 , padding = 'same'))
    model.add(Conv2D(64 , (3,3) , strides = 1 , padding = 'same' , activation = 'relu'))
    model.add(Dropout(0.1))
    model.add(BatchNormalization())
    model.add(MaxPool2D((2,2) , strides = 2 , padding = 'same'))

    model.add(Conv2D(256 , (3,3) , strides = 1 , padding = 'same' , activation = 'relu', name='l_05'))
    model.add(Dropout(0.2))
    model.add(BatchNormalization())
    model.add(MaxPool2D((2,2) , strides = 2 , padding = 'same'))
    model.add(Flatten()) # flattens the feature maps into a vector
    model.add(Dense(units = 128 , activation = 'relu'))

    model.add(Dropout(0.2))
    # Single sigmoid unit for the two-class problem.
    model.add(Dense(units = 1 , activation = 'sigmoid'))

    model.compile(optimizer = otimizador,
                  loss = 'binary_crossentropy',
                  metrics = ['accuracy']
    )

    # fitting the model
    # Validation uses the untouched images. Passing them through the augmenting
    # generator made the validation metrics (and the callback) depend on random transforms.
    history = model.fit(
        datagen.flow(x_train,y_train, batch_size=batch_size),
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=[learning_rate])

    return history, model

In [None]:
# Callback shared by all three training runs; it monitors validation accuracy.
learning_rate_reduction = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.3,
    patience=2,
    min_lr=1e-6,
    verbose=1,
)

In [None]:
# Train the baseline CNN with batches of 32 images.
history, model = create_and_fit(
    learning_rate=learning_rate_reduction,
    epochs=EPOCAS,
    batch_size=32,
    x_train=x_train,
    y_train=y_train,
    x_val=x_val,
    y_val=y_val,
)

In [None]:
# Evaluate once and reuse both values; the original ran the full test-set
# evaluation twice just to print two numbers.
perda_teste, acuracia_teste = model.evaluate(x_test, y_test)
print("Loss of the model is - " , perda_teste)
print("Accuracy of the model is - " , acuracia_teste*100 , "%")

# Métricas do modelo

In [None]:
# Training vs. validation curves of the baseline CNN (accuracy on the left, loss on the right).
epochs = list(range(EPOCAS))

fig, ax = plt.subplots(1, 2)
train_acc = history.history['accuracy']
train_loss = history.history['loss']

val_loss = history.history['val_loss']
val_acc = history.history['val_accuracy']
fig.set_size_inches(15,7.5)

paineis = (
    (ax[0], train_acc, val_acc, 'Acurácia do Treino x Validação', "Acurácia"),
    (ax[1], train_loss, val_loss, 'Perda do Treino x Validação', "Perda"),
)
for painel, serie_treino, serie_validacao, titulo, rotulo_y in paineis:
    painel.plot(epochs, serie_treino, label="Treino")
    painel.plot(epochs, serie_validacao, label="Validação")
    painel.set_title(titulo)
    painel.legend()
    painel.set_xlabel("Épocas")
    painel.set_ylabel(rotulo_y)

In [None]:
# Sequential.predict_classes was removed in TensorFlow 2.6. For a single sigmoid
# output the equivalent is thresholding the predicted probability at 0.5.
prev = (model.predict(x_test) > 0.5).astype("int32")
prev = prev.reshape(1,-1)[0]  # flatten (n, 1) -> (n,)
prev[:15]

In [None]:
# Confusion matrix of the CNN predictions against the test labels.
conf_matrix = confusion_matrix(y_true=y_test, y_pred=prev)
conf_matrix

In [None]:
# Wrap the matrix in a DataFrame and draw it as an annotated heatmap.
# Tick labels come from sit_possiveis, whose order matches the class indices.
conf_matrix = pd.DataFrame(conf_matrix, index=['0', '1'], columns=['0', '1'])
plt.figure(figsize=(10, 10))

opcoes_heatmap = dict(
    cmap="bone",
    linecolor='black',
    linewidth=1,
    annot=True,
    fmt='',
    xticklabels=sit_possiveis,
    yticklabels=sit_possiveis,
)
sns.heatmap(conf_matrix, **opcoes_heatmap)



In [None]:
# Per-class report; the names follow the class indices (0 = Pneumonia, 1 = Normal).
relatorio = classification_report(y_test, prev, target_names=['Pneumonia', 'Normal'])
print(relatorio)

In [None]:
# Indices of the test samples classified correctly / incorrectly.
acertos = prev == y_test
correta = np.flatnonzero(acertos)
incorreta = np.flatnonzero(~acertos)

correta.size, incorreta.size


### Classes preditas corretamente e incorretamente

In [None]:
# Show the first correctly classified sample (left) and the first misclassified one (right).
plt.figure()

for posicao, indices in ((1, correta), (2, incorreta)):
    indice = indices[0]
    plt.subplot(1, 2, posicao)
    plt.imshow(x_test[indice].reshape(150,150), cmap="gray", interpolation='none')
    plt.title("Classe predita {}, Classe real {}".format(prev[indice], y_test[indice]))

plt.subplots_adjust(wspace=0.5)

plt.show()

# Treinando o Modelo Resnet50

In [None]:
def resnet_create_and_fit(learning_rate, epochs,  batch_size, x_train, y_train, x_val, y_val):
    """
    Build, compile and train a classifier on top of an ImageNet-initialised ResNet50.

    The single-channel input is concatenated with itself three times so it
    matches the three-channel input of the backbone.

    Parameters
    ----------
    learning_rate
        A Keras callback (despite the name); it is passed to ``fit`` as the
        only entry of ``callbacks``.
    epochs : int
        Number of training epochs.
    batch_size : int
        Batch size used by the augmenting training generator.
    x_train, y_train
        Training images, shaped ``(n, img_size, img_size, 1)``, and their labels.
    x_val, y_val
        Validation images and labels.

    Returns
    -------
    tuple
        ``(history, model)``: the object returned by ``model.fit`` and the
        trained model.

    Relies on the module-level ``datagen``, ``otimizador`` and ``img_size``.
    """
    # defining model
    img_input = tf.keras.layers.Input(shape=(img_size, img_size, 1))
    img_conc = tf.keras.layers.Concatenate()([img_input, img_input, img_input])

    # NOTE(review): images reach the pretrained backbone exactly as the caller
    # scaled them; confirm whether resnet50.preprocess_input should be applied.
    base_model = tf.keras.applications.ResNet50(
        weights='imagenet',
        include_top=False,
        input_shape=(img_size, img_size, 3),  # follows img_size instead of a hard-coded 150
        input_tensor=img_conc
    )
    model = tf.keras.models.Sequential([
        base_model,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    model.compile(optimizer= otimizador,
              loss='binary_crossentropy',
              metrics=['accuracy'])

    # fitting the model
    # Validation uses the untouched images. Passing them through the augmenting
    # generator made the validation metrics (and the callback) depend on random transforms.
    history = model.fit(
        datagen.flow(x_train,y_train, batch_size=batch_size),
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=[learning_rate])

    return history, model

In [None]:
# Train the ResNet50-based model with batches of 32 images.
history_resnet, resnet = resnet_create_and_fit(
    learning_rate=learning_rate_reduction,
    epochs=EPOCAS,
    batch_size=32,
    x_train=x_train,
    y_train=y_train,
    x_val=x_val,
    y_val=y_val,
)

In [None]:
# Evaluate once and reuse both values; the original ran the full test-set
# evaluation twice just to print two numbers.
perda_resnet, acuracia_resnet = resnet.evaluate(x_test, y_test)
print("Perda - " , perda_resnet)
print("Acurácia - " , acuracia_resnet*100 , "%")

In [None]:
# Training vs. validation curves of the ResNet50 model (accuracy on the left, loss on the right).
epochs = list(range(EPOCAS))

fig, ax = plt.subplots(1, 2)
train_resnet_acc = history_resnet.history['accuracy']
train_resnet_loss = history_resnet.history['loss']

val_resnet_loss = history_resnet.history['val_loss']
val_resnet_acc = history_resnet.history['val_accuracy']
fig.set_size_inches(15,7.5)

paineis = (
    (ax[0], train_resnet_acc, val_resnet_acc, 'Acurácia do Treino x Validação', "Acurácia"),
    (ax[1], train_resnet_loss, val_resnet_loss, 'Perda do Treino x Validação', "Perda"),
)
for painel, serie_treino, serie_validacao, titulo, rotulo_y in paineis:
    painel.plot(epochs, serie_treino, label="Treino")
    painel.plot(epochs, serie_validacao, label="Validação")
    painel.set_title(titulo)
    painel.legend()
    painel.set_xlabel("Épocas")
    painel.set_ylabel(rotulo_y)

In [None]:
# Sequential.predict_classes was removed in TensorFlow 2.6. For a single sigmoid
# output the equivalent is thresholding the predicted probability at 0.5.
prev = (resnet.predict(x_test) > 0.5).astype("int32")
prev = prev.reshape(1,-1)[0]  # flatten (n, 1) -> (n,)
prev[:15]

In [None]:
# Confusion matrix of the ResNet50 predictions against the test labels.
conf_matrix = confusion_matrix(y_true=y_test, y_pred=prev)
conf_matrix

In [None]:
# Wrap the matrix in a DataFrame and draw it as an annotated heatmap.
# Tick labels come from sit_possiveis, whose order matches the class indices.
conf_matrix = pd.DataFrame(conf_matrix, index=['0', '1'], columns=['0', '1'])
plt.figure(figsize=(10, 10))

opcoes_heatmap = dict(
    cmap="bone",
    linecolor='black',
    linewidth=1,
    annot=True,
    fmt='',
    xticklabels=sit_possiveis,
    yticklabels=sit_possiveis,
)
sns.heatmap(conf_matrix, **opcoes_heatmap)


In [None]:
# Per-class report; the names follow the class indices (0 = Pneumonia, 1 = Normal).
relatorio = classification_report(y_test, prev, target_names=['Pneumonia', 'Normal'])
print(relatorio)

In [None]:
# Indices of the test samples classified correctly / incorrectly.
acertos = prev == y_test
correta = np.flatnonzero(acertos)
incorreta = np.flatnonzero(~acertos)

correta.size, incorreta.size


### Classes preditas corretamente e incorretamente

In [None]:
# Show the first correctly classified sample (left) and the first misclassified one (right).
plt.figure()

for posicao, indices in ((1, correta), (2, incorreta)):
    indice = indices[0]
    plt.subplot(1, 2, posicao)
    plt.imshow(x_test[indice].reshape(150,150), cmap="gray", interpolation='none')
    plt.title("Classe predita {}, Classe real {}".format(prev[indice], y_test[indice]))

plt.subplots_adjust(wspace=0.5)

plt.show()

# Treinando o Modelo InceptionV3

In [None]:
def inception_create_and_fit(learning_rate, epochs,  batch_size, x_train, y_train, x_val, y_val):
    """
    Build, compile and train a classifier on top of an ImageNet-initialised InceptionV3.

    The single-channel input is concatenated with itself three times so it
    matches the three-channel input of the backbone.

    Parameters
    ----------
    learning_rate
        A Keras callback (despite the name); it is passed to ``fit`` as the
        only entry of ``callbacks``.
    epochs : int
        Number of training epochs.
    batch_size : int
        Batch size used by the augmenting training generator.
    x_train, y_train
        Training images, shaped ``(n, img_size, img_size, 1)``, and their labels.
    x_val, y_val
        Validation images and labels.

    Returns
    -------
    tuple
        ``(history, model)``: the object returned by ``model.fit`` and the
        trained model.

    Relies on the module-level ``datagen``, ``otimizador`` and ``img_size``.
    """
    # defining model
    img_input = tf.keras.layers.Input(shape=(img_size, img_size, 1))
    img_conc = tf.keras.layers.Concatenate()([img_input, img_input, img_input])

    # NOTE(review): images reach the pretrained backbone exactly as the caller
    # scaled them; confirm whether inception_v3.preprocess_input should be applied.
    base_model = tf.keras.applications.InceptionV3(
        weights='imagenet',
        include_top=False,
        input_shape=(img_size, img_size, 3),  # follows img_size instead of a hard-coded 150
        input_tensor=img_conc
    )

    model = tf.keras.models.Sequential([
        base_model,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    model.compile(optimizer=otimizador,
              loss='binary_crossentropy',
              metrics=['accuracy'])


    # fitting the model
    # Validation uses the untouched images. Passing them through the augmenting
    # generator made the validation metrics (and the callback) depend on random transforms.
    history = model.fit(
        datagen.flow(x_train,y_train, batch_size=batch_size),
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=[learning_rate])

    return history, model

In [None]:
# Train the InceptionV3-based model with batches of 32 images.
history_inception, inception = inception_create_and_fit(
    learning_rate=learning_rate_reduction,
    epochs=EPOCAS,
    batch_size=32,
    x_train=x_train,
    y_train=y_train,
    x_val=x_val,
    y_val=y_val,
)

In [None]:
# Evaluate once and reuse both values; the original ran the full test-set
# evaluation twice just to print two numbers.
perda_inception, acuracia_inception = inception.evaluate(x_test, y_test)
print("Perda - " , perda_inception)
print("Acurácia - " , acuracia_inception*100 , "%")

In [None]:
# Training vs. validation curves of the InceptionV3 model (accuracy on the left, loss on the right).
epochs = list(range(EPOCAS))

fig, ax = plt.subplots(1, 2)
train_incep_acc = history_inception.history['accuracy']
train_incep_loss = history_inception.history['loss']

val_incep_loss = history_inception.history['val_loss']
val_incep_acc = history_inception.history['val_accuracy']
fig.set_size_inches(15,7.5)

paineis = (
    (ax[0], train_incep_acc, val_incep_acc, 'Acurácia do Treino x Validação', "Acurácia"),
    (ax[1], train_incep_loss, val_incep_loss, 'Perda do Treino x Validação', "Perda"),
)
for painel, serie_treino, serie_validacao, titulo, rotulo_y in paineis:
    painel.plot(epochs, serie_treino, label="Treino")
    painel.plot(epochs, serie_validacao, label="Validação")
    painel.set_title(titulo)
    painel.legend()
    painel.set_xlabel("Épocas")
    painel.set_ylabel(rotulo_y)

In [None]:
# Sequential.predict_classes was removed in TensorFlow 2.6. For a single sigmoid
# output the equivalent is thresholding the predicted probability at 0.5.
prev = (inception.predict(x_test) > 0.5).astype("int32")
prev = prev.reshape(1,-1)[0]  # flatten (n, 1) -> (n,)
prev[:15]

In [None]:
# Confusion matrix of the InceptionV3 predictions against the test labels.
conf_matrix = confusion_matrix(y_true=y_test, y_pred=prev)
conf_matrix

In [None]:
# Wrap the matrix in a DataFrame and draw it as an annotated heatmap.
# Tick labels come from sit_possiveis, whose order matches the class indices.
conf_matrix = pd.DataFrame(conf_matrix, index=['0', '1'], columns=['0', '1'])
plt.figure(figsize=(10, 10))

opcoes_heatmap = dict(
    cmap="bone",
    linecolor='black',
    linewidth=1,
    annot=True,
    fmt='',
    xticklabels=sit_possiveis,
    yticklabels=sit_possiveis,
)
sns.heatmap(conf_matrix, **opcoes_heatmap)


In [None]:
# Per-class report; the names follow the class indices (0 = Pneumonia, 1 = Normal).
relatorio = classification_report(y_test, prev, target_names=['Pneumonia', 'Normal'])
print(relatorio)

In [None]:
# Indices of the test samples classified correctly / incorrectly.
acertos = prev == y_test
correta = np.flatnonzero(acertos)
incorreta = np.flatnonzero(~acertos)

correta.size, incorreta.size

# Comparação entre modelos

In [None]:
# Compare the training curves of the three models side by side.
epochs = list(range(EPOCAS))

fig, ax = plt.subplots(1, 2)
fig.set_size_inches(15,7.5)
fig.suptitle('Comparação de modelos', fontsize=16)

curvas_acuracia = (
    ("InceptionV3", train_incep_acc),
    ("ResNet50", train_resnet_acc),
    ("CNN", train_acc),
)
curvas_perda = (
    ("InceptionV3", train_incep_loss),
    ("ResNet50", train_resnet_loss),
    ("CNN", train_loss),
)

paineis = (
    (ax[0], curvas_acuracia, 'Acurácia no treino', "Acurácia"),
    (ax[1], curvas_perda, 'Perda no treino', "Perda"),
)
for painel, curvas, titulo, rotulo_y in paineis:
    for nome_modelo, valores in curvas:
        painel.plot(epochs, valores, label=nome_modelo)
    painel.set_title(titulo)
    painel.legend()
    painel.set_xlabel("Épocas")
    painel.set_ylabel(rotulo_y)

# Testagem do modelo

Nessa parte do código, vamos simular um cenário real, onde não possuímos esse "gabarito" das classes, ou seja, não sabemos realmente se a imagem apresenta pneumonia ou não.

In [None]:
import random

In [None]:
# Draw one class index at random; the second label is the other class.
label1 = random.randint(0, 1)
label2 = 1 - label1

In [None]:
def process_img(img_array):
    """Divide the image by 255 and reshape it to ``(-1, img_size, img_size, 1)``."""
    return (img_array / 255).reshape(-1, img_size, img_size, 1)

In [None]:
def _primeira_amostra(amostras, label_desejado):
    """Return the first (image, label) pair whose label equals ``label_desejado``, or (None, None)."""
    for imagem, rotulo in amostras:
        if rotulo == label_desejado:
            return imagem, rotulo
    return None, None


# Pick the first test image of each of the two randomly ordered classes.
test_img1, test_label1 = _primeira_amostra(test, label1)
test_img2, test_label2 = _primeira_amostra(test, label2)

x1 = process_img(test_img1)
x2 = process_img(test_img2)

In [None]:
# Show the two selected images without revealing their labels.
for posicao, (imagem, titulo) in enumerate(((x1, "Imagem 1"), (x2, "Imagem 2")), start=1):
    plt.subplot(1, 2, posicao)
    plt.imshow(imagem.reshape(150, 150), cmap="gray", interpolation='none')
    plt.title(titulo)
plt.show()

In [None]:
# Sequential.predict_classes was removed in TensorFlow 2.6. Thresholding the
# sigmoid output at 0.5 gives the same (1, 1) integer array.
predicted_label1 = (inception.predict(x1) > 0.5).astype("int32")
predicted_label2 = (inception.predict(x2) > 0.5).astype("int32")

f"Imagem1 = {predicted_label1[0][0]}, Imagem2 = {predicted_label2[0][0]}"

In [None]:
# Show both images again, now with the actual and the predicted label in the title.
plt.figure(figsize=(12, 6))

amostras_plot = (
    (1, x1, "Imagem 1 - ", test_label1, predicted_label1),
    (2, x2, "Imagem 2 - ", test_label2, predicted_label2),
)
for posicao, imagem, prefixo, rotulo_real, rotulo_predito in amostras_plot:
    plt.subplot(1, 2, posicao)
    plt.imshow(imagem.reshape(150, 150), cmap="gray", interpolation='none')
    plt.title(prefixo + f'Actual Label = {rotulo_real} , Predicted Label = {rotulo_predito[0][0]}')
plt.show()