In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
labels = ['glioma','meningioma','notumor','pituitary']

In [None]:
train_dir = '/content/drive/MyDrive/brain-tumor-mri-dataset/Training'
test_dir = '/content/drive/MyDrive/brain-tumor-mri-dataset/Testing'

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tqdm import tqdm
from skimage.transform import resize

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, GlobalAveragePooling2D
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adamax
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
import os
os.path.exists(train_dir)

True

In [None]:
image_size = 224
batch_size = 16

# Load dataset
def load_images_in_batches_and_accumulate(folder, labels, image_size, batch_size):

    X_all = []
    Y_all = []

    for label in labels:
        path = os.path.join(folder, label)
        class_num = labels.index(label)

        X_batch = []
        Y_batch = []

        for img in os.listdir(path):
            try:
                img_array = plt.imread(os.path.join(path, img))
                img_resized = resize(img_array, (image_size, image_size, 3))
                X_batch.append(img_resized)
                Y_batch.append(class_num)

                # When the batch size is reached, accumulate the batch
                if len(X_batch) == batch_size:
                    X_all.extend(X_batch)
                    Y_all.extend(Y_batch)
                    X_batch = []  # Reset the batch
                    Y_batch = []

            except Exception as e:
                print(f"Error loading image {img}: {e}")
        if X_batch:
            X_all.extend(X_batch)
            Y_all.extend(Y_batch)

    return np.array(X_all), np.array(Y_all)

In [None]:
X_train, Y_train = load_images_in_batches_and_accumulate(train_dir, labels, image_size, batch_size)
#X_test, Y_test = load_images_from_folder(test_dir, labels, image_size)

In [None]:
X_train.shape

(1603, 224, 224, 3)

In [None]:
Y_train.shape

(2760,)

In [None]:
pd.Series(Y_train).value_counts()

Unnamed: 0,count
2,825
3,704
1,681
0,550


In [None]:
pd.Series(Y_test).value_counts()

Unnamed: 0,count
2,405
1,306
0,300
3,300


In [None]:
X_train

