# Introduction

In this notebook, we cover the process of training the best possible model to classify MRI images of brain tumours.

# Importing Modules
We first start by importing all the modules we need to proceed with this project.

In [21]:
from keras.api._v2.keras.applications.resnet import ResNet50
import os
import cv2
import numpy as np

from sklearn.model_selection import train_test_split, ParameterGrid
from keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras.preprocessing import image

Model = tf.keras.models.Model
VGG19 = tf.keras.applications.VGG19
ResNet50 = tf.keras.applications.ResNet50

GlobalAveragePooling2D = tf.keras.layers.GlobalAveragePooling2D
Dense = tf.keras.layers.Dense
Dropout = tf.keras.layers.Dropout



# Configuring Global Constants
We define some global constants to keep things consistent across the notebook. The same constant names are used throughout, while their values depend on the selected configuration.

In [24]:
# Set to True when running on Google Colab; the configuration cell then mounts Google Drive and reads/writes data there.
ON_COLAB = False

Since running this notebook on a local computer without hardware acceleration takes a very long time, we have two configurations that we can load.

In [26]:
# Resolve every path from the constants defined in this cell only, so the cell
# works on a fresh kernel (no reliance on lowercase variables left over from
# an earlier session).
if ON_COLAB:
    # Mount Google Drive, which holds the dataset and receives the saved models.
    from google.colab import drive

    DRIVE_ROOT = "/content/drive"
    DRIVE_DATA_ROOT = os.path.join(DRIVE_ROOT, "My Drive/CP322")

    drive.mount(DRIVE_ROOT)

    # On Colab the trained models are stored next to the data.
    MODEL_SAVE_PATH = DRIVE_DATA_ROOT
else:
    DRIVE_ROOT = "../"
    DRIVE_DATA_ROOT = os.path.join(DRIVE_ROOT, "data/raw")

    MODEL_SAVE_PATH = os.path.join(DRIVE_ROOT, 'trained-models')

# The folder layout below the data root is the same in both configurations.
DRIVE_TRAINING_FOLDER = os.path.join(DRIVE_DATA_ROOT, "Training")
DRIVE_TESTING_FOLDER = os.path.join(DRIVE_DATA_ROOT, "Testing")

print(DRIVE_ROOT)
print(DRIVE_DATA_ROOT)
print(DRIVE_TRAINING_FOLDER)
print(DRIVE_TESTING_FOLDER)
print(MODEL_SAVE_PATH)

../
../data/raw
../data/raw\Training
../data/raw\Testing
../trained-models


# Defining Data Processing Methods
Here we define the methods used to load and pre-process the raw data. These will be useful later on when making adjustments and/or loading different sets of data.

In [33]:
def preprocess_image(img_path, target_size=(224, 224)):
  """Read an image file and return it as a resized, normalised tensor.

  Args:
    img_path: Path of the image file to read.
    target_size: Tuple forwarded to ``tf.image.resize`` as the output size.

  Returns:
    A 3-channel image tensor resized to ``target_size`` and divided by 255.

  Raises:
    AssertionError: If ``target_size`` is not a tuple.
  """
  assert isinstance(target_size, tuple), "Target size must be of type tuple"
  raw_bytes = tf.io.read_file(img_path)
  # expand_animations=False makes decode_image always return a 3-D tensor
  # (GIFs are truncated to their first frame), which is what resize and the
  # later np.array stacking expect.
  img = tf.image.decode_image(raw_bytes, channels=3, expand_animations=False)
  img = tf.image.resize(img, target_size)
  return img / 255.0  # normalize

def _load_labelled_images(folder, label, limit):
  """Preprocess at most ``limit`` images from ``folder``; return (images, labels)."""
  images = []
  # os.listdir returns entries in arbitrary order; sorting makes the subset
  # selected by `limit` (and therefore the seeded split) reproducible.
  for index, img_name in enumerate(sorted(os.listdir(folder))):
    if index == limit:
      break
    images.append(preprocess_image(os.path.join(folder, img_name)))
  return images, [label] * len(images)


