# Pneumonia Detection Model Training

This notebook trains a CNN model to detect pneumonia from chest X-ray images using the Kaggle chest X-ray pneumonia dataset.

## Setup and Dataset Download

First, we'll install the necessary libraries and download the dataset from Kaggle.

In [None]:
# Install required packages.
# NOTE(review): no versions are pinned, so the installed releases depend on when
# this cell is run.
!pip install kaggle tensorflow matplotlib numpy pandas scikit-learn

### Configure Kaggle API

To download the dataset, you need to provide your Kaggle API credentials. Follow these steps:
1. Go to your Kaggle account settings (https://www.kaggle.com/me/account)
2. Scroll down to the 'API' section and click 'Create New API Token'
3. This will download a 'kaggle.json' file
4. Upload this file in the next cell

In [None]:
# Colab-only upload helper. The next cell copies `kaggle.json` from the working
# directory, so the uploaded file must keep exactly that name.
from google.colab import files
files.upload() # Upload your kaggle.json file here

In [None]:
# Configure Kaggle API: copy the uploaded token into ~/.kaggle and restrict it to
# owner read/write (mode 600) because it contains account credentials.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the chest X-ray dataset
!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia
!unzip -q chest-xray-pneumonia.zip

## Data Exploration and Preprocessing

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from sklearn.metrics import confusion_matrix, classification_report

# Set random seeds for reproducibility.
# Python's built-in `random` is seeded too, because a later cell uses
# `random.choice` to pick the sample image; seeding only NumPy and TensorFlow
# would leave that pick different on every run.
import random

random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Locations of the three dataset splits, relative to the working directory
train_dir, val_dir, test_dir = (
    'chest_xray/train',
    'chest_xray/val',
    'chest_xray/test',
)

# Check class distribution
def count_images(directory):
    """Count the image files in the NORMAL and PNEUMONIA subfolders of a split.

    Only regular files with a common image extension are counted, so stray
    entries such as `.DS_Store` or nested folders do not inflate the totals.

    Args:
        directory: Path to a dataset split that contains `NORMAL` and
            `PNEUMONIA` subdirectories.

    Returns:
        A `(normal, pneumonia)` tuple of image counts.

    Raises:
        FileNotFoundError: If either class subdirectory is missing.
    """
    # NOTE(review): intended to mirror the extensions Keras' directory iterator
    # accepts — confirm against the installed Keras version.
    image_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.ppm', '.tif', '.tiff')

    def count_in(class_name):
        class_dir = os.path.join(directory, class_name)
        return sum(
            1
            for entry in os.listdir(class_dir)
            if entry.lower().endswith(image_extensions)
            and os.path.isfile(os.path.join(class_dir, entry))
        )

    return count_in('NORMAL'), count_in('PNEUMONIA')

# Tally the images per class for every split, then report the totals
split_counts = {
    "Training": count_images(train_dir),
    "Validation": count_images(val_dir),
    "Test": count_images(test_dir),
}

train_normal, train_pneumonia = split_counts["Training"]
val_normal, val_pneumonia = split_counts["Validation"]
test_normal, test_pneumonia = split_counts["Test"]

for split_name, (normal_count, pneumonia_count) in split_counts.items():
    print(f"{split_name} set: Normal={normal_count}, Pneumonia={pneumonia_count}")

In [None]:
# Check if validation set is too small (it's known to be small in this dataset).
# If it is, hold out 20% of the training images as a new validation set.
#
# The held-out files are MOVED out of the training folder. Copying them would
# leave every validation image in the training set as well, so the validation
# metrics would be measured on data the model has already seen.
import shutil

from sklearn.model_selection import train_test_split

NEW_VAL_DIR = 'chest_xray/new_val'


def hold_out_validation_images(class_name, source_root, target_root,
                               val_fraction=0.2, seed=42):
    """Move a random fraction of one class's images from train to validation.

    The function is safe to re-run: if the target folder already holds files
    from an earlier run, that selection is kept and any same-named files that
    are (again) present in the source folder are deleted, so the two folders
    never share an image.

    Args:
        class_name: Class subfolder to process (e.g. 'NORMAL').
        source_root: Training split directory to take images from.
        target_root: Validation directory to move the held-out images into.
        val_fraction: Fraction of the class's files to hold out.
        seed: Random state passed to `train_test_split`.
    """
    source_dir = os.path.join(source_root, class_name)
    target_dir = os.path.join(target_root, class_name)
    os.makedirs(target_dir, exist_ok=True)

    already_held_out = os.listdir(target_dir)
    if already_held_out:
        # Earlier run (or a re-extracted archive): remove duplicates from train.
        for file_name in already_held_out:
            duplicate = os.path.join(source_dir, file_name)
            if os.path.exists(duplicate):
                os.remove(duplicate)
        return

    # os.listdir order is arbitrary; sort so the seeded split is reproducible.
    file_names = sorted(os.listdir(source_dir))
    _, held_out = train_test_split(file_names, test_size=val_fraction, random_state=seed)
    for file_name in held_out:
        shutil.move(os.path.join(source_dir, file_name),
                    os.path.join(target_dir, file_name))


if val_normal < 100 or val_pneumonia < 100:
    print("Validation set is too small. Will create a new validation set from training data.")

    for class_name in ('NORMAL', 'PNEUMONIA'):
        hold_out_validation_images(class_name, train_dir, NEW_VAL_DIR)

    val_dir = NEW_VAL_DIR
    val_normal, val_pneumonia = count_images(val_dir)
    # The training folder shrank, so refresh its counts for later cells.
    train_normal, train_pneumonia = count_images(train_dir)
    print(f"New validation set: Normal={val_normal}, Pneumonia={val_pneumonia}")
    print(f"Remaining training set: Normal={train_normal}, Pneumonia={train_pneumonia}")

In [None]:
# Display sample images: first three NORMAL scans on the top row,
# first three PNEUMONIA scans on the bottom row
plt.figure(figsize=(12, 6))

sample_rows = [('NORMAL', 'Normal'), ('PNEUMONIA', 'Pneumonia')]
for row, (class_folder, label) in enumerate(sample_rows):
    class_path = os.path.join(train_dir, class_folder)
    file_names = os.listdir(class_path)
    for col in range(3):
        plt.subplot(2, 3, row * 3 + col + 1)
        sample = plt.imread(os.path.join(class_path, file_names[col]))
        plt.imshow(sample, cmap='gray')
        plt.title(label)
        plt.axis('off')

plt.tight_layout()
plt.show()

## Data Preprocessing and Augmentation

In [None]:
# Image dimensions and batch size shared by every generator
IMG_WIDTH, IMG_HEIGHT = 224, 224
BATCH_SIZE = 32

# Training images are rescaled to [0, 1] and randomly augmented
augmentation_options = dict(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest',
)
train_datagen = ImageDataGenerator(**augmentation_options)

# Validation and test images are only rescaled, never augmented
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

# Options common to all three directory iterators
flow_options = dict(
    target_size=(IMG_WIDTH, IMG_HEIGHT),
    batch_size=BATCH_SIZE,
    class_mode='binary',
)

# Only the training stream is shuffled; validation and test keep directory
# order so predictions line up with `generator.classes`
train_generator = train_datagen.flow_from_directory(train_dir, shuffle=True, **flow_options)
validation_generator = val_datagen.flow_from_directory(val_dir, shuffle=False, **flow_options)
test_generator = test_datagen.flow_from_directory(test_dir, shuffle=False, **flow_options)

## Model Definition

In [None]:
def create_model():
    """Build and compile the CNN used for binary pneumonia classification.

    The network stacks three convolutional blocks (32, 64 and 128 filters), each
    made of two 3x3 'same'-padded ReLU convolutions, 2x2 max pooling and 25%
    dropout, followed by a 512-unit dense layer with 50% dropout and a single
    sigmoid output unit.

    Returns:
        A compiled `Sequential` model (Adam optimizer, binary cross-entropy
        loss, accuracy metric).
    """
    def conv_block(filters, **first_conv_options):
        # Two convolutions, then downsample and regularise
        return [
            Conv2D(filters, (3, 3), activation='relu', padding='same', **first_conv_options),
            Conv2D(filters, (3, 3), activation='relu', padding='same'),
            MaxPooling2D(pool_size=(2, 2)),
            Dropout(0.25),
        ]

    feature_extractor = (
        conv_block(32, input_shape=(IMG_WIDTH, IMG_HEIGHT, 3))
        + conv_block(64)
        + conv_block(128)
    )
    classifier_head = [
        Flatten(),
        Dense(512, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid'),
    ]

    model = Sequential(feature_extractor + classifier_head)
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Create the model: build and compile a fresh network with create_model(),
# which is defined above.
model = create_model()

# Display model summary
model.summary()

## Model Training with Callbacks

In [None]:
# Define callbacks
callbacks = [
    # Early stopping to prevent overfitting; restores the best weights seen
    EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),

    # Save the best model (lowest validation loss) to disk
    ModelCheckpoint('pneumonia_model.h5', save_best_only=True, monitor='val_loss'),

    # Reduce learning rate when validation loss has stopped improving
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6)
]

