Pneumonia Detection using TensorFlow

Preprocess Images



In [15]:
import tensorflow as tf
import cv2
import os
import numpy as np

# Define paths to the dataset folders
TRAIN_PATH = '/Users/vladpavlovich/Downloads/chest_xray/train'
VAL_PATH = '/Users/vladpavlovich/Downloads/chest_xray/val'
TEST_PATH = '/Users/vladpavlovich/Downloads/chest_xray/test'

# Set the target image dimensions
IMG_SIZE = 128



def preprocess_image(image_path):
    """
    Load, resize, normalize, and add channel dimension to an image.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        print(f"Failed to load image: {image_path}")
        return None
    img_resized = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
    img_normalized = img_resized / 255.0
    img_final = np.expand_dims(img_normalized, axis=-1)  # Add channel dimension
    return img_final

def load_images_from_directory(directory):
    """
    Load and preprocess all images in a directory with subfolders representing
    classes (e.g., 'PNEUMONIA' and 'NORMAL').
    """
    images = []
    labels = []  # Assuming binary classification with label folders

    for label in os.listdir(directory):
        label_path = os.path.join(directory, label)  # No 'images' subfolder
        
        
        if os.path.isdir(label_path):
            for file_name in os.listdir(label_path):
                file_path = os.path.join(label_path, file_name)
                
                if file_path.endswith(('.png', '.jpg', '.jpeg')):  # Support more image formats
                    image = preprocess_image(file_path)
                    if image is not None:  # Only append if image loaded successfully
                        images.append(image)
                        labels.append(1 if label == 'PNEUMONIA' else 0)

    # Convert lists to numpy arrays
    images = np.array(images)
    labels = np.array(labels)
    
    return images, labels

# Load images from each dataset directory
train_images, train_labels = load_images_from_directory(TRAIN_PATH)
val_images, val_labels = load_images_from_directory(VAL_PATH)
test_images, test_labels = load_images_from_directory(TEST_PATH)

# Print shapes to verify
print("Train images shape:", train_images.shape)
print("Train labels shape:", train_labels.shape)
print("Validation images shape:", val_images.shape)
print("Validation labels shape:", val_labels.shape)
print("Test images shape:", test_images.shape)
print("Test labels shape:", test_labels.shape)
print("Number of classes:", len(np.unique(train_labels)))


Train images shape: (5216, 128, 128, 1)
Train labels shape: (5216,)
Validation images shape: (16, 128, 128, 1)
Validation labels shape: (16,)
Test images shape: (624, 128, 128, 1)
Test labels shape: (624,)
Number of classes: 2


In [16]:
import tensorflow as tf
from tensorflow.keras import layers, models

def build_pneumonia_cnn(input_shape=(128, 128, 1)):
    model = models.Sequential()
    
    # First Convolutional Block
    model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Second Convolutional Block
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Third Convolutional Block
    model.add(layers.Conv2D(128, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Optional Fourth Convolutional Block
    model.add(layers.Conv2D(256, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Flatten Layer
    model.add(layers.Flatten())
    
    # Fully Connected (Dense) Layers
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))  # Dropout for regularization
    model.add(layers.Dense(64, activation='relu'))
    
    # Output Layer
    model.add(layers.Dense(1, activation='sigmoid'))  # Sigmoid for binary classification
    
    return model

# Instantiate and compile the model
input_shape = (128, 128, 1)  # Adjust as needed
model = build_pneumonia_cnn(input_shape)

# Compile the model
model.compile(optimizer='adam', 
              loss='binary_crossentropy', 
              metrics=['accuracy'])

# Display the model architecture
model.summary()


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 126, 126, 32)      320       
                                                                 
 max_pooling2d (MaxPooling2  (None, 63, 63, 32)        0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 61, 61, 64)        18496     
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 30, 30, 64)        0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 28, 28, 128)       73856     
                                                                 
 max_pooling2d_2 (MaxPoolin  (None, 14, 14, 128)       0

In [17]:
# Compile the model with Adam optimizer and binary cross-entropy loss
model.compile(optimizer='adam', 
              loss='binary_crossentropy', 
              metrics=['accuracy'])


EPOCHS = 20 


history = model.fit(
    train_images, train_labels,
    epochs=EPOCHS,
    validation_data=(val_images, val_labels)
)


test_loss, test_accuracy = model.evaluate(test_images, test_labels)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")





model_filename = 'pneumonia_detection_model.h5'

model.save(model_filename)

print(f"Model saved to {model_filename}")


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Test Loss: 3.3793
Test Accuracy: 0.7083
Model saved to pneumonia_detection_model.h5


  saving_api.save_model(


In [18]:
import tensorflow as tf
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Load the saved model
model = tf.keras.models.load_model('pneumonia_detection_model.h5')

# Get predictions on the test set
test_predictions = model.predict(test_images)
test_predictions = (test_predictions > 0.5).astype(int).flatten()  # Convert probabilities to binary predictions

# Calculate various metrics
accuracy = accuracy_score(test_labels, test_predictions)
precision = precision_score(test_labels, test_predictions)
recall = recall_score(test_labels, test_predictions)
f1 = f1_score(test_labels, test_predictions)
conf_matrix = confusion_matrix(test_labels, test_predictions)

# Print results
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print("Confusion Matrix:")
print(conf_matrix)


Accuracy: 0.7083
Precision: 0.6825
Recall: 0.9974
F1 Score: 0.8104
Confusion Matrix:
[[ 53 181]
 [  1 389]]


Single Image Testing

In [19]:
import tensorflow as tf
import cv2
import numpy as np

# Load the trained model
model = tf.keras.models.load_model('pneumonia_detection_model.h5')

# Define the image size used in training
IMG_SIZE = 128

def preprocess_single_image(image_path):
    """
    Preprocess a single image: load, resize, normalize, and add channel dimension.
    """
    # Load the image in grayscale
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise ValueError("Image not found or unable to read.")
    
    # Resize to match the input shape of the model
    img_resized = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
    
    # Normalize pixel values to [0, 1]
    img_normalized = img_resized / 255.0
    
    # Expand dimensions to add batch and channel dimensions: (1, IMG_SIZE, IMG_SIZE, 1)
    img_final = np.expand_dims(img_normalized, axis=(0, -1))
    
    return img_final

def predict_image(image_path):
    """
    Predict if the image shows pneumonia or not.
    """
    # Preprocess the image
    processed_image = preprocess_single_image(image_path)
    
    # Get the model's prediction
    prediction = model.predict(processed_image)[0][0]  # Get the single prediction value
    
    # Interpret the prediction
    if prediction > 0.5:
        print("Prediction: Pneumonia")
    else:
        print("Prediction: Normal")

# Test with a single image
image_path = '/Users/vladpavlovich/Downloads/chest_xray/test/PNEUMONIA/person99_bacteria_474.jpeg'  # Replace with the path to your test image
predict_image(image_path)


Prediction: Pneumonia


Flask Front-End for HealthCare providers example

In [21]:
pip install flask

Defaulting to user installation because normal site-packages is not writeable
Collecting flask
  Downloading flask-3.0.3-py3-none-any.whl.metadata (3.2 kB)
Collecting itsdangerous>=2.1.2 (from flask)
  Downloading itsdangerous-2.2.0-py3-none-any.whl.metadata (1.9 kB)
Collecting click>=8.1.3 (from flask)
  Using cached click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting blinker>=1.6.2 (from flask)
  Downloading blinker-1.8.2-py3-none-any.whl.metadata (1.6 kB)
Downloading flask-3.0.3-py3-none-any.whl (101 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.7/101.7 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hDownloading blinker-1.8.2-py3-none-any.whl (9.5 kB)
Using cached click-8.1.7-py3-none-any.whl (97 kB)
Downloading itsdangerous-2.2.0-py3-none-any.whl (16 kB)
Installing collected packages: itsdangerous, click, blinker, flask
Successfully installed blinker-1.8.2 click-8.1.7 flask-3.0.3 itsdangerous-2.2.0

[1m[[0m[34;49mnotice[0m[1;3

In [24]:
from flask import Flask, request, jsonify
import os
import numpy as np
import tensorflow as tf
import cv2
from werkzeug.utils import secure_filename

# Initialize Flask app
app = Flask(__name__)

# Set up the path for saving uploaded images
UPLOAD_FOLDER = 'uploads/'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Load the trained model
model = tf.keras.models.load_model('pneumonia_detection_model.h5')

# Define allowed file extensions for upload
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}

def allowed_file(filename):
    """Check if the file has an allowed extension."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

