# Required Functions

In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals

import sys

import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import numpy as np
import seaborn as sns
import matplotlib as mpl


class Plotter:
    def __init__(self, class_names):
        self.class_names = class_names

    def plot_metrics(self, history, test_run, index):
        metrics2 = ['loss', 'auc', 'precision', 'recall']
        for n, metric in enumerate(metrics2):
            name = metric.replace("_", " ").capitalize()
            plt.subplot(2, 2, n + 1)
            plt.plot(history.epoch, history.history[metric], color='green', label='Train')
            plt.plot(history.epoch, history.history['val_' + metric], color='green', linestyle="--", label='Val')
            plt.xlabel('Epoch')
            plt.ylabel(name)
            if metric == 'loss':
                plt.ylim([0, plt.ylim()[1]])
            elif metric == 'auc':
                plt.ylim([0, 1])
            else:
                plt.ylim([0, 1])

            plt.legend()

        #fig_manager = plt.get_current_fig_manager()
        #fig_manager.full_screen_toggle()
        plt.subplots_adjust(top=0.97, bottom=0.09, left=0.10, right=0.96, hspace=0.25, wspace=0.26)
        plt.savefig(test_run)
        plt.show()
        plt.close()

    def plot_input_images(self, x_train, y_train):
        plt.figure(figsize=(9, 9))
        for i in range(100):
            plt.subplot(10, 10, i + 1)
            plt.xticks([])
            plt.yticks([])
            plt.grid(False)
            plt.imshow(x_train[i])
            classes = ""
            for j in range(8):
                if y_train[i][j] >= 0.5:
                    classes = classes + self.class_names[j] + "\n"
            plt.xlabel(classes, fontsize=7, color='black', labelpad=1)

        plt.subplots_adjust(bottom=0.04, right=0.95, top=0.94, left=0.06, wspace=0.56, hspace=0.17)
        plt.show()

    def plot_image(self, i, predictions_array, true_label, img):
        predictions_array, true_label, img = predictions_array[i], true_label[i], img[i]
        plt.grid(False)
        plt.xticks([])
        plt.yticks([])

        plt.imshow(img)
        label_check = [0,0,0,0,0,0,0,0]
        ground = ""
        count_true = 0
        predicted_true = 0

        for index in range(8):
            if true_label[index] >= 0.5:
                count_true = count_true + 1
                ground = ground + self.class_names[index] + "\n"
                label_check[index] = 1
            if predictions_array[index] >= 0.5:
                predicted_true = predicted_true + 1
                label_check[index] = label_check[index] + 1

        all_match = True
        for index in range(8):
            if label_check[index]==1:
                all_match = False

        if count_true == predicted_true and all_match:
            color = 'green'
        else:
            color = 'red'

        first, second, third, i, j, k = self.calculate_3_largest(predictions_array, 8)
        prediction = "{} {:2.0f}% \n".format(self.class_names[i], 100 * first)
        if second >= 0.5:
            prediction = prediction + "{} {:2.0f}% \n".format(self.class_names[j], 100 * second)
        if third >= 0.5:
            prediction = prediction + "{} {:2.0f}% \n".format(self.class_names[k], 100 * third)
        plt.xlabel("Predicted: {} Ground Truth: {}".format(prediction, ground), color=color)

    def plot_accuracy(self, history, new_folder):
        # Hide meanwhile for now
        plt.plot(history.history['accuracy'], label='accuracy')
        plt.plot(history.history['val_accuracy'], label='val_accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend(loc='lower right')
        plt.savefig(new_folder)
        plt.show()

    def calculate_3_largest(self, arr, arr_size):
        if arr_size < 3:
            print(" Invalid Input ")
            return

        third = first = second = -sys.maxsize
        index_1 = 0
        index_2 = 0
        index_3 = 0

        for i in range(0, arr_size):
            if arr[i] > first:
                third = second
                second = first
                first = arr[i]
            elif arr[i] > second:
                third = second
                second = arr[i]
            elif arr[i] > third:
                third = arr[i]

        for i in range(0, arr_size):
            if arr[i] == first:
                index_1 = i
        for i in range(0, arr_size):
            if arr[i] == second and i != index_1:
                index_2 = i
        for i in range(0, arr_size):
            if arr[i] == third and i != index_1 and i!= index_2:
                index_3 = i
        return first, second, third, index_1, index_2, index_3

    def plot_value_array(self, i, predictions_array, true_label):
        predictions_array, true_label = predictions_array[i], true_label[i]
        plt.grid(False)
        plt.xticks([])
        plt.yticks([])
        bar_plot = plt.bar(range(8), predictions_array, color="#777777")
        plt.xticks(range(8), ('N', 'D', 'G', 'C', 'A', 'H', 'M', 'O'))
        plt.ylim([0, 1])

        for j in range(8):
            if true_label[j] >= 0.5:
                bar_plot[j].set_color('green')

        for j in range(8):
            if predictions_array[j] >= 0.5 and true_label[j] < 0.5:
                bar_plot[j].set_color('red')

        def bar_label(rects):
            for rect in rects:
                height = rect.get_height()
                value = height * 100
                if value > 1:
                    plt.annotate('{:2.0f}%'.format(value),
                                 xy=(rect.get_x() + rect.get_width() / 2, height),
                                 xytext=(0, 3),  # 3 points vertical offset
                                 textcoords="offset points",
                                 ha='center', va='bottom')

        bar_label(bar_plot)

    def ensure_test_prediction_exists(self, predictions):
        exists = False
        for j in range(8):
            if predictions[j] >= 0.5:
                exists = True
        return exists

    def plot_output(self, test_predictions_baseline, y_test, x_test_drawing, test_run):
        mpl.rcParams["font.size"] = 7
        num_rows = 5
        num_cols = 5
        num_images = num_rows * num_cols
        plt.figure(figsize=(2 * 2 * num_cols, 2 * num_rows))
        j = 0
        i = 0
        while j < num_images:
            if self.ensure_test_prediction_exists(test_predictions_baseline[i]):
                plt.subplot(num_rows, 2 * num_cols, 2 * j + 1)
                self.plot_image(i, test_predictions_baseline, y_test, x_test_drawing)
                plt.subplot(num_rows, 2 * num_cols, 2 * j + 2)
                self.plot_value_array(i, test_predictions_baseline, y_test)
                j = j + 1
            i = i + 1
            if i > 400:
                break

        plt.subplots_adjust(bottom=0.08, right=0.95, top=0.94, left=0.05, wspace=0.11, hspace=0.56)
        plt.savefig(test_run)
        plt.show()

    def plot_output_single(self, i, test_predictions_baseline, y_test, x_test_drawing):
        plt.figure(figsize=(6, 3))
        plt.subplot(1, 2, 1)
        self.plot_image(i, test_predictions_baseline, y_test, x_test_drawing)
        plt.subplot(1, 2, 2)
        self.plot_value_array(i, test_predictions_baseline, y_test)
        plt.show()

    def plot_confusion_matrix(self, y_true, y_pred, classes, normalize=False, title=None, cmap=plt.cm.Blues):
        """
        This function prints and plots the confusion matrix.
        Normalization can be applied by setting `normalize=True`.
        """
        if not title:
            if normalize:
                title = 'Normalized confusion matrix'
            else:
                title = 'Confusion matrix, without normalization'

        # Compute confusion matrix
        cm = confusion_matrix(y_true.argmax(axis=1), y_pred.argmax(axis=1))
        # Only use the labels that appear in the data
        if normalize:
            cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
            print("Normalized confusion matrix")
        else:
            print('Confusion matrix, without normalization')

        print(cm)

        fig, ax = plt.subplots()
        im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
        ax.figure.colorbar(im, ax=ax)
        # We want to show all ticks...
        ax.set(xticks=np.arange(cm.shape[1]),
               yticks=np.arange(cm.shape[0]),
               # ... and label them with the respective list entries
               # xticklabels=classes, yticklabels=classes,
               title=title,
               ylabel='True label',
               xlabel='Predicted label')
        ax.set_ylim(8.0, -1.0)
        # Rotate the tick labels and set their alignment.
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
                 rotation_mode="anchor")

        # Loop over data dimensions and create text annotations.
        fmt = '.2f' if normalize else 'd'
        thresh = cm.max() / 2.
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], fmt),
                        ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black")
        fig.tight_layout()
        return ax

    def print_normalized_confusion_matrix(self, y_test, test_predictions_baseline):
        np.set_printoptions(precision=2)

        # Plot non-normalized confusion matrix
        self.plot_confusion_matrix(y_test, test_predictions_baseline, classes=self.class_names,
                                   title='Confusion matrix, without normalization')

        # Plot normalized confusion matrix
        self.plot_confusion_matrix(y_test, test_predictions_baseline, classes=self.class_names, normalize=True,
                                   title='Normalized confusion matrix')

        plt.show()

    def plot_confusion_matrix_generic(self, labels2, predictions, test_run, p=0.5):
        cm = confusion_matrix(labels2.argmax(axis=1), predictions.argmax(axis=1))
        plt.figure(figsize=(6, 6))
        ax = sns.heatmap(cm, annot=True, fmt="d")
        ax.set_ylim(8.0, -1.0)
        plt.title('Confusion matrix')
        plt.ylabel('Actual label')
        plt.xlabel('Predicted label')
        plt.savefig(test_run)
        plt.subplots_adjust(top=0.94, bottom=0.11, left=0.12, right=1.00, hspace=0.20, wspace=0.18)
        plt.show()
        plt.close()