# Calculate class weights to handle imbalance: weight = total / (2 * class count).
# Counts and label indices come from the training generator itself, so they
# always describe the images that are actually fed to the model, even if the
# training folder changed after the earlier counting cell ran.
normal_index = train_generator.class_indices['NORMAL']
pneumonia_index = train_generator.class_indices['PNEUMONIA']
class_counts = np.bincount(train_generator.classes, minlength=2)

total_train = int(class_counts.sum())
weight_normal = total_train / (2 * int(class_counts[normal_index]))
weight_pneumonia = total_train / (2 * int(class_counts[pneumonia_index]))
class_weights = {normal_index: weight_normal, pneumonia_index: weight_pneumonia}

print(f"Class weights: {class_weights}")

# Number of training and validation steps.
# len(generator) is ceil(samples / batch_size): it includes the final partial
# batch and never adds an extra step when the sample count is an exact multiple
# of the batch size.
STEPS_PER_EPOCH = len(train_generator)
VALIDATION_STEPS = len(validation_generator)

# Train the model
history = model.fit(
    train_generator,
    steps_per_epoch=STEPS_PER_EPOCH,
    epochs=20,
    validation_data=validation_generator,
    validation_steps=VALIDATION_STEPS,
    callbacks=callbacks,
    class_weight=class_weights
)

