Import necessary modules

In [None]:
import numpy as np
from keras.layers import Dense, Conv2D, MaxPooling2D, Flatten, Dropout, Input
from keras import Sequential, optimizers, layers, Model
from imblearn.over_sampling import RandomOverSampler
import keras.backend as K
import keras_tuner as kt
import random
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import GridSearchCV, train_test_split, KFold
import matplotlib.pyplot as plt
from sklearn import metrics

Load Dataset from https://zenodo.org/records/10519652

In [None]:
# Dataset downloaded from https://zenodo.org/records/10519652
data = np.load("bloodmnist.npz")
train_images = data["train_images"]
print(np.shape(data["train_images"]))
val_images = data["val_images"]
print(np.shape(data["val_images"]))
test_images = data["test_images"]
print(np.shape(data["test_images"]))
train_labels = data["train_labels"]
print(np.shape(data["train_labels"]))
val_labels = data["val_labels"]
print(np.shape(data["val_labels"]))
test_labels = data["test_labels"]
print(np.shape(data["test_labels"]))

Set seed

In [None]:
# set random seed
random.seed(0)
np.random.seed(0)

Define Dataset Class

In [None]:
# define dataset class for training, validation and testing
class ImageDataset:
    def __init__(self, images, labels):
        self.labels = labels
        self.class_num = len(np.unique(labels))
        self.counts = []
        self.proportions = []
        self.length = np.shape(images)[0]
        self.width = np.shape(images)[1]

        # one hot encode the labels to show non ordering of data
        self.one_hot_labels = self.one_hot_encode()

        # get proportions of each label in dataset
        self.update_counts()

        # normalise RGB values between 0 and 1
        self.images = images/255

    def update_counts(self):
        self.counts = []
        self.proportions = []
        
        # get number of occurences of each class
        for i in range(self.class_num):
            self.counts.append(len(np.where(self.labels == i)[0]))
        
        self.proportions = [count/self.length for count in self.counts]

    # function to oversample dataset so that all the classes have equal counts
    def oversample(self):
        ros = RandomOverSampler(random_state=0)

        # flatten images for oversampling
        self.images= self.images.reshape((self.length, self.width*self.width*3))
        self.images, self.labels = ros.fit_resample(self.images, self.labels)
        self.length = self.images.shape[0]

        # reshape back to images
        self.images = self.images.reshape((self.length, self.width, self.width, 3))
        self.one_hot_labels = self.one_hot_encode()
        self.update_counts()

    # function to one hot encode labels given integer labels
    def one_hot_encode(self):
        one_hot_labels = np.array([np.zeros(self.class_num) for i in range(self.length)])
        for i in range(self.length):
            one_hot_labels[i][self.labels[i]] = 1

        return one_hot_labels
    
    # shuffles dataset 
    def shuffle(self):
        # generate random permutation for shuffled indices of images and labels
        p = np.random.permutation(self.length)
        self.images, self.labels, self.one_hot_labels = self.images[p], self.labels[p], self.one_hot_labels[p]

    # augment to reduce overfitting using operation
    def augment(self, operation, factor):
        # flip some images and add to dataset
        new_images_num = int(factor*self.length)
        p = np.random.permutation(self.length)[:new_images_num]
        new_images = operation(self.images[p])
        new_labels = self.labels[p]
        self.images = np.append(self.images, new_images, axis = 0)
        self.labels = np.append(self.labels, new_labels, axis = 0)

    # augment all images such that 50% of dataset is augmented
    def augment_images(self):
        # define augmentations using keras augmentation layers
        flip = layers.RandomFlip(mode="horizontal_and_vertical")
        zoom = layers.RandomZoom(height_factor=0.5)
        translation = layers.RandomTranslation(height_factor=0.3, width_factor=0.3)
        rotate = layers.RandomRotation(factor=0.5)

        # flip some images
        self.augment(flip, 0.1)

        # zoom in on some images
        self.augment(zoom, 0.2)

        # translate some images
        self.augment(translation, 0.4)

        # rotate some images
        self.augment(rotate, 0.2)

    # use CNN model to get features from images
    def get_features(self, model):
        self.images = model.predict(self.images)

Function to plot counts of each label

In [None]:
# plot the counts of each class
def plot_label_counts(dataset):
    x = np.array(list(range(8)))
    y = np.array(dataset.counts)
    plt.bar(x, y)
    plt.savefig("label_counts.png")
    plt.show()

Function to prepare dataset

In [None]:
# function to prepare training dataset
def prepare_dataset(dataset):
    dataset.oversample()
    dataset.augment_images()
    dataset.shuffle()


Define class to tune hyperparameters of CNN

