In [1]:
import os
import pandas as pd
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
from skimage.transform import resize
from tensorflow.keras.models import Model
from tensorflow.keras.layers import ZeroPadding2D, Lambda, Layer, Activation, Conv2DTranspose, Conv2D, MaxPooling2D, Input, Dense, Flatten, BatchNormalization, LeakyReLU, Concatenate, UpSampling2D, Add, GlobalMaxPooling2D, GlobalAveragePooling2D, Reshape, Multiply
from sklearn import metrics
from keras.callbacks import EarlyStopping
from keras.layers import concatenate, AveragePooling2D
import tensorflow as tf
import tensorflow.keras.backend as K

2024-04-18 23:01:10.266293: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
curr_dir = os.getcwd()
data_folder_path = os.path.join(curr_dir, 'dataset')
dataset_path = os.path.join(data_folder_path, 'Brain Tumor Data Set')

healthy_brain_images_path = os.path.join(dataset_path, 'Healthy')
brain_tumor_images_path = os.path.join(dataset_path, 'Brain Tumor')

training_metadata_file_path = os.path.join(data_folder_path, 'metadata.csv')

In [3]:
training_metdata_df = pd.read_csv(training_metadata_file_path)
training_metdata_df = training_metdata_df[training_metdata_df["image"].str.contains("jpg")]
training_metdata_df.head(5)

Unnamed: 0.1,Unnamed: 0,image,class,format,mode,shape
0,0,Cancer (1).jpg,tumor,JPEG,RGB,"(512, 512, 3)"
3,3,Cancer (10).jpg,tumor,JPEG,RGB,"(512, 512, 3)"
5,5,Cancer (100).jpg,tumor,JPEG,RGB,"(512, 512, 3)"
6,6,Cancer (1000).jpg,tumor,JPEG,RGB,"(290, 250, 3)"
7,7,Cancer (1001).jpg,tumor,JPEG,RGB,"(620, 620, 3)"


In [4]:
print(len(training_metdata_df))

4432


In [5]:
new_training_metadata_df = training_metdata_df

In [6]:
# For faster training reduce dataset size
import random

new_training_metadata_df = pd.DataFrame()

non_healthy_data = training_metdata_df[training_metdata_df["class"] == "tumor"]
healthy_data = training_metdata_df[training_metdata_df["class"] == "normal"]
print(len(healthy_data), len(non_healthy_data), len(non_healthy_data) + len(healthy_data))

random_non_healthy_data = non_healthy_data.sample(n=100)
random_healthy_data = healthy_data.sample(n=200)

new_training_metadata_df = pd.concat([random_non_healthy_data, random_healthy_data], ignore_index=True)

2074 2358 4432


In [7]:
TEST_SIZE = 0.2
RANDOM_STATE = 42
EPOCHS = 3
BATCH_SIZE = 32
INPUT_SHAPE = (256, 256, 1)
IMAGE_SZIE = (256, 256)
NUM_CLASSES = 2

In [8]:
train_ids, test_ids = train_test_split(new_training_metadata_df["image"], test_size = TEST_SIZE, random_state = RANDOM_STATE)

In [9]:
len(train_ids), len(test_ids)

(240, 60)

In [10]:
def resize_image(image, size=IMAGE_SZIE):
    resized_image = image.resize(size)
    return resized_image.convert("RGB")

def load_image(image_path):
    return Image.open(image_path)

def rotate_image(image, angle):
    return image.rotate(angle, expand=True)

def flip_image(image):
    return image.transpose(Image.FLIP_LEFT_RIGHT)

def convert_image_to_numpy_array(image):
    return np.array(image)

def rgb_to_grayscale(image):
    return image.convert("L")

