In [None]:
"""
Pneumonia detection — 4-model comparison + custom RNN-style model + bounding-box via Grad-CAM
Save this as `pneumonia_detection_multimodel.py` and run with Python (ideally in a GPU environment).

Assumptions about dataset layout (as you described):
dataset/
  train/
    NORMAL/
    PNEUMONIA/
       virus/
       bacteria/
  test/
    NORMAL/
    PNEUMONIA/

Notes:
- This script uses TensorFlow / Keras.
- Models: (1) Pretrained EfficientNetB0 (transfer learning)
          (2) Simple custom CNN
          (3) CNN -> BiLSTM (treat image as sequence of row-features)
          (4) A "custom RNN" that uses TimeDistributed Conv on patches + LSTM
- Bounding box extraction uses Grad-CAM heatmap and finds the largest bright region bounding box.
- Trains each model for `EPOCHS` (default 100). You can reduce for testing.

"""

import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing import image_dataset_from_directory
import matplotlib.pyplot as plt
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.models import Model
import cv2

In [None]:
DATA_DIR = r"C:/Users/aravr/Downloads/Pneumonia Detection Dataset/chest_xray"  # change to your path
BATCH_SIZE = 16
IMG_SIZE = (224, 224)
EPOCHS = 100  # per your request
AUTOTUNE = tf.data.AUTOTUNE
SEED = 42
NUM_CLASSES = 2  # normal vs pneumonia

# -------- helper: load datasets --------
train_dir = os.path.join(DATA_DIR, "train")
test_dir = os.path.join(DATA_DIR, "test")

train_ds = image_dataset_from_directory(
    train_dir,
    labels='inferred',
    label_mode='int',
    batch_size=BATCH_SIZE,
    image_size=IMG_SIZE,
    shuffle=True,
    seed=SEED,
)

val_ds = image_dataset_from_directory(
    test_dir,
    labels='inferred',
    label_mode='int',
    batch_size=BATCH_SIZE,
    image_size=IMG_SIZE,
    shuffle=False,
)

# cache & prefetch for speed
train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

# normalization layer
normalization_layer = layers.Rescaling(1./255)

# -------- metrics & compile helper --------
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=False)

def compile_and_train(model, name, epochs=EPOCHS):
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=1e-4),
        loss=loss_fn,
        metrics=['accuracy']
    )
    print(f"\nTraining model: {name}")
    history = model.fit(
        train_ds.map(lambda x,y: (normalization_layer(x), y)),
        validation_data=val_ds.map(lambda x,y: (normalization_layer(x), y)),
        epochs=epochs
    )
    model.save(f"{name}.h5")
    return history

# -------- Model 1: Transfer learning EfficientNetB0 --------
def build_efficientnet_model(img_size=IMG_SIZE, num_classes=NUM_CLASSES):
    base = EfficientNetB0(include_top=False, input_shape=(*img_size,3), weights='imagenet')
    base.trainable = False
    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.Dropout(0.3)(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base.input, outputs=out)
    return model

# -------- Model 2: Simple custom CNN --------
def build_simple_cnn(img_size=IMG_SIZE, num_classes=NUM_CLASSES):
    inputs = keras.Input(shape=(*img_size,3))
    x = layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
    x = layers.MaxPool2D()(x)
    x = layers.Conv2D(64, 3, activation='relu', padding='same')(x)
    x = layers.MaxPool2D()(x)
    x = layers.Conv2D(128, 3, activation='relu', padding='same')(x)
    x = layers.MaxPool2D()(x)
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.4)(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inputs, out)

# -------- Model 3: CNN features -> BiLSTM (treat rows as sequence) --------
# Idea: apply small conv to get H x W x C features, then treat H rows as sequence where each row's
# features are pooled across width -> sequence length = H, then BiLSTM

def build_cnn_bilstm(img_size=IMG_SIZE, num_classes=NUM_CLASSES):
    inputs = keras.Input(shape=(*img_size,3))
    x = layers.Conv2D(32,3,activation='relu',padding='same')(inputs)
    x = layers.MaxPool2D((2,2))(x)
    x = layers.Conv2D(64,3,activation='relu',padding='same')(x)
    x = layers.MaxPool2D((2,2))(x)
    # feature map shape: (H', W', C')
    # we will pool across width -> sequence of length H'
    f = layers.Permute((2,1,3))(x)  # swap axes so we can TimeDistributed pool across width
    # collapse width into channels using GlobalMaxPool across width inside TimeDistributed
    seq = layers.TimeDistributed(layers.GlobalMaxPooling1D())(layers.Reshape((-1, x.shape[2]*x.shape[3]))(f))
    # Above reshape may be dynamic; simpler approach below:
    # alternative: reduce across width dimension with Lambda
    seq = layers.Lambda(lambda z: tf.reduce_max(z, axis=2))(x)  # now (batch, H', C')
    # feed to BiLSTM
    seq = layers.Bidirectional(layers.LSTM(64))(seq)
    x = layers.Dense(64, activation='relu')(seq)
    out = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inputs, out)