array([[[[1.24310547e-05, 1.24310547e-05, 1.24310547e-05],
         [1.24310547e-05, 1.24310547e-05, 1.24310547e-05],
         [1.79922181e-05, 1.79922181e-05, 1.79922181e-05],
         ...,
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00]],

        [[3.02108561e-03, 3.02108561e-03, 3.02108561e-03],
         [3.02108561e-03, 3.02108561e-03, 3.02108561e-03],
         [3.01887104e-03, 3.01887104e-03, 3.01887104e-03],
         ...,
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00]],

        [[7.24970710e-03, 7.24970710e-03, 7.24970710e-03],
         [7.24970710e-03, 7.24970710e-03, 7.24970710e-03],
         [7.24572879e-03, 7.24572879e-03, 7.24572879e-03],
         ...,
         [0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
         [

In [None]:
batch_output

<tf.Tensor: shape=(1, 196, 768), dtype=float32, numpy=
array([[[ 4.6186987e-03,  4.0146271e-03, -5.9993938e-04, ...,
          2.1843147e-04,  4.0796408e-04,  1.9597629e-04],
        [ 3.9805588e-03,  3.5137141e-03, -1.1164562e-03, ...,
         -7.4214675e-04, -1.6772603e-04,  3.4416828e-04],
        [ 3.1663205e-03,  3.1812154e-03, -6.8177812e-04, ...,
         -6.6990906e-04,  4.6938175e-04,  4.2922876e-04],
        ...,
        [ 4.3255175e-03,  6.6649085e-03,  4.0866029e-03, ...,
         -2.9408564e-03, -1.2645121e-03, -8.5679383e-04],
        [ 2.4156258e-03,  3.2950011e-03,  3.7042063e-04, ...,
         -1.3110299e-03,  1.0296499e-05,  1.3466798e-03],
        [ 2.3011900e-03,  3.1344569e-03, -1.6789365e-04, ...,
         -2.0836508e-03,  3.0530509e-04,  9.5824519e-04]]], dtype=float32)>

In [None]:
from tensorflow.keras import layers
from tensorflow.keras.layers import Layer

In [None]:
class Patchembedding(layers.Layer):
    def __init__(self,
                 in_channels:int=3,
                 patch_size:int=16,
                 embedding_dim:int=768):
        super().__init__()

        self.in_channels = in_channels
        self.patch_size = patch_size
        self.embedding_dim = embedding_dim

        self.patcher = layers.Conv2D(filters=self.embedding_dim,
                                      kernel_size=self.patch_size,
                                      strides=self.patch_size,
                                      padding='valid')
        self.flatten = layers.Flatten()

    def call(self, x):
        # Apply the convolutional layer to extract patches and project
        x = self.patcher(x)  #  (batch_size, new_height, new_width, embedding_dim)

        # Flatten
        x = tf.reshape(x, (tf.shape(x)[0], -1, self.embedding_dim))

        return x


In [None]:
patchify = Patchembedding(in_channels=3,
                          patch_size=16,
                          embedding_dim=768)

In [None]:
batch_size = 1  # You can adjust this based on your GPU memory capacity
dataset = tf.data.Dataset.from_tensor_slices(X_train)
dataset = dataset.batch(batch_size)

# Iterate over the dataset in batches and process each batch
for batch in dataset:
    batch_output = patchify(batch)
    print(f"Processed batch of shape: {batch_output.shape}")



Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)
Processed batch of shape: (1, 196, 768)


In [None]:
print(batch_output)

tf.Tensor(
[[[ 4.6186987e-03  4.0146271e-03 -5.9993938e-04 ...  2.1843147e-04
    4.0796408e-04  1.9597629e-04]
  [ 3.9805588e-03  3.5137141e-03 -1.1164562e-03 ... -7.4214675e-04
   -1.6772603e-04  3.4416828e-04]
  [ 3.1663205e-03  3.1812154e-03 -6.8177812e-04 ... -6.6990906e-04
    4.6938175e-04  4.2922876e-04]
  ...
  [ 4.3255175e-03  6.6649085e-03  4.0866029e-03 ... -2.9408564e-03
   -1.2645121e-03 -8.5679383e-04]
  [ 2.4156258e-03  3.2950011e-03  3.7042063e-04 ... -1.3110299e-03
    1.0296499e-05  1.3466798e-03]
  [ 2.3011900e-03  3.1344569e-03 -1.6789365e-04 ... -2.0836508e-03
    3.0530509e-04  9.5824519e-04]]], shape=(1, 196, 768), dtype=float32)


In [None]:
class GlobalResponseNorm(layers.Layer):
    def __init__(self, epsilon=1e-6):
        super(GlobalResponseNorm, self).__init__()
        self.epsilon = epsilon

    def call(self, inputs):
        # Compute the mean and variance across the patches
        mean = tf.reduce_mean(inputs, axis=1, keepdims=True)
        variance = tf.reduce_mean(tf.square(inputs - mean), axis=1, keepdims=True)

        # Normalize the inputs
        normed = (inputs - mean) / tf.sqrt(variance + self.epsilon)
        return normed

class MLP(tf.keras.Model):
    def __init__(self, input_dim=768, hidden_units1=256, hidden_units2=128, dropout_rate1=0.1, dropout_rate2=0.1):
        super(MLP, self).__init__()

        # Linear Transformation Layer 1
        self.dense1 = layers.Dense(hidden_units1)
        # Activation Layer
        self.activation = layers.Activation('relu')
        # Dropout Layer 1
        self.dropout1 = layers.Dropout(dropout_rate1)
        # Global Response Normalization
        self.grn = GlobalResponseNorm()
        # Linear Transformation Layer 2
        self.dense2 = layers.Dense(hidden_units2)
        # Dropout Layer 2
        self.dropout2 = layers.Dropout(dropout_rate2)
        # Output Layer
        self.output_layer = layers.Dense(1)

    def call(self, inputs, training=False):

        batch_size = tf.shape(inputs)[0]
        num_patches = tf.shape(inputs)[1]


        x = tf.reshape(inputs, (batch_size * num_patches, -1))  # Shape: [batch_size * num_patches, 768]

        x = self.dense1(x)                 # Linear Transformation Layer 1
        x = self.activation(x)              # Activation Layer
        x = self.dropout1(x, training=training) # Dropout Layer 1
        x = self.grn(x)                     # Global Response Normalization
        x = self.dense2(x)                  # Linear Transformation Layer 2
        x = self.dropout2(x, training=training) # Dropout Layer 2

        # Output Layer
        output = self.output_layer(x)


        output = tf.reshape(output, (batch_size, num_patches, -1))  # Shape: [batch_size, num_patches, output_dim]

        return output


In [None]:

class DropPath(layers.Layer):
    """ Drop Path layer for regularization. """
    def __init__(self, drop_prob=0.0):
        super().__init__()
        self.drop_prob = drop_prob

    def call(self, x):
        if self.drop_prob == 0.0:
            return x
        keep_prob = 1 - self.drop_prob
        # Apply Drop Path
        mask = tf.random.uniform(tf.shape(x)[:1]) < keep_prob
        x = tf.where(mask, x / keep_prob, tf.zeros_like(x))
        return x

class ConvNeXtV2Block(tf.keras.Model):
    def __init__(self, drop_prob=0.0):
        super(ConvNeXtV2Block, self).__init__()
        self.depthwise_conv = layers.DepthwiseConv2D(kernel_size=7, padding='same', activation=None)
        self.layer_norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.pointwise_conv1 = layers.Conv2D(filters=256, kernel_size=1, activation=None)
        self.gelu = layers.Activation('gelu')
        self.grn = GlobalResponseNorm()
        self.pointwise_conv2 = layers.Conv2D(filters=768, kernel_size=1, activation=None)
        self.dropout = layers.Dropout(drop_prob)

    def call(self, inputs, training=False):

        x = tf.expand_dims(inputs, axis=2)  # Shape: (batch_size, num_patches, 1, features)

        x = self.depthwise_conv(x)  # Perform Depthwise Convolution
        x = self.layer_norm1(x)
        x = self.pointwise_conv1(x)
        x = self.gelu(x)
        x = self.grn(x)
        x = self.pointwise_conv2(x)
        x = self.dropout(x, training=training)

        return x

In [None]:
class WindowPartition(layers.Layer):
    def __init__(self, window_size):
        super().__init__()
        self.window_size = window_size

    def call(self, x):
        batch_size, height, width, channels = tf.shape(x)[0], tf.shape(x)[1], tf.shape(x)[2], tf.shape(x)[3]
        # Reshape the input to create windows
        x = tf.reshape(x, (batch_size, height // self.window_size, self.window_size, width // self.window_size, self.window_size, channels))
        x = tf.transpose(x, perm=[0, 1, 3, 2, 4, 5])
        x = tf.reshape(x, (batch_size, -1, self.window_size * self.window_size, channels))  # Shape: (batch_size, num_windows, window_size * window_size, channels)
        return x

class MultiHeadSelfAttention(layers.Layer):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.embed_dim = embed_dim
        self.depth = embed_dim // num_heads

        self.wq = layers.Dense(embed_dim)
        self.wk = layers.Dense(embed_dim)
        self.wv = layers.Dense(embed_dim)
        self.dense = layers.Dense(embed_dim)

    def call(self, x):
        batch_size = tf.shape(x)[0]


        q = self.wq(x)
        k = self.wk(x)
        v = self.wv(x)


        q = tf.reshape(q, (batch_size, -1, self.num_heads, self.depth))
        k = tf.reshape(k, (batch_size, -1, self.num_heads, self.depth))
        v = tf.reshape(v, (batch_size, -1, self.num_heads, self.depth))


        q = tf.transpose(q, perm=[0, 2, 1, 3])
        k = tf.transpose(k, perm=[0, 2, 1, 3])
        v = tf.transpose(v, perm=[0, 2, 1, 3])


        logits = tf.matmul(q, k, transpose_b=True) / tf.sqrt(tf.cast(self.depth, tf.float32))
        attention_weights = tf.nn.softmax(logits, axis=-1)

        output = tf.matmul(attention_weights, v)  # (batch_size, num_heads, num_windows, window_size * window_size, depth)
        output = tf.transpose(output, perm=[0, 2, 1, 3])  # Shape: (batch_size, num_windows, num_heads, window_size * window_size, depth)
        output = tf.reshape(output, (batch_size, -1, self.embed_dim))  # Shape: (batch_size, num_windows, embed_dim)

        return self.dense(output)  # Shape: (batch_size, num_windows, embed_dim)

class FeedForwardNetwork(layers.Layer):
    def __init__(self, embed_dim, hidden_dim):
        super().__init__()
        self.dense1 = layers.Dense(hidden_dim, activation='relu')
        self.dense2 = layers.Dense(embed_dim)

    def call(self, x):
        return self.dense2(self.dense1(x))

class WindowReverse(layers.Layer):
    def __init__(self, window_size):
        super().__init__()
        self.window_size = window_size

    def call(self, x, original_shape):
        batch_size, num_windows, _, channels = tf.shape(x)[0], tf.shape(x)[1], tf.shape(x)[2], tf.shape(x)[3]
        height, width = original_shape[1], original_shape[2]


        x = tf.reshape(x, (batch_size, height // self.window_size, width // self.window_size, self.window_size, self.window_size, channels))
        x = tf.transpose(x, perm=[0, 1, 3, 2, 4, 5])  # Shape: (batch_size, num_rows, window_size, num_cols, window_size, channels)
        x = tf.reshape(x, (batch_size, height, width, channels))  # Shape: (batch_size, height, width, channels)

        return x

class BlockAttentionModule(layers.Layer):
    def __init__(self, embed_dim, num_heads, hidden_dim, window_size):
        super().__init__()
        self.window_partition = WindowPartition(window_size)
        self.msa = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = FeedForwardNetwork(embed_dim, hidden_dim)
        self.window_reverse = WindowReverse(window_size)

    def call(self, x):
        original_shape = tf.shape(x)

        # Step 1: Window Partitioning
        x_windows = self.window_partition(x)

        # Step 2: Multi-Head Self-Attention
        msa_output = self.msa(x_windows)

        # Step 3: Residual Connection for MSA
        x += msa_output

        # Step 4: Feed Forward Network
        ffn_output = self.ffn(x)

        # Step 5: Residual Connection for FFN
        x += ffn_output

        # Step 6: Window Reverse
        output = self.window_reverse(x, original_shape)

        return output


In [None]:
class GridPartitioning(layers.Layer):
    def __init__(self, grid_size):
        super().__init__()
        self.grid_size = grid_size

    def call(self, x):
        batch_size, height, width, channels = tf.shape(x)[0], tf.shape(x)[1], tf.shape(x)[2], tf.shape(x)[3]
        # Reshape the input to perform grid partitioning
        grid_h = height // self.grid_size
        grid_w = width // self.grid_size

        x = tf.reshape(x, (batch_size, grid_h, self.grid_size, grid_w, self.grid_size, channels))
        return tf.transpose(x, perm=[0, 1, 3, 2, 4, 5])  # Shape: (batch_size, grid_h, grid_w, grid_size, grid_size, channels)

class GridMSA(layers.Layer):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.embed_dim = embed_dim
        self.depth = embed_dim // num_heads

        self.wq = layers.Dense(embed_dim)
        self.wk = layers.Dense(embed_dim)
        self.wv = layers.Dense(embed_dim)
        self.dense = layers.Dense(embed_dim)

    def call(self, x):
        batch_size = tf.shape(x)[0]
        num_grids = tf.shape(x)[1] * tf.shape(x)[2]

        # Reshape for multi-head self-attention
        x = tf.reshape(x, (batch_size, num_grids, -1, self.embed_dim))

        q = self.wq(x)  # Shape: (batch_size, num_grids, num_patches, embed_dim)
        k = self.wk(x)
        v = self.wv(x)

        q = tf.reshape(q, (batch_size, num_grids, -1, self.num_heads, self.depth))
        k = tf.reshape(k, (batch_size, num_grids, -1, self.num_heads, self.depth))
        v = tf.reshape(v, (batch_size, num_grids, -1, self.num_heads, self.depth))

        q = tf.transpose(q, perm=[0, 1, 3, 2, 4])  # Shape: (batch_size, num_grids, num_heads, num_patches, depth)
        k = tf.transpose(k, perm=[0, 1, 3, 2, 4])
        v = tf.transpose(v, perm=[0, 1, 3, 2, 4])

        # Scaled Dot-Product Attention
        logits = tf.matmul(q, k, transpose_b=True) / tf.sqrt(tf.cast(self.depth, tf.float32))
        attention_weights = tf.nn.softmax(logits, axis=-1)

        output = tf.matmul(attention_weights, v)  # Shape: (batch_size, num_grids, num_heads, num_patches, depth)
        output = tf.transpose(output, perm=[0, 1, 3, 2, 4])  # Shape: (batch_size, num_grids, num_patches, num_heads, depth)
        output = tf.reshape(output, (batch_size, -1, self.embed_dim))  # Shape: (batch_size, num_grids, embed_dim)

        return self.dense(output)

class FeedForwardNetwork(layers.Layer):
    def __init__(self, embed_dim, hidden_dim):
        super().__init__()
        self.fc1 = layers.Dense(hidden_dim, activation='gelu')
        self.fc2 = layers.Dense(embed_dim)

    def call(self, x):
        return self.fc2(self.fc1(x))

class GridAttentionBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, hidden_dim, grid_size):
        super().__init__()
        self.grid_partitioning = GridPartitioning(grid_size)
        self.grid_msa = GridMSA(embed_dim, num_heads)
        self.ffn = FeedForwardNetwork(embed_dim, hidden_dim)
        self.dense_residual = layers.Dense(embed_dim)

    def call(self, x):
        # Step 1: Grid Partitioning
        grid_output = self.grid_partitioning(x)

        # Step 2: Grid MSA
        msa_output = self.grid_msa(grid_output)

        # Step 3: Add residual connection
        msa_residual = x + msa_output

        # Step 4: Feed Forward Network
        ffn_output = self.ffn(msa_residual)

        # Step 5: Add residual connection from the input + FFN output
        final_output = msa_residual + ffn_output
        return tf.reshape(final_output, tf.shape(x))

In [None]:
class TransformerEncoder(tf.keras.Model):
    def __init__(self, input_dim=768, drop_prob=0.0, conv_filters=256,
                 embed_dim=768, num_heads=8, hidden_dim=512, window_size=7, grid_size=7):
        super(TransformerEncoder, self).__init__()
        # Initialize ConvNeXtV2Block
        self.convnext_block = ConvNeXtV2Block(drop_prob=drop_prob)

        # Initialize Block Attention Module with required parameters
        self.block_attention = BlockAttentionModule(embed_dim=embed_dim,
                                                    num_heads=num_heads,
                                                    hidden_dim=hidden_dim,
                                                    window_size=window_size)

        self.grid_attention = GridAttentionBlock(embed_dim=embed_dim,
                                                  num_heads=num_heads,
                                                  hidden_dim=hidden_dim,
                                                  grid_size=grid_size)# Ensure this is correctly defined

        self.mlp = MLP(input_dim)


    def call(self, inputs, training=False):
        # Process through ConvNeXtV2 Block
        x = self.convnext_block(inputs, training=training)

        # Residual connection after ConvNeXtV2 Block
        x_res = x + inputs

        # Process through Block Attention
        x_block_attention = self.block_attention(x_res)

        # Residual connection after Block Attention
        x_res_block = x_block_attention + x_res

        # Process through Grid Attention
        x_grid_attention = self.grid_attention(x_res_block)

        # Residual connection after Grid Attention
        x_res_grid = x_grid_attention + x_res_block

        # Pass through MLP
        output = self.mlp(x_res_grid, training=training)

        return output

In [None]:
model = TransformerEncoder()
model.build((None, 196, 768))
model.summary()



In [None]:

input = tf.random.normal((1, 196, 768))
model = TransformerEncoder()
output = model(input)
print(f"Output shape: {output.shape}")
