
# CS 559 – Thoracic Medical Image Classification using CNNs

This notebook implements the full pipeline described in the project proposal:

- Load and filter the **CoronaHack – Chest X-Ray Dataset** metadata  
- Exclude all **COVID-19** images and keep only **Normal** vs **Pneumonia**  
- Preprocess images and apply **data augmentation**  
- Train a **CNN** for binary classification  
- Evaluate using **accuracy, F1 score, ROC/AUC, confusion matrix**  
- Extract deep features and train **traditional ML baselines**  
  - Logistic Regression  
  - SVM (RBF)  
  - Random Forest  
  - AdaBoost  

> **Important:** Before running, set `BASE_IMG_DIR` to the folder that contains your actual chest X‑ray images.


In [None]:

import os
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_curve,
    auc,
    f1_score,
    accuracy_score
)
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as mobilenet_preprocess

# Log the TensorFlow version in use for this run.
print("TensorFlow version:", tf.__version__)



## 1. Paths and configuration

Edit this section to point to the correct locations of:

- The `Chest_xray_Corona_Metadata.csv` file  
- The base directory containing the actual image files  

On Kaggle, the images usually sit in subfolders like `train/` and `test/`.  
The metadata column `Dataset_type` specifies which split each image belongs to.


In [None]:

# Path to the metadata CSV (update if needed)
META_CSV_PATH = "Chest_xray_Corona_Metadata.csv"  # or an absolute path if preferred

# Base directory that contains the image files.
# For example (Kaggle default):
# BASE_IMG_DIR = "/kaggle/input/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset"
# NOTE: the value below is a placeholder; replace it with a real directory before running.
BASE_IMG_DIR = "/path/to/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset"

# Image subfolders used for each Dataset_type in the metadata.
# Keys are upper-case because build_filepath() upper-cases Dataset_type before the lookup.
DATASET_TYPE_TO_SUBDIR = {
    "TRAIN": "train",       # images in BASE_IMG_DIR/train
    "TEST": "test",         # images in BASE_IMG_DIR/test
    "VALIDATION": "train"   # some releases don't have a separate validation folder
}

# Image size and training hyperparameters
IMG_SIZE = (224, 224)   # (height, width) passed as target_size when loading images
BATCH_SIZE = 16         # batch size used by the Keras data generators
EPOCHS = 15             # upper bound on training epochs (early stopping may end sooner)
RANDOM_SEED = 42        # shared seed for NumPy, TensorFlow and scikit-learn calls

# Seed the NumPy and TensorFlow global random generators.
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)



## 2. Load and clean metadata

Steps:

1. Load the metadata CSV.  
2. Create a full image filepath for each row.  
3. Filter to keep only **Normal** and **Pneumonia** (spelled "Pnemonia" in this dataset).  
4. **Exclude COVID‑19** images using `Label_2_Virus_category == "COVID-19"`.  
5. Map labels to binary (0 = Normal, 1 = Pneumonia).


In [None]:

# Load metadata
meta = pd.read_csv(META_CSV_PATH)
print("Metadata shape:", meta.shape)
print(meta.head())


def build_filepath(row):
    """Return the full image path for one metadata row.

    The row's ``Dataset_type`` is upper-cased and looked up in
    ``DATASET_TYPE_TO_SUBDIR``; unknown values fall back to the ``train``
    subfolder. The result is ``BASE_IMG_DIR/<subdir>/<X_ray_image_name>``.
    """
    dataset_type = str(row["Dataset_type"]).upper()
    subdir = DATASET_TYPE_TO_SUBDIR.get(dataset_type, "train")
    return os.path.join(BASE_IMG_DIR, subdir, row["X_ray_image_name"])


meta["filepath"] = meta.apply(build_filepath, axis=1)

# Keep only rows labelled Normal or Pnemonia (the dataset's own spelling) ...
mask_binary = meta["Label"].isin(["Normal", "Pnemonia"])
# ... and drop rows explicitly tagged COVID-19. Rows with a missing virus
# category compare as "not equal" and are therefore kept.
mask_not_covid = meta["Label_2_Virus_category"] != "COVID-19"

