In [37]:
# Import necessary libraries
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import cv2
import os
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split


In [38]:
# Load CSVs
train_labels = pd.read_csv('Dataset/B. RFMiD_Challenge_Dataset/B. RFMiD_Challenge_Dataset/2. Groundtruths/a. RFMiD_Training_Labels.csv')
val_labels = pd.read_csv('Dataset/B. RFMiD_Challenge_Dataset/B. RFMiD_Challenge_Dataset/2. Groundtruths/b. RFMiD_Validation_Labels.csv')
test_labels = pd.read_csv('Dataset/B. RFMiD_Challenge_Dataset/B. RFMiD_Challenge_Dataset/2. Groundtruths/c. RFMiD_Testing_Labels.csv')


In [39]:
# CLAHE preprocessing function
def preprocess_image(image_path):
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Resize to match EfficientNetB0 expected size
    image = cv2.resize(image, (224, 224))

    # Apply CLAHE
    lab = cv2.cvtColor(image, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    cl = clahe.apply(l)
    limg = cv2.merge((cl, a, b))
    final = cv2.cvtColor(limg, cv2.COLOR_LAB2RGB)

    return final / 255.0  # Normalize image to [0, 1]


In [46]:
# Data generator class with CLAHE and augmentation
class CustomDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, dataframe, image_dir, batch_size, augment=False, shuffle=True):
        self.dataframe = dataframe
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.augment = augment
        self.shuffle = shuffle
        self.indexes = np.arange(len(self.dataframe))
        self.label_encoder = LabelEncoder()
        self.dataframe['Diagnosis'] = self.label_encoder.fit_transform(self.dataframe['Diagnosis'])

    def __len__(self):
        return int(np.floor(len(self.dataframe) / self.batch_size))

    def __getitem__(self, index):
        batch_indexes = self.indexes[index * self.batch_size: (index + 1) * self.batch_size]
        batch_df = self.dataframe.iloc[batch_indexes]

        images = []
        labels = []

        for _, row in batch_df.iterrows():
            # Convert row['ID'] to a string before concatenating with ".png"
            img_path = os.path.join(self.image_dir, str(row['ID']) + ".png")  # Convert ID to string
            img = preprocess_image(img_path)  # Assuming preprocess_image is a function that loads and processes the image

            # Data augmentation
            if self.augment:
                if np.random.rand() < 0.5:
                    img = tf.image.random_flip_left_right(img)
                if np.random.rand() < 0.5:
                    img = tf.image.random_brightness(img, max_delta=0.1)
                if np.random.rand() < 0.5:
        # Corrected zoom effect code here
                    scale_factor = np.random.uniform(0.9, 1.1)  # Random zoom factor
                    img_shape = tf.shape(img)[:2]  # Get the shape of the image (height, width)
        
        # Apply scale factor and floor it correctly
                    new_height = tf.cast(tf.floor(tf.cast(img_shape[0], tf.float32) * scale_factor), tf.int32)
                    new_width = tf.cast(tf.floor(tf.cast(img_shape[1], tf.float32) * scale_factor), tf.int32)
        
                    new_size = tf.stack([new_height, new_width])  # Stack to create a tensor with shape [height, width]
            
                    img = tf.image.resize(img, new_size)  # Resize the image
                    img = tf.image.resize_with_crop_or_pad(img, target_height=224, target_width=224)  # Crop or pad

                    images.append(img)
                    labels.append(row['Diagnosis'])  # Assuming 'Diagnosis' is the correct label column

             images = np.array(images) / 255.0  # Normalize the images to [0, 1] range
             labels = np.array(labels)

        # One-hot encoding of labels
        return images, tf.keras.utils.to_categorical(labels, num_classes=len(np.unique(labels)))

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexes)  # Shuffle the dataset at the end of each epoch


IndentationError: unindent does not match any outer indentation level (<tokenize>, line 51)

In [41]:
# Now use the correct column name, for example 'Disease_Risk'
train_labels['Diagnosis'] = train_labels['Disease_Risk']  # Assigning 'Disease_Risk' to 'Diagnosis'
val_labels['Diagnosis'] = val_labels['Disease_Risk']
test_labels['Diagnosis'] = test_labels['Disease_Risk']

