https://www.mdpi.com/2306-5729/6/2/14

# Libraries

In [None]:
import keras
import tensorflow.keras as keras
import pandas as pd
import tensorflow as tf
import os
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report
from tensorflow.keras import mixed_precision
from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.utils.class_weight import compute_class_weight


# Data

In [None]:
train_dir = '/kaggle/input/retinal-disease-classification/Training_Set/Training_Set/Training/'
val_dir = '/kaggle/input/retinal-disease-classification/Evaluation_Set/Evaluation_Set/Validation/'
test_dir = '/kaggle/input/retinal-disease-classification/Test_Set/Test_Set/Test/'

train_label_csv = '/kaggle/input/retinal-disease-classification/Training_Set/Training_Set/RFMiD_Training_Labels.csv'
val_label_csv = '/kaggle/input/retinal-disease-classification/Evaluation_Set/Evaluation_Set/RFMiD_Validation_Labels.csv'
test_label_csv = '/kaggle/input/retinal-disease-classification/Test_Set/Test_Set/RFMiD_Testing_Labels.csv'

In [None]:
train_label_df = pd.read_csv(train_label_csv)
val_label_df = pd.read_csv(val_label_csv)
test_label_df = pd.read_csv(test_label_csv)

In [None]:
train_label_df.head()

In [None]:
train_label_df.columns

* Diabetic retinopathy (DR)
* Age-related macular degeneration (ARMD)
* Media Haze (MH)
* Drusens (DN)
* Myopia (MYA)
* Branch retinal vein occlusion (BRVO)
* Tessellation (TSLN)
* Epiretinal membrane (ERM)
* Laser scars (LS)
* Macular scar (MS)
* Central serous retinopathy (CSR)
* Optic disc cupping (ODC)
* Central retinal vein occlusion (CRVO)
* Tortuous vessels (TV)
* Asteroid hyalosis (AH)
* Optic disc pallor (ODP)
* Optic disc edema (ODE)
* Shunt (ST)
* Anterior ischemic optic neuropathy (AION)
* Parafoveal telangiectasia (PT)
* Retinal traction (RT)
* Retinitis (RS)
* Chorioretinitis (CRS)
* Exudation (EDN)
* Retinal pigment epithelium changes (RPEC)
* Macular hole (MHL)
* Retinitis pigmentosa (RP)
* Cotton-wool spots (CWS)
* Coloboma (CB)
* Optic disc pit maculopathy (ODPM)
* Preretinal hemorrhage (PRH)
* Myelinated nerve fibers (MNF)
* Hemorrhagic retinopathy (HR)
* Central retinal artery occlusion (CRAO)
* Tilted disc (TD)
* Cystoid macular edema (CME)
* Post-traumatic choroidal rupture (PTCR)
* Choroidal folds (CF)
* Vitreous hemorrhage (VH)
* Macroaneurysm (MCA)
* Vasculitis (VS)
* Branch retinal artery occlusion (BRAO)
* Plaque (PLQ)
* Hemorrhagic pigment epithelial detachment (HPED)
* Collateral (CL)

In [None]:
import matplotlib.pyplot as plt

# Extract label-only columns (assuming from 3rd column onward)
label_only_df = train_label_df.iloc[:, 2:]

# Count '1's (positive cases) per label
label_counts = (label_only_df == 1).sum().sort_values(ascending=False)

# Remove labels with zero positive samples
label_counts = label_counts[label_counts > 0]

# Plot with horizontal bars
fig, ax = plt.subplots(figsize=(12, 10))
bars = ax.barh(label_counts.index, label_counts.values, color='cornflowerblue', edgecolor='black')

# Title and labels
ax.set_title("Positive Sample Count per Disease", fontsize=16, fontweight='bold')
ax.set_xlabel("Number of Positive Samples", fontsize=12)
ax.set_ylabel("Disease Labels", fontsize=12)

# Flip y-axis to show highest count on top
ax.invert_yaxis()

# Add value annotations to each bar
for bar in bars:
    width = bar.get_width()
    ax.text(width + 0.5, bar.get_y() + bar.get_height()/2, str(int(width)),
            va='center', fontsize=9)

