In [2]:
import pandas as pd
import os
import cv2
import shutil
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

In [None]:
# Mount Google Drive
from google.colab import drive
import os
import shutil
import zipfile
drive.mount('/content/drive', force_remount=True)

# Paths to the zip files in Google Drive
train_zip_file_path = '/content/drive/My Drive/new_kaggle_dataset.zip'
test_zip_file_path = '/content/drive/My Drive/Messidor2New.zip'

# Paths where the contents of the zip files are extracted
first_folder_path = '/content/kaggle_dataset/'
second_extracted_folder_path = '/content/messidor2/'

# Create the directories to extract the contents of the zip files
os.makedirs(first_folder_path, exist_ok=True)
os.makedirs(second_extracted_folder_path, exist_ok=True)

# Copy each archive into the Colab environment and extract from that local
# copy. Previously the archives were copied to /content/ but extraction still
# opened the Drive originals, so the copies were never used.
for drive_zip_path, extract_dir in (
    (train_zip_file_path, first_folder_path),
    (test_zip_file_path, second_extracted_folder_path),
):
    # shutil.copy returns the path of the file it created inside /content/
    local_zip_path = shutil.copy(drive_zip_path, '/content/')
    with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)

csv_file_path3 = '/content/drive/MyDrive/messidor_data.csv'

# Path where the CSV files are stored
destination_csv_folder = '/content/csv_files/'

# Create the directory to store the CSV files
os.makedirs(destination_csv_folder, exist_ok=True)

# Copy the CSV file from Google Drive to the Colab environment
shutil.copy(csv_file_path3, destination_csv_folder)


In [27]:
# Numeric diagnosis grade -> human-readable class name
diagnosis_dict = dict(enumerate([
    'No_DR',
    'Mild',
    'Moderate',
    'Severe',
    'Proliferative_DR',
]))

# Load the training labels and attach the class name as a `type` column
df = pd.read_csv(r'/content/kaggle_dataset/new_kaggle_dataset/train.csv')
df['type'] = df['diagnosis'].map(lambda grade: diagnosis_dict.get(grade))


In [None]:
# Horizontal bar chart of images per class name, followed by the raw
# per-grade counts as the cell output
df['type'].value_counts().plot.barh()
df['diagnosis'].value_counts()

In [None]:
main_directory = r'/content/kaggle_dataset/new_kaggle_dataset/gaussian_filtered_images/gaussian_filtered_images'
classes = os.listdir(main_directory)

# Walk every class sub-directory and record each file with its class name
filepaths = []
labels = []

for _class in classes:
    class_path = os.path.join(main_directory, _class)
    if os.path.isdir(class_path):
        for file in os.listdir(class_path):
            filepaths.append(os.path.join(class_path, file))
            labels.append(_class)

trainPreprocessedDirectory = '/content/kaggle_dataset/new_kaggle_dataset/train'
# The output directory and the CLAHE operator do not change between images,
# so they are created once instead of on every loop iteration.
os.makedirs(trainPreprocessedDirectory, exist_ok=True)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))

# Paths and labels are collected together so that they stay aligned even when
# an unreadable file is skipped.
# NOTE(review): all classes are written into one flat directory keyed by file
# name, so two source files with the same name would overwrite each other -
# confirm file names are unique across classes.
trainPreprocessed_filepaths = []
trainPreprocessed_labels = []
for file_path, label in zip(filepaths, labels):
    # Load the image; cv2.imread returns None instead of raising on failure
    img = cv2.imread(file_path)
    if img is None:
        print('Skipping unreadable image: ', file_path)
        continue
    # Convert the image to greyscale
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Apply a median filter to reduce noise
    img = cv2.medianBlur(img, 5)
    # Enhance the contrast of the image using CLAHE
    img = clahe.apply(img)
    # Save the preprocessed image; only keep the row if the write succeeded
    trainPreprocessed_path = os.path.join(trainPreprocessedDirectory, os.path.basename(file_path))
    if not cv2.imwrite(trainPreprocessed_path, img):
        print('Could not write preprocessed image: ', trainPreprocessed_path)
        continue
    trainPreprocessed_filepaths.append(trainPreprocessed_path)
    trainPreprocessed_labels.append(label)

