# Ocular Disease Recognition --> tricks and multi-class predictions
## CNN (Convolutional Neural Networks)

Convolutional Neural Networks, or CNNs for short, are a powerful type of neural network commonly used in computer vision tasks. They are particularly well-suited to tasks like image classification and object detection because they are able to automatically learn and extract relevant features from input images. CNNs consist of multiple layers, each of which performs a different type of processing on the input data. These layers typically include convolutional layers, which extract features from the input images, and pooling layers, which downsample the output of the convolutional layers. By stacking these layers on top of one another, a CNN is able to learn increasingly complex representations of the input data.
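
To make the two layer types concrete, here is a minimal NumPy sketch of one convolution followed by one 2x2 max pooling step. The toy image, the edge-detecting kernel and the helper names `conv2d_valid` and `max_pool_2x2` are illustrative assumptions, not part of this notebook's pipeline; a real CNN learns its kernels during training.

```python
import numpy as np

def conv2d_valid(image, kernel):
    """Slide `kernel` over `image` without padding (cross-correlation, as in most DL libraries)."""
    kh, kw = kernel.shape
    out_h = image.shape[0] - kh + 1
    out_w = image.shape[1] - kw + 1
    out = np.zeros((out_h, out_w))
    for i in range(out_h):
        for j in range(out_w):
            out[i, j] = np.sum(image[i:i + kh, j:j + kw] * kernel)
    return out

def max_pool_2x2(feature_map):
    """Downsample by taking the maximum of each non-overlapping 2x2 block."""
    h, w = feature_map.shape
    h, w = h - h % 2, w - w % 2  # drop a trailing odd row/column, if any
    blocks = feature_map[:h, :w].reshape(h // 2, 2, w // 2, 2)
    return blocks.max(axis=(1, 3))

# Toy 6x6 "image" with a vertical edge between columns 2 and 3
image = np.zeros((6, 6))
image[:, 3:] = 1.0

# Hand-written vertical-edge detector (a trained CNN would learn such filters)
kernel = np.array([[-1.0, 0.0, 1.0],
                   [-1.0, 0.0, 1.0],
                   [-1.0, 0.0, 1.0]])

features = conv2d_valid(image, kernel)  # 6x6 input, 3x3 kernel -> 4x4 feature map
pooled = max_pool_2x2(features)         # 4x4 feature map -> 2x2 after pooling
print(features.shape, pooled.shape)
print(features)
```

The feature map responds only where the window straddles the edge, and pooling halves each spatial dimension while keeping the strongest response.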

In [None]:
# Import necessary packages
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sb

## Dataset



Ocular Disease Intelligent Recognition (ODIR) is a structured ophthalmic database of 5,000 patients with age, color fundus photographs from left and right eyes, and doctors' diagnostic keywords.

This dataset is meant to represent a "real-life" set of patient information collected by Shanggong Medical Technology Co., Ltd. from different hospitals/medical centers in China. In these institutions, fundus images are captured by various cameras on the market, such as Canon, Zeiss and Kowa, resulting in varied image resolutions.

Annotations were labeled by trained human readers with quality control management. They classify each patient into eight labels:

* Normal (N),
* Diabetes (D),
* Glaucoma (G),
* Cataract (C),
* Age related Macular Degeneration (A),
* Hypertension (H),
* Pathological Myopia (M),
* Other diseases/abnormalities (O)



In [None]:
class_name_dict = {
    "N": "Normal",
    "D": "Diabetes",
    "G": "Glaucoma",
    "C": "Cataract",
    "A": "Age related Macular Degeneration",
    "H": "Hypertension",
    "M": "Pathological Myopia",
    "O": "Other diseases/abnormalities",
}

## 1. Exploration

Read the data from `csv` files.

- What are medically useful labels to train a deep learning network on?
- Are there any biases we need to consider?
- Do we have to worry about "data leakage", which often arises when there are several data points for the same patient?

In [None]:
path_data = "../../../Data/ocular_disease_recognition/"

metadata = pd.read_csv(os.path.join(path_data, "metadata.csv"))
metadata.head()

In [None]:
metadata.shape

In [None]:
label_column = ['N', 'D', 'G', 'C', 'A', 'H', 'M', 'O']
metadata[label_column].sum(axis=1).value_counts()

In [None]:
metadata[label_column].sum(axis=0)
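
The two sums above answer different questions: summing along `axis=1` counts how many labels each image carries, while summing along `axis=0` counts how many images carry each label. The sketch below shows this on a small made-up table (the rows are hypothetical, not ODIR data) and adds a label co-occurrence matrix as one possible further check.

```python
import pandas as pd

label_column = ["N", "D", "G", "C", "A", "H", "M", "O"]

# Hypothetical stand-in for `metadata`: one row per image, one 0/1 column per label
toy = pd.DataFrame(
    [
        [1, 0, 0, 0, 0, 0, 0, 0],  # normal
        [0, 1, 0, 0, 0, 0, 0, 0],  # diabetes only
        [0, 1, 0, 0, 0, 1, 0, 0],  # diabetes + hypertension
        [0, 0, 0, 1, 0, 0, 0, 1],  # cataract + other
    ],
    columns=label_column,
)

labels_per_image = toy[label_column].sum(axis=1)  # row sums: labels per image
images_per_label = toy[label_column].sum(axis=0)  # column sums: images per label

print(labels_per_image.value_counts().sort_index())
print(images_per_label)

# Pairwise co-occurrence: entry (a, b) counts images that carry both labels
co_occurrence = toy[label_column].T.dot(toy[label_column])
print(co_occurrence)
```

Rows with a sum above 1 indicate a multi-label problem, which affects the choice of output activation and loss later on.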

## Have a look at some of the images

In [None]:
path_images = "../../../Data/ocular_disease_recognition/preprocessed_images"

# Pick 9 random images
np.random.seed(1)
random_images = np.random.choice(metadata.filename, 9)

# Adjust the size of your images
plt.figure(figsize=(10,10))

# Iterate and plot random images
for i, filename in enumerate(random_images):
    plt.subplot(3, 3, i + 1)
    img = plt.imread(os.path.join(path_images, filename))
    plt.imshow(img)
    plt.axis('off')
    
# Adjust subplot parameters to give specified padding
plt.tight_layout()    

## Train/Test split

In [None]:
metadata.head(2)

In [None]:
metadata.ID.value_counts()

In [None]:
from sklearn.model_selection import train_test_split

train_df, test_df = train_test_split(metadata, test_size=0.15,
                                     random_state=0
                                    )
print(f"Training set size: {train_df.shape}")
print(f"Test set size: {test_df.shape}")

In [None]:
# second split
train_df, val_df = train_test_split(train_df, test_size=0.15,
                                    random_state=0
                                    )
print(f"Training set size: {train_df.shape}")
print(f"Validation set size: {val_df.shape}")
print(f"Test set size: {test_df.shape}")
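
Note that a plain `train_test_split` assigns rows at random, so two images of the same patient can end up in different sets. If that kind of leakage is a concern, one possible alternative is a group-aware split on the patient `ID`. The sketch below uses scikit-learn's `GroupShuffleSplit` on a made-up table; the toy data and the variable names `train_part` and `test_part` are assumptions for illustration only.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit

# Hypothetical stand-in for `metadata`: two images (left/right eye) per patient ID
toy = pd.DataFrame({
    "ID": np.repeat(np.arange(20), 2),
    "filename": [f"{i}_{side}.jpg" for i in range(20) for side in ("left", "right")],
})

# Split by patient: all rows sharing an ID land in the same subset
splitter = GroupShuffleSplit(n_splits=1, test_size=0.15, random_state=0)
train_idx, test_idx = next(splitter.split(toy, groups=toy["ID"]))
train_part, test_part = toy.iloc[train_idx], toy.iloc[test_idx]

shared_ids = set(train_part["ID"]) & set(test_part["ID"])
print(f"Training rows: {len(train_part)}, test rows: {len(test_part)}")
print(f"Patients present in both sets: {len(shared_ids)}")
```

Here `test_size` refers to the fraction of patients, not of images, so the resulting row counts can differ slightly from a row-wise split.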

## Data generators!

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Normalize images
image_generator = ImageDataGenerator(
    rescale=1.0/255
)

# Pick your label column(s)
label_column = # add the label-columns we want to predict

# Define the data generators
train_generator = image_generator.flow_from_dataframe(
    dataframe=train_df,
    directory=path_images,
    x_col="filename",
    y_col=label_column,
    target_size=(320, 320),
    batch_size=32,
    class_mode="raw",
    color_mode=#add color mode
)

val_generator = image_generator.flow_from_dataframe(
    dataframe=val_df,
    directory=path_images,
    x_col="filename",
    y_col=label_column,
    target_size=(320, 320),
    batch_size=32,
    class_mode="raw",
    color_mode=#add color mode,
    shuffle=False,  # this is crucial for later evaluation!
)

## Let's build a CNN and train it
- Last time we used a simple CNN that we designed from scratch.

In [None]:
# Use tensorflow.keras throughout, matching the data generator imported above
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.models import Model


# Define the input shape
inputs = Input(shape=(320, 320, 3))

# Define the CNN architecture
x = Conv2D(32, (3, 3), activation='relu')(inputs)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(128, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(128, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(128, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)
x = Dense(64, activation='relu')(x)
outputs = Dense(#add code here, activation=#add code here)(x)

# Create the model
model = Model(inputs=inputs, outputs=outputs)

model.summary()
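
The output layer above is left open on purpose. As a hint rather than the definitive answer: when an image can carry several labels at once and the loss is binary cross-entropy, a common choice is one output unit per label with a sigmoid activation, because each unit is then scored independently. The NumPy sketch below contrasts this with softmax on made-up logits; the numbers are purely illustrative.

```python
import numpy as np

def sigmoid(z):
    """Element-wise sigmoid: every output is an independent probability."""
    return 1.0 / (1.0 + np.exp(-z))

def softmax(z):
    """Softmax: outputs compete with each other and sum to 1."""
    e = np.exp(z - z.max())  # subtract the max for numerical stability
    return e / e.sum()

# Hypothetical raw scores for three labels; the first two are both strongly present
logits = np.array([3.0, 2.5, -4.0])

sig = sigmoid(logits)
soft = softmax(logits)

print("sigmoid:", np.round(sig, 3), "sum =", round(float(sig.sum()), 3))
print("softmax:", np.round(soft, 3), "sum =", round(float(soft.sum()), 3))
```

With sigmoid, both of the first two labels can exceed a 0.5 threshold at the same time, whereas softmax forces them to share a total probability of 1.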

In [None]:
import tensorflow as tf

metrics = [
    'accuracy',
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall')
]

# Compile the model
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=["accuracy"])  # or metrics=metrics to also track precision and recall

In [None]:
# Train the model
history = model.fit(
    train_generator,
    epochs=10,
    validation_data=val_generator,
    verbose=1
)

In [None]:
fig, (ax1, ax2) = plt.subplots(2, figsize=(6, 10))

# summarize history for accuracy
ax1.plot(history.history['accuracy'], "o--")
ax1.plot(history.history['val_accuracy'], "o--")
ax1.set_title('model accuracy')
ax1.set_ylabel('accuracy')
ax1.set_xlabel('epoch')
ax1.legend(['train', 'validation'], loc='upper left')

# summarize history for loss
ax2.plot(history.history['loss'], "o--")
ax2.plot(history.history['val_loss'], "o--")
ax2.set_title('model loss')
ax2.set_ylabel('loss')
ax2.set_xlabel('epoch')
ax2.legend(['train', 'validation'], loc='upper left')
plt.show()

## Model evaluation

In [None]:
y_pred = model.predict(val_generator)
y_true = val_generator.labels

In [None]:
# Code for multi-class model
from sklearn.metrics import roc_curve, auc


# Compute the ROC curve and AUC for each class
fpr = dict()
tpr = dict()
roc_auc = dict()

classes = label_column

for i in range(len(classes)):
    fpr[i], tpr[i], _ = roc_curve(y_true[:, i], y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Plot the ROC curve
fig, ax = plt.subplots(figsize=(8, 8))

# Get class names
class_names = list(classes)

for i, label in enumerate(classes):
    ax.plot(fpr[i], tpr[i],
            label=f"ROC curve -> {class_name_dict[label]} (area = {roc_auc[i]:.2f})")

ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.0])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic')
ax.legend(loc="lower right")
plt.grid(True)
plt.show()
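
Besides plotting one curve per label, it can be handy to condense the result into a single number. One option, sketched below under the assumption that `y_true` and `y_pred` are arrays of shape `(n_samples, n_labels)`, is scikit-learn's `roc_auc_score`, which accepts multi-label indicator targets directly. The arrays here are random toy data, not model output.

```python
import numpy as np
from sklearn.metrics import roc_auc_score

rng = np.random.default_rng(0)

# Hypothetical ground truth for 200 samples and 3 labels
y_true_toy = rng.integers(0, 2, size=(200, 3))

# Hypothetical noisy scores that correlate with the labels
noise = rng.normal(0.2, 0.25, size=(200, 3))
y_score_toy = np.clip(0.6 * y_true_toy + noise, 0.0, 1.0)

# average=None returns one AUC per label; "macro" is their unweighted mean
per_label_auc = roc_auc_score(y_true_toy, y_score_toy, average=None)
macro_auc = roc_auc_score(y_true_toy, y_score_toy, average="macro")

print("AUC per label:", np.round(per_label_auc, 3))
print("Macro AUC:", round(float(macro_auc), 3))
```

The macro average treats rare and frequent labels equally, which is often the desired behaviour for imbalanced label sets.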

## Use transfer learning!

In [None]:
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam

# Load the DenseNet121 model but exclude the top layer (classification layer)
base_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(320, 320, 3))

# Add your own top layer for classification
x = base_model.output
x = GlobalAveragePooling2D()(x)

# complete code here

# Create the actual model
model = Model(inputs=base_model.input, outputs=predictions)
model.summary()

### Account for biases in the data with a custom loss function

In [None]:
# Calculate class weights
class_weights = train_df[label_column].sum(axis=0)
class_weights = class_weights.sum() / class_weights 

# Ensure class weights sum up to 1
class_weights = class_weights / class_weights.sum()
class_weights
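
As a worked example of the same two steps, the sketch below applies them to made-up label counts (not the real ODIR numbers). Rare labels receive proportionally larger weights.

```python
import pandas as pd

# Hypothetical positive counts per label
counts = pd.Series({"N": 600, "D": 300, "G": 60, "C": 40})

weights = counts.sum() / counts    # inverse frequency: rare labels get large values
weights = weights / weights.sum()  # normalise so that the weights sum to 1

print(weights.round(3))
```

A label that is half as frequent as another ends up with twice the weight, so "D" (300) weighs twice as much as "N" (600) in this toy case.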

In [None]:
from tensorflow.keras.losses import binary_crossentropy
import tensorflow as tf

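def weighted_binary_crossentropy(class_weights):
    # `class_weights` is expected to be a pandas Series ordered like label_column.
    # Keep that order: sorting the keys alphabetically would pair weights with the wrong outputs.
    class_weights = tf.constant(np.asarray(class_weights, dtype="float32"))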

    def _weighted_binary_crossentropy(y_true, y_pred):
        y_true = tf.cast(y_true, tf.float32)  # ensure y_true is float32
        y_pred = tf.cast(y_pred, tf.float32)  # ensure y_pred is float32
        crossentropy = binary_crossentropy(y_true, y_pred)
        weight_vector = tf.reduce_sum(class_weights * y_true, axis=-1)
        weighted_loss = weight_vector * crossentropy
        return tf.reduce_mean(weighted_loss)

    return _weighted_binary_crossentropy
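
To see what this loss computes, here is a small NumPy sketch of the same arithmetic: binary cross-entropy averaged over the labels of each sample, multiplied by the summed weights of that sample's positive labels, then averaged over the batch. The function name `weighted_bce_numpy` and all numbers are illustrative assumptions; the clipping constant is chosen to resemble the Keras default, not taken from this notebook.

```python
import numpy as np

def weighted_bce_numpy(y_true, y_pred, class_weights, eps=1e-7):
    """NumPy sketch of a per-sample weighted binary cross-entropy."""
    y_pred = np.clip(y_pred, eps, 1.0 - eps)  # avoid log(0)
    bce = -(y_true * np.log(y_pred) + (1.0 - y_true) * np.log(1.0 - y_pred))
    per_sample_bce = bce.mean(axis=-1)                         # mean over labels
    per_sample_weight = (class_weights * y_true).sum(axis=-1)  # weights of positive labels
    return float((per_sample_weight * per_sample_bce).mean())  # mean over the batch

# Hypothetical weights for three labels, ordered like the model outputs
class_weights_toy = np.array([0.1, 0.3, 0.6])

# Two samples: the first has the frequent label, the second the rare one
y_true_toy = np.array([[1.0, 0.0, 0.0],
                       [0.0, 0.0, 1.0]])
y_pred_toy = np.full((2, 3), 0.5)  # an uninformative prediction everywhere

loss = weighted_bce_numpy(y_true_toy, y_pred_toy, class_weights_toy)
print(round(loss, 4))
```

Both samples have the same unweighted cross-entropy here, yet the one with the rare label contributes six times as much to the loss. A sample without any positive label would get weight 0 under this scheme, which is worth keeping in mind.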


In [None]:
# First: train only the top layers (which were randomly initialized)
for layer in base_model.layers:
    layer.trainable = False

metrics = [
    'accuracy',
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall')
]

# Compile the model
model.compile(optimizer=Adam(),
              #loss='binary_crossentropy',
              loss=weighted_binary_crossentropy(class_weights),
              metrics=metrics)

In [None]:
# Train the model
history = model.fit(
    train_generator, 
    validation_data=val_generator,
    epochs=10,
    verbose=1
)

In [None]:
fig, (ax1, ax2) = plt.subplots(2, figsize=(6, 10))

# summarize history for accuracy
ax1.plot(history.history['accuracy'], "o--")
ax1.plot(history.history['val_accuracy'], "o--")
ax1.set_title('model accuracy')
ax1.set_ylabel('accuracy')
ax1.set_xlabel('epoch')
ax1.legend(['train', 'validation'], loc='upper left')

# summarize history for loss
ax2.plot(history.history['loss'], "o--")
ax2.plot(history.history['val_loss'], "o--")
ax2.set_title('model loss')
ax2.set_ylabel('loss')
ax2.set_xlabel('epoch')
ax2.legend(['train', 'validation'], loc='upper left')
plt.show()

## Evaluate the model

In [None]:
y_pred = model.predict(val_generator)

In [None]:
y_true = val_generator.labels

In [None]:
# Code for multi-class model
from sklearn.metrics import roc_curve, auc


# Compute the ROC curve and AUC for each class
fpr = dict()
tpr = dict()
roc_auc = dict()

classes = label_column

for i in range(len(classes)):
    fpr[i], tpr[i], _ = roc_curve(y_true[:, i], y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Plot the ROC curve
fig, ax = plt.subplots(figsize=(8, 8))

# Get class names
class_names = list(classes)

for i, label in enumerate(classes):
    ax.plot(fpr[i], tpr[i],
            label=f"ROC curve -> {class_name_dict[label]} (area = {roc_auc[i]:.2f})")

ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.0])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic')
ax.legend(loc="lower right")
plt.grid(True)
plt.show()

## Improve further
- Longer training
- Lower learning rate (maybe)
- Add data augmentation to the image generator (slight rotation and zoom)

## Another option: fine tuning

In [None]:
print(f"Our model has {len(model.layers)} layers!")

In [None]:
# Now: unfreeze some of the base model layers and do a second pass of training
for layer in model.layers[:300]:
    layer.trainable = False
for layer in model.layers[300:]:
    layer.trainable = True

# We use a smaller learning rate for fine-tuning
model.compile(optimizer=Adam(learning_rate=0.0001),
              #loss='binary_crossentropy',
              loss=weighted_binary_crossentropy(class_weights),
              metrics=['accuracy'])

# The weighted loss already accounts for class imbalance, so `class_weight` is not passed here
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=10
)

In [None]:
fig, (ax1, ax2) = plt.subplots(2, figsize=(6, 10))

# summarize history for accuracy
ax1.plot(history.history['accuracy'], "o--")
ax1.plot(history.history['val_accuracy'], "o--")
ax1.set_title('model accuracy')
ax1.set_ylabel('accuracy')
ax1.set_xlabel('epoch')
ax1.legend(['train', 'validation'], loc='upper left')

# summarize history for loss
ax2.plot(history.history['loss'], "o--")
ax2.plot(history.history['val_loss'], "o--")
ax2.set_title('model loss')
ax2.set_ylabel('loss')
ax2.set_xlabel('epoch')
ax2.legend(['train', 'validation'], loc='upper left')
plt.show()

## Evaluate the model

In [None]:
y_pred = model.predict(val_generator)

In [None]:
y_true = val_generator.labels

In [None]:
# Code for multi-class model
from sklearn.metrics import roc_curve, auc


# Compute the ROC curve and AUC for each class
fpr = dict()
tpr = dict()
roc_auc = dict()

classes = label_column

for i in range(len(classes)):
    fpr[i], tpr[i], _ = roc_curve(y_true[:, i], y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Plot the ROC curve
fig, ax = plt.subplots(figsize=(8, 8))

# Get class names
class_names = list(classes)

for i, label in enumerate(classes):
    ax.plot(fpr[i], tpr[i],
            label=f"ROC curve -> {class_name_dict[label]} (area = {roc_auc[i]:.2f})")

ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.0])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic')
ax.legend(loc="lower right")
plt.grid(True)
plt.show()