In [None]:
import os
import tensorflow as tf

# Prevent GPU memory pre-allocation by enabling memory growth on every visible GPU.
gpus = tf.config.list_physical_devices('GPU')
try:
    # Iterating an empty list is a no-op, so no separate "any GPU present?" check is needed.
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    print(f"❌ Could not set memory growth: {e}")

# Optional: also request GPU memory growth through the environment variable.
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"

In [None]:
! pip install imutils

import tensorflow as tf
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
import imutils
import pathlib
import time
import PIL as pil
import shutil
from tqdm import tqdm

import numpy as np
import matplotlib.pyplot as plt
import os
import cv2
import gc
from sklearn.metrics import confusion_matrix, classification_report
import itertools
import joblib



# Notebook-wide configuration.
IMAGE_SIZE = (224, 224)  # target size used for preprocessing and for loading the datasets
BASE_LR = 2e-5           # learning rate handed to the Adam optimizer when compiling the model
EPOCH = 20               # NOTE(review): not referenced by the visible training loop, which passes epochs=35
BATCH_SIZE = 32          # batch size for the directory-based datasets; the per-fold datasets use .batch(8)

In [None]:
!pip install tensorflow

In [None]:
def crop_img(img, image_size=(224, 224)):
    """Crop an RGB image to the extent of its largest bright region and resize it.

    The image is converted to grayscale, blurred, thresholded at 45 and cleaned
    with two erode/dilate passes; the largest external contour of the resulting
    mask defines the crop rectangle.

    Args:
        img: RGB image as a NumPy array (it is passed to ``cv2.cvtColor`` with
            ``COLOR_RGB2GRAY``).
        image_size: Size forwarded to ``cv2.resize`` for the returned image.

    Returns:
        The cropped image resized to ``image_size``. When no contour is found,
        or the contour's extent is degenerate (zero width or height), the whole
        input image is resized and returned instead.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    gray = cv2.GaussianBlur(gray, (3, 3), 0)

    # Binary mask of everything brighter than the background, with speckles removed.
    thresh = cv2.threshold(gray, 45, 255, cv2.THRESH_BINARY)[1]
    thresh = cv2.erode(thresh, None, iterations=2)
    thresh = cv2.dilate(thresh, None, iterations=2)

    cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cnts = imutils.grab_contours(cnts)

    if len(cnts) == 0:
        print("Warning: No contours found, returning resized original image.")
        return cv2.resize(img, image_size, interpolation=cv2.INTER_CUBIC)

    c = max(cnts, key=cv2.contourArea)

    # Extreme x / y coordinates of the largest contour.
    left, right = int(c[:, :, 0].min()), int(c[:, :, 0].max())
    top, bottom = int(c[:, :, 1].min()), int(c[:, :, 1].max())

    ADD_PIXELS = 0
    height, width = img.shape[:2]
    # Clamp to the image so a positive margin can never produce negative
    # (wrap-around) indices or run past the border.
    y0, y1 = max(top - ADD_PIXELS, 0), min(bottom + ADD_PIXELS, height)
    x0, x1 = max(left - ADD_PIXELS, 0), min(right + ADD_PIXELS, width)

    # A one-pixel-wide or one-pixel-tall contour yields an empty slice, which
    # cv2.resize rejects; fall back to the full image in that case.
    if y1 <= y0 or x1 <= x0:
        print("Warning: Degenerate contour found, returning resized original image.")
        return cv2.resize(img, image_size, interpolation=cv2.INTER_CUBIC)

    new_img = img[y0:y1, x0:x1].copy()
    return cv2.resize(new_img, image_size, interpolation=cv2.INTER_CUBIC)


In [None]:
def image_preprocessing(source_dir, saved_root_dir, image_size=(224, 224), channels=3):
    """Crop every image below ``source_dir`` and save it under ``saved_root_dir``.

    ``source_dir`` is expected to contain one sub-directory per class. Each file
    in a class directory is read, decoded as JPEG, passed through ``crop_img``
    and written to ``saved_root_dir/<class directory name>/<file name>``.

    Args:
        source_dir: Directory holding the per-class sub-directories.
        saved_root_dir: Output root; created if it does not exist.
        image_size: Size forwarded to ``crop_img``.
        channels: Number of colour channels requested from the JPEG decoder.

    Raises:
        Exception: If ``source_dir`` does not exist or is not a directory.
    """
    if not os.path.exists(source_dir):
        raise Exception(f"Source directory: {source_dir} does not exist")
    if not os.path.isdir(source_dir):
        raise Exception(f"Source path: {source_dir} is not a directory")

    os.makedirs(saved_root_dir, exist_ok=True)

    # Only sub-directories are class folders; stray files in the root are ignored
    # instead of crashing on iterdir(). Materialising the list also lets tqdm
    # show a total.
    class_dirs = sorted(p for p in pathlib.Path(source_dir).iterdir() if p.is_dir())

    for class_dir in tqdm(class_dirs, desc="Processing folders"):
        # Path.name is separator-agnostic, unlike splitting the string on "/".
        saved_dist_dir = os.path.join(saved_root_dir, class_dir.name)

        for fp in class_dir.iterdir():
            if not fp.is_file():
                continue

            img = tf.io.read_file(str(fp))
            img = tf.image.decode_jpeg(img, channels=channels)
            img = crop_img(img.numpy(), image_size)
            img = pil.Image.fromarray(img)

            # Created lazily so a class folder without images produces no output folder.
            os.makedirs(saved_dist_dir, exist_ok=True)
            img.save(os.path.join(saved_dist_dir, fp.name))
    print(f"\n✅ All images processed and saved to: {saved_root_dir}")


In [None]:
# Crop and resize both dataset splits into the working directory.
for raw_split_dir, processed_split_dir in (
    ("/kaggle/input/brain-tumor-mri-dataset/Training", "/kaggle/working/processed/Training"),
    ("/kaggle/input/brain-tumor-mri-dataset/Testing", "/kaggle/working/processed/Testing"),
):
    image_preprocessing(raw_split_dir, processed_split_dir, image_size=IMAGE_SIZE)


# Datasets

In [None]:
# NOTE(review): this path is not referenced by any other visible cell — confirm it is still needed.
root_dir_path = "/kaggle/working/brain-tumor-mri-dataset"

In [None]:
# Options shared by both splits: one-hot labels, common batch and image size.
_shared_dataset_options = dict(
    label_mode="categorical",
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)

# Training split: seeded so the shuffle order is reproducible.
train_ds = tf.keras.utils.image_dataset_from_directory(
    "/kaggle/working/processed/Training",
    seed=42,
    **_shared_dataset_options,
)

# Test split: kept in directory order.
test_ds = tf.keras.utils.image_dataset_from_directory(
    "/kaggle/working/processed/Testing",
    shuffle=False,
    **_shared_dataset_options,
)


In [None]:
# Class names as discovered by the training dataset, plus lookup tables in both directions.
class_names = train_ds.class_names
print("Class names:", class_names)

id_to_cls = dict(enumerate(class_names))
cls_to_id = {name: idx for idx, name in id_to_cls.items()}
print("cls_to_id:", cls_to_id)
print("id_to_cls:", id_to_cls)

In [None]:
# Persist both lookup tables as tab-separated "key<TAB>value" lines.
for mapping_path, mapping in (
    ("class_to_id.txt", cls_to_id),
    ("id_to_class.txt", id_to_cls),
):
    with open(mapping_path, "w") as f:
        f.writelines(f"{k}\t{v}\n" for k, v in mapping.items())

In [None]:
# Read the saved lookup tables back and echo them as a sanity check.
with open("class_to_id.txt", "r") as f:
    for line in f:
        cls, label = line.rstrip("\n").split("\t")
        print(cls, int(label))
print("\n")
with open("id_to_class.txt", "r") as f:
    for line in f:
        label, cls = line.rstrip("\n").split("\t")
        print(int(label), cls)

In [None]:
def class_weight_from_one_hot(ds):
    """Compute balanced class weights from a dataset of ``(features, one-hot label)`` pairs.

    Works for both batched and unbatched datasets: the class id is taken with an
    argmax over the LAST axis, so a ``(num_classes,)`` label yields one id and a
    ``(batch, num_classes)`` label yields one id per row. (An argmax over the
    default axis 0 would instead return positions inside the batch.)

    Args:
        ds: Iterable yielding ``(features, onehot)`` pairs, e.g. a ``tf.data.Dataset``.

    Returns:
        Dict mapping each class id that occurs in ``ds`` to its "balanced" weight
        as computed by ``sklearn.utils.class_weight.compute_class_weight``.

    Raises:
        ValueError: If ``ds`` yields no labels.
    """
    class_labels = []
    for _, onehot in ds:
        ids = np.argmax(np.asarray(onehot), axis=-1)
        # atleast_1d lets scalar (unbatched) and vector (batched) results share one path.
        class_labels.extend(np.atleast_1d(ids).tolist())

    if not class_labels:
        raise ValueError("Cannot compute class weights from an empty dataset")

    unique_classes = np.unique(class_labels)
    class_weights = compute_class_weight(class_weight="balanced",
                                         classes=unique_classes,
                                         y=np.asarray(class_labels))
    # Key by the actual class id rather than by position, so a class that never
    # occurs does not shift the weights of the classes after it.
    return {int(c): float(w) for c, w in zip(unique_classes, class_weights)}


    
# Compute class weights from the one-hot labels of the training dataset and show them.
class_weights = class_weight_from_one_hot(train_ds)
print(class_weights)

In [None]:
# Show the first image of the first training batch together with its class name.
for images, labels in train_ds.take(1):
    image, label = images[0], labels[0]
    fig, ax = plt.subplots()
    ax.imshow(tf.cast(image, tf.uint8))
    ax.set_title(class_names[tf.argmax(label).numpy()])
    ax.axis("off")
    plt.show()


In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define augmentation for training data
train_datagen = ImageDataGenerator(
    rescale=1./255,          # Normalization
    rotation_range=12,       # Random rotation up to 12 degrees
    zoom_range=0.2,          # Random zoom up to 20%
    horizontal_flip=True     # Random horizontal flip
)

# For validation/testing (only normalization)
test_datagen = ImageDataGenerator(rescale=1./255)

# Create generators using flow_from_directory (no need for dataframes)
train_gen = train_datagen.flow_from_directory(
    "/kaggle/working/processed/Training",
    target_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=True
)

test_gen = test_datagen.flow_from_directory(
    "/kaggle/working/processed/Testing",
    target_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    shuffle=False
)

# Class names as seen by the test generator. This is already a module-level
# name; a `global classes` statement after the assignment is a SyntaxError
# ("assigned to before global declaration"), so none is used here.
classes = list(test_gen.class_indices.keys())

# Update the class names from the training generator
class_names = list(train_gen.class_indices.keys())
print("Class names:", class_names)

# Recompute the class weights from the generator's per-sample class ids.
# NOTE: this replaces any `class_weights` value computed by an earlier cell.
class_weights = compute_class_weight(
    'balanced',
    classes=np.unique(train_gen.classes),
    y=train_gen.classes
)
class_weights = dict(enumerate(class_weights))
print("Class weights:", class_weights)

# Preview the first nine augmented images of the first generator batch.
for images, labels in train_gen:
    plt.figure(figsize=(10, 10))
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i])
        plt.title(class_names[np.argmax(labels[i])])
        plt.axis("off")
    break  # The generator loops forever; only the first batch is shown

In [None]:
class GAM(tf.keras.layers.Layer):
    """Attention layer that rescales a 4D feature map by channel and spatial weights.

    Channel weights come from a two-layer MLP applied to the spatially averaged
    features; spatial weights come from two 7x7 convolutions ending in a
    sigmoid. The output is ``inputs * channel_att * spatial_att``.

    Args:
        reduction_ratio: Divisor applied to the channel count to size the hidden
            layer of the MLP and the first convolution.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, reduction_ratio=16, **kwargs):
        super(GAM, self).__init__(**kwargs)
        self.reduction_ratio = reduction_ratio

    def build(self, input_shape):
        channels = input_shape[-1]
        # Never let the bottleneck collapse to zero units/filters when the
        # input has fewer channels than the reduction ratio.
        reduced = max(1, channels // self.reduction_ratio)
        self.channel_mlp = tf.keras.Sequential([
            tf.keras.layers.Dense(reduced, activation='relu'),
            tf.keras.layers.Dense(channels)
        ])
        self.spatial_conv = tf.keras.Sequential([
            tf.keras.layers.Conv2D(reduced, 7, padding='same', activation='relu'),
            tf.keras.layers.Conv2D(1, 7, padding='same', activation='sigmoid')
        ])

    def call(self, inputs):
        # Channel attention: global average over height/width, then the MLP.
        # NOTE(review): the MLP output is used unbounded (its last Dense has no
        # activation) — confirm that omitting a sigmoid here is intentional.
        channel_att = tf.reduce_mean(inputs, axis=[1, 2], keepdims=True)
        channel_att = self.channel_mlp(channel_att)

        # Spatial attention: a single-channel map in [0, 1] (sigmoid output).
        spatial_att = self.spatial_conv(inputs)

        # Both attention tensors broadcast against the input feature map.
        return inputs * channel_att * spatial_att

    def get_config(self):
        # Include the constructor argument so the layer can be re-created from
        # its config when a model containing it is saved and reloaded.
        config = super(GAM, self).get_config()
        config.update({"reduction_ratio": self.reduction_ratio})
        return config

class ECA(tf.keras.layers.Layer):
    """Channel attention using a 1D convolution across the channel axis.

    The kernel size is derived from the channel count as
    ``int(|(log2(C) + b) / gamma|)``, bumped to the next odd number if even.

    Args:
        gamma: Divisor in the kernel-size formula.
        b: Offset in the kernel-size formula.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, gamma=2, b=1, **kwargs):
        super(ECA, self).__init__(**kwargs)
        self.gamma = gamma
        self.b = b

    def build(self, input_shape):
        import math

        channels = int(input_shape[-1])
        # Use a real base-2 logarithm. `tf.math.log(x, 2)` does NOT do this: its
        # second positional parameter is the op name, so it returns ln(x).
        kernel_size = int(abs((math.log2(channels) + self.b) / self.gamma))
        # The kernel size must be odd.
        self.kernel_size = kernel_size if kernel_size % 2 else kernel_size + 1
        self.conv = tf.keras.layers.Conv1D(1, kernel_size=self.kernel_size, padding='same', use_bias=False)

    def call(self, inputs):
        # Global Average Pooling (Keep Channel Dimension)
        x = tf.reduce_mean(inputs, axis=[1, 2], keepdims=True)

        # Reshape for Conv1D: (batch, 1, 1, C) -> (batch, C) -> (batch, C, 1)
        x = tf.squeeze(x, axis=[1, 2])
        x = tf.expand_dims(x, axis=-1)

        # Apply 1D Convolution for Channel Attention
        x = self.conv(x)
        x = tf.sigmoid(x)

        # Reshape to match original input dimensions
        x = tf.reshape(x, [-1, 1, 1, inputs.shape[-1]])

        return inputs * x

    def get_config(self):
        # Include the constructor arguments so the layer can be re-created from
        # its config when a model containing it is saved and reloaded.
        config = super(ECA, self).get_config()
        config.update({"gamma": self.gamma, "b": self.b})
        return config


# Model 

EfficientNetV2

In [None]:
from tensorflow.keras import regularizers

def create_model(input_shape=(224, 224, 3), num_classes=4):
    """Build the attention-enhanced EfficientNetV2B0 classifier.

    The ImageNet-initialised backbone (fully trainable) is followed by the GAM
    and ECA attention layers, global average pooling, batch normalisation,
    dropout, a regularised 256-unit dense layer, dropout and a softmax output.

    Args:
        input_shape: Shape of one input image (height, width, channels).
        num_classes: Number of units in the softmax output layer.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    inputs = tf.keras.layers.Input(shape=input_shape)

    # Enhanced base model configuration
    base_model = tf.keras.applications.EfficientNetV2B0(
        include_top=False,
        weights='imagenet',
        input_tensor=inputs
    )
    base_model.trainable = True

    # Attention-enhanced flow
    x = base_model(inputs)
    x = GAM(reduction_ratio=32)(x)  # Requires 4D input (batch, height, width, channels)
    x = ECA()(x)

    # Classification head
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001)(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    x = tf.keras.layers.Dense(256, activation='relu',
                            kernel_regularizer=regularizers.l2(0.016),
                            activity_regularizer=regularizers.l1(0.006),
                            bias_regularizer=regularizers.l1(0.006))(x)
    x = tf.keras.layers.Dropout(0.45)(x)
    outputs = tf.keras.layers.Dense(num_classes, activation="softmax")(x)

    return tf.keras.Model(inputs, outputs)

 



In [None]:
def create_baseline_model(input_shape=(224, 224, 3), num_classes=4):
    """Build the baseline EfficientNetV2B0 classifier (no attention layers).

    The ImageNet-initialised backbone (fully trainable) is followed by global
    average pooling, batch normalisation, dropout, a regularised 256-unit dense
    layer, dropout and a softmax output.

    Args:
        input_shape: Shape of one input image (height, width, channels).
        num_classes: Number of units in the softmax output layer.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    inputs = tf.keras.layers.Input(shape=input_shape)

    # Backbone
    base_model = tf.keras.applications.EfficientNetV2B0(
        include_top=False,
        weights='imagenet',
        input_tensor=inputs
    )
    base_model.trainable = True

    # Classification head
    x = base_model(inputs)
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001)(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    x = tf.keras.layers.Dense(256, activation='relu',
                            kernel_regularizer=regularizers.l2(0.016),
                            activity_regularizer=regularizers.l1(0.006),
                            bias_regularizer=regularizers.l1(0.006))(x)
    x = tf.keras.layers.Dropout(0.45)(x)
    outputs = tf.keras.layers.Dense(num_classes, activation="softmax")(x)

    return tf.keras.Model(inputs, outputs)

# Training

In [None]:
def build_lrfn(lr_start=2e-5, lr_max=1e-3,
               lr_min=1e-6, lr_rampup_epochs=8,
               lr_sustain_epochs=0, lr_exp_decay=0.8):
    """Return an ``epoch -> learning rate`` function with three phases.

    1. Linear ramp from ``lr_start`` towards ``lr_max`` over ``lr_rampup_epochs``.
    2. Constant ``lr_max`` for ``lr_sustain_epochs``.
    3. Exponential decay towards ``lr_min`` with factor ``lr_exp_decay`` per epoch.
    """
    decay_start_epoch = lr_rampup_epochs + lr_sustain_epochs

    def lrfn(epoch):
        # Phase 1: linear warm-up.
        if epoch < lr_rampup_epochs:
            return (lr_max - lr_start) / lr_rampup_epochs * epoch + lr_start
        # Phase 2: hold the peak rate.
        if epoch < decay_start_epoch:
            return lr_max
        # Phase 3: exponential decay towards the floor.
        return (lr_max - lr_min) * \
               lr_exp_decay ** (epoch - lr_rampup_epochs - lr_sustain_epochs) + lr_min

    return lrfn

In [None]:
import os
import shutil

# Delete a best-model file left over from a previous run, if there is one.
stale_checkpoint = "best_initial_model.keras"
if os.path.exists(stale_checkpoint):
    os.remove(stale_checkpoint)


In [None]:
class PrintLR(tf.keras.callbacks.Callback):
    """Callback that prints the optimizer's learning rate after every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        optimizer = self.model.optimizer
        # Prefer `learning_rate`; fall back to the older `lr` alias only when
        # the optimizer does not expose `learning_rate`.
        lr = getattr(optimizer, "learning_rate", None)
        if lr is None:
            lr = optimizer.lr
        # A schedule object must be evaluated at the current step to get a number.
        if callable(lr):
            lr = lr(optimizer.iterations)
        if hasattr(lr, 'numpy'):
            lr = lr.numpy()
        lr = float(lr)
        print(f"🔁 Epoch {epoch+1}: Learning rate is {lr:.6f}")


In [None]:
import os
import shutil
import gc
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, LearningRateScheduler, ModelCheckpoint
from tensorflow.keras import backend as K
import tensorflow as tf
from collections import defaultdict

# === Assumes the following are defined ===
# BASE_LR, build_lrfn(), class_weights, train_ds, test_ds, GAM, ECA, PrintLR()

# Learning Rate Schedule: build_lrfn() is called with its default arguments.
lrfn = build_lrfn()
lr_schedule = LearningRateScheduler(lrfn, verbose=True)

# Base Callbacks, shared by every fold (the same instances are reused each time).
# NOTE(review): early stopping watches 'val_loss' while the per-fold checkpoint
# watches 'val_categorical_accuracy' — confirm the two criteria are meant to differ.
base_cbs = [
    EarlyStopping(patience=10, monitor='val_loss', restore_best_weights=True),
    lr_schedule,
    PrintLR()
]

# Convert tf.data.Dataset to numpy arrays
def dataset_to_numpy(ds):
    """Materialise a batched ``(image, label)`` dataset as two NumPy arrays.

    The dataset is unbatched and iterated once; images and labels are stacked
    in iteration order.

    Returns:
        Tuple ``(images, labels)`` of NumPy arrays.
    """
    image_list, label_list = [], []
    for image, label in ds.unbatch().as_numpy_iterator():
        image_list.append(image)
        label_list.append(label)
    return np.array(image_list), np.array(label_list)

# Load the whole training dataset into memory so it can be indexed per fold.
images, labels = dataset_to_numpy(train_ds)
# Integer class ids (argmax of the one-hot labels), used for stratification.
label_indices = np.argmax(labels, axis=1)

# Trackers
# best_model: summary of the best fold so far (chosen by test accuracy in the fold loop).
best_model = {"fold": None, "train_accuracy": 0, "val_accuracy": 0, "test_accuracy": 0, "model": None}
# accuracies: test accuracy of every fold.
accuracies = []
# history_storage: metric name -> list of per-fold epoch histories.
history_storage = defaultdict(list)

# Stratified K-Fold: 5 shuffled folds with a fixed random state.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Train and evaluate one freshly built model per stratified fold.
for fold, (train_idx, val_idx) in enumerate(skf.split(images, label_indices)):
    print(f"\n📂 Fold {fold + 1}")

    # Split data
    X_train, X_val = images[train_idx], images[val_idx]
    y_train, y_val = labels[train_idx], labels[val_idx]

    # NOTE(review): the fold datasets use a literal batch size of 8 (not BATCH_SIZE)
    # and the training dataset has no .shuffle() call, so every epoch sees the
    # samples in the same order — confirm both are intended.
    train_fold_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)).batch(8).prefetch(tf.data.AUTOTUNE)
    val_fold_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val)).batch(8).prefetch(tf.data.AUTOTUNE)

    # Unique checkpoint per fold; a file left by an earlier run is removed first.
    checkpoint_path = f"best_attention_fold{fold+1}.weights.h5"
    if os.path.exists(checkpoint_path):
        os.remove(checkpoint_path)

    # Keep only the weights of the epoch with the highest validation accuracy.
    model_checkpoint = ModelCheckpoint(
        checkpoint_path,
        monitor='val_categorical_accuracy',
        save_best_only=True,
        mode='max',
        save_weights_only=True
    )

    # 🔁 Build and compile attention-based model
    model = create_model()
    model.compile(
        loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.05),
        optimizer=tf.keras.optimizers.Adam(BASE_LR),
        metrics=["categorical_accuracy"]
    )

    # Train
    # NOTE(review): epochs is the literal 35; the EPOCH constant is not used here.
    history = model.fit(
        train_fold_ds,
        validation_data=val_fold_ds,
        epochs=35,
        callbacks=base_cbs + [model_checkpoint],
        class_weight=class_weights,
        verbose=1
    )

    # Load best weights (the checkpointed ones) before evaluating.
    model.load_weights(checkpoint_path)

    # Store history: one list of per-epoch values per metric, appended per fold.
    for metric in history.history:
        history_storage[metric].append(history.history[metric])

    # Evaluate: index 1 of evaluate()'s result is the single compiled metric.
    test_acc = model.evaluate(test_ds, verbose=0)[1]
    accuracies.append(test_acc)

    # NOTE(review): the best fold is selected by TEST accuracy, so the reported
    # best test accuracy is an optimistic estimate — consider selecting on
    # validation accuracy instead.
    if test_acc > best_model["test_accuracy"]:
        best_model = {
            "fold": fold + 1,
            "train_accuracy": max(history.history['categorical_accuracy']),
            "val_accuracy": max(history.history['val_categorical_accuracy']),
            "test_accuracy": test_acc,
            "model": model
        }

    # NOTE(review): the session is cleared while best_model may still hold a
    # reference to this fold's model — verify the kept model remains usable.
    K.clear_session()
    gc.collect()

    print(f"\n✅ Fold {fold + 1} Results:")
    print(f"Test Accuracy: {test_acc:.4f}")