# -------- Model 4: Custom RNN-style: patches -> TimeDistributed Conv -> LSTM --------
# We split image into vertical patches (like columns) and treat each column as timestep

def build_custom_rnn(img_size=IMG_SIZE, num_classes=NUM_CLASSES, num_patches=28):
    # num_patches decides sequence length (e.g., 28 columns -> 28 timesteps)
    inputs = keras.Input(shape=(*img_size,3))
    # resize to ensure divisible
    h, w = img_size
    patch_w = w // num_patches
    # extract patches using Reshape after resizing to (h, num_patches, patch_w, 3)
    x = layers.Resizing(h, num_patches*patch_w)(inputs)
    x = layers.Reshape((h, num_patches, patch_w, 3))(x)
    # process each patch with small conv net
    td = layers.TimeDistributed(layers.Conv2D(16, 3, activation='relu', padding='same'))(x)
    td = layers.TimeDistributed(layers.MaxPool2D())(td)
    td = layers.TimeDistributed(layers.Flatten())(td)
    # now sequence shape: (batch, num_patches, features)
    r = layers.LSTM(128)(td)
    x = layers.Dense(64, activation='relu')(r)
    out = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inputs, out)

# -------- Grad-CAM + bounding box helper --------
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    # img_array: preprocessed single image (1, H, W, 3)
    grad_model = tf.keras.models.Model(
        [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
    )
    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]
    grads = tape.gradient(class_channel, conv_outputs)
    pooled_grads = tf.reduce_mean(grads, axis=(0,1,2))
    conv_outputs = conv_outputs[0]
    heatmap = conv_outputs @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = tf.maximum(heatmap, 0) / (tf.reduce_max(heatmap) + 1e-9)
    return heatmap.numpy()