In [2]:
import csv
import os

import numpy as np
from sklearn import metrics


class FinalScore:
    def __init__(self, new_folder):
        self.new_folder = new_folder


    def odir_metrics(self, gt_data, pr_data):
        th = 0.5
        gt = gt_data.flatten()
        pr = pr_data.flatten()
        kappa = metrics.cohen_kappa_score(gt, pr > th)
        f1 = metrics.f1_score(gt, pr > th, average='micro')
        auc = metrics.roc_auc_score(gt, pr)
        final_score = (kappa + f1 + auc) / 3.0
        return kappa, f1, auc, final_score

    def import_data(self, filepath):
        with open(filepath, 'r') as f:
            reader = csv.reader(f)
            header = next(reader)
            pr_data = [[int(row[0])] + list(map(float, row[1:])) for row in reader]
        pr_data = np.array(pr_data)
        return pr_data

    def output(self):
        gt_data = self.import_data(os.path.join(self.new_folder, 'Eyeonic_ground_truth.csv')) #
        pr_data = self.import_data(os.path.join(self.new_folder, 'Eyeonic_predictions.csv'))
        kappa, f1, auc, final_score = self.odir_metrics(gt_data[:, 1:], pr_data[:, 1:])
        print("Kappa score:", kappa)
        print("F-1 score:", f1)
        print("AUC value:", auc)
        print("Final Score:", final_score)