# Final Summary: report the best fold and the spread of test accuracy across folds.
print("\n🏆 Best Fold:", best_model["fold"])
print(f"Train Accuracy: {best_model['train_accuracy']:.4f}")
print(f"Validation Accuracy: {best_model['val_accuracy']:.4f}")
print(f"Test Accuracy: {best_model['test_accuracy']:.4f}")
print(f"Mean Accuracy: {np.mean(accuracies):.4f} ± {np.std(accuracies):.4f}")

# Save best model weights. "model" stays None when no fold ever exceeded the
# initial test accuracy of 0, so guard against calling save_weights on None.
if best_model["model"] is None:
    print("Warning: no fold produced a best model; skipping weight export.")
else:
    best_model["model"].save_weights("best_attention_model.weights.h5")

# Plotting Function
from tensorflow.keras.utils import pad_sequences

def plot_combined_histories(history_storage):
    """Plot every fold's training curves on a 2x2 grid, with the across-fold mean.

    Args:
        history_storage: Mapping from metric name to a list of per-fold histories
            (each a list of per-epoch values). Folds may have different lengths,
            e.g. because of early stopping.
    """
    plt.figure(figsize=(15, 10))
    metrics = ['loss', 'categorical_accuracy', 'val_loss', 'val_categorical_accuracy']
    titles = ['Training Loss', 'Training Accuracy', 'Validation Loss', 'Validation Accuracy']
    color = '#1f77b4'

    for idx, metric in enumerate(metrics):
        plt.subplot(2, 2, idx + 1)
        # .get() avoids creating an entry in a defaultdict and also skips empty lists.
        if history_storage.get(metric):
            # Pad shorter folds with NaN rather than 0: matplotlib skips NaN
            # points, and nanmean ignores them, so early-stopped folds neither
            # drop to zero on the plot nor drag the mean curve down.
            padded = pad_sequences(history_storage[metric], padding='post',
                                   dtype='float32', value=np.nan)
            for h in padded:
                plt.plot(h, color=color, alpha=0.3)
            plt.plot(np.nanmean(padded, axis=0), color=color, linestyle='--')
        plt.title(titles[idx])
        plt.xlabel('Epochs')
        plt.ylabel(metric.replace('_', ' ').title())
    plt.tight_layout()
    plt.suptitle('Training Curves Across Folds', y=1.02)
    plt.show()


