In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, classification_report, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.metrics import AUC
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D, BatchNormalization
from tensorflow.keras.applications import EfficientNetB4
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras import regularizers
from tensorflow.keras.losses import BinaryFocalCrossentropy
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.utils import to_categorical

In [None]:
# Set random seed for reproducibility
tf.random.set_seed(42)
np.random.seed(42)

# Configuration
IMG_SIZE = (380, 380)  # EfficientNetB4 requires 380x380
BATCH_SIZE = 32
EPOCHS_PHASE1 = 15
EPOCHS_PHASE2 = 10
CLASS_NAMES = ['DR', 'MH', 'ODC', 'TSLN', 'DN', 'MYA', 'ARMD', 'BRVO', 'ODP', 'ODE', 'LS', 'RS', 'CSR', 'CRS']

# Paths
train_labels_path = "retinal-disease-classification/Training_Set/RFMiD_Training_Labels.csv"
val_labels_path = "retinal-disease-classification/Evaluation_Set/RFMiD_Validation_Labels.csv"
test_labels_path = "retinal-disease-classification/Test_Set/RFMiD_Testing_Labels.csv"

In [None]:
# Load and preprocess data
def load_and_preprocess_data():
    train_labels = pd.read_csv(train_labels_path)
    val_labels = pd.read_csv(val_labels_path)
    test_labels = pd.read_csv(test_labels_path)

    selected_diseases = CLASS_NAMES
    
    def process_df(df):
        selected_diseases = list(set(CLASS_NAMES) & set(df.columns))
        df = df[['ID', 'Disease_Risk'] + selected_diseases].copy()
        df['Disease_Risk'] = (df[selected_diseases].sum(axis=1) > 0).astype(int)
        return df

    return (
        process_df(train_labels),
        process_df(val_labels),
        process_df(test_labels)
    )

train_df, val_df, test_df = load_and_preprocess_data()

In [None]:
# Enhanced Data Generator with Mixup
class AdvancedDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, img_dir, df, batch_size=32, img_size=IMG_SIZE, 
                 augment=False, shuffle=True, mixup_alpha=0.4, **kwargs):
        super().__init__(**kwargs) 
         
        self.img_dir = img_dir
        self.df = df
        self.batch_size = batch_size
        self.img_size = img_size
        self.augment = augment
        self.shuffle = shuffle
        self.mixup_alpha = mixup_alpha
        self.indices = np.arange(len(df))
        self.on_epoch_end()
        
        # Augmentation configurations
        self.augmenter = ImageDataGenerator(
            rotation_range=25,
            width_shift_range=0.2,
            height_shift_range=0.2,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True,
            vertical_flip=True,
            brightness_range=[0.8, 1.2]
        )

    def __len__(self):
        return int(np.ceil(len(self.df) / self.batch_size))

    def __getitem__(self, index):
        batch_indices = self.indices[index*self.batch_size : (index+1)*self.batch_size]
        batch_df = self.df.iloc[batch_indices]
        
        # Load images and labels
        X, y = self._load_data(batch_df)
        
        # Apply Mixup
        if self.augment and self.mixup_alpha > 0:
            X, y = self._apply_mixup(X, y)
            
        return X, y

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

    def _load_data(self, batch_df):
        X = np.empty((len(batch_df), *self.img_size, 3))
        y = np.empty((len(batch_df), len(CLASS_NAMES)))   # Disease_Risk + diseases
        
        for i, (_, row) in enumerate(batch_df.iterrows()):
            img_path = os.path.join(self.img_dir, f"{row['ID']}.png")
            img = load_img(img_path, target_size=self.img_size)
            img_array = img_to_array(img)
            
            # Apply augmentation only to diseased samples
            if self.augment and row['Disease_Risk'] == 1:
                img_array = self.augmenter.random_transform(img_array)
                
            X[i] = preprocess_input(img_array)  # EfficientNet preprocessing
            y[i] = row[CLASS_NAMES].values
            
        return X, y.astype(np.float32)

    def _apply_mixup(self, X, y):
        lam = np.clip(np.random.beta(self.mixup_alpha, self.mixup_alpha), 0.2, 0.8)
        rand_index = np.random.permutation(len(X))
        
        mixed_X = lam * X + (1 - lam) * X[rand_index]
        mixed_y = lam * y + (1 - lam) * y[rand_index]
        return mixed_X, mixed_y

