<a href="https://www.kaggle.com/code/yahyasoker/diabetic-retinopathy-v1?scriptVersionId=249932563" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import spectrogram
from tqdm import tqdm
import random
import shutil
import seaborn as sns
import warnings
from PIL import Image
import numpy as np
from glob import glob
#---------------------------------------
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
#---------------------------------------
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, Flatten, Input, BatchNormalization, Add
from tensorflow.keras.optimizers import Adamax
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping
#---------------------------------------
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, ConfusionMatrixDisplay
from sklearn.model_selection import KFold
#---------------------------------------
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Copy each file into the directory selected by its label, with a progress bar
def move_files(file_paths, labels, target_dirs):
    """Copy files into label-selected directories while advancing a tqdm bar.

    Despite the name, the files are copied with ``shutil.copy``; the source
    files are left in place.

    Parameters:
    - file_paths: sized collection of source file paths
    - labels: labels paired with ``file_paths`` by position; each one is used
      to index ``target_dirs``
    - target_dirs: container mapping a label to its destination directory
    """
    progress = tqdm(
        total=len(file_paths),
        desc="Moving Files",
        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}",
    )
    with progress:
        for source, label in zip(file_paths, labels):
            shutil.copy(source, target_dirs[label])
            progress.update(1)

In [None]:
def data_prep_split(input_dir, output_dir, classes, train_frac=0.7, valid_frac=0.1, seed=None):
    """Copy a class-per-folder image set into train/valid/test folders.

    For every class the files found directly inside ``input_dir/<class>`` are
    shuffled and partitioned: the first ``train_frac`` go to ``train``, the
    next ``valid_frac`` to ``valid`` and whatever remains to ``test``. Each
    file is copied to ``output_dir/<split>/<class>`` via ``move_files``.

    Parameters:
    - input_dir: directory containing one sub-folder per class
    - output_dir: directory under which the split folders are created
    - classes: sequence of class (sub-folder) names
    - train_frac: fraction of each class used for training (default 0.7)
    - valid_frac: fraction of each class used for validation (default 0.1)
    - seed: optional seed for the shuffle. With a seed the partition is
      reproducible, so re-running does not copy a different partition into
      the already populated folders. With ``None`` the module-level
      ``random`` state is used, as before.

    Raises:
    - ValueError: if the fractions are negative or sum to more than 1
    """
    if train_frac < 0 or valid_frac < 0 or train_frac + valid_frac > 1:
        raise ValueError("train_frac and valid_frac must be >= 0 and sum to at most 1")

    # Destination folders per split, stored in class order so that the integer
    # label produced by enumerate(classes) indexes them directly.
    split_names = ('train', 'valid', 'test')
    split_dirs = {split: [] for split in split_names}
    for split in split_names:
        for class_name in classes:
            path = os.path.join(output_dir, split, class_name)
            os.makedirs(path, exist_ok=True)
            split_dirs[split].append(path)

    shuffle = random.shuffle if seed is None else random.Random(seed).shuffle

    for label, class_name in enumerate(classes):
        class_dir = os.path.join(input_dir, class_name)
        # Sort first: os.listdir order is arbitrary, and a seeded shuffle is
        # only reproducible when it starts from a fixed order.
        class_files = sorted(
            os.path.join(class_dir, name)
            for name in os.listdir(class_dir)
            if os.path.isfile(os.path.join(class_dir, name))
        )
        shuffle(class_files)

        total = len(class_files)
        train_end = int(train_frac * total)
        valid_end = train_end + int(valid_frac * total)

        partitions = {
            'train': class_files[:train_end],
            'valid': class_files[train_end:valid_end],
            'test': class_files[valid_end:],
        }
        for split in split_names:
            files = partitions[split]
            move_files(files, [label] * len(files), split_dirs[split])

    print("Data successfully split into train, validation, and test directories.")

