# In [2]:
import os
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc, precision_score, recall_score, f1_score, accuracy_score
from sklearn.preprocessing import label_binarize
from itertools import cycle
import requests
import zipfile
from io import BytesIO


# --- Configuration ---
IMG_SIZE = (64, 64)   # target size passed to tf.image.resize for every image
BATCH_SIZE = 64       # number of samples per batch in the tf.data pipelines
NUM_CLASSES = 10      # width of the softmax output layer
EPOCHS = 25           # maximum epochs per model; EarlyStopping may end training sooner

# Load dataframes
# NOTE(review): DATA_DIR is not defined in the visible part of this file; it is
# assumed to be set earlier (e.g. in a previous notebook cell) -- confirm.
try:
    # Only the file reads can raise FileNotFoundError, so only they are guarded.
    train_df = pd.read_csv('train.csv')
    val_df = pd.read_csv('validation.csv')
    test_df = pd.read_csv('test.csv')
    class_names = list(pd.read_json('label_map.json', typ='series').index)
except FileNotFoundError:
    print("Error: train.csv, validation.csv, test.csv or label_map.json not found.")
    # `exit()` is the interactive-shell helper injected by `site` and exits
    # with status 0; raise SystemExit with a non-zero status so the failure is
    # visible to whatever launched the script.
    raise SystemExit(1)

# Turn the relative file names stored in the CSVs into paths under DATA_DIR.
for _split_df in (train_df, val_df, test_df):
    _split_df['Filename'] = DATA_DIR + '/' + _split_df['Filename']


