In [None]:
# 📌 Install Required Libraries
!pip install tensorflow numpy opencv-python matplotlib scikit-learn imgaug albumentations gdown

Collecting imgaug
  Downloading imgaug-0.4.0-py2.py3-none-any.whl.metadata (1.8 kB)
Downloading imgaug-0.4.0-py2.py3-none-any.whl (948 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m948.0/948.0 kB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: imgaug
Successfully installed imgaug-0.4.0


In [None]:
# 📌 Import Libraries
import os
import cv2
import numpy as np
import tensorflow as tf
import gdown
import matplotlib.pyplot as plt
from tensorflow.keras.applications import EfficientNetB3
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense, Reshape, Conv2DTranspose, Lambda, GlobalAveragePooling2D, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.losses import binary_crossentropy
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score
from sklearn.covariance import EmpiricalCovariance
import albumentations as A
import random
from scipy.ndimage import gaussian_filter, map_coordinates

In [None]:
# 📌 Load dataset paths
train_normal_path = "/content/drive/MyDrive/data/NORMAL"
test_normal_path = "/content/drive/MyDrive/data/test/NORMAL"
test_pneumonia_path = "/content/drive/MyDrive/data/test/PNEUMONIA"
IMG_SIZE = 300  # EfficientNetB3 input size

In [None]:
# 📌 Lung Segmentation + CLAHE Only
def lung_segmentation(image):
    """Basic segmentation using thresholding and contour detection"""
    # Threshold to binary image
    _, thresh = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Find contours and fill the largest 2 (lungs)
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    mask = np.zeros_like(image)
    contours = sorted(contours, key=cv2.contourArea, reverse=True)[:2]
    cv2.drawContours(mask, contours, -1, color=255, thickness=-1)

    # Apply mask
    segmented = cv2.bitwise_and(image, image, mask=mask)
    return segmented

def apply_preprocessing(image):
    image = lung_segmentation(image)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    image = clahe.apply(image)
    return image


In [None]:
# 📌 Augmentation Functions
augmentations = A.Compose([
    A.Rotate(limit=10, p=0.5),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.GaussianBlur(blur_limit=(3, 7), p=0.5),
    A.ElasticTransform(alpha=1, sigma=50, alpha_affine=50, p=0.5),
])

  A.ElasticTransform(alpha=1, sigma=50, alpha_affine=50, p=0.5),


In [None]:
def apply_augmentations(image):
    augmented = augmentations(image=image)
    return augmented["image"]

In [None]:
# 📌 Load Images with Preprocessing and Augmentation
def load_images(folder, augment=False):
    images = []
    for filename in os.listdir(folder):
        img_path = os.path.join(folder, filename)
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            continue
        img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))

        # Apply preprocessing
        img = apply_preprocessing(img)

        # Expand dims to add channel axis
        img = np.expand_dims(img, axis=-1)

        # Apply augmentations if enabled
        if augment:
            img = np.squeeze(img, axis=-1)  # Remove channel axis for augmentation
            img = apply_augmentations(img)
            img = np.expand_dims(img, axis=-1)  # Add channel axis back

        images.append(img)

    return np.array(images)

In [None]:
# 📌 Load and preprocess datasets
train_normal = load_images(train_normal_path, augment=True)
test_normal = load_images(test_normal_path, augment=False)
test_pneumonia = load_images(test_pneumonia_path, augment=False)

In [None]:
# Convert grayscale to RGB for EfficientNetB3
train_normal = np.repeat(train_normal, 3, axis=-1)
test_normal = np.repeat(test_normal, 3, axis=-1)
test_pneumonia = np.repeat(test_pneumonia, 3, axis=-1)

In [None]:
# 📌 EfficientNetB3 Model (Feature Extractor)
base_model = EfficientNetB3(weights='imagenet', include_top=False, input_shape=(IMG_SIZE, IMG_SIZE, 3))
for layer in base_model.layers[:-20]:
    layer.trainable = False

'''x = GlobalAveragePooling2D()(base_model.output)
x = Dense(256, activation='relu')(x)
x = Dropout(0.3)(x)
classifier_output = Dense(1, activation='sigmoid')(x)
classifier_model = Model(base_model.input, classifier_output)
classifier_model.compile(optimizer=Adam(learning_rate=0.0001), loss='binary_crossentropy', metrics=['accuracy'])

# 📌 Train classifier on pneumonia images
classifier_model.fit(train_pneumonia, np.ones(len(train_pneumonia)), epochs=5, batch_size=16, validation_split=0.1)'''

Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb3_notop.h5
[1m43941136/43941136[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


"x = GlobalAveragePooling2D()(base_model.output)\nx = Dense(256, activation='relu')(x)\nx = Dropout(0.3)(x)\nclassifier_output = Dense(1, activation='sigmoid')(x)\nclassifier_model = Model(base_model.input, classifier_output)\nclassifier_model.compile(optimizer=Adam(learning_rate=0.0001), loss='binary_crossentropy', metrics=['accuracy'])\n\n# 📌 Train classifier on pneumonia images\nclassifier_model.fit(train_pneumonia, np.ones(len(train_pneumonia)), epochs=5, batch_size=16, validation_split=0.1)"

In [None]:
# 📌 VAE Model
latent_dim = 64

# Encoder
inputs = Input(shape=(IMG_SIZE, IMG_SIZE, 3))
x = base_model(inputs, training=False)
x = GlobalAveragePooling2D()(x)
x = Dense(256, activation='relu')(x)
z_mean = Dense(latent_dim, name="z_mean")(x)
z_log_var = Dense(latent_dim, name="z_log_var")(x)

def sampling(args):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim))
    return z_mean + K.exp(0.5 * z_log_var) * epsilon