# Tweak layout for better spacing
plt.grid(axis='x', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()


In [None]:
train_label_df['ID'] = train_label_df['ID'].astype(str) + '.png'
val_label_df['ID'] = val_label_df['ID'].astype(str) + '.png'
test_label_df['ID'] = test_label_df['ID'].astype(str) + '.png'



In [None]:
train_label_df['ID'][:5]

In [None]:
train_label_clean = train_label_df.drop(['ID','Disease_Risk'], axis=1)
val_label_clean = val_label_df.drop(['ID','Disease_Risk'], axis=1)
test_label_clean = test_label_df.drop(['ID','Disease_Risk'], axis=1)

In [None]:
x_col = 'ID'
y_col_train = [col for col in train_label_clean.columns]
y_col_val = [col for col in val_label_clean.columns]
y_col_test = [col for col in test_label_clean.columns]

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define target image size and batch size
IMG_SIZE = (224, 224)
BATCH_SIZE = 32

# Image data generators with rescaling
train_gen = ImageDataGenerator(rescale=1./255)
val_gen = ImageDataGenerator(rescale=1./255)
test_gen = ImageDataGenerator(rescale=1./255)

# ✅ Flow from dataframe for multi-label classification
train_data = train_gen.flow_from_dataframe(
    dataframe=train_label_df,
    directory=train_dir,
    x_col=x_col,
    y_col=y_col_train,
    batch_size=BATCH_SIZE,
    target_size=IMG_SIZE,
    shuffle=True,
    class_mode="raw"  # ✅ for multi-label
)

val_data = val_gen.flow_from_dataframe(
    dataframe=val_label_df,
    directory=val_dir,
    x_col=x_col,
    y_col=y_col_val,
    batch_size=BATCH_SIZE,
    target_size=IMG_SIZE,
    shuffle=False,
    class_mode="raw"
)

test_data = test_gen.flow_from_dataframe(
    dataframe=test_label_df,
    directory=test_dir,
    x_col=x_col,
    y_col=y_col_test,
    batch_size=BATCH_SIZE,
    target_size=IMG_SIZE,
    shuffle=False,
    class_mode="raw"
)


In [None]:

images, labels = next(train_data)
class_names = y_col_train

plt.figure(figsize=(10, 8))
for i in range(9):     #n²
    plt.subplot(3, 3, i+1)   #n
    plt.imshow(images[i])
    plt.axis('off')
    active_indices = np.where(labels[i] >= 0.5)[0]
    
    # Get corresponding class names
    active_class_names = [class_names[idx] for idx in active_indices]
    
    # Join labels as a string separated by commas
    label_str = ", ".join(active_class_names) 
    
    plt.title(label_str, fontsize=8)


plt.tight_layout()
plt.show()

In [None]:
def load_and_preprocess(path, label):
    image = tf.io.read_file(path)
    
    # Decode image safely
    image = tf.image.decode_image(image, channels=3, expand_animations=False)
    
    # Resize to target size (you can replace with IMG_SIZE if defined globally)
    image = tf.image.resize(image, [224, 224])
    
    # Normalize to [0,1]
    image = tf.cast(image, tf.float32) / 255.0

    return image, label


In [None]:
def make_dataset(image_dir, label_df, batch_size=32, shuffle=True, cache=True):
    # Generate full image paths
    image_paths = label_df['ID'].apply(lambda x: os.path.join(image_dir, str(x))).values
    labels = label_df.iloc[:, 2:].values  # Skips 'ID' and 'Disease_Risk' columns

    # Create tf.data.Dataset
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels))
    
    # Apply preprocessing
    dataset = dataset.map(load_and_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    
    # Optional caching, shuffling, batching
    if cache:
        dataset = dataset.cache()
    if shuffle:
        dataset = dataset.shuffle(buffer_size=1000)
    
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return dataset


In [None]:
train_dataset = make_dataset(train_dir,train_label_df)
val_dataset = make_dataset(val_dir,val_label_df)
test_dataset = make_dataset(test_dir,test_label_df)

In [None]:
mixed_precision.set_global_policy("mixed_float16")

# Model

In [None]:

from tensorflow.keras import layers, Model
from tensorflow import keras

# Load VGG16 base model
base_model = keras.applications.VGG16(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet'
)

# Freeze all layers first
base_model.trainable = False

# Unfreeze last 4 layers
for layer in base_model.layers[-4:]:
    layer.trainable = True

# Build the model
x = layers.GlobalAveragePooling2D()(base_model.output)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.4)(x)
output = layers.Dense(45, activation='sigmoid')(x)  # 45 binary labels for multi-label classification

model = Model(inputs=base_model.input, outputs=output)

# Compile
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=[tf.keras.metrics.BinaryAccuracy(), tf.keras.metrics.AUC()]
)


In [None]:
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10,
)

In [None]:
import matplotlib.pyplot as plt

def plot_metric(history, metric, val_metric=None, title='', ylabel=''):
    plt.figure(figsize=(8, 5))
    epochs = range(1, len(history[metric]) + 1)

    plt.plot(epochs, history[metric], 'bo-', label=f'Training {title}')

    if val_metric and val_metric in history:
        plt.plot(epochs, history[val_metric], 'ro-', label=f'Validation {title}')

    plt.title(f'{title} Over Epochs')
    plt.xlabel('Epochs')
    plt.ylabel(ylabel)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# Replace these with actual keys from print(history.history.keys())
plot_metric(history.history, 'loss', 'val_loss', 'Loss', 'Loss')
plot_metric(history.history, 'binary_accuracy', 'val_binary_accuracy', 'Binary Accuracy', 'Accuracy')
# plot_metric(history.history, 'auc', 'val_auc', 'AUC', 'AUC')  # Only if 'auc' exists