In [None]:
# Function to create a DataFrame listing every file under each class folder
def create_df(data_path):
    """Build a DataFrame of file paths and class labels from a folder tree.

    Every sub-directory of ``data_path`` is treated as a class, and every
    entry inside it becomes one row. Directory listings are sorted so the row
    order does not depend on the file system. Non-directory entries directly
    under ``data_path`` are ignored.

    Parameters:
    - data_path: directory containing one sub-directory per class

    Returns:
    - DataFrame with columns 'Class Path' (full path) and 'Class' (name of
      the sub-directory). It is empty, with the same two columns, when no
      class files are found.
    """
    rows = []
    for label in sorted(os.listdir(data_path)):
        label_dir = os.path.join(data_path, label)
        if not os.path.isdir(label_dir):
            continue
        rows.extend(
            (os.path.join(label_dir, file), label)
            for file in sorted(os.listdir(label_dir))
        )
    # Building from rows avoids the zip(*[]) unpacking error on empty input.
    return pd.DataFrame(rows, columns=['Class Path', 'Class'])

In [None]:
# Parent directory containing one sub-folder per class name listed in `classes`
input_dir = "/kaggle/input/diabetic-retinopathy-224x224-gaussian-filtered/gaussian_filtered_images/gaussian_filtered_images"
# Directory under which the train/valid/test folders are created
output_dir = "/kaggle/working/"
classes = ['Mild','Moderate','No_DR','Proliferate_DR','Severe']

# Copy each class into a 70% train / 10% validation / remaining (~20%) test split
data_prep_split(input_dir, output_dir, classes)

# Locations of the three split folders
train_path, test_path, valid_path = (
    os.path.join(output_dir, "train"),
    os.path.join(output_dir, "test"),
    os.path.join(output_dir, "valid"),
)

# One DataFrame of (file path, class) rows per split
train_df = create_df(train_path)
test_df = create_df(test_path)
valid_df = create_df(valid_path)

# Preview the first rows of each DataFrame
print("Training DataFrame:", train_df.head(), sep="\n")
print("\nTesting DataFrame:", test_df.head(), sep="\n")
print("\nValid DataFrame:", valid_df.head(), sep="\n")

In [None]:
def train_dff(tr_path):
    """Return the (file path, class) DataFrame for the training directory.

    Thin wrapper around ``create_df``, which implements the same directory
    walk; kept so existing callers of ``train_dff`` continue to work.

    Parameters:
    - tr_path: directory containing one sub-directory per class

    Returns:
    - DataFrame with columns 'Class Path' and 'Class'
    """
    return create_df(tr_path)
def test_dff(ts_path):
    """Return the (file path, class) DataFrame for the test directory.

    Thin wrapper around ``create_df``, which implements the same directory
    walk; kept so existing callers of ``test_dff`` continue to work.

    Parameters:
    - ts_path: directory containing one sub-directory per class

    Returns:
    - DataFrame with columns 'Class Path' and 'Class'
    """
    return create_df(ts_path)

In [None]:
def data_df(train_path,test_path):
    """Load the train/test DataFrames and plot per-class image counts.

    The frame built from ``test_path`` is split 50/50 (stratified on 'Class',
    random_state=41) into a validation part and a test part. One count plot
    is drawn for the train, test and validation parts.

    Parameters:
    - train_path: directory of training images, one sub-directory per class
    - test_path: directory of held-out images, one sub-directory per class

    Returns:
    - (train_df, test_df): the training frame and the test half of the
      held-out frame. The validation half is only plotted, not returned.
    """
    train_df = train_dff(train_path)
    held_out_df = test_dff(test_path)
    valid_df, test_df = train_test_split(
        held_out_df,
        train_size=0.5,
        random_state=41,
        stratify=held_out_df['Class'],
    )

    palettes = ['crest', 'mako', 'rocket', 'flare']
    datasets = [
        ("Train Data", train_df['Class']),
        ("Test Data", test_df['Class']),
        ("Validation Data", valid_df['Class']),
    ]

    # One horizontal count plot per dataset, each with its own palette
    for (title, data), palette in zip(datasets, palettes):
        plt.figure(figsize=(4, 3))
        ax = sns.countplot(y=data, palette=palette)
        ax.set(xlabel='', ylabel='', title=f'Count of images in each class ({title})')
        ax.bar_label(ax.containers[0], fontsize=10, padding=5)
        plt.show()

    return train_df, test_df

In [None]:
# Rebuild the train/test DataFrames from the split folders; data_df also draws
# the per-class count plots and returns only the train and test frames.
train_path, test_path = "/kaggle/working/train", "/kaggle/working/test"
train_df, test_df = data_df(train_path, test_path)