In [None]:
# define CNN class to tune hyperparameters
class CNN(kt.HyperModel):
    def build(self, hp):
        # define hyperpamaters search space for tuning
        filters_1 = hp.Int('filters_1', min_value=16, max_value=512, step=32)
        filters_2 = hp.Int('filters_2', min_value=16, max_value=512, step=32)
        filters_3 = hp.Int('filters_3', min_value=16, max_value=512, step=32)
        dense_1_size = hp.Int('size_1', min_value=10, max_value=510, step=50)
        learning_rate = hp.Choice('learning_rate', values = [1e-2, 1e-3, 1e-4, 1e-5])

        model = Sequential()
        model.add(Input(shape = (28, 28, 3)))
        model.add(Conv2D(filters=filters_1, kernel_size=(3, 3), activation='relu', input_shape = (28, 28, 3), strides=1))
        model.add(MaxPooling2D(pool_size=(2, 2), strides=2))
        model.add(Conv2D(filters=filters_2, kernel_size=(3, 3), activation='relu', strides=1))
        model.add(MaxPooling2D(pool_size=(2, 2), strides=2))
        model.add(Conv2D(filters = filters_3, kernel_size=(3, 3), activation='relu', strides=1))
        model.add(MaxPooling2D(pool_size=(2, 2), strides=2))
        model.add(Flatten())
        model.add(Dropout(.1))
        model.add(Dense(dense_1_size, activation='relu'))
        model.add(Dense(8, activation='softmax'))
        optimizer = optimizers.SGD(learning_rate=learning_rate)
        model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['AUC', 'categorical_accuracy', get_f1, get_recall, get_precision])

        return model
    
    def fit(self, hp, model, *args, **kwargs):
        return model.fit(
            *args,
            batch_size=8,
            verbose=0
        )

Get optimal CNN hyperparamters

In [None]:
def get_hyperparams(train_dataset, val_dataset):
    tuner = kt.RandomSearch(
        CNN(),
        objective='val_categorical_accuracy',
        directory='optimal_parameters',
        project_name='train_CNN'
    )

    tuner.search(train_dataset.images, train_dataset.one_hot_labels, epochs=11, validation_data=(val_dataset.images, val_dataset.one_hot_labels))

Define metric functions (from old keras source code)

In [None]:
# taken from old Keras source code
def get_f1(y_true, y_pred): 
    precision = get_precision(y_true, y_pred)
    recall = get_recall(y_true, y_pred)
    f1_val = 2*(precision*recall)/(precision+recall+K.epsilon())
    return f1_val

# function to get precision using Keras backend
def get_precision(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

# function to get recall using Keras backend
def get_recall(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

Make CNN model with tuned hyperparameters

In [None]:
# make CNN model with tuned hyperparameters
def CNN():
    model = Sequential()
    model.add(Input(shape = (28, 28, 3)))
    model.add(Conv2D(filters=48, kernel_size=(3, 3), activation='relu', input_shape = (28, 28, 3), strides=1))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=2))
    model.add(Conv2D(filters=240, kernel_size=(3, 3), activation='relu', strides=1))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=2))
    model.add(Conv2D(filters = 208, kernel_size=(3, 3), activation='relu', strides=1))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=2))
    model.add(Flatten())
    model.add(Dropout(.1))
    model.add(Dense(90, activation='relu'))
    model.add(Dense(8, activation='softmax'))
    optimizer = optimizers.SGD(learning_rate=0.01)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['AUC', 'categorical_accuracy', get_f1, get_recall, get_precision])

    return model

Train model and get learning history of model

In [None]:
# train model and get history of train vs val
def train_model(model, train_dataset, val_dataset):
    history = model.fit(train_dataset.images, train_dataset.one_hot_labels, validation_data=(val_dataset.images, val_dataset.one_hot_labels), batch_size=8, epochs=10, verbose=0)

    return history

Function to plot learning history

In [None]:
# plot loss over time to verify that model has converged and not overfitted
def plot_learning_history(history, metric):
    plt.plot(history.history[metric])
    plt.plot(history.history[f'val_{metric}'])
    plt.title(f'model {metric}')
    plt.ylabel(f'{metric}')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.savefig(f"{metric}.png")
    plt.show()

Function to get confusion matrix of CNN

In [None]:
# show confusion matrix on test dataset
def cnn_confusion_matrix(test_dataset, model):
    confusion_matrix = metrics.confusion_matrix(np.argmax(model.predict(test_dataset.images), axis=1), test_dataset.labels)
    confusion_matrix_display = metrics.ConfusionMatrixDisplay(confusion_matrix=confusion_matrix, display_labels=list(range(8)))
    confusion_matrix_display.plot()
    plt.savefig("confusing_matrix.png")
    plt.show() 

Get model for use by Gradient Boosting

In [None]:
# use CNN layers of trained CNN model
def get_model(model):
    model_use = Model(
        inputs = model.input,
        outputs = model.layers[-3].output
    )

    return model_use