def create_dataset(df, shuffle=False):
    """Create a batched, prefetched tf.data.Dataset from a pandas DataFrame.

    Args:
        df: DataFrame with a 'Filename' column (paths of JPEG images) and a
            'Label' column.
        shuffle: When True, shuffle the samples using a buffer as large as
            the DataFrame.

    Returns:
        A dataset yielding ``(images, labels)`` batches of up to BATCH_SIZE
        samples. Images are float32, resized to IMG_SIZE, with 3 channels;
        ``convert_image_dtype`` scales the decoded uint8 pixels to [0, 1].
    """
    image_paths = df['Filename'].values
    labels = df['Label'].values
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels))

    if shuffle:
        # Shuffle before decoding: the full-length buffer then holds only
        # (path, label) pairs instead of every decoded image tensor.
        dataset = dataset.shuffle(buffer_size=len(df))

    def _parse_function(filename, label):
        """Read one JPEG file and return it as a resized float32 image."""
        image_string = tf.io.read_file(filename)
        image_decoded = tf.image.decode_jpeg(image_string, channels=3)
        image = tf.image.convert_image_dtype(image_decoded, tf.float32)
        image = tf.image.resize(image, IMG_SIZE)
        return image, label

    dataset = dataset.map(_parse_function, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# Build the three input pipelines; only the training split is shuffled.
train_ds = create_dataset(train_df, shuffle=True)
val_ds, test_ds = (create_dataset(split_df) for split_df in (val_df, test_df))

# --- 2. HYBRID MODEL DEFINITION (MODIFIED FOR FLEXIBILITY) ---
def build_feature_model(input_shape, model_type='combined', num_classes=10,
                        pixel_scale=255.0):
    """Build a classifier on frozen VGG16 features.

    Args:
        input_shape: Shape of one input image, e.g. ``IMG_SIZE + (3,)``.
        model_type: Which feature stream feeds the classifier head:
            'global' (average-pooled final VGG16 feature map),
            'local' (flattened "block4_pool" feature map), or
            'combined' (concatenation of both).
        num_classes: Number of units in the softmax output layer.
        pixel_scale: Factor applied to the inputs before the VGG16
            preprocessing. ``vgg16.preprocess_input`` expects pixel values
            in [0, 255], while ``create_dataset`` yields floats in [0, 1],
            hence the default of 255.0. Pass 1.0 for inputs that are
            already in [0, 255].

    Returns:
        An uncompiled ``tf.keras.Model`` mapping images to class
        probabilities.

    Raises:
        ValueError: If ``model_type`` is not one of the supported values.
    """
    # Validate first so a typo does not cost a VGG16 build.
    if model_type not in ('global', 'local', 'combined'):
        raise ValueError("model_type must be one of 'global', 'local', or 'combined'")

    base_model = tf.keras.applications.VGG16(
        include_top=False, weights='imagenet', input_shape=input_shape
    )
    base_model.trainable = False

    inputs = tf.keras.Input(shape=input_shape)
    x = tf.keras.layers.Rescaling(pixel_scale, name="to_pixel_range")(inputs)
    x = tf.keras.applications.vgg16.preprocess_input(x)

    # --- Feature Streams ---
    # Each stream is a sub-model defined purely in terms of base_model's own
    # graph (base_model.input -> one of its layer outputs) and is then called
    # on `x`. Mixing base_model.input with a tensor derived from `inputs` in a
    # single tf.keras.Model(...) call produces a disconnected graph.
    streams = []
    if model_type in ('global', 'combined'):
        vgg16_feature_map = base_model(x, training=False)
        streams.append(
            tf.keras.layers.GlobalAveragePooling2D(name="global_pool")(vgg16_feature_map)
        )
    if model_type in ('local', 'combined'):
        local_model = tf.keras.Model(
            inputs=base_model.input,
            outputs=base_model.get_layer("block4_pool").output,
            name="vgg16_block4",
        )
        # Shares its layers with base_model; mark it frozen as well so the
        # intent is explicit.
        local_model.trainable = False
        local_feature_map = local_model(x, training=False)
        streams.append(
            tf.keras.layers.Flatten(name="local_flatten")(local_feature_map)
        )

    # --- Feature Selection/Fusion ---
    if len(streams) == 1:
        final_features = streams[0]
    else:
        final_features = tf.keras.layers.Concatenate(name="fusion")(streams)

    # --- Classification Head ---
    classifier = tf.keras.layers.Dense(256, activation='relu', name="classifier_dense_1")(final_features)
    classifier = tf.keras.layers.Dropout(0.5, name="classifier_dropout")(classifier)
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax', name="classifier_output")(classifier)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

# --- 3. MODEL TRAINING & EVALUATION LOOP ---
model_types = ['global', 'local', 'combined']
histories = {}
evaluation_results = {}

# Both callbacks watch the validation loss: one stops training (restoring the
# best weights), the other lowers the learning rate on plateaus.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=5, restore_best_weights=True
)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6
)
callbacks = [early_stopping, reduce_lr]

# Collect the test labels once; every model is scored against the same array.
y_true = np.concatenate([batch_labels for _, batch_labels in test_ds], axis=0)

for m_type in model_types:
    print(f"\n{'='*20} TRAINING {m_type.upper()} MODEL {'='*20}")

    # Fresh model per feature type, all compiled with the same settings.
    model = build_feature_model(
        IMG_SIZE + (3,),
        model_type=m_type,
        num_classes=NUM_CLASSES,
    )
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    print(f"--- Starting training for {m_type} model ---")
    history = model.fit(
        train_ds,
        epochs=EPOCHS,
        validation_data=val_ds,
        callbacks=callbacks,
        verbose=1,
    )
    histories[m_type] = history

    print(f"\n--- Evaluating {m_type} model ---")
    y_pred_probs = model.predict(test_ds)
    y_pred = np.argmax(y_pred_probs, axis=1)

    # Keep everything the comparison and detail plots need for this model.
    evaluation_results[m_type] = dict(
        y_pred=y_pred,
        y_pred_probs=y_pred_probs,
        history=history,
    )

# --- 4. VISUALIZATION & COMPARISON ---