def load_images(image_ids):
    X_classification = []
    y_classification = []

    for image_id in image_ids:
        if "Not Cancer" in image_id:
            image_path = healthy_brain_images_path + "/" + image_id
        else:
            image_path = brain_tumor_images_path + "/" + image_id

        image = load_image(image_path)
        image_resized = resize_image(image, IMAGE_SZIE)


        classification_label = new_training_metadata_df[new_training_metadata_df['image'] == image_id]['class'].values[0]
        if classification_label == "tumor":
            classification_label = 1
        else:
            classification_label = 0

        rotated_image_60 = resize_image(rotate_image(image, 60))
        rotated_image_120 = resize_image(rotate_image(image, 120))

        flipped_image_original = flip_image(image_resized)
        flipped_image_60 = flip_image(rotated_image_60)
        flipped_image_120 = flip_image(rotated_image_120)

        # print("Image Resized Shape:", np.array(image_resized).shape)
        # print("Rotated Image 60 Shape:", np.array(rotated_image_60).shape)
        # print("Rotated Image 120 Shape:", np.array(rotated_image_120).shape)
        # print("Flipped Image Original Shape:", np.array(flipped_image_original).shape)
        # print("Flipped Image 60 Shape:", np.array(flipped_image_60).shape)
        # print("Flipped Image 120 Shape:", np.array(flipped_image_120).shape)

        X_classification.extend([convert_image_to_numpy_array(rgb_to_grayscale(image_resized)),
                                 convert_image_to_numpy_array(rgb_to_grayscale(rotated_image_60)),
                                 convert_image_to_numpy_array(rgb_to_grayscale(rotated_image_120)),
                                 convert_image_to_numpy_array(rgb_to_grayscale(flipped_image_original)),
                                 convert_image_to_numpy_array(rgb_to_grayscale(flipped_image_60)),
                                 convert_image_to_numpy_array(rgb_to_grayscale(flipped_image_120))])
        y_classification.extend([classification_label] * 6)

        # X_classification.append(image_resized)
        # y_classification.append(classification_label)

    return np.array(X_classification), np.array(y_classification)

In [11]:
X_train, y_train = load_images(train_ids)

In [12]:
# X_train[0].shape == INPUT_SHAPE

In [13]:
X_test, y_test = load_images(test_ids)

In [14]:
print("Shape of X_train:", X_train.shape)
print("Shape of y_train:", y_train.shape)

Shape of X_train: (1440, 256, 256)
Shape of y_train: (1440,)


In [15]:
print("Shape of X_val:", X_test.shape)
print("Shape of y_val:", y_test.shape)

Shape of X_val: (360, 256, 256)
Shape of y_val: (360,)


In [16]:
early_stopping_callback = EarlyStopping(monitor='accuracy', patience=2, restore_best_weights=True)

In [17]:
# class MeanLayer(Layer):
#     def call(self, inputs):
#         return tf.reduce_mean(inputs, axis=[1, 2], keepdims=True)

# # Define Convolutional Block Attention Module (CBAM)
# def cbam_block(inputs, ratio = 8):
#     # Implement CBAM block here
#     # You can refer to existing implementations of CBAM online or in research papers
#     # Channel attention module
#     x_avg = GlobalAveragePooling2D()(inputs)
#     x_max = GlobalMaxPooling2D()(inputs)
#     x_concat = Concatenate()([x_avg, x_max])
#     x = Dense(units=(K.int_shape(inputs)[-1] // ratio), activation='relu')(x_concat)
#     x = Dense(units=K.int_shape(inputs)[-1], activation='sigmoid')(x)
#     x = Reshape((1, 1, K.int_shape(inputs)[-1]))(x)
#     x = Multiply()([inputs, x])

#     # Spatial attention module
#     y = Conv2D(1, kernel_size=7, padding='same', activation='sigmoid')(inputs)
#     x = Multiply()([x, y])
    
#     return x

# # Define Spatial Pyramid Pooling Fast+ (SPPF+)
# def sppf_plus(inputs):
#     # Implement SPPF+ layer here
#     # You can refer to existing implementations or research papers for guidance
#     pool1 = GlobalAveragePooling2D()(inputs)
#     pool2 = GlobalMaxPooling2D()(inputs)
#     pool3 = MeanLayer()(inputs)
#     squeeze_layer = Lambda(lambda x: tf.squeeze(x, axis=[1, 2]))  # Define squeeze layer
#     pool3 = squeeze_layer(pool3)  # Squeeze dimensions
#     x = Concatenate(axis=-1)([pool1, pool2, pool3])
#     return x

# # Define Bi-directional Feature Pyramid Network (BiFPN)
# def bfpn(*inputs):
#     num_levels = len(inputs)
#     output_tensors = []

#     inputs = list(inputs)

