In [None]:
%pip install opencv-python-headless
%pip install seaborn
%pip install matplotlib
%pip install scikit-learn

In [None]:
import tensorflow as tf
import os
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, GlobalAveragePooling2D
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    confusion_matrix, roc_curve, auc, precision_recall_fscore_support, matthews_corrcoef, cohen_kappa_score, balanced_accuracy_score, jaccard_score, log_loss, fbeta_score
)

# Run-wide settings read by the cells that follow.
batch_size = 32             # images per batch for every dataset split
learning_rate = 1e-6        # step size handed to the Adam optimizer
group_type = "uti"          # dataset group; becomes part of the results path
model_name = "ResNet50V2"   # backbone label; becomes part of the results path
baseDir = "./datasets/uti/"

# Display the contents of the dataset root as the cell output.
os.listdir(baseDir)

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import (
    confusion_matrix, classification_report, roc_curve, auc, RocCurveDisplay,
    precision_recall_fscore_support, matthews_corrcoef, cohen_kappa_score,
    balanced_accuracy_score, jaccard_score, log_loss, top_k_accuracy_score
)
from sklearn.preprocessing import label_binarize

# -------------------------
# Training curves (works for any class count)
# -------------------------
def save_training_metrics(history, results_dir):
    """Write training-curve images and a per-epoch text log.

    Args:
        history: object exposing a ``history`` dict of per-epoch lists. The
            'accuracy' and 'loss' keys are required; 'val_accuracy' and
            'val_loss' are optional.
        results_dir: output directory (created if missing). Receives
            training_accuracy.png, training_loss.png and
            training_validation_metrics.txt.
    """
    os.makedirs(results_dir, exist_ok=True)
    logs = history.history

    # (train key, val key, train label, val label, title, y-label, legend loc, file)
    curve_specs = (
        ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy',
         'Model Accuracy', 'Accuracy', 'lower right', "training_accuracy.png"),
        ('loss', 'val_loss', 'Training Loss', 'Validation Loss',
         'Model Loss', 'Loss', 'upper right', "training_loss.png"),
    )
    for train_key, val_key, train_label, val_label, title, y_label, legend_loc, filename in curve_specs:
        plt.figure(figsize=(10, 5))
        plt.plot(logs[train_key], label=train_label)
        # Validation curve is drawn only when it was recorded during training.
        if val_key in logs:
            plt.plot(logs[val_key], label=val_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend(loc=legend_loc)
        plt.savefig(os.path.join(results_dir, filename))
        plt.close()

    # Plain-text dump, one entry per epoch.
    report_path = os.path.join(results_dir, "training_validation_metrics.txt")
    with open(report_path, "w") as out:
        out.write("Training and Validation Metrics Per Epoch\n")
        out.write("=" * 50 + "\n")
        n_epochs = len(logs['loss'])
        # Missing validation series are reported as NaN for every epoch.
        placeholder = [np.nan] * n_epochs
        val_acc_series = logs.get('val_accuracy', placeholder)
        val_loss_series = logs.get('val_loss', placeholder)
        for epoch in range(n_epochs):
            train_acc = logs['accuracy'][epoch]
            train_loss = logs['loss'][epoch]
            val_acc = val_acc_series[epoch]
            val_loss = val_loss_series[epoch]
            out.write(
                f"Epoch {epoch+1}:\n"
                f"  Training Accuracy: {train_acc:.4f}, Validation Accuracy: {val_acc:.4f}\n"
                f"  Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}\n"
                + "-" * 50 + "\n"
            )

# -------------------------
# Confusion matrix (with optional normalization)
# -------------------------
def save_confusion_matrix(y_true, y_pred, class_names, results_dir, normalize=False):
    """Render the confusion matrix as a heatmap and save it as a PNG.

    Args:
        y_true: (N,) integer ground-truth labels in range(len(class_names)).
        y_pred: (N,) integer predicted labels in the same range.
        class_names: display names; index i labels class i on both axes.
        results_dir: output directory, created if missing.
        normalize: when True each row is divided by its total (rows then show
            the per-class recall distribution) and the file is named
            confusion_matrix_normalized.png; otherwise raw counts are written
            to confusion_matrix.png.
    """
    # Create the directory here as well, like the other save_* helpers, so
    # this function also works when called on its own.
    os.makedirs(results_dir, exist_ok=True)

    # Fix the label set so classes absent from the data still get a row/column.
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    if normalize:
        # Clip row totals away from zero: an empty row becomes all zeros
        # instead of triggering a division by zero.
        row_totals = cm.sum(axis=1, keepdims=True).clip(min=1e-12)
        cm = cm.astype('float') / row_totals

    plt.figure(figsize=(8, 6))
    fmt = ".2f" if normalize else "d"
    sns.heatmap(cm, annot=True, fmt=fmt, cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix' + (' (Normalized)' if normalize else ''))
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.tight_layout()
    filename = "confusion_matrix_normalized.png" if normalize else "confusion_matrix.png"
    plt.savefig(os.path.join(results_dir, filename))
    plt.close()

# -------------------------
# ROC (one-vs-rest) for multiclass; binary handled as a special case
# -------------------------
def save_roc_curves(y_true, y_probs, class_names, results_dir):
    """Save ROC curve figure(s) for a binary or multiclass classifier.

    Binary (two class names): a single ROC curve built from column 1 of
    ``y_probs`` is written to roc_curve.png.
    Multiclass: one-vs-rest curves for every class plus the micro-average
    curve are written to roc_curves_multiclass.png; the macro-average AUC is
    reported as an extra legend entry.

    Args:
        y_true: (N,) integer labels in range(len(class_names)).
        y_probs: (N, C) per-class scores; column i belongs to class i.
        class_names: sequence of C display names.
        results_dir: output directory, created if missing.
    """
    os.makedirs(results_dir, exist_ok=True)
    y_true = np.asarray(y_true)
    y_probs = np.asarray(y_probs)
    n_classes = len(class_names)

    # Binary case: standard single ROC using the score of class 1.
    if n_classes == 2:
        fpr, tpr, _ = roc_curve(y_true, y_probs[:, 1])
        roc_auc = auc(fpr, tpr)
        plt.figure(figsize=(10, 6))
        plt.plot(fpr, tpr, lw=2, label=f"ROC (AUC = {roc_auc:.3f})")
        plt.plot([0, 1], [0, 1], lw=2, linestyle='--')
        plt.title('ROC Curve (Binary)')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.legend(loc='lower right')
        plt.tight_layout()
        plt.savefig(os.path.join(results_dir, "roc_curve.png"))
        plt.close()
        return

    # Multiclass: binarize labels to an (N, C) indicator matrix for OVR curves.
    y_true_bin = label_binarize(y_true, classes=list(range(n_classes)))

    # Per-class ROC + AUC (class i is "positive", all others "negative").
    fpr_dict, tpr_dict, auc_dict = {}, {}, {}
    for i in range(n_classes):
        fpr_dict[i], tpr_dict[i], _ = roc_curve(y_true_bin[:, i], y_probs[:, i])
        auc_dict[i] = auc(fpr_dict[i], tpr_dict[i])

    # Micro-average: pool every (sample, class) decision into one curve.
    fpr_dict["micro"], tpr_dict["micro"], _ = roc_curve(y_true_bin.ravel(), y_probs.ravel())
    auc_dict["micro"] = auc(fpr_dict["micro"], tpr_dict["micro"])

    # Macro-average: plain mean of the per-class AUCs. nanmean skips classes
    # whose AUC is undefined (no positive samples in y_true) so that a single
    # absent class does not turn the whole average into NaN.
    auc_dict["macro"] = np.nanmean([auc_dict[i] for i in range(n_classes)])

    plt.figure(figsize=(10, 8))
    # Micro
    plt.plot(fpr_dict["micro"], tpr_dict["micro"], lw=3,
             label=f"micro-average ROC (AUC = {auc_dict['micro']:.3f})")
    # Macro: no curve is computed for it, so add a legend-only entry; without
    # this the macro AUC was calculated but never reported anywhere.
    plt.plot([], [], linestyle='none',
             label=f"macro-average AUC = {auc_dict['macro']:.3f}")
    # Each class
    for i, name in enumerate(class_names):
        plt.plot(fpr_dict[i], tpr_dict[i], lw=1.5, label=f"{name} (AUC = {auc_dict[i]:.3f})")
    # Chance
    plt.plot([0, 1], [0, 1], linestyle='--')

    plt.title('ROC Curves (One-vs-Rest)')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend(loc='lower right', ncol=2, fontsize=9)
    plt.tight_layout()
    plt.savefig(os.path.join(results_dir, "roc_curves_multiclass.png"))
    plt.close()

# -------------------------
# Metrics (binary or multiclass)
# -------------------------
def _per_class_specificity(y_true, y_pred, n_classes):
    """One-vs-rest specificity, TN / (TN + FP), for every class.

    Class i is treated as the positive class and all remaining classes as
    negative. Returns a float array of shape (n_classes,); an entry is NaN
    when the class has no negative samples (TN + FP == 0).
    """
    cm = confusion_matrix(y_true, y_pred, labels=list(range(n_classes)))

    # Rows are true labels and columns are predictions, so all four counts
    # can be derived for every class at once from the diagonal and the sums.
    true_pos = np.diag(cm)
    false_neg = cm.sum(axis=1) - true_pos
    false_pos = cm.sum(axis=0) - true_pos
    true_neg = cm.sum() - (true_pos + false_neg + false_pos)

    negatives = true_neg + false_pos
    specificity = np.full(n_classes, np.nan, dtype=float)
    # Only divide where the denominator is positive; other slots stay NaN.
    np.divide(true_neg, negatives, out=specificity, where=negatives > 0)
    return specificity

def save_classification_metrics(y_true, y_pred, y_probs, results_dir, class_names, topk=(2,3)):
    """Compute summary classification metrics and write them to a text file.

    Args:
        y_true: (N,) integer labels in range(len(class_names)).
        y_pred: (N,) integer predictions in the same range.
        y_probs: (N, C) probabilities; column i belongs to class i.
        results_dir: output directory, created if missing; receives
            classification_metrics.txt.
        class_names: sequence of C display names.
        topk: values of k for which top-k accuracy is reported; a k larger
            than the number of probability columns is skipped.
    """
    os.makedirs(results_dir, exist_ok=True)
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    y_probs = np.asarray(y_probs)
    n_classes = len(class_names)
    # Explicit label set: keeps the probability-based metrics and the report
    # well-defined even when a class never occurs in the evaluated data.
    label_ids = list(range(n_classes))

    # Averages
    # macro = unweighted mean across classes; weighted = class-frequency weighted
    precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(
        y_true, y_pred, average='macro', zero_division=0)
    precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(
        y_true, y_pred, average='weighted', zero_division=0)

    # Multiclass-capable metrics
    mcc = matthews_corrcoef(y_true, y_pred)
    kappa = cohen_kappa_score(y_true, y_pred)
    balanced_acc = balanced_accuracy_score(y_true, y_pred)
    jaccard_macro = jaccard_score(y_true, y_pred, average='macro', zero_division=0)
    jaccard_weighted = jaccard_score(y_true, y_pred, average='weighted', zero_division=0)
    # Without `labels`, log_loss infers the classes from y_true and raises if
    # that count differs from the number of probability columns.
    logloss = log_loss(y_true, y_probs, labels=label_ids)

    # Top-k
    # For binary targets scikit-learn rejects a 2-column score matrix and
    # expects the 1-D score of the class with the greater label instead.
    topk_input = y_probs[:, 1] if y_probs.shape[1] == 2 else y_probs
    topk_scores = {}
    for k in topk:
        if y_probs.shape[1] >= k:
            topk_scores[f"top_{k}_accuracy"] = top_k_accuracy_score(
                y_true, topk_input, k=k, labels=label_ids)

    # Specificity (per class + macro); NaN entries are ignored by the mean.
    specs = _per_class_specificity(y_true, y_pred, n_classes)
    specificity_macro = np.nanmean(specs)

    # Classification report (per class). `labels` keeps the rows aligned with
    # class_names even if some class is missing from both y_true and y_pred.
    report = classification_report(
        y_true, y_pred, labels=label_ids, target_names=class_names, zero_division=0)

    # Save text
    with open(os.path.join(results_dir, "classification_metrics.txt"), "w") as f:
        f.write("=== Macro / Weighted Averages ===\n")
        f.write(f"Precision (macro):  {precision_macro:.4f}\n")
        f.write(f"Recall (macro):     {recall_macro:.4f}\n")
        f.write(f"F1-Score (macro):   {f1_macro:.4f}\n")
        f.write(f"Precision (weighted): {precision_weighted:.4f}\n")
        f.write(f"Recall (weighted):    {recall_weighted:.4f}\n")
        f.write(f"F1-Score (weighted):  {f1_weighted:.4f}\n")
        f.write(f"Balanced Accuracy:   {balanced_acc:.4f}\n")
        f.write(f"Specificity (macro): {specificity_macro:.4f}\n")
        f.write(f"Jaccard (macro):     {jaccard_macro:.4f}\n")
        f.write(f"Jaccard (weighted):  {jaccard_weighted:.4f}\n")
        f.write(f"MCC:                 {mcc:.4f}\n")
        f.write(f"Cohen's Kappa:       {kappa:.4f}\n")
        f.write(f"Log Loss:            {logloss:.4f}\n")
        for k, v in topk_scores.items():
            f.write(f"{k.replace('_',' ').title()}: {v:.4f}\n")
        f.write("\n=== Per-Class Specificity ===\n")
        for name, spec in zip(class_names, specs):
            f.write(f"{name}: {spec:.4f}\n")
        f.write("\n=== Classification Report ===\n")
        f.write(report)

# -------------------------
# Orchestrator
# -------------------------
def save_model_metrics(model, test_ds, results_dir, class_names):
    """Evaluate a trained model on the test set and save all test artifacts.

    Steps:
      - runs ``model.evaluate`` and writes loss/accuracy to testing_metrics.txt
      - collects y_true and y_probs, derives y_pred
      - saves both confusion matrices, the ROC curves and the metric report

    Args:
        model: compiled Keras model whose ``evaluate`` returns exactly
            (loss, accuracy).
        test_ds: iterable of (x, y) batches where y is one-hot or sparse ints.
        results_dir: output directory, created if missing.
        class_names: display names; index i is class i.
    """
    os.makedirs(results_dir, exist_ok=True)
    print("Evaluating the model on test data...")
    test_loss, test_accuracy = model.evaluate(test_ds)
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")

    with open(os.path.join(results_dir, "testing_metrics.txt"), "w") as f:
        f.write(f"Test Loss: {test_loss:.4f}\n")
        f.write(f"Test Accuracy: {test_accuracy:.4f}\n")

    # Collect labels and predictions in ONE pass over the dataset. Reading the
    # labels in one iteration and predicting in another only lines up when the
    # dataset yields the same order every time, which a shuffled dataset does
    # not guarantee.
    label_batches, prob_batches = [], []
    for x, y in test_ds:
        label_batches.append(y.numpy())
        prob_batches.append(np.asarray(model.predict_on_batch(x)))
    y_true_raw = np.concatenate(label_batches, axis=0)
    y_probs = np.concatenate(prob_batches, axis=0)

    # Handle one-hot vs sparse labels.
    if y_true_raw.ndim == 2 and y_true_raw.shape[1] > 1:
        y_true = np.argmax(y_true_raw, axis=1)
    else:
        # reshape(-1) instead of squeeze(): stays 1-D for a single sample.
        y_true = y_true_raw.reshape(-1).astype(int)

    # A single-unit sigmoid head produces shape (N, 1), not (N,); flatten it
    # first so the two-column expansion below actually applies.
    if y_probs.ndim == 2 and y_probs.shape[1] == 1:
        y_probs = y_probs[:, 0]
    if y_probs.ndim == 1:
        y_probs = np.stack([1 - y_probs, y_probs], axis=1)

    # Predicted labels
    y_pred = np.argmax(y_probs, axis=1)

    # Confusion matrices
    save_confusion_matrix(y_true, y_pred, class_names, results_dir, normalize=False)
    save_confusion_matrix(y_true, y_pred, class_names, results_dir, normalize=True)

    # ROC curves (binary or multiclass automatically)
    save_roc_curves(y_true, y_probs, class_names, results_dir)

    # Metrics (binary or multiclass automatically)
    save_classification_metrics(y_true, y_pred, y_probs, results_dir, class_names)

In [None]:
# Deliberately unseeded: the ANOVA repeats below need independently randomized runs, so do not set a random seed here.

def get_images_and_classes():
    """Load the train/dev/test image folders found under ``baseDir``.

    Every split is read with inferred, one-hot ("categorical") labels, images
    resized to 224x224 and batches of ``batch_size``. The train and dev splits
    are shuffled; the test split is not.

    Returns:
        (train_ds, val_ds, test_ds, class_names), where class_names is taken
        from the training split.
    """
    def _load_split(subdir, shuffle):
        # All three splits share every setting except folder and shuffling.
        return tf.keras.utils.image_dataset_from_directory(
            baseDir + subdir,
            labels="inferred",
            label_mode="categorical",
            image_size=(224, 224),
            batch_size=batch_size,
            shuffle=shuffle,
        )

    train_ds = _load_split("train", True)
    val_ds = _load_split("dev", True)
    test_ds = _load_split("test", False)

    class_names = train_ds.class_names

    # Print the shapes of one training batch as a quick sanity check.
    for batch_images, batch_labels in train_ds.take(1):
        print(f"Images shape: {batch_images.shape}")
        print(f"Labels shape: {batch_labels.shape}")

    return (train_ds, val_ds, test_ds, class_names)

### Dataset augmentation and normalization

In [None]:
# Map raw [0, 255] pixels to [-1, 1]. The ImageNet-pretrained ResNetV2
# backbones are documented to expect this range (it is what
# `resnet_v2.preprocess_input` produces); scaling to [0, 1] instead feeds the
# pretrained weights inputs on a different scale than they were trained on.
normalization_layer = tf.keras.layers.Rescaling(1./127.5, offset=-1)

# Light augmentation; used on the training split only.
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal_and_vertical"),
    tf.keras.layers.RandomContrast(factor=0.2),
    tf.keras.layers.RandomZoom(0.1),
    tf.keras.layers.RandomRotation(0.1)
])