In [None]:
# Plot the per-fold loss/accuracy curves collected during cross-validation.
plot_combined_histories(history_storage)


In [None]:
def _plot_train_vs_val(history_storage, train_key, val_key, title, ylabel):
    """Plot per-fold training (solid) and validation (dashed) curves for one metric pair."""
    from tensorflow.keras.utils import pad_sequences

    # Pad shorter (early-stopped) folds with NaN rather than 0 so their curves
    # simply end instead of dropping to zero; matplotlib skips NaN points.
    train_curves = pad_sequences(history_storage[train_key], padding='post',
                                 dtype='float32', value=np.nan)
    val_curves = pad_sequences(history_storage[val_key], padding='post',
                               dtype='float32', value=np.nan)

    plt.figure(figsize=(8, 6))
    for i in range(len(train_curves)):
        plt.plot(train_curves[i], label=f'Train Fold {i+1}', linestyle='-')
        plt.plot(val_curves[i], label=f'Val Fold {i+1}', linestyle='--')
    plt.title(title)
    plt.xlabel("Epoch")
    plt.ylabel(ylabel)
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()


def plot_accuracy_vs_val_accuracy(history_storage):
    """Plot training vs validation accuracy for every fold.

    Args:
        history_storage: Mapping holding per-fold histories under the keys
            'categorical_accuracy' and 'val_categorical_accuracy'.
    """
    _plot_train_vs_val(
        history_storage,
        'categorical_accuracy',
        'val_categorical_accuracy',
        "Training vs Validation Accuracy",
        "Accuracy",
    )