In [None]:
def create_model(img_shape, ml,y):
    """
    Create and compile a classifier on top of a TensorFlow application model.

    Parameters:
    - img_shape: tuple, shape of the input images (e.g., (224, 224, 3))
    - ml: TensorFlow application model function (e.g., tf.keras.applications.VGG16)
    - y: int, number of output classes (units of the final layer)

    Returns:
    - Compiled Keras Model
    """
    # Input layer
    inputs = Input(shape=img_shape)

    # Base model with ImageNet weights, no classifier head, global max pooling
    base_model = ml(
        include_top=False,
        weights="imagenet",
        input_shape=img_shape,
        pooling='max'
    )

    # training=False runs the base model in inference mode. Its weights are
    # still trainable, because base_model.trainable is left unchanged.
    x = base_model(inputs, training=False)

    # Kept from the original; a no-op when the pooled output is already 2-D
    x = Flatten()(x)

    # Classification head
    x = Dropout(rate=0.3)(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(rate=0.25)(x)

    # Softmax makes the class scores a probability distribution, which is
    # what categorical cross-entropy assumes for single-label multi-class
    # targets (the previous sigmoid scored each class independently).
    outputs = Dense(y, activation='softmax')(x)

    # Create the model
    model = Model(inputs=inputs, outputs=outputs)

    # Compile the model
    model.compile(
        optimizer=Adamax(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy', Precision(), Recall()]
    )

    model.summary()  # Display model summary
    return model

In [None]:
def plot_training_metrics(epochs, training_metrics, validation_metrics, title, xlabel, ylabel, best_index, best_value, best_label):
    """
    Draw training and validation curves over epochs and highlight the best value.

    Parameters:
    - epochs: x-axis values, one per entry of the metric sequences
    - training_metrics: per-epoch training values
    - validation_metrics: per-epoch validation values
    - title, xlabel, ylabel: plot title and axis labels
    - best_index: zero-based index of the best epoch (drawn at best_index + 1)
    - best_value: metric value at the best epoch
    - best_label: legend text for the highlighted point

    Prints an error and returns without plotting when either series is empty.
    """
    if not (len(training_metrics) and len(validation_metrics)):
        print("Error: Training or validation metrics are empty.")
        return

    # Wide figure holding both curves
    plt.figure(figsize=(12, 6))
    curves = (
        (training_metrics, 'r-', 'Training'),
        (validation_metrics, 'g-', 'Validation'),
    )
    for values, line_format, name in curves:
        plt.plot(epochs, values, line_format, label=name, linewidth=2, markersize=6)

    # Epochs are shown 1-based, so shift the zero-based best index
    best_epoch = best_index + 1
    value_text = f'{best_value:.4f}'

    # Marker, value annotation and dashed guide line for the best value
    plt.scatter(best_epoch, best_value, s=200, c='blue', edgecolor='black', label=f'{best_label} ({value_text})', zorder=5)
    plt.text(best_epoch, best_value + 0.05, value_text, color='blue', ha='center', fontsize=12, fontweight='bold')
    plt.axhline(y=best_value, color='blue', linestyle='--', linewidth=1.5, alpha=0.6)

    # Title and axis labels
    plt.title(title, fontsize=16, fontweight='bold')
    plt.xlabel(xlabel, fontsize=14)
    plt.ylabel(ylabel, fontsize=14)

    # Legend, grid and tick styling
    plt.legend(fontsize=12, loc='upper right', title="Metrics", title_fontsize=14)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)

    # Avoid clipped labels, then render
    plt.tight_layout()
    plt.show()

In [None]:
def plot_confusion_matrix(y_true, y_pred_classes, test_gen, fold_idx):
    """
    Plots a confusion matrix and displays evaluation metrics.

    Parameters:
        y_true (list or array-like): True class indices.
        y_pred_classes (list or array-like): Predicted class indices.
        test_gen (object): Test data generator; its ``class_indices`` mapping
            (class name -> index) supplies the matrix rows and axis labels.
        fold_idx (int): Fold number shown in the title, exactly as passed.
    """
    # Order the class names by their index so the tick labels line up with
    # the rows/columns of the matrix.
    ordered = sorted(test_gen.class_indices.items(), key=lambda item: item[1])
    class_names = [name for name, _ in ordered]
    class_ids = [index for _, index in ordered]

    # Passing labels= keeps one row/column per known class even when a class
    # is absent from both y_true and y_pred_classes; otherwise the matrix
    # could be smaller than the list of display labels.
    cm = confusion_matrix(y_true, y_pred_classes, labels=class_ids)

    # Set a larger figure size and create an axis
    fig, ax = plt.subplots(figsize=(12, 10))

    # Plot the confusion matrix on the specified axis
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap='Blues', ax=ax)

    # Macro averaging matches the per-fold metrics aggregated by the training
    # loop. With micro averaging on single-label multi-class data, precision,
    # recall and F1 would all just repeat the accuracy.
    accuracy = accuracy_score(y_true, y_pred_classes)
    precision = precision_score(y_true, y_pred_classes, average='macro')
    recall = recall_score(y_true, y_pred_classes, average='macro')
    f1 = f1_score(y_true, y_pred_classes, average='macro')

    # Title on the matrix axis, metrics summary below the plot
    ax.set_title(f"Confusion Matrix - Fold {fold_idx}", fontsize=16)
    fig.text(
        0.5, -0.1,
        f"Accuracy: {accuracy:.4f} | Precision: {precision:.4f} | Recall: {recall:.4f} | F1 Score: {f1:.4f}",
        ha="center", fontsize=14, color="black"
    )

    # Show the plot with the updated size
    plt.show()