def augment_and_normalize(_train_ds, _val_ds, _test_ds):
    """Attach preprocessing and prefetching to the three dataset splits.

    The training split goes through the module-level ``data_augmentation``
    (forced into training mode) and then ``normalization_layer``; the
    validation and test splits are only normalized. Labels pass through
    untouched.

    Returns:
        (train, val, test) datasets, each prefetching with tf.data.AUTOTUNE.
    """
    def _augment_then_normalize(images, labels):
        return normalization_layer(data_augmentation(images, training=True)), labels

    def _normalize_only(images, labels):
        return normalization_layer(images), labels

    buffer = tf.data.AUTOTUNE
    return (
        _train_ds.map(_augment_then_normalize).prefetch(buffer_size=buffer),
        _val_ds.map(_normalize_only).prefetch(buffer_size=buffer),
        _test_ds.map(_normalize_only).prefetch(buffer_size=buffer),
    )

### Load ResNet50V2 architecture

In [None]:
def create_and_compile_model(num_classes=3):
    """Build and compile a ResNet50V2-based softmax classifier.

    Args:
        num_classes: width of the final softmax layer. Defaults to 3, the
            value that was previously hard-coded. It has to equal the number
            of one-hot label columns the datasets yield.

    Returns:
        A compiled ``Sequential`` model for 224x224x3 inputs, trained with
        Adam (module-level ``learning_rate``) and categorical cross-entropy.

    Raises:
        ValueError: if ``num_classes`` is smaller than 2.
    """
    if num_classes < 2:
        raise ValueError(f"num_classes must be at least 2, got {num_classes}")

    # Headless backbone (no ImageNet classification top).
    backbone = tf.keras.applications.ResNet50V2(input_shape=(224, 224, 3), include_top=False)

    model = Sequential([
        tf.keras.Input(shape=(224, 224, 3)),  # explicit input layer
        backbone,
        # Global average pooling instead of Flatten keeps the head small.
        GlobalAveragePooling2D(),

        # Normalization and dropout to stabilize / regularize the dense head.
        BatchNormalization(),
        Dense(512, activation='relu'),
        Dropout(0.5),

        Dense(256, activation='relu'),
        Dropout(0.3),
        Dense(num_classes, activation='softmax')  # final classification layer
    ])

    # Build explicitly so the model's weights exist before compile/summary.
    model.build(input_shape=(None, 224, 224, 3))

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  # Multi-class loss over one-hot targets; pairs with the
                  # softmax output above.
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model

