In [1]:
import os
import shutil
import random
import math
import numpy as np
import matplotlib.pyplot as plt
import datetime
import pickle
from pathlib import Path
from IPython.core.getipython import get_ipython
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG19, ResNet50, VGG16, MobileNetV2, Xception, EfficientNetB0, DenseNet121
from tensorflow.keras.applications.vgg19 import preprocess_input as vgg19_preprocess
from tensorflow.keras.applications.resnet50 import preprocess_input as resnet50_preprocess
from tensorflow.keras.applications.vgg16 import preprocess_input as vgg16_preprocess
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as mobilenetv2_preprocess
from tensorflow.keras.applications.xception import preprocess_input as xception_preprocess
from tensorflow.keras.applications.efficientnet import preprocess_input as efficientnetb0_preprocess
from tensorflow.keras.applications.densenet import preprocess_input as densenet121_preprocess
from tensorflow.keras.layers import GlobalAveragePooling2D, Dropout,Dense, BatchNormalization,Activation, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping

from sklearn.metrics import confusion_matrix, roc_curve, auc, classification_report, f1_score, precision_score, recall_score


In [None]:

def extract_features(generator, model, steps):
    """
    Extract features from a dataset using a feature extractor model.
    """
    features = []
    labels = []
    for _ in range(steps):
        batch_x, batch_y = next(generator)
        batch_features = model.predict(batch_x, verbose=0)
        features.append(batch_features)
        labels.append(batch_y)
    features = np.vstack(features)[:generator.samples]  
    labels = np.hstack(labels)[:generator.samples]
    return features, labels

