## Optimisation of X-ray Analysis - Diagnosis of Pneumonia using machine learning

# Notebook: X-ray Image Analysis for Pneumonia Diagnosis

## Description

This notebook provides the implementation details for automating the analysis of chest X-ray images to diagnose pneumonia using Transfer Learning and Deep Template Matching. It includes the hardware requirements, necessary libraries, and steps to build and evaluate the models.

## Authors
Marzena Halama

Oliwia

Michał Gruszkowski



## Required Libraries
The following Python libraries are required for this project:
- TensorFlow
- Keras
- OpenCV
- NumPy
- Pandas
- Matplotlib
- Scikit-learn

## Models
This notebook implements and compares two machine learning models:
1. **Transfer Learning**: Utilizing pre-trained models to fine-tune on the X-ray image dataset for pneumonia detection.
2. **Deep Template Matching**: Creating templates from X-ray images and using deep learning techniques to match and identify pneumonia-related patterns.


## Dataset
The project requires downloading the chest X-ray image dataset from Kaggle. Follow these steps to obtain the dataset:

Download the dataset from [Kaggle](https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia/data).

Extract the dataset to a directory accessible by the notebook.

## Model Evaluation
To assess the effectiveness of the machine learning models, the following metrics are used:
- **Accuracy**: The proportion of correctly predicted instances out of the total instances.
- **F1 Score**: The harmonic mean of precision and recall, providing a balance between the two.

