In [9]:
import os
import glob
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras import backend as K


In [17]:
base_dir = '/kaggle/input/brain-tumor-dataset/Data'

subdirs = [os.path.join(base_dir, d) for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))]

image_mask_pairs = []

for subdir in subdirs:
    all_files = sorted(glob.glob(os.path.join(subdir, "*.tif")))

    image_paths = [f for f in all_files if "_mask" not in f]
    mask_paths = [f for f in all_files if "_mask" in f]
    for img in image_paths:
        corresponding_mask = img.replace('.tif', '_mask.tif')
        if corresponding_mask in mask_paths:
            image_mask_pairs.append((img, corresponding_mask))
        else:
            print(f"Warning: No corresponding mask found for {img}")


In [16]:
def apply_clahe(image_path):
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_image = clahe.apply(image)
    
    return enhanced_image

def normalize_image(image):
    return image / 255.0

def preprocess_image_mask(image_path, mask_path):

    image = apply_clahe(image_path)

    image = normalize_image(image)

    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

    if image is None:
        print(f"Error reading image: {image_path}")
    if mask is None:
        print(f"Error reading mask: {mask_path}")
    
    return image, mask

In [15]:
preprocessed_data = []

for image_path, mask_path in image_mask_pairs:
    image, mask = preprocess_image_mask(image_path, mask_path)
    if image is not None and mask is not None:
        preprocessed_data.append((image, mask))

print(f"Total preprocessed data: {len(preprocessed_data)}")

Total preprocessed data: 3929


In [14]:
from sklearn.model_selection import train_test_split

# Split the dataset into 80% training and 20% testing sets.
if len(preprocessed_data) > 0:
    train_data, test_data = train_test_split(preprocessed_data, test_size=0.2, random_state=42)
    print(f"Training set size: {len(train_data)}, Testing set size: {len(test_data)}")
else:
    print("No data available for splitting.")   

Training set size: 3143, Testing set size: 786


In [13]:
# Nested U-Net (U-Net++)
def conv_block(x, filters, kernel_size=3, activation='relu', padding='same'):
    x = layers.Conv2D(filters, kernel_size, activation=activation, padding=padding)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(filters, kernel_size, activation=activation, padding=padding)(x)
    x = layers.BatchNormalization()(x)
    return x

def upsample_block(x, skip_connection, filters):
    x = layers.Conv2DTranspose(filters, kernel_size=2, strides=2, padding='same')(x)
    x = layers.Concatenate()([x, skip_connection])
    return x

def nested_unet(input_shape, num_classes):
    inputs = layers.Input(input_shape)
    
    
    c1 = conv_block(inputs, 64)
    p1 = layers.MaxPooling2D((2, 2))(c1)
    
    c2 = conv_block(p1, 128)
    p2 = layers.MaxPooling2D((2, 2))(c2)
    
    c3 = conv_block(p2, 256)
    p3 = layers.MaxPooling2D((2, 2))(c3)
    
    c4 = conv_block(p3, 512)
    p4 = layers.MaxPooling2D((2, 2))(c4)
    
    c5 = conv_block(p4, 1024)
    
    u4 = upsample_block(c5, c4, 512)
    u4 = conv_block(u4, 512)
    
    u3 = upsample_block(u4, c3, 256)
    u3 = conv_block(u3, 256)
    
    u2 = upsample_block(u3, c2, 128)
    u2 = conv_block(u2, 128)
    
    u1 = upsample_block(u2, c1, 64)
    u1 = conv_block(u1, 64)
    
    outputs = layers.Conv2D(num_classes, kernel_size=1, activation='sigmoid')(u1)
    
    model = Model(inputs, outputs)
    return model

input_shape = (256, 256, 1)  
num_classes = 1
nested_unet_model = nested_unet(input_shape, num_classes)
nested_unet_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
nested_unet_model.summary()

In [12]:
def conv_block(x, filters, kernel_size=3, activation='relu', padding='same'):
    x = layers.Conv2D(filters, kernel_size, activation=activation, padding=padding)(x)
    x = layers.BatchNormalization()(x)
    return x

def upsample_block(x, skip_connection, filters):
    x = layers.Conv2DTranspose(filters, kernel_size=2, strides=2, padding='same')(x)
    x = layers.Concatenate()([x, skip_connection])
    return x

def attention_block(x, g, filters):
    theta_x = layers.Conv2D(filters, kernel_size=1, padding='same')(x)
    phi_g = layers.Conv2D(filters, kernel_size=1, padding='same')(g)

    if theta_x.shape[1:] != phi_g.shape[1:]:
        theta_x = layers.Conv2D(filters, kernel_size=3, padding='same')(theta_x)

    add_xg = layers.Add()([theta_x, phi_g])
    relu_xg = layers.Activation('relu')(add_xg)
    psi = layers.Conv2D(1, kernel_size=1, padding='same')(relu_xg)
    psi = layers.Activation('sigmoid')(psi)
    return layers.Multiply()([x, psi])