# Apply both filters in one step and take a single explicit copy, so the
# column assignment below writes to an independent frame and does not raise
# pandas' SettingWithCopyWarning.
binary_meta = meta[mask_binary & mask_not_covid].copy()

print("After filtering (Normal vs Pneumonia, no COVID-19):", binary_meta.shape)

# Map labels to 0/1 (0 = Normal, 1 = Pneumonia)
label_map = {"Normal": 0, "Pnemonia": 1}
binary_meta["target"] = binary_meta["Label"].map(label_map)

binary_meta[["X_ray_image_name", "Label", "Label_2_Virus_category", "Dataset_type", "filepath", "target"]].head()



## 3. Train / validation / test split

We use the provided `Dataset_type` column:

- All rows with `Dataset_type == "TEST"` ⇒ **test set**  
- All remaining rows (TRAIN or VALIDATION) ⇒ **train+val**  
- Then we split train+val into **train** and **validation** sets using `train_test_split`.


In [None]:

# Rows flagged TEST in the metadata form the held-out test set; everything
# else (TRAIN / VALIDATION) is pooled and re-split below.
is_test = binary_meta["Dataset_type"].str.upper().eq("TEST")
df_test = binary_meta.loc[is_test].copy()
df_trainval = binary_meta.loc[~is_test].copy()

print("Train+Val size:", len(df_trainval))
print("Test size:", len(df_test))

# Stratified 80/20 split of the pooled rows into train and validation.
df_train, df_val = train_test_split(
    df_trainval,
    test_size=0.2,
    random_state=RANDOM_SEED,
    stratify=df_trainval["target"],
)

# Report the class balance of every split.
for name, df in (("train", df_train), ("val", df_val), ("test", df_test)):
    class_share = df["target"].value_counts(normalize=True)
    print(f"{name} distribution:\n", class_share, "\n")



## 4. Data generators and augmentation

We use `ImageDataGenerator` to:

- Rescale pixel values to `[0, 1]`  
- Apply random rotations, shifts, zooms, and horizontal flips on the **training** set  
- Use only rescaling for **validation** and **test** sets


In [None]:

# Training images are rescaled to [0, 1] and randomly augmented.
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=15,
    width_shift_range=0.05,
    height_shift_range=0.05,
    zoom_range=0.1,
    horizontal_flip=True,
    fill_mode="nearest"
)

# Validation and test images are only rescaled (no augmentation).
val_test_datagen = ImageDataGenerator(rescale=1./255)


def make_generator(df, datagen, shuffle=True):
    """Create a Keras dataframe iterator for binary classification.

    Args:
        df: DataFrame with a ``filepath`` column (image paths) and a
            ``target`` column holding the 0/1 class label.
        datagen: ``ImageDataGenerator`` that supplies preprocessing and
            augmentation.
        shuffle: Whether batches are drawn in random order. Pass ``False``
            for evaluation so predictions line up with ``generator.classes``.

    Returns:
        The iterator returned by ``datagen.flow_from_dataframe``.
    """
    # flow_from_dataframe with class_mode="binary" requires the label column
    # to contain strings; integer labels raise a TypeError. Convert on a copy
    # so the caller's frame keeps its numeric ``target`` column.
    frame = df.copy()
    frame["target"] = frame["target"].astype(int).astype(str)

    return datagen.flow_from_dataframe(
        dataframe=frame,
        x_col="filepath",
        y_col="target",
        target_size=IMG_SIZE,
        color_mode="rgb",
        class_mode="binary",
        # Pin the label -> index mapping so "0" is always class 0 (Normal)
        # and "1" is always class 1 (Pneumonia), even if a split happens to
        # contain only one class.
        classes=["0", "1"],
        batch_size=BATCH_SIZE,
        shuffle=shuffle
    )


train_gen = make_generator(df_train, train_datagen, shuffle=True)
val_gen = make_generator(df_val, val_test_datagen, shuffle=False)
test_gen = make_generator(df_test, val_test_datagen, shuffle=False)