**Evaluation**

In [None]:
from sklearn.metrics import f1_score, accuracy_score, classification_report

# Collect true and predicted values
y_val = []
y_val_pred_probs = []

for x_batch, y_batch in val_dataset:
    preds = model.predict(x_batch, verbose=0)
    y_val_pred_probs.append(preds)
    y_val.append(y_batch.numpy())

# Concatenate all batches
y_val = np.concatenate(y_val, axis=0)
y_val_pred_probs = np.concatenate(y_val_pred_probs, axis=0)

# Apply threshold
threshold = 0.5
y_val_pred_binary = (y_val_pred_probs > threshold).astype(int)

# Compute binary accuracy
binary_acc = np.mean((y_val == y_val_pred_binary).astype(int))
print(f" (threshold={threshold}): {binary_acc * 100:.2f}%")

# Compute macro F1 Score
macro_f1 = f1_score(y_val, y_val_pred_binary, average='macro', zero_division=0)
print(f"Macro F1 Score: {macro_f1:.4f}")

# Classification report
print("\nClassification Report:")
print(classification_report(y_val, y_val_pred_binary, zero_division=0))


In [None]:
def plot_confusion_matrix_per_class(y_true, y_pred, class_idx, class_name):
    cm = confusion_matrix(y_true[:, class_idx], y_pred[:, class_idx])
    plt.figure(figsize=(5, 4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'Confusion Matrix for class: {class_name}')
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    plt.xticks([0.5, 1.5], ['Negative', 'Positive'])
    plt.yticks([0.5, 1.5], ['Negative', 'Positive'], rotation=0)
    plt.show()

In [None]:
cm = confusion_matrix(y_val.flatten(), y_val_pred_binary.flatten())
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix (all classes flattened)')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.show()

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

layers = [
    "Input Layer\n(224×224×3)",
    "VGG16 Base\n(Pretrained, Frozen)",
    "Unfrozen Last 4\nConv Layers",
    "GlobalAveragePooling2D",
    "Dense Layer\n(256, ReLU)",
    "Dropout (0.4)",
    "Output Layer\n(45, Sigmoid)"
]

fig, ax = plt.subplots(figsize=(10, 8))
ax.set_xlim(0, 10)
ax.set_ylim(0, len(layers) + 2)
ax.axis('off')

for i, layer in enumerate(layers):
    y = len(layers) - i
    rect = patches.FancyBboxPatch(
        (2, y), 6, 0.9,
        boxstyle="round,pad=0.02",
        edgecolor="black",
        facecolor="#add8e6"
    )
    ax.add_patch(rect)
    ax.text(5, y + 0.45, layer, fontsize=10, ha='center', va='center')
    if i < len(layers) - 1:
        ax.annotate('', xy=(5, y), xytext=(5, y - 0.1),
                    arrowprops=dict(arrowstyle="->", lw=2))

plt.tight_layout()
plt.savefig("model_architecture_vgg16.png", dpi=300)


In [None]:
##Matrix for all classes
#for i, class_name in enumerate(class_names):
#    plot_confusion_matrix_per_class(y_val, y_val_pred_binary, i, class_name)

# Model 2

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, Conv2D, SeparableConv2D, MaxPooling2D,
                                     Dropout, Flatten, Dense, BatchNormalization,
                                     Add, Activation, GlobalAveragePooling2D)
from tensorflow.keras.optimizers import Adam
import tensorflow as tf

def swish(x):
    return x * tf.keras.backend.sigmoid(x)

