<a href="https://colab.research.google.com/github/Joaquin-Estevez/CAP4770/blob/main/Predicting_Pneumonia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Required Libraries

In [None]:
!pip install matplotlib tensorflow pandas numpy keras
!pip install psycopg2 pandas tensorflow

# Import Dependencies

In [4]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sqlalchemy import create_engine, Column, Integer, String, LargeBinary
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from PIL import Image
import io

# Database Setup

In [6]:
# Declarative base: every ORM model that subclasses it registers its table on
# Base.metadata, which is what Base.metadata.create_all() later reads.
Base = declarative_base()
# SQLite database file, resolved relative to the current working directory.
engine = create_engine('sqlite:///chest_xray_database.db')
# Session factory bound to the engine; call Session() to open a new session.
Session = sessionmaker(bind=engine)


class XRayImage(Base):
    """ORM model for one chest X-ray image stored in the ``xray_images`` table."""

    __tablename__ = 'xray_images'

    id = Column(Integer, primary_key=True)
    filename = Column(String)  # file name as found in its class directory
    dataset_type = Column(String)  # 'train', 'test', 'val'
    image_class = Column(String)  # 'NORMAL' or 'PNEUMONIA'
    image_data = Column(LargeBinary)  # encoded image bytes

# NOTE: Base must NOT be re-created after the model class is defined. A fresh
# declarative_base() has empty metadata, so Base.metadata.create_all(engine)
# would silently create no tables and every later insert would fail.


# Create tables

In [8]:
# Create, in the database behind `engine`, the tables of every model that is
# registered on Base.metadata.
Base.metadata.create_all(engine)

# Download and process Kaggle dataset

In [9]:
import kagglehub
from pathlib import Path

# Download dataset
# dataset_download's return value is used as the local dataset directory.
base_path = kagglehub.dataset_download("paultimothymooney/chest-xray-pneumonia")
# store_images_in_db expects the train/test/val folders directly under
# base_path, so point it at the 'chest_xray' sub-directory.
base_path = Path(base_path) / 'chest_xray'

def store_images_in_db(base_path):
    """Insert every image found under ``base_path`` into ``xray_images``.

    Walks ``<base_path>/<split>/<class>`` for the splits ``train``, ``test``
    and ``val`` and the classes ``NORMAL`` and ``PNEUMONIA``. Each image is
    opened with PIL, re-encoded as JPEG and stored as one ``XRayImage`` row.
    All rows are committed in a single transaction.

    Note that every call inserts new rows; nothing here clears or
    de-duplicates the table.

    Args:
        base_path: Directory containing the ``train``, ``test`` and ``val``
            folders (``pathlib.Path`` or ``str``).

    Returns:
        dict: ``{split: {'NORMAL': n, 'PNEUMONIA': m}}`` with the number of
        images added per split and class.

    Raises:
        Exception: Any error raised while reading a file or writing to the
            database is re-raised after the transaction has been rolled back.
    """
    base_path = Path(base_path)

    # Dataset types and their paths
    dataset_types = {
        'train': base_path / 'train',
        'test': base_path / 'test',
        'val': base_path / 'val'
    }

    # Count of images for reporting
    image_counts = {
        'train': {'NORMAL': 0, 'PNEUMONIA': 0},
        'test': {'NORMAL': 0, 'PNEUMONIA': 0},
        'val': {'NORMAL': 0, 'PNEUMONIA': 0}
    }

    session = Session()
    try:
        for dataset_type, dataset_path in dataset_types.items():
            for image_class in ['NORMAL', 'PNEUMONIA']:
                class_path = dataset_path / image_class

                # os.listdir returns entries in arbitrary order; sorting makes
                # the insertion order (and thus the row ids) reproducible.
                for filename in sorted(os.listdir(class_path)):
                    file_path = class_path / filename

                    # Skip sub-directories and hidden files such as
                    # '.DS_Store', which PIL cannot open as images.
                    if filename.startswith('.') or not file_path.is_file():
                        continue

                    # Read image and re-encode it as JPEG bytes
                    with Image.open(file_path) as img:
                        buffer = io.BytesIO()
                        img.save(buffer, format='JPEG')
                        img_bytes = buffer.getvalue()

                    session.add(XRayImage(
                        filename=filename,
                        dataset_type=dataset_type,
                        image_class=image_class,
                        image_data=img_bytes
                    ))
                    image_counts[dataset_type][image_class] += 1

        session.commit()
    except Exception:
        # Leave the database untouched if anything failed part-way through.
        session.rollback()
        raise
    finally:
        # Always release the connection, even on error.
        session.close()

    return image_counts

# Store images in database and get counts