def load_mixed_data(target="train", test_size=0.2, random_state=42, limit=200):
  """Load tumour / no-tumour images and split them with ``train_test_split``.

  Images from the ``no`` folder are labelled 0; images from the ``pituitary``,
  ``meningioma`` and ``glioma`` folders are all labelled 1 (for now the data is
  only loaded as yes or no).

  Args:
    target: ``"train"`` reads from ``DRIVE_TRAINING_FOLDER``; any other value
      reads from ``DRIVE_TESTING_FOLDER``.
    test_size: Forwarded to ``train_test_split``.
    random_state: Forwarded to ``train_test_split``.
    limit: Maximum number of images taken from each of the four folders.

  Returns:
    The ``train_test_split`` result: ``X_train, X_test, Y_train, Y_test``.
  """
  data_dir = DRIVE_TRAINING_FOLDER if target == "train" else DRIVE_TESTING_FOLDER

  X = []
  Y = []
  # (sub-folder, label) pairs; "no" comes first to keep the original ordering.
  folder_labels = (("no", 0), ("pituitary", 1), ("meningioma", 1), ("glioma", 1))
  for folder_name, label in folder_labels:
    images, labels = _load_labelled_images(os.path.join(data_dir, folder_name), label, limit)
    X.extend(images)
    Y.extend(labels)

  X = np.array(X)
  Y = np.array(Y)

  return train_test_split(X, Y, test_size=test_size, random_state=random_state)

def load_petuitary_data(target="train", test_size=0.2, random_state=42):
  """Placeholder for the pituitary-only loader; not implemented yet, returns None."""
  return None

def load_meningioma_data(target="train", test_size=0.2, random_state=42):
  """Placeholder for the meningioma-only loader; not implemented yet, returns None."""
  return None

def load_glioma_data(target="train", test_size=0.2, random_state=42):
  """Placeholder for the glioma-only loader; not implemented yet, returns None."""
  return None

def load_general_data(target="train", test_size=0.2, random_state=42):
  """Placeholder for the general data set loader; not implemented yet, returns None."""
  return None

def load_data(data_type="mixed", target="train", test_size=0.2, random_state=42, limit=200):
  """Dispatch to the loader that matches ``data_type``.

  Supported data types:
    1. mixed (pituitary, meningioma, and glioma, general brain tumour)
    2. pituitary
    3. meningioma
    4. glioma
    5. general (small data set of no specified type of brain tumour)

  Supported targets:
    1. train -> loads training data
    2. test -> loads test data

  Args:
    data_type: One of the five data type names listed above.
    target: Forwarded to the selected loader.
    test_size: Forwarded to the selected loader.
    random_state: Forwarded to the selected loader.
    limit: Only forwarded to the "mixed" loader.

  Returns:
    Whatever the selected loader returns.

  Raises:
    ValueError: If ``data_type`` is not one of the supported names.
  """
  valid_types = ("mixed", "pituitary", "meningioma", "glioma", "general")
  if data_type not in valid_types:
    # The message lists exactly the spellings accepted above.
    raise ValueError("Data type must be of the following: 'mixed', 'pituitary', 'meningioma', 'glioma', or 'general'.")

  if data_type == "mixed":
    return load_mixed_data(target, test_size, random_state, limit)

  # The remaining loaders share one signature, so a lookup table replaces the elif chain.
  loaders = {
    "pituitary": load_petuitary_data,
    "meningioma": load_meningioma_data,
    "glioma": load_glioma_data,
    "general": load_general_data,
  }
  return loaders[data_type](target, test_size, random_state)



# Defining Models

We will use two pre-trained models, VGG19 and ResNet50, as base models. This greatly reduces the time needed to train a good predictor model.

We will adjust the models to better fit our need, which is to classify brain MRI images as either showing a tumour or not.

