In [None]:
import tensorflow as tf
# from tensorflow.keras.layers.experimental import preprocessing
from tensorflow import keras
from tensorflow.keras import layers,Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import Attention
# from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc, precision_recall_curve, accuracy_score
import matplotlib.pyplot as plt

In [None]:
from tensorflow.keras.applications.efficientnet_v2 import EfficientNetV2S, preprocess_input
from google.colab.patches import cv2_imshow
import pandas as pd
import numpy as np
import seaborn as sns
# import imutils
import time
import cv2
from cuml import SVC
# from sklearn.svm import SVC

In [None]:
from google.colab import drive
import os

drive.mount('/content/drive')

In [None]:
base_dir = '/content/drive/MyDrive/APTOS 2019 dataset'
train_dir = os.path.join(base_dir, 'train_images/train_images/')
validation_dir = os.path.join(base_dir, 'val_images/val_images/')
test_dir = os.path.join(base_dir, 'test_images/test_images/')

In [None]:
print(os.listdir(train_dir))
print(os.listdir(validation_dir))
print(os.listdir(test_dir))

In [None]:
train_path = "/content/drive/MyDrive/APTOS 2019 dataset/train_images/train_images/"
valid_path = "/content/drive/MyDrive/APTOS 2019 dataset/val_images/val_images/"
test_path = "/content/drive/MyDrive/APTOS 2019 dataset/test_images/test_images/"

In [None]:
NUM_CLASSES = 2
epochs = 20

In [None]:
img_augmentation = Sequential(
    [
        tf.keras.layers.RandomRotation(factor=(-0.15, 0.15)),
        tf.keras.layers.RandomTranslation(height_factor=0.1, width_factor=0.1),
        tf.keras.layers.RandomFlip(),
        tf.keras.layers.RandomContrast(factor=0.1),
    ],
    name="img_augmentation",
)

In [None]:
def unfreeze_model(model):
    # Unfreeze the top 10 layers while leaving BatchNorm layers frozen for fine-tuning
    for layer in model.layers[-10:]:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = True
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
    model.compile(optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"])

In [None]:
def test_model(model,test_batches):
    #Testing the Model
    test_labels = test_batches.classes
    print("Test Labels",test_labels)
    print(test_batches.class_indices)

    predictions = model.predict(test_batches,steps=len(test_batches),verbose=0)

    acc = 0
    for i in range(len(test_labels)):
        actual_class = test_labels[i]
        if predictions[i][actual_class] > 0.5:
            acc += 1
    print("Accuarcy:",(acc/len(test_labels))*100,"%")

In [None]:
#@title create train, test and valid data
def load_data():
    train = pd.read_csv('/content/drive/MyDrive/APTOS 2019 dataset/train_1 - 2 class.csv', encoding='utf-8')
    test = pd.read_csv('/content/drive/MyDrive/APTOS 2019 dataset/test - 2 class.csv', encoding='utf-8')
    valid = pd.read_csv('/content/drive/MyDrive/APTOS 2019 dataset/val - 2 class.csv', encoding='utf-8')

    train_dir = os.path.join('/content/drive/MyDrive/APTOS 2019 dataset/train_images/train_images')
    test_dir = os.path.join('/content/drive/MyDrive/APTOS 2019 dataset/test_images/test_images')
    valid_dir = os.path.join('/content/drive/MyDrive/APTOS 2019 dataset/val_images/val_images')

    # Construct file paths directly within function:
    train['file_path'] = [os.path.join(train_dir, f'{id_code}.png') for id_code in train['id_code']]
    test['file_path'] = [os.path.join(test_dir, f'{id_code}.png') for id_code in test['id_code']]
    valid['file_path'] = [os.path.join(valid_dir, f'{id_code}.png') for id_code in valid['id_code']]

    # Construct file names using list comprehensions:
    train['train_images'] = [id_code + ".png" for id_code in train['id_code']]
    test['test_images'] = [id_code + ".png" for id_code in test['id_code']]
    valid['val_images'] = [id_code + ".png" for id_code in valid['id_code']]

    train['diagnosis'] = train['diagnosis'].astype(str)
    test['diagnosis'] = test['diagnosis'].astype(str)
    valid['diagnosis'] = valid['diagnosis'].astype(str)

    return train, test, valid

In [None]:
def inference_data():
  inference = pd.read_csv('/content/drive/MyDrive/APTOS 2019 dataset/HRF fundus image database/Collective labels.txt', sep = '\t')

  inference_dir = os.path.join('/content/drive/MyDrive/APTOS 2019 dataset/HRF fundus image database')

  inference['file_name'] = [os.path.join(inference_dir, f'{file_name}') for file_name in inference['file_name']]

  # inference['inference_images'] = [file_name + ".png" for file_name in inference["file_name"]]

  inference['label'] = inference['label'].astype(str)

  return inference

In [None]:
def inference_EyePACS():
  inference = pd.read_csv('/content/drive/MyDrive/APTOS 2019 dataset/EyePACS labels - trainLabels.csv', sep = ',')

  inference_dir = os.path.join('/content/drive/MyDrive/APTOS 2019 dataset/EyePACS Subset')

  inference['image'] = [os.path.join(inference_dir, f'{file_name}') for file_name in inference['image']]

  inference['image'] = [file_name + ".jpeg" for file_name in inference["image"]]

  inference['diagnosis'] = inference['diagnosis'].astype(str)

  return inference

In [None]:
def preprocess_image(img_path):
  img = load_img(img_path, target_size=(224, 224))
  img_array = img_to_array(img)
  img_array = np.expand_dims(img_array, axis=0)
  return preprocess_input(img_array)

In [None]:
# @title Compute GradCAM
def compute_gradcam(model, img_array, last_conv_layer_name, class_idx):
    grad_model = Model(
        inputs=[model.inputs],
        outputs=[model.get_layer(last_conv_layer_name).output, model.output]
    )

    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array,training=False)
        class_channel = preds[:, class_idx]

    grads = tape.gradient(class_channel, last_conv_layer_output)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()

