# 1. Setup and Data Loading for Anomaly Detection

In [12]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Input, Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.losses import MeanSquaredError
import matplotlib.pyplot as plt
import numpy as np
import os
from google.colab import drive

# Check TensorFlow and GPU
print(f"TensorFlow Version: {tf.__version__}")
print(f"GPU Available: {tf.config.list_physical_devices('GPU')}")

# --- Step 0: Mount ny any eGoogle Drive and Set Paths ---
print("\nMounting Google Drive...")
drive.mount('/content/drive')
print("Google Drive mounted successfully!")

# Define the base directory for your dataset in Google Drive.
# IMPORTANT: CHANGE THIS PATH!
GOOGLE_DRIVE_DATASET_ROOT = '/content/drive/MyDrive/Dataset/chest_xray'
BASE_DIR = GOOGLE_DRIVE_DATASET_ROOT

# Define directories for training, validation, and testing
train_dir = os.path.join(BASE_DIR, 'train')
val_dir = os.path.join(BASE_DIR, 'val')
test_dir = os.path.join(BASE_DIR, 'test')

# Define directories to save models and visualizations
MODEL_SAVE_DIR = '/content/drive/MyDrive/pneumonia_project_models'
os.makedirs(MODEL_SAVE_DIR, exist_ok=True)
CLASSIFIER_PATH = '/content/drive/MyDrive/my_model_checkpoints/pneumonia_resnet_best_model.h5'
ANOMALY_DETECTOR_PATH = '/content/drive/MyDrive/pneumonia_project_models/anomaly_detector.h5'

# Image parameters
IMG_HEIGHT = 224
IMG_WIDTH = 224
BATCH_SIZE = 32
NUM_CLASSES = 2

def create_autoencoder_generator(directory, image_size, batch_size):
    """
    Creates a generator that yields (image, image) pairs for autoencoder training.
    """
    generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
        directory,
        target_size=image_size,
        batch_size=batch_size,
        class_mode=None,
        classes=['NORMAL'],
        shuffle=True
    )
    for batch in generator:
        yield (batch, batch)

def get_anomaly_threshold(model, normal_train_dir, image_size, batch_size):
    """
    Calculates the anomaly threshold based on the mean and standard deviation
    of reconstruction errors for a set of normal images.
    """
    generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
        os.path.dirname(normal_train_dir),
        target_size=image_size,
        batch_size=batch_size,
        classes=['NORMAL'],
        class_mode=None,
        shuffle=False
    )

    num_samples = generator.samples
    errors = []

    # Predict and calculate errors in batches
    for i in range(num_samples // batch_size):
        batch = next(generator)
        reconstructions = model.predict(batch)
        error = np.mean(np.square(reconstructions - batch), axis=(1, 2, 3))
        errors.extend(error)

    return np.mean(errors) + 2 * np.std(errors)

TensorFlow Version: 2.19.0
GPU Available: []

Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Google Drive mounted successfully!


# 2. Build, Train, and Threshold the Anomaly Detector

In [None]:
ANOMALY_DETECTOR_PATH = '/content/drive/MyDrive/pneumonia_project_models/anomaly_detector.h5'
CLASSIFIER_PATH = '/content/drive/MyDrive/my_model_checkpoints/pneumonia_resnet_best_model.h5'

# --- Step 4: Anomaly Detection with Autoencoder ---
print("\n--- Building and Training Anomaly Detector (Autoencoder) ---")

BATCH_SIZE = 16

# Remove the old model to ensure we train from scratch
if os.path.exists(ANOMALY_DETECTOR_PATH):
    print("Removing old anomaly detector model to train a new one...")
    os.remove(ANOMALY_DETECTOR_PATH)

# --- Corrected Generator for Autoencoder ---
def create_autoencoder_generator(directory, image_size, batch_size):
    generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
        directory,
        target_size=image_size,
        batch_size=batch_size,
        class_mode=None, # Do not yield labels
        classes=['NORMAL'], # Only use normal images for training
        shuffle=True
    )
    for batch in generator:
        yield (batch, batch) # Yield the image as both input (x) and target (y)

# We need a normal_train_generator to get the total number of samples
normal_train_dir = os.path.join(train_dir, 'NORMAL')
normal_train_generator_base = ImageDataGenerator(rescale=1./255).flow_from_directory(
    os.path.dirname(normal_train_dir),
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=BATCH_SIZE,
    classes=['NORMAL'],
    class_mode=None,
    shuffle=False
)

