In [None]:
import random
import math
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import glob
import tensorflow as tf

import warnings
# Blanket "ignore" filter: every warning category is silenced from here on,
# including deprecation warnings raised by the libraries imported above.
warnings.filterwarnings("ignore")

# IPython magic: draw matplotlib figures inline in the notebook output.
%matplotlib inline

# Seed every random number generator the notebook relies on so that a
# "Restart Kernel -> Run All" reproduces the same shuffle and the same
# initial network weights.
seed = 99
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)  # TensorFlow keeps its own RNG (weight init, dropout, ...)

# Target size every image is resized to before being fed to the network.
img_width = 256
img_height = 256
# Number of images per batch produced by the data sequences.
batch_size = 64

n_jobs = 15 # n_jobs for parallel tasks

In [None]:
class CustomDataSequence(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that lazily loads chest X-ray images batch by batch.

    Files are collected from ``chest_xray/<set_type>/NORMAL/*.jpeg`` (label 0)
    and ``chest_xray/<set_type>/PNEUMONIA/*.jpeg`` (label 1), shuffled once at
    construction time with the ``random`` module, and decoded only when a
    batch is requested.

    Args:
        set_type: Name of the split sub-directory (e.g. ``"train"``).
        batch_size: Maximum number of images per batch.

    Raises:
        ValueError: If no ``.jpeg`` file is found for ``set_type``.
    """

    # (sub-directory, label) pairs, in the order they are scanned.
    _CLASS_LABELS = (("NORMAL", 0), ("PNEUMONIA", 1))

    def __init__(self, set_type, batch_size=32):
        super().__init__()
        self.batch_size = batch_size

        samples = []
        for class_dir, label in self._CLASS_LABELS:
            # glob returns files in filesystem order; sort first so the seeded
            # shuffle below yields the same permutation on every machine.
            class_files = sorted(glob.glob(f"chest_xray/{set_type}/{class_dir}/*.jpeg"))
            samples.extend((file_name, label) for file_name in class_files)

        if not samples:
            # zip(*[]) below would otherwise fail with an opaque unpack error.
            raise ValueError(f"no .jpeg files found under chest_xray/{set_type}/")

        random.shuffle(samples)
        files, y = zip(*samples)

        self.x = files
        self.y = y

    def __len__(self):
        """Return the number of batches; the last one may be partial."""
        return math.ceil(len(self.x) / self.batch_size)

    def __getitem__(self, idx):
        """Load batch ``idx``.

        Returns:
            A tuple ``(images, labels)`` where ``images`` is a float32 array of
            shape ``(n, img_height, img_width, 1)`` with values scaled to
            ``[0, 1]`` and ``labels`` is an array of the ``n`` matching 0/1
            labels.
        """
        low = idx * self.batch_size
        high = min(low + self.batch_size, len(self.x))
        batch_x = self.x[low:high]
        batch_y = self.y[low:high]

        # Keep the leading batch axis; PIL's resize takes (width, height) but
        # the resulting array is laid out as (height, width).
        images = np.empty((len(batch_x), img_height, img_width, 1), dtype="float32")
        for i, file_name in enumerate(batch_x):
            # The context manager closes the file handle once pixels are read.
            with Image.open(file_name) as img:
                pixels = np.asarray(
                    img.convert("L").resize((img_width, img_height)), dtype="float32"
                )
            images[i, :, :, 0] = pixels / 255.0

        return images, np.array(batch_y)
    
# One lazily-loading sequence per dataset split, all using the notebook-wide
# batch size from the configuration cell.
train_data = CustomDataSequence(set_type="train", batch_size=batch_size)
val_data = CustomDataSequence(set_type="val", batch_size=batch_size)
test_data = CustomDataSequence(set_type="test", batch_size=batch_size)

In [None]:
def make_cnn():
    """Build the (uncompiled) binary-classification CNN.

    The network is seven ``Conv2D(3x3, relu) -> MaxPooling2D(2x2)`` stages
    with 32, 32, 32, 64, 64, 64 and 128 filters, followed by dense layers of
    128, 64 and 32 units and a single sigmoid output unit.

    The convolutions use ``padding="same"`` so that only the pooling layers
    shrink the feature map (256 -> 2 after seven halvings). With the default
    "valid" padding the map is already 2x2 before the seventh 3x3 convolution,
    which makes model construction fail.

    Returns:
        A ``tf.keras.models.Sequential`` model taking inputs of shape
        ``(img_height, img_width, 1)``.
    """
    conv_filters = (32, 32, 32, 64, 64, 64, 128)
    dense_units = (128, 64, 32)

    model = tf.keras.models.Sequential()
    model.add(tf.keras.Input(shape=(img_height, img_width, 1)))

    for filters in conv_filters:
        model.add(tf.keras.layers.Conv2D(filters, (3, 3), padding="same", activation="relu"))
        model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))

    model.add(tf.keras.layers.Flatten())
    for units in dense_units:
        model.add(tf.keras.layers.Dense(units, activation="relu"))
    # Single sigmoid unit: output is the predicted probability of label 1.
    model.add(tf.keras.layers.Dense(1, activation="sigmoid"))
    return model

# Build the network and attach optimizer, loss and the metrics reported
# during training/evaluation.
model = make_cnn()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
    loss="binary_crossentropy",
    metrics=[
        tf.keras.metrics.TruePositives(name="tp"),
        tf.keras.metrics.TrueNegatives(name="tn"),
        tf.keras.metrics.FalsePositives(name="fp"),
        tf.keras.metrics.FalseNegatives(name="fn"),
        tf.keras.metrics.AUC(name="AUC"),
        # BinaryAccuracy thresholds the sigmoid output before comparing it to
        # the label; plain `Accuracy` tests exact equality between the raw
        # probability and 0/1, which is almost never true.
        tf.keras.metrics.BinaryAccuracy(name="accuracy"),
        tf.keras.metrics.Precision(name="precision"),
        tf.keras.metrics.Recall(name="recall")
    ]
)

model.summary()

In [None]:
# Train for 20 epochs, validating on the "val" split after each epoch.
# NOTE(review): `use_multiprocessing` / `workers` are `fit` arguments in
# tf.keras 2.x; Keras 3 moved them to the dataset constructor - confirm
# against the installed version.
train_history = model.fit(
    x=train_data,
    steps_per_epoch=len(train_data),
    epochs=20,
    # Reuse `val_data` so the batch size matches the one `validation_steps`
    # is derived from; a fresh CustomDataSequence("val") would fall back to
    # the default batch size of 32.
    validation_data=val_data,
    validation_steps=len(val_data),
    use_multiprocessing=True,
    workers=n_jobs
)

In [None]:
# Score the trained model on every batch of the held-out test split; the
# value returned by `evaluate` is shown as the cell output.
model.evaluate(
    x=test_data,
    steps=len(test_data),
)