# Pneumonia Detection Model Training
## Ensemble Deep Learning Model (ResNet50 + DenseNet121)

This notebook trains an ensemble model for pneumonia detection using chest X-ray images.

### Setup Instructions:
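1. Upload your `chest_xray` folder (containing the `train`, `val`, and `test` subfolders) to Google Drive
2. Enable GPU: Runtime → Change runtime type → GPU
3. Run all cells in order, setting `DATA_DIR` in the dataset-path cell to the location of your `chest_xray` folder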

In [None]:
# Mount Google Drive
# Makes the Drive contents available under /content/drive; the final cell of
# this notebook saves the trained model to /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install required packages
# The leading "!" is IPython syntax that runs the line as a shell command;
# -q keeps pip's output short.
!pip install tensorflow keras numpy pillow scikit-learn matplotlib -q

In [None]:
# Import libraries
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras.applications import ResNet50, DenseNet121
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

# Seed NumPy and TensorFlow with the same fixed value so random operations
# start from a known state on every run
np.random.seed(42)
tf.random.set_seed(42)

# Report the TensorFlow build and the GPU devices visible to this runtime
print(f"TensorFlow version: {tf.__version__}")
print(f"GPU Available: {tf.config.list_physical_devices('GPU')}")

In [None]:
# IMPORTANT: point DATA_DIR at the chest_xray folder you want to train on.
# If the folder lives in Google Drive, use its mounted path, for example:
#   '/content/drive/MyDrive/PneumoniaDetection/chest_xray'
DATA_DIR = '/content/data/chest_xray'

# Report whether the dataset root exists and, if so, whether each expected
# split folder is present. A missing path is only reported, not raised.
if not os.path.exists(DATA_DIR):
    print(f"✗ ERROR: Dataset not found at {DATA_DIR}")
    print("Please update DATA_DIR path above!")
else:
    print(f"✓ Dataset found at: {DATA_DIR}")
    print("\n".join(
        f"  - {split}: {os.path.exists(os.path.join(DATA_DIR, split))}"
        for split in ('train', 'val', 'test')
    ))

In [None]:
# Configuration
IMG_SIZE = 224     # images are resized to IMG_SIZE x IMG_SIZE before entering the model
BATCH_SIZE = 32    # images per batch produced by every data generator
EPOCHS = 5         # maximum number of training epochs

# One directory per dataset split, all located under DATA_DIR
train_dir, val_dir, test_dir = (
    os.path.join(DATA_DIR, split) for split in ('train', 'val', 'test')
)

In [None]:
# Data augmentation and preprocessing
# Validation and test images are only rescaled from [0, 255] to [0, 1];
# no augmentation is applied to them.
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

# Training images get the same rescaling plus random geometric augmentation.
# NOTE(review): the backbones built later load ImageNet weights, and Keras
# ships a dedicated preprocess_input for each of them; here inputs are simply
# scaled to [0, 1] - confirm this is the intended preprocessing.
train_datagen = ImageDataGenerator(
    rescale=1./255,
    # geometric jitter
    rotation_range=20,
    shear_range=0.2,
    zoom_range=0.2,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
    # pixels exposed by a transform are filled from the nearest valid pixel
    fill_mode='nearest',
)

In [None]:
# Create data generators
def _flow_split(datagen, directory, shuffle):
    """Return a binary-labelled batch iterator over one dataset split folder.

    Images are resized to (IMG_SIZE, IMG_SIZE) and served BATCH_SIZE at a time.
    """
    return datagen.flow_from_directory(
        directory,
        target_size=(IMG_SIZE, IMG_SIZE),
        batch_size=BATCH_SIZE,
        class_mode='binary',
        shuffle=shuffle,
    )


# Only the training stream is shuffled. Validation and test are left
# unshuffled; the evaluation cells pair test predictions with
# test_generator.classes, which relies on a fixed order.
train_generator = _flow_split(train_datagen, train_dir, shuffle=True)
val_generator = _flow_split(val_datagen, val_dir, shuffle=False)
test_generator = _flow_split(test_datagen, test_dir, shuffle=False)

print(f"\nDataset loaded successfully!")
print(f"Training samples: {train_generator.samples}")
print(f"Validation samples: {val_generator.samples}")
print(f"Test samples: {test_generator.samples}")

In [None]:
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50, DenseNet121
from tensorflow import keras

def create_ensemble_model():
    """Build the two-backbone pneumonia classifier (uncompiled).

    A single image input of shape (IMG_SIZE, IMG_SIZE, 3) is fed to both an
    ImageNet-initialised ResNet50 and an ImageNet-initialised DenseNet121,
    each created without its classification top. Every backbone output is
    global-average-pooled, the two feature vectors are concatenated, and a
    small dense head ends in one sigmoid unit.

    Returns:
        A ``keras.Model`` mapping the image input to a single sigmoid output.
    """
    image_shape = (IMG_SIZE, IMG_SIZE, 3)
    image_input = layers.Input(shape=image_shape)

    # Each backbone is created standalone (with its own input) and given an
    # explicit name; it is applied to the shared input further down.
    resnet_base = ResNet50(
        include_top=False,
        weights='imagenet',
        input_shape=image_shape,
        name='resnet50_base',
    )
    densenet_base = DenseNet121(
        include_top=False,
        weights='imagenet',
        input_shape=image_shape,
        name='densenet121_base',
    )

    # Transfer learning: freeze every layer except the last 10 of each backbone
    for backbone in (resnet_base, densenet_base):
        for frozen_layer in backbone.layers[:-10]:
            frozen_layer.trainable = False

    # Run the same input through both backbones and pool each feature map
    pooled_features = [
        layers.GlobalAveragePooling2D()(resnet_base(image_input)),
        layers.GlobalAveragePooling2D()(densenet_base(image_input)),
    ]

    # Fuse the two feature vectors into one
    features = layers.Concatenate()(pooled_features)

    # Classification head: normalise -> dropout -> dense, twice, ending in sigmoid
    features = layers.BatchNormalization()(features)
    features = layers.Dropout(0.5)(features)
    features = layers.Dense(256, activation='relu')(features)
    features = layers.BatchNormalization()(features)
    features = layers.Dropout(0.3)(features)
    probability = layers.Dense(1, activation='sigmoid')(features)

    return keras.Model(inputs=image_input, outputs=probability)


print("Creating ensemble model...")
model = create_ensemble_model()

# Compile model
# Precision and Recall are given explicit names. Without them Keras
# auto-generates names with a per-session counter, so re-running this cell
# yields 'precision_1' / 'recall_1' and the plotting cell's
# history.history['precision'] / ['recall'] lookups fail with KeyError.
# The metric order (accuracy, precision, recall) is relied on by the
# evaluation cell, which unpacks model.evaluate() positionally.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0001),
    loss='binary_crossentropy',
    metrics=[
        'accuracy',
        keras.metrics.Precision(name='precision'),
        keras.metrics.Recall(name='recall'),
    ]
)

print("\n✅ Model created successfully!")
model.summary()


In [None]:
# Setup callbacks
# Write the model to 'model.h5' whenever validation accuracy reaches a new maximum
checkpoint = ModelCheckpoint(
    'model.h5',
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1,
)

# Stop once validation loss has not improved for 5 epochs and roll back to the
# best weights seen. With EPOCHS = 5 this patience cannot be exhausted, so the
# callback only has an effect if EPOCHS is raised.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

# Halve the learning rate after 3 epochs without validation-loss improvement,
# never going below 1e-7
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    patience=3,
    factor=0.5,
    min_lr=1e-7,
    verbose=1,
)

callbacks = [checkpoint, early_stopping, reduce_lr]

In [None]:
# Train the model
print("Starting training...\n")

# Fit on the augmented training stream for up to EPOCHS epochs, validating on
# the validation stream each epoch; the returned History object feeds the
# plotting cell.
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=EPOCHS,
    callbacks=callbacks,
    verbose=1,
)

print("\nTraining completed!")

In [None]:
# Evaluate on test set
print("Evaluating on test set...\n")
# evaluate() returns the loss followed by the compiled metrics, in the order
# they were passed to compile(): accuracy, precision, recall.
test_loss, test_accuracy, test_precision, test_recall = model.evaluate(test_generator)

# F1 is the harmonic mean of precision and recall. Both are 0 when the model
# produces no true positives, which would make the denominator 0, so report
# F1 = 0.0 in that case instead of raising ZeroDivisionError.
f1_score = (
    2 * (test_precision * test_recall) / (test_precision + test_recall)
    if (test_precision + test_recall) > 0
    else 0.0
)

print(f"\n{'='*50}")
print(f"TEST RESULTS")
print(f"{'='*50}")
print(f"Accuracy:  {test_accuracy:.4f} ({test_accuracy*100:.2f}%)")
print(f"Precision: {test_precision:.4f} ({test_precision*100:.2f}%)")
print(f"Recall:    {test_recall:.4f} ({test_recall*100:.2f}%)")
print(f"F1-Score:  {f1_score:.4f} ({f1_score*100:.2f}%)")
print(f"{'='*50}")

In [None]:
# Detailed classification report
# Rewind the unshuffled test stream so predictions come out in the same order
# as the ground-truth labels in test_generator.classes
test_generator.reset()
y_true = test_generator.classes
y_pred_probs = model.predict(test_generator)
# Threshold the sigmoid outputs at 0.5 to get hard 0/1 labels
y_pred = (y_pred_probs.flatten() > 0.5).astype(int)

print("\nClassification Report:")
print(classification_report(y_true, y_pred, target_names=['NORMAL', 'PNEUMONIA']))

print("\nConfusion Matrix:")
cm = confusion_matrix(y_true, y_pred)
print(cm)
# Rows are true classes, columns are predicted classes (0 = NORMAL, 1 = PNEUMONIA)
print(f"\nTrue Negatives:  {cm[0, 0]}")
print(f"False Positives: {cm[0, 1]}")
print(f"False Negatives: {cm[1, 0]}")
print(f"True Positives:  {cm[1, 1]}")

In [None]:
# Plot training history
def _draw_history_panel(position, title, y_label, curves):
    """Draw one panel of the 1x3 training-history figure.

    position: 1-based subplot index within the 1x3 grid.
    title:    panel title.
    y_label:  y-axis label.
    curves:   sequence of (history key, legend label) pairs, plotted in order.
    """
    plt.subplot(1, 3, position)
    for history_key, legend_label in curves:
        plt.plot(history.history[history_key], label=legend_label, linewidth=2)
    plt.title(title, fontsize=14, fontweight='bold')
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.grid(True, alpha=0.3)


plt.figure(figsize=(15, 5))

_draw_history_panel(1, 'Model Accuracy', 'Accuracy', [
    ('accuracy', 'Train Accuracy'),
    ('val_accuracy', 'Val Accuracy'),
])

_draw_history_panel(2, 'Model Loss', 'Loss', [
    ('loss', 'Train Loss'),
    ('val_loss', 'Val Loss'),
])

_draw_history_panel(3, 'Precision & Recall', 'Score', [
    ('precision', 'Train Precision'),
    ('val_precision', 'Val Precision'),
    ('recall', 'Train Recall'),
    ('val_recall', 'Val Recall'),
])

# Save the figure to disk, then show it inline
plt.tight_layout()
plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
plt.show()

print("Training history plot saved!")

In [None]:
# Save model to Google Drive
# The target is under /content/drive, the mount point used by drive.mount()
# in the first cell, so that cell must have run successfully.
save_path = '/content/drive/MyDrive/model.h5'
model.save(save_path)
print(f"\n✓ Model saved to: {save_path}")
print(f"✓ Download this file and place it in your project's 'model' folder")

# Also save locally in Colab session
# NOTE(review): 'model.h5' is the same filename the ModelCheckpoint callback
# writes its best-val_accuracy checkpoint to, so this save replaces that file
# with the model's current weights - confirm this is intended.
model.save('model.h5')
print(f"\n✓ Model also saved locally in Colab session")
print(f"  You can download it from the Files panel on the left")

In [None]:
# Test prediction on a sample image
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing import image

# Take the first image of the first test batch. The test generator is not
# shuffled and is reset here, so this is always the same image rather than a
# random one.
test_generator.reset()
x_batch, y_batch = next(test_generator)
sample_image, true_label = x_batch[0], y_batch[0]

# Make prediction: add a leading batch axis, then read the single sigmoid output
prediction = model.predict(sample_image[np.newaxis, ...])[0, 0]

# Display the image with its true label, the predicted label, and the
# probability of whichever class was predicted
plt.figure(figsize=(8, 6))
plt.imshow(sample_image)
plt.axis('off')
plt.title(f"True: {'PNEUMONIA' if true_label == 1 else 'NORMAL'}\n"
          f"Predicted: {'PNEUMONIA' if prediction > 0.5 else 'NORMAL'}\n"
          f"Confidence: {prediction if prediction > 0.5 else 1-prediction:.4f}",
          fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# Same summary as plain text
print(f"\nPrediction Result:")
print(f"  Raw output: {prediction:.6f}")
print(f"  Prediction: {'PNEUMONIA' if prediction > 0.5 else 'NORMAL'}")
print(f"  Confidence: {(prediction if prediction > 0.5 else 1-prediction)*100:.2f}%")

## Download Your Trained Model

### Option 1: From Google Drive
- The model is saved at: `/content/drive/MyDrive/model.h5`
- Go to your Google Drive and download it

### Option 2: From Colab Files
- Click the Files icon on the left sidebar
- Find `model.h5`
- Right-click → Download

### Next Steps:
1. Download `model.h5` (file size ~200-300MB)
2. Place it in your project's `model/` folder
3. The file path should be: `PneumoniaDetection/model/model.h5`
4. Now you can run your backend server and start making predictions!

---

**Training Complete! 🎉**