In [None]:
# Mount Google Drive at /content/drive so the project folder used below
# (BASE_DIR, under /content/drive/MyDrive) is reachable. Colab-only.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install tensorflow

In [None]:
import os, numpy as np, pandas as pd, tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import EfficientNetB3
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.preprocessing.image import load_img, img_to_array

# Square input resolution (pixels per side) and mini-batch size used by the
# generator and model-building cells.
IMG_SIZE = 300
BATCH_SIZE = 16

# Project root on the mounted Drive; logs and saved models go in sub-folders.
BASE_DIR = "/content/drive/MyDrive/BrainTumorDetection"
LOG_DIR = f"{BASE_DIR}/Logs"
MODEL_DIR = f"{BASE_DIR}/Models"

# Create both output folders up front (no-op when they already exist).
for _out_dir in (LOG_DIR, MODEL_DIR):
    os.makedirs(_out_dir, exist_ok=True)

In [None]:
@tf.keras.utils.register_keras_serializable()
class FocalLoss(tf.keras.losses.Loss):
    """Categorical focal loss for one-hot targets and probability predictions.

    Per class the loss is ``alpha * (1 - p) ** gamma * (-y * log(p))``; the
    class terms are summed to give one loss value per sample.

    Args:
        gamma: Focusing exponent applied to ``(1 - p)``.
        alpha: Constant scale factor applied to every class term.
        reduction: Reduction applied by the Keras ``Loss`` base class to the
            per-sample values returned by ``call``.
        name: Name of the loss instance.
    """

    def __init__(self, gamma=2.0, alpha=0.25,
                 reduction=tf.keras.losses.Reduction.SUM_OVER_BATCH_SIZE,
                 name="focal_loss"):
        super().__init__(reduction=reduction, name=name)
        self.gamma = gamma
        self.alpha = alpha

    def call(self, y_true, y_pred):
        """Return the focal loss of each sample.

        Args:
            y_true: One-hot targets, same shape as ``y_pred``.
            y_pred: Predicted class probabilities (e.g. softmax output).

        Returns:
            Tensor with the class axis summed away (one value per sample).
            The batch is deliberately NOT averaged here: the base class
            applies ``reduction`` and any sample/class weights, which only
            works correctly on per-sample values.
        """
        y_pred = tf.convert_to_tensor(y_pred)
        # Targets may arrive as ints or a different float width.
        y_true = tf.cast(y_true, y_pred.dtype)
        # Keep log() finite for probabilities of exactly 0 or 1.
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1 - 1e-7)
        ce = -y_true * tf.math.log(y_pred)
        fl = self.alpha * tf.pow(1 - y_pred, self.gamma) * ce
        return tf.reduce_sum(fl, axis=-1)

    def get_config(self):
        """Return the serializable config, including ``gamma`` and ``alpha``."""
        config = super().get_config()
        config.update({
            "gamma": self.gamma,
            "alpha": self.alpha
        })
        return config