In [None]:
# Create data generators
train_gen = AdvancedDataGenerator(
    "retinal-disease-classification/Training_Set/Training",
    train_df,
    batch_size=BATCH_SIZE,
    augment=True,
    mixup_alpha=0.4
)

val_gen = AdvancedDataGenerator(
    "retinal-disease-classification/Evaluation_Set/Validation",
    val_df,
    batch_size=BATCH_SIZE
)

test_gen = AdvancedDataGenerator(
    "retinal-disease-classification/Test_Set/Test",
    test_df,
    batch_size=BATCH_SIZE
)

In [None]:
X_batch, y_batch = train_gen[0]
print("Shape of X_batch:", X_batch.shape)  # Should be (batch_size, 380, 380, 3)
print("Shape of y_batch:", y_batch.shape)  # Should be (batch_size, 14)

In [None]:
# Calculate class weights
def calculate_class_weights(df):
    weights = {}
    for idx, disease in enumerate(['Disease_Risk'] + CLASS_NAMES):
        cls_weights = compute_class_weight(
            'balanced',
            classes=np.array([0, 1]),
            y=df[disease]
        )
        weights[idx] = {0: cls_weights[0], 1: cls_weights[1]}
    return weights

class_weights = calculate_class_weights(train_df)
class_weights = {idx: weights[1] for idx, weights in class_weights.items()}

In [None]:
# Build Model
def build_model():
    base_model = EfficientNetB4(
        weights='imagenet',
        include_top=False,
        input_shape=(*IMG_SIZE, 3)
    )
    base_model.trainable = False  # Freeze initially

    model = Sequential([
        base_model,
        GlobalAveragePooling2D(),
        BatchNormalization(),
        Dropout(0.5),
        Dense(512, activation='swish', kernel_regularizer=regularizers.l2(1e-4)),
        BatchNormalization(),
        Dropout(0.5),
        Dense(256, activation='swish'),
        Dense(len(CLASS_NAMES), activation='sigmoid')  # Disease_Risk + diseases
    ])
    
    return model

In [None]:
import tensorflow as tf

def focal_loss(alpha=0.25, gamma=2.0):
    def loss_fn(y_true, y_pred):
        # Cast to float32
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        
        # Compute element-wise binary crossentropy using Keras backend.
        ce = tf.keras.backend.binary_crossentropy(y_true, y_pred)  
        # ce now has shape (batch_size, num_classes)
        
        # Compute p_t element-wise
        p_t = y_true * y_pred + (1 - y_true) * (1 - y_pred)
        
        # Compute alpha factor element-wise
        alpha_factor = y_true * alpha + (1 - y_true) * (1 - alpha)
        
        # Compute modulating factor element-wise
        modulating_factor = tf.pow(1.0 - p_t, gamma)
        
        # Final loss element-wise
        loss = alpha_factor * modulating_factor * ce
        return loss  # Optionally, you can reduce_mean over the batch or last axis.
    
    return loss_fn


In [None]:
sample_y_true = tf.convert_to_tensor(y_batch[:5], dtype=tf.float32)  # (5, 14)
sample_y_pred = tf.random.uniform(sample_y_true.shape, 0, 1)           # (5, 14)
loss_fn = focal_loss(alpha=0.25, gamma=2.0)
print(loss_fn(sample_y_true, sample_y_pred))


In [None]:
print("y_true shape:", sample_y_true.shape)
print("y_pred shape:", sample_y_pred.shape)

In [None]:
# Phase 1: Train head
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss=focal_loss(gamma=2.0, alpha=0.25),
    metrics=[AUC(name='auc', multi_label=True)]
)