In [None]:
# @title Display the GradCAM heatmap superimposed on top of image
def display_heatmap(original_img, heatmap, alpha=0.5):
    img = cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB)
    heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
    heatmap = np.uint8(255 * heatmap)
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)

    superimposed_img = heatmap * alpha + img
    superimposed_img = np.uint8(superimposed_img)

    plt.figure(figsize=(5, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(img)
    plt.title('Original Image')
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.imshow(superimposed_img)
    plt.title('Grad-CAM Heatmap')
    plt.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# @title Display heatmap
def display_heatmap1(heatmap):
    # Resize the heatmap to a higher resolution
    heatmap_resized = cv2.resize(heatmap, (224, 224))  # Adjust the size as needed
    # Display the heatmap with improved granularity
    plt.imshow(heatmap_resized, cmap='jet', interpolation='bilinear')
    plt.colorbar()
    plt.show()

In [None]:
# @title Training
if __name__ == "__main__":
    train_df, test_df, valid_df = load_data()
    model = build_EfficientNetV2S_model_SVM_without_attention(NUM_CLASSES)
    unfreeze_model(model)

    train_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input).flow_from_dataframe(
        dataframe=train_df, x_col='file_path', y_col='diagnosis', target_size=(224,224), batch_size=32)
    valid_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input).flow_from_dataframe(
        dataframe=valid_df, x_col='file_path', y_col='diagnosis', target_size=(224,224), batch_size=32)
    test_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input).flow_from_dataframe(
        dataframe=test_df, x_col='file_path', y_col='diagnosis', target_size=(224,224), batch_size=32, shuffle=False)

    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-4)
    history = model.fit(train_batches, epochs=epochs, validation_data=valid_batches, verbose=1,callbacks=[early_stopping,reduce_lr])

In [None]:
# model = keras.models.load_model('/content/drive/MyDrive/APTOS 2019 dataset/Saved models/1) Normal softmax classification with EfficientNetV2S with dropout.h5')
# model.save("/content/drive/MyDrive/APTOS 2019 dataset/Saved models/1) Normal softmax classification with EfficientNetV2S with dropout.h5")

In [None]:
# @title Accuracy-Loss plot
def plot_accuracy_loss(history):
    # Accuracy plot
    plt.figure(figsize=(14, 5))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(loc='upper left')

    # Loss plot
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(loc='upper left')

    plt.show()

plot_accuracy_loss(history)

In [None]:
# @title Inference on test data
for i in range(len(test_df)):
  img_path = test_df['file_path'][i]
  # original_img = cv2.imread(img_path)
  img_array = preprocess_image(img_path)
  predicted_label = model.predict(img_array)
  predicted_label = np.argmax(predicted_label)
  # last_conv_layer_name = "top_activation"  # Replace with the actual last conv layer name of your model
  class_idx = predicted_label  # Replace with the actual class index you want to visualize
  actual_label = test_df['diagnosis'][i]
  print(i,actual_label, predicted_label)

In [None]:
model.evaluate(test_batches)

In [None]:
#Testing the Model
test_model(model,test_batches)

In [None]:
softmax_test = (model.predict(test_batches) > 0.5).astype("int32")

In [None]:
print("Classification Report:\n", classification_report(test_batches.labels, softmax_test.argmax(axis=1)))

