# Machine Learning model for the pneumonia detection

**Author:** [Andranik Fakirian](https://www.tumblr.com/andranikfakirian)<br>
**Date created:** 2024/12/06<br>
**Last modified:** 2025/1/17<br>
**Description:** Create a 3D convolutional neural network using self-supervised learning methods to predict presence of pneumonia.

## Abstract

This clean project code shows the steps used to build a 3D convolutional neural network (CNN)
to predict the presence of viral pneumonia in computer tomography (CT) scans using self-supervised learning methods. To do this, firstly, build an autoencoder model using multiple lung datasets and augmentation methods. And secondly, build a classifier using the previously built autoencoder as a preset and the pneumonia dataset.

## References

References from the basic code

- [A survey on Deep Learning Advances on Different 3D DataRepresentations](https://arxiv.org/pdf/1808.01462.pdf)
- [VoxNet: A 3D Convolutional Neural Network for Real-Time Object Recognition](https://www.ri.cmu.edu/pub_files/2015/9/voxnet_maturana_scherer_iros15.pdf)
- [FusionNet: 3D Object Classification Using MultipleData Representations](http://3ddl.cs.princeton.edu/2016/papers/Hegde_Zadeh.pdf)
- [Uniformizing Techniques to Process CT scans with 3D CNNs for Tuberculosis Prediction](https://arxiv.org/abs/2007.13224)

Basic code

- [Zunair, H.(2020). Tutorial on 3D Image Classification](https://github.com/hasibzunair/3D-image-classification-tutorial/tree/master)

Additional references

- [UNet3D: Create 3-D U-Net convolutional neural network for semantic segmentation of volumetric images](https://se.mathworks.com/help/vision/ref/unet3d.html)
- [PlatiPy: Processing Library and Analysis Toolkit for Medical Imaging in Python](https://pyplati.github.io/platipy/getting_started.html)
- [A series of Russian articles about Self-Supervised Learning on Habr](https://habr.com/ru/articles/704710/)

## Setup

In [None]:
import os
import zipfile
import numpy as np
import tensorflow as tf
import gc
from tqdm import tqdm

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.python.framework.tensor_shape import TensorShape
from datetime import datetime

In case you are running code in Google Colab, run the cell below to use Google Drive.

In [None]:
from google.colab import drive
drive.mount('/content/drive/')

For the code to work correctly if it is run in the Colab environment, this project uses the path-variable. Replace the path in the variable with the commented one in case you are running the code in Colab.

In [None]:
curr_path=os.path.join(os.getcwd()) #os.path.join(os.getcwd(), 'drive', 'MyDrive', 'Your_Folder')
print(curr_path)

## Downloading the MosMedData: Chest CT Scans with COVID-19 Related Findings

In this project, a subset of the
[MosMedData: Chest CT Scans with COVID-19 Related Findings](https://www.medrxiv.org/content/10.1101/2020.05.20.20100362v1) is used.
This dataset consists of lung CT scans with COVID-19 related findings, as well as without such findings.

The associated radiological findings of the CT scans will be used as labels to build
a classifier to predict presence of viral pneumonia.
Hence, the final task is a binary classification problem.

In [None]:
# Download url of normal CT scans.
url = "https://github.com/hasibzunair/3D-image-classification-tutorial/releases/download/v0.2/CT-0.zip"
filename = os.path.join(curr_path, "CT-0.zip")
keras.utils.get_file(origin=url, cache_dir=curr_path)

# Download url of abnormal CT scans.
url = "https://github.com/hasibzunair/3D-image-classification-tutorial/releases/download/v0.2/CT-23.zip"
filename = os.path.join(curr_path, "CT-23.zip")
keras.utils.get_file(origin=url, cache_dir=curr_path)

# Make a directory to store the data.
os.makedirs(os.path.join(curr_path, "MosMedData"))

# Unzip data in the newly created directory.
with zipfile.ZipFile(os.path.join(curr_path, "datasets", "CT-0.zip"), "r") as z_fp:
    z_fp.extractall(os.path.join(curr_path, "MosMedData"))

with zipfile.ZipFile(os.path.join(curr_path, "datasets", "CT-23.zip"), "r") as z_fp:
    z_fp.extractall(os.path.join(curr_path, "MosMedData"))

## Loading data and preprocessing

The files are provided in Nifti format with the extension .nii. To read the
scans, the `nibabel` package is used.
You can install the package via `pip install nibabel`. CT scans store raw voxel
intensity in Hounsfield units (HU). They range from -1024 to above 2000 in this dataset.
Above 400 are bones with different radiointensity, so this is used as a higher bound. A threshold
between -1000 and 400 is commonly used to normalize CT scans.

To process the data, the following are done:

* To fix the orientation, volumes are first rotated by 90 degrees for non-platipy datasets
* The HU values are scaled to be between 0 and 1.
* Also width, height and depth are resized.

Here several helper functions are defined to process the data. These functions
will be used when building training and validation datasets.

In [None]:
import nibabel as nib

from scipy import ndimage


def read_nifti_file(filepath):
    """Read and load volume"""
    # Read file
    scan = nib.load(filepath)
    # Get raw data
    scan = scan.get_fdata()
    return scan


def normalize(volume):
    """Normalize the volume"""
    min = -1000
    max = 400
    volume[volume < min] = min
    volume[volume > max] = max
    volume = (volume - min) / (max - min)
    volume = volume.astype("float32")
    return volume


def resize_volume(img, platipy = False):
    """Resize across z-axis"""
    # Set the desired depth
    desired_depth = 64
    desired_width = 128
    desired_height = 128
    # Get current depth
    current_depth = img.shape[-1]
    current_width = img.shape[0]
    current_height = img.shape[1]
    # Compute depth factor
    depth = current_depth / desired_depth
    width = current_width / desired_width
    height = current_height / desired_height
    depth_factor = 1 / depth
    width_factor = 1 / width
    height_factor = 1 / height
    # Rotate
    angle = 90
    if platipy:
        angle*=-1
    img = ndimage.rotate(img, angle, reshape=False)
    # Resize across z-axis
    img = ndimage.zoom(img, (width_factor, height_factor, depth_factor), order=1)
    return img


def process_scan(path):
    """Read and resize volume"""
    # Read scan
    volume = read_nifti_file(path)
    # Normalize
    volume = normalize(volume)
    # Resize width, height and depth
    volume = resize_volume(volume)
    return volume

Let's read the paths of the CT scans from the class directories.

In [None]:
# Folder "CT-0" consist of CT scans having normal lung tissue,
# no CT-signs of viral pneumonia.
normal_scan_paths = [
    os.path.join(curr_path, "MosMedData/CT-0", x)
    for x in os.listdir(os.path.join(curr_path, "MosMedData/CT-0"))
]
# Folder "CT-23" consist of CT scans having several ground-glass opacifications,
# involvement of lung parenchyma.
abnormal_scan_paths = [
    os.path.join(curr_path, "MosMedData/CT-23", x)
    for x in os.listdir(os.path.join(curr_path, "MosMedData/CT-23"))
]

print("CT scans with normal lung tissue: " + str(len(normal_scan_paths)))
print("CT scans with abnormal lung tissue: " + str(len(abnormal_scan_paths)))

## Build train and validation datasets
Read the scans from the class directories and assign labels. Downsample the scans to have
shape of 128x128x64. Rescale the raw HU values to the range 0 to 1.
Lastly, split the dataset into train and validation subsets.

In [None]:
# Read and process the scans.
# Each scan is resized across height, width, and depth and rescaled.
abnormal_scans=[]
for path in tqdm(abnormal_scan_paths):
    abnormal_scans.append(process_scan(path))
abnormal_scans = np.array(abnormal_scans)
normal_scans=[]
for path in tqdm(normal_scan_paths):
    normal_scans.append(process_scan(path))
normal_scans = np.array(normal_scans)

# For the CT scans having presence of viral pneumonia
# assign 1, for the normal ones assign 0.
abnormal_labels = np.ones(len(abnormal_scans))
normal_labels = np.zeros(len(normal_scans))

# Split data in the ratio 70-30 for training and validation.
x_train = np.concatenate((abnormal_scans[:70], normal_scans[:70]), axis=0)
y_train = np.concatenate((abnormal_labels[:70], normal_labels[:70]), axis=0)
x_val = np.concatenate((abnormal_scans[70:], normal_scans[70:]), axis=0)
y_val = np.concatenate((abnormal_labels[70:], normal_labels[70:]), axis=0)
print(
    "Number of samples in train and validation are %d and %d."
    % (x_train.shape[0], x_val.shape[0])
)

Save the created subsets and flip scans to orient them and subsequent scans similar way (by orienting hearts e.g. to the left).

In [None]:
np.save(os.path.join(curr_path, 'x_train'), np.flip(x_train, axis = 2))
np.save(os.path.join(curr_path, 'y_train'), y_train)
np.save(os.path.join(curr_path, 'x_val'), np.flip(x_val, axis = 2))
np.save(os.path.join(curr_path, 'y_val'), y_val)

## Additional datasets
To train the autoencoder, additional sets of CT scans of lungs are needed.
Thus let's do similar things with new bunch of datasets from the PlatiPy library. By the way, among all the scans of lungs, it contains only the cancer-affected ones.

P.S.: This part of the <ins>clean code</ins> hasn't been tested, therefore it may not work correctly.

In [1]:
import SimpleITK as sitk

from platipy.imaging.tests.data import get_lung_nifti
from platipy.imaging.projects.bronchus.run import run_bronchus_segmentation
from platipy.imaging import ImageVisualiser
from platipy.imaging.label.utils import get_com
from datetime import datetime
from tqdm import tqdm
import numpy as np
import os
from platipy.dicom.download.tcia import (
    get_collections,
    get_modalities_in_collection,
    get_patients_in_collection,
    fetch_data
)

**Behold** the list of available datasets in the Platipy library.

In [None]:
collections = get_collections()
collections

Let's select a dataset and see how many elements there are in it.

In [None]:
collection = 'LCTSC'
patients = get_patients_in_collection(collection)
print(len(patients))

Downloading a dataset. It's not very voluminous ([60 subjects](https://www.cancerimagingarchive.net/collection/lctsc/)), therefore this can be done in one go.

In [None]:
DATA=[]
for patient_id in tqdm(patients):
    try:
        data = fetch_data(
            collection,
            patient_ids=[patient_id],
            modalities=["CT"],
            nifti=True
        )
        DATA.append(data)
    except:
        pass
print(len(DATA))

Collect all the paths of the CT scans and split them into train and test groups.

In [None]:
LCTSC_train_paths = [
    os.path.join(curr_path, "tcia\\LCTSC", x, "NIFTI", x.upper().replace('-', '_'), "IMAGES", os.listdir(os.path.join("tcia\\LCTSC", x, "NIFTI", x.upper().replace('-', '_'), "IMAGES"))[0])
    for x in os.listdir("tcia\\LCTSC") if "Train" in x
]
LCTSC_test_paths = [
    os.path.join(curr_path, "tcia\\LCTSC", x, "NIFTI", x.upper().replace('-', '_'), "IMAGES", os.listdir(os.path.join("tcia\\LCTSC", x, "NIFTI", x.upper().replace('-', '_'), "IMAGES"))[0])
    for x in os.listdir("tcia\\LCTSC") if "Test" in x
]

Read scans from the directories described above in the same way.

In [None]:
LCTSC_train_scans=[]
for path in tqdm(LCTSC_train_paths):
    LCTSC_train_scans.append(process_scan(path, platipy=True))
LCTSC_train_scans = np.array(LCTSC_train_scans)
LCTSC_test_scans=[]
for path in tqdm(LCTSC_test_paths):
    LCTSC_test_scans.append(process_scan(path, platipy=True))
LCTSC_test_scans = np.array(LCTSC_test_scans)

LCTSC_train = LCTSC_train_scans
LCTSC_test = LCTSC_test_scans

Save the created subsets and flip scans if necessary.

In [None]:
np.save('LCTSC_train', np.flip(LCTSC_train, axis = 3))
np.save('LCTSC_test', LCTSC_test)

Similarly, select another dataset.

In [None]:
collection = 'Lung-PET-CT-Dx'
patients = get_patients_in_collection(collection)
print(len(patients))

Let's download a dataset. It's really bulky ([355 subjects](https://www.cancerimagingarchive.net/collection/lung-pet-ct-dx/)), this could be difficult to be done in one go. Therefore, "the design of the slices" was added for convenience.

In [None]:
DATA=[]
for patient_id in tqdm(patients[:]):
    try:
        data = fetch_data(
            collection,
            patient_ids=[patient_id],
            modalities=["CT"],
            nifti=True
        )
        DATA.append(data)
    except:
        pass
print(len(DATA))

Browse and collect paths of the CT scans and divide them into 4 groups according to the cancer type.

In [None]:
temp_paths=[
    os.path.join(curr_path, "tcia\\Lung-PET-CT-Dx", x, "NIFTI", x.upper().replace('-', '_'), "IMAGES")
    for x in os.listdir("tcia\\Lung-PET-CT-Dx") if ("A" in x) and os.path.exists(os.path.join(curr_path, "tcia\\Lung-PET-CT-Dx", x, "NIFTI"))
]
PET_A_paths=[]
for p in temp_paths:
    for x in os.listdir(p):
        PET_A_paths.append(os.path.join(p, x))
temp_paths=[
    os.path.join(curr_path, "tcia\\Lung-PET-CT-Dx", x, "NIFTI", x.upper().replace('-', '_'), "IMAGES")
    for x in os.listdir("tcia\\Lung-PET-CT-Dx") if ("B" in x) and os.path.exists(os.path.join(curr_path, "tcia\\Lung-PET-CT-Dx", x, "NIFTI"))
]
PET_B_paths=[]
for p in temp_paths:
    for x in os.listdir(p):
        PET_B_paths.append(os.path.join(p, x))
temp_paths=[
    os.path.join(curr_path, "tcia\\Lung-PET-CT-Dx", x, "NIFTI", x.upper().replace('-', '_'), "IMAGES")
    for x in os.listdir("tcia\\Lung-PET-CT-Dx") if ("E" in x) and os.path.exists(os.path.join(curr_path, "tcia\\Lung-PET-CT-Dx", x, "NIFTI"))
]
PET_E_paths=[]
for p in temp_paths:
    for x in os.listdir(p):
        PET_E_paths.append(os.path.join(p, x))
temp_paths=[
    os.path.join(curr_path, "tcia\\Lung-PET-CT-Dx", x, "NIFTI", x.upper().replace('-', '_'), "IMAGES")
    for x in os.listdir("tcia\\Lung-PET-CT-Dx") if ("G" in x) and os.path.exists(os.path.join(curr_path, "tcia\\Lung-PET-CT-Dx", x, "NIFTI"))
]
PET_G_paths=[]
for p in temp_paths:
    for x in os.listdir(p):
        PET_G_paths.append(os.path.join(p, x))

Read scans from the directories described above similarly.

In [None]:
PET_A_scans=[]
for path in tqdm(PET_A_paths):
    PET_A_scans.append(process_scan(path))
PET_A_scans = np.array(PET_A_scans)
PET_B_scans=[]
for path in tqdm(PET_B_paths):
    PET_B_scans.append(process_scan(path))
PET_B_scans = np.array(PET_B_scans)
PET_E_scans=[]
for path in tqdm(PET_E_paths):
    PET_E_scans.append(process_scan(path))
PET_E_scans = np.array(PET_E_scans)
PET_G_scans=[]
for path in tqdm(PET_G_paths):
    PET_G_scans.append(process_scan(path))
PET_G_scans = np.array(PET_G_scans)

PET_A = PET_A_scans
PET_B = PET_B_scans
PET_E = PET_E_scans
PET_G = PET_G_scans

Save the created subsets and flip scans if needed.

In [None]:
#np.save('PET_A', np.flip(PET_A))
np.save('PET_B', np.flip(PET_B, axis = 3))
np.save('PET_E', np.flip(PET_E, axis = 3))
np.save('PET_G', np.flip(PET_G, axis = 3))

To reduce the file size of A-group of scans, split this dataset into 4 "subdatasets".

In [None]:
delta = PET_A.shape[0]//4
PET_A1=PET_A[:delta]
PET_A2=PET_A[delta:2*delta]
PET_A3=PET_A[2*delta:3*delta]
PET_A4=PET_A[3*delta:]

Save the created subsets and flip scans if necessary.

In [None]:
np.save('PET_A1', np.flip(PET_A1, axis = 3))
np.save('PET_A2', np.flip(PET_A2, axis = 3))
np.save('PET_A3', np.flip(PET_A3, axis = 3))
np.save('PET_A4', np.flip(PET_A4, axis = 3))

## Data augmentation

The CT scans also augmented by rotating at random angles during training. Since
the data is stored in rank-3 tensors of shape `(samples, height, width, depth)`,
we add a dimension of size 1 at axis 4 to be able to perform 3D convolutions on
the data. The new shape is thus `(samples, height, width, depth, 1)`. There are
different kinds of preprocessing and augmentation techniques out there,
this example shows a few simple ones to get started.

In [None]:
import random

from scipy import ndimage

@tf.function
def rotate(volume):
    """Rotate the volume by a few degrees"""

    def scipy_rotate(volume):
        # define some rotation angles
        angles = [-20, -10, -5, 5, 10, 20]
        # pick angles at random
        angle = random.choice(angles)
        # rotate volume
        volume = ndimage.rotate(volume, angle, reshape=False)
        volume[volume < 0] = 0
        volume[volume > 1] = 1
        return volume

    VOLUME_SHAPE = volume.shape
    augmented_volume = tf.numpy_function(scipy_rotate, [volume], tf.float32)
    augmented_volume.set_shape(VOLUME_SHAPE)
    return augmented_volume

def train_preprocessing(volume, label):
    """Process training data by rotating and adding a channel."""
    # Rotate volume
    volume = rotate(volume)
    volume = tf.expand_dims(volume, axis=3)
    return volume, label

def validation_preprocessing(volume, label):
    """Process validation data by only adding a channel."""
    volume = tf.expand_dims(volume, axis=3)
    return volume, label
def rotatePi(volume):
    angle = 180
    # rotate volume
    volume = ndimage.rotate(volume, angle, reshape=False)
    volume[volume < 0] = 0
    volume[volume > 1] = 1
    return volume
def LCTSC_preprocessing(volume, label):
    """Process LCTSC data by rotating and adding a channel."""
    volume = tf.numpy_function(rotatePi, [volume], tf.float32)
    volume = tf.expand_dims(volume, axis=3)
    return volume, label
def PET_preprocessing(volume, label):
    """Process PET data by rotating and adding a channel."""
    volume = tf.numpy_function(rotatePi, [volume], tf.float32)
    volume = tf.expand_dims(volume, axis=3)
    return volume, label

Below are other augmentation methods provided in the class structure which are used in this project.
> "Cut" cuts out some part of the image, darkening the area on it, and "noise" - makes noise on it, pretty simple.<br>
© *Andranik Fakirian, in his [blog](https://www.tumblr.com/andranikfakirian/768855980222087168/project-mlpneumonia-augmentations?source=share) (2024)*

Also there is presented a simplified preprocessing method, which is general to all datasets in this project.

In [3]:
def cut(volume, pieces, lenmin, lenmax, widthmin, widthmax):
    for v in volume:
        for i in range(pieces):
            l = np.random.randint(lenmin, lenmax)
            w = np.random.randint(widthmin, widthmax)
            shape = v.shape
            x = np.random.randint(0, shape[0]-1-l)
            y = np.random.randint(0, shape[1]-1-w)
            v[x:x+l, y:y+w]=0
    return volume
def noise(volume, mu, sig):
    for v in volume:
        shape=v.shape
        noise=np.repeat(np.expand_dims(np.random.normal(mu, sig, shape[:-1]), axis=2), shape[-1], 2)
        v+=noise
        v[v < 0] = 0
        v[v > 1] = 1
    return volume
def ordinary_preprocessing(volume, label):
    """Process data by only adding a channel."""
    volume = tf.expand_dims(volume, axis=3)
    return volume, label
class Augmentation():
    mu: float
    sig: float
    pieces: int
    lenmin: int
    lenmax: int
    widthmin: int
    widthmax: int
    def __init__(self, mu = 0, sig = 5e-2, pieces = 1, lenmin = 0, lenmax = 32, widthmin = 0, widthmax = 32):
        self.mu = mu
        self.sig = sig
        self.pieces = pieces
        self.lenmin = lenmin
        self.lenmax = lenmax
        self.widthmin = widthmin
        self.widthmax = widthmax
    def Noise(self, volume):
        augmented_volume = noise(volume, self.mu, self.sig)
        return augmented_volume
    def Cut(self, volume):
        augmented_volume = cut(volume, self.pieces, self.lenmin, self.lenmax, self.widthmin, self.widthmax)
        return augmented_volume
    def Mix(self, volume):
        augmented_volume = cut(noise(volume, self.mu, self.sig), self.pieces, self.lenmin, self.lenmax, self.widthmin, self.widthmax)
        return augmented_volume

Loading datasets to build training and validation datasets for the autoencoder.

P.S.: This part of the <ins>clean code</ins> also hasn't been tested, therefore it may not work correctly.

In [None]:
x_train = np.load(os.path.join(curr_path, 'x_train.npy'), encoding = 'bytes')
x_val = np.load(os.path.join(curr_path, 'x_val.npy'), encoding = 'bytes')
LCTSC_train = np.load(os.path.join(curr_path, 'LCTSC_train.npy'), encoding = 'bytes')
LCTSC_test = np.load(os.path.join(curr_path, 'LCTSC_test.npy'), encoding = 'bytes')
PET_G = np.load(os.path.join(curr_path, 'PET_G.npy'), encoding = 'bytes')
PET_E = np.load(os.path.join(curr_path, 'PET_E.npy'), encoding = 'bytes')
PET_B = np.load(os.path.join(curr_path, 'PET_B.npy'), encoding = 'bytes')
#PET_A1 = np.flip(np.load(os.path.join(curr_path, 'PET_A1.npy'), encoding = 'bytes')
#PET_A2 = np.flip(np.load(os.path.join(curr_path, 'PET_A2.npy'), encoding = 'bytes')
#PET_A3 = np.flip(np.load(os.path.join(curr_path, 'PET_A3.npy'), encoding = 'bytes')
PET_A4 = np.load(os.path.join(curr_path, 'PET_A4.npy'), encoding = 'bytes')
#PET_A = np.concatenate((PET_A1, PET_A2, PET_A3, PET_A4), axis = 0)
#del PET_A1, PET_A2, PET_A3, PET_A4
#gc.collect()

Splitting a mixed dataset into training and validation subsets.

In [None]:
from sklearn.model_selection import train_test_split
encoder_general = np.concatenate((LCTSC_train, LCTSC_test, PET_G, PET_E, PET_B[:40], PET_A4, x_train, x_val), axis = 0)
del LCTSC_train, LCTSC_test, PET_G, PET_E, PET_B, PET_A4, x_train, x_val
gc.collect()
encoder_train, encoder_val = train_test_split(encoder_general, test_size=0.2, random_state=42)
del encoder_general
gc.collect()
len_encoder_train, len_encoder_val = len(encoder_train), len(encoder_val)

Divide the training dataset for <ins>RAM-saving focus</ins> and save the created subsets.

In [None]:
np.save(os.path.join(curr_path, 'encoder_train1'), encoder_train[:len_encoder_train//4])
np.save(os.path.join(curr_path, 'encoder_train2'), encoder_train[len_encoder_train//4:2*len_encoder_train//4])
np.save(os.path.join(curr_path, 'encoder_train3'), encoder_train[2*len_encoder_train//4:3*len_encoder_train//4])
np.save(os.path.join(curr_path, 'encoder_train4'), encoder_train[3*len_encoder_train//4:])
np.save(os.path.join(curr_path, 'encoder_val'), encoder_val)
del encoder_train, encoder_val
gc.collect()

## Augmentation
Load prepared training and validation subsets and create complex datasets consisting of augmented scans and non-augmented ones as labels.

P.S.: The previously mentioned <ins>focus</ins> is to gradually load the training subset into the dataset.

In [None]:
A = Augmentation()
encoder_train = np.load(os.path.join(curr_path, 'encoder_train1.npy'), encoding = 'bytes')
encoder_train_length = len(encoder_train)
encoder_train_length_s = encoder_train_length
encoder_train_loader_s = tf.data.Dataset.from_tensor_slices((encoder_train.copy(), np.concatenate((A.Cut(encoder_train[:encoder_train_length//3]), A.Noise(encoder_train[encoder_train_length//3:2*encoder_train_length//3]), A.Mix(encoder_train[2*encoder_train_length//3:])), axis = 0))[::-1])
del encoder_train
gc.collect()
encoder_train = np.load(os.path.join(curr_path, 'encoder_train2.npy'), encoding = 'bytes')
encoder_train_length = len(encoder_train)
encoder_train_length_s += encoder_train_length
encoder_train_loader = tf.data.Dataset.from_tensor_slices((encoder_train.copy(), np.concatenate((A.Cut(encoder_train[:encoder_train_length//3]), A.Noise(encoder_train[encoder_train_length//3:2*encoder_train_length//3]), A.Mix(encoder_train[2*encoder_train_length//3:])), axis = 0))[::-1])
del encoder_train
gc.collect()
encoder_train_loader_s = encoder_train_loader_s.concatenate(encoder_train_loader)
del encoder_train_loader
gc.collect()
encoder_train = np.load(os.path.join(curr_path, 'encoder_train3.npy'), encoding = 'bytes')
encoder_train_length = len(encoder_train)
encoder_train_length_s += encoder_train_length
encoder_train_loader = tf.data.Dataset.from_tensor_slices((encoder_train.copy(), np.concatenate((A.Cut(encoder_train[:encoder_train_length//3]), A.Noise(encoder_train[encoder_train_length//3:2*encoder_train_length//3]), A.Mix(encoder_train[2*encoder_train_length//3:])), axis = 0))[::-1])
del encoder_train
gc.collect()
encoder_train_loader_s = encoder_train_loader_s.concatenate(encoder_train_loader)
del encoder_train_loader
gc.collect()
encoder_train = np.load(os.path.join(curr_path, 'encoder_train4.npy'), encoding = 'bytes')
encoder_train_length = len(encoder_train)
encoder_train_length_s += encoder_train_length
encoder_train_loader = tf.data.Dataset.from_tensor_slices((encoder_train.copy(), np.concatenate((A.Cut(encoder_train[:encoder_train_length//3]), A.Noise(encoder_train[encoder_train_length//3:2*encoder_train_length//3]), A.Mix(encoder_train[2*encoder_train_length//3:])), axis = 0))[::-1])
del encoder_train
gc.collect()
encoder_train_loader_s = encoder_train_loader_s.concatenate(encoder_train_loader)
del encoder_train_loader
gc.collect()
batch_size = 2
encoder_train_dataset = (
    encoder_train_loader_s.shuffle(encoder_train_length_s)
    .map(ordinary_preprocessing)
    .batch(batch_size)
    .prefetch(2)
)

In [None]:
encoder_val = np.load(os.path.join(curr_path, 'encoder_val.npy'), encoding = 'bytes')
encoder_val_length=len(encoder_val)
encoder_validation_loader = tf.data.Dataset.from_tensor_slices((encoder_val.copy(), np.concatenate((A.Cut(encoder_val[:encoder_val_length//3]), A.Noise(encoder_val[encoder_val_length//3:2*encoder_val_length//3]), A.Mix(encoder_val[2*encoder_val_length//3:])), axis = 0))[::-1])
del encoder_val
gc.collect()
encoder_validation_dataset = (
    encoder_validation_loader.shuffle(encoder_val_length)
    .map(ordinary_preprocessing)
    .batch(batch_size)
    .prefetch(2)
)

Visualize an augmented CT scan.

In [None]:
import matplotlib.pyplot as plt

data = encoder_train_dataset.take(1)
images, labels = list(data)[0]
images = images.numpy()
image = images[0]
label = labels[0]
print("Dimension of the CT scan is:", image.shape)
plt.imshow(np.squeeze(image[:, :, 15]), cmap="gray")

More detailed visualization.

In [None]:
font={'font.size': 20}
plt.rcParams.update(font)
fig, ax = plt.subplots(2, figsize=(10, 20))
ax[0].imshow(np.squeeze(image[:, :, 15]), cmap="gray")
ax[0].set_title('Augmented CT scan')
ax[1].imshow(np.squeeze(label[:, :, 15]), cmap="gray")
ax[1].set_title('Non-augmented CT scan')
plt.show()
#fig.savefig('AugCT.png')

Since a CT scan has many slices, let's visualize a montage of the slices.

In [None]:
def plot_slices(num_rows, num_columns, width, height, data):
    """Plot a montage of 20 CT slices"""
    data = np.rot90(np.array(data))
    data = np.transpose(data)
    data = np.reshape(data, (num_rows, num_columns, width, height))
    rows_data, columns_data = data.shape[0], data.shape[1]
    heights = [slc[0].shape[0] for slc in data]
    widths = [slc.shape[1] for slc in data[0]]
    fig_width = 12.0
    fig_height = fig_width * sum(heights) / sum(widths)
    f, axarr = plt.subplots(
        rows_data,
        columns_data,
        figsize=(fig_width, fig_height),
        gridspec_kw={"height_ratios": heights},
    )
    for i in range(rows_data):
        for j in range(columns_data):
            axarr[i, j].imshow(data[i][j], cmap="gray")
            axarr[i, j].axis("off")
    plt.subplots_adjust(wspace=0, hspace=0, left=0, right=1, bottom=0, top=1)
    plt.show()


# Visualize montage of slices.
# 4 rows and 10 columns for 100 slices of the CT scan.
plot_slices(4, 10, 128, 128, image[:, :, :40])

## Define a 3D convolutional neural network

To make the model easier to understand, it's structured into blocks.
The architecture of the 3D CNN classifier used in this project
is based on [this paper](https://arxiv.org/abs/2007.13224) and the architecture of the autoencoder is based on the [UNet3D](https://se.mathworks.com/help/vision/ref/unet3d.html).

In [None]:
class Coder():
    def __init__(self, width=128, height=128, depth=64, latent_dim = 128):
        self.width=width
        self.height=height
        self.depth=depth
        self.latent_dim = latent_dim
        self.encoder = None
        self.classifier = None
    def get_encoder(self):
        """Build a 3D convolutional neural network encoder."""

        #Encode
        inputs = keras.Input((self.width, self.height, self.depth, 1), name="encoder_input")

        x = layers.Conv3D(filters=64, kernel_size=3, activation="relu", padding='same')(inputs)
        x = layers.MaxPool3D(pool_size=2)(x)
        x = layers.BatchNormalization()(x)

        x = layers.Conv3D(filters=128, kernel_size=3, activation="relu", padding='same')(x)
        x = layers.MaxPool3D(pool_size=2)(x)
        x = layers.BatchNormalization()(x)

        x = layers.Conv3D(filters=256, kernel_size=3, activation="relu", padding='same')(x)
        x = layers.MaxPool3D(pool_size=2)(x)
        x = layers.BatchNormalization()(x)

        x = layers.Flatten()(x)
        latent = layers.Dense(self.latent_dim, activation="relu", name="latent_space")(x)

        encoder = keras.Model(inputs, latent, name="3d_encoder")
        return encoder
    def get_decoder(self):
        """Build a 3D convolutional neural network decoder."""
        #Decode
        latent_inputs = tf.keras.Input(shape=(self.latent_dim,), name="decoder_input")
        x = layers.Dense(self.width // 8 * self.height // 8 * self.depth // 8 * 256, activation="relu")(latent_inputs)
        x = layers.Reshape((self.width // 8, self.height // 8, self.depth // 8, 256))(x)

        x = layers.Conv3DTranspose(filters=256, kernel_size=3, activation="relu", padding='same')(x)
        x = layers.UpSampling3D(size=2)(x)
        x = layers.BatchNormalization()(x)

        x = layers.Conv3DTranspose(filters=128, kernel_size=3, activation="relu", padding='same')(x)
        x = layers.UpSampling3D(size=2)(x)
        x = layers.BatchNormalization()(x)

        x = layers.Conv3DTranspose(filters=64, kernel_size=3, activation="relu", padding='same')(x)
        x = layers.UpSampling3D(size=2)(x)
        x = layers.BatchNormalization()(x)

        outputs = layers.Conv3DTranspose(filters=1, kernel_size=3, activation="sigmoid", padding='same')(x)

        decoder = keras.Model(latent_inputs, outputs, name="3d_decoder")
        return decoder
    def get_autoencoder(self, lr_schedule = None):
        """Build a full model of autoencoder"""
        self.encoder = self.get_encoder()
        decoder = self.get_decoder()
        autoencoder = keras.Model(self.encoder.input, decoder(self.encoder.output), name="3d_autoencoder")
        autoencoder.summary()
        if not (lr_schedule==None):
            autoencoder.compile(
                loss="mse",
                optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
            )
        return autoencoder
    def get_classifier(self, lr_schedule, trainable=False):
        """Build a classifier based on the encoder"""
        if self.encoder is None:
            self.encoder = self.get_encoder()

        for layer in self.encoder.layers: #Freeze encoder weights
            layer.trainable = trainable

        inputs = self.encoder.input
        x = self.encoder.output

        x = layers.Dense(64, activation="relu")(x)
        x = layers.Dropout(0.5)(x)

        outputs = layers.Dense(1, activation="sigmoid", name="classification_output")(x)

        self.classifier = keras.Model(inputs, outputs, name="3d_classifier")
        self.classifier.summary()
        self.classifier.compile(
            loss="binary_crossentropy",
            optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
            metrics=["acc"],
        )
        return self.classifier

    def load_encoder_weights(self, weights_path):
        """Load trained encoder weights"""
        if self.encoder is None:
            self.encoder = self.get_encoder()
        self.encoder.load_weights(weights_path)

## Train autoencoder

In [None]:
# Build model.
cod = Coder(width=128, height=128, depth=64, latent_dim=128)

# Compile model.
initial_learning_rate = 0.0001
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate, decay_steps=100000, decay_rate=0.96, staircase=True
)

autoencoder = cod.get_autoencoder(lr_schedule)

# Define callbacks.
checkpoint_cb_ae = keras.callbacks.ModelCheckpoint(
    os.path.join(curr_path, '3d_image_autoencoder.weights.h5'), save_best_only=True, save_weights_only=True
)
early_stopping_cb_ae = keras.callbacks.EarlyStopping(monitor="val_loss", patience=15)
csv_logger_cb_ae = keras.callbacks.CSVLogger(os.path.join(curr_path, '3d_image_autoencoder.csv'))

# Train the model, doing validation at the end of each epoch
epochs = 100
autoencoder.fit(
    encoder_train_dataset,
    validation_data=encoder_validation_dataset,
    epochs=epochs,
    shuffle=True,
    verbose=2,
    callbacks=[checkpoint_cb_ae, early_stopping_cb_ae, csv_logger_cb_ae],
)

## Visualizing autoencoder performance

Here the model loss for the training and the validation sets is plotted.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(7, 4))
history = pd.read_csv(os.path.join(curr_path, '3d_image_autoencoder.csv'))
history['epoch']+=1
val_mid_idx = history[['val_loss']].idxmin()
ax.plot(history['epoch'], history['loss'], zorder=2, label = 'train')
ax.plot(history['epoch'], history['val_loss'], zorder=2, label = 'val')
ax.scatter(history.iloc[val_mid_idx]['epoch'], history.iloc[val_mid_idx]['val_loss'], zorder = 3, c = 'crimson', marker = 's', s=20, label = 'Min val loss = {:.3f}'.format(history.iloc[val_mid_idx]['val_loss'].iloc[0]))
ax.plot([history.iloc[val_mid_idx]['epoch'].iloc[0], history.iloc[val_mid_idx]['epoch'].iloc[0]], [0, history.iloc[val_mid_idx]['val_loss'].iloc[0]], zorder = 2, c = 'crimson', ls = '--', lw = 1)
ax.set_title("Model loss")
ax.set_xlabel("epochs")
ax.set_ylabel('loss')
ax.legend(loc=0)
ax.grid(zorder = 1, alpha = 0.5)
plt.show()
fig.savefig(os.path.join(curr_path, 'AE_log.png'))

To present the performance of the autoencoder, let's predict the image using augmented and compare it with the initial one.

Firstly, load the weights of the trained model.

In [None]:
import matplotlib.pyplot as plt

initial_learning_rate = 0.0001
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate, decay_steps=100000, decay_rate=0.96, staircase=True
)

cod = Coder(width=128, height=128, depth=64, latent_dim=128)
autoencoder = cod.get_autoencoder(lr_schedule)
autoencoder.load_weights(os.path.join(curr_path, '3d_image_autoencoder.weights.h5'))

And visualize it afterwards.

In [None]:
data = encoder_validation_dataset.take(1)
images, labels = list(data)[0]
images = images.numpy()
image = images[0]
label = labels[0]

prediction = autoencoder.predict(np.expand_dims(image, axis = 0))

font={'font.size': 20}
plt.rcParams.update(font)
fig, ax = plt.subplots(3, figsize=(5, 15))
ax[0].imshow(np.squeeze(image[:, :, 15]), cmap="gray")
ax[0].set_title('Augmented CT scan')
ax[1].imshow(np.squeeze(prediction[:, :, :, 15]), cmap="gray")
ax[1].set_title('Predicted CT scan')
ax[2].imshow(np.squeeze(label[:, :, 15]), cmap="gray")
ax[2].set_title('Non-augmented CT scan')
plt.show()
fig.savefig(os.path.join(curr_path, 'AE_res.png'))

## Creating datasets for the classifier
We are not doing anything new. Firstly, loading previously prepared training and validation subsets of the MosMedData.

In [None]:
x_train = np.load(os.path.join(curr_path, 'x_train.npy'), encoding = 'bytes')
x_val = np.load(os.path.join(curr_path, 'x_val.npy'), encoding = 'bytes')
y_train = np.load(os.path.join(curr_path, 'y_train.npy'), encoding = 'bytes')
y_val = np.load(os.path.join(curr_path, 'y_val.npy'), encoding = 'bytes')

Create complex datasets consisting of non-augmented scans and "indicators" of the pneumonia presence afterwards.

In [None]:
train_loader = tf.data.Dataset.from_tensor_slices((x_train, y_train))
validation_loader = tf.data.Dataset.from_tensor_slices((x_val, y_val))

batch_size = 2

train_dataset = (
    train_loader.shuffle(len(x_train))
    .map(ordinary_preprocessing)
    .batch(batch_size)
    .prefetch(2)
)

validation_dataset = (
    validation_loader.shuffle(len(x_val))
    .map(ordinary_preprocessing)
    .batch(batch_size)
    .prefetch(2)
)

del x_train, y_train, x_val, y_val
gc.collect()

## Train classifier

In [None]:
# Load pre-trained encoder
cod = Coder(width=128, height=128, depth=64, latent_dim=128)
cod.load_encoder_weights(os.path.join(curr_path, '3d_image_autoencoder.weights.h5'))

# Compile model.
initial_learning_rate = 0.0001
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate, decay_steps=100000, decay_rate=0.96, staircase=True
)

# Build the classifier with unfreezed weights for additional training
classifier = cod.get_classifier(lr_schedule, trainable=True)
# Build the classifier with freezed weights
#classifier = cod.get_classifier(lr_schedule, trainable=False)

# Define callbacks.
checkpoint_cb_cl = keras.callbacks.ModelCheckpoint(
    os.path.join(curr_path, '3d_image_classifier_Unfreezed.weights.h5'), save_best_only=True, save_weights_only=True, monitor = 'val_acc'
)
early_stopping_cb_cl = keras.callbacks.EarlyStopping(monitor="val_acc", patience=30)
csv_logger_cb_cl = keras.callbacks.CSVLogger(os.path.join(curr_path, '3d_image_classifier_Unfreezed.csv'))

epochs = 100
classifier.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    shuffle=True,
    verbose=2,
    callbacks=[checkpoint_cb_cl, early_stopping_cb_cl, csv_logger_cb_cl],
)

The author of the basic code also trained his classification CNN using a larger [pneumonia dataset](https://mosmed.ai/datasets/covid191110/) of over 1000 CT scans. And he achieved the result with an accuracy of 83%.

## Visualizing classifier performance

Here the model accuracy and loss for the training and the validation sets are plotted.
Since the validation set is class-balanced, accuracy provides an unbiased representation
of the model's performance.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
fig, ax = plt.subplots(2, 1, figsize=(7, 8))
history = pd.read_csv(os.path.join(curr_path, '3d_image_classifier_Unfreezed.csv'))
history['epoch']+=1
val_mid_idx = history[['val_loss']].idxmin()
valacc_max_idx = history[['val_acc']].idxmax()
ax[0].plot(history['epoch'], history['loss'], zorder=2, label = 'train')
ax[0].plot(history['epoch'], history['val_loss'], zorder=2, label = 'val')
ax[1].plot(history['epoch'], history['acc'], zorder=2, label = 'train')
ax[1].plot(history['epoch'], history['val_acc'], zorder=2, label = 'val')
ax[0].scatter(history.iloc[val_mid_idx]['epoch'], history.iloc[val_mid_idx]['val_loss'], zorder = 3, c = 'crimson', marker = 's', s=20, label = 'Min val loss = {:.3f}'.format(history.iloc[val_mid_idx]['val_loss'].iloc[0]))
ax[1].scatter(history.iloc[valacc_max_idx]['epoch'], history.iloc[valacc_max_idx]['val_acc'], zorder = 3, c = 'crimson', marker = 's', s=20, label = 'Max val acc = {:.3f}'.format(history.iloc[valacc_max_idx]['val_acc'].iloc[0]))
ax[0].plot([history.iloc[val_mid_idx]['epoch'].iloc[0], history.iloc[val_mid_idx]['epoch'].iloc[0]], [0, history.iloc[val_mid_idx]['val_loss'].iloc[0]], zorder = 2, c = 'crimson', ls = '--', lw = 1)
ax[1].plot([history.iloc[valacc_max_idx]['epoch'].iloc[0], history.iloc[valacc_max_idx]['epoch'].iloc[0]], [0, history.iloc[valacc_max_idx]['val_acc'].iloc[0]], zorder = 2, c = 'crimson', ls = '--', lw = 1)
ax[0].set_title("Model loss")
ax[1].set_title("Model acc")
ax[0].set_xlabel("epochs")
ax[1].set_xlabel("epochs")
ax[0].set_ylabel('loss')
ax[1].set_ylabel('acc')
ax[0].legend(loc=0)
ax[1].legend(loc=0)
ax[0].grid(zorder = 1, alpha = 0.5)
ax[1].grid(zorder = 1, alpha = 0.5)
plt.show()
fig.savefig(os.path.join(curr_path, 'CL_log.png'))

## Make predictions on a single CT scan
Loading the weights of the trained model.

In [None]:
cod = Coder(width=128, height=128, depth=64, latent_dim=128)

initial_learning_rate = 0.0001
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate, decay_steps=100000, decay_rate=0.96, staircase=True
)

classifier = cod.get_classifier(lr_schedule, trainable=False)
classifier.load_weights(os.path.join(curr_path, "3d_image_classifier_Unfreezed.weights.h5"))

Visualization of a single scan image.

In [None]:
data = validation_dataset.take(1)
images, labels = list(data)[0]
images = images.numpy()
image = images[0]
label = labels.numpy()[0]
print("Dimension of the CT scan is:", image.shape)
plt.imshow(np.squeeze(image[:, :, 15]), cmap="gray")

Model predicition.

In [None]:
prediction = classifier.predict(np.expand_dims(image, axis = 0))
scores = [1 - prediction[0], prediction[0]]
print(label)

class_names = ["normal", "abnormal"]
for score, name in zip(scores, class_names):
    print(
        "This model is %.2f percent confident that CT scan is %s"
        % ((100 * score), name)
    )