phase1_callbacks = [
    EarlyStopping(patience=3, monitor='val_auc', mode='max', verbose=1),
    ModelCheckpoint('phase1_best.keras', save_best_only=True),
    ReduceLROnPlateau(factor=0.5, patience=2)
]

history_phase1 = model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=EPOCHS_PHASE1,
    callbacks=phase1_callbacks
)

In [None]:
# Phase 2: Fine-tune
model = tf.keras.models.load_model('phase1_best.keras', custom_objects={'loss_fn': focal_loss()}) # Load best weights from phase 1
model.layers[0].trainable = True  # Unfreeze base model

# Set last 150 layers trainable
for layer in model.layers[0].layers[-150:]:
    layer.trainable = True

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss=focal_loss(gamma=2.0, alpha=0.25),
    metrics=[AUC(name='auc', multi_label=True)]
)

phase2_callbacks = [
    EarlyStopping(patience=2, monitor='val_auc', mode='max', verbose=1),
    ModelCheckpoint('final_model.keras', save_best_only=True),
    ReduceLROnPlateau(factor=0.2, patience=1)
]

history_phase2 = model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=EPOCHS_PHASE2,
    callbacks=phase2_callbacks
)

In [None]:
# Evaluation with Test-Time Augmentation
def evaluate_with_tta(model, generator, n_tta=5):
    y_true, y_pred = [], []
    # tta_preds = []
    
    for i in range(len(generator)):
        X, y = generator[i]
        batch_preds = np.zeros_like(y)
        
        for _ in range(n_tta):
            # Create augmented versions
            aug_X = np.array([generator.augmenter.random_transform(img) for img in X])
            batch_preds += model.predict(aug_X)
            
        # Average predictions
        batch_preds /= n_tta  # Average predictions
        y_true.append(y)
        y_pred.append(batch_preds)
        # avg_pred = np.mean(batch_preds, axis=0)
        # tta_preds.append(avg_pred)
        # y_true.append(y)
        
    return np.vstack(y_true), np.vstack(y_pred)

model = tf.keras.models.load_model('final_model.keras', custom_objects={'loss_fn': focal_loss()})
y_true, y_pred = evaluate_with_tta(model, test_gen)

In [None]:
# Generate reports
print("Classification Report:")
print(classification_report(
    y_true[:, 1:],  # Skip Disease_Risk
    (y_pred[:, 1:] > 0.5).astype(int),
    target_names=CLASS_NAMES
))

print("\nConfusion Matrices:")
for idx, disease in enumerate(CLASS_NAMES):
    cm = confusion_matrix(y_true[:, idx+1], (y_pred[:, idx+1] > 0.5).astype(int))
    plt.figure()
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f"{disease} Confusion Matrix")
    plt.show()

In [None]:
# Plot training history
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(history_phase1.history['auc'], label='Phase1 Train')
plt.plot(history_phase1.history['val_auc'], label='Phase1 Val')
plt.plot(np.arange(EPOCHS_PHASE1, EPOCHS_PHASE1+len(history_phase2.history['auc'])), 
         history_phase2.history['auc'], label='Phase2 Train')
plt.plot(np.arange(EPOCHS_PHASE1, EPOCHS_PHASE1+len(history_phase2.history['val_auc'])), 
         history_phase2.history['val_auc'], label='Phase2 Val')
plt.title('AUC History')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history_phase1.history['loss'], label='Phase1 Train')
plt.plot(history_phase1.history['val_loss'], label='Phase1 Val')
plt.plot(np.arange(EPOCHS_PHASE1, EPOCHS_PHASE1+len(history_phase2.history['loss'])), 
         history_phase2.history['loss'], label='Phase2 Train')
plt.plot(np.arange(EPOCHS_PHASE1, EPOCHS_PHASE1+len(history_phase2.history['val_loss'])), 
         history_phase2.history['val_loss'], label='Phase2 Val')
plt.title('Loss History')
plt.legend()
plt.show()