In [None]:
def automated_model_training_with_plots(train_df, test_df, kf, img_size, batch_size, img_shape, model_list,epoch_size,y):
    """
    Automates training and evaluation with different pre-trained models, including plotting metrics.

    For every model function, each cross-validation fold trains a fresh model
    from `create_model`, plots its accuracy and loss curves, evaluates it on
    `test_df` and plots a confusion matrix. The mean and standard deviation of
    the per-fold test metrics are printed per model.

    Parameters:
    - train_df: DataFrame with training data ('Class Path' and 'Class' columns)
    - test_df: DataFrame with test data (same columns)
    - kf: cross-validator; its `split` is called with `train_df` and its
      'Class' column, so stratified splitters work as well as KFold
    - img_size: Tuple, target size for the images
    - batch_size: Integer, batch size for the training/validation generators
    - img_shape: Tuple, input shape for the model
    - model_list: List of TensorFlow model functions (e.g., [VGG16, Xception])
    - epoch_size: Integer, number of epochs to train in each fold
    - y: Integer, number of classes (output units passed to `create_model`)

    Outputs:
    - Training metrics, plots, and confusion matrix for each model.

    Raises:
    - ValueError: if the number of distinct classes in the data differs from `y`
    """
    # One fixed class list for all generators, so a class name maps to the
    # same index in every fold and in the test set.
    class_names = sorted(set(train_df['Class']) | set(test_df['Class']))
    if len(class_names) != y:
        raise ValueError(
            f"Expected {y} classes but found {len(class_names)} in the data: {class_names}"
        )

    # Arguments shared by every flow_from_dataframe call
    flow_kwargs = dict(x_col='Class Path', y_col='Class', target_size=img_size, classes=class_names)

    for model_fn in model_list:
        print(f"Training with model: {model_fn.__name__}")

        # Initialize metrics storage
        accuracy_scores, precision_scores, recall_scores, f1_scores = [], [], [], []

        # Cross-validation loop; labels are passed so stratified splitters can use them
        for fold_idx, (train_index, val_index) in enumerate(kf.split(train_df, train_df['Class'])):
            fold_number = fold_idx + 1
            print(f"\nFold {fold_number}/{kf.n_splits}")

            # Split data into training and validation sets
            train_data = train_df.iloc[train_index]
            val_data = train_df.iloc[val_index]

            # Data generators (brightness augmentation on the training data only)
            train_gen = ImageDataGenerator(rescale=1/255, brightness_range=(0.8, 1.2)).flow_from_dataframe(
                train_data, batch_size=batch_size, **flow_kwargs)

            valid_gen = ImageDataGenerator(rescale=1/255).flow_from_dataframe(
                val_data, batch_size=batch_size, **flow_kwargs)

            # shuffle=False keeps predictions aligned with test_gen.classes
            test_gen = ImageDataGenerator(rescale=1/255).flow_from_dataframe(
                test_df, batch_size=16, shuffle=False, **flow_kwargs)

            # Drop state left by the previous fold/model so memory does not
            # accumulate across the many models built here.
            tf.keras.backend.clear_session()

            # Build and train model
            model = create_model(img_shape, model_fn, y)
            history = model.fit(train_gen, validation_data=valid_gen, epochs=epoch_size, shuffle=False)

            # Extract training and validation metrics
            training_accuracy = history.history['accuracy']
            validation_accuracy = history.history['val_accuracy']
            training_loss = history.history['loss']
            validation_loss = history.history['val_loss']

            # Find best epoch indices
            best_epoch_loss_index = np.argmin(validation_loss)
            best_validation_loss = validation_loss[best_epoch_loss_index]
            best_epoch_accuracy_index = np.argmax(validation_accuracy)
            best_validation_accuracy = validation_accuracy[best_epoch_accuracy_index]

            # Plot training metrics
            epochs = np.arange(1, len(training_accuracy) + 1)
            plot_training_metrics(
                epochs,
                training_accuracy,
                validation_accuracy,
                "Training and Validation Accuracy",
                "Epochs",
                "Accuracy",
                best_epoch_accuracy_index,
                best_validation_accuracy,
                f"Best Epoch = {best_epoch_accuracy_index + 1}"
            )
            plot_training_metrics(
                epochs,
                training_loss,
                validation_loss,
                "Training and Validation Loss",
                "Epochs",
                "Loss",
                best_epoch_loss_index,
                best_validation_loss,
                f"Best Epoch = {best_epoch_loss_index + 1}"
            )

            # Evaluate the model on test data
            y_true = test_gen.classes
            y_pred = model.predict(test_gen)
            y_pred_classes = np.argmax(y_pred, axis=1)

            # Metrics
            accuracy = accuracy_score(y_true, y_pred_classes)
            precision = precision_score(y_true, y_pred_classes, average='macro')
            recall = recall_score(y_true, y_pred_classes, average='macro')
            f1 = f1_score(y_true, y_pred_classes, average='macro')

            # Append metrics
            accuracy_scores.append(accuracy)
            precision_scores.append(precision)
            recall_scores.append(recall)
            f1_scores.append(f1)

            # Confusion matrix, titled with the same 1-based fold number that is logged above
            plot_confusion_matrix(y_true,
                y_pred_classes,
                test_gen,
                fold_number)

        # Print cross-validation metrics
        print(f"\nResults for model: {model_fn.__name__}")
        print(f"Accuracy: {np.mean(accuracy_scores):.4f} ± {np.std(accuracy_scores):.4f}")
        print(f"Precision: {np.mean(precision_scores):.4f} ± {np.std(precision_scores):.4f}")
        print(f"Recall: {np.mean(recall_scores):.4f} ± {np.std(recall_scores):.4f}")
        print(f"F1 Score: {np.mean(f1_scores):.4f} ± {np.std(f1_scores):.4f}")
        print("-" * 50)

In [None]:
# Image geometry: generators resize to img_size; the model input adds 3 channels
img_size = (256, 256)
img_shape = img_size + (3,)

# Training hyper-parameters
batch_size = 32
epoch_size = 30
y = 5  # number of output classes passed to the model builder

# NOTE(review): not referenced by the visible training code; presumably meant
# for an early-stopping callback. Confirm before removing.
loss_threshold = 1.5
patience = 10

# 3-fold cross-validation with a fixed shuffle seed
kf = KFold(n_splits=3, shuffle=True, random_state=42)

In [None]:
# Example usage: run the cross-validated training/evaluation once per backbone
models_to_evaluate = [
    tf.keras.applications.Xception,
    tf.keras.applications.VGG16,
    tf.keras.applications.ResNet50,
]
automated_model_training_with_plots(
    train_df, test_df, kf, img_size, batch_size, img_shape,
    models_to_evaluate, epoch_size, y,
)