def heatmap_to_bbox(heatmap, orig_img_shape, upsample_size=IMG_SIZE):
    # Resize heatmap to image size
    hm = cv2.resize(heatmap, upsample_size)
    hm_uint = np.uint8(255 * hm)
    # threshold and find contours
    _, th = cv2.threshold(hm_uint, 100, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(th, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None
    # choose largest contour
    c = max(contours, key=cv2.contourArea)
    x,y,w,h = cv2.boundingRect(c)
    return (x,y,w,h)

# visualization helper

def display_bbox_on_image(img, bbox, title=None):
    img = img.copy()
    if bbox is not None:
        x,y,w,h = bbox
        cv2.rectangle(img, (x,y), (x+w, y+h), (0,255,0), 2)
    plt.figure(figsize=(5,5))
    if title: plt.title(title)
    plt.imshow(img)
    plt.axis('off')
    plt.show()

# -------- build models --------
models = {}
models['efficientnet'] = build_efficientnet_model()
models['simple_cnn'] = build_simple_cnn()
models['cnn_bilstm'] = build_cnn_bilstm()
models['custom_rnn'] = build_custom_rnn()

# -------- Train models (WARNING: this will take time) --------
histories = {}
for name, model in models.items():
    try:
        histories[name] = compile_and_train(model, name, epochs=EPOCHS)
    except Exception as e:
        print(f"Training failed for {name}: {e}")

# -------- Example: run Grad-CAM on one test image using EfficientNet (if available) --------
# We need to pick a model with a convolutional last layer. EfficientNet has 'top_conv' as last conv.
if os.path.exists('efficientnet.h5'):
    model = keras.models.load_model('efficientnet.h5')
    # pick an image from val_ds
    for images, labels in val_ds.take(1):
        img = images[0].numpy()
        lbl = labels[0].numpy()
        break
    img_input = np.expand_dims(img/255.0, axis=0)
    # ensure we can access last conv layer name
    last_conv = None
    for layer in model.layers[::-1]:
        if isinstance(layer, layers.Conv2D):
            last_conv = layer.name
            break
    if last_conv:
        heatmap = make_gradcam_heatmap(img_input, model, last_conv)
        bbox = heatmap_to_bbox(heatmap, img.shape[:2], upsample_size=(IMG_SIZE[1], IMG_SIZE[0]))
        display_bbox_on_image((img).astype('uint8'), bbox, title=f"label={lbl}")
    else:
        print("No Conv2D layer found for Grad-CAM in loaded model.")

# -------- Plot accuracy comparison --------
plt.figure(figsize=(10,6))
for name, h in histories.items():
    acc = h.history.get('accuracy') or h.history.get('acc')
    val_acc = h.history.get('val_accuracy') or h.history.get('val_acc')
    epochs_range = range(1, len(acc)+1)
    plt.plot(epochs_range, acc, label=f"{name} train")
    plt.plot(epochs_range, val_acc, '--', label=f"{name} val")
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Model comparison')
plt.show()

print("\nDone. Models saved as <name>.h5. Use the saved models to run inference or Grad-CAM later.")

# -------- Suggestions for improving & unique ideas (printed for clarity) --------
improvements = [
    "1) Add segmentation: train a U-Net to segment infected regions instead of only bounding boxes.",
    "2) Use multiclass classification to detect virus vs bacteria vs normal (your dataset contains those subfolders).",
    "3) Use explainability: besides Grad-CAM use integrated gradients and shap for trustworthy explanations.",
    "4) Use data augmentation tuned for X-rays: elastic deformations, contrast adjustments, random rotations +/- 10 degrees.",
    "5) Cross-validation and reporting sensitivity, specificity, F1-score, ROC AUC (medical tasks care about sensitivity).",
    "6) Build an ensemble of top-2 models to boost accuracy and robustness.",
    "7) Add patient metadata (if available) like age/gender and create multimodal model combining image + metadata.",
    "8) Create a lightweight model (pruning/quantization) and measure inference latency to deploy on edge devices.",
    "9) Create a small web app (Streamlit / Flask) to upload an X-ray and show prediction + bounding box + explanation.",
]

print('\nSuggestions to make project more valuable:')
for s in improvements:
    print(s)

# End of script


### """
Pneumonia detection — 4-model comparison + custom RNN-style model + bounding-box via Grad-CAM
Dataset structure (as per your screenshots):

Pneumonia Detection Dataset/chest_xray/
  train/
    NORMAL/
    PNEUMONIA/
  test/
    NORMAL/
    PNEUMONIA/

Notes:
- This script uses TensorFlow / Keras.
- Models: (1) Pretrained EfficientNetB0 (transfer learning)
          (2) Simple custom CNN
          (3) CNN -> BiLSTM (treat image as sequence of row-features)
          (4) A "custom RNN" that uses TimeDistributed Conv on patches + LSTM
- Bounding box extraction uses Grad-CAM heatmap and finds the largest bright region bounding box.
- Trains each model for `EPOCHS` (default 100).
"""

import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing import image_dataset_from_directory
import matplotlib.pyplot as plt
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.models import Model
import cv2

# -------- user params --------
DATA_DIR = r"C:/Users/aravr/Downloads/Pneumonia Detection Dataset/chest_xray"  # adjust to your path
BATCH_SIZE = 16
IMG_SIZE = (224, 224)
EPOCHS = 7
AUTOTUNE = tf.data.AUTOTUNE
SEED = 42
NUM_CLASSES = 2  # normal vs pneumonia

# -------- helper: load datasets --------
train_dir = os.path.join(DATA_DIR, "train")
test_dir = os.path.join(DATA_DIR, "test")

train_ds = image_dataset_from_directory(
    train_dir,
    labels='inferred',
    label_mode='int',
    batch_size=BATCH_SIZE,
    image_size=IMG_SIZE,
    shuffle=True,
    seed=SEED,
)

val_ds = image_dataset_from_directory(
    test_dir,
    labels='inferred',
    label_mode='int',
    batch_size=BATCH_SIZE,
    image_size=IMG_SIZE,
    shuffle=False,
)

# cache & prefetch for speed
train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

# normalization layer
normalization_layer = layers.Rescaling(1./255)

# -------- metrics & compile helper --------
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=False)

def compile_and_train(model, name, epochs=EPOCHS):
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=1e-4),
        loss=loss_fn,
        metrics=['accuracy']
    )
    print(f"\nTraining model: {name}")
    history = model.fit(
        train_ds.map(lambda x,y: (normalization_layer(x), y)),
        validation_data=val_ds.map(lambda x,y: (normalization_layer(x), y)),
        epochs=epochs
    )
    model.save(f"{name}.h5")
    return history

# -------- Model 1: Transfer learning EfficientNetB0 --------
def build_efficientnet_model(img_size=IMG_SIZE, num_classes=NUM_CLASSES):
    base = EfficientNetB0(include_top=False, input_shape=(*img_size,3), weights='imagenet')
    base.trainable = False
    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.Dropout(0.3)(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base.input, outputs=out)
    return model

# -------- Model 2: Simple custom CNN --------
def build_simple_cnn(img_size=IMG_SIZE, num_classes=NUM_CLASSES):
    inputs = keras.Input(shape=(*img_size,3))
    x = layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
    x = layers.MaxPool2D()(x)
    x = layers.Conv2D(64, 3, activation='relu', padding='same')(x)
    x = layers.MaxPool2D()(x)
    x = layers.Conv2D(128, 3, activation='relu', padding='same')(x)
    x = layers.MaxPool2D()(x)
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.4)(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inputs, out)

# -------- Model 3: CNN features -> BiLSTM --------
def build_cnn_bilstm(img_size=IMG_SIZE, num_classes=NUM_CLASSES):
    inputs = keras.Input(shape=(*img_size,3))
    x = layers.Conv2D(32,3,activation='relu',padding='same')(inputs)
    x = layers.MaxPool2D((2,2))(x)
    x = layers.Conv2D(64,3,activation='relu',padding='same')(x)
    x = layers.MaxPool2D((2,2))(x)
    seq = layers.Lambda(lambda z: tf.reduce_max(z, axis=2))(x)  # (batch, H', C')
    seq = layers.Bidirectional(layers.LSTM(64))(seq)
    x = layers.Dense(64, activation='relu')(seq)
    out = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inputs, out)

# -------- Model 4: Custom RNN-style --------
def build_custom_rnn(img_size=IMG_SIZE, num_classes=NUM_CLASSES, num_patches=28):
    inputs = keras.Input(shape=(*img_size,3))
    h, w = img_size
    patch_w = w // num_patches
    x = layers.Resizing(h, num_patches*patch_w)(inputs)
    x = layers.Reshape((h, num_patches, patch_w, 3))(x)
    td = layers.TimeDistributed(layers.Conv2D(16, 3, activation='relu', padding='same'))(x)
    td = layers.TimeDistributed(layers.MaxPool2D())(td)
    td = layers.TimeDistributed(layers.Flatten())(td)
    r = layers.LSTM(128)(td)
    x = layers.Dense(64, activation='relu')(r)
    out = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inputs, out)

# -------- Grad-CAM + bounding box --------
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    grad_model = tf.keras.models.Model(
        [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
    )
    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]
    grads = tape.gradient(class_channel, conv_outputs)
    pooled_grads = tf.reduce_mean(grads, axis=(0,1,2))
    conv_outputs = conv_outputs[0]
    heatmap = conv_outputs @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = tf.maximum(heatmap, 0) / (tf.reduce_max(heatmap) + 1e-9)
    return heatmap.numpy()

def heatmap_to_bbox(heatmap, upsample_size=IMG_SIZE):
    hm = cv2.resize(heatmap, upsample_size)
    hm_uint = np.uint8(255 * hm)
    _, th = cv2.threshold(hm_uint, 100, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(th, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None
    c = max(contours, key=cv2.contourArea)
    x,y,w,h = cv2.boundingRect(c)
    return (x,y,w,h)

def display_bbox_on_image(img, bbox, title=None):
    img = img.copy()
    if bbox is not None:
        x,y,w,h = bbox
        cv2.rectangle(img, (x,y), (x+w, y+h), (0,255,0), 2)
    plt.figure(figsize=(5,5))
    if title: plt.title(title)
    plt.imshow(img)
    plt.axis('off')
    plt.show()

# -------- build models --------
models = {
    'efficientnet': build_efficientnet_model(),
    'simple_cnn': build_simple_cnn(),
    'cnn_bilstm': build_cnn_bilstm(),
    'custom_rnn': build_custom_rnn()
}

# -------- Train models --------
histories = {}
for name, model in models.items():
    try:
        histories[name] = compile_and_train(model, name, epochs=EPOCHS)
    except Exception as e:
        print(f"Training failed for {name}: {e}")

# -------- Example Grad-CAM on EfficientNet --------
if os.path.exists('efficientnet.h5'):
    model = keras.models.load_model('efficientnet.h5')
    for images, labels in val_ds.take(1):
        img = images[0].numpy()
        lbl = labels[0].numpy()
        break
    img_input = np.expand_dims(img/255.0, axis=0)
    last_conv = None
    for layer in model.layers[::-1]:
        if isinstance(layer, layers.Conv2D):
            last_conv = layer.name
            break
    if last_conv:
        heatmap = make_gradcam_heatmap(img_input, model, last_conv)
        bbox = heatmap_to_bbox(heatmap, upsample_size=(IMG_SIZE[1], IMG_SIZE[0]))
        display_bbox_on_image((img).astype('uint8'), bbox, title=f"label={lbl}")

# -------- Plot accuracy comparison --------
plt.figure(figsize=(10,6))
for name, h in histories.items():
    acc = h.history.get('accuracy') or h.history.get('acc')
    val_acc = h.history.get('val_accuracy') or h.history.get('val_acc')
    epochs_range = range(1, len(acc)+1)
    plt.plot(epochs_range, acc, label=f"{name} train")
    plt.plot(epochs_range, val_acc, '--', label=f"{name} val")
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Model comparison')
plt.show()

print("\nDone. Models saved as <name>.h5")
