# <strong>Retina Disease Classification</strong>
In Ophthalmology, there is a plethora of diseases and disorders. When a patient comes to a doctor for a consultation, there are too many candidate diseases to map the symptoms to. In other words, the search space is huge, and at times this can lead to a patient being treated for a disease which he/she does not actually have. Hence, the aim of this notebook is to leverage state-of-the-art CNNs for multilabel classification of retinal fundus images. Moreover, since a second opinion on a diagnosis is generally preferred, the predictions of several models are combined through Deep Ensemble Learning. The problem statement for the work is defined as follows:
>"Multilabel Classification of Retinal Diseases using Deep Ensemble Learning to build a system that helps reduce the search space for disease diagnosis in Ophthalmology, reducing the chances of misdiagnosis."
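
The "second opinion" provided by the ensemble can be sketched as soft voting: the sigmoid probabilities of several independently trained CNNs are averaged before thresholding. This is a minimal illustration, not the notebook's implementation; it assumes every model scores the same images in the same order, and the function name `soft_vote`, its optional `weights` argument and the example probabilities are hypothetical.

```python
import numpy as np


def soft_vote(prob_list, weights=None):
    """Weighted average of per-model probabilities (soft voting).

    prob_list: one array per model, all with the same shape, e.g. sigmoid
    outputs of shape (n_images,) or (n_images, n_diseases).
    weights: optional per-model weights; equal weights are used if omitted.
    """
    # Stack to shape (n_models, ...) so models can be averaged along axis 0
    probs = np.stack([np.asarray(p, dtype=float) for p in prob_list], axis=0)
    if weights is None:
        weights = np.ones(len(prob_list))
    weights = np.asarray(weights, dtype=float)
    weights = weights / weights.sum()  # normalise so the result stays in [0, 1]
    # Sum over the model axis: sum_m weights[m] * probs[m]
    return np.tensordot(weights, probs, axes=1)


# Hypothetical outputs of two ensemble members for three images
p_b4 = [0.9, 0.2, 0.6]
p_b5 = [0.7, 0.4, 0.2]
ensemble = soft_vote([p_b4, p_b5])
predicted = (ensemble >= 0.5).astype(int)  # 0.5 is only an example threshold
```

With equal weights this reduces to a plain mean; unequal weights could, for instance, favour the member with the better validation score.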

The proposed architecture comprises two classifiers:
- Binary Classifier to predict whether the eye is infected or not.
- Multilabel Classifier to list the most probable diseases present in the retinal image.

![image.png](attachment:c7e7eba6-0516-419d-84d4-d72a040da529.png)
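
The interaction of the two classifiers can be sketched as a gate: the multilabel stage only runs when the binary classifier flags the image as at risk. This is a hedged sketch that works on already-computed probabilities; `two_stage_diagnosis`, `risk_threshold=0.5` and `top_k=3` are illustrative names and defaults, not values taken from the notebook, and the label names are just examples.

```python
import numpy as np


def two_stage_diagnosis(risk_prob, disease_probs, disease_names,
                        risk_threshold=0.5, top_k=3):
    """Gate the multilabel classifier with the binary Disease_Risk classifier.

    risk_prob: sigmoid output of the binary classifier for one image.
    disease_probs: per-disease sigmoid outputs of the multilabel classifier.
    disease_names: label names in the same order as disease_probs.
    Returns up to top_k (disease, probability) pairs, most probable first,
    or an empty list when the image is predicted not to be at risk.
    """
    if risk_prob < risk_threshold:
        return []  # "Not Infected": the multilabel stage is skipped
    disease_probs = np.asarray(disease_probs, dtype=float)
    # Indices sorted by decreasing probability, truncated to top_k
    order = np.argsort(disease_probs)[::-1][:top_k]
    return [(disease_names[i], float(disease_probs[i])) for i in order]


names = ["DR", "ARMD", "MH", "ODC"]  # example label names
healthy = two_stage_diagnosis(0.2, [0.9, 0.1, 0.5, 0.3], names)
at_risk = two_stage_diagnosis(0.8, [0.9, 0.1, 0.5, 0.3], names, top_k=2)
```

Returning a short ranked list, rather than a single label, matches the stated goal of narrowing the search space instead of replacing the clinician's decision.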

# Importing required modules

In [None]:
# Disabling logging. TF_CPP_MIN_LOG_LEVEL only takes effect if it is set
# before TensorFlow is imported, so this block comes first.
import logging
import os  # also used for accessing the file system
logging.disable(logging.WARNING)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

# Modules required for data handling and visualisation
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
sns.set_style("whitegrid")

# Modules used for model training and transfer learning
import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
import tensorflow.keras.applications as cnns
from tensorflow.keras import Model

# Modules used for model evaluation and selection.
from sklearn.metrics import accuracy_score, classification_report, f1_score

# Constants
TRAIN_CSV_DIR = "/kaggle/input/retinal-disease-classification/Training_Set/Training_Set/RFMiD_Training_Labels.csv"
TEST_CSV_DIR = "/kaggle/input/retinal-disease-classification/Test_Set/Test_Set/RFMiD_Testing_Labels.csv"
VAL_CSV_DIR = "/kaggle/input/retinal-disease-classification/Evaluation_Set/Evaluation_Set/RFMiD_Validation_Labels.csv"

In [None]:
tf.config.list_physical_devices()

In [None]:
tf.debugging.set_log_device_placement(False)

# Exploratory Data Analysis

Since there are many classes, the dataset is likely to suffer from class imbalance; to verify this, we analyse the class distribution.

In [None]:
train_labels = pd.read_csv(TRAIN_CSV_DIR)
test_labels = pd.read_csv(TEST_CSV_DIR)
val_labels = pd.read_csv(VAL_CSV_DIR)

In [None]:
train_labels.head()

In [None]:
disease_risk_col = dict(train_labels["Disease_Risk"].value_counts())
col_labels = list(disease_risk_col.keys())
label_freq = list(disease_risk_col.values())
sns.barplot(x = col_labels,
            y = label_freq,
            edgecolor = "black")
plt.title("Disease Risk of Retina Images")
plt.xlabel("Disease_Risk (0 = no risk, 1 = at risk)")
plt.ylabel("Number of images")
plt.show()

In [None]:
infected_retinas = dict(train_labels.drop(["ID", "Disease_Risk"], axis=1).sum())
retina_infections = list(infected_retinas.keys())
infection_freq = list(infected_retinas.values())
plt.figure(figsize = (10,10))
sns.barplot(y = retina_infections,
            x = infection_freq,
            edgecolor = "black",
            orient = "h")
plt.title("Distribution of Retina Images across various Eye Infections")
plt.xlabel("Number of Images pertaining to the infection")
plt.ylabel("Type of Infection")
plt.show()

From the class distribution we can make the following inferences:
- The diseases OPDM and HR have no training samples and hence will be dropped.
- The aim of the model is to predict individual diseases, so the Disease_Risk column is not relevant for multilabel classification. However, it can be used to build a binary classifier that predicts whether the retinal image poses a risk of any disease. The multilabel classifier is then applied only to images that the binary classifier flags as at risk.
- There is severe class imbalance, so classes with fewer samples than a certain threshold will be merged into a single "Other" category.
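
The first and third inferences can be turned into a small preprocessing step. The sketch below assumes the label CSVs are one-hot encoded (one 0/1 column per disease, plus `ID`, `Disease_Risk` and, later, `IMG_DIR`); the helper names `split_classes` and `merge_rare`, the `min_samples` cut-off and the toy DataFrame are hypothetical. The columns to drop or merge are computed on the training split only, so that all three splits end up with the same label columns.

```python
import pandas as pd

# Columns in the label CSVs that are not individual diseases
NON_DISEASE_COLS = ("ID", "Disease_Risk", "IMG_DIR")


def split_classes(train_df, min_samples):
    """Return (empty, rare) disease columns, computed on the training split."""
    disease_cols = [c for c in train_df.columns if c not in NON_DISEASE_COLS]
    counts = train_df[disease_cols].sum()
    empty = counts[counts == 0].index.tolist()
    rare = counts[(counts > 0) & (counts < min_samples)].index.tolist()
    return empty, rare


def merge_rare(labels, empty, rare):
    """Drop the empty columns and collapse the rare ones into one "Other" column."""
    out = labels.drop(columns=empty + rare)
    if rare:
        # Positive for "Other" if the image has at least one of the rare diseases
        out["Other"] = labels[rare].max(axis=1).astype(int)
    else:
        out["Other"] = 0
    return out


# Toy one-hot labels, for illustration only
toy = pd.DataFrame({"ID": [1, 2, 3, 4], "Disease_Risk": [1, 1, 1, 0],
                    "DR": [1, 1, 0, 0], "MH": [0, 1, 1, 0],
                    "ODC": [0, 0, 1, 0], "HR": [0, 0, 0, 0]})
empty, rare = split_classes(toy, min_samples=2)
merged = merge_rare(toy, empty, rare)
```

In the notebook this would mean calling `split_classes` once on the training labels and then `merge_rare` on each of the three splits with the returned lists.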

# Training a binary classifier for Disease Risk Feature

In [None]:
TRAIN_IMG_DIR = "/kaggle/input/retinal-disease-classification/Training_Set/Training_Set/Training"
TEST_IMG_DIR = "/kaggle/input/retinal-disease-classification/Test_Set/Test_Set/Test"
VAL_IMG_DIR = "/kaggle/input/retinal-disease-classification/Evaluation_Set/Evaluation_Set/Validation"
def id_to_path(img_id: int, img_dir: str) -> str:
    # Images are stored as <ID>.png inside the split's image directory
    return os.path.join(img_dir, str(img_id) + ".png")

In [None]:
# Build the full image path of every row from its ID
train_labels["IMG_DIR"] = train_labels["ID"].apply(lambda i: id_to_path(i, TRAIN_IMG_DIR))
test_labels["IMG_DIR"] = test_labels["ID"].apply(lambda i: id_to_path(i, TEST_IMG_DIR))
val_labels["IMG_DIR"] = val_labels["ID"].apply(lambda i: id_to_path(i, VAL_IMG_DIR))

In [None]:
class ImagePreprocessor:
    def __init__(self, cnn_variant):
        self.cnn_variant = cnn_variant
        self.train_gen = None
        self.test_gen = None
        self.val_gen = None
        
    def create_generators(self):
        self.train_gen = tf.keras.preprocessing.image.ImageDataGenerator(
            preprocessing_function=self.cnn_variant.preprocess_input,
        )

        self.test_gen = tf.keras.preprocessing.image.ImageDataGenerator(
            preprocessing_function=self.cnn_variant.preprocess_input
        )

        self.val_gen =  tf.keras.preprocessing.image.ImageDataGenerator(
            preprocessing_function=self.cnn_variant.preprocess_input
        )
        
    def preprocess(self):
        # Settings shared by all three splits
        flow_kwargs = dict(x_col="IMG_DIR",
                           y_col="Disease_Risk",
                           target_size=(200,200),
                           color_mode='rgb',
                           batch_size=32,
                           class_mode="binary")

        # Only the training data is shuffled; validation and test keep their
        # order so that predictions line up with the iterator's `.labels`.
        train_images = self.train_gen.flow_from_dataframe(
              train_labels, shuffle=True, **flow_kwargs)

        # Evaluation_Set drives early stopping, Test_Set is held out for the final report
        val_images = self.val_gen.flow_from_dataframe(
              val_labels, shuffle=False, **flow_kwargs)

        test_images = self.test_gen.flow_from_dataframe(
              test_labels, shuffle=False, **flow_kwargs)

        return train_images, val_images, test_images

In [None]:
train_labels["Disease_Risk"] = train_labels["Disease_Risk"].astype(str)
test_labels["Disease_Risk"] = test_labels["Disease_Risk"].astype(str)
val_labels["Disease_Risk"] = val_labels["Disease_Risk"].astype(str)

In [None]:
class TransferLearning:

    def __init__(self, train, val, model) -> None:
        self.train = train
        self.val = val
        self.model = model
        self.history = None
        self.fine_tune_from = 100
        self.INPUT_SIZE = (200,200,3)
    
    def mark_layers_non_trainable(self):
        for layer in self.model.layers[:self.fine_tune_from]:
            layer.trainable = False
        
        for layer in self.model.layers[self.fine_tune_from:]:
            layer.trainable = True
    
    def add_final_layer(self):
        self.x = Flatten()(self.model.output)
        self.x = Dense(1000, activation='relu')(self.x)
        self.predictions = Dense(1, activation = 'sigmoid')(self.x)

    def compile_model(self):
        self.model = Model(inputs = self.model.input, 
                           outputs = self.predictions)
        self.model.compile(optimizer='adam', 
                           loss="binary_crossentropy", 
                           metrics=['accuracy',
                                    # The model outputs sigmoid probabilities, so from_logits stays False (the default)
                                    tf.keras.metrics.AUC(name="auc"),
                                    tf.keras.metrics.FalseNegatives(name="false_negatives"),
                                    tf.keras.metrics.FalsePositives(name="false_positives"),
                                    tf.keras.metrics.Precision(name="precision"),
                                    tf.keras.metrics.Recall(name="recall")])
    
    def train_model(self):
        # batch_size is not passed: the generators already yield batches of 32
        self.history = self.model.fit(self.train,
                                      epochs=100, 
                                      validation_data=self.val,
                                      callbacks=[
                                        tf.keras.callbacks.EarlyStopping(
                                            monitor='val_loss',
                                            patience=3,
                                            restore_best_weights=True
                                        )
                                     ])
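
Because `epochs=100` is only an upper bound, the actual training length is decided by the `EarlyStopping` callback. Its stopping rule for `monitor='val_loss'` can be restated in plain Python as below; this is an illustrative re-implementation for intuition (the name `early_stopping_epoch` and the loss values are hypothetical), not the Keras source, which also supports options such as `baseline`.

```python
def early_stopping_epoch(val_losses, patience=3, min_delta=0.0):
    """Restate the stopping rule of EarlyStopping(monitor='val_loss').

    val_losses: validation loss recorded at the end of each epoch.
    Returns (stop_epoch, best_epoch), both 0-indexed: stop_epoch is the last
    epoch that is run, best_epoch is the epoch whose weights are kept when
    restore_best_weights=True.
    """
    best_loss = float("inf")
    best_epoch = 0
    wait = 0  # epochs since the last improvement
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss - min_delta:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    # The epoch budget ran out before patience was exhausted
    return len(val_losses) - 1, best_epoch


history = [1.0, 0.9, 0.95, 0.96, 0.97, 0.5]  # hypothetical val_loss values
stop, best = early_stopping_epoch(history)
```

Here the loss stops improving after epoch 1, so with `patience=3` training ends after epoch 4 and the later drop to 0.5 is never reached; the weights from epoch 1 are the ones restored.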

In [None]:
def experiment_model(cnn_module, model):
    # cnn_module is a tf.keras.applications submodule (e.g. cnns.efficientnet) that provides preprocess_input
    preprocessor = ImagePreprocessor(cnn_module)
    preprocessor.create_generators()
    train, val, test = preprocessor.preprocess()
    modelbuilder = TransferLearning(train, val, model)
    modelbuilder.mark_layers_non_trainable()
    modelbuilder.add_final_layer()
    modelbuilder.compile_model()
    modelbuilder.train_model()
    return modelbuilder.model, modelbuilder.history, test

In [None]:
class ModelEvaluator:
    def __init__(self, history, classes : list, model, test):
        self.history = history
        self.model = model
        self.test = test
        self.classes = classes
        self.num_classes = len(self.classes)
        self.predictions = None
        self.t = 0.5
    
    def __plot_metric(self, ax, metric: str,train_metric : str, val_metric : str):
        
        ax.plot(self.history.history[train_metric])
        ax.plot(self.history.history[val_metric])
        ax.title.set_text(train_metric + " v/s " + val_metric)
        ax.set_xlabel('Epochs')
        ax.set_ylabel(metric.title())
        ax.legend(['Train','Val'])
    
    def training_history(self):
        rows = 4
        columns = 2
        c = 0
        metrics = ["loss", "accuracy", "auc", "false_negatives", "false_positives", "precision", "recall"]
        fig, axs = plt.subplots(rows, columns,figsize=(15,15))
        for metric in metrics:
            self.__plot_metric(axs[c//columns, c%columns], 
                               metric.replace("_", " ").title(), 
                               metric, "val_"+metric)
            c += 1
        axs[-1, -1].axis("off")  # 7 metrics on a 4x2 grid: hide the unused last panel
        fig.suptitle("Training History of the Model")
        plt.subplots_adjust(bottom=0.1, top=0.9, hspace=0.5)
        plt.show()
    
    def __clip_sigmoid_output(self, p : float):
        if p >= self.t:
            return 1
        else:
            return 0
    
    def predict(self):
        self.predictions = self.model.predict(self.test).flatten()
        self.predictions = list(map(self.__clip_sigmoid_output, self.predictions))
    
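    def class_report(self):
        # Rows are true labels, columns are predictions; fmt="d" prints integer counts
        # (the default ".2g" format would show e.g. 120 as 1.2e+02)
        cm = tf.math.confusion_matrix(self.test.labels, self.predictions, num_classes=2).numpy()
        sns.heatmap(cm, annot=True, fmt="d", cmap="crest",
                    xticklabels=self.classes, yticklabels=self.classes)
        plt.xlabel("Predicted label")
        plt.ylabel("True label")
        plt.show()
        print(classification_report(self.test.labels, self.predictions, target_names=self.classes))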
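
`predict` and `class_report` threshold every sigmoid output at `self.t = 0.5` and then count agreements with the true labels. The same computation can be written in vectorised NumPy, which also makes explicit how the tracked false negatives, false positives, precision and recall relate to the confusion matrix (rows are true labels and columns are predictions, the layout returned by `tf.math.confusion_matrix`). This is a sketch with a hypothetical helper name `binary_report` and made-up inputs, not a replacement for `classification_report`.

```python
import numpy as np


def binary_report(y_true, probs, threshold=0.5):
    """Threshold sigmoid outputs and derive confusion counts and metrics."""
    y_true = np.asarray(y_true).astype(int)
    # Same rule as __clip_sigmoid_output: p >= threshold -> 1 ("Infected")
    y_pred = (np.asarray(probs, dtype=float) >= threshold).astype(int)
    tp = int(np.sum((y_pred == 1) & (y_true == 1)))
    tn = int(np.sum((y_pred == 0) & (y_true == 0)))
    fp = int(np.sum((y_pred == 1) & (y_true == 0)))
    fn = int(np.sum((y_pred == 0) & (y_true == 1)))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"confusion": [[tn, fp], [fn, tp]],  # rows: true label, columns: prediction
            "precision": precision, "recall": recall, "f1": f1}


report = binary_report([1, 1, 0, 0, 1], [0.9, 0.4, 0.6, 0.1, 0.7])
```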

In [None]:
class ThresholdTuner:
    def __init__(self, model, test):
        self.model = model
        self.test = test
        self.predictions_raw = None
        self.f1_scores = []
    
    def __predict_test(self):
        self.predictions_raw = self.model.predict(self.test).flatten()
    
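    def __threshold_probability(self, t : float, p : float):
        # p is a sigmoid probability (not a logit): 1 if it reaches the threshold t
        if p >= t:
            return 1
        else:
            return 0
    
    def tune_threshold(self):
        self.__predict_test()
        self.f1_scores = []  # reset so repeated calls do not accumulate scores
        self.ts = np.arange(0, 1.0, 0.001)
        for threshold in self.ts:
            pred = list(map(lambda p: self.__threshold_probability(threshold, p), 
                            self.predictions_raw))
            # zero_division=0 avoids a warning when no image is predicted positive
            self.f1_scores.append(f1_score(self.test.labels, pred, zero_division=0))
        
        plt.plot(self.ts, self.f1_scores)
        max_f1_score = max(self.f1_scores)
        max_threshold = self.ts[np.argmax(self.f1_scores)]
        default_idx = int(np.argmin(np.abs(self.ts - 0.5)))  # position of the default threshold 0.5
        plt.plot(max_threshold, max_f1_score, 'ro')
        plt.plot(self.ts[default_idx], self.f1_scores[default_idx], 'go')
        plt.text(max_threshold, max_f1_score+0.1, '({:.3f}, {:.3f})'.format(max_threshold, max_f1_score))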
        plt.title("Tuning of the Threshold")
        plt.xlabel("Threshold Value")
        plt.ylabel("F1 score")
        plt.show()
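
`ThresholdTuner` picks the threshold on the same test generator that is later passed to `classification_report`, so the F1 reported at the tuned threshold is optimistically biased. A hedged alternative is to choose the threshold on validation predictions and only then apply it to the test set. The vectorised function below (hypothetical name `best_threshold`) evaluates F1 = 2TP / (2TP + FP + FN) for every threshold at once instead of calling `f1_score` 1000 times. It assumes a validation iterator created with `shuffle=False`, which `experiment_model` would need to return as well; e.g. `t, _ = best_threshold(val.labels, model.predict(val).flatten())`.

```python
import numpy as np


def best_threshold(y_true, probs, thresholds=None):
    """Return (threshold, f1) maximising F1 over a grid of thresholds.

    Meant for validation-set predictions: the returned threshold is then
    applied, unchanged, to the held-out test predictions.
    """
    y_true = np.asarray(y_true).astype(bool)
    probs = np.asarray(probs, dtype=float)
    if thresholds is None:
        thresholds = np.arange(0, 1.0, 0.001)  # same grid as ThresholdTuner
    thresholds = np.asarray(thresholds, dtype=float)
    # (n_thresholds, n_samples) matrix: row i holds predictions at thresholds[i]
    preds = probs[None, :] >= thresholds[:, None]
    tp = np.sum(preds & y_true, axis=1)
    fp = np.sum(preds & ~y_true, axis=1)
    fn = np.sum(~preds & y_true, axis=1)
    # F1 = 2TP / (2TP + FP + FN), defined as 0 when the denominator is 0
    denom = 2 * tp + fp + fn
    f1 = np.divide(2 * tp, denom, out=np.zeros(len(thresholds)), where=denom > 0)
    i = int(np.argmax(f1))  # first (lowest) threshold reaching the maximum
    return float(thresholds[i]), float(f1[i])
```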

# Experimentation of models for Binary Classification

## EfficientNetB4

In [None]:
efficientnet, history, efficientnet_test = experiment_model(cnns.efficientnet, 
                                                    cnns.efficientnet.EfficientNetB4(include_top=False, 
                                                    input_shape=(200,200,3)))

In [None]:
me = ModelEvaluator(history, ["Not Infected", "Infected"], efficientnet, efficientnet_test)

In [None]:
me.training_history()
me.predict()

In [None]:
me.class_report()

In [None]:
tuner = ThresholdTuner(efficientnet, efficientnet_test)

In [None]:
tuner.tune_threshold()

In [None]:
print(tuner.ts[np.argmax(tuner.f1_scores)], max(tuner.f1_scores))

In [None]:
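# Reuse the F1-maximising threshold found by the tuner instead of hard-coding it
best_t = tuner.ts[np.argmax(tuner.f1_scores)]

def clip_predictions(x):
    return 1 if x >= best_t else 0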

In [None]:
p = list(map(clip_predictions, tuner.predictions_raw))

In [None]:
print(classification_report(efficientnet_test.labels, p, target_names=["Not Infected", "Infected"]))

## EfficientNetB5

In [None]:
efficientnet, history, efficientnet_test = experiment_model(cnns.efficientnet, 
                                                    cnns.efficientnet.EfficientNetB5(include_top=False, 
                                                    input_shape=(200,200,3)))

In [None]:
me = ModelEvaluator(history, ["Not Infected", "Infected"], efficientnet, efficientnet_test)

In [None]:
me.training_history()
me.predict()

In [None]:
me.class_report()

In [None]:
tuner = ThresholdTuner(efficientnet, efficientnet_test)

In [None]:
tuner.tune_threshold()

In [None]:
print(tuner.ts[np.argmax(tuner.f1_scores)], max(tuner.f1_scores))