In [1]:
import os
from PIL import Image
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from flask import Flask, request, render_template
from tensorflow.keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator
from sklearn.metrics import classification_report
from sklearn.utils.class_weight import compute_class_weight

In [2]:
# Define dataset path
# The dataset root can be overridden with the RETINAL_OCT_DATASET_PATH
# environment variable so the notebook is runnable on other machines; the
# original local path is kept as the default when the variable is not set.
dataset_path = os.environ.get(
    "RETINAL_OCT_DATASET_PATH",
    r"C:\Users\adlercohen\DevAdler\thesis-data\Retinal-OCT-Reduced",
)

In [3]:
# Retinal OCT folder name -> Alzheimer's category assigned to its images
alzheimers_labels = dict(
    NORMAL="CN",   # Cognitively Normal
    DRUSEN="MCI",  # Mild Cognitive Impairment
    CNV="AD",      # Alzheimer's Disease
    DME="AD",      # Alzheimer's Disease
)

# Collect image paths and labels
# One (paths, labels) pair of parallel lists per dataset split; index i of the
# label list is the Alzheimer's category of the image at index i.
train_image_paths, train_labels = [], []
test_image_paths, test_labels = [], []
val_image_paths, val_labels = [], []

for split, image_list, label_list in [
    ("train", train_image_paths, train_labels),
    ("test", test_image_paths, test_labels),
    ("val", val_image_paths, val_labels),
]:
    for category, diagnosis in alzheimers_labels.items():
        folder_path = os.path.join(dataset_path, split, category)
        # isdir (not exists): a plain file with the category's name would
        # otherwise make os.listdir raise.
        if not os.path.isdir(folder_path):
            print(f"Warning: {folder_path} does not exist. Skipping...")
            continue
        # os.listdir returns entries in arbitrary order; sorting makes the
        # sample order identical from run to run.
        for img_name in sorted(os.listdir(folder_path)):
            img_path = os.path.join(folder_path, img_name)
            # Ignore nested directories; only regular files can be images.
            if not os.path.isfile(img_path):
                continue
            image_list.append(img_path)
            label_list.append(diagnosis)

# Configure the random augmentations (rotation, shifts, shear, zoom,
# horizontal flip) produced by the image data generator.
augmentation_options = {
    "rotation_range": 20,
    "width_shift_range": 0.1,
    "height_shift_range": 0.1,
    "shear_range": 0.2,
    "zoom_range": 0.2,
    "horizontal_flip": True,
    "fill_mode": "nearest",
}
datagen = ImageDataGenerator(**augmentation_options)

In [4]:
def load_images(image_paths, labels, img_size=(300, 300)):
    """Read images from disk into a single array, scaled by 1/255.

    Args:
        image_paths: iterable of image file paths.
        labels: iterable of labels, paired positionally with ``image_paths``.
        img_size: target size passed to ``load_img`` for every image.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays holding the images that loaded
        successfully and their labels. Images that fail to load are reported
        on stdout and left out of both arrays.
    """
    pixel_arrays = []
    kept_labels = []
    for path, tag in zip(image_paths, labels):
        try:
            pixels = img_to_array(load_img(path, target_size=img_size)) / 255.0
        except Exception as err:
            # Best effort: report the unreadable file and move on.
            print(f"Error loading image {path}: {err}")
            continue
        pixel_arrays.append(pixels)
        kept_labels.append(tag)
    return np.array(pixel_arrays), np.array(kept_labels)

# Load datasets
train_X, train_y = load_images(train_image_paths, train_labels)
test_X, test_y = load_images(test_image_paths, test_labels)
val_X, val_y = load_images(val_image_paths, val_labels)

# Encode labels and ensure integer type
label_map = {"CN": 0, "MCI": 1, "AD": 2}
train_y = np.array([label_map[label] for label in train_y]).astype(int)  # Cast to int
test_y = np.array([label_map[label] for label in test_y]).astype(int)
val_y = np.array([label_map[label] for label in val_y]).astype(int)