## Pre-trained Models
We will use the following pre-trained models as base models for Transfer Learning:
1. **MobileNetv2** - details of this model can be found on the [MobileNetv2](https://keras.io/api/applications/mobilenet/#mobilenetv2-function)
2. **EfficientNetV2B0** - details of this model can be found on the [EfficientNetV2B0](https://keras.io/api/applications/efficientnet_v2/#efficientnetv2b0-function)
3. **ResNetV2** - details of this model can be found on the [ResNetv2](https://keras.io/api/applications/resnet/#resnet50v2-function)


In [1]:
# Importing the required libraries
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import (roc_curve, auc, precision_recall_fscore_support, f1_score, confusion_matrix, ConfusionMatrixDisplay)
from sklearn.preprocessing import label_binarize
from itertools import cycle

# Libraries required for the transfer learning with pre-trained models 
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications import EfficientNetV2B0
from tensorflow.keras.applications import ResNet50


  from pandas.core import (


## Data Preprocessing
Depending on the pre-trained model used, different data preprocessing techniques may be required. Ensure to follow the specific preprocessing steps suitable for the chosen model to achieve optimal results.


In [19]:
# Libraries required for the transfer learning with pre-trained models 
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as Pre_input_MN
from tensorflow.keras.applications.efficientnet_v2 import preprocess_input as Pre_input_EF
from tensorflow.keras.applications.resnet import preprocess_input as Pre_input_RN


## Parameters Description:
In this section, we define the parameters that will be used in the model training and evaluation process:

- **img_size**: This defines the dimensions of the input images, which can vary depending on the model being used.

- **batch_size**: This is the number of training examples utilized in one iteration. We will use a batch size of 32.
- **n_classes**: This indicates the number of classes for classification. Since we are dealing with a binary classification problem (pneumonia vs. no pneumonia), n_classes is set to 2.
- **validation_split**: This is the proportion of the dataset to include in the validation split. We will use 20% of the data for validation.
- **seed**: This is the random seed for shuffling and transformations. Setting a seed ensures reproducibility. We use the seed value of 1337.

In [1]:
# Parameters
img_size = (224,224)  # Dimensions of the images depending on the model
batch_size = 32  # Batch size
n_classes = 1  # Number of classes for binary classification
validation_split = 0.2  # Proportion of the dataset to use for validation
seed = 1337  # seed for reproducibility


## Transfer Learning

Transfer learning is a machine learning technique where a model developed for one task is reused as the starting point for a model on a second task. 

This approach leverages the knowledge gained while solving one problem to address a different but related problem, often leading to significant 

improvements in performance and efficiency.

![Transfer Learning](./TL_scheme.png)


### Benefits
- **Reduced Training Time:** Leveraging pre-trained models significantly cuts down the time required to train a new model.

- **Improved Performance:** Models often achieve higher accuracy and better generalization on the new task.

- **Reduced data requirements**



| Name             | Size (MB) | Parameters |
|------------------|-----------|------------|
| MobileNetV2      | 16        | 4.3M       |
| ResNet50V2       | 98        | 25.6M      |
| EfficientNetV2B0 | 29        | 7.2M       |


In [4]:
# Base model MobileNetV2
model_MN = MobileNetV2(
    include_top=True,
    weights="imagenet",
    input_tensor=None,
    input_shape=None,
    pooling='avg',
    classes=1000,
    classifier_activation="softmax"
)
# creating a model without the top layer 
inter_output_model_MN = tf.keras.Model(inputs=model_MN.input,
                           outputs=model_MN.layers[-2].output)
                           

# Base model EfficientNetV2B0
model_EF = EfficientNetV2B0(
    include_top=True,
    weights="imagenet",
    input_tensor=None,
    input_shape=None,
    pooling='avg',
    classes=1000,
    classifier_activation="softmax",
)

# creating a model without the top layer 

inter_output_model_EF = tf.keras.Model(inputs=model_EF.input,
                           outputs=model_EF.layers[-2].output)



# Base model ResNet50
model_RN = ResNet50(
    include_top=True,
    weights="imagenet",
    input_tensor=None,
    input_shape=None,
    pooling='avg',
    classes=1000,
    classifier_activation="softmax",
)

# creating a model without the top layer 
inter_output_model_RN = tf.keras.Model(inputs=model_RN.input, 
                            outputs=model_RN.layers[-2].output)

## Loading images from the database



In [5]:
train_ds_path = 'train'
test_ds_path = 'test'

In [17]:

def create_train_val_datasets(train_path, validation_split, seed, image_size, batch_size):
    train_ds = tf.keras.preprocessing.image_dataset_from_directory(
        train_path,
        validation_split=validation_split,
        subset="training",
        seed=seed,
        image_size=image_size,
        batch_size=batch_size,
        color_mode='grayscale'
    )

    val_ds = tf.keras.preprocessing.image_dataset_from_directory(
        train_path,
        validation_split=validation_split,
        subset="validation",
        seed=seed,
        image_size=image_size,
        batch_size=batch_size,
        color_mode='grayscale'
    )

    return train_ds, val_ds

test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    test_ds_path,
    seed=seed,
    image_size=img_size,
    batch_size=batch_size,
    color_mode='grayscale'
)


train_ds, val_ds = create_train_val_datasets(
    train_ds_path,
    validation_split,
    seed,
    image_size=img_size,
    batch_size=batch_size
)


Found 624 files belonging to 2 classes.
Found 5232 files belonging to 2 classes.
Using 4186 files for training.
Found 5232 files belonging to 2 classes.
Using 1046 files for validation.


In [25]:
def Pre_input_EF(x):
    x = tf.image.grayscale_to_rgb(x)
    x = tf.keras.applications.efficientnet.preprocess_input(x)
    return x

def Pre_input_MN(x):
    x = tf.image.grayscale_to_rgb(x)
    x = tf.keras.applications.mobilenet_v2.preprocess_input(x)
    return x

def Pre_input_RN(x):
    x = tf.image.grayscale_to_rgb(x)
    x = tf.keras.applications.resnet_v2.preprocess_input(x)
    return x


## pre-processing and data normalisation

In [37]:

# To create a based model, select one of the three available pre-trained models:
# MobileNetV2 > inter_output_model_MN
# EfficientNetV2B0 > inter_output_model_EF
# ResNetV2 > inter_output_model_RN



normalized_ds_train = train_ds.map(lambda x, y: (Pre_input_EF(x), y))

x = inter_output_model_EF.output
x = Dense(1024, activation='relu')(x) 
x = Dropout(0.5)(x)  
predictions = Dense(n_classes, activation='sigmoid')(x)  


model_EF = Model(inputs=inter_output_model_EF.input, outputs=predictions)



normalized_ds_train = train_ds.map(lambda x, y: (Pre_input_MN(x), y))

x = inter_output_model_MN.output
x = Dense(1024, activation='relu')(x) 
x = Dropout(0.5)(x)  
predictions = Dense(n_classes, activation='sigmoid')(x)  


model_MN = Model(inputs=inter_output_model_MN.input, outputs=predictions)




normalized_ds_train = train_ds.map(lambda x, y: (Pre_input_RN(x), y))

x = inter_output_model_RN.output
x = Dense(1024, activation='relu')(x) 
x = Dropout(0.5)(x)  
predictions = Dense(n_classes, activation='sigmoid')(x)  


model_RN = Model(inputs=inter_output_model_RN.input, outputs=predictions)


### Convert grayscale images to three-channel images by stacking the grayscale data along the third dimension:

In [38]:
def preprocess_image(image):
    image = tf.image.resize(image, (224,224))
    image = tf.image.grayscale_to_rgb(image) 
    image = preprocess_input(image)
    return image

In [39]:
normalized_ds_train = tf.data.Dataset.from_generator(
    lambda: ((preprocess_image(x), y) for x, y in train_ds),
    output_types=(tf.float32, tf.float32),
    output_shapes=((224, 224, 3), ())
)


In [40]:
def preprocess_dataset(dataset):
    images = []
    labels = []
    for batch in dataset:
        batch_images, batch_labels = batch
        for img, lbl in zip(batch_images, batch_labels):
            img = preprocess_image(img)
            images.append(img.numpy())
            labels.append(lbl.numpy())
    return np.array(images), np.array(labels)

X_train, Y_train = preprocess_dataset(train_ds)
X_val, Y_val = preprocess_dataset(val_ds)
X_test, Y_test = preprocess_dataset(test_ds)

normalized_ds_train = train_ds

In [41]:

# Extraction of X_train and Y_train from normalised data
X_train = np.concatenate([x.numpy() for x, y in normalized_ds_train], axis=0)
Y_train = np.concatenate([y.numpy() for x, y in normalized_ds_train], axis=0)

In [42]:
def convert_to_three_channels(images):
    if images.shape[-1] == 1:
        images = np.repeat(images, 3, axis=-1)
    return images

In [43]:
X_val = convert_to_three_channels(X_val)
X_train = convert_to_three_channels(X_train)

In [44]:
class F1ScoreCallback(Callback):
    def __init__(self, val_ds):
        super().__init__()
        self.val_ds = val_ds
        self.results = []

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}

        # Oblicz F1-score dla zbioru walidacyjnego
        val_predictions = (self.model.predict(self.val_ds[0]) > 0.5).astype(int)
        val_labels = self.val_ds[1]
        val_f1 = f1_score(val_labels, val_predictions)

        # Dodaj wyniki do listy
        self.results.append({
            'epoch': epoch + 1,
            'accuracy': logs.get('accuracy'),
            'val_accuracy': logs.get('val_accuracy'),
            'precision': logs.get('precision'),
            'val_precision': logs.get('val_precision'),
            'recall': logs.get('recall'),
            'val_recall': logs.get('val_recall'),
            'val_f1': val_f1
        })

        # Dodaj F1-score do logów
        logs['val_f1'] = val_f1
        print(f' — val_f1: {val_f1:.4f}')

# Inicjalizacja callbacku
f1_callback = F1ScoreCallback((X_val, Y_val))


# Compilation of the models

In [45]:
model_MN.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [46]:
model_EF.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [47]:
model_RN.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Training and performance evaluation of models 

In [None]:
# Training the model
history_1 = model_MN.fit(
    X_train, Y_train,
    validation_data=(X_val, Y_val),
    epochs=30,
    callbacks=[f1_callback]
)

# Evaluation on training set
train_predictions = (model_MN.predict(X_train) > 0.5).astype(int)
train_labels = Y_train  # Assuming Y_train contains the true labels
train_f1 = f1_score(train_labels, train_predictions, average='weighted')
print(f'Train F1-score: {train_f1:.4f}')

# Evaluation on validation set
val_predictions = (model_MN.predict(X_val) > 0.5).astype(int)
val_labels = Y_val  # Assuming Y_val contains the true labels
val_f1 = f1_score(val_labels, val_predictions, average='weighted')
print(f'Validation F1-score: {val_f1:.4f}')

# Creating DataFrame with results
results_df = pd.DataFrame({
    'epoch': range(1, 2),  # Adjust this range based on the number of epochs
    'train_f1': [train_f1],  # Placeholder, update with actual F1 scores per epoch if needed
    'val_f1': [val_f1]  # Assuming you have val_f1 stored in the callback
})
results_df.loc['Final'] = ['Final', train_f1, val_f1]

# Save results to Excel
results_df.to_excel("training_results_MN.xlsx", index=False)
print("Results saved to training_results_MN.xlsx")



In [None]:
# Training the model
history_1 = model_EF.fit(
    X_train, Y_train,
    validation_data=(X_val, Y_val),
    epochs=30,
    callbacks=[f1_callback]
)

# Evaluation on training set
train_predictions = (model_EF.predict(X_train) > 0.5).astype(int)
train_labels = Y_train  # Assuming Y_train contains the true labels
train_f1 = f1_score(train_labels, train_predictions, average='weighted')
print(f'Train F1-score: {train_f1:.4f}')

# Evaluation on validation set
val_predictions = (model_EF.predict(X_val) > 0.5).astype(int)
val_labels = Y_val  # Assuming Y_val contains the true labels
val_f1 = f1_score(val_labels, val_predictions, average='weighted')
print(f'Validation F1-score: {val_f1:.4f}')

# Creating DataFrame with results
results_df = pd.DataFrame({
    'epoch': range(1, 2),  # Adjust this range based on the number of epochs
    'train_f1': [train_f1],  # Placeholder, update with actual F1 scores per epoch if needed
    'val_f1': [val_f1]  # Assuming you have val_f1 stored in the callback
})
results_df.loc['Final'] = ['Final', train_f1, val_f1]

# Save results to Excel
results_df.to_excel("training_results_EF.xlsx", index=False)
print("Results saved to training_results_EF.xlsx")

In [None]:
# Training the model
history_1 = model_RN.fit(
    X_train, Y_train,
    validation_data=(X_val, Y_val),
    epochs=30,
    callbacks=[f1_callback]
)

# Evaluation on training set
train_predictions = (model_RN.predict(X_train) > 0.5).astype(int)
train_labels = Y_train  # Assuming Y_train contains the true labels
train_f1 = f1_score(train_labels, train_predictions, average='weighted')
print(f'Train F1-score: {train_f1:.4f}')

# Evaluation on validation set
val_predictions = (model_RN.predict(X_val) > 0.5).astype(int)
val_labels = Y_val  # Assuming Y_val contains the true labels
val_f1 = f1_score(val_labels, val_predictions, average='weighted')
print(f'Validation F1-score: {val_f1:.4f}')

# Creating DataFrame with results
results_df = pd.DataFrame({
    'epoch': range(1, 2),  # Adjust this range based on the number of epochs
    'train_f1': [train_f1],  # Placeholder, update with actual F1 scores per epoch if needed
    'val_f1': [val_f1]  # Assuming you have val_f1 stored in the callback
})
results_df.loc['Final'] = ['Final', train_f1, val_f1]

# Save results to Excel
results_df.to_excel("training_results_RN.xlsx", index=False)
print("Results saved to training_results_RN.xlsx")