## 5. CNN architecture

We implement a straightforward CNN with:

- 3 × `(Conv2D + MaxPooling2D)` blocks  
- A fully-connected layer with dropout  
- A final sigmoid unit for binary classification  

Optimizer: **Adam**  
Loss: **binary cross-entropy**


In [None]:

def build_cnn(input_shape=(224, 224, 3)):
    """Build and compile the baseline CNN for binary classification.

    The network stacks three Conv2D + MaxPooling2D blocks (32, 64 and 128
    filters), followed by Flatten, a 128-unit dense layer, dropout of 0.5 and
    a single sigmoid output unit. It is compiled with Adam (learning rate
    1e-4), binary cross-entropy loss and the accuracy metric.

    Args:
        input_shape: Shape of one input image as (height, width, channels).

    Returns:
        The compiled ``Sequential`` model.
    """
    stack = []
    for block_idx, n_filters in enumerate((32, 64, 128)):
        # Only the very first layer declares the input shape.
        first_layer_kwargs = {"input_shape": input_shape} if block_idx == 0 else {}
        stack.append(
            layers.Conv2D(
                n_filters,
                (3, 3),
                activation="relu",
                padding="same",
                **first_layer_kwargs,
            )
        )
        stack.append(layers.MaxPooling2D((2, 2)))

    # Classification head.
    stack.append(layers.Flatten())
    stack.append(layers.Dense(128, activation="relu"))
    stack.append(layers.Dropout(0.5))
    stack.append(layers.Dense(1, activation="sigmoid"))

    net = models.Sequential(stack)
    net.compile(
        optimizer=optimizers.Adam(learning_rate=1e-4),
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    return net


cnn_model = build_cnn(input_shape=IMG_SIZE + (3,))
cnn_model.summary()



## 6. Train the CNN

We monitor validation loss and use early stopping + best model checkpointing.


In [None]:

# Stop once validation loss has not improved for 4 epochs and roll the model
# back to its best weights.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=4,
    restore_best_weights=True,
)

# Write the model to disk whenever validation loss reaches a new best.
best_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    "cnn_best_model.h5",
    monitor="val_loss",
    save_best_only=True,
)

callbacks = [early_stopping, best_checkpoint]

history = cnn_model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=EPOCHS,
    callbacks=callbacks,
)



## 7. Learning curves


In [None]:

def plot_learning_curves(history):
    """Plot training/validation loss and accuracy side by side.

    Args:
        history: Object whose ``history`` attribute is a dict containing the
            per-epoch lists ``loss``, ``val_loss``, ``accuracy`` and
            ``val_accuracy`` (as returned by ``model.fit``).
    """
    hist = history.history
    epochs_range = range(1, len(hist["loss"]) + 1)

    # (history key, short legend suffix, axis label / panel title)
    panels = (
        ("loss", "Loss", "Loss"),
        ("accuracy", "Acc", "Accuracy"),
    )

    plt.figure(figsize=(12, 4))

    for position, (key, short_name, full_name) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epochs_range, hist[key], label=f"Train {short_name}")
        plt.plot(epochs_range, hist[f"val_{key}"], label=f"Val {short_name}")
        plt.xlabel("Epoch")
        plt.ylabel(full_name)
        plt.legend()
        plt.title(full_name)

    plt.tight_layout()
    plt.show()


plot_learning_curves(history)



## 8. Evaluation: accuracy, F1, ROC/AUC, confusion matrix


In [None]:

def evaluate_model(model, generator, set_name="set"):
    """Evaluate a binary classifier on a data generator and plot the results.

    Prints accuracy, F1 and a classification report, draws the confusion
    matrix and the ROC curve, and prints the AUC.

    Args:
        model: Model whose ``predict(generator)`` yields one probability per
            sample.
        generator: Non-shuffled data iterator exposing the ground-truth
            labels as ``generator.classes``.
        set_name: Name of the evaluated split, used in printed headers and
            plot titles.

    Returns:
        Dict with the keys ``accuracy``, ``f1`` and ``auc``.
    """
    class_names = ["Normal", "Pneumonia"]

    # Probabilities -> hard labels at the 0.5 cut-off.
    y_prob = model.predict(generator).ravel()
    y_pred = (y_prob >= 0.5).astype(int)
    y_true = generator.classes

    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)

    print(f"=== {set_name.upper()} RESULTS ===")
    print("Accuracy:", acc)
    print("F1 score:", f1)
    print("\nClassification report:\n", classification_report(y_true, y_pred, target_names=class_names))

    title_prefix = set_name.capitalize()

    # --- Confusion matrix ---
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(4, 4))
    plt.imshow(cm, interpolation="nearest")
    plt.title(f"{title_prefix} Confusion Matrix")
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Annotate each cell; text colour flips at half of the largest count so
    # the number stays readable against the cell colour.
    half_max = cm.max() / 2.
    for row, col in np.ndindex(cm.shape):
        count = cm[row, col]
        text_color = "white" if count > half_max else "black"
        plt.text(col, row, format(count, "d"),
                 ha="center", va="center",
                 color=text_color)

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    plt.tight_layout()
    plt.show()

    # --- ROC curve ---
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    roc_auc = auc(fpr, tpr)
    print("AUC:", roc_auc)

    plt.figure()
    plt.plot(fpr, tpr, label=f"ROC curve (area = {roc_auc:.3f})")
    plt.plot([0, 1], [0, 1], "k--")  # chance diagonal
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(f"{title_prefix} ROC Curve")
    plt.legend(loc="lower right")
    plt.show()

    return {"accuracy": acc, "f1": f1, "auc": roc_auc}


cnn_val_metrics = evaluate_model(cnn_model, val_gen, set_name="validation")
cnn_test_metrics = evaluate_model(cnn_model, test_gen, set_name="test")



## 9. Classical ML baselines using deep features

To compare the CNN against traditional ML models, we:

1. Use **MobileNetV2** (pretrained on ImageNet) as a **fixed feature extractor**.  
2. Extract a feature vector for each image.  
3. Train classical classifiers on those features:

   - Logistic Regression  
   - SVM (RBF)  
   - Random Forest  
   - AdaBoost


In [None]:

# ImageNet-pretrained MobileNetV2 without its classification head; global
# average pooling turns each image into a single feature vector.
base_cnn = MobileNetV2(
    include_top=False,
    weights="imagenet",   # requires internet to download the first time
    pooling="avg",
    input_shape=IMG_SIZE + (3,)
)
base_cnn.trainable = False


def extract_features(df, batch_size=BATCH_SIZE):
    """Extract frozen MobileNetV2 features for every image listed in ``df``.

    Images are loaded and pushed through ``base_cnn`` in batches. Calling
    ``Model.predict`` once per image has a large fixed per-call overhead, so
    batching is much faster for the same result.

    Args:
        df: DataFrame with a ``filepath`` column (image paths) and a
            ``target`` column (class labels).
        batch_size: Number of images per forward pass.

    Returns:
        Tuple ``(features, labels)``: ``features`` is a 2-D array with one
        row per image, ``labels`` is the matching 1-D array of targets.
    """
    paths = df["filepath"].tolist()
    labels = df["target"].to_numpy()

    feature_chunks = []
    for start in range(0, len(paths), batch_size):
        batch = np.stack([
            tf.keras.utils.img_to_array(
                tf.keras.utils.load_img(img_path, target_size=IMG_SIZE)
            )
            for img_path in paths[start:start + batch_size]
        ])
        # MobileNetV2 expects its own input scaling, not the 1/255 rescale
        # used for the end-to-end CNN.
        batch = mobilenet_preprocess(batch)
        feature_chunks.append(base_cnn.predict(batch, verbose=0))

    if not feature_chunks:
        # Empty input: return a correctly shaped (0, n_features) array.
        n_features = base_cnn.output_shape[-1]
        return np.empty((0, n_features), dtype=np.float32), labels

    return np.concatenate(feature_chunks, axis=0), labels