# Compute class weights (now classes are integers)
present_classes = np.unique(train_y)
balanced_weights = compute_class_weight(
    class_weight='balanced',
    classes=present_classes,
    y=train_y
)
# Key each weight by its actual class id rather than by position: if a class
# is absent from the training data, positional keys (0, 1, ...) would attach
# weights to the wrong classes. Plain int/float values also keep the dict
# free of NumPy scalar types.
class_weights = {
    int(class_id): float(weight)
    for class_id, weight in zip(present_classes, balanced_weights)
}

print("Class weights:", class_weights)

Class weights: {0: np.float64(1.3254747871643746), 1: np.float64(1.335973597359736), 2: np.float64(0.667986798679868)}


In [5]:
# Build improved model
def build_model(input_shape=(300, 300, 3), num_classes=3, fine_tune_layers=50):
    """Build and compile an EfficientNetB3-based image classifier.

    The network is an ImageNet-pretrained EfficientNetB3 backbone (without its
    classification head) followed by global average pooling, two
    dense + dropout stages and a softmax output layer.

    Args:
        input_shape: shape of one input image, ``(height, width, channels)``.
        num_classes: number of output classes (size of the softmax layer).
        fine_tune_layers: how many of the backbone's last layers are left
            trainable; all earlier backbone layers are frozen.

    Returns:
        A compiled ``keras.Sequential`` model using Adam (learning rate 1e-5),
        sparse categorical cross-entropy loss and an accuracy metric. The
        model takes images with pixel values in [0, 1].
    """
    base_model = keras.applications.EfficientNetB3(
        weights='imagenet', include_top=False, input_shape=input_shape
    )

    # Keras layers are trainable by default, so merely marking the last N
    # layers as trainable would leave the whole backbone trainable. Freeze
    # everything except the last `fine_tune_layers` layers instead.
    # BatchNormalization layers stay frozen even inside that range, as the
    # Keras fine-tuning guidance recommends, so their pretrained statistics
    # are not disturbed.
    first_trainable = len(base_model.layers) - max(fine_tune_layers, 0)
    for index, layer in enumerate(base_model.layers):
        layer.trainable = (
            index >= first_trainable
            and not isinstance(layer, layers.BatchNormalization)
        )

    model = keras.Sequential([
        keras.Input(shape=input_shape),
        # keras.applications EfficientNet models contain their own rescaling
        # and normalisation and expect pixel values in [0, 255]. This
        # notebook's load_images yields values in [0, 1], so scale them back
        # up before they reach the backbone.
        layers.Rescaling(255.0),
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(256, activation='relu'),
        layers.Dropout(0.4),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-5),
                  loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model


In [None]:
# Train model
model = build_model()
# Halve the learning rate when validation loss has not improved for 5 epochs.
lr_scheduler = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, verbose=1)
history = model.fit(datagen.flow(train_X, train_y, batch_size=32),
                    validation_data=(val_X, val_y),
                    epochs=10,
                    class_weight=class_weights,
                    callbacks=[lr_scheduler])

# Save trained model
model.save('alzheimers_rgc_model.h5')
print("Model trained and saved successfully!")

# Class names in the order of their integer ids (0, 1, 2). Passing the ids
# explicitly as `labels` keeps classification_report valid even when a class
# is missing from the evaluated split or from the predictions.
class_names = ["CN", "MCI", "AD"]
class_ids = list(range(len(class_names)))

# Predict on test data
test_predictions = model.predict(test_X)
test_pred_labels = np.argmax(test_predictions, axis=1)

# Evaluate model on validation set
val_predictions = model.predict(val_X)
val_pred_labels = np.argmax(val_predictions, axis=1)

print("Classification Report (Validation Set):")
print(classification_report(val_y, val_pred_labels, labels=class_ids, target_names=class_names))

# Evaluate model on the held-out test set as well: the validation data already
# drove the learning-rate schedule during training, and the test predictions
# computed above were otherwise never used.
print("Classification Report (Test Set):")
print(classification_report(test_y, test_pred_labels, labels=class_ids, target_names=class_names))

  self._warn_if_super_not_called()


Epoch 1/10
 1/64 ━━━━━━━━━━━━━━━━━━━━ 3:54:04 223s/step - accuracy: 0.3125 - loss: 1.1996