Fseries=pd.Series(trainPreprocessed_filepaths, name='filepaths')
Lseries=pd.Series(trainPreprocessed_labels, name='labels')
df=pd.concat([Fseries, Lseries], axis=1)
print (df.head())
print('df length: ', len(df))
print (df['labels'].value_counts())

In [None]:
max_size = 1500  # upper bound on the number of images kept per class

# Visit the classes in order of first appearance. Classes above max_size are
# downsampled without replacement; smaller classes are only shuffled
# (sampling all of their rows).
sample_list = []
for _, group in df.groupby('labels', sort = False):
    keep_count = min(len(group), max_size)
    shuffled = group.sample(n = keep_count, replace = False, random_state = 123, axis = 0)
    sample_list.append(shuffled.reset_index(drop = True))

# concatenate all sampled groups
df = pd.concat(sample_list, axis = 0).reset_index(drop = True)
print (len(df))
print (df['labels'].value_counts())

In [None]:
# Start from an empty `aug` directory with one sub-directory per class label
working_dir = r'/content/kaggle_dataset/new_kaggle_dataset'
aug_dir = os.path.join(working_dir, 'aug')

# Remove the results of any previous run before recreating the tree
if os.path.isdir(aug_dir):
    shutil.rmtree(aug_dir)
os.mkdir(aug_dir)

class_dirs = [os.path.join(aug_dir, label) for label in df['labels'].unique()]
for dir_path in class_dirs:
    os.mkdir(dir_path)

print(os.listdir(aug_dir))

In [None]:
target = 1500 # set the target count for each class in df

# Random transformations applied to every generated image
augmentation_options = dict(
    horizontal_flip=True,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.2,
    shear_range=0.2,
)
generatedImgs = tf.keras.preprocessing.image.ImageDataGenerator(**augmentation_options)

labelsGroups = df.groupby('labels')
for label in df['labels'].unique():  # for every class
    group = labelsGroups.get_group(label)  # rows belonging to this class only
    sample_count = len(group)
    if sample_count > target:
        # class already exceeds the target, nothing to generate
        continue

    delta = target - sample_count  # number of augmented images to create
    target_dir = os.path.join(aug_dir, label)  # where the generated images are written
    # The iterator writes each generated batch into `target_dir` as a side effect
    aug_gen = generatedImgs.flow_from_dataframe(
        dataframe = group,
        x_col = 'filepaths',
        y_col = None,
        target_size = (224,224),
        class_mode = None,
        batch_size = 1,
        shuffle = False,
        save_to_dir = target_dir,
        save_prefix = 'aug-',
        save_format = 'jpg',
    )
    aug_img_count = 0
    while aug_img_count < delta:
        images = next(aug_gen)
        aug_img_count += len(images)

In [None]:
# Report how many files each class folder under `aug` contains
aug = r'/content/kaggle_dataset/new_kaggle_dataset/aug'
auglist = os.listdir(aug)
print (auglist)
for _class in auglist:
    file_count = len(os.listdir(os.path.join(aug, _class)))
    print('class: ', _class, '  file count: ', file_count)

In [None]:
# display augmented images
# `aug_gen` was created with `save_to_dir` set, so every batch drawn from it
# is also written to disk. Previewing 25 images would therefore add 25 extra
# files to that class's aug folder and unbalance the classes. Saving is
# switched off for the preview and restored afterwards.
# NOTE(review): relies on the Keras iterator exposing `save_to_dir` as a
# plain attribute - confirm against the installed Keras version.
previous_save_dir = aug_gen.save_to_dir
aug_gen.save_to_dir = None
try:
    plt.figure(figsize = (20, 20))
    for i in range(25):
        image = next(aug_gen)/255  # scale pixel values into [0, 1] for imshow
        image = np.squeeze(image, axis=0)  # drop the batch dimension (batch_size is 1)
        plt.subplot(5,5,i+1)
        plt.imshow(image)
    plt.show()
finally:
    aug_gen.save_to_dir = previous_save_dir

In [None]:
# Build a frame of the augmented files (path + class taken from the folder
# name) and append it to the frame of original images.