In [None]:
@tf.keras.utils.register_keras_serializable()
def attention_block(x):
    """Re-weight the channels of a feature map with a learned sigmoid gate.

    The map is globally average-pooled, passed through a bottleneck Dense
    layer (channels // 8, ReLU) and a Dense layer back to the full channel
    count (sigmoid), then multiplied onto the input channel-wise
    (squeeze-and-excitation style).

    Args:
        x: Keras tensor whose last axis is the channel axis; it is reshaped
           against a (1, 1, channels) gate, so a 4-D feature map is expected.

    Returns:
        Keras tensor with the same shape as ``x``.
    """
    channels = x.shape[-1]

    squeezed = layers.GlobalAveragePooling2D()(x)
    bottleneck = layers.Dense(channels // 8, activation="relu")(squeezed)
    gate = layers.Dense(channels, activation="sigmoid")(bottleneck)
    # Shape the gate so it broadcasts over the two spatial axes.
    gate = layers.Reshape((1, 1, channels))(gate)

    return layers.Multiply()([x, gate])

In [None]:
import pandas as pd
import os

def last_epoch(csv_path):
    """Return how many epochs are already recorded in a training-log CSV.

    The count is the number of data rows in the file. A missing file, a
    zero-byte file, or a file pandas cannot parse all count as 0.

    Args:
        csv_path: Path to the CSV log.

    Returns:
        int: Number of data rows, or 0 when nothing usable is there.
    """
    has_content = os.path.exists(csv_path) and os.path.getsize(csv_path) > 0
    if not has_content:
        return 0

    try:
        return len(pd.read_csv(csv_path))
    except Exception:
        # Best effort: EmptyDataError or a corrupted CSV means "start over".
        return 0

In [None]:
class Stage1MRIGenerator(tf.keras.utils.Sequence):
    """Batch generator for Stage-1 (tumor vs. no-tumor) classification.

    ``root_dir`` must contain:
      * a folder whose lower-cased name contains "notumor", holding images
        directly (label 0);
      * a folder whose lower-cased name contains "tumor" but not "no",
        holding one sub-folder per tumor type with images inside (label 1).

    Each batch is ``(X, y)`` with ``X`` of shape
    (batch, IMG_SIZE, IMG_SIZE, 3) scaled to [0, 1] and ``y`` one-hot over
    two classes. The batch size is the module-level ``BATCH_SIZE``.

    Args:
        root_dir: Dataset split directory (see layout above).
        augment: When True, each image is horizontally flipped with
            probability 0.5.
        shuffle: When True, sample order is reshuffled at construction and
            after every epoch.
        **kwargs: Forwarded to the Keras ``Sequence`` constructor.

    Raises:
        FileNotFoundError: If either class folder cannot be found.
    """

    def __init__(self, root_dir, augment=False, shuffle=True, **kwargs):
        # The Keras base class expects its constructor to be called.
        super().__init__(**kwargs)
        self.samples = []
        self.augment = augment
        self.shuffle = shuffle

        # Map normalised folder name -> actual folder name on disk.
        folders = {f.lower().strip(): f for f in os.listdir(root_dir)}

        no_dir = os.path.join(root_dir, self._find_folder(
            folders, lambda k: "notumor" in k, "no-tumor", root_dir))
        tumor_dir = os.path.join(root_dir, self._find_folder(
            folders, lambda k: "tumor" in k and "no" not in k, "tumor", root_dir))

        # sorted() makes the unshuffled sample order reproducible.
        for img in sorted(os.listdir(no_dir)):
            self.samples.append((os.path.join(no_dir, img), 0))

        for cls in sorted(os.listdir(tumor_dir)):
            cls_dir = os.path.join(tumor_dir, cls)
            if not os.path.isdir(cls_dir):
                # Stray files next to the per-type folders are not samples.
                continue
            for img in sorted(os.listdir(cls_dir)):
                self.samples.append((os.path.join(cls_dir, img), 1))

        self.on_epoch_end()

    @staticmethod
    def _find_folder(folders, matches, description, root_dir):
        """Return the first on-disk folder name whose normalised key matches."""
        for key, original in folders.items():
            if matches(key):
                return original
        raise FileNotFoundError(
            f"No {description} folder found in {root_dir!r}; "
            f"found: {sorted(folders.values())}"
        )

    def __len__(self):
        """Number of batches per epoch, including a final partial batch."""
        # Ceiling division so trailing samples are not silently dropped.
        return (len(self.samples) + BATCH_SIZE - 1) // BATCH_SIZE

    def on_epoch_end(self):
        """Reshuffle the sample list in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.samples)

    def __getitem__(self, idx):
        """Load and return batch number ``idx`` as ``(images, one_hot_labels)``."""
        batch = self.samples[idx*BATCH_SIZE:(idx+1)*BATCH_SIZE]
        X, y = [], []

        for path, label in batch:
            img = load_img(path, target_size=(IMG_SIZE, IMG_SIZE))
            # NOTE(review): Keras' EfficientNet documentation says the model
            # expects raw [0, 255] pixels (it rescales internally). The /255
            # is kept because existing checkpoints and the evaluation cell
            # use the same scaling - revisit all of them together.
            img = img_to_array(img)/255.0
            if self.augment and np.random.rand() > 0.5:
                # NumPy flip along the width axis keeps the batch pure NumPy
                # (no TF tensors mixed into the list).
                img = np.fliplr(img)
            X.append(img)
            y.append(label)

        return np.array(X), tf.keras.utils.to_categorical(y, 2)

In [None]:
def build_stage1_model():
    """Build the Stage-1 binary classifier on a frozen EfficientNetB3.

    The ImageNet-pretrained backbone (no top, IMG_SIZE x IMG_SIZE x 3 input)
    is frozen. Its output goes through ``attention_block``, global average
    pooling, batch normalization, a 256-unit ReLU layer and a 2-way softmax.

    The head is attached directly to ``backbone.output``, so the returned
    model is one flat graph from the backbone's input to the softmax.

    Returns:
        An uncompiled Keras ``Model``.
    """
    backbone = EfficientNetB3(
        include_top=False,
        weights="imagenet",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    backbone.trainable = False

    features = attention_block(backbone.output)
    features = layers.GlobalAveragePooling2D()(features)
    features = layers.BatchNormalization()(features)
    features = layers.Dense(256, activation="relu")(features)
    probabilities = layers.Dense(2, activation="softmax")(features)

    return models.Model(inputs=backbone.input, outputs=probabilities)

In [None]:
# Ensure the Stage-2 log file exists by creating an empty one when missing.
# NOTE(review): STAGE2_LOG is not defined in any cell shown above this one, so
# on a fresh kernel this cell appears to raise NameError - confirm where it is
# meant to be defined, or whether this cell belongs later in the notebook.
if not os.path.exists(STAGE2_LOG):
    with open(STAGE2_LOG, "w") as f:
        f.write("")

In [None]:
# ---- Stage-1 data, model and log locations ----
STAGE1_TRAIN = f"{BASE_DIR}/Stage1/Training"
STAGE1_VAL   = f"{BASE_DIR}/Stage1/Validation"

STAGE1_DIR  = f"{MODEL_DIR}/Stage1"
os.makedirs(STAGE1_DIR, exist_ok=True)

STAGE1_BEST = f"{STAGE1_DIR}/best.keras"
STAGE1_FINAL = f"{STAGE1_DIR}/final.keras"
STAGE1_LOG  = f"{LOG_DIR}/stage1.csv"

# ---- Resume point: rows already in the CSV log = epochs already done ----
initial_epoch = last_epoch(STAGE1_LOG)
print("Stage-1 resume from:", initial_epoch)

# ---- Start from scratch, or continue from the best checkpoint if present ----
if not os.path.exists(STAGE1_BEST):
    model1 = build_stage1_model()
else:
    model1 = tf.keras.models.load_model(
        STAGE1_BEST,
        custom_objects={"FocalLoss": FocalLoss, "attention_block": attention_block}
    )

# Compiled in both cases, so a loaded model also gets a fresh optimizer.
model1.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss=FocalLoss(),
    metrics=["accuracy"]
)

# ---- Callbacks (reused by the fine-tuning cell as `callbacks1`) ----
checkpoint_cb = ModelCheckpoint(STAGE1_BEST, save_best_only=True, monitor="val_loss")
csv_logger_cb = CSVLogger(STAGE1_LOG, append=True)
reduce_lr_cb = ReduceLROnPlateau(patience=3, factor=0.3)
early_stop_cb = EarlyStopping(patience=6, restore_best_weights=True)
callbacks1 = [checkpoint_cb, csv_logger_cb, reduce_lr_cb, early_stop_cb]

# ---- Train the head on top of the frozen backbone ----
train_gen = Stage1MRIGenerator(STAGE1_TRAIN, augment=True)
val_gen = Stage1MRIGenerator(STAGE1_VAL)

model1.fit(
    train_gen,
    validation_data=val_gen,
    epochs=20,
    initial_epoch=initial_epoch,
    callbacks=callbacks1
)

In [None]:
# Fine-tune the last 30 backbone layers at a low learning rate.
FINE_TUNE_LAYERS = 30

# Locate the backbone's layers. build_stage1_model() attaches the head
# directly to base.output, which yields a flat graph with NO nested
# tf.keras.Model layer - so searching only for a nested model leaves
# `backbone` undefined. Both layouts are handled here.
nested_backbones = [lyr for lyr in model1.layers if isinstance(lyr, tf.keras.Model)]
if nested_backbones:
    backbone = nested_backbones[-1]
    # A frozen parent hides its children's weights from training, so the
    # parent must be trainable before individual layers are re-frozen below.
    backbone.trainable = True
    backbone_layers = backbone.layers
else:
    layer_names = [lyr.name for lyr in model1.layers]
    # "top_activation" is the final layer of Keras' EfficientNet when
    # include_top=False; everything up to and including it is the backbone.
    if "top_activation" not in layer_names:
        raise RuntimeError(
            "Could not locate the EfficientNet backbone in model1 "
            "(no nested Model and no 'top_activation' layer)."
        )
    backbone_layers = model1.layers[: layer_names.index("top_activation") + 1]

# Set every backbone layer's flag explicitly so re-running this cell is
# idempotent. BatchNormalization layers stay frozen, as the Keras
# EfficientNet fine-tuning guide recommends.
first_trainable = len(backbone_layers) - FINE_TUNE_LAYERS
for position, lyr in enumerate(backbone_layers):
    lyr.trainable = (
        position >= first_trainable
        and not isinstance(lyr, layers.BatchNormalization)
    )

# Re-compile so the new trainable flags take effect.
model1.compile(
    optimizer=tf.keras.optimizers.Adam(1e-5),
    loss=FocalLoss(),
    metrics=["accuracy"]
)

# NOTE(review): callbacks1 is reused, so fine-tuning epochs are appended to
# the same Stage-1 CSV log that last_epoch() counts when resuming Stage-1.
model1.fit(
    Stage1MRIGenerator(STAGE1_TRAIN, augment=True),
    validation_data=Stage1MRIGenerator(STAGE1_VAL),
    epochs=30,
    callbacks=callbacks1
)

model1.save(STAGE1_FINAL)

In [None]:
# =========================================================
# Stage-1 Brain Tumor Detection (Tumor vs No Tumor)
# Correct & Safe Evaluation Script
# =========================================================

import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# ===================== CONFIG =====================
IMG_SIZE = 300
TEST_DIR = "/content/drive/MyDrive/BrainTumorDetection/Test"
MODEL_PATH = "/content/drive/MyDrive/BrainTumorDetection/Models/Stage1/best.keras"

# ===================== LOAD MODEL =====================
# compile=False: inference only, so no loss/optimizer is restored.
model = tf.keras.models.load_model(MODEL_PATH, compile=False)

# ===================== LOAD TEST DATA =====================
# Expected layout: TEST_DIR/<class folder>/<image files>.
X_test = []
y_test = []

for folder in sorted(os.listdir(TEST_DIR)):
    folder_path = os.path.join(TEST_DIR, folder)
    if not os.path.isdir(folder_path):
        continue

    # Label: 0 = No Tumor, 1 = Tumor.
    # Compare on letters/digits only so "notumor", "no_tumor" and "No Tumor"
    # all map to class 0 instead of being silently labelled as tumor.
    folder_key = "".join(ch for ch in folder.lower() if ch.isalnum())
    label = 0 if folder_key == "notumor" else 1

    for img_name in sorted(os.listdir(folder_path)):
        img_path = os.path.join(folder_path, img_name)

        # cv2.imread returns None (it does not raise) for unreadable files.
        img = cv2.imread(img_path)
        if img is None:
            print("Skipping:", img_path)
            continue

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
        # Scale in float32 directly to avoid a float64 copy of every image.
        # NOTE(review): training loads images with Keras load_img, which may
        # resize differently from cv2.resize, and Keras' EfficientNet docs
        # say it expects [0, 255] inputs. The /255 is kept to match the
        # training generator - revisit both together.
        img = img.astype(np.float32) / 255.0

        X_test.append(img)
        y_test.append(label)

if not X_test:
    raise RuntimeError(f"No readable test images found under {TEST_DIR}")

X_test = np.array(X_test, dtype=np.float32)
y_test = np.array(y_test, dtype=np.int32)

print("✅ Total test images:", len(X_test))
print("No Tumor samples:", np.sum(y_test == 0))
print("Tumor samples:", np.sum(y_test == 1))

# ===================== PREDICTION =====================
y_probs = model.predict(X_test, batch_size=32)

# SAFE prediction handling (sigmoid or softmax)
if len(y_probs.shape) == 2 and y_probs.shape[1] == 2:
    y_pred = np.argmax(y_probs, axis=1)     # Softmax model
else:
    y_pred = (y_probs > 0.5).astype(int).ravel()  # Sigmoid model

# Pin the label set so the report and the 2x2 matrix keep their shape even
# when only one class occurs in y_test / y_pred.
CLASS_LABELS = [0, 1]
CLASS_NAMES = ["No Tumor", "Tumor"]

# ===================== METRICS =====================
print("\n📊 Classification Report (Stage-1):")
print(classification_report(
    y_test,
    y_pred,
    labels=CLASS_LABELS,
    target_names=CLASS_NAMES,
    zero_division=0
))

# ===================== CONFUSION MATRIX =====================
# Rows = actual, columns = predicted; with labels=[0, 1] ravel() is always
# (tn, fp, fn, tp).
cm = confusion_matrix(y_test, y_pred, labels=CLASS_LABELS)
tn, fp, fn, tp = cm.ravel()

plt.figure(figsize=(6, 5))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=CLASS_NAMES,
    yticklabels=CLASS_NAMES
)
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Stage-1 Confusion Matrix")
plt.show()

# ===================== MEDICAL METRICS =====================
# The small epsilon avoids division by zero when a class has no samples.
sensitivity = tp / (tp + fn + 1e-7)   # Tumor Recall
specificity = tn / (tn + fp + 1e-7)   # No-Tumor Recall

print(f"🧠 Tumor Sensitivity (Recall): {sensitivity:.4f}")
print(f"🧠 No-Tumor Specificity: {specificity:.4f}")