In [3]:
import csv
import os


class Prediction:
    def __init__(self, prediction, num_images_test, folder = ""):
        self.prediction = prediction
        self.num_images_test = num_images_test
        self.folder = folder

    def save(self):
        """Generate a CSV that contains the output of all the classes.
        Args:
          No arguments are required.
        Returns:
          File with the output
        """
        # The process here is to generate a CSV file with the content of the data annotations file
        # and also the total of labels per eye. This will help us later to process the images
        if self.folder != "":
            folder_to_save = os.path.join(self.folder, 'predictions.csv')
        else:
            folder_to_save = 'predictions.csv'
        with open(folder_to_save, 'w', newline='') as csv_file:
            file_writer = csv.writer(csv_file, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(['ID', 'Normal', 'Diabetes', 'Glaucoma', 'Cataract', 'AMD', 'Hypertension', 'Myopia', 'Others'])
            count = 0
            for sub in self.prediction:
                normal = sub[0]
                diabetes = sub[1]
                glaucoma = sub[2]
                cataract = sub[3]
                amd = sub[4]
                hypertension = sub[5]
                myopia = sub[6]
                others = sub[7]
                file_writer.writerow([count, normal, diabetes, glaucoma, cataract, amd, hypertension, myopia, others])
                count = count + 1

    def save_all(self, y_test):
        """Generate a CSV that contains the output of all the classes.
        Args:
          No arguments are required.
        Returns:
          File with the output
        """
        # The process here is to generate a CSV file with the content of the data annotations file
        # and also the total of labels per eye. This will help us later to process the images
        if self.folder != "":
            folder_to_save = os.path.join(self.folder, 'Eyeonic_predictions.csv')
        else:
            folder_to_save = 'Eyeonic_predictions.csv'
        with open(folder_to_save, 'w', newline='') as csv_file:
            file_writer = csv.writer(csv_file, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(['ID', 'N', 'D', 'G', 'C', 'A', 'H', 'M', 'O'])
            count = 0
            for i in range(self.num_images_test):
                normal = self.prediction[i][0]
                diabetes = self.prediction[i][1]
                glaucoma = self.prediction[i][2]
                cataract = self.prediction[i][3]
                amd = self.prediction[i][4]
                hypertension = self.prediction[i][5]
                myopia = self.prediction[i][6]
                others = self.prediction[i][7]
                file_writer.writerow([count, normal, diabetes, glaucoma, cataract, amd, hypertension, myopia, others])
                count = count + 1

        if self.folder != "":
            folder_to_save = os.path.join(self.folder, 'Eyeonic_ground_truth.csv')
        else:
            folder_to_save = 'Eyeonic_ground_truth.csv'
        with open(folder_to_save, 'w', newline='') as csv_file:
            file_writer = csv.writer(csv_file, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(['ID', 'N', 'D', 'G', 'C', 'A', 'H', 'M', 'O'])
            count = 0
            for i in range(self.num_images_test):
                normal2 = y_test[i][0]
                diabetes2 = y_test[i][1]
                glaucoma2 = y_test[i][2]
                cataract2 = y_test[i][3]
                amd2 = y_test[i][4]
                hypertension2 = y_test[i][5]
                myopia2 = y_test[i][6]
                others2 = y_test[i][7]

                file_writer.writerow([count, normal2, diabetes2, glaucoma2, cataract2, amd2, hypertension2, myopia2, others2])
                count = count + 1

In [4]:
#loading data 
import numpy as np


def load_data(image_size, index = 0, challenge = 0):

    if index == 0:
        x_train = np.load('odir_training' + '_' + str(image_size) + '.npy')
        y_train = np.load('odir_training_labels' + '_' + str(image_size) + '.npy')
    else:
        x_train = np.load('odir_training' + '_' + str(image_size) + '_'  + '.npy')
        y_train = np.load('odir_training_labels' + '_' + str(image_size) + '_' + str(index) + '.npy')

    if challenge == 0:
        x_test = np.load('odir_testing'+'_' + str(image_size)+'.npy')
        y_test = np.load('odir_testing_labels'+'_' + str(image_size)+'.npy')
    else:
        x_test = np.load('odir_testing_challenge'+'_' + str(image_size)+'.npy')
        y_test = np.load('odir_testing_labels_challenge'+'_' + str(image_size)+'.npy')

    return (x_train, y_train), (x_test, y_test)

# InceptionResNetV2

In [None]:
import os
import tensorflow as tf
from tensorflow.keras.applications import inception_resnet_v2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.optimizers import legacy
import secrets
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import SGD
from sklearn.utils import class_weight
import numpy as np

# batch_size = 32
batch_size = 16
num_classes = 8
# epochs = 100
epochs = 3
patience = 3
freeze_layers = 2

# class_weight = {0: 1.,
#                 1: 1.583802025,
#                 2: 8.996805112,
#                 3: 10.24,
#                 4: 10.05714286,
#                 5: 1.,
#                 6: 1.,
#                 7: 2.505338078}

token = secrets.token_hex(16)
folder = r'/content/drive/MyDrive/AIMedic/OcularDisease/InceptionResNetV2_basic_outputs'

new_folder = os.path.join(folder, token)

if not os.path.exists(new_folder):
    os.makedirs(new_folder)

base_model = inception_resnet_v2.InceptionResNetV2

base_model = base_model(weights='imagenet', include_top=False)

# Comment this out if you want to train all layers
# for layer in base_model.layers:
#     layer.trainable = True

x = base_model.output
#x = Flatten()(x)
#x = Dropout(0.5)(x)
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
predictions = Dense(num_classes, activation='sigmoid')(x)
model = Model(inputs=base_model.input, outputs=predictions)

model.summary()

tf.keras.utils.plot_model(model, to_file=os.path.join(new_folder, 'model_inception_resnet_v2.png'), show_shapes=True, show_layer_names=True)

defined_metrics = [
    tf.keras.metrics.BinaryAccuracy(name='accuracy'),
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
    tf.keras.metrics.AUC(name='auc'),
]

sgd = legacy.SGD(lr=0.0001, decay=1e-6, momentum=0.9, nesterov=True)
print('Configuration Start -------------------------')
print(sgd.get_config())
print('Configuration End -------------------------')
model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=defined_metrics)

#________________exists above____________________
(x_train, y_train), (x_test, y_test) = load_data(224, 1)

x_test_drawing = x_test

class_names = ['Normal', 'Diabetes', 'Glaucoma', 'Cataract', 'AMD', 'Hypertension', 'Myopia', 'Others']

# plot data input
plotter = Plotter(class_names)

callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=patience, mode='min', verbose=1)

In [None]:
history = model.fit(x_train, y_train,
                    epochs=epochs,
                    batch_size=batch_size,
                    shuffle=True, #class_weight= class_weight,
                    validation_data=(x_test, y_test), callbacks=[callback])

In [None]:
print("saving weights")
model.save(os.path.join(new_folder, 'model_weights.h5'))

print("plotting metrics")
plotter.plot_metrics(history, os.path.join(new_folder, 'plot1.png'), 2)

print("plotting accuracy")
plotter.plot_accuracy(history, os.path.join(new_folder, 'plot2.png'))

print("display the content of the model")
baseline_results = model.evaluate(x_test, y_test, verbose=2)
for name, value in zip(model.metrics_names, baseline_results):
    print(name, ': ', value)
print()

# test a prediction
test_predictions_baseline = model.predict(x_test)
print("plotting confusion matrix")
plotter.plot_confusion_matrix_generic(y_test, test_predictions_baseline, os.path.join(new_folder, 'plot3.png'), 0)

# save the predictions
prediction_writer = Prediction(test_predictions_baseline, 400, new_folder)
prediction_writer.save()
prediction_writer.save_all(y_test)

# show the final score
score = FinalScore(new_folder)
score.output()

In [None]:
# plot output results
plotter.plot_output(test_predictions_baseline, y_test, x_test_drawing, os.path.join(new_folder, 'plot4.png'))

In [None]:
# Code for output

In [6]:
#

In [None]:
#our final model complete 