In [1]:
#basic libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline 
# %matplotlib inline -> plots are displayed directly below the code cell that produced them, and the resulting figures are also stored in the notebook document.


In [2]:
# Let pandas show up to 100 characters of a cell's text before truncating it (the unit is characters, not pixels).
pd.set_option("display.max_colwidth", 100)

In [3]:
import random
import os #The OS module in Python provides functions for creating and removing a directory (folder), fetching its contents, changing and identifying the current directory, etc
from numpy.random import seed
# Fix every random source that is configured in this cell so runs are repeatable.
seed(42)         # NumPy's global random generator
random.seed(42)  # Python's built-in `random` module
os.environ["PYTHONHASHSEED"] = str(42)  # fixed value for the hash seed
# The key must be the bare variable name. It used to carry an extra pair of quotes
# ("'TF_DETERMINISTIC_OPS'"), which created a differently named variable and left the
# real TF_DETERMINISTIC_OPS flag unset.
os.environ["TF_DETERMINISTIC_OPS"] = "1"

In [4]:
import tensorflow
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [5]:
import glob #to return all file paths that match a specific pattern
import cv2
from tensorflow.random import set_seed #global seed
set_seed(42)  # TensorFlow's global random seed
import warnings
warnings.simplefilter("ignore")  # hide every warning for the rest of the session

In [6]:
# Locate the Kaggle chest X-ray dataset and list every JPEG of each split/class folder.
main_path = "/kaggle/input/chest-xray-pneumonia/chest_xray/chest_xray"


train_path = os.path.join(main_path, "train")
test_path = os.path.join(main_path, "test")

# glob returns matches in arbitrary (filesystem-dependent) order. Sorting pins the order,
# so the seeded train/validation split and the sample image grids are the same on every run.
train_normal = sorted(glob.glob(train_path + "/NORMAL/*.jpeg"))
train_pneumonia = sorted(glob.glob(train_path + "/PNEUMONIA/*.jpeg"))

test_normal = sorted(glob.glob(test_path + "/NORMAL/*.jpeg"))
test_pneumonia = sorted(glob.glob(test_path + "/PNEUMONIA/*.jpeg"))

In [7]:
# Single list of training image paths: every NORMAL file first, then every PNEUMONIA file.
train_list = [*train_normal, *train_pneumonia]

In [8]:

# One row per training image: 'class' holds the label, 'image' the file path.
# Labels are laid out Normal-first so they line up with the order of train_list.
train_labels = ['Normal'] * len(train_normal) + ['Pneumonia'] * len(train_pneumonia)
df_train = pd.DataFrame(np.array(train_labels), columns=['class'])
df_train['image'] = list(train_list)

In [9]:
# Single list of test image paths: every NORMAL file first, then every PNEUMONIA file.
test_list = [*test_normal, *test_pneumonia]

In [10]:

# One row per test image: 'class' holds the label, 'image' the file path.
# Labels are laid out Normal-first so they line up with the order of test_list.
test_labels = ['Normal'] * len(test_normal) + ['Pneumonia'] * len(test_pneumonia)
df_test = pd.DataFrame(np.array(test_labels), columns=['class'])
df_test['image'] = list(test_list)

In [11]:
# Display the training table: one row per image with its class label and file path.
df_train

In [12]:
# (number of rows, number of columns) of the training table.
print(df_train.shape)

In [13]:
# Display the test table: one row per image with its class label and file path.
df_test

In [14]:
# (number of rows, number of columns) of the test table.
print(df_test.shape)

In [15]:
# Bar chart of the number of training images in each class.
fig, ax = plt.subplots(figsize=(6, 4))
sns.countplot(x='class', data=df_train, palette="mako", ax=ax)  # "mako" is the colour palette

ax.set_xlabel("Class", fontsize=12)
ax.set_ylabel("# of Samples", fontsize=12)
ax.set_ylim(0, 5000)
ax.set_xticks([0, 1])
ax.set_xticklabels(['Normal', 'Pneumonia'], fontsize=11)

# Write each bar's height (its sample count) slightly above the bar.
for p in ax.patches:
    ax.annotate(p.get_height(), xy=(p.get_x() + 0.30, p.get_height() + 300), fontsize=13)

plt.show()


In [16]:
# Bar chart of the number of test images in each class.
fig, ax = plt.subplots(figsize=(6, 4))
sns.countplot(x='class', data=df_test, palette="mako", ax=ax)

ax.set_xlabel("Class", fontsize=12)
ax.set_ylabel("# of Samples", fontsize=12)
ax.set_ylim(0, 500)
ax.set_xticks([0, 1])
ax.set_xticklabels(['Normal', 'Pneumonia'], fontsize=11)

# Write each bar's height (its sample count) slightly above the bar.
for p in ax.patches:
    ax.annotate(p.get_height(), xy=(p.get_x() + 0.32, p.get_height() + 20), fontsize=13)

plt.show()

In [17]:
#to see image size
# import cv2
# im = cv2.imread('photo.jpeg')
# print(im.shape)
# (712,1072,3)

In [18]:
# Shared settings used by the cells that follow.
SEED = 42       # random seed handed to the split and the data generators
BATCH = 32      # number of images per batch
IMG_SIZE = 224  # images are resized to IMG_SIZE x IMG_SIZE

In [19]:
print('Train Set - Normal')

# 3x4 grid of the first 12 NORMAL training images, each resized to IMG_SIZE x IMG_SIZE.
fig, grid = plt.subplots(3, 4, figsize=(12, 12))

for i, panel in enumerate(grid.flat):
    img = cv2.resize(cv2.imread(train_normal[i]), (IMG_SIZE, IMG_SIZE))
    panel.imshow(img)
    panel.axis("off")

plt.tight_layout()  # adjust subplot spacing automatically
plt.show()

In [20]:
print('Train Set - Pneumonia')

# 3x4 grid of the first 12 PNEUMONIA training images, each resized to IMG_SIZE x IMG_SIZE.
fig, grid = plt.subplots(3, 4, figsize=(12, 12))

for i, panel in enumerate(grid.flat):
    img = cv2.resize(cv2.imread(train_pneumonia[i]), (IMG_SIZE, IMG_SIZE))
    panel.imshow(img)
    panel.axis("off")

plt.tight_layout()

plt.show()

In [21]:
print('Test Set - Normal')

# 3x4 grid of the first 12 NORMAL test images, each resized to IMG_SIZE x IMG_SIZE.
fig, grid = plt.subplots(3, 4, figsize=(12, 12))

for i, panel in enumerate(grid.flat):
    img = cv2.resize(cv2.imread(test_normal[i]), (IMG_SIZE, IMG_SIZE))
    panel.imshow(img)
    panel.axis("off")

plt.tight_layout()

plt.show()

In [22]:
print('Test Set - Pneumonia')

# 3x4 grid of the first 12 PNEUMONIA test images, each resized to IMG_SIZE x IMG_SIZE.
fig, grid = plt.subplots(3, 4, figsize=(12, 12))

for i, panel in enumerate(grid.flat):
    img = cv2.resize(cv2.imread(test_pneumonia[i]), (IMG_SIZE, IMG_SIZE))
    panel.imshow(img)
    panel.axis("off")

plt.tight_layout()

plt.show()

In [23]:
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.metrics import accuracy_score

In [24]:
# Hold out 20% of the labelled training images for validation. Stratifying on the label
# keeps the Normal/Pneumonia proportions the same in both parts.
train_df, val_df = train_test_split(
    df_train,
    test_size=0.20,
    stratify=df_train['class'],
    random_state=SEED,
)

In [25]:
# Shapes of the training and validation parts, one per line.
print(train_df.shape, val_df.shape, sep="\n")

**We begin by defining the data generators. With the Keras `ImageDataGenerator`, we can rescale the pixel values and apply random transformations for on-the-fly data augmentation. We define two different generators: `val_datagen` simply rescales the validation and test sets, while `train_datagen` also includes some random transformations to augment the training set.**

**We apply those generators to each dataset using the `flow_from_dataframe` method. Apart from the transformations defined in each generator, the images are also resized according to the `target_size` that is set.**

In [26]:
# Training images get light random augmentation (zoom plus horizontal/vertical shifts) on top
# of the 1/255 rescaling; validation and test images are only rescaled.
train_datagen = ImageDataGenerator(
    rescale=1/255.,
    zoom_range=0.1,
    width_shift_range=0.1,
    height_shift_range=0.1,
)
val_datagen = ImageDataGenerator(rescale=1/255.)

# flow_from_dataframe reads the files listed in the 'image' column, takes their labels from
# the 'class' column and resizes every image to (IMG_SIZE, IMG_SIZE).
flow_args = dict(
    x_col='image',
    y_col='class',
    target_size=(IMG_SIZE, IMG_SIZE),
    class_mode='binary',
)

ds_train = train_datagen.flow_from_dataframe(train_df, batch_size=BATCH, seed=SEED, **flow_args)
ds_val = val_datagen.flow_from_dataframe(val_df, batch_size=BATCH, seed=SEED, **flow_args)

# The test iterator yields one image per batch and is not shuffled, so its output follows
# the row order of df_test.
ds_test = val_datagen.flow_from_dataframe(df_test, batch_size=1, shuffle=False, **flow_args)


In [27]:
# callbacks --> conditions that adjust or stop model training

# EarlyStopping --> stop training once val_loss has not improved by at least min_delta for
# 5 consecutive epochs, then restore the best weights seen.
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    min_delta=1e-7,
    restore_best_weights=True,
)

# ReduceLROnPlateau --> multiply the learning rate by 0.2 when val_loss has not improved
# for 2 epochs.
# The keyword is `min_delta`. It was previously misspelled `min_delt`, which is not a
# parameter of the callback, so the intended 1e-7 threshold never took effect.
plateau = callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=2,
    min_delta=1e-7,
    cooldown=0,
    verbose=1,
)

In [28]:
#basic_CNN_model
def my_model():
    """Build the baseline CNN for binary chest X-ray classification.

    Architecture: three convolutional stages (16, 32 and 64+64 filters, 3x3 kernels,
    "valid" padding), each followed by batch normalisation, ReLU, max-pooling and
    dropout; then Flatten -> Dense(64, relu) -> Dropout(0.5) -> Dense(1, sigmoid).

    The input shape is taken from the module-level IMG_SIZE.

    Returns:
        An uncompiled ``tensorflow.keras.Sequential`` model.
    """
    model = tensorflow.keras.Sequential()

    # Conv2D applies no activation unless one is requested, so every stage adds an explicit
    # ReLU after batch normalisation. Without it the convolutions were never followed by a
    # non-linearity other than max-pooling.

    #1
    model.add(layers.Conv2D(input_shape=(IMG_SIZE, IMG_SIZE, 3), filters=16, kernel_size=3, padding="valid"))
    model.add(layers.BatchNormalization())
    model.add(layers.Activation("relu"))
    model.add(layers.MaxPool2D())
    model.add(layers.Dropout(0.2))

    #2
    model.add(layers.Conv2D(filters=32, kernel_size=3, padding="valid"))
    model.add(layers.BatchNormalization())
    model.add(layers.Activation("relu"))
    model.add(layers.MaxPool2D())
    model.add(layers.Dropout(0.2))

    #3
    model.add(layers.Conv2D(filters=64, kernel_size=3, padding="valid"))
    model.add(layers.Conv2D(filters=64, kernel_size=3, padding="valid"))
    model.add(layers.BatchNormalization())
    model.add(layers.Activation("relu"))
    model.add(layers.MaxPool2D())
    model.add(layers.Dropout(0.4))

    #Flatten and Dense layers
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation="relu"))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(1, activation="sigmoid"))  # single probability output

    #final model
    return model

    
    

In [29]:
# Reset Keras' global state left over from previous models, then build and compile the baseline CNN.
tensorflow.keras.backend.clear_session()
model = my_model()
model.compile(
    loss="binary_crossentropy",
    optimizer=tensorflow.keras.optimizers.Adam(learning_rate=3e-5),
    # `metrics` is documented as a list of metrics; the bare string used before is not a
    # documented form.
    metrics=["binary_accuracy"],
)

model.summary()

In [30]:
#training the model
# ds_train and ds_val already deliver batches of BATCH images, so `batch_size` is not passed
# to fit(). The step counts come from the iterators' own length (a whole number of batches);
# len(train_df)/BATCH and len(val_df)/BATCH were fractional.
history = model.fit(
    ds_train,
    epochs=50,
    validation_data=ds_val,
    callbacks=[early_stopping, plateau],
    steps_per_epoch=len(ds_train),
    validation_steps=len(ds_val),
)

In [31]:
# Training vs. validation loss per epoch for the baseline CNN.
fig, ax = plt.subplots(figsize=(20, 8))
for curve in ('loss', 'val_loss'):
    sns.lineplot(x=history.epoch, y=history.history[curve], ax=ax)
ax.set(title='Learning Curve (Loss)', ylabel='Loss', xlabel='Epoch', ylim=(0, 0.5))
ax.legend(['train', 'val'], loc='best')
plt.show()

In [32]:
# Training vs. validation accuracy per epoch for the baseline CNN.
fig, ax = plt.subplots(figsize=(20, 8))
for curve in ('binary_accuracy', 'val_binary_accuracy'):
    sns.lineplot(x=history.epoch, y=history.history[curve], ax=ax)
ax.set(title='Learning Curve (Accuracy)', ylabel='Accuracy', xlabel='Epoch', ylim=(0.80, 1.0))
ax.legend(['train', 'val'], loc='best')
plt.show()

In [33]:
#VALIDATION DATA ACCURACY AND LOSS
# One full pass over the validation iterator. len(ds_val) is its whole number of batches
# and replaces the fractional len(val_df)/BATCH.
score = model.evaluate(ds_val, steps=len(ds_val), verbose=0)
print('Val loss:', score[0])
print('Val accuracy:', score[1])

In [34]:
#TEST DATA ACCURACY AND LOSS
# One full pass over the test iterator. Taking the step count from ds_test itself keeps it
# correct whatever batch size the iterator was built with (len(df_test) only matched
# because ds_test uses batch_size=1).
score = model.evaluate(ds_test, steps=len(ds_test), verbose=0)

print('Test loss:', score[0])
print('Test accuracy:', score[1])

**TRANSFER LEARNING**

In [35]:
# 1.ResNet152V2 MODEL

# ImageNet-pretrained ResNet152V2 created without its top classification layers
# (include_top=False) and frozen so it acts as a fixed feature extractor.
base_model = tensorflow.keras.applications.ResNet152V2(
    include_top=False,
    weights='imagenet',
    input_shape=(IMG_SIZE, IMG_SIZE, 3),
)
base_model.trainable = False  # freeze the weights of every layer

def get_pretrained():
    """Put a small binary-classification head on top of the module-level ``base_model``.

    Pipeline: input image -> base_model -> global average pooling -> Dense(128, relu)
    -> Dropout(0.1) -> Dense(1, sigmoid).

    Returns:
        An uncompiled ``tensorflow.keras.Model`` that takes (IMG_SIZE, IMG_SIZE, 3) inputs
        and outputs a single probability.
    """
    inputs = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))

    # training=False keeps the backbone's BatchNormalization layers in inference mode, so
    # their moving statistics are not updated when part of the backbone is unfrozen for
    # fine-tuning later in the notebook.
    x = base_model(inputs, training=False)

    # 1
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.1)(x)

    #Final Layer
    output = layers.Dense(1, activation='sigmoid')(x)

    model = tensorflow.keras.Model(inputs=[inputs], outputs=output)

    return model

In [36]:
# Reset Keras' global state left over from previous models, then build and compile the ResNet-based model.
tensorflow.keras.backend.clear_session()

model_pretrained = get_pretrained()
model_pretrained.compile(
    loss='binary_crossentropy',
    optimizer=tensorflow.keras.optimizers.Adam(learning_rate=5e-5),
    # `metrics` is documented as a list of metrics; the bare string used before is not a
    # documented form.
    metrics=['binary_accuracy'],
)

model_pretrained.summary()

In [37]:
# Train the classification head on top of the frozen ResNet backbone.
# ds_train and ds_val already deliver batches of BATCH images, so `batch_size` is not passed
# to fit(). The step counts come from the iterators' own length (a whole number of batches)
# instead of the fractional len(...)/BATCH.
history = model_pretrained.fit(
    ds_train,
    epochs=50,
    validation_data=ds_val,
    callbacks=[early_stopping, plateau],
    steps_per_epoch=len(ds_train),
    validation_steps=len(ds_val),
);

In [38]:
# Training vs. validation loss per epoch for the most recent `history`.
fig, ax = plt.subplots(figsize=(20, 8))
for curve in ('loss', 'val_loss'):
    sns.lineplot(x=history.epoch, y=history.history[curve], ax=ax)
ax.set(title='Learning Curve (Loss)', ylabel='Loss', xlabel='Epoch', ylim=(0, 0.5))
ax.legend(['train', 'val'], loc='best')
plt.show()

In [39]:
# Training vs. validation accuracy per epoch for the most recent `history`.
fig, ax = plt.subplots(figsize=(20, 8))
for curve in ('binary_accuracy', 'val_binary_accuracy'):
    sns.lineplot(x=history.epoch, y=history.history[curve], ax=ax)
ax.set(title='Learning Curve (Accuracy)', ylabel='Accuracy', xlabel='Epoch', ylim=(0.80, 1.0))
ax.legend(['train', 'val'], loc='best')
plt.show()


In [40]:
# Validation loss/accuracy of the ResNet-based model: one full pass over ds_val, using its
# whole number of batches instead of the fractional len(val_df)/BATCH.
score = model_pretrained.evaluate(ds_val, steps=len(ds_val), verbose=0)
print('Val loss:', score[0])
print('Val accuracy:', score[1])

In [41]:
# Test loss/accuracy of the ResNet-based model: one full pass over ds_test. The step count
# is taken from the iterator so it does not depend on ds_test having batch_size=1.
score = model_pretrained.evaluate(ds_test, steps=len(ds_test), verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [42]:
# 2.vgg19


# ImageNet-pretrained VGG19 created without its top classification layers and frozen.
# The input shape uses IMG_SIZE (instead of a hard-coded 224) so it always matches the
# generators and the Input layer that feeds this backbone.
base_model_1 = tensorflow.keras.applications.VGG19(weights='imagenet',
                                include_top=False,
                                input_shape=(IMG_SIZE, IMG_SIZE, 3))
base_model_1.trainable = False

def get_pretrained_model():
    """Put a small binary-classification head on top of the module-level ``base_model_1``.

    Pipeline: input image -> base_model_1 -> global average pooling -> Dense(128, relu)
    -> Dropout(0.1) -> Dense(1, sigmoid).

    Returns:
        An uncompiled ``tensorflow.keras.Model`` that takes (IMG_SIZE, IMG_SIZE, 3) inputs
        and outputs a single probability.
    """
    image_in = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))

    # Feature maps from the VGG19 backbone.
    features = base_model_1(image_in)

    # Classification head.
    features = layers.GlobalAveragePooling2D()(features)
    features = layers.Dense(128, activation='relu')(features)
    features = layers.Dropout(0.1)(features)

    # Final layer: one sigmoid unit.
    probability = layers.Dense(1, activation='sigmoid')(features)

    return tensorflow.keras.Model(inputs=[image_in], outputs=probability)

In [43]:
# Reset Keras' global state left over from previous models, then build and compile the VGG19-based model.
tensorflow.keras.backend.clear_session()

model_pretrained_1 = get_pretrained_model()
model_pretrained_1.compile(
    loss='binary_crossentropy',
    optimizer=tensorflow.keras.optimizers.Adam(learning_rate=5e-5),
    # `metrics` is documented as a list of metrics; the bare string used before is not a
    # documented form.
    metrics=['binary_accuracy'],
)

model_pretrained_1.summary()

In [44]:
# Train the classification head on top of the frozen VGG19 backbone.
# ds_train and ds_val already deliver batches of BATCH images, so `batch_size` is not passed
# to fit(). The step counts come from the iterators' own length (a whole number of batches)
# instead of the fractional len(...)/BATCH.
history_1 = model_pretrained_1.fit(
    ds_train,
    epochs=50,
    validation_data=ds_val,
    callbacks=[early_stopping, plateau],
    steps_per_epoch=len(ds_train),
    validation_steps=len(ds_val),
);

In [45]:
# Training vs. validation loss per epoch for the VGG19-based model.
fig, ax = plt.subplots(figsize=(20, 8))
for curve in ('loss', 'val_loss'):
    sns.lineplot(x=history_1.epoch, y=history_1.history[curve], ax=ax)
ax.set(title='Learning Curve (Loss)', ylabel='Loss', xlabel='Epoch', ylim=(0, 0.5))
ax.legend(['train', 'val'], loc='best')
plt.show()

In [46]:
# Training vs. validation accuracy per epoch for the VGG19-based model.
fig, ax = plt.subplots(figsize=(20, 8))
for curve in ('binary_accuracy', 'val_binary_accuracy'):
    sns.lineplot(x=history_1.epoch, y=history_1.history[curve], ax=ax)
ax.set(title='Learning Curve (Accuracy)', ylabel='Accuracy', xlabel='Epoch', ylim=(0.80, 1.0))
ax.legend(['train', 'val'], loc='best')
plt.show()


In [47]:
# Validation loss/accuracy of the VGG19-based model: one full pass over ds_val, using its
# whole number of batches instead of the fractional len(val_df)/BATCH.
score = model_pretrained_1.evaluate(ds_val, steps=len(ds_val), verbose=0)
print('Val loss:', score[0])
print('Val accuracy:', score[1])

In [48]:
# Test loss/accuracy of the VGG19-based model: one full pass over ds_test. The step count
# is taken from the iterator so it does not depend on ds_test having batch_size=1.
score = model_pretrained_1.evaluate(ds_test, steps=len(ds_test), verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [49]:
# Fine-tuning, step 1: mark the whole ResNet backbone (base_model) as trainable.
base_model.trainable = True


# Step 2: freeze every layer again except the last 13, so only those remain trainable.
for layer in base_model.layers[:-13]:
    layer.trainable = False

In [50]:
# Print index, name and trainable flag of every backbone layer to verify the freeze above.
for layer_number, layer in enumerate(base_model.layers):
    print(f"{layer_number} {layer.name} {layer.trainable}")

In [51]:
# Re-compile after changing which layers are trainable, with a much smaller learning rate
# (2e-6) for fine-tuning.
model_pretrained.compile(
    loss='binary_crossentropy',
    optimizer=tensorflow.keras.optimizers.Adam(learning_rate=2e-6),
    # `metrics` is documented as a list of metrics; the bare string used before is not a
    # documented form.
    metrics=['binary_accuracy'],
)

model_pretrained.summary()

In [52]:
# Fine-tune the partially unfrozen ResNet-based model.
# ds_train and ds_val already deliver batches of BATCH images, so `batch_size` is not passed
# to fit(). The step counts come from the iterators' own length (a whole number of batches)
# instead of the fractional len(...)/BATCH.
history = model_pretrained.fit(
    ds_train,
    epochs=50,
    validation_data=ds_val,
    callbacks=[early_stopping, plateau],
    steps_per_epoch=len(ds_train),
    validation_steps=len(ds_val),
);

In [53]:
# Training vs. validation loss per epoch during fine-tuning.
fig, ax = plt.subplots(figsize=(20, 8))
for curve in ('loss', 'val_loss'):
    sns.lineplot(x=history.epoch, y=history.history[curve], ax=ax)
ax.set(title='Learning Curve (Loss)', ylabel='Loss', xlabel='Epoch', ylim=(0, 0.3))
ax.legend(['train', 'val'], loc='best')
plt.show()

In [54]:
# Training vs. validation accuracy per epoch during fine-tuning.
fig, ax = plt.subplots(figsize=(20, 8))
for curve in ('binary_accuracy', 'val_binary_accuracy'):
    sns.lineplot(x=history.epoch, y=history.history[curve], ax=ax)
ax.set(title='Learning Curve (Accuracy)', ylabel='Accuracy', xlabel='Epoch', ylim=(0.90, 1.0))
ax.legend(['train', 'val'], loc='best')
plt.show()

In [55]:
# Validation loss/accuracy after fine-tuning: one full pass over ds_val, using its whole
# number of batches instead of the fractional len(val_df)/BATCH.
score = model_pretrained.evaluate(ds_val, steps=len(ds_val), verbose=0)
print('Val loss:', score[0])
print('Val accuracy:', score[1])

In [56]:
# Test loss/accuracy after fine-tuning: one full pass over ds_test. The step count is taken
# from the iterator so it does not depend on ds_test having batch_size=1.
score = model_pretrained.evaluate(ds_test, steps=len(ds_test), verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [59]:

# Integer ground-truth labels for the test rows: Normal -> 0, Pneumonia -> 1.
num_label = {'Normal': 0, 'Pneumonia': 1}
Y_test = df_test['class'].map(num_label).astype('int')

In [60]:
# Rewind the (unshuffled) test iterator, predict a probability for every test image with
# the fine-tuned model, then threshold at 0.5 to obtain 0/1 labels.
ds_test.reset()
predictions = model_pretrained.predict(ds_test, steps=len(ds_test), verbose=0)
pred_labels = (predictions > 0.5).astype(int)

In [61]:
# Accuracy of the thresholded predictions against the true test labels.
print("Test Accuracy: ", accuracy_score(Y_test, pred_labels))

In [62]:
# confusion matrix: rows are the true labels, columns the predicted labels
confusion_matrix = metrics.confusion_matrix(Y_test, pred_labels)
heat_ax = sns.heatmap(confusion_matrix, annot=True, fmt="d")

heat_ax.set_xlabel("Predicted Label", fontsize=12)
heat_ax.set_ylabel("True Label", fontsize=12)

plt.show()

In [63]:
# Per-class classification report for labels 0 and 1.
print(metrics.classification_report(Y_test, pred_labels, labels = [0, 1]))

In [64]:
# ROC curve and the area under it, computed from the raw predicted probabilities.
roc_auc = metrics.roc_auc_score(Y_test, predictions)
print('ROC_AUC: ', roc_auc)

fpr, tpr, thresholds = metrics.roc_curve(Y_test, predictions)

plt.plot(fpr, tpr, label=f'ROC_AUC = {roc_auc:0.3f}')

roc_ax = plt.gca()
roc_ax.set_xlabel("False Positive Rate", fontsize=12)
roc_ax.set_ylabel("True Positive Rate", fontsize=12)
roc_ax.legend(loc="lower right")

plt.show()

In [76]:
# Save the fine-tuned ResNet-based model to the file "Prediction.h5".
model_pretrained.save("Prediction.h5")