Import Libraries

In [None]:
import os
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import EfficientNetB0
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping

Verify GPU usage

In [None]:
# Discover the GPUs visible to TensorFlow and switch each one to on-demand
# memory allocation (memory growth) instead of reserving all memory up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
if not gpus:
    # Nothing to configure: training will run on the CPU.
    print("No GPU found. Using CPU instead.")
else:
    try:
        for device in gpus:
            tf.config.experimental.set_memory_growth(device, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as err:
        # Report the configuration problem but let the notebook continue.
        print(err)

Enable mixed precision

In [None]:
from tensorflow.keras import mixed_precision

# float16 compute only pays off on a GPU; on a CPU-only machine it brings no
# speed-up and runs markedly slower. This notebook explicitly supports the
# "No GPU found" fallback, so enable mixed precision only when a GPU is
# visible and otherwise keep the default float32 policy.
if tf.config.experimental.list_physical_devices('GPU'):
    policy = mixed_precision.Policy('mixed_float16')
else:
    policy = mixed_precision.Policy('float32')
mixed_precision.set_global_policy(policy)

Define Classes

In [None]:
# Number of target classes. It sets the width of the final softmax layer and
# must match the number of entries in label_to_folder (labels 0-4).
num_classes = 5

Define Directories

In [None]:
# Map class labels to folder names.
# The position of each folder name in the tuple is its integer class label,
# so enumerate() yields the {label: folder} pairs directly.
label_to_folder = dict(enumerate((
    'No_Dr',
    'Mild',
    'Moderate',
    'Severe',
    'Proliferate',
)))

In [None]:
# Resolve the current user's home directory
home_dir = os.path.expanduser("~")

# Dataset root: <home>/Desktop/4th Year Project/<dataset export>/archive
base_dir = os.path.join(
    home_dir,
    "Desktop",
    "4th Year Project",
    "Detection Of Diabetic Retinopathy Using Machine Learning.v1i.multiclass",
    "archive",
)

# The images sit in a folder nested inside another folder of the same name
img_dir = os.path.join(base_dir, *["gaussian_filtered_images"] * 2)

# CSV that lists each image id together with its diagnosis label
csv_file_path = os.path.join(base_dir, "train.csv")

Verify Paths

In [None]:
# Print and Check Paths
# Each row is (caption, value); the rows are printed in order so the resolved
# locations appear first, followed by whether each one exists on disk.
for caption, value in (
    ("Image Directory Path:", img_dir),
    ("CSV file Path:", csv_file_path),
    ("Image Directory Exists:", os.path.exists(img_dir)),
    ("CSV file Exists:", os.path.exists(csv_file_path)),
):
    print(caption, value)

In [None]:
# End-to-end training cell: load the label CSV, build the tf.data input
# pipelines, fine-tune EfficientNetB0 and evaluate it on the validation split.
# It only runs when both the image directory and the CSV exist; otherwise the
# contents of the base directory are listed to help troubleshoot the paths.
if os.path.exists(img_dir) and os.path.exists(csv_file_path):
    # Read the CSV file
    df = pd.read_csv(csv_file_path)
    print("CSV columns:", df.columns.tolist())
    print(df.head())

    # Column names in your CSV
    id_column = 'id_code'
    label_column = 'diagnosis'

    # Create full file paths and get labels. Images are stored as
    # <img_dir>/<class folder>/<id_code>.png, so the label selects the folder.
    file_paths = [os.path.join(img_dir, label_to_folder[label], f"{id_code}.png")
                  for id_code, label in zip(df[id_column], df[label_column])]
    labels = df[label_column].values

    # Keep only the samples whose image is actually on disk. A single pass
    # checks every path once and keeps each file paired with its label.
    existing_samples = [(path, label) for path, label in zip(file_paths, labels)
                        if os.path.exists(path)]
    existing_files = [path for path, _ in existing_samples]
    existing_labels = [label for _, label in existing_samples]

    # Surface silently dropped rows, and fail with a clear message instead of
    # an opaque train_test_split error when nothing at all was found.
    missing_count = len(file_paths) - len(existing_files)
    if missing_count:
        print(f"Warning: {missing_count} of {len(file_paths)} images listed in the CSV "
              "were not found on disk and will be skipped.")
    if not existing_files:
        raise ValueError(f"No images listed in {csv_file_path} were found under {img_dir}.")

    # Split the data into train and validation sets
    train_files, val_files, train_labels, val_labels = train_test_split(
        existing_files, existing_labels, test_size=0.2, random_state=42)

    # Define image dimensions
    img_width, img_height = 224, 224

    # Function to process images
    def process_path(file_path, label):
        """Load one PNG image and resize it to the network input size.

        Args:
            file_path: Scalar string tensor holding the path of a PNG file.
            label: Class label of the image; passed through unchanged.

        Returns:
            A tuple ``(img, label)`` where ``img`` is a float32 tensor of
            shape ``(img_height, img_width, 3)`` with values in [0, 255].
        """
        img = tf.io.read_file(file_path)
        img = tf.image.decode_png(img, channels=3)
        # tf.image.resize expects [height, width] and already returns float32
        # in the original 0-255 range. Pixels are deliberately NOT rescaled to
        # [0, 1]: Keras EfficientNet models include their own input rescaling
        # and expect raw 0-255 inputs.
        img = tf.image.resize(img, [img_height, img_width])
        return img, label

    # Data augmentation
    data_augmentation = tf.keras.Sequential([
        layers.RandomFlip("horizontal_and_vertical"),
        layers.RandomRotation(0.2),
        layers.RandomZoom(0.2),
        layers.RandomTranslation(0.1, 0.1)
    ])

    # Create datasets
    batch_size = 32
    train_dataset = tf.data.Dataset.from_tensor_slices((train_files, train_labels))
    # Shuffle the lightweight (path, label) pairs before decoding. Shuffling
    # after the decode step with a full-size buffer would keep every decoded
    # image in memory at once.
    train_dataset = train_dataset.shuffle(buffer_size=len(train_files))
    train_dataset = train_dataset.map(process_path, num_parallel_calls=tf.data.AUTOTUNE)
    train_dataset = train_dataset.map(lambda x, y: (data_augmentation(x, training=True), y), num_parallel_calls=tf.data.AUTOTUNE)
    train_dataset = train_dataset.batch(batch_size)
    train_dataset = train_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

    # The validation pipeline is neither shuffled nor augmented
    val_dataset = tf.data.Dataset.from_tensor_slices((val_files, val_labels))
    val_dataset = val_dataset.map(process_path, num_parallel_calls=tf.data.AUTOTUNE)
    val_dataset = val_dataset.batch(batch_size)
    val_dataset = val_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

    print("Datasets Have Been Created Successfully!!!")

    # Model definition. The input shape reuses the image dimensions above so
    # the pipeline and the network cannot drift apart.
    base_model = EfficientNetB0(weights='imagenet', include_top=False,
                                input_shape=(img_height, img_width, 3))
    base_model.trainable = True  # Unfreeze the base model

    # Fine-tune from this layer onwards; earlier layers keep their ImageNet weights
    fine_tune_at = 100

    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False

    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(256, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax', dtype='float32')  # Ensures output is not in mixed precision
    ])

    # Model compilation. Labels are integer class ids, hence the sparse loss.
    model.compile(optimizer=optimizers.Adam(learning_rate=0.001),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    # Callbacks: keep the best checkpoint by validation accuracy, lower the
    # learning rate when validation loss plateaus, and stop early if it
    # stops improving.
    checkpoint = ModelCheckpoint('best_model.keras', save_best_only=True, monitor='val_accuracy', mode='max')
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.0001)
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

    # Training
    history = model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=50,
        callbacks=[checkpoint, reduce_lr, early_stopping]
    )

    # Save the final model
    model.save('final_model.keras')

    # Evaluate the model. There is no separate test split, so this metric is
    # measured on the validation set; the variable names are kept as they
    # were for any later cells that read them.
    test_loss, test_accuracy = model.evaluate(val_dataset)
    print(f"Validation accuracy: {test_accuracy:.4f}")

else:
    print("Error: One or both paths do not exist.")

    # Print the contents of the base directory to help troubleshoot
    print("\nContents of base directory:")
    try:
        for item in os.listdir(base_dir):
            print(item)
    except FileNotFoundError:
        print(f"Base directory not found: {base_dir}")