# Use the fixed generator that yields (x, x)
normal_train_generator_fixed = create_autoencoder_generator(
    os.path.dirname(normal_train_dir),
    (IMG_HEIGHT, IMG_WIDTH),
    BATCH_SIZE
)

print("Building a new autoencoder model with improved architecture...")
# Encoder
input_img = Input(shape=(IMG_HEIGHT, IMG_WIDTH, 3))
x = Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x) # Deeper encoder
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
encoded = MaxPooling2D((2, 2), padding='same')(x)

# Bottleneck with Dropout for better generalization
x = Dropout(0.2)(encoded)

# Decoder
x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

anomaly_detector = Model(input_img, decoded)
anomaly_detector.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005), loss='mse')

print("Training the autoencoder on NORMAL images...")
anomaly_detector.fit(
    normal_train_generator_fixed,
    steps_per_epoch=normal_train_generator_base.samples // BATCH_SIZE,
    epochs=20,
    callbacks=[
        ModelCheckpoint(
            filepath=ANOMALY_DETECTOR_PATH,
            save_best_only=True,
            monitor='loss'
        )
    ]
)

print("\nAutoencoder training complete. Model saved.")

print("\nTo use the new model, continue running the next phase of the notebook.")

Mounted at /content/drive
Found 1351 images belonging to 1 classes.

--- Building and Training Anomaly Detector (Autoencoder) ---
Loading existing anomaly detector from /content/drive/MyDrive/pneumonia_project_models/anomaly_detector.h5...





Autoencoder training complete. Model saved.

--- Setting Anomaly Detection Threshold ---

Found 1351 images belonging to 1 classes.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step


KeyboardInterrupt: 

In [None]:
ANOMALY_DETECTOR_PATH = '/content/drive/MyDrive/pneumonia_project_models/anomaly_detector.h5'
CLASSIFIER_PATH = '/content/drive/MyDrive/my_model_checkpoints/pneumonia_resnet_best_model.h5'

final_classifier_model = tf.keras.models.load_model(CLASSIFIER_PATH)


# --- Step 6: Define Functions for Prediction, XAI (Grad-CAM), and Anomaly Detection ---

def make_gradcam_heatmap(img_array, model, last_conv_layer_name="conv5_block3_out"):
    grad_model = tf.keras.models.Model([model.inputs], [model.get_layer(last_conv_layer_name).output, model.output])
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]
    grads = tape.gradient(class_channel, last_conv_layer_output)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()

def display_gradcam(image_path, model, last_conv_layer_name="conv5_block3_out"):
    img = tf.keras.preprocessing.image.load_img(image_path, target_size=(IMG_HEIGHT, IMG_WIDTH))
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array_preprocessed = np.expand_dims(img_array / 255.0, axis=0)

    heatmap = make_gradcam_heatmap(img_array_preprocessed, model, last_conv_layer_name)

    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(img, cmap='gray')
    plt.title("Original X-Ray")
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.imshow(img, cmap='gray')
    plt.imshow(heatmap, cmap='jet', alpha=0.5)
    plt.title("Grad-CAM Heatmap")
    plt.axis('off')
    plt.show()