### Train model

In [None]:
import os

ANOVA_REPEATS = 5   # number of independent retrains collected for the ANOVA
epochs = 35         # training epochs per repeat

for index in range(1, ANOVA_REPEATS + 1):
    print("REPEAT ROUND: ", index)

    # A fresh model is built every round. Reset Keras' global state first so
    # that state left over from earlier rounds does not pile up in memory
    # over the repeats.
    tf.keras.backend.clear_session()

    (train_ds, val_ds, test_ds, class_names) = get_images_and_classes()
    (output_train_ds, output_val_ds, output_test_ds) = augment_and_normalize(train_ds, val_ds, test_ds)
    model = create_and_compile_model()

    history = model.fit(output_train_ds, epochs=epochs, validation_data=output_val_ds)

    # results/<group>/<model>/<GROUP>_<model>_Round<i>/ ; one folder per repeat.
    path = f"results/{group_type}/{model_name}/"
    name = f"{group_type.upper()}_{model_name}_Round{index}"  # Rounds are retrains for ANOVA

    # Ensure the results directory exists before anything is written to it.
    results_dir = path + name + "/"
    os.makedirs(results_dir, exist_ok=True)
    model.save(results_dir + name + ".keras")

    save_training_metrics(history, results_dir)
    save_model_metrics(model, output_test_ds, results_dir, class_names=class_names)

[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m257s[0m 12s/step - accuracy: 0.5895 - loss: 0.9313 - val_accuracy: 0.2137 - val_loss: 1.5805
Evaluating the model on test data...
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 2s/step - accuracy: 0.1540 - loss: 1.6871
Test Loss: 1.5837
Test Accuracy: 0.2157


2025-10-28 12:00:51.991072: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 2s/step
REPEAT ROUND:  2
Found 646 files belonging to 2 classes.
Found 248 files belonging to 2 classes.
Found 102 files belonging to 2 classes.
Images shape: (32, 224, 224, 3)
Labels shape: (32, 2)
[1m 3/21[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m4:18[0m 14s/step - accuracy: 0.6059 - loss: 0.9213

KeyboardInterrupt: 