X_train_feat, y_train_feat = extract_features(df_train)
X_val_feat, y_val_feat = extract_features(df_val)
X_test_feat, y_test_feat = extract_features(df_test)

print("Feature shapes:", X_train_feat.shape, X_val_feat.shape, X_test_feat.shape)



## 10. Train classical classifiers

We train and evaluate:

- Logistic Regression  
- SVM with RBF kernel  
- Random Forest  
- AdaBoost


In [None]:

def train_and_eval_clf(clf, X_tr, y_tr, X_te, y_te, name="model"):
    """Fit a scikit-learn style classifier and report its test metrics.

    Args:
        clf: Estimator with ``fit`` and either ``predict_proba`` or
            ``decision_function``.
        X_tr: Training feature matrix.
        y_tr: Training labels (0/1).
        X_te: Evaluation feature matrix.
        y_te: Evaluation labels (0/1).
        name: Display name used in the printed header.

    Returns:
        Dict with the keys ``accuracy``, ``f1`` and ``auc``.
    """
    clf.fit(X_tr, y_tr)

    if hasattr(clf, "predict_proba"):
        # Probability of the positive class; decide at 0.5.
        y_prob = clf.predict_proba(X_te)[:, 1]
        threshold = 0.5
    else:
        # decision_function returns signed margins, not probabilities: the
        # decision boundary sits at 0, so a 0.5 cut-off would mislabel
        # samples with a small positive margin.
        y_prob = clf.decision_function(X_te)
        threshold = 0.0
    y_pred = (y_prob >= threshold).astype(int)

    acc = accuracy_score(y_te, y_pred)
    f1 = f1_score(y_te, y_pred)
    # ROC/AUC only need a ranking score, so both score types work here.
    fpr, tpr, _ = roc_curve(y_te, y_prob)
    roc_auc = auc(fpr, tpr)

    print(f"\n=== {name} ===")
    print("Accuracy:", acc)
    print("F1 score:", f1)
    print("AUC:", roc_auc)
    print("Classification report:\n", classification_report(y_te, y_pred, target_names=["Normal", "Pneumonia"]))

    return {"accuracy": acc, "f1": f1, "auc": roc_auc}


# Metrics of every baseline, keyed by display name.
results_baselines = {}

log_reg = LogisticRegression(max_iter=1000)
results_baselines["Logistic Regression"] = train_and_eval_clf(
    log_reg, X_train_feat, y_train_feat, X_test_feat, y_test_feat, name="Logistic Regression"
)

# probability=True enables predict_proba on the SVM.
svm_clf = SVC(kernel="rbf", probability=True)
results_baselines["SVM (RBF)"] = train_and_eval_clf(
    svm_clf, X_train_feat, y_train_feat, X_test_feat, y_test_feat, name="SVM (RBF)"
)

rf_clf = RandomForestClassifier(n_estimators=200, random_state=RANDOM_SEED)
results_baselines["Random Forest"] = train_and_eval_clf(
    rf_clf, X_train_feat, y_train_feat, X_test_feat, y_test_feat, name="Random Forest"
)

ada_clf = AdaBoostClassifier(n_estimators=200, random_state=RANDOM_SEED)
results_baselines["AdaBoost"] = train_and_eval_clf(
    ada_clf, X_train_feat, y_train_feat, X_test_feat, y_test_feat, name="AdaBoost"
)

results_baselines



## 11. Results summary

This section aggregates all metrics (CNN + baselines) into a single table for inclusion in the **Results** section of the report.


In [None]:

def _summary_row(model_name, scores):
    """Build one results-table row from a dict with accuracy/f1/auc keys."""
    return {
        "Model": model_name,
        "Accuracy (test)": scores["accuracy"],
        "F1 (test)": scores["f1"],
        "AUC (test)": scores["auc"],
    }


# The end-to-end CNN comes first, followed by each classical baseline in the
# order it was trained.
all_results = [_summary_row("CNN (End-to-end)", cnn_test_metrics)]

for name, metrics in results_baselines.items():
    all_results.append(_summary_row(name, metrics))

results_df = pd.DataFrame(all_results)
results_df