def get_anomaly_threshold(model, normal_train_dir, image_size, batch_size):
    generator = ImageDataGenerator(rescale=1./255).flow_from_directory(
        os.path.dirname(normal_train_dir),
        target_size=image_size,
        batch_size=batch_size,
        classes=['NORMAL'],
        class_mode=None,
        shuffle=False
    )

    num_samples = generator.samples
    errors = []

    for i in range(num_samples // batch_size):
        batch = next(generator)
        reconstructions = model.predict(batch)
        error = np.mean(np.square(reconstructions - batch), axis=(1, 2, 3))
        errors.extend(error)

    return np.mean(errors) + 2 * np.std(errors)


# --- NEW FUNCTION: Get Anomaly Coordinates ---
def get_anomaly_coordinates(original_img_array_preprocessed, reconstruction_img_array, error_threshold_for_pixels=0.01):
    """
    Calculates pixel-wise reconstruction error and returns bounding box coordinates
    of anomalous regions.

    Args:
        original_img_array_preprocessed (np.array): Original image (preprocessed, e.g., scaled to [0,1]).
                                                    Shape (1, H, W, C).
        reconstruction_img_array (np.array): Reconstructed image from autoencoder.
                                             Shape (1, H, W, C).
        error_threshold_for_pixels (float): Pixel-wise error threshold to consider a pixel anomalous.
                                            Adjust this value based on visual inspection.
                                            A higher value means stricter anomaly detection.
                                            (e.g., 0.01 means 1% difference is anomalous)

    Returns:
        list of tuples: Each tuple represents a bounding box (x_min, y_min, x_max, y_max)
                        If no anomalies, returns an empty list.
    """
    # Calculate pixel-wise squared error
    pixel_error = np.mean(np.square(original_img_array_preprocessed - reconstruction_img_array), axis=-1)
    pixel_error = pixel_error[0] # Remove batch dimension

    # Threshold the pixel error to create a binary mask of anomalous regions
    anomaly_mask = pixel_error > error_threshold_for_pixels

    # Find contours or connected components to get bounding boxes
    # Using a simple bounding box around all anomalous pixels for now
    # For more complex shapes, you'd use OpenCV's findContours

    anomalous_pixels = np.argwhere(anomaly_mask)

    if len(anomalous_pixels) == 0:
        return [] # No anomalous pixels found

    # Get min/max coordinates
    y_min, x_min = anomalous_pixels.min(axis=0)
    y_max, x_max = anomalous_pixels.max(axis=0)

    # Return as (x_min, y_min, x_max, y_max)
    return [(x_min, y_min, x_max, y_max)]


def comprehensive_prediction(image_path, classifier_model, anomaly_detector, anomaly_threshold):
    img = tf.keras.preprocessing.image.load_img(image_path, target_size=(IMG_HEIGHT, IMG_WIDTH))
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array_preprocessed = np.expand_dims(img_array / 255.0, axis=0)

    # 1. Pneumonia Prediction
    pneumonia_prediction = classifier_model.predict(img_array_preprocessed)[0][0]
    is_pneumonia = "Pneumonia" if pneumonia_prediction > 0.5 else "Normal"

    # 2. Anomaly Detection (Image-level)
    reconstruction = anomaly_detector.predict(img_array_preprocessed)
    reconstruction_error = np.mean(np.square(reconstruction - img_array_preprocessed))
    is_anomaly = reconstruction_error > anomaly_threshold

    print(f"\n--- Analysis for Image: {os.path.basename(image_path)} ---")
    print(f"Pneumonia Prediction: {is_pneumonia} (Confidence: {pneumonia_prediction:.2f})")
    print(f"Anomaly Detected: {'Yes' if is_anomaly else 'No'} (Reconstruction Error: {reconstruction_error:.4f})")

    # 3. Anomaly Localization (Pixel-level)
    anomaly_coords = []
    if is_anomaly: # Only get coordinates if image is flagged as anomalous
        # You might need to tune this pixel_error_threshold based on your data
        anomaly_coords = get_anomaly_coordinates(img_array_preprocessed, reconstruction, error_threshold_for_pixels=0.01)
        if anomaly_coords:
            print(f"Anomaly Coordinates (x_min, y_min, x_max, y_max): {anomaly_coords}")
        else:
            print("No significant anomalous regions found at the pixel level based on threshold.")

    # 4. Explainable AI (Grad-CAM)
    display_gradcam(image_path, classifier_model)

    return is_pneumonia, is_anomaly, anomaly_coords

# --- Example of Comprehensive Prediction on a New Image ---
if os.path.exists(ANOMALY_DETECTOR_PATH) and final_classifier_model is not None:
    anomaly_detector = tf.keras.models.load_model(
        ANOMALY_DETECTOR_PATH,
        custom_objects={'mse': tf.keras.losses.MeanSquaredError()}
    )

    normal_train_dir_for_threshold = os.path.join(train_dir, 'NORMAL')
    anomaly_threshold = get_anomaly_threshold(
        anomaly_detector,
        normal_train_dir_for_threshold,
        (IMG_HEIGHT, IMG_WIDTH),
        BATCH_SIZE
    )
    print(f"\nAnomaly Detection Threshold (Image-level): {anomaly_threshold:.4f}")

    # Example prediction on a test pneumonia image
    test_pneumonia_path = os.path.join(test_dir, 'PNEUMONIA', os.listdir(os.path.join(test_dir, 'PNEUMONIA'))[51])
    comprehensive_prediction(test_pneumonia_path, final_classifier_model, anomaly_detector, anomaly_threshold)

    # Example prediction on a test normal image
    test_normal_path = os.path.join(test_dir, 'NORMAL', os.listdir(os.path.join(test_dir, 'NORMAL'))[51])
    comprehensive_prediction(test_normal_path, final_classifier_model, anomaly_detector, anomaly_threshold)
else:
    print("\nError: Anomaly detector model or classifier model not found. Please run the training phases first.")



Found 1351 images belonging to 1 classes.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[