def train_and_evaluate_model(model_name, preprocess_function):
    print(f"\nTraining with {model_name}...\n")
    
    # Set image size based on model
    if model_name == 'Xception':
        img_h, img_w = 299, 299
    else:
        img_h, img_w = 224, 224

    
    # Create ImageDataGenerators , to load and preprocess images
    train_datagen = ImageDataGenerator(preprocessing_function=preprocess_function)
    val_datagen = ImageDataGenerator(preprocessing_function=preprocess_function)
    test_datagen = ImageDataGenerator(preprocessing_function=preprocess_function)
    
    train_dir = os.path.join(split_dataset_dir, "train")
    val_dir = os.path.join(split_dataset_dir, "validation")
    test_dir = os.path.join(split_dataset_dir, "test")
    
    train_generator = train_datagen.flow_from_directory(train_dir,target_size=(img_h, img_w),batch_size=batch_size,class_mode='binary',shuffle=True,seed=42,)
    val_generator = val_datagen.flow_from_directory(val_dir,target_size=(img_h, img_w),batch_size=batch_size,class_mode='binary',shuffle=False,seed=42,)    
    test_generator = test_datagen.flow_from_directory(test_dir,target_size=(img_h, img_w),batch_size=batch_size,class_mode='binary',shuffle=False,seed=42,)
    
    class_indices = test_generator.class_indices    
    print(f"class indices: {list(class_indices.keys())}")    
    # Load the pre-trained model
    if model_name == 'VGG19':
        Pretrained_model = VGG19(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
    elif model_name == 'ResNet50':
        Pretrained_model = ResNet50(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
    elif model_name == 'VGG16':
        Pretrained_model = VGG16(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
    elif model_name == 'MobileNetV2':
        Pretrained_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
    elif model_name == 'Xception':
        Pretrained_model = Xception(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
    elif model_name == 'EfficientNetB0':
        Pretrained_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
    elif model_name == 'DenseNet121':
        Pretrained_model = DenseNet121(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))
    else:
        raise ValueError("Invalid model name")
    
    # Freeze the pre-trained layers
    Pretrained_model.trainable = False
    
    # Create feature extractor including GlobalAveragePooling2D
    feature_extractor = Model(inputs=Pretrained_model.input,outputs=GlobalAveragePooling2D()(Pretrained_model.output))
    
    # Calculate steps for feature extraction
    train_steps = math.ceil(train_generator.samples / batch_size)
    val_steps = math.ceil(val_generator.samples / batch_size)
    test_steps = math.ceil(test_generator.samples / batch_size)
    
    # Extract features
    print("Extracting features for training set...")
    train_features, train_labels = extract_features(train_generator, feature_extractor, train_steps)
    print("Extracting features for validation set...")
    val_features, val_labels = extract_features(val_generator, feature_extractor, val_steps)
    print("Extracting features for test set...")
    test_features, test_labels = extract_features(test_generator, feature_extractor, test_steps)

    # Define simplified custom head
    feature_dim = feature_extractor.output_shape[-1]  # e.g., 512 for VGG16
    input_layer = Input(shape=(feature_dim,))
    x = Dense(512)(input_layer)  # Reduced from two layers
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Dropout(0.2)(x)
    predictions = Dense(1, activation='sigmoid', dtype='float32')(x)
    custom_model = Model(inputs=input_layer, outputs=predictions)
    
    # Compile the custom head model
    custom_model.compile(optimizer=Adam(learning_rate=1e-4),loss='binary_crossentropy',metrics=['accuracy'])
    
    # Early stopping with potentially reduced patience
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    
    # Train the custom head on extracted features
    history = custom_model.fit(train_features, train_labels,batch_size=batch_size,epochs=epochs,validation_data=(val_features, val_labels),callbacks=[early_stopping])
    
    # Evaluate on test set
    test_loss, test_accuracy = custom_model.evaluate(test_features, test_labels, verbose=1)
    print(f"\n{model_name} Test Loss: {test_loss:.4f}")
    print(f"{model_name} Test Accuracy: {test_accuracy:.4f}")
    
    # Make predictions
    predictions = custom_model.predict(test_features, verbose=1)
    y_pred = (predictions > 0.5).astype(int).flatten()
    y_true = test_labels  # Use extracted labels
    
    # Compute performance metrics
    cm = confusion_matrix(y_true, y_pred)
    print("Confusion Matrix:\n", cm)
    
    f1 = f1_score(y_true, y_pred, average='binary')
    precision = precision_score(y_true, y_pred, average='binary')
    recall = recall_score(y_true, y_pred, average='binary')
    
    print("\nClassification Report:")
    labels = list(class_indices.keys())
    print(classification_report(y_true, y_pred, target_names=labels))
    
    # Compute ROC Curve and AUC
    fpr, tpr, _ = roc_curve(y_true, predictions.flatten())
    roc_auc = auc(fpr, tpr)
    
    return {
        'model': custom_model,
        'history': history.history,
        'confusion_matrix': cm,
        'roc': (fpr, tpr, roc_auc),
        'performance': {'accuracy': test_accuracy, 'f1': f1, 'precision': precision, 'recall': recall},
        'labels': labels
    }

def plot_results(results, run_dir):
    model_names = list(results.keys())
    num_models = len(model_names)
    
    # 1. Plot Confusion Matrices
    fig_cm, axes_cm = plt.subplots(1, num_models, figsize=(5*num_models, 4))
    if num_models == 1:
        axes_cm = [axes_cm]
    for ax, name in zip(axes_cm, model_names):
        cm = results[name]['confusion_matrix']
        im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        ax.set_title(f"{name} Confusion Matrix")
        tick_marks = np.arange(len(results[name]['labels']))
        ax.set_xticks(tick_marks)
        ax.set_xticklabels(results[name]['labels'], rotation=45)
        ax.set_yticks(tick_marks)
        ax.set_yticklabels(results[name]['labels'])
        thresh = cm.max() / 2.0
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], 'd'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black")
        fig_cm.colorbar(im, ax=ax)
    fig_cm.tight_layout(rect=[0, 0, 1, 0.95])
    fig_cm.suptitle("Confusion Matrices", fontsize=16)
    plt.savefig(os.path.join(run_dir, "confusion_matrices.png"))
    plt.close(fig_cm)
    
    # 2. Plot ROC Curves
    fig_roc, axes_roc = plt.subplots(1, num_models, figsize=(5*num_models, 4))
    if num_models == 1:
        axes_roc = [axes_roc]
    for ax, name in zip(axes_roc, model_names):
        fpr, tpr, roc_auc = results[name]['roc']
        ax.plot(fpr, tpr, lw=2, label=f"AUC = {roc_auc:.2f}")
        ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        ax.set_xlim([0.0, 1.0])
        ax.set_ylim([0.0, 1.05])
        ax.set_xlabel('False Positive Rate')
        ax.set_ylabel('True Positive Rate')
        ax.set_title(f"ROC Curve - {name}")
        ax.legend(loc="lower right")
    fig_roc.tight_layout(rect=[0, 0, 1, 0.95])
    fig_roc.suptitle("ROC Curves", fontsize=16)
    plt.savefig(os.path.join(run_dir, "roc_curves.png"))
    plt.close(fig_roc)
    
    # 3. Plot Accuracy and Loss Curves for each model
    for name in model_names:
        history = results[name]['history']
        epochs_range = range(1, len(history['accuracy']) + 1)
        fig_model, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(12, 5))
        ax_acc.plot(epochs_range, history['accuracy'], marker='o', label='Train Accuracy')
        ax_acc.plot(epochs_range, history['val_accuracy'], marker='x', linestyle='--', label='Validation Accuracy')
        ax_acc.set_title(f"{name} Accuracy")
        ax_acc.set_xlabel('Epoch')
        ax_acc.set_ylabel('Accuracy')
        ax_acc.legend()
        
        ax_loss.plot(epochs_range, history['loss'], marker='o', label='Train Loss')
        ax_loss.plot(epochs_range, history['val_loss'], marker='x', linestyle='--', label='Validation Loss')
        ax_loss.set_title(f"{name} Loss")
        ax_loss.set_xlabel('Epoch')
        ax_loss.set_ylabel('Loss')
        ax_loss.legend()
        
        fig_model.suptitle(f"Accuracy and Loss Curves - {name}", fontsize=16)
        fig_model.tight_layout(rect=[0, 0, 1, 0.93])
        plt.savefig(os.path.join(run_dir, f"{name}_training_curves.png"))
        plt.close(fig_model)
    
    # 4. Overall Performance Comparison Table
    col_labels = ["Model", "Accuracy", "Precision", "Recall", "F1 Score"]
    cell_text = []
    for name in model_names:
        perf = results[name]['performance']
        row = [name,
               f"{perf['accuracy']:.4f}",
               f"{perf['precision']:.4f}",
               f"{perf['recall']:.4f}",
               f"{perf['f1']:.4f}"]
        cell_text.append(row)
    
    fig_table, ax_table = plt.subplots(figsize=(8, len(model_names)*0.8+1))
    ax_table.axis('tight')
    ax_table.axis('off')
    table = ax_table.table(cellText=cell_text, colLabels=col_labels, loc='center')
    table.auto_set_font_size(False)
    table.set_fontsize(12)
    table.scale(1, 2)
    fig_table.suptitle("Overall Performance Comparison", fontsize=16)
    plt.savefig(os.path.join(run_dir, "performance_table.png"))
    plt.close(fig_table)
    
    # 5. Save training histories to pickle files
    for name in model_names:
        history = results[name]['history']
        with open(os.path.join(run_dir, f"{name}_history.pkl"), "wb") as f:
            pickle.dump(history, f)
    print("Training histories saved for each model.")





In [None]:

# Update the main execution block to use the new function as is
if __name__ == "__main__":
    Base_Folder = 'D:/Learning/University of sadat/Grade 4/Semester 2/06- Graduation Project/Coding/' 
    original_dataset_dir = f'{Base_Folder}00- The DataSet/Balanced_DataSet'
    split_dataset_dir = f'{Base_Folder}00- The DataSet/Dataset_split'
    
    current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    run_dir = os.path.join(f'{Base_Folder}runs_codes', current_time)
    os.makedirs(run_dir, exist_ok=True)

    # # Copy current script to run directory
    # script_path = os.path.abspath(__file__)
    # script_name = os.path.basename(script_path)
    # shutil.copy(script_path, os.path.join(run_dir, script_name))
    def get_notebook_path():
        ipython = get_ipython()
        if ipython is None:
            raise RuntimeError("Not running in Jupyter.")
        # VS Code specific (may not work in all environments)
        if '__vsc_ipynb_file__' in ipython.user_ns:
            return Path(ipython.user_ns['__vsc_ipynb_file__'])
        # Fallback or other environments
        try:
            return Path(ipython.startup_scripts[0])
        except:
            raise RuntimeError("Could not determine notebook path.")

    notebook_path = get_notebook_path()
    notebook_name = notebook_path.name
    # Create the destination path
    destination_path = os.path.join(run_dir, notebook_name)

    # Copy the notebook to the specified directory
    shutil.copy(notebook_path, destination_path)
    
    batch_size = 32  
    epochs = 50
    
    results = {}
    models = {
        'VGG19': vgg19_preprocess,
        'ResNet50': resnet50_preprocess,
        'VGG16': vgg16_preprocess,
        'MobileNetV2': mobilenetv2_preprocess,
        'Xception': xception_preprocess,
        'EfficientNetB0': efficientnetb0_preprocess,
        'DenseNet121': densenet121_preprocess
    }
    
    for model_name, preprocess in models.items():
        results[model_name] = train_and_evaluate_model(model_name, preprocess)
        
        # Save the custom head model instead
        results[model_name]['model'].save(os.path.join(run_dir, f"{model_name}_custom_model.h5"))
        print(f"{model_name} custom head model saved.")
    
        plot_results(results, run_dir)
    print(f"All outputs saved to directory: {run_dir}")




Training with ResNet50...

Found 6784 images belonging to 2 classes.
Found 848 images belonging to 2 classes.
Found 848 images belonging to 2 classes.
class indices: ['Oblique', 'Overriding']
Extracting features for training set...


KeyboardInterrupt: 