## Evaluate Model Performance

In [None]:
# Plot training history: accuracy on the left, loss on the right
plt.figure(figsize=(12, 4))

history_panels = [
    # (metric key, title, y-axis label, legend position)
    ('accuracy', 'Model Accuracy', 'Accuracy', 'lower right'),
    ('loss', 'Model Loss', 'Loss', 'upper right'),
]

for position, (metric, title, y_label, legend_loc) in enumerate(history_panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[metric])
    plt.plot(history.history[f'val_{metric}'])
    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc=legend_loc)

plt.tight_layout()
plt.show()

In [None]:
# Load the best model (the checkpoint with the lowest validation loss)
best_model = tf.keras.models.load_model('pneumonia_model.h5')

# Evaluate on the test set.
# Start from the first batch and run exactly len(test_generator) steps, i.e.
# ceil(samples / batch_size). The previous `samples // BATCH_SIZE + 1` ran one
# extra batch whenever the sample count was an exact multiple of the batch size.
test_generator.reset()
test_loss, test_accuracy = best_model.evaluate(test_generator, steps=len(test_generator))
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")

In [None]:
# Predict test set.
# Exactly len(test_generator) steps are run so there is one prediction per test
# image; an extra step would make `predictions` longer than `true_classes`.
test_generator.reset()
predictions = best_model.predict(test_generator, steps=len(test_generator))
# Threshold the sigmoid outputs at 0.5 and flatten to 1-D to match the labels
pred_classes = (predictions > 0.5).astype("int32").ravel()

# Get true labels (in directory order, because the generator is not shuffled)
true_classes = test_generator.classes

# Calculate confusion matrix; fixing the labels guarantees a 2x2 matrix that
# matches the two tick labels below
cm = confusion_matrix(true_classes, pred_classes, labels=[0, 1])