#     inputs[0] = concatenate([inputs[0], inputs[0], inputs[0]], axis=-1)
#     inputs[1] = concatenate([inputs[1], inputs[1], inputs[1]], axis=-1)
#     inputs[2] = Reshape((3, 1, 1024))(inputs[2]) 
#     inputs[2] = AveragePooling2D(pool_size=(3, 1))(inputs[2])

#     # Upsample and merge features from higher levels
#     for i in range(num_levels - 1, 0, -1):
#         x_down = Conv2D(filters=256, kernel_size=1, strides=1, padding='same')(inputs[i])
#         print("Shape after Conv2D down:", x_down.shape)
#         x_up = UpSampling2D(size=(1, 1))(inputs[i - 1])
#         print("Shape after UpSampling2D:", x_up.shape)
#         x_up = Conv2D(filters=256, kernel_size=1, strides=1, padding='same')(x_up)
#         print("Shape after Conv2D up:", x_up.shape)
#         x = Add()([x_down, x_up])
#         output_tensors.append(x)

#     # Downsample and merge features from lower levels
#     for i in range(0, num_levels - 1):
#         x_up = Conv2D(filters=256, kernel_size=1, strides=1, padding='same')(inputs[i])
#         print("Shape after Conv2D up:", x_up.shape)
#         x_down = MaxPooling2D(pool_size=(1, 1))(inputs[i + 1])
#         print("Shape after MaxPooling2D:", x_down.shape)
#         x_down = Conv2D(filters=256, kernel_size=1, strides=1, padding='same')(x_down)
#         print("Shape after Conv2D down:", x_down.shape)
#         x = Add()([x_up, x_down])
#         output_tensors.append(x)
    
#     return output_tensors

# def normalize_feature_map(x):
#     # Apply normalization (e.g., batch normalization)
#     return x  # Placeholder, replace with actual normalization layer if needed

# def conv_block(x, filters, kernel_size=(3, 3), strides=(1, 1), padding='same'):
#     """
#     Convolutional block consisting of Conv2D, BatchNormalization, and ReLU activation.
    
#     Args:
#     - x: Input tensor
#     - filters: Number of filters/kernels for Conv2D layer
#     - kernel_size: Size of the convolutional kernel (default: (3, 3))
#     - strides: Stride size (default: (1, 1))
#     - padding: Padding mode (default: 'same')
    
#     Returns:
#     - Tensor output after passing through the convolutional block
#     """
#     x = Conv2D(filters=filters, kernel_size=kernel_size, strides=strides, padding=padding)(x)
#     x = BatchNormalization()(x)
#     x = Activation('relu')(x)
#     return x


# def residual_block(x, filters, blocks):
#     # First convolutional block with stride 2 for downsampling
#     x = conv_block(x, filters=filters, kernel_size=(2, 2), strides=(2, 2))
    
#     # Residual blocks
#     for _ in range(blocks):
#         shortcut = x
#         x = conv_block(x, filters=filters // 2, kernel_size=(1, 1))
#         x = conv_block(x, filters=filters, kernel_size=(2, 2))
#         x = Add()([shortcut, x])  # Residual connection
    
#     return x


# def darknet53(inputs):
#     x = Conv2D(32, (2, 2), strides=(1, 1), padding='same', activation='relu')(inputs)
#     x = MaxPooling2D(pool_size=(1, 1))(x)
    
#     # Residual blocks
#     x = residual_block(x, filters=64, blocks=1)
#     x = MaxPooling2D(pool_size=(1, 1))(x)
#     x = residual_block(x, filters=128, blocks=2)
#     x = MaxPooling2D(pool_size=(2, 2))(x)
#     x = residual_block(x, filters=256, blocks=8)
#     x = MaxPooling2D(pool_size=(1, 1))(x)
#     x = residual_block(x, filters=512, blocks=8)
#     x = MaxPooling2D(pool_size=(2, 2))(x)
#     x = residual_block(x, filters=1024, blocks=4)
    
#     return x

# # Define YOLOv7 model with CBAM, SPPF+, decoupled heads, and BiFPN
# def yolov7_cbam_sppf_bfpn(input_shape, num_classes):
#     inputs = Input(shape=input_shape)