In [None]:
cms = confusion_matrix(test_batches.labels, softmax_test.argmax(axis=1))  #top_dropout softmax
sns.heatmap(cms, annot=True, cbar=False, fmt='d')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Inference Confusion Matrix')

In [None]:
index = 253
img_path = test_df['file_path'][index]
original_img = cv2.imread(img_path)
img_array = preprocess_image(img_path)
predicted_label = model.predict(img_array)
predicted_label = np.argmax(predicted_label)
last_conv_layer_name = "top_conv"  # Replace with the actual last conv layer name of model
actual_label = test_df['diagnosis'][index]
class_idx = predicted_label  # Replace with the actual class index to be visualized
print(actual_label, predicted_label)
heatmap = compute_gradcam(model, img_array, last_conv_layer_name, class_idx)
display_heatmap(original_img, heatmap)
display_heatmap1(heatmap)

In [None]:
test_model(model,test_batches)

In [None]:
softmax_pred = (model.predict(test_batches) > 0.5).astype("int32")

In [None]:
print("Classification Report:\n", classification_report(test_batches.labels, softmax_pred.argmax(axis=1)))

In [None]:
cms = confusion_matrix(test_batches.labels, softmax_pred.argmax(axis=1))  #top_dropout softmax
sns.heatmap(cms, annot=True, cbar=False, fmt='d')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')

## SVM

In [None]:
# Collect data from the generator
X_train, y_train = [], []
for _ in range(len(train_batches)):
    images, labels = next(train_batches)
    X_train.append(images)
    y_train.append(labels)

In [None]:
# Convert lists to arrays
X_train = np.vstack(X_train)
y_train = np.vstack(y_train)

In [None]:
X_train.shape

In [None]:
# Reshape X_train to be 2D (necessary for SVM)
X_train1 = X_train.reshape(X_train.shape[0], -1)

In [None]:
X_train1.shape

In [None]:
y_train.shape

In [None]:
# Collect data from the generator
X_test, y_test = [], []
for _ in range(len(test_batches)):
    images, labels = next(test_batches)
    X_test.append(images)
    y_test.append(labels)

In [None]:
# Convert lists to arrays
X_test = np.vstack(X_test)
y_test = np.vstack(y_test)

In [None]:
X_test.shape

In [None]:
# Reshape X_train to be 2D (necessary for SVM)
X_test1 = X_test.reshape(X_test.shape[0], -1)

In [None]:
X_test1.shape

In [None]:
y_test

In [None]:
# Initialize and train the SVC with verbose output
start = time.time()
svc = SVC(verbose=True,kernel='rbf')
svc.fit(X_train1, y_train.argmax(axis=1))  # Use argmax to get class indices if y_train is one-hot encoded
print('Running time: %.4f seconds' % (time.time()-start))

In [None]:
# Get predictions from SVC using test features
predictions_test = svc.predict(X_test1)

In [None]:
accuracy_test = accuracy_score(y_test.argmax(axis=1), predictions_test)
print("Testing Accuracy using SVM with Feature Extraction:", accuracy_test)

In [None]:
print(classification_report(test_batches.labels, predictions_test))

In [None]:
cmsvm = confusion_matrix(test_batches.labels, predictions_test)
sns.heatmap(cmsvm, annot=True, cbar=False, fmt='d')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')

In [None]:
# @title EffNet-SVM classifier
from tensorflow.python.keras.models import Model

In [None]:
model1 = tf.keras.Model(inputs=model.input, outputs=model.get_layer('top_conv').output)

In [None]:
features_array_train1 = model1.predict(X_train)

In [None]:
features_array_train1_reshape = features_array_train1.reshape(features_array_train1.shape[0], -1)

In [None]:
features_array_test1 = model1.predict(X_test)

In [None]:
features_array_test1_reshape = features_array_test1.reshape(features_array_test1.shape[0], -1)

In [None]:
X_test1 = X_test.reshape(X_test.shape[0], -1)

In [None]:
clf = SVC(verbose=True,kernel='rbf')
start = time.time()
# for _ in range(5):
clf.fit(features_array_train1_reshape, y_train.argmax(axis=1))   #Learned from training! SAVE THIS!!!! Edt : saved!
preds1 = clf.predict(features_array_test1_reshape)
print('Running time: %.4f seconds' % (time.time()-start))

In [None]:
# import pickle

# with open('/content/drive/MyDrive/APTOS 2019 dataset/Saved models/cuML trained EffNet-SVM model.pkl', 'wb') as f:
#     pickle.dump(clf, f)

# Load the model from the file using pickle
# with open('/content/drive/MyDrive/APTOS 2019 dataset/Saved models/cuML trained EffNet-SVM model.pkl', 'rb') as f:
#     clf = pickle.load(f)

# Use the loaded model for predictions
# predictions = clf_loaded.predict(features_array_test1_reshape)