aug_filePaths = []
aug_labels = []
for _class in os.listdir(aug_dir):
    classpath = os.path.join(aug_dir, _class)
    class_files = [os.path.join(classpath, f) for f in os.listdir(classpath)]
    aug_filePaths.extend(class_files)
    aug_labels.extend([_class] * len(class_files))

aug_df = pd.concat(
    [pd.Series(aug_filePaths, name = 'filepaths'), pd.Series(aug_labels, name = 'labels')],
    axis = 1,
)
ndf = pd.concat([df,aug_df], axis = 0).reset_index(drop = True)


print (df['labels'].value_counts())  # original labels count
print(aug_df['labels'].value_counts())  # augmented labels count
print (ndf['labels'].value_counts())  # concatenated labels count

In [None]:
# Preview the first rows of the combined (original + augmented) frame
ndf.head()

In [None]:
test_split = 0.2  # 20% of all images are held out for testing
train_valid_split = 0.8  # 80% of the remaining images train the model, the rest validate it

# Split the DataFrame into testing set and remaining data
data_train, test_df = train_test_split(ndf, test_size=test_split, shuffle=True, random_state=42)

# Split the remaining data into training and validation sets
train_df, valid_df = train_test_split(data_train, train_size=train_valid_split, shuffle=True, random_state=42)

# Output the lengths of the resulting DataFrames.
# The training length is taken from `train_df`; `data_train` still contains
# the validation rows and would overstate the training set size.
print('Training set length:', len(train_df))
print('Validation set length:', len(valid_df))
print('Testing set length:', len(test_df))

In [None]:
# sets up image data generators for training, validation and testing sets
height, width, channels = 224, 224, 3
batch_size=40
img_shape=(height, width, channels)
img_size=(height, width)

# Pick the test batch size: among the exact divisors of the test-set length,
# take the largest quotient that does not exceed 80, so the test set is
# covered by a whole number of equal batches.
length=len(test_df)
candidate_batch_sizes = [int(length/n) for n in range(1,length+1) if length % n ==0 and length/n<=80]
candidate_batch_sizes.sort(reverse=True)
test_batch_size=candidate_batch_sizes[0]
test_steps=int(length/test_batch_size)
print ( 'test batch size: ' ,test_batch_size, '  test steps: ', test_steps)

def scalar(img):
    """Preprocessing hook for the generators; currently returns the image unchanged."""
    #img=img/127.5-1
    return img

# Training images additionally get random horizontal flips
trgen=tf.keras.preprocessing.image.ImageDataGenerator(preprocessing_function=scalar, horizontal_flip=True)
tvgen=tf.keras.preprocessing.image.ImageDataGenerator(preprocessing_function=scalar)

# Options shared by all three flows
flow_options = dict(
    x_col='filepaths',
    y_col='labels',
    target_size=img_size,
    color_mode='rgb',
    class_mode='categorical',
)
train_gen=trgen.flow_from_dataframe(train_df, shuffle=True, batch_size=batch_size, **flow_options)
valid_gen=tvgen.flow_from_dataframe(valid_df, shuffle=True, batch_size=batch_size, **flow_options)
# The test flow is not shuffled so predictions stay aligned with test_gen.labels
test_gen=tvgen.flow_from_dataframe(test_df, shuffle=False, batch_size=test_batch_size, **flow_options)

classes=list(train_gen.class_indices.keys())
class_count=len(classes)
train_steps=int(len(train_gen.labels)/batch_size)

In [None]:
# Subplots of the train retinal images available
# Get a batch of images from the training set generator
images, labels = next(train_gen)
images = images / 255.0  # scale pixel values into [0, 1] for imshow

# Display the images in a 5x5 grid. The count is capped at the batch length
# so a batch with fewer than 25 images does not raise an IndexError; the
# test-image cell uses the same guard.
plt.figure(figsize=(20, 20))
for i in range(min(len(images), 25)):
    image = images[i]
    plt.subplot(5, 5, i+1)
    plt.imshow(image)
    plt.axis('off')
plt.show()

In [None]:
# display testing images
images, labels = next(test_gen)
images = images / 255.0  # scale pixel values into [0, 1] for imshow
print(len(images))