# Plot confusion matrix
plt.figure(figsize=(8, 6))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(2)
plt.xticks(tick_marks, ['Normal', 'Pneumonia'], rotation=45)
plt.yticks(tick_marks, ['Normal', 'Pneumonia'])

# Display the actual count values, switching text colour for readability
threshold = cm.max() / 2.
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, cm[i, j],
                 horizontalalignment="center",
                 color="white" if cm[i, j] > threshold else "black")

plt.ylabel('True label')
plt.xlabel('Predicted label')
# Lay out the figure after the axis labels exist so they are not clipped
plt.tight_layout()
plt.show()

# Detailed classification report
print("\nClassification Report:")
target_names = ['Normal', 'Pneumonia']
print(classification_report(true_classes, pred_classes, target_names=target_names))

## Convert Model to TensorFlow.js Format for Web Integration

In [None]:
# Install tensorflowjs, which the conversion cell below imports.
# NOTE(review): the version is unpinned, so the installed release depends on
# when this cell is run.
!pip install tensorflowjs

In [None]:
# Convert the best model to TensorFlow.js format
import tensorflowjs as tfjs

# Make sure the output directory exists before the converter writes into it
tfjs_output_dir = 'pneumonia_model_tfjs'
os.makedirs(tfjs_output_dir, exist_ok=True)

tfjs.converters.save_keras_model(best_model, tfjs_output_dir)

In [None]:
# Zip the model files to download: recursively archives the
# pneumonia_model_tfjs directory into pneumonia_model_tfjs.zip.
!zip -r pneumonia_model_tfjs.zip pneumonia_model_tfjs

In [None]:
# Download the converted model archive created by the previous cell.
# Uses the Colab-only `files` helper, re-imported here so the cell also works
# if the upload cell was skipped.
from google.colab import files
files.download('pneumonia_model_tfjs.zip')

## Test Model Prediction on a Sample Image

In [None]:
# Classify one randomly chosen test image and show the result
import random
from tensorflow.keras.preprocessing import image

# Class folders of the test split
pneumonia_test_dir = os.path.join(test_dir, 'PNEUMONIA')
normal_test_dir = os.path.join(test_dir, 'NORMAL')

# Flip a coin for the class, then draw a random file from that class folder
picked_pneumonia = random.choice([True, False])
sample_dir, true_class = (
    (pneumonia_test_dir, 'Pneumonia') if picked_pneumonia else (normal_test_dir, 'Normal')
)
img_path = os.path.join(sample_dir, random.choice(os.listdir(sample_dir)))

# Load the image and scale it to [0, 1] as a batch of one, like the generators do
img = image.load_img(img_path, target_size=(IMG_WIDTH, IMG_HEIGHT))
img_array = np.expand_dims(image.img_to_array(img), axis=0) / 255.0

# The sigmoid output is the pneumonia score; above 0.5 means 'Pneumonia'.
# Confidence is reported for whichever class was predicted.
prediction = best_model.predict(img_array)
pneumonia_score = prediction[0][0]
if pneumonia_score > 0.5:
    predicted_class, confidence = 'Pneumonia', pneumonia_score
else:
    predicted_class, confidence = 'Normal', 1 - pneumonia_score

# Display the image and prediction
plt.figure(figsize=(8, 8))
plt.imshow(img)
plt.title(f"True: {true_class}, Predicted: {predicted_class}\nConfidence: {confidence:.2f}")
plt.axis('off')
plt.show()

print(f"True label: {true_class}")
print(f"Predicted: {predicted_class} with {confidence:.2%} confidence")

## Next Steps for Web Integration

Now that you have trained the model and converted it to TensorFlow.js format, you need to:

1. Download the `pneumonia_model_tfjs.zip` file using the download link above
2. Extract it to your web application's public directory (e.g., `/public/model/`)
3. Update the `src/lib/tensorflow-model.ts` file in your React application to load this model instead of the placeholder

```typescript
// Replace this line:
// this.model = await this.createDummyModel();

// With:
this.model = await tf.loadLayersModel('/model/model.json');
```

4. Update the prediction logic to use the actual model prediction rather than the placeholder logic