In [34]:
class DetectorModelBaseVGG19:
    """Tumour detector made of a frozen VGG19 base plus a small dense head.

    Usage is ``build()`` -> ``compile()`` -> ``fit()`` -> ``score()``;
    ``grid_search()`` runs that sequence once per parameter combination.
    """

    def __init__(self, weights='imagenet', include_top=False, input_shape=(224, 224, 3), classes=["no", "yes"]):
        """Create the VGG19 base network; the head is added later by ``build()``.

        Args:
            weights: Forwarded to ``VGG19``.
            include_top: Forwarded to ``VGG19``.
            input_shape: Forwarded to ``VGG19``.
            classes: Class names; their count sets the one-hot width and the
                size of the output layer.
        """
        self.vgg19 = VGG19(weights=weights, include_top=include_top, input_shape=input_shape)
        # Copy so the shared default list is never aliased by the instance.
        self.classes = list(classes)
        self.model = None
        self.custom_layers = []

    def _require_model(self):
        """Return the built model, or fail with a clear message if ``build()`` was not called."""
        if self.model is None:
            raise RuntimeError("The model has not been built yet; call build() first.")
        return self.model

    def build(self):
        """Freeze the VGG19 layers and attach a fresh classification head."""
        for layer in self.vgg19.layers:
            layer.trainable = False

        x = self.vgg19.output
        x = GlobalAveragePooling2D()(x)
        x = Dense(1024, activation='relu')(x)
        x = Dropout(0.5)(x)

        # One softmax unit per class, matching the one-hot labels that
        # fit()/score() create with num_classes=len(self.classes).
        predictions = Dense(len(self.classes), activation='softmax')(x)

        self.model = Model(inputs=self.vgg19.input, outputs=predictions)

    def fit(self, train_X, train_Y, test_X, test_Y, batch_size=32, epochs=30, verbose=1):
        """Train the model; ``test_X``/``test_Y`` are passed as validation data.

        Integer labels are one-hot encoded before training.

        Returns:
            The history object returned by the underlying ``model.fit``.

        Raises:
            RuntimeError: If ``build()`` has not been called.
        """
        model = self._require_model()
        train_Y = to_categorical(train_Y, num_classes=len(self.classes))
        test_Y = to_categorical(test_Y, num_classes=len(self.classes))
        history = model.fit(
                    train_X,
                    train_Y,
                    batch_size=batch_size,
                    epochs=epochs,
                    verbose=verbose,
                    validation_data=(test_X, test_Y))

        return history

    def compile(self, optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"]):
        """Compile the built model with the given optimizer, loss and metrics.

        Raises:
            RuntimeError: If ``build()`` has not been called.
        """
        self._require_model().compile(
                optimizer=optimizer,
                loss=loss,
                metrics=metrics
                )

    def save(self, save_path):
        """Save the built model to ``save_path``.

        Raises:
            RuntimeError: If ``build()`` has not been called.
        """
        self._require_model().save(save_path)

    def score(self, test_X, test_Y, verbose=0):
        """Evaluate the model on one-hot encoded ``test_Y``.

        Returns:
            The result of ``model.evaluate``; callers in this notebook read
            index 0 as the loss and index 1 as the accuracy.

        Raises:
            RuntimeError: If ``build()`` has not been called.
        """
        model = self._require_model()
        test_Y = to_categorical(test_Y, num_classes=len(self.classes))
        return model.evaluate(test_X, test_Y, verbose=verbose)

    def grid_search(self, param_grid, train_X, train_Y, test_X, test_Y, epochs=30, verbose=1):
        """Train one model per parameter combination and keep the most accurate.

        ``param_grid`` must provide ``'optimizer'``, ``'loss'`` and
        ``'batch_size'``. Note that ``test_X``/``test_Y`` serve both as
        validation data during training and as the selection criterion.

        Returns:
            ``(best_history, best_params)`` for the winning combination.
        """
        best_model = None
        best_score = -np.inf
        best_params = None
        best_history = None

        param_combinations = ParameterGrid(param_grid)

        for params in param_combinations:
            print(f"Training with parameters: {params}")

            # Reset model: build() attaches a brand-new head to the frozen base.
            self.build()
            self.compile(optimizer=params['optimizer'], loss=params['loss'], metrics=["accuracy"])

            history = self.fit(train_X, train_Y, test_X, test_Y, batch_size=params['batch_size'], epochs=epochs, verbose=verbose)

            score = self.score(test_X, test_Y)

            if score[1] > best_score:
                best_model = self.model
                best_score = score[1]
                best_params = params
                best_history = history

            print(f"Accuracy: {score[1]}")

        # Only replace the current model when a winner exists (e.g. not when
        # the grid was empty or every accuracy was NaN).
        if best_model is not None:
            self.model = best_model

        print(f"Best parameters: {best_params}")
        print(f"Best accuracy: {best_score}")

        return best_history, best_params

In [35]:
class DetectorModelBaseResNet:
    """Tumour detector made of a frozen ResNet50 base plus a small dense head.

    The model is built and compiled in the constructor, so an instance is
    ready for ``train()`` right away.
    """

    # Width of the softmax output and of the one-hot encoded labels.
    NUM_CLASSES = 2

    def __init__(self, input_shape=(224, 224, 3)):
        """Store the input shape and build the compiled model."""
        self.input_shape = input_shape
        self._build_model()

    def _build_model(self):
        """Create a fresh ResNet50-based model, freeze the base and compile it."""
        resnet50_base = ResNet50(weights='imagenet', include_top=False,
                                 input_shape=self.input_shape)
        head = resnet50_base.output
        head = tf.keras.layers.GlobalAveragePooling2D()(head)
        head = tf.keras.layers.Dense(1024, activation='relu')(head)
        head = tf.keras.layers.Dropout(0.5)(head)
        predictions = tf.keras.layers.Dense(self.NUM_CLASSES, activation='softmax')(head)

        self.model = Model(inputs=[resnet50_base.input], outputs=[predictions])

        # Only the new head is trained; the pre-trained layers stay fixed.
        for layer in resnet50_base.layers:
            layer.trainable = False

        self.model.compile(optimizer='adam', loss='categorical_crossentropy',
                           metrics=['accuracy'])

    def train(self, train_X, train_Y, batch_size=32, epochs=30, validation_split=0.2, verbose=1):
        """Fit the model on one-hot encoded labels.

        Args:
            train_X: Training images.
            train_Y: Integer class labels; one-hot encoded before fitting.
            batch_size: Forwarded to ``model.fit``.
            epochs: Forwarded to ``model.fit``.
            validation_split: Forwarded to ``model.fit``.
            verbose: Forwarded to ``model.fit`` (defaults to 1, the value
                that used to be hard-coded).

        Returns:
            The history object returned by ``model.fit``.
        """
        train_Y = to_categorical(train_Y, num_classes=self.NUM_CLASSES)
        history = self.model.fit(train_X, train_Y, batch_size=batch_size, epochs=epochs, verbose=verbose,
                                 validation_split=validation_split)
        return history

    def predict(self, X):
        """Return the index of the most probable class for every sample in ``X``."""
        predictions = self.model.predict(X)
        return np.argmax(predictions, axis=1)

    def evaluate(self, test_X, test_Y):
        """Evaluate on one-hot encoded ``test_Y`` and return ``model.evaluate``'s result.

        Callers in this notebook read index 0 as the loss and index 1 as the accuracy.
        """
        test_Y = to_categorical(test_Y, num_classes=self.NUM_CLASSES)
        score = self.model.evaluate(test_X, test_Y, verbose=0)
        return score

    def grid_search(self, param_grid, train_X, train_Y, test_X, test_Y, epochs=30, verbose=1):
        """Train one model per parameter combination and keep the most accurate.

        ``param_grid`` must provide ``'optimizer'``, ``'loss'`` and
        ``'batch_size'``. Each candidate is trained with a 0.2 validation split
        and ranked by its accuracy on ``test_X``/``test_Y``.

        Returns:
            ``(best_history, best_params)`` for the winning combination.
        """
        best_model = None
        best_score = -np.inf
        best_params = None
        best_history = None

        param_combinations = ParameterGrid(param_grid)

        for params in param_combinations:
            print(f"Training with parameters: {params}")

            # Start every candidate from a freshly built model, then re-compile
            # it with the candidate's optimizer and loss.
            self._build_model()
            self.model.compile(
                optimizer=params['optimizer'], loss=params['loss'], metrics=["accuracy"])
            # Forward `verbose` so the caller's setting is honoured.
            history = self.train(train_X, train_Y, batch_size=params['batch_size'], epochs=epochs,
                                 validation_split=0.2, verbose=verbose)
            score = self.evaluate(test_X, test_Y)

            if score[1] > best_score:
                best_model = self.model
                best_score = score[1]
                best_params = params
                best_history = history

            print(f"Accuracy: {score[1]}")

        # Only replace the current model when a winner exists (e.g. not when
        # the grid was empty or every accuracy was NaN).
        if best_model is not None:
            self.model = best_model

        print(f"Best parameters: {best_params}")
        print(f"Best accuracy: {best_score}")

        return best_history, best_params

# Loading the Data

Here we load the training data that is going to be used across the notebook.

In [36]:
# Seed forwarded (via load_data) to train_test_split.
random_seed = 42
# target defaults to "train", so both halves come from the Training folder;
# test_X/test_Y therefore act as a validation split of the training data.
train_X, test_X, train_Y, test_Y = load_data(random_state=random_seed, limit=400)
# Show the array shapes as a quick sanity check.
train_X.shape, test_X.shape, train_Y.shape, test_Y.shape

((1280, 224, 224, 3), (320, 224, 224, 3), (1280,), (320,))

# GPU Hardware Acceleration

Since this notebook can also be run on Google Colab, we set this up such that it can use hardware acceleration if available.

In [37]:
# `tf` is already imported in the imports cell at the top of the notebook.
# Query the device list once and reuse the result.
gpus = tf.config.list_physical_devices('GPU')

if gpus:
    print("CUDA is available. GPU:", gpus[0])
else:
    print("CUDA is not available.")

CUDA is not available.


In [38]:
epochs = 5 # keep the number of epochs low when running locally to save time; 5 may be too few to train a good model.
# Hyper-parameter grid passed to DetectorModelBaseResNet.grid_search.
param_grid_resnet = {
    'optimizer': ['adam', 'rmsprop'],
    'loss': ['categorical_crossentropy'],
    'batch_size': [32, 64]
}

# Hyper-parameter grid passed to DetectorModelBaseVGG19.grid_search.
param_grid_vgg = {
    'optimizer': ['adam', 'rmsprop'],
    'loss': ['binary_crossentropy'],
    'batch_size': [32, 64]
}

In [39]:
# Grid-search the VGG19-based detector; returns the training history and the
# parameters of the combination with the highest accuracy on test_X/test_Y.
vgg_model = DetectorModelBaseVGG19()
vgg_history, vgg_best_params = vgg_model.grid_search(
    param_grid_vgg, train_X, train_Y, test_X, test_Y, epochs=epochs)

Training with parameters: {'batch_size': 32, 'loss': 'binary_crossentropy', 'optimizer': 'adam'}
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5

KeyboardInterrupt: 

In [None]:
# Grid-search the ResNet50-based detector; returns the training history and the
# parameters of the combination with the highest accuracy on test_X/test_Y.
resnet_model = DetectorModelBaseResNet()
resnet_history, resnet_best_params = resnet_model.grid_search(
    param_grid_resnet, train_X, train_Y, test_X, test_Y, epochs=epochs)

In [None]:
# Display the parameter combination selected by the VGG19 grid search.
vgg_best_params

In [None]:
# Retrain a fresh VGG19-based model with the best parameters from the grid search.
best_vgg_model = DetectorModelBaseVGG19()

best_vgg_model.build()
best_vgg_model.compile(optimizer=vgg_best_params['optimizer'], loss=vgg_best_params['loss'])

best_vgg_model.fit(train_X, train_Y, test_X, test_Y, batch_size=vgg_best_params['batch_size'], epochs=10, verbose=0)

# Load the held-out Testing folder into dedicated names. Rebinding
# train_X/test_X/train_Y/test_Y here would make every later cell that trains
# on train_X silently train on test data.
holdout_split = load_data(target='test', random_state=35)
holdout_X, holdout_Y = holdout_split[1], holdout_split[3]

score = best_vgg_model.score(holdout_X, holdout_Y)

print("loss:", score[0])
print("accuracy:", score[1])

In [None]:
# Make sure the output directory exists before writing the model file.
os.makedirs(MODEL_SAVE_PATH, exist_ok=True)
best_vgg_model.save(os.path.join(MODEL_SAVE_PATH, 'vgg19.h5'))

In [None]:
# Display the parameter combination selected by the ResNet50 grid search.
resnet_best_params

{'batch_size': 32, 'loss': 'categorical_crossentropy', 'optimizer': 'adam'}

In [None]:
# Retrain a fresh ResNet50-based model with the best parameters from the grid search.
# NOTE: train_X/train_Y must still hold the training split at this point.
best_resnet_model = DetectorModelBaseResNet()
best_resnet_model.model.compile(optimizer=resnet_best_params['optimizer'], loss=resnet_best_params['loss'], metrics=["accuracy"])

best_resnet_model.train(train_X, train_Y, batch_size=resnet_best_params['batch_size'], epochs=10, validation_split=0.2)

# Load the held-out Testing folder into dedicated names so the global
# train_X/test_X/train_Y/test_Y variables are not overwritten with test data.
holdout_split = load_data(target='test', random_state=35)
holdout_X, holdout_Y = holdout_split[1], holdout_split[3]

score = best_resnet_model.evaluate(holdout_X, holdout_Y)

print("loss:", score[0])
print("accuracy:", score[1])

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
loss: 0.2951255142688751
accuracy: 0.9411764740943909


In [None]:
# Make sure the output directory exists before writing the model file.
os.makedirs(MODEL_SAVE_PATH, exist_ok=True)
best_resnet_model.model.save(os.path.join(MODEL_SAVE_PATH, 'resnet.h5'))