def attention_unet(input_shape, num_classes):
    inputs = layers.Input(input_shape)

    c1 = conv_block(inputs, 64)
    p1 = layers.MaxPooling2D((2, 2))(c1)
    
    c2 = conv_block(p1, 128)
    p2 = layers.MaxPooling2D((2, 2))(c2)
    
    c3 = conv_block(p2, 256)
    p3 = layers.MaxPooling2D((2, 2))(c3)
    
    c4 = conv_block(p3, 512)
    p4 = layers.MaxPooling2D((2, 2))(c4)
    
    c5 = conv_block(p4, 1024)
    
    u4 = upsample_block(c5, c4, 512)
    u4 = attention_block(c4, u4, 512)
    u4 = conv_block(u4, 512)
    
    u3 = upsample_block(u4, c3, 256)
    u3 = attention_block(c3, u3, 256)
    u3 = conv_block(u3, 256)
    
    u2 = upsample_block(u3, c2, 128)
    u2 = attention_block(c2, u2, 128)
    u2 = conv_block(u2, 128)
    
    u1 = upsample_block(u2, c1, 64)
    u1 = attention_block(c1, u1, 64)
    u1 = conv_block(u1, 64)
    
    outputs = layers.Conv2D(num_classes, kernel_size=1, activation='sigmoid')(u1)
    
    model = Model(inputs, outputs)
    return model

input_shape = (256, 256, 1)  
num_classes = 1 
attention_unet_model = attention_unet(input_shape, num_classes)
attention_unet_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
attention_unet_model.summary()

In [7]:
def preprocess_for_training(data):
    images = np.array([pair[0] for pair in data])
    masks = np.array([pair[1] for pair in data])
    
    images = np.expand_dims(images, axis=-1)  
    masks = np.expand_dims(masks, axis=-1)  
    
    return images, masks

train_images, train_masks = preprocess_for_training(train_data)
test_images, test_masks = preprocess_for_training(test_data)

history_nested_unet = nested_unet_model.fit(
    train_images, train_masks, 
    validation_data=(test_images, test_masks), 
    epochs=5, batch_size=8
)

history_attention_unet = attention_unet_model.fit(
    train_images, train_masks, 
    validation_data=(test_images, test_masks), 
    epochs=5, batch_size=8
)

Epoch 1/5


I0000 00:00:1727762887.310981     108 service.cc:145] XLA service 0x79114404dce0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1727762887.311040     108 service.cc:153]   StreamExecutor device (0): Tesla T4, Compute Capability 7.5
I0000 00:00:1727762887.311044     108 service.cc:153]   StreamExecutor device (1): Tesla T4, Compute Capability 7.5
I0000 00:00:1727762940.948112     108 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m393/393[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m331s[0m 677ms/step - accuracy: 0.6964 - loss: -135.2944 - val_accuracy: 0.1569 - val_loss: -2665.7407
Epoch 2/5
[1m393/393[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m194s[0m 495ms/step - accuracy: 0.1139 - loss: -802.7806 - val_accuracy: 0.9879 - val_loss: 116.0619
Epoch 3/5
[1m393/393[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m193s[0m 492ms/step - accuracy: 0.0017 - loss: -2123.7454 - val_accuracy: 0.9607 - val_loss: -1710.7017
Epoch 4/5
[1m393/393[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m193s[0m 491ms/step - accuracy: 8.2560e-05 - loss: -4237.8291 - val_accuracy: 0.0257 - val_loss: -2389.5696
Epoch 5/5
[1m393/393[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m193s[0m 492ms/step - accuracy: 1.8750e-05 - loss: -7251.7822 - val_accuracy: 2.6600e-04 - val_loss: -12855.8096
Epoch 1/5
[1m393/393[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m156s[0m 340ms/step - accuracy: 0.6804 - loss: -88.4792 - val_accur

In [10]:
def dice_coefficient(y_true, y_pred):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection) / (K.sum(y_true_f) + K.sum(y_pred_f))
nested_unet_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[dice_coefficient])
attention_unet_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[dice_coefficient])


nested_unet_dice = nested_unet_model.evaluate(test_images, test_masks, batch_size=8)[1] 
attention_unet_dice = attention_unet_model.evaluate(test_images, test_masks, batch_size=8)[1]

print(f"Nested U-Net DICE Score: {nested_unet_dice}")
print(f"Attention U-Net DICE Score: {attention_unet_dice}")


TypeError: Input 'y' of 'Mul' Op has type float32 that does not match type uint8 of argument 'x'.

In [11]:
nested_unet_model.save('nested_unet_model.h5')

attention_unet_model.save('attention_unet_model.h5')