# --- Plot 1: Performance Metrics Comparison (NEW) ---
def plot_model_comparison(results, y_true):
    """Draw a grouped bar chart of test metrics for every evaluated model.

    Args:
        results: Mapping of model name to a dict holding that model's
            predicted labels under the 'y_pred' key.
        y_true: True labels that every model's predictions are scored against.

    Accuracy plus macro-averaged precision, recall and F1 are computed per
    model and shown grouped by metric.
    """
    rows = []
    for model_name, result in results.items():
        predicted = result['y_pred']
        scores = {
            'Accuracy': accuracy_score(y_true, predicted),
            'Precision': precision_score(y_true, predicted, average='macro'),
            'Recall': recall_score(y_true, predicted, average='macro'),
            'F1-Score': f1_score(y_true, predicted, average='macro'),
        }
        display_name = model_name.title()
        rows.extend(
            {'Model': display_name, 'Metric': metric, 'Score': score}
            for metric, score in scores.items()
        )

    comparison = pd.DataFrame(rows)

    plt.figure(figsize=(12, 7))
    sns.barplot(data=comparison, x='Metric', y='Score', hue='Model', palette='viridis')
    plt.title('Model Performance Comparison', fontsize=18)
    plt.ylabel('Score', fontsize=12)
    plt.xlabel('Metric', fontsize=12)
    plt.ylim(0, 1.05)
    plt.legend(title='Model Type', fontsize=11)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.show()

# Compare all trained models side by side on the shared test labels.
print("\n--- Plotting Model Performance Comparison ---")
plot_model_comparison(results=evaluation_results, y_true=y_true)

# --- Detailed Analysis of the Combined model ---
# The remaining plots look only at the 'combined' entry; it is treated as the
# best model without checking the scores computed above.
print("\n--- Detailed Analysis of the Combined Model ---")
combined_results = evaluation_results['combined']
y_pred, y_pred_probs, history = (
    combined_results[key] for key in ('y_pred', 'y_pred_probs', 'history')
)

# --- Plot 2: Training History of Combined Model ---
def plot_history(history):
    """Plot training vs. validation accuracy and loss per epoch.

    Args:
        history: Object whose ``history`` attribute maps 'accuracy',
            'val_accuracy', 'loss' and 'val_loss' to per-epoch values
            (as returned by ``model.fit``).
    """
    # (train key, val key, train label, val label, title, y label, legend loc)
    panels = (
        ('accuracy', 'val_accuracy', 'Train Accuracy', 'Validation Accuracy',
         'Model Accuracy', 'Accuracy', 'lower right'),
        ('loss', 'val_loss', 'Train Loss', 'Validation Loss',
         'Model Loss', 'Loss', 'upper right'),
    )

    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    for ax, panel in zip(axes, panels):
        train_key, val_key, train_label, val_label, title, y_label, legend_loc = panel
        ax.plot(history.history[train_key], label=train_label)
        ax.plot(history.history[val_key], label=val_label)
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.legend(loc=legend_loc)
        ax.grid(True)

    plt.suptitle('Combined Model Training History', fontsize=16)
    # Leave headroom at the top so the suptitle does not overlap the axes.
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()

# Training curves of the combined model selected above.
print("\n--- Plotting Training History for Combined Model ---")
plot_history(history=history)

# --- Plot 3: Confusion Matrix ---
def plot_confusion_matrix(y_true, y_pred, class_names):
    """Plot the confusion matrix as an annotated heatmap.

    Args:
        y_true: True integer class indices.
        y_pred: Predicted integer class indices.
        class_names: Class names; ``class_names[k]`` labels row/column ``k``.
    """
    # Pin the label set to every class index. Without `labels`,
    # confusion_matrix only covers classes that occur in y_true or y_pred, so
    # a class missing from both would shrink the matrix and shift the tick
    # labels onto the wrong rows/columns.
    cm = confusion_matrix(y_true, y_pred, labels=np.arange(len(class_names)))
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix (Combined Model)', fontsize=16)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()

print("\n--- Plotting Confusion Matrix for Combined Model ---")
plot_confusion_matrix(y_true, y_pred, class_names)
print("\n--- Classification Report (Combined Model) ---")
# Pass the full list of class indices: with target_names but no `labels`,
# classification_report raises when a class never occurs in y_true/y_pred,
# because the number of names no longer matches the number of classes found.
print(classification_report(
    y_true,
    y_pred,
    labels=list(range(len(class_names))),
    target_names=class_names,
))