# Ensure the column 'Diagnosis' exists in the dataframe
if 'Diagnosis' not in train_labels.columns:
    raise KeyError("'Diagnosis' column not found in train_labels dataframe")
if 'Diagnosis' not in val_labels.columns:
    raise KeyError("'Diagnosis' column not found in val_labels dataframe")
if 'Diagnosis' not in test_labels.columns:
    raise KeyError("'Diagnosis' column not found in test_labels dataframe")


In [42]:
batch_size = 32

train_generator = CustomDataGenerator(
    dataframe=train_labels,
    image_dir="Dataset/B. RFMiD_Challenge_Dataset/B. RFMiD_Challenge_Dataset/1. Original Images/a. Training Set", # <-- update this
    batch_size=batch_size,
    augment=True
)

val_generator = CustomDataGenerator(
    dataframe=val_labels,
    image_dir="Dataset/B. RFMiD_Challenge_Dataset/B. RFMiD_Challenge_Dataset/1. Original Images/b. Validation Set", # <-- update this
    batch_size=batch_size,
    augment=False
)

test_generator = CustomDataGenerator(
    dataframe=test_labels,
    image_dir="Dataset/B. RFMiD_Challenge_Dataset/B. RFMiD_Challenge_Dataset/1. Original Images/c. Testing Set", # <-- update this
    batch_size=batch_size,
    augment=False
)


In [43]:
base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(512, activation='relu')(x)
predictions = Dense(len(np.unique(train_labels['Diagnosis'])), activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=predictions)

# Freeze base model
for layer in base_model.layers:
    layer.trainable = False

model.compile(optimizer=Adam(learning_rate=1e-3), loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()


In [44]:
early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
checkpoint = ModelCheckpoint('efficientnetb0_best.keras', monitor='val_accuracy', save_best_only=True, mode='max')

history = model.fit(
    train_generator,
    epochs=20,
    validation_data=val_generator,
    callbacks=[early_stop, checkpoint]
)


  self._warn_if_super_not_called()


TypeError: Cannot convert 1.079899959798813 to EagerTensor of dtype int32

In [None]:
# Unfreeze all layers for fine-tuning
for layer in model.layers:
    layer.trainable = True

model.compile(optimizer=Adam(learning_rate=1e-4), loss='categorical_crossentropy', metrics=['accuracy'])

history_finetune = model.fit(
    train_generator,
    epochs=10,
    validation_data=val_generator,
    callbacks=[early_stop]
)


In [None]:
model.save('efficientnetb0_retinal_final.h5')

print("✅ Model training complete and saved!")


In [None]:
# Smooth Curve Function
def smooth_curve(points, factor=0.8):
    smoothed_points = []
    for point in points:
        if smoothed_points:
            previous = smoothed_points[-1]
            smoothed_points.append(previous * factor + point * (1 - factor))
        else:
            smoothed_points.append(point)
    return smoothed_points


In [None]:
# Combine histories
def combine_histories(h1, h2):
    history = {}
    for k in h1.history.keys():
        history[k] = h1.history[k] + h2.history[k]
    return history

full_history = combine_histories(history, history_finetune)

# Smooth
smooth_acc = smooth_curve(full_history['accuracy'])
smooth_val_acc = smooth_curve(full_history['val_accuracy'])
smooth_loss = smooth_curve(full_history['loss'])
smooth_val_loss = smooth_curve(full_history['val_loss'])

# Plot
plt.figure(figsize=(10,5))
plt.plot(smooth_acc, label='Smoothed Training Accuracy')
plt.plot(smooth_val_acc, label='Smoothed Validation Accuracy')
plt.title('Training and Validation Accuracy (Smoothed)')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10,5))
plt.plot(smooth_loss, label='Smoothed Training Loss')
plt.plot(smooth_val_loss, label='Smoothed Validation Loss')
plt.title('Training and Validation Loss (Smoothed)')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()