def preprocess_image(image_path):
    """Preprocess the uploaded image for prediction."""
    IMG_SIZE = 128
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    img_resized = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
    img_normalized = img_resized / 255.0
    img_final = np.expand_dims(img_normalized, axis=(0, -1))  # Shape (1, IMG_SIZE, IMG_SIZE, 1)
    return img_final

@app.route('/')
def home():
    return '''
    <!doctype html>
    <html lang="en">
    <head>
        <title>Pneumonia Detection</title>
        <style>
            body { font-family: Arial, sans-serif; display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; background-color: #f0f2f5; }
            .container { text-align: center; padding: 20px; background: white; border-radius: 8px; box-shadow: 0px 4px 8px rgba(0,0,0,0.2); }
            h1 { color: #333; }
            input[type="file"] { margin: 15px 0; }
            #prediction { font-size: 1.2em; font-weight: bold; color: #333; margin-top: 20px; }
            .button { background-color: #007bff; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; }
            .button:hover { background-color: #0056b3; }
            img { max-width: 100%; height: auto; margin-top: 20px; border-radius: 8px; }
        </style>
        <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
        <script>
            function previewImage(event) {
                var reader = new FileReader();
                reader.onload = function() {
                    var output = document.getElementById('imagePreview');
                    output.src = reader.result;
                };
                reader.readAsDataURL(event.target.files[0]);
            }

            function uploadImage() {
                var formData = new FormData();
                var fileInput = document.getElementById("fileInput");
                formData.append("file", fileInput.files[0]);

                $.ajax({
                    url: "/predict",
                    type: "POST",
                    data: formData,
                    processData: false,
                    contentType: false,
                    success: function(response) {
                        $("#prediction").text("Prediction: " + response.prediction);
                    },
                    error: function() {
                        $("#prediction").text("Error: Could not process the image.");
                    }
                });
            }
        </script>
    </head>
    <body>
        <div class="container">
            <h1>Upload an X-ray Image for Pneumonia Detection</h1>
            <input type="file" id="fileInput" accept="image/*" onchange="previewImage(event)">
            <img id="imagePreview" src="#" alt="Image Preview" style="display: none;">
            <button class="button" onclick="uploadImage()">Upload and Predict</button>
            <div id="prediction"></div>
        </div>
    </body>
    </html>
    '''

@app.route('/predict', methods=['POST'])
def predict():
    # Check if a file is part of the request
    if 'file' not in request.files:
        return jsonify({"error": "No file part in the request"}), 400
    
    file = request.files['file']

    # Check if the file is selected and has an allowed file extension
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": "File type not allowed"}), 400

    # Save the file
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)

    # Preprocess the image and make a prediction
    processed_image = preprocess_image(filepath)
    prediction = model.predict(processed_image)[0][0]

    # Interpret the prediction
    result = "Pneumonia" if prediction > 0.5 else "Normal"
    
    # Optionally delete the file after prediction to save space
    os.remove(filepath)

    return jsonify({"prediction": result})

if __name__ == '__main__':
    # Create the upload folder if it doesn't exist
    if not os.path.exists(UPLOAD_FOLDER):
        os.makedirs(UPLOAD_FOLDER)
    
    app.run(port=5000, debug=False)


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
127.0.0.1 - - [04/Nov/2024 20:04:28] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [04/Nov/2024 20:04:28] "GET / HTTP/1.1" 200 -




127.0.0.1 - - [04/Nov/2024 20:04:34] "POST /predict HTTP/1.1" 200 -