In [10]:
# Load the downloaded images into the database and keep the per-split,
# per-class counts for the report below.
# NOTE: store_images_in_db adds a row for every file on each call, so
# re-running this cell inserts the images again.
image_counts = store_images_in_db(base_path)

# Print Image Counts

In [11]:
# Report, for each split, how many images of each class were stored.
for dataset_type, classes in image_counts.items():
    for class_key, class_word in (("NORMAL", "normal"), ("PNEUMONIA", "pneumonia")):
        print(f'There are {classes[class_key]} {class_word} images in {dataset_type} dataset')

There are 1341 normal images in train dataset
There are 3875 pneumonia images in train dataset
There are 234 normal images in test dataset
There are 390 pneumonia images in test dataset
There are 8 normal images in val dataset
There are 8 pneumonia images in val dataset


# DB Image Generator

In [2]:
import tensorflow as tf
import numpy as np
from PIL import Image
import io

class DatabaseImageGenerator:
    """Callable that yields shuffled ``(images, labels)`` batches from the DB.

    Calling an instance returns a generator. Each call reshuffles the image
    order and streams the split batch by batch, so only one batch of decoded
    images is built at a time instead of the whole split.

    Attributes:
        session: SQLAlchemy session used for all queries.
        dataset_type: Split to read ('train', 'test' or 'val').
        batch_size: Maximum number of images per yielded batch.
        image_size: ``(width, height)`` passed to ``PIL.Image.resize``.
        total_images: Number of rows of this split at construction time.
        image_query: Query selecting every ``XRayImage`` row of this split.
    """

    def __init__(self, session, dataset_type, batch_size=32, image_size=(256, 256)):
        self.session = session
        self.dataset_type = dataset_type
        self.batch_size = batch_size
        self.image_size = image_size

        # Query selecting every image of the requested split
        self.image_query = self.session.query(XRayImage).filter_by(
            dataset_type=dataset_type
        )

        # Count total images
        self.total_images = self.image_query.count()

    def _decode(self, image_record):
        """Decode one row into a float32 RGB array scaled to [0, 1]."""
        img = Image.open(io.BytesIO(image_record.image_data))
        img = img.convert('RGB')  # Ensure RGB mode
        img = img.resize(self.image_size)
        return np.array(img, dtype=np.float32) / 255.0  # Normalize

    def __call__(self):
        """Yield ``(images, labels)`` numpy batches in a freshly shuffled order.

        Yields:
            tuple: ``images`` of shape ``(n, height, width, 3)`` (float32 in
            [0, 1]) and one-hot float32 ``labels`` of shape ``(n, 2)`` where
            ``[1, 0]`` is NORMAL and ``[0, 1]`` is any other class, with
            ``1 <= n <= batch_size``.
        """
        # Fetch only the primary keys up front and shuffle those; the image
        # blobs themselves are loaded one batch at a time below.
        image_ids = [row[0] for row in self.image_query.with_entities(XRayImage.id)]
        np.random.shuffle(image_ids)

        # PIL sizes are (width, height) while array shapes are (height, width, C).
        width, height = self.image_size
        expected_shape = (height, width, 3)

        for offset in range(0, len(image_ids), self.batch_size):
            batch_ids = image_ids[offset:offset + self.batch_size]
            records = self.session.query(XRayImage).filter(
                XRayImage.id.in_(batch_ids)
            ).all()
            # The IN query does not preserve order; restore the shuffled one.
            records_by_id = {record.id: record for record in records}

            batch_images = []
            batch_labels = []
            for image_id in batch_ids:
                image_record = records_by_id.get(image_id)
                if image_record is None:
                    # Row disappeared since the ids were read; skip it.
                    continue
                try:
                    img_array = self._decode(image_record)
                except Exception as e:
                    # Best effort: report the unreadable image and carry on.
                    print(f"Error processing image: {e}")
                    continue

                # Ensure consistent shape
                if img_array.shape != expected_shape:
                    continue

                batch_images.append(img_array)
                # Assign labels
                batch_labels.append(
                    [1, 0] if image_record.image_class == 'NORMAL' else [0, 1]
                )

            # Never yield an empty batch: np.array([]) would not have the
            # (n, height, width, 3) shape consumers expect.
            if batch_images:
                yield np.array(batch_images), np.array(batch_labels, dtype=np.float32)