# --- Plot 4: Multi-Class ROC AUC Curve ---
def plot_roc_auc_curves(y_true, y_pred_probs, class_names):
    """Plot one-vs-rest ROC curves, with their AUC, for every class.

    Args:
        y_true: True integer class indices.
        y_pred_probs: 2-D array of predicted scores; column ``k`` is the score
            for class ``k``.
        class_names: Class names; ``class_names[k]`` labels the curve of
            class ``k``.
    """
    n_classes = len(class_names)
    y_true_bin = label_binarize(y_true, classes=range(n_classes))
    if n_classes == 2 and y_true_bin.shape[1] == 1:
        # For two classes label_binarize returns a single column (1 where the
        # label is class 1). Expand it to one indicator column per class so
        # the per-class indexing below also works for binary problems.
        y_true_bin = np.hstack([1 - y_true_bin, y_true_bin])

    fpr, tpr, roc_auc = {}, {}, {}
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_pred_probs[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    plt.figure(figsize=(12, 10))
    # cycle() repeats the palette when there are more classes than colours.
    colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'green', 'red',
                    'purple', 'brown', 'pink', 'gray', 'olive'])
    for (i, class_name), color in zip(enumerate(class_names), colors):
        plt.plot(fpr[i], tpr[i], color=color, lw=2,
                 label=f'ROC curve of class {class_name} (area = {roc_auc[i]:0.2f})')

    # Diagonal = performance of a random classifier.
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multi-Class ROC Curves (Combined Model)', fontsize=16)
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.show()

# One-vs-rest ROC curves from the combined model's predicted probabilities.
print("\n--- Plotting ROC AUC Curves for Combined Model ---")
plot_roc_auc_curves(y_true=y_true, y_pred_probs=y_pred_probs, class_names=class_names)

# --- Plot 5: Sample Predictions ---
def show_sample_predictions(dataset, model, class_names, num_samples=9):
    """Show images from the first batch with their true and predicted labels.

    Args:
        dataset: Batched dataset yielding ``(images, labels)``; only the first
            batch is used.
        model: Trained model used to predict the batch.
        class_names: Class names indexed by integer label.
        num_samples: Maximum number of images to display; capped at the size
            of the first batch.

    Titles are green when the prediction matches the true label and red
    otherwise.
    """
    plt.figure(figsize=(12, 12))
    for images, labels in dataset.take(1):
        # Predict with the model that was passed in. Rebuilding a second
        # network and copying the weights over is unnecessary work and only
        # succeeds when `model` has exactly the 'combined' architecture.
        predictions = model.predict(images)
        predicted_labels = np.argmax(predictions, axis=1)

        # Never index past the end of the batch, and size the grid from the
        # number of images actually shown (3x3 for the default of 9).
        count = max(0, min(num_samples, len(predicted_labels)))
        n_cols = max(1, int(np.ceil(np.sqrt(count))))
        n_rows = max(1, int(np.ceil(count / n_cols)))

        for i in range(count):
            plt.subplot(n_rows, n_cols, i + 1)
            plt.imshow(images[i].numpy())
            true_label = class_names[int(labels[i])]
            predicted_label = class_names[int(predicted_labels[i])]
            title_color = 'green' if true_label == predicted_label else 'red'
            plt.title(f"True: {true_label}\nPred: {predicted_label}", color=title_color)
            plt.axis("off")
    plt.tight_layout()
    plt.show()

print("\n--- Displaying Sample Predictions from the Test Set (using Combined Model) ---")
# `model` still refers to the model from the final loop iteration, and
# 'combined' is the last entry of model_types, so it can be passed directly.
show_sample_predictions(dataset=test_ds, model=model, class_names=class_names)



# Output of the run above: ModuleNotFoundError: No module named 'tensorflow'