z = Lambda(sampling, output_shape=(latent_dim,), name="z")([z_mean, z_log_var])

encoder = Model(inputs, [z_mean, z_log_var, z], name="encoder")

# Decoder
decoder_input = Input(shape=(latent_dim,))
x = Dense(75 * 75 * 64, activation='relu')(decoder_input)
x = Reshape((75, 75, 64))(x)
x = Conv2DTranspose(64, (3,3), activation='relu', strides=2, padding='same')(x)
x = Conv2DTranspose(32, (3,3), activation='relu', strides=2, padding='same')(x)
x = Conv2DTranspose(3, (3,3), activation='sigmoid', padding='same')(x)

outputs = Lambda(lambda img: tf.image.resize(img, (IMG_SIZE, IMG_SIZE)))(x)

decoder = Model(decoder_input, outputs, name="decoder")

# 📌 VAE Loss with Corrected call() and compile() method
class VAE(Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = tf.keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        return self.decoder(z)

    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]

        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(
                tf.keras.losses.binary_crossentropy(
                    tf.keras.backend.flatten(data),
                    tf.keras.backend.flatten(reconstruction)
                )
            )

            reconstruction_loss *= IMG_SIZE * IMG_SIZE * 3
            kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=-1)
            kl_loss = tf.reduce_mean(kl_loss)
            total_loss = reconstruction_loss + kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        if isinstance(data, tuple):
            data = data[0]

        z_mean, z_log_var, z = self.encoder(data)
        reconstruction = self.decoder(z)
        reconstruction_loss = tf.reduce_mean(
            tf.keras.losses.binary_crossentropy(
                tf.keras.backend.flatten(data),
                tf.keras.backend.flatten(reconstruction)
            )
        )
        reconstruction_loss *= IMG_SIZE * IMG_SIZE * 3
        kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=-1)
        kl_loss = tf.reduce_mean(kl_loss)
        total_loss = reconstruction_loss + kl_loss

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

# Compile the VAE model
vae = VAE(encoder, decoder)
vae.compile(optimizer=Adam())


In [None]:
# 📌 Train VAE
vae.fit(train_normal, train_normal, epochs=30, batch_size=32, validation_split=0.1)


Epoch 1/30
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m133s[0m 2s/step - kl_loss: 168936768.0000 - loss: -40275648.0000 - reconstruction_loss: -209212400.0000 - val_kl_loss: 15693.8174 - val_loss: -488573600.0000 - val_reconstruction_loss: -488589312.0000
Epoch 2/30
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 50ms/step - kl_loss: 11654.6836 - loss: -469062240.0000 - reconstruction_loss: -469073888.0000 - val_kl_loss: 3883.4180 - val_loss: -488585376.0000 - val_reconstruction_loss: -488589248.0000
Epoch 3/30
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 50ms/step - kl_loss: 3108.8169 - loss: -466785152.0000 - reconstruction_loss: -466788256.0000 - val_kl_loss: 4317.8779 - val_loss: -488584896.0000 - val_reconstruction_loss: -488589312.0000
Epoch 4/30
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 50ms/step - kl_loss: 3702.4260 - loss: -466128224.0000 - reconstruction_loss: -466131872.0000 - val_kl_loss: 1166.4163 - val

<keras.src.callbacks.history.History at 0x79c962125310>

In [None]:
# 📌 Anomaly Detection with VAE
z_mean_normal, _, _ = encoder.predict(test_normal)
z_mean_pneumonia, _, _ = encoder.predict(test_pneumonia)

[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 4s/step
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 2s/step


In [None]:
# Compute Mahalanobis distance
ec = EmpiricalCovariance()
ec.fit(z_mean_normal)

mahalanobis_normal = ec.mahalanobis(z_mean_normal)
mahalanobis_pneumonia = ec.mahalanobis(z_mean_pneumonia)

In [None]:
# Evaluation
y_true = np.concatenate([np.zeros(len(mahalanobis_normal)), np.ones(len(mahalanobis_pneumonia))])
y_scores = np.concatenate([mahalanobis_normal, mahalanobis_pneumonia])
threshold = np.percentile(mahalanobis_normal, 95)
y_pred = (y_scores > threshold).astype(int)

In [None]:
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score

# 📌 Evaluate Metrics
auc_roc = roc_auc_score(y_true, y_scores)  # AUC-ROC
auc_pr = average_precision_score(y_true, y_scores)  # AUC-PR
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

# 📌 Print Results
print(f"AUC-ROC: {auc_roc:.4f}")
print(f"AUC-PR: {auc_pr:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")


AUC-ROC: 0.9231
AUC-PR: 0.9567
Accuracy: 0.8093
Precision: 0.9593
Recall: 0.7256
F1 Score: 0.8263