In [21]:
def create_tf_dataset(session, dataset_type, batch_size=32):
    """Build a ``tf.data.Dataset`` that streams one split from the database.

    Args:
        session: SQLAlchemy session handed to ``DatabaseImageGenerator``.
        dataset_type: Split to read ('train', 'test' or 'val').
        batch_size: Number of samples per batch, used both by the generator
            and for the final re-batching. Defaults to 32.

    Returns:
        tf.data.Dataset: Yields ``(images, labels)`` with ``images`` of shape
        ``(batch, 256, 256, 3)`` and ``labels`` of shape ``(batch, 2)``, both
        float32.
    """
    generator = DatabaseImageGenerator(session, dataset_type, batch_size=batch_size)

    # Create TensorFlow dataset from generator
    dataset = tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32),
            tf.TensorSpec(shape=(None, 2), dtype=tf.float32)
        )
    )

    # The generator yields variable-length batches; flatten them to single
    # samples and re-batch to the requested size, prefetching in the background.
    dataset = dataset.unbatch().batch(batch_size).prefetch(tf.data.AUTOTUNE)

    return dataset

# Create sessions for different datasets

In [7]:
# One database session per split; each is passed to create_tf_dataset below.
train_session = Session()
test_session = Session()
val_session = Session()


# Create TensorFlow datasets

In [8]:
# Build one tf.data pipeline per split, each backed by its own session.
Train = create_tf_dataset(train_session, 'train')
Test = create_tf_dataset(test_session, 'test')
Validation = create_tf_dataset(val_session, 'val')

# Initialize Model

In [23]:
# CNN classifier for 256x256 RGB inputs: four Conv2D + MaxPooling2D stages,
# then four 512-unit dense layers with batch normalization and dropout, and a
# 2-unit output (index 0 = Normal, index 1 = Pneumonia, matching the one-hot
# labels produced by DatabaseImageGenerator).
model = tf.keras.models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(256, 256, 3)),
    layers.MaxPooling2D(2, 2),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D(2, 2),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D(2, 2),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D(2, 2),

    layers.Flatten(),
    layers.Dense(512, activation='relu'),
    layers.BatchNormalization(),
    layers.Dense(512, activation='relu'),
    layers.Dropout(0.1),
    layers.BatchNormalization(),
    layers.Dense(512, activation='relu'),
    layers.Dropout(0.2),
    layers.BatchNormalization(),
    layers.Dense(512, activation='relu'),
    layers.Dropout(0.2),
    layers.BatchNormalization(),
    # Softmax rather than two independent sigmoids: the classes are mutually
    # exclusive, so the two outputs should form a probability distribution
    # that sums to 1 (they are reported as class probabilities later on).
    layers.Dense(2, activation='softmax')
])

In [24]:
# Print the layer-by-layer architecture summary of the model.
model.summary()

In [15]:
# Configure training: binary cross-entropy over the two one-hot outputs,
# the Adam optimizer, and accuracy as the reported metric.
model.compile(
	loss='binary_crossentropy',
	optimizer='adam',
	metrics=['accuracy']
)

# Training

In [None]:
# Train for a single epoch on the Train dataset, using Validation as the
# validation data; the value returned by fit() is kept in `history`.
history = model.fit(Train,
		epochs=1,
		validation_data=Validation)

In [None]:
# Persist the trained model to 'model.h5' (path relative to the working directory).
model.save('model.h5')

# Evaluate Performance

In [None]:
# Evaluate on the held-out Test dataset. evaluate() returns the loss followed
# by the metrics given to compile(), here accuracy.
loss, accuracy = model.evaluate(Test)
# Report accuracy as a percentage rounded to the nearest whole number.
print('The accuracy of the model on test dataset is',
    np.round(accuracy*100))

# Analytics Functions

In [None]:
def plot_class_distribution():
    """Draw a grouped bar chart of image counts per dataset split and class.

    Counts the NORMAL and PNEUMONIA rows of each split ('train', 'test',
    'val') in the database and plots them with seaborn. Returns nothing.
    """
    data = {
        "Dataset": [],
        "Class": [],
        "Count": []
    }

    # One session for all six counts; closed even if a query fails.
    session = Session()
    try:
        for dataset_type in ['train', 'test', 'val']:
            for image_class, class_label in [('NORMAL', 'Normal'), ('PNEUMONIA', 'Pneumonia')]:
                count = session.query(XRayImage).filter_by(
                    dataset_type=dataset_type, image_class=image_class
                ).count()

                data["Dataset"].append(dataset_type)
                data["Class"].append(class_label)
                data["Count"].append(count)
    finally:
        session.close()

    df = pd.DataFrame(data)
    sns.barplot(x="Dataset", y="Count", hue="Class", data=df)
    plt.title("Class Distribution in Datasets")
    plt.show()

plot_class_distribution()