def create_advanced_cnn_model(input_shape, num_classes):
    inputs = Input(shape=input_shape)

    # Block 1
    x = Conv2D(32, (3, 3), padding='same', activation=None)(inputs)
    x = BatchNormalization()(x)
    x = Activation(swish)(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.15)(x)

    # Block 2
    x = SeparableConv2D(64, (3, 3), padding='same', activation=None)(x)
    x = BatchNormalization()(x)
    x = Activation(swish)(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.2)(x)

    # Block 3 with Residual
    shortcut = x
    x = SeparableConv2D(128, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation(swish)(x)
    x = SeparableConv2D(128, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Add()([shortcut, x])
    x = Activation(swish)(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.3)(x)

    # Classification Head (GlobalAvg instead of Flatten)
    x = GlobalAveragePooling2D()(x)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.4)(x)
    outputs = Dense(num_classes, activation='sigmoid')(x)

    model = Model(inputs, outputs)

    model.compile(
        optimizer=Adam(learning_rate=1e-4),
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
        metrics=[
            tf.keras.metrics.BinaryAccuracy(name="binary_accuracy"),
            tf.keras.metrics.AUC(name="auc")
        ]
    )

    return model


In [None]:
history2 = model.fit(train_dataset, 
                    validation_data=val_dataset, 
                    epochs=10, 
                    batch_size=32)

**Evaluation**

In [None]:
from sklearn.metrics import classification_report, f1_score, accuracy_score

# ✅ Store predictions
y_val = []
y_val_pred_probs = []

for x_batch, y_batch in val_dataset:
    preds = model.predict(x_batch, verbose=0)
    y_val_pred_probs.append(preds)
    y_val.append(y_batch.numpy())

# ✅ Concatenate all batches
y_val = np.concatenate(y_val, axis=0)
y_val_pred_probs = np.concatenate(y_val_pred_probs, axis=0)

# ✅ Apply threshold
threshold = 0.3
y_val_pred_binary = (y_val_pred_probs > threshold).astype(int)

# ✅ Compute Binary Accuracy
binary_acc = np.mean((y_val == y_val_pred_binary).astype(int))
print(f" Accuracy (threshold={threshold}): {binary_acc:.4f}")

# ✅ Compute Macro F1 Score
macro_f1 = f1_score(y_val, y_val_pred_binary, average='macro', zero_division=0)
print(f"Macro F1 Score: {macro_f1:.4f}")

# ✅ Full Classification Report
print("\nClassification Report:")
print(classification_report(y_val, y_val_pred_binary, zero_division=0))


In [None]:
report = classification_report(y_val, y_val_pred_binary, target_names=class_names)

print(report)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

layers = [
    "Input Layer\n(224×224×3)",
    "Conv2D (32) + BN + Swish\nMaxPooling + Dropout(0.15)",
    "SeparableConv2D (64) + BN + Swish\nMaxPooling + Dropout(0.2)",
    "Residual Block:\nSeparableConv2D (128) ×2 + BN + Add\nSwish + MaxPooling + Dropout(0.3)",
    "GlobalAveragePooling2D",
    "Dense Layer\n(256, ReLU)",
    "Dropout (0.4)",
    "Output Layer\n(Num Classes, Sigmoid)"
]

fig, ax = plt.subplots(figsize=(10, 10))
ax.set_xlim(0, 10)
ax.set_ylim(0, len(layers) + 2)
ax.axis('off')

for i, layer in enumerate(layers):
    y = len(layers) - i
    rect = patches.FancyBboxPatch(
        (2, y), 6, 0.9,
        boxstyle="round,pad=0.02",
        edgecolor="black",
        facecolor="#c2f0c2"  # Light Green
    )
    ax.add_patch(rect)
    ax.text(5, y + 0.45, layer, fontsize=10, ha='center', va='center')

    if i < len(layers) - 1:
        ax.annotate('', xy=(5, y), xytext=(5, y - 0.1),
                    arrowprops=dict(arrowstyle="->", lw=2))

plt.tight_layout()
plt.savefig("model_architecture_advanced_cnn.png", dpi=300)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix_per_class(y_true, y_pred, class_idx, class_name):
    """
    Plot confusion matrix for a specific class (index).
    
    Args:
        y_true (np.array): Ground truth binary labels (shape: [n_samples, n_classes])
        y_pred (np.array): Predicted binary labels (same shape as y_true)
        class_idx (int): Index of the class to visualize
        class_name (str): Name of the class (used in title)
    """
    cm = confusion_matrix(y_true[:, class_idx], y_pred[:, class_idx])
    plt.figure(figsize=(5, 4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(f'Confusion Matrix for class: {class_name}')
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    plt.xticks([0.5, 1.5], ['Negative', 'Positive'])
    plt.yticks([0.5, 1.5], ['Negative', 'Positive'], rotation=0)
    plt.tight_layout()
    plt.show()


In [None]:
# Flattened confusion matrix across all classes
cm = confusion_matrix(y_val.flatten(), y_val_pred_binary.flatten())

plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix (All Classes Flattened)')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.tight_layout()
plt.show()


In [None]:
# from sklearn.metrics import average_precision_score

# # Calculate Average Precision for each class
# avg_precision = average_precision_score(y_val, y_val_pred_probs, average=None)

# for i, class_name in enumerate(class_names):
#     print(f"Class: {class_name} - Average Precision: {avg_precision[i]:.4f}")

In [None]:
# import pandas as pd
# class_performance = pd.DataFrame({
#     'Class': class_names,
#     'Average Precision': avg_precision
# })

# class_performance = class_performance.sort_values('Average Precision', ascending=False)


# plt.figure(figsize=(12, 6))
# sns.barplot(x='Average Precision', y='Class', data=class_performance)
# plt.title('Average Precision per Class')
# plt.xlabel('Average Precision')
# plt.ylabel('Class')
# plt.show()

# Model 3: class weights

In [None]:
class_counts = train_label_df.drop(columns=train_label_df.columns[:2]).sum(axis=0)

print(class_counts)

In [None]:
class_weights = {i: (class_counts.sum() / (len(class_counts) * class_counts[i])) 
                 for i in range(len(class_counts)) if class_counts[i] > 0}

# Print the class weights
print("Class Weights:", class_weights)

In [None]:
history3 = model.fit(
    train_dataset, 
    epochs=10, 
    batch_size=32,  
    class_weight=class_weights  
)

**Evaluation**

In [None]:
from sklearn.metrics import classification_report, f1_score, accuracy_score

# 1. Prediction collection
y_val = []
y_val_pred_probs = []

for x_batch, y_batch in val_dataset:
    preds = model.predict(x_batch, verbose=0)
    y_val_pred_probs.append(preds)
    y_val.append(y_batch.numpy())

# 2. Concatenate predictions and labels
y_val = np.concatenate(y_val, axis=0)
y_val_pred_probs = np.concatenate(y_val_pred_probs, axis=0)

# 3. Apply threshold to get binary predictions
threshold = 0.3
y_val_pred_binary = (y_val_pred_probs > threshold).astype(int)

# 4. Binary Accuracy (mean match across all labels)
binary_acc = np.mean((y_val == y_val_pred_binary).astype(int))
print(f"Binary Accuracy (threshold={threshold}): {binary_acc:.4f}")

# 5. Macro F1 Score (treats all labels equally)
macro_f1 = f1_score(y_val, y_val_pred_binary, average='macro', zero_division=0)
print(f"Macro F1 Score: {macro_f1:.4f}")

# 6. Full classification report
print("\nClassification Report:")
print(classification_report(y_val, y_val_pred_binary, zero_division=0))


In [None]:
report = classification_report(y_val, y_val_pred_binary, target_names=class_names)

print(report)

# Model 4: class weights and focal loss

In [None]:
def focal_loss(gamma=2., alpha=0.25):
    def focal_loss_fixed(y_true, y_pred):
        epsilon = tf.keras.backend.epsilon()
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.clip_by_value(y_pred, epsilon, 1. - epsilon)
        cross_entropy = -y_true * tf.math.log(y_pred)
        loss = alpha * tf.pow(1 - y_pred, gamma) * cross_entropy
        return tf.reduce_sum(loss, axis=-1)
    return focal_loss_fixed

In [None]:
model.compile(optimizer='adam',
              loss=focal_loss(gamma=2., alpha=0.25),
              metrics=[tf.keras.metrics.BinaryAccuracy(), tf.keras.metrics.AUC()])

In [None]:
model.fit(train_dataset,
          epochs=10,
          batch_size=32,
          class_weight=class_weights,
          validation_data=val_dataset)

**Evaluation**

In [None]:
from sklearn.metrics import f1_score, accuracy_score, classification_report
import numpy as np

# Step 1: Predict on validation set
y_val = []
y_val_pred_probs = []

for x_batch, y_batch in val_dataset:
    preds = model.predict(x_batch, verbose=0)
    y_val_pred_probs.append(preds)
    y_val.append(y_batch.numpy())

# Step 2: Concatenate all predictions and true labels
y_val = np.concatenate(y_val, axis=0)
y_val_pred_probs = np.concatenate(y_val_pred_probs, axis=0)

# Step 3: Apply threshold
threshold = 0.3
y_val_pred_binary = (y_val_pred_probs > threshold).astype(int)

# Step 4: Evaluation
accuracy = accuracy_score(y_val, y_val_pred_binary)
macro_f1 = f1_score(y_val, y_val_pred_binary, average='macro', zero_division=0)

print(f"\nThreshold: {threshold}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Macro F1 Score: {macro_f1:.4f}")

# Optional: Classification report
print("\nClassification Report:")
print(classification_report(y_val, y_val_pred_binary, zero_division=0))


In [None]:
cm = confusion_matrix(y_val.flatten(), y_val_pred_binary.flatten())
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix (all classes flattened)')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Names of your models
model_names = [
    "VGG16 Transfer Learning",
    "Custom CNN Model",
    "Advanced CNN with Swish+Residual",
    "Baseline/Other Model"
]

# Accuracy values (as fractions)
accuracies = [
    0.9793,  # VGG16
    0.9762,  # Custom CNN
    0.9738,  # Advanced CNN
    0.5991   # Baseline
]

# Convert accuracies to percentages
accuracies_percent = [acc * 100 for acc in accuracies]

# Plot
plt.figure(figsize=(10, 6))
bars = plt.bar(model_names, accuracies_percent, color='skyblue')
plt.ylim(0, max(accuracies_percent) + 5)  # Slightly higher than max for better text space
plt.title("Model Comparison: Validation Accuracy (%)", fontsize=14)
plt.ylabel("Accuracy (%)")
plt.xticks(rotation=15)

# Annotate bars with accuracy percentage values
for bar, acc in zip(bars, accuracies_percent):
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2.0, yval - 2, f"{acc:.2f}%", ha='center', va='top', fontsize=10, color='black')

plt.tight_layout()
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()


In [None]:
# ================= MULTI-MODEL RFMiD (VGG16/ResNet50/EfficientNetB0/MobileNetV2) =================
import os, random, json, math, gc
import numpy as np, pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import mixed_precision
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import f1_score, roc_auc_score, classification_report, confusion_matrix, hamming_loss

# -------------------- Reproducibility & Mixed Precision --------------------
SEED = 1337
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)
mixed_precision.set_global_policy("mixed_float16")

# -------------------- Paths (Kaggle RFMiD) --------------------
train_dir = '/kaggle/input/retinal-disease-classification/Training_Set/Training_Set/Training/'
val_dir   = '/kaggle/input/retinal-disease-classification/Evaluation_Set/Evaluation_Set/Validation/'
test_dir  = '/kaggle/input/retinal-disease-classification/Test_Set/Test_Set/Test/'

train_csv = '/kaggle/input/retinal-disease-classification/Training_Set/Training_Set/RFMiD_Training_Labels.csv'
val_csv   = '/kaggle/input/retinal-disease-classification/Evaluation_Set/Evaluation_Set/RFMiD_Validation_Labels.csv'
test_csv  = '/kaggle/input/retinal-disease-classification/Test_Set/Test_Set/RFMiD_Testing_Labels.csv'

# -------------------- Hyperparams --------------------
IMG_SIZE=(224,224); BATCH=32
LR=1e-3; EPOCHS_WARM=3; EPOCHS_FT=7; UNFREEZE_LAST_N=4
AUTO=tf.data.AUTOTUNE

# -------------------- Load labels --------------------
train_df=pd.read_csv(train_csv); val_df=pd.read_csv(val_csv); test_df=pd.read_csv(test_csv)
for df in (train_df,val_df,test_df):
    df['ID']=df['ID'].astype(str)+'.png'

def label_cols(df): return [c for c in df.columns if c not in ('ID','Disease_Risk')]
cols=label_cols(train_df); assert cols==label_cols(val_df)==label_cols(test_df)
C=len(cols); print("Classes:",C)

# -------------------- Vector prevalence plot --------------------
pos_counts=(train_df[cols]==1).sum().sort_values(ascending=False)
plt.figure(figsize=(10,12)); sns.barplot(x=pos_counts.values,y=pos_counts.index,edgecolor='black')
plt.title("Positive Sample Count per Disease (Train)",fontsize=14,weight='bold')
plt.xlabel("Count"); plt.ylabel("Label"); plt.tight_layout(); plt.savefig("fig_pos_counts.pdf"); plt.close()

# -------------------- tf.data pipeline --------------------
def decode(path):
    img = tf.io.read_file(path)
    img = tf.image.decode_image(img, 3, expand_animations=False)
    img = tf.image.resize(img, IMG_SIZE, antialias=True)
    return tf.cast(img, tf.float32)   # ❌ /255.0 দেবেন না


def augment(x):  # light & safe
    x=tf.image.random_flip_left_right(x,seed=SEED)
    x=tf.image.random_flip_up_down(x,seed=SEED)
    x=tf.image.random_brightness(x,0.05)
    x=tf.image.random_contrast(x,0.9,1.1)
    return x

def make_ds(img_dir, df, train=False):
    paths=df['ID'].values; labels=df[cols].values.astype(np.float32)
    ds=tf.data.Dataset.from_tensor_slices((paths,labels))
    if train: ds=ds.shuffle(len(paths),seed=SEED)
    ds=ds.map(lambda p,y:(tf.strings.join([img_dir,p]),y),num_parallel_calls=AUTO)
    ds=ds.map(lambda p,y:(decode(p),y),num_parallel_calls=AUTO)
    if train: ds=ds.map(lambda x,y:(augment(x),y),num_parallel_calls=AUTO)
    return ds.batch(BATCH).cache().prefetch(AUTO)

train_ds=make_ds(train_dir,train_df,train=True)
val_ds  =make_ds(val_dir,val_df,train=False)
test_ds =make_ds(test_dir,test_df,train=False)

# -------------------- Imbalance: per-class positive weights --------------------
pos=train_df[cols].sum(0).values.astype(np.float32); neg=len(train_df)-pos
pos_w=np.clip(neg/(pos+1e-6),1.0,50.0).astype(np.float32)
pos_w_tf=tf.constant(pos_w,dtype=tf.float32)

class WeightedBCE(keras.losses.Loss):
    def __init__(self, w): super().__init__(name='weighted_bce'); self.w=tf.cast(w,tf.float32)
    def call(self,y_true,y_pred):
        y_true=tf.cast(y_true,tf.float32); y_pred=tf.clip_by_value(y_pred,1e-7,1-1e-7)
        w=1.0+y_true*(self.w-1.0)
        bce=-(y_true*tf.math.log(y_pred)+(1.0-y_true)*tf.math.log(1.0-y_pred))
        return tf.reduce_mean(tf.reduce_sum(w*bce,axis=-1))

# -------------------- Backbones & preprocess mapping --------------------
from tensorflow.keras.applications import (
    vgg16, resnet50, efficientnet, mobilenet_v2
)

BACKBONES = {
    "VGG16":        (vgg16.VGG16,        vgg16.preprocess_input),
    "ResNet50":     (resnet50.ResNet50,  resnet50.preprocess_input),
    "EfficientNetB0":(efficientnet.EfficientNetB0, efficientnet.preprocess_input),
    "MobileNetV2":  (mobilenet_v2.MobileNetV2, mobilenet_v2.preprocess_input),
}

def build_model(name):
    ctor, preprocess = BACKBONES[name]
    base = ctor(include_top=False, weights='imagenet', input_shape=(*IMG_SIZE,3))
    base.trainable=False
    inp=keras.Input(shape=(*IMG_SIZE,3))
    x=preprocess(inp)
    x=base(x,training=False)
    x=keras.layers.GlobalAveragePooling2D()(x)
    x=keras.layers.Dense(256,activation='relu')(x)
    x=keras.layers.Dropout(0.4)(x)
    out=keras.layers.Dense(C,activation='sigmoid',dtype='float32')(x)
    model=keras.Model(inp,out,name=f"{name}_rfmid")
    model.compile(optimizer=keras.optimizers.Adam(LR),
                  loss=WeightedBCE(pos_w_tf),
                  metrics=[keras.metrics.BinaryAccuracy(name='bin_acc'),
                           keras.metrics.AUC(name='auc')])
    # plan for fine-tune: unfreeze last N layers
    def unfreeze_last_n(n=UNFREEZE_LAST_N):
        base.trainable=True
        for lyr in base.layers[:-n]: lyr.trainable=False
        model.compile(optimizer=keras.optimizers.Adam(LR*0.1),
                      loss=WeightedBCE(pos_w_tf),
                      metrics=[keras.metrics.BinaryAccuracy(name='bin_acc'),
                               keras.metrics.AUC(name='auc')])
    return model, unfreeze_last_n

# -------------------- Train/Evaluate one backbone --------------------
def collect_preds(ds, model):
    y_true=[]; y_prob=[]
    for xb,yb in ds:
        pb=model.predict(xb,verbose=0)
        y_true.append(yb.numpy()); y_prob.append(pb)
    return np.concatenate(y_true,0), np.concatenate(y_prob,0)

def find_best_thresholds(y_true,y_prob,grid=None):
    if grid is None: grid=np.linspace(0.1,0.9,17)
    best=np.zeros(y_true.shape[1],dtype=np.float32)
    for c in range(y_true.shape[1]):
        yt=y_true[:,c]; yp=y_prob[:,c]
        if yt.max()==yt.min(): best[c]=0.5; continue
        best_f=-1; best_t=0.5
        for t in grid:
            f=f1_score(yt,(yp>=t).astype(int),zero_division=0)
            if f>best_f: best_f=f; best_t=t
        best[c]=best_t
    return best

def eval_suite(name, y_true, y_prob, thresholds):
    y_pred=(y_prob>=thresholds.reshape(1,-1)).astype(int)
    macro_f=f1_score(y_true,y_pred,average='macro',zero_division=0)
    micro_f=f1_score(y_true,y_pred,average='micro',zero_division=0)
    aucs=[]
    for i in range(y_true.shape[1]):
        if y_true[:,i].max()!=y_true[:,i].min():
            aucs.append(roc_auc_score(y_true[:,i],y_prob[:,i]))
    macro_auc=float(np.mean(aucs)) if len(aucs)>0 else float('nan')
    hamm=hamming_loss(y_true,y_pred)
    emr=float((y_true==y_pred).all(axis=1).mean())
    # Flattened CM (vector figure)
    cm=confusion_matrix(y_true.flatten(),y_pred.flatten())
    plt.figure(figsize=(5.5,4.5)); sns.heatmap(cm,annot=True,fmt='d',cbar=False)
    plt.title(f'Confusion Matrix (Flattened) — {name}')
    plt.xlabel('Pred'); plt.ylabel('True'); plt.tight_layout()
    plt.savefig(f"fig_cm_flat_{name}.pdf"); plt.close()
    # classification report file
    with open(f"val_report_{name}.txt","w") as f:
        f.write(classification_report(y_true,y_pred,target_names=cols,zero_division=0))
    return {"model":name,"macro_f1":macro_f,"micro_f1":micro_f,"macro_auc":macro_auc,
            "hamming_loss":hamm,"exact_match_ratio":emr}

def train_and_eval(name):
    print(f"\n===== {name}: training =====")
    model, unfreeze = build_model(name)
    cbs=[
        keras.callbacks.EarlyStopping(monitor='val_loss',patience=4,restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(monitor='val_loss',factor=0.5,patience=2,verbose=1),
        keras.callbacks.ModelCheckpoint(f"{name}_best.weights.h5",save_best_only=True,
                                        save_weights_only=True,monitor='val_loss')
    ]
    h1=model.fit(train_ds,validation_data=val_ds,epochs=EPOCHS_WARM,callbacks=cbs,verbose=1)
    unfreeze(UNFREEZE_LAST_N)
    h2=model.fit(train_ds,validation_data=val_ds,epochs=EPOCHS_FT,callbacks=cbs,verbose=1)
    # plot history (vector)
    hist={}
    for k in h1.history: hist[k]=h1.history[k]+h2.history.get(k,[])
    def plot_hist(key,fname,title):
        ep=np.arange(1,len(hist[key])+1)
        plt.figure(figsize=(7,5))
        plt.plot(ep,hist[key],'o-',label=f"Train {key}")
        vk=f"val_{key}"
        if vk in hist: plt.plot(ep,hist[vk],'o-',label=f"Val {key}")
        plt.title(title); plt.xlabel("Epoch"); plt.ylabel(key); plt.grid(True); plt.legend()
        plt.tight_layout(); plt.savefig(fname); plt.close()
    plot_hist('loss', f"fig_loss_{name}.pdf", f"Loss — {name}")
    if 'bin_acc' in hist: plot_hist('bin_acc', f"fig_binacc_{name}.pdf", f"Binary Accuracy — {name}")
    if 'auc' in hist: plot_hist('auc', f"fig_auc_{name}.pdf", f"AUC — {name}")
    # Eval
    y_true,y_prob=collect_preds(val_ds,model)
    t=find_best_thresholds(y_true,y_prob); np.save(f"best_thresh_{name}.npy",t)
    res=eval_suite(name,y_true,y_prob,t)
    # save weights + metadata
    model.save_weights(f"{name}_final.weights.h5")
    meta={"backbone":name,"seed":SEED,"img_size":IMG_SIZE,"batch":BATCH,
          "epochs_warm":EPOCHS_WARM,"epochs_ft":EPOCHS_FT,"unfreeze_last_n":UNFREEZE_LAST_N,
          "metrics":res}
    with open(f"run_meta_{name}.json","w") as f: json.dump(meta,f,indent=2)
    return res

# -------------------- Run all models --------------------
results=[]
for name in ["VGG16","ResNet50","EfficientNetB0","MobileNetV2"]:
    results.append(train_and_eval(name))

# -------------------- Results table + comparison figure (vector) --------------------
res_df=pd.DataFrame(results)
res_df.to_csv("model_comparison_results.csv",index=False)
print("\n==== Model Comparison (Validation) ===="); print(res_df)

plt.figure(figsize=(8,5))
plt.bar(res_df["model"], res_df["macro_f1"], edgecolor='black')
plt.title("Model Comparison: Macro-F1 (Validation)")
plt.ylabel("Macro-F1"); plt.xlabel("Backbone"); plt.tight_layout()
plt.savefig("fig_model_compare_macroF1.pdf"); plt.close()

# Optional: Macro-AUC bar
plt.figure(figsize=(8,5))
plt.bar(res_df["model"], res_df["macro_auc"], edgecolor='black')
plt.title("Model Comparison: Macro-AUC (Validation)")
plt.ylabel("Macro-AUC"); plt.xlabel("Backbone"); plt.tight_layout()
plt.savefig("fig_model_compare_macroAUC.pdf"); plt.close()

# Save run summary
with open("SUMMARY.txt","w") as f:
    f.write("Validation comparison across backbones\n")
    f.write(res_df.to_string(index=False))
print("\n[Done] All figures saved as PDF (vector). Logs/weights/thresholds per model are written.")


In [None]:
import os
import shutil
import glob
import zipfile

# ✅ Helper: Create ZIP safely
def create_zip(zip_path, source_dir):
    if os.path.exists(source_dir):
        shutil.make_archive(zip_path.replace('.zip', ''), 'zip', source_dir)
        print(f"✅ Created ZIP: {zip_path}")
    else:
        print(f"⚠️ Source folder not found: {source_dir}")

# ✅ Helper: Collect important output files
def gather_files():
    patterns = [
        'figures/*.pdf', 'figures/*.svg',
        '*_final.weights.h5', '*_best.weights.h5',
        'val_report_*.txt', 'run_meta_*.json',
        'model_comparison_results.csv', 'SUMMARY.txt'
    ]
    files = []
    for pattern in patterns:
        matched = glob.glob(pattern)
        if matched:
            files.extend(matched)
        else:
            print(f"⚠️ No match for pattern: {pattern}")
    return files

# ✅ Helper: Write files to ZIP
def zip_outputs(zip_path, files):
    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as z:
        for f in files:
            if os.path.exists(f):
                z.write(f)
            else:
                print(f"⚠️ File not found: {f}")
    print(f"✅ Packed {len(files)} files into: {zip_path}")

# 📦 figures ফোল্ডার ZIP
create_zip('/kaggle/working/figures.zip', 'figures')

# 📦 সব গুরুত্বপূর্ণ আউটপুট একসাথে ZIP
important_files = gather_files()
zip_outputs('/kaggle/working/all_outputs.zip', important_files)

# ✅ Final status
print("Ready:", "/kaggle/working/figures.zip", "/kaggle/working/all_outputs.zip")