#     # Backbone network (Darknet-53 or similar)
#     backbone_output = darknet53(inputs)

#     # Apply CBAM block
#     cbam_output = cbam_blockxq(backbone_output)

#     # Apply Spatial Pyramid Pooling Fast+
#     sppf_output = sppf_plus(cbam_output)

#     # Apply Bi-directional Feature Pyramid Network
#     bfpn_outputs = bfpn(backbone_output, cbam_output, sppf_output)

#     # Define decoupled heads for detection
#     output1 = Conv2D(filters=256, kernel_size=2, strides=1, padding='same')(bfpn_outputs[0])
#     output2 = Conv2D(filters=256, kernel_size=2, strides=1, padding='same')(bfpn_outputs[1])
#     output3 = Conv2D(filters=256, kernel_size=2, strides=1, padding='same')(bfpn_outputs[2])

#     model = Model(inputs, [output1, output2, output3])
#     return model

# print(INPUT_SHAPE)
# yolov7_cbam_sppf_bfpn_model = yolov7_cbam_sppf_bfpn(INPUT_SHAPE, NUM_CLASSES)
# # yolov7_cbam_sppf_bfpn_model.summary()

In [18]:
def convolutional(input_layer, filters, kernel_size, downsample=False, activate=True, bn=True):
    if downsample:
        input_layer = ZeroPadding2D(((1, 0), (1, 0)))(input_layer)
        padding = 'valid'
        strides = 2
    else:
        strides = 1
        padding = 'same'
    conv = Conv2D(filters=filters, kernel_size=kernel_size, strides=strides,
                  padding=padding, use_bias=not bn,
                  kernel_regularizer=tf.keras.regularizers.l2(0.0005))(input_layer)
    if bn:
        conv = BatchNormalization()(conv)
    if activate:
        conv = LeakyReLU(alpha=0.1)(conv)
    return conv

def YOLOv7(input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES):
    input_layer = tf.keras.Input(shape=input_shape)

    # YOLOv7 backbone
    x = convolutional(input_layer, filters=32, kernel_size=3)
    x = convolutional(x, filters=64, kernel_size=3, downsample=True)

    # Add more convolutional layers as needed

    # YOLOv7 head
    x = convolutional(x, filters=128, kernel_size=3)
    x = convolutional(x, filters=256, kernel_size=3)
    x = convolutional(x, filters=512, kernel_size=3)
    x = convolutional(x, filters=256, kernel_size=3)
    x = convolutional(x, filters=512, kernel_size=3)
    x = convolutional(x, filters=256, kernel_size=3)

    x = GlobalAveragePooling2D()(x)

    # Output layer
    output = Dense(1, activation='sigmoid')(x)

    model = Model(inputs=input_layer, outputs=output)
    return model

# Create YOLOv7 model
yolo_v7_model = YOLOv7()

# Print model summary
yolo_v7_model.summary()




In [19]:
yolo_v7_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [20]:
yolo_v7_model.fit(X_train, y_train, epochs=EPOCHS, batch_size = BATCH_SIZE , callbacks=[early_stopping_callback])

Epoch 1/3
[1m20/45[0m [32m━━━━━━━━[0m[37m━━━━━━━━━━━━[0m [1m53:40[0m 129s/step - accuracy: 0.5961 - loss: 1.5543

KeyboardInterrupt: 

In [None]:
y_pred = yolo_v7_model.predict(X_test)

[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 9s/step


In [None]:
y_pred_class = (y_pred > 0.5).astype(int)
# y_pred_class

In [None]:
accuracy = metrics.accuracy_score(y_test, y_pred_class)
precision = metrics.precision_score(y_test, y_pred_class)
recall = metrics.recall_score(y_test, y_pred_class)
f1_score = metrics.f1_score(y_test, y_pred_class)
auc_score = metrics.roc_auc_score(y_test, y_pred_class)

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")
print(f"AUC Score: {auc_score}")

Accuracy: 0.3333333333333333
Precision: 0.3333333333333333
Recall: 1.0
F1 Score: 0.5
AUC Score: 0.5


In [None]:
confusion_matrix = metrics.confusion_matrix(y_test, y_pred_class)
print(confusion_matrix)

[[  0 120]
 [  0  60]]