In [None]:
def plot_sample_images():
    """Show up to four training images per class in a 2x4 grid.

    The top row holds PNEUMONIA samples and the bottom row NORMAL samples,
    both read from the 'train' split in the database. Returns nothing.
    """
    session = Session()
    try:
        plt.figure(figsize=(10, 8))

        # (class stored in the database, subplot title), one grid row each
        grid_rows = [('PNEUMONIA', "Pneumonia"), ('NORMAL', "Normal")]
        for row_index, (image_class, title) in enumerate(grid_rows):
            records = session.query(XRayImage).filter_by(
                dataset_type='train', image_class=image_class
            ).limit(4)
            for col_index, image_record in enumerate(records):
                # Subplot positions are 1-based: 1-4 top row, 5-8 bottom row.
                plt.subplot(2, 4, row_index * 4 + col_index + 1)
                img = Image.open(io.BytesIO(image_record.image_data))
                plt.imshow(img, cmap='gray')
                plt.title(title)
                plt.axis("off")

        plt.tight_layout()
        plt.show()
    finally:
        # Release the session even if a query or the plotting fails.
        session.close()

plot_sample_images()

In [None]:
def plot_confusion_matrix(y_true, y_pred, classes):
    """Plot a confusion-matrix heatmap.

    Args:
        y_true: Sequence of true class labels.
        y_pred: Sequence of predicted class labels, aligned with ``y_true``.
        classes: Display names used as tick labels on both axes.
    """
    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

# Collect labels and predictions in ONE pass over Test. The dataset is
# reshuffled every time it is iterated, so reading labels in one pass and
# calling model.predict(Test) in another would pair predictions with the
# wrong labels.
y_true_batches = []
y_pred_batches = []
for batch_images, batch_labels in Test:
    # Labels are one-hot; argmax recovers the class index (0 = Normal, 1 = Pneumonia).
    y_true_batches.append(np.argmax(batch_labels.numpy(), axis=1))
    y_pred_batches.append(np.argmax(model.predict_on_batch(batch_images), axis=1))

y_true = np.concatenate(y_true_batches)
y_pred = np.concatenate(y_pred_batches)

plot_confusion_matrix(y_true, y_pred, classes=["Normal", "Pneumonia"])

# Prediction Function

In [None]:
def predict_xray(image_bytes, model):
    """Classify one encoded X-ray image and print the result.

    The image is preprocessed the same way as the training data produced by
    ``DatabaseImageGenerator``: converted to RGB, resized to 256x256 and
    scaled to the [0, 1] range.

    Args:
        image_bytes: Encoded image bytes (e.g. ``XRayImage.image_data``).
        model: Model whose ``predict`` returns two scores per image, index 0
            for Normal and index 1 for Pneumonia.

    Returns:
        None. The prediction and both scores are printed.
    """
    # Convert bytes to image. Grayscale files must be expanded to 3 channels,
    # otherwise the array would be (256, 256) instead of (256, 256, 3).
    with Image.open(io.BytesIO(image_bytes)) as raw_image:
        test_image = raw_image.convert('RGB').resize((256, 256))

    # Scale pixels to [0, 1] to match the training inputs.
    test_image = np.asarray(test_image, dtype=np.float32) / 255.0
    # Add the leading batch dimension: (1, 256, 256, 3)
    test_image = np.expand_dims(test_image, axis=0)

    # Make prediction
    result = model.predict(test_image, verbose=0)
    class_probabilities = result[0]

    # Print prediction and probabilities
    prediction = "Normal" if class_probabilities[0] > class_probabilities[1] else "Pneumonia"
    print(f"Prediction: {prediction}")
    print(f"Probability of Normal: {class_probabilities[0]:.2%}")
    print(f"Probability of Pneumonia: {class_probabilities[1]:.2%}")

# Test prediction with database-stored images

In [None]:
# Open the session used by the two prediction cells below; it is closed in
# the final cell of the notebook.
session = Session()

# Test with a normal case

In [None]:
# Take the first NORMAL image of the test split and classify it.
# NOTE: .first() returns None when no row matches, in which case the
# attribute access below raises AttributeError.
normal_image = session.query(XRayImage).filter_by(dataset_type='test', image_class='NORMAL').first()
print("\nPredicting Normal Case:")
predict_xray(normal_image.image_data, model)

# Test with a pneumonia case

In [None]:
# Take the first PNEUMONIA image of the test split and classify it.
# NOTE: .first() returns None when no row matches, in which case the
# attribute access below raises AttributeError.
pneumonia_image = session.query(XRayImage).filter_by(dataset_type='test', image_class='PNEUMONIA').first()
print("\nPredicting Pneumonia Case:")
predict_xray(pneumonia_image.image_data, model)

# Closing Session

In [None]:
# Release the session opened for the prediction cells above.
session.close()