def plot_loss_vs_val_loss(history_storage):
    """Plot training vs validation loss for every fold.

    Args:
        history_storage: Mapping holding per-fold histories under the keys
            'loss' and 'val_loss'.
    """
    _plot_train_vs_val(
        history_storage,
        'loss',
        'val_loss',
        "Training vs Validation Loss",
        "Loss",
    )

from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

def plot_confusion_matrix_and_report(model, test_ds, class_names):
    """Print a classification report and draw a confusion matrix for ``model``.

    Args:
        model: Object with a Keras-style ``predict(X, batch_size=..., verbose=...)``
            returning per-class scores.
        test_ds: Batched dataset of ``(image, one-hot label)`` pairs.
        class_names: Display name for each class id, in class-id order.
    """
    # Convert dataset to numpy
    test_data = list(test_ds.unbatch().as_numpy_iterator())
    X_test = np.array([x[0] for x in test_data])
    y_true_onehot = np.array([x[1] for x in test_data])
    y_true = np.argmax(y_true_onehot, axis=1)

    # Predict
    y_pred_probs = model.predict(X_test, batch_size=8, verbose=0)
    y_pred = np.argmax(y_pred_probs, axis=1)

    # Pin the label set to every known class id. Without it, a class missing
    # from both y_true and y_pred makes classification_report raise (label count
    # != len(target_names)) and shrinks the confusion matrix so it no longer
    # lines up with the tick labels.
    label_ids = list(range(len(class_names)))

    # Classification Report
    print("\n📋 Classification Report:\n")
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names, zero_division=0))

    # Confusion Matrix
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.show()


In [None]:
# Per-fold training vs validation curves.
plot_accuracy_vs_val_accuracy(history_storage)
plot_loss_vs_val_loss(history_storage)

# For classification report and confusion matrix
# NOTE(review): these display names replace the directory-derived class_names and
# are matched to class ids purely by position — verify the order against the
# dataset's class order before trusting the report.
class_names = ['Glioma', 'Meningioma', 'No Tumor', 'Pituitary']  # modify if needed
plot_confusion_matrix_and_report(best_model["model"], test_ds, class_names)


In [None]:
# List the layer names of `model` (the model built in the most recent fold).
layer_names = [layer.name for layer in model.layers]
for layer_name in layer_names:
    print(layer_name)