# Show at most 25 images of the batch in a 5x5 grid
plt.figure(figsize=(20, 20))
shown = min(len(images), 25)
for position, image in enumerate(images[:shown], start=1):
    plt.subplot(5, 5, position)
    plt.imshow(image)
    plt.axis('off')
plt.show()

In [None]:
# set up training with different models
L2_REGULARISATION = 0.01
def alex_net(input_shape=(224, 224, 3), num_classes=5):
    """Build an AlexNet-style Sequential classifier.

    Parameters:
        input_shape (tuple): Shape of one input image as (height, width, channels).
        num_classes (int): Number of units in the softmax output layer. Default is 5.

    Returns:
        An uncompiled tf.keras Sequential model.

    Only the first convolution is strided (stride 4). Conv2-Conv5 use stride 1;
    with stride 4 in every convolution a 224x224 input was reduced to a 1x1
    feature map by Conv3, leaving the later 3x3 convolutions nothing spatial
    to work on. Kernel shapes are unaffected by the stride change.
    """
    layers = tf.keras.layers
    alexnet = tf.keras.models.Sequential()
    # The input shape is declared once here instead of also on the first Conv2D
    alexnet.add(tf.keras.Input(shape=input_shape))

    # Conv1 and Pool1
    alexnet.add(layers.Conv2D(96, 11, strides=4, padding='same'))
    alexnet.add(layers.BatchNormalization())
    alexnet.add(layers.Activation('relu'))
    alexnet.add(layers.MaxPooling2D(3, strides=2))

    # Conv2 and Pool2
    alexnet.add(layers.Conv2D(256, 5, strides=1, padding='same'))
    alexnet.add(layers.BatchNormalization())
    alexnet.add(layers.Activation('relu'))
    alexnet.add(layers.MaxPooling2D(3, strides=2))

    # Conv3
    alexnet.add(layers.Conv2D(384, 3, strides=1, padding='same'))
    alexnet.add(layers.BatchNormalization())
    alexnet.add(layers.Activation('relu'))

    # Conv4
    alexnet.add(layers.Conv2D(384, 3, strides=1, padding='same'))
    alexnet.add(layers.BatchNormalization())
    alexnet.add(layers.Activation('relu'))

    # Conv5 (L2-regularised, no batch normalisation)
    alexnet.add(layers.Conv2D(256, 3, strides=1, padding='same',
                              kernel_regularizer=tf.keras.regularizers.l2(L2_REGULARISATION)))
    alexnet.add(layers.Activation('relu'))

    # Fully connected layers. Global average pooling already yields a flat
    # vector per image, so no separate Flatten layer is needed.
    alexnet.add(layers.GlobalAveragePooling2D())
    alexnet.add(layers.Dense(4096, activation='relu'))
    alexnet.add(layers.Dropout(0.5))

    alexnet.add(layers.Dense(4096, activation='relu'))
    alexnet.add(layers.Dropout(0.5))

    alexnet.add(layers.Dense(num_classes, activation='softmax',
                             kernel_regularizer=tf.keras.regularizers.l2(L2_REGULARISATION)))
    return alexnet

# Change `model_name` to train the model of choice. Only the selected
# backbone is instantiated; previously all four pretrained networks were
# built (and their ImageNet weights loaded) with three of them discarded.
backbones = {
    'mobilenetv3small': tf.keras.applications.MobileNetV3Small,
    'densenet201': tf.keras.applications.DenseNet201,
    'resnet50': tf.keras.applications.ResNet50,
    'inceptionv3': tf.keras.applications.inception_v3.InceptionV3,
}
model_name = 'inceptionv3'
base_model = backbones[model_name](include_top=False, weights='imagenet')

# Classification head: pooled backbone features -> two dense layers -> 5-way softmax
# NOTE(review): the data generators pass images through unscaled (`scalar`
# returns its input); confirm this matches the input range the chosen
# backbone expects.
x = base_model.output
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(1024,activation='relu')(x)
x = tf.keras.layers.Dense(512, activation='relu')(x)
predictions = tf.keras.layers.Dense(5, activation='softmax')(x)

model = tf.keras.models.Model(inputs=base_model.input, outputs=predictions)

optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])


