# Notebook information
Dataset corresponds to these three categories: 
-> Healthy
-> Bacterial-pneumonia
-> Viral-pneumonia

If a chest X-ray falls into either pneumonia category (Bacterial-pneumonia or Viral-pneumonia), this notebook treats the person as infected with COVID-19. Bacterial-pneumonia and Viral-pneumonia are not grouped into one category because each shows different image features to look out for. Combining them would hurt accuracy and could lead to misclassification. Another point is that the categories contain uneven amounts of data. To illustrate, we have 2000+ images of Bacterial-pneumonia but only about 1400 images of healthy lungs. Hence, accuracy will definitely be affected.


# Imports 

In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import shutil
import os

# Reading in and looking @ data

In [2]:
# Load the per-image metadata table of the dataset and preview the first rows
metadata = pd.read_csv(
    filepath_or_buffer="../input/coronahack-chest-xraydataset/Chest_xray_Corona_Metadata.csv"
)
metadata.head()

In [3]:
# Side-by-side histograms: overall label (left) and pneumonia sub-category (right)
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(20, 5))
ax[0].hist(metadata['Label'])
# Cast to str before plotting the sub-category column
ax[1].hist(metadata['Label_1_Virus_category'].astype(str));

# Divide the data to three categories(test, valid, training)

In [4]:
# Separate the metadata rows by the split recorded in the Dataset_type column
train_df = metadata.loc[metadata['Dataset_type'].eq('TRAIN')]
test_df = metadata.loc[metadata['Dataset_type'].eq('TEST')]

In [5]:
# Collect the image file names of every training class into its own Series
train_virus = train_df.loc[train_df['Label_1_Virus_category'] == 'Virus', 'X_ray_image_name']
train_bacterial = train_df.loc[train_df['Label_1_Virus_category'] == 'bacteria', 'X_ray_image_name']
train_normal = train_df.loc[train_df['Label'] == 'Normal', 'X_ray_image_name']

# Number of images per class (virus, bacterial, normal)
len(train_virus), len(train_bacterial), len(train_normal)


**Split the data to test,valid and training**

In [6]:
def split_to_training_validation(data,split=0.2):
    """
    Split a sequence into a leading validation part and a trailing training part.

    The first round(split * len(data)) items become the validation set and the
    remaining items the training set. The data is not shuffled; the original
    order is kept in both parts.

    data -> sliceable sequence of items (e.g. a Series of image names)
    split -> fraction of the data held out for validation, between 0 and 1
             (default 0.2, i.e. 20% validation / 80% training)

    returns a (validation, training) tuple, validation first

    raises ValueError if split is outside the range [0, 1]
    """
    # A fraction outside [0, 1] would silently produce meaningless slices
    if not 0 <= split <= 1:
        raise ValueError(f"split must be between 0 and 1, got {split!r}")

    # Compute the cut position once and use it for both halves
    cut = round(split * len(data))

    valid_data = data[:cut]
    train_data = data[cut:]

    return valid_data, train_data

In [7]:
# Hold out the leading part of every training class for validation (default split of 0.2)
valid_virus, train_virus = split_to_training_validation(data=train_virus)
valid_bacterial, train_bacterial = split_to_training_validation(data=train_bacterial)
valid_normal, train_normal = split_to_training_validation(data=train_normal)

# Quick look at the resulting sizes
len(train_virus), len(valid_virus), len(valid_normal), len(train_normal)

In [8]:
# Class folder names; the order matches the per-class lists below (normal, virus, bacterial)
lables=['Healthy','Viral-pneumonia','Bacterial-pneumonia']
training_data_classes=[train_normal,train_virus,train_bacterial]
source='../input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train'

# Copy every training image into /dataset/train/<class name>/
for label, image_names in zip(lables, training_data_classes):
    target = os.path.join('/dataset/train', label)

    # exist_ok=True lets the cell be re-run without failing on an existing folder
    os.makedirs(target, exist_ok=True)

    for image_name in image_names:
        # The images are copied (not moved) from the source folder into the class folder
        shutil.copy(os.path.join(source, image_name), target)