Combine dataset to show counts of each class and perform my own splitting

In [None]:
# combine dataset of iamges and labels
whole_images = np.append(train_images, val_images, axis=0)
whole_images = np.append(whole_images, test_images, axis=0)

whole_labels = np.append(train_labels, val_labels, axis=0)
whole_labels = np.append(whole_labels, test_labels, axis=0)

whole_dataset = ImageDataset(whole_images, whole_labels)
plot_label_counts(whole_dataset)

Split into testing dataset and training dataset with ratio 2:8

In [None]:
# split into test and train with ratio 80% to 20%
cv_images, test_images, cv_labels, test_labels = train_test_split(whole_images, whole_labels, test_size=0.2)

Perform K fold cross validation

In [None]:
# perform K cross validation
k = 5
kf = KFold(n_splits=k, random_state=None, shuffle=False)

for i, (train_index, val_index) in enumerate(kf.split(cv_images)):

    # for each split, make training and validation dataset
    train_images = cv_images[train_index]
    val_images = cv_images[val_index]
    train_labels = cv_labels[train_index]
    val_labels = cv_labels[val_index]
    train_dataset = ImageDataset(train_images, train_labels)
    val_dataset = ImageDataset(val_images, val_labels)

    # oversample, train and augment dataset
    prepare_dataset(train_dataset)

    # get CNN model
    model = CNN()

    # train CNN model
    history = train_model(model, train_dataset, val_dataset)

    # evalute CNN model on validation dataset
    results = model.evaluate(val_dataset.images, val_dataset.one_hot_labels)

    # plot learning history of loss
    plot_learning_history(history, "loss")

    # get CNN model to use for Gradient Boosting
    cnn_model = get_model(model)

    # use CNN model to get features
    train_dataset.get_features(cnn_model)
    val_dataset.get_features(cnn_model)

    grad_boost = HistGradientBoostingClassifier(max_depth=4, max_iter=200, learning_rate=0.09, random_state=0)
    grad_boost.fit(train_dataset.images, train_dataset.labels)
    trainhat = grad_boost.predict(train_dataset.images)
    valhat = grad_boost.predict(val_dataset.images)


    acc = metrics.accuracy_score(val_dataset.labels, valhat)
    val_f1_score = metrics.f1_score(val_dataset.labels, valhat, average="macro")
    val_precision = metrics.precision_score(val_dataset.labels, valhat, average = "macro")
    val_recall = metrics.recall_score(val_dataset.labels, valhat, average = "macro")

    print("Softmax test results:", results)
    print("Accuracy for gradient boosting:", acc)
    print("F1 Score for gradient boosting:", val_f1_score)
    print("Precision for gradient boosting:", val_precision)
    print("Recall for gradient boosting:", val_recall)

Combine validation and training and test both models

In [None]:
# get CNN model
model = CNN()

# for last training, combine training and validation into one dataset
final_training = ImageDataset(cv_images, cv_labels)

# make testing dataset
test_dataset = ImageDataset(test_images, test_labels)

# train CNN
history = train_model(model, final_training, test_dataset)
cnn_model = get_model(model)
final_training.get_features(cnn_model)
grad_boost = HistGradientBoostingClassifier(max_depth=4, max_iter=200, learning_rate=0.09, random_state=0)
grad_boost.fit(final_training.images, final_training.labels)

# get confusion matrix of CNN model
print("Testing: ")
results = model.evaluate(test_dataset.images, test_dataset.one_hot_labels)
y_pred = np.argmax(model.predict(test_dataset.images), axis=1)
cnn_confusion_matrix(test_dataset, model)
test_dataset.get_features(cnn_model)
testhat = grad_boost.predict(test_dataset.images)
# testhat = np.argmax(testhat, axis=1)
print(metrics.classification_report(test_dataset.labels, y_pred))
print(metrics.classification_report(test_dataset.labels, testhat))
print("Softmax test results:", results)
# get confusion matrix of random boost
grad_boost_confusion = metrics.confusion_matrix(test_dataset.labels, testhat, labels = range(8))
disp = metrics.ConfusionMatrixDisplay(confusion_matrix=grad_boost_confusion, display_labels = range(8))
disp.plot()
plt.savefig("grad_boost_confusion_matrix")
plt.show()

# print metrics of gradient boost
acc = metrics.accuracy_score(test_dataset.labels, testhat)
test_f1_score = metrics.f1_score(test_dataset.labels, testhat, average="macro")
test_precision = metrics.precision_score(test_dataset.labels, testhat, average = "macro")
test_recall = metrics.recall_score(test_dataset.labels, testhat, average = "macro")
print("Accuracy for gradient boosting:", acc)
print("F1 Score for gradient boosting:", test_f1_score)
print("Precision for gradient boosting:", test_precision)
print("Recall for gradient boosting:", test_recall)