PATH = f'/content/drive/MyDrive/{model_name}/{model_name}.keras'
callbacks = [
    # Keep only the weights with the lowest validation loss seen so far
    tf.keras.callbacks.ModelCheckpoint(filepath=PATH,monitor='val_loss', save_best_only=True, initial_value_threshold=None),
    # Multiply the learning rate by 0.2 after 10 epochs without val_loss improvement
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss' ,factor= 0.2, patience = 10,  min_lr=1e-9, verbose=1)
]


# `batch_size` is not passed to fit: the generators already yield batches
# (of the size configured when they were created), and the Keras
# documentation says not to specify it for generator input.
chosen_model = model.fit(x = train_gen, epochs=100, validation_data = valid_gen ,callbacks = callbacks)

In [None]:
# display training accuracy
def display_accuracy() -> None:
    """Plot training and validation accuracy per epoch from `chosen_model.history`."""
    history = chosen_model.history
    for metric in ('accuracy', 'val_accuracy'):
        plt.plot(history[metric])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['accuracy', 'val_accuracy'], loc='upper left')
    plt.show()
display_accuracy()

In [None]:
# display training loss
def display_loss() -> None:
    """Plot training and validation loss per epoch from `chosen_model.history`."""
    history = chosen_model.history
    for metric in ('loss', 'val_loss'):
        plt.plot(history[metric])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='upper left')
    plt.show()
display_loss()

In [None]:
# test models on testing set
# Per-image class scores, kept in `pred` for the error report further down
pred = model.predict(test_gen)

# evaluate returns the loss first, followed by the compiled metrics
hist_eval = model.evaluate(test_gen, verbose=1)

print(hist_eval)
test_loss, test_accuracy = hist_eval[0], hist_eval[1]
print(f"Accuracy: {test_accuracy:f}\nLoss: {test_loss:f}")

In [25]:
# print out testing output
def print_in_color(msg, fg_color=(255, 255, 255), bg_color=(0, 0, 0)):
    """
    Print a message wrapped in 24-bit ANSI colour escape sequences.

    Parameters:
        msg (str): The message to print.
        fg_color (tuple): RGB tuple for text color. Default is white (255, 255, 255).
        bg_color (tuple): RGB tuple for background color. Default is black (0, 0, 0).
    """
    def _rgb(components):
        # "R;G;B" as required inside the escape sequence
        return ';'.join(str(component) for component in components)

    pieces = [
        '\033[48;2;' + _rgb(bg_color) + 'm',  # background
        '\033[38;2;' + _rgb(fg_color) + 'm',  # foreground
        msg,
        '\033[0m',  # reset attributes
    ]
    print(''.join(pieces))

from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, classification_report

