In [15]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical
from keras.applications.resnet import preprocess_input
from keras.applications import ResNet50
from keras.layers import GlobalAveragePooling2D, Dense, Dropout
from keras.models import Model
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from sklearn.metrics import classification_report
from keras.preprocessing.image import ImageDataGenerator

# Constants
INPUT_SIZE_RESNET = (224, 224, 3)
class_folders = ['AD', 'CONTROL', 'PD']

# Use ImageDataGenerator for data augmentation
datagen = ImageDataGenerator(
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    vertical_flip=True,
    brightness_range=[0.8, 1.2],
    fill_mode='nearest'
)

# Custom data generator
def data_generator(image_paths, labels, batch_size):
    num_samples = len(image_paths)
    while True:
        indices = np.arange(num_samples)
        np.random.shuffle(indices)

        for start in range(0, num_samples, batch_size):
            end = min(start + batch_size, num_samples)
            batch_indices = indices[start:end]

            batch_images = []
            batch_labels = []

            for idx in batch_indices:
                image_path = image_paths[idx]
                label = labels[idx]

                image = cv2.imread(image_path)
                if image is None or image.size == 0 or image.shape[0] == 0 or image.shape[1] == 0:
                    print(f"Error or invalid image: {image_path}")
                    continue

                image = cv2.resize(image, (INPUT_SIZE_RESNET[0], INPUT_SIZE_RESNET[1]))
                image = preprocess_input(image)

                batch_images.append(image)
                batch_labels.append(label)

                # Apply data augmentation
                for augmented_image, _ in zip(datagen.flow(np.expand_dims(image, axis=0), batch_size=8), range(1)):
                    batch_images.append(augmented_image[0])
                    batch_labels.append(label)

            yield np.array(batch_images), to_categorical(batch_labels, len(class_folders))

# Load and preprocess data
image_paths = []
labels = []

for folder in class_folders:
    folder_path = os.path.join(os.getcwd(), 'Training', folder)

    if os.path.exists(folder_path):
        images = os.listdir(folder_path)
        print(f"Found {len(images)} images in {folder} folder.")
    else:
        print(f"Directory {folder_path} does not exist.")
        continue

    for image_name in tqdm(images, desc=f"Processing '{folder}' images"):
        if image_name.split('.')[-1].lower() not in ['jpg', 'jpeg', 'png']:
            continue

        image_path = os.path.join(folder_path, image_name)
        image_paths.append(image_path)
        labels.append(folder)

# Convert labels to one-hot encoding
label_indices = {label: idx for idx, label in enumerate(class_folders)}
labels_encoded = [label_indices[label] for label in labels]

# Split the data into training and testing sets
X_train_paths, X_test_paths, Y_train, Y_test = train_test_split(image_paths, labels_encoded, test_size=0.2, random_state=42)

# Create a ResNet50 base model
base_model = ResNet50(include_top=False, input_shape=INPUT_SIZE_RESNET)
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.5)(x)
predictions = Dense(len(class_folders), activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=predictions)

# Fine-tune the last few layers of the ResNet50 model
for layer in base_model.layers[:-10]:
    layer.trainable = False

# Compile the model with class weights
initial_learning_rate = 1e-4
optimizer = Adam(learning_rate=initial_learning_rate)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

# Define callbacks
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('best_model.h5', save_best_only=True, monitor='val_loss', mode='min')
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-7)

# Train the model using the data generator
batch_size = 32
steps_per_epoch = len(X_train_paths) // batch_size
validation_steps = len(X_test_paths) // batch_size

history = model.fit(
    data_generator(X_train_paths, Y_train, batch_size),
    steps_per_epoch=steps_per_epoch,
    epochs=10,
    validation_data=data_generator(X_test_paths, Y_test, batch_size),
    validation_steps=validation_steps,
    callbacks=[early_stop, model_checkpoint, reduce_lr]
)

# Evaluate the model on test data
model.load_weights('best_model.h5')
test_loss, test_acc = model.evaluate(data_generator(X_test_paths, Y_test, batch_size), steps=validation_steps)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}')

# Make predictions
y_pred = np.argmax(model.predict(data_generator(X_test_paths, Y_test, batch_size), steps=validation_steps), axis=1)

# Print classification report
print(classification_report(Y_test[:len(y_pred)], y_pred))


Found 2561 images in AD folder.


Processing 'AD' images: 100%|██████████| 2561/2561 [00:00<00:00, 361232.60it/s]


Found 3010 images in CONTROL folder.


Processing 'CONTROL' images: 100%|██████████| 3010/3010 [00:00<00:00, 386945.02it/s]


Found 906 images in PD folder.


Processing 'PD' images: 100%|██████████| 906/906 [00:00<00:00, 445021.60it/s]


Epoch 1/10
  9/161 [>.............................] - ETA: 7:01 - loss: 1.3330 - accuracy: 0.3958

KeyboardInterrupt: 