In [None]:
print(accuracy_score(y_test.argmax(axis=1), preds1))  #top_dropout

In [None]:
cm = confusion_matrix(test_batches.labels, preds1)  #top_dropout
sns.heatmap(cm, annot=True, cbar=False, fmt='d')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')

In [None]:
print(classification_report(test_batches.labels, preds1))

In [None]:
# @title Inference on inference data using EffNet-SVM - Inference time

# infer_df = inference_data()
infer_df = inference_EyePACS()
inference_times = []

for _ in range(5):
  for i in range(len(infer_df)):
    # img_path = infer_df['file_name'][i]
    img_path = infer_df['image'][i]

    if not os.path.exists(img_path):
      # print(f"Image not found: {img_path}. Skipping...")
      continue
    try:
      # original_img = cv2.imread(img_path)
      img_array = preprocess_image(img_path)
      start_time = time.time()
      predicted_label = model1.predict(img_array)
      preds_inference = clf.predict(predicted_label.reshape(predicted_label.shape[0], -1))
      end_time = time.time()
      inference_time = end_time - start_time
      inference_times.append(inference_time)
      # last_conv_layer_name = "top_conv"  # Replace with the actual last conv layer name of your model
      # class_idx = predicted_label  # Replace with the actual class index you want to visualize
      # actual_label = infer_df['label'][i]
      actual_label = infer_df['diagnosis'][i]
      print(i,actual_label, preds_inference)

    except Exception as e:
      print(f"Error processing image {img_path}: {e}")
      continue

sum(inference_times) / len(inference_times)

In [None]:
inference_batches = ImageDataGenerator(preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input).flow_from_dataframe(
    dataframe=infer_df, x_col='image', y_col='diagnosis', target_size=(224,224), batch_size=32, shuffle=False)

In [None]:
model.evaluate(inference_batches)

In [None]:
test_model(model,inference_batches)

In [None]:
softmax_infer = (model.predict(inference_batches) > 0.5).astype("int32")

In [None]:
print("Classification Report:\n", classification_report(inference_batches.labels, softmax_infer.argmax(axis=1)))

In [None]:
cms = confusion_matrix(inference_batches.labels, softmax_infer.argmax(axis=1))  #top_dropout softmax
sns.heatmap(cms, annot=True, cbar=False, fmt='d')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Inference Confusion Matrix')

# Visual metrics

In [None]:
# @title ROC curve
def plot_roc_curves(y_true, y_pred_probas, labels):
    plt.figure()
    for i in range(len(y_pred_probas)):
        fpr, tpr, _ = roc_curve(y_true, y_pred_probas[i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=2, label=f'{labels[i]} (area = {roc_auc:.5f})')

    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc='lower right')
    plt.show()


y_true2 = test_batches.labels
y_pred_probas = [softmax_pred.argmax(axis=1),
    predictions_test,
    preds1]
labels = ['Softmax', 'SVC(RBF)', 'EffNetV2S-SVC']

plot_roc_curves(y_true2, y_pred_probas, labels)

In [None]:
# @title Precision-Recall curve
def plot_precision_recall_curves(y_true, y_pred_probas, labels):
    plt.figure()
    for i in range(len(y_pred_probas)):
        precision, recall, _ = precision_recall_curve(y_true, y_pred_probas[i])
        pr_auc = auc(recall, precision)
        plt.plot(recall, precision, lw=2, label=f'{labels[i]} (area = {pr_auc:.5f})')

    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.legend(loc='best')
    plt.show()

y_true2 = test_batches.labels
y_pred_probas = [softmax_pred.argmax(axis=1),
    predictions_test,
    preds1]
labels = ['Softmax', 'SVC(RBF)', 'EffNetV2S-SVC']

plot_precision_recall_curves(y_true2, y_pred_probas, labels)

In [None]:
# @title Calibration curve
def plot_calibration_curves(y_true, y_pred_probas, labels):
    plt.figure()
    for i in range(len(y_pred_probas)):
        prob_true, prob_pred = calibration_curve(y_true, y_pred_probas[i], n_bins=10)
        plt.plot(prob_pred, prob_true, marker='o', label=labels[i])

    plt.plot([0, 1], [0, 1], linestyle='--', color='black')
    plt.xlabel('Predicted Probabilities')
    plt.ylabel('True Probabilities')
    plt.title('Calibration Curves')
    plt.legend(loc='best')
    plt.show()

y_true2 = test_batches.labels
y_pred_probas = [softmax_pred.argmax(axis=1),
    predictions_test,
    preds1]
labels = ['Softmax', 'SVC(RBF)', 'EffNetV2S-SVC']

plot_calibration_curves(y_true2, y_pred_probas, labels)