def print_info(test_gen, preds, print_code, save_dir, subject):
    """Report misclassifications and test metrics for a set of predictions.

    Parameters:
        test_gen: Object exposing `class_indices` (class name -> integer index),
            `labels` (true integer label per sample) and `filenames`.
        preds: Sequence of per-sample score vectors; the argmax of each vector
            is taken as the predicted class index.
        print_code (int): Maximum number of misclassified files to list;
            0 disables the listing.
        save_dir (str): Directory in which the index -> class-name mapping is
            written as a text file.
        subject (str): Prefix of that text file's name.

    Side effects: writes `<subject>-<number of classes>.txt` into `save_dir`,
    prints to stdout and draws matplotlib figures. The confusion matrix and
    the metrics are only produced when there are at most 30 classes.
    """
    class_dict = test_gen.class_indices
    labels = test_gen.labels
    file_names = test_gen.filenames

    # dictionary {integer of class number: string of class name}
    new_dict = {value: key for key, value in class_dict.items()}
    classes = list(new_dict.values())  # list of string of class names
    class_ids = list(new_dict.keys())  # integer indices, in the same order as `classes`
    length = len(classes)

    # store new_dict as a text file in the save_dir
    dict_name = subject + '-' + str(length) + '.txt'
    dict_path = os.path.join(save_dir, dict_name)
    with open(dict_path, 'w') as x_file:
        x_file.write(str(new_dict))

    error_list = []
    true_class = []
    pred_class = []
    prob_list = []
    error_indices = []
    y_pred = []
    for i, p in enumerate(preds):
        pred_index = np.argmax(p)
        true_index = labels[i]  # labels are integer values
        if pred_index != true_index:  # a misclassification has occurred
            error_list.append(file_names[i])
            true_class.append(new_dict[true_index])
            pred_class.append(new_dict[pred_index])
            prob_list.append(p[pred_index])
            error_indices.append(true_index)
        y_pred.append(pred_index)
    errors = len(error_list)

    if print_code != 0:
        if errors > 0:
            r = min(print_code, errors)  # never list more rows than there are errors
            msg = '{0:^28s}{1:^28s}{2:^28s}{3:^16s}'.format('Filename', 'Predicted Class', 'True Class', 'Probability')
            print_in_color(msg, (0, 255, 0), (55, 65, 80))
            for i in range(r):
                # show the file as "<parent directory>/<file name>"
                split1 = os.path.split(error_list[i])
                split2 = os.path.split(split1[0])
                fname = split2[1] + '/' + split1[1]
                msg = '{0:^28s}{1:^28s}{2:^28s}{3:4s}{4:^6.4f}'.format(fname, pred_class[i], true_class[i], ' ',
                                                                        prob_list[i])
                # Rows are coloured through print_in_color; the plain print()
                # used before emitted the colour tuples as text.
                print_in_color(msg, (255, 255, 255), (55, 65, 60))
        else:
            msg = 'With accuracy of 100 % there are no errors to print'
            print_in_color(msg, (0, 255, 0), (55, 65, 80))

    if errors > 0:
        # horizontal bar chart of the number of misclassified samples per true class
        plot_bar = []
        plot_class = []
        for key, value in new_dict.items():
            count = error_indices.count(key)
            if count != 0:
                plot_bar.append(count)
                plot_class.append(value)
        fig = plt.figure()
        fig.set_figheight(len(plot_class) / 3)
        fig.set_figwidth(10)
        plt.style.use('fivethirtyeight')
        for c, x in zip(plot_class, plot_bar):
            plt.barh(c, x)
        plt.title(' Errors by Class on Test Set')

    y_true = np.array(labels)
    y_pred = np.array(y_pred)
    if length <= 30:
        # create a confusion matrix; passing the full label list keeps one
        # row/column per class even when a class is absent from both y_true
        # and y_pred, so it stays aligned with `classes`
        cm = confusion_matrix(y_true, y_pred, labels=class_ids)
        if length < 8:
            fig_width = 8
            fig_height = 8
        else:
            fig_width = int(length * .5)
            fig_height = int(length * .5)
        plt.figure(figsize=(fig_width, fig_height))
        sns.heatmap(cm, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)
        plt.xticks(np.arange(length) + .5, classes, rotation=90)
        plt.yticks(np.arange(length) + .5, classes, rotation=0)
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.title("Confusion Matrix")
        plt.show()

        # Calculate metrics using scikit-learn
        accuracy = accuracy_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred, average='weighted')
        print("Confusion Matrix:")
        print(cm)
        print("Accuracy:", accuracy)
        print("F1 Score:", f1)

        # Calculate specificity and sensitivity per class (one-vs-rest)
        true_positives = np.diag(cm)
        false_positives = cm.sum(axis=0) - true_positives
        false_negatives = cm.sum(axis=1) - true_positives
        true_negatives = cm.sum() - (true_positives + false_positives + false_negatives)

        # A class with a zero denominator is reported as NaN instead of
        # triggering a divide-by-zero warning
        positives = true_positives + false_negatives
        negatives = true_negatives + false_positives
        sensitivity = np.divide(true_positives, positives,
                                out=np.full(length, np.nan), where=positives != 0)
        specificity = np.divide(true_negatives, negatives,
                                out=np.full(length, np.nan), where=negatives != 0)

        print("Specificity for each class:", specificity)
        print("Sensitivity for each class:", sensitivity)
        print("Classification Report:")
        print(classification_report(y_true, y_pred, labels=class_ids, target_names=classes))

In [None]:
# Report settings: print_code = 0 disables the per-file error listing,
# and the class-index mapping file is written to the working directory
print_code = 0
save_dir = "./"
subject = 'classes'

print_info(
    test_gen=test_gen,
    preds=pred,
    print_code=print_code,
    save_dir=save_dir,
    subject=subject,
)