In [9]:
# Per-class validation image names, in the same order as `lables` (normal, virus, bacterial)
validation_data_classes=[valid_normal,valid_virus,valid_bacterial]

# Copy every validation image into /dataset/valid/<class name>/
for label, image_names in zip(lables, validation_data_classes):
    target = os.path.join('/dataset/valid', label)

    # exist_ok=True lets the cell be re-run without failing on an existing folder
    os.makedirs(target, exist_ok=True)

    for image_name in image_names:
        # The images are copied (not moved) from the source folder into the class folder
        shutil.copy(os.path.join(source, image_name), target)

In [10]:
# Collect the image file names of every test class into its own Series
test_virus = test_df.loc[test_df['Label_1_Virus_category'] == 'Virus', 'X_ray_image_name']
test_bacterial = test_df.loc[test_df['Label_1_Virus_category'] == 'bacteria', 'X_ray_image_name']
test_normal = test_df.loc[test_df['Label'] == 'Normal', 'X_ray_image_name']

# Number of test images per class (virus, bacterial, normal)
len(test_virus), len(test_bacterial), len(test_normal)

In [11]:
# Per-class test image names, in the same order as `lables` (normal, virus, bacterial)
classes=[test_normal,test_virus,test_bacterial]
source='../input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/test'

# Copy every test image into /dataset/test/<class name>/
for label, image_names in zip(lables, classes):
    target = os.path.join('/dataset/test', label)

    # exist_ok=True lets the cell be re-run without failing on an existing folder
    os.makedirs(target, exist_ok=True)

    for image_name in image_names:
        # The images are copied (not moved) from the source folder into the class folder
        shutil.copy(os.path.join(source, image_name), target)

**Get class names to confirm the division is done right**

In [12]:
import pathlib

# Build the sorted list of class names from the sub-folders of the validation
# directory. Only directories are kept so that a stray file in the folder
# cannot show up as an extra class.
data_dir = pathlib.Path("/dataset/valid")
class_names = np.array(sorted(item.name for item in data_dir.glob("*") if item.is_dir()))
print(class_names)

**Plot one random image of bacterial-pneumonia lung (it can be changed to others by changing the target_class arg)**

In [13]:
import matplotlib.image as mpimg
from matplotlib.pyplot import figure
import random



def view_random_image(target_dir, target_class):
    """
    Show one randomly chosen image from a class folder and return it.

    target_dir -> directory that holds one sub-folder per class
                  (accepted with or without a trailing slash)
    target_class -> name of the class sub-folder to pick the image from

    Prints the chosen file name and the image shape, plots the image in
    grayscale with the class name as title, and returns the image array
    as read by matplotlib.
    """
    # os.path.join copes with target_dir given with or without a trailing "/"
    target_folder = os.path.join(target_dir, target_class)

    # Pick one random file name from the class folder (kept as a one-element list)
    random_image = random.sample(os.listdir(target_folder), 1)
    print(random_image)

    # Read in the image and plot it using matplotlib
    plt.figure(figsize=(7, 5))
    plt.subplot(1, 1, 1)

    img = mpimg.imread(os.path.join(target_folder, random_image[0]))
    plt.imshow(img, cmap='gray')
    plt.title(target_class)
    plt.axis("off");
    print(f"Image shape: {img.shape}") # show the shape of the image

    return img

In [14]:
#dir and class can be changed
# Preview a random validation image; target_dir and target_class can be changed
# to look at another folder or class
image_1= view_random_image(target_dir="/dataset/valid/",
                        target_class="Bacterial-pneumonia")

# Preprocessing data for the neural network

* Validation is used when fitting the model
* This ensures that hyperparameters are not tuned based on the unseen test data

In [15]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
IMG_SIZE = (224, 224)
BATCH_SIZE = 64

tf.random.set_seed(42)

# Directories holding the training, validation and test images (one sub-folder per class)
train_dir = "/dataset/train"
valid_dir="/dataset/valid"
test_dir = "/dataset/test"

# Training images are scaled by 1/255 and randomly sheared, rotated and zoomed
train_aug = ImageDataGenerator(rescale=1/255.,
                               shear_range=0.1,
                               rotation_range=20,
                               zoom_range=0.1)

# Validation and test images are only scaled by 1/255 (no augmentation)
valid_gen=ImageDataGenerator(rescale=1/255.)
test_gen = ImageDataGenerator(rescale=1/255.)


train_data = train_aug.flow_from_directory(train_dir,
                                          target_size=IMG_SIZE,
                                          color_mode='grayscale',
                                          batch_size=BATCH_SIZE,
                                          class_mode="categorical")

valid_data=valid_gen.flow_from_directory(valid_dir,
                                        target_size=IMG_SIZE,
                                        color_mode='grayscale',
                                        batch_size=BATCH_SIZE,
                                        class_mode="categorical")

# shuffle=False keeps the test batches in a fixed order, so the rows returned by
# predict() line up with test_data.filenames and test_data.classes
test_data = test_gen.flow_from_directory(test_dir,
                                        target_size=IMG_SIZE,
                                        color_mode='grayscale',
                                        batch_size=BATCH_SIZE,
                                        class_mode="categorical",
                                        shuffle=False)

* **4228** images for training
* **1056** for validation
* **624** for testing purpose

In [16]:
#Plot three images of augmented training data
for _ in range(3):
    img, label = train_data.next()
    plt.figure(figsize=(7, 7))
    plt.imshow(img[0],cmap="gray")
    plt.show()

In [17]:
import datetime




def create_tensorboard_callback(dir_name, experiment_name):
    """
    Build a TensorBoard callback that logs into a time-stamped folder.

    dir_name -> top-level directory for the log files
    experiment_name -> sub-directory name for this experiment

    The log directory is "<dir_name>/<experiment_name>/<YYYYmmdd-HHMMSS>",
    using the current local time. The chosen path is printed.

    returns the tf.keras.callbacks.TensorBoard instance
    """
    run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    log_dir = "/".join([dir_name, experiment_name, run_stamp])

    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir)
    print(f"Saving TensorBoard log files to: {log_dir}")
    return tensorboard_callback

**Create sequential deep learning model**

* Get prediction probabilities later on by using the softmax activation function in the final dense layer
* Use categorical cross-entropy as the loss function

In [18]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense, Activation,BatchNormalization, Dropout

# AlexNet-style network for 224x224 single-channel images: the number of
# filters grows and the kernel size shrinks (11 -> 5 -> 3) as the network
# gets deeper. Each convolution is followed by batch normalization.
model=Sequential([
    Conv2D(filters=96, kernel_size=11, strides=4, activation='relu', input_shape=(224,224,1)),
    BatchNormalization(),
    MaxPool2D(pool_size=3, strides=2),
    Conv2D(filters=256, kernel_size=5, strides=1, activation='relu', padding="same"),
    BatchNormalization(),
    MaxPool2D(pool_size=3, strides=2),
    Conv2D(filters=384, kernel_size=3, strides=1, activation='relu', padding="same"),
    BatchNormalization(),
    Conv2D(filters=384, kernel_size=3, strides=1, activation='relu', padding="same"),
    BatchNormalization(),
    Conv2D(filters=256, kernel_size=3, strides=1, activation='relu', padding="same"),
    BatchNormalization(),
    MaxPool2D(pool_size=3, strides=2),
    Flatten(),
    Dense(4096, activation='relu'),

    # Dropout after each fully connected layer for regularization
    Dropout(0.5),
    Dense(4096, activation='relu'),
    Dropout(0.5),

    # One softmax output per class (3 classes)
    Dense(3, activation='softmax')
])

# Compile. "accuracy" is tracked next to AUC because the results in this
# notebook are reported as accuracy.
# NOTE(review): the `decay` argument may not be accepted by the optimizers of
# newer TensorFlow releases -- confirm against the installed version.
model.compile(loss="categorical_crossentropy",
                optimizer=tf.keras.optimizers.Adam(learning_rate=0.001,decay=1e-5),
                metrics=["AUC", "accuracy"])

In [19]:
# Where the checkpoint is written during training
checkpoint_path = "weights/checkpoint.ckpt"

# Checkpoint callback: checked at the end of every epoch, and only the best
# model so far (by validation loss) is kept. The whole model is saved, not
# just its weights.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    monitor="val_loss",
    save_best_only=True,
    save_weights_only=False,
    save_freq="epoch",
    verbose=1,
)

In [20]:
# Fit the model saving checkpoints every epoch
epochs = 50

#Train the model with 150 epochs
history = model.fit(train_data,
                          epochs=epochs,
                          steps_per_epoch=train_data.samples//BATCH_SIZE,
                          validation_data=valid_data,
                          validation_steps=len(valid_data),
                          callbacks=[create_tensorboard_callback(dir_name="history_callback",
                                                                                 experiment_name="Chest_Xray"),
                                                     checkpoint_callback])



**Evaluating the model as it currently is versus the best fitted model during training**

In [None]:
# Evaluate the model as it is after the last training epoch on the test set
model.evaluate(test_data)

In [None]:
# Load the model stored at the checkpoint path (the checkpoint callback keeps
# only the best model seen during training)
model_best_weights= tf.keras.models.load_model('weights/checkpoint.ckpt')

### The model with best weights achieved an accuracy of roughly 82%

In [None]:
# Evaluate the checkpointed (best) model on the test set
model_best_weights.evaluate(test_data)

# Prediction and visualization

* Predictions are made with the model fitted with best weights. Perhaps better result could be achieved by training longer

In [None]:
# Predict class probabilities for the test set with the checkpointed model
y_pred = model_best_weights.predict(test_data)
# Show the shape of the prediction array
y_pred.shape

In [None]:
# Display the raw prediction array
y_pred

**Predict randomly and plot**

This section takes 4 images out of testing data and classifies them using the model that was trained

The actual and predicted categories plus their prediction probability is also plotted

In [None]:
# Create a function to load and prepare images for prediction
def load_and_prep_image(filename, img_shape=224, scale=True):
    """
    Read an image file and prepare it as a single-channel tensor for prediction.

    filename -> path of the image file to read
    img_shape -> side length of the square the image is resized to
    scale -> when True, divide the pixel values by 255

    returns the resized image tensor (divided by 255 if scale is True)
    """
    raw_bytes = tf.io.read_file(filename)

    # Decode into a tensor with one channel
    decoded = tf.io.decode_image(raw_bytes, channels=1)

    # Resize to img_shape x img_shape
    resized = tf.image.resize(decoded, [img_shape, img_shape])

    # Optionally rescale the pixel values
    return resized/255. if scale else resized

In [None]:
# Make preds on a series of random images
import os
import random

plt.figure(figsize=(17, 10))

# Pick four random images from the test set, classify each one with the
# checkpointed model and plot it with the actual and the predicted class
for i in range(4):

    # Choose a random image from a random class
    class_name = random.choice(class_names)
    class_dir = os.path.join(test_dir, class_name)
    filename = random.choice(os.listdir(class_dir))
    filepath = os.path.join(class_dir, filename)

    # Load the image and add a batch axis before predicting
    img = load_and_prep_image(filepath)
    img_expanded = tf.expand_dims(img, axis=0)
    print(img_expanded.shape)

    pred_prob = model_best_weights.predict(img_expanded) # get prediction probabilities array
    pred_class = class_names[pred_prob.argmax()] # index of the highest probability -> class name

    # Slice out the channel dimension for plotting
    img = img[:,:,0]
    print(pred_prob)
    plt.subplot(2, 2, i+1)

    # Plot the image
    print(filename)
    plt.imshow(img, cmap='gray')

    # Green title when the prediction matches the actual class, red otherwise
    if class_name == pred_class:
        title_color = "g"
    else:
        title_color = "r"

    # The probability is in the range 0-1, so scale it by 100 before adding the "%" sign
    plt.title(f"Actual class: {class_name}, Pred class: {pred_class}, Pred prob: {100 * pred_prob.max():.2f}%", c=title_color)
    plt.axis(False);