In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import zipfile
import cv2
from skimage import io
import tensorflow as tf
from tensorflow.python.keras import Sequential
from tensorflow.keras import layers, optimizers
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, LearningRateScheduler
from IPython.display import display
from tensorflow.keras import backend as K
from sklearn.preprocessing import StandardScaler, normalize
import os
import glob
import random

In [None]:
%cd "D:\project\Dataset"

In [None]:
brain_dataframe = pd.read_csv('data_mask.csv')

In [None]:
brain_dataframe.info()

In [None]:
brain_dataframe.head()

In [None]:
brain_dataframe.mask_path[1]

In [None]:
brain_dataframe.image_path[1]

In [None]:
brain_dataframe[:5]

In [None]:
brain_dataframe['mask'].value_counts().index

In [None]:
import plotly.graph_objects as go

# Count the rows per 'mask' label once and reuse the result for both bar axes.
mask_counts = brain_dataframe['mask'].value_counts()

fig = go.Figure([go.Bar(x = mask_counts.index, y = mask_counts)])
fig.update_traces(
    marker_color = 'rgb(51,51,255)',
    marker_line_color = 'rgb(0,0,0)',
    marker_line_width = 2,
    opacity = 0.6,
)
fig.show()

In [None]:
round((brain_dataframe['mask'].value_counts()* 100 / len(brain_dataframe['mask'])),2)

In [None]:
brain_dataframe.mask_path[:5]

In [None]:
brain_dataframe.image_path[:5]

In [None]:
plt.figure(figsize=(10,10))
plt.imshow(cv2.imread(brain_dataframe.mask_path[2490]))
plt.show()

In [None]:
plt.figure(figsize=(10,10))
plt.imshow(cv2.imread(brain_dataframe.image_path[2490]))
plt.show()

In [None]:
cv2.imread(brain_dataframe.mask_path[2490]).max()

In [None]:
cv2.imread(brain_dataframe.mask_path[2490]).min()

# Basic Visualization

In [None]:
import random

# Show five randomly chosen MRI scans side by side with their masks.
n_samples = 5
fig, axis = plt.subplots(n_samples, 2, figsize=(20,40))
for count in range(n_samples):
  # random.randint includes its upper bound, so randint(0, len(df)) could return
  # len(df), which is not a valid row label. randrange excludes the upper bound.
  i = random.randrange(len(brain_dataframe)) # select a random index
  axis[count][0].title.set_text("Brain MRI") # set title
  axis[count][0].imshow(cv2.imread(brain_dataframe.image_path[i])) # show MRI
  axis[count][1].title.set_text("Mask : " + str(brain_dataframe['mask'][i])) # plot title on the mask (0 or 1)
  axis[count][1].imshow(cv2.imread(brain_dataframe.mask_path[i])) # Show corresponding mask

fig.tight_layout()

# Advanced Visualization

In [None]:
# Plot the first ten rows whose 'mask' label is 1 as three panels each:
# the MRI, its mask, and the MRI with the mask pixels painted red.
count = 0
fig, axis = plt.subplots(10, 3, figsize = (20, 40))
for i in range(len(brain_dataframe)):
  if count >= 10:
    break  # the grid is full; scanning the rest of the dataframe is wasted work
  if brain_dataframe['mask'][i] != 1:
    continue

  img = io.imread(brain_dataframe.image_path[i])
  axis[count][0].title.set_text('Brain MRI')
  axis[count][0].imshow(img)

  mask = io.imread(brain_dataframe.mask_path[i])
  axis[count][1].title.set_text('Mask')
  axis[count][1].imshow(mask, cmap = 'gray')

  # Paint every pixel where the mask equals 255 in red on top of the MRI.
  img[mask == 255] = (255, 0, 0)
  axis[count][2].title.set_text('MRI with Mask')
  axis[count][2].imshow(img)
  count+=1

# Training a model

In [None]:
brain_dataframe_train = brain_dataframe.drop(columns = ['patient_id'])
brain_dataframe_train.shape

In [None]:
# flow_from_dataframe with class_mode="categorical" is fed this column as y_col,
# so turn every label into its string form.
brain_dataframe_train['mask'] = brain_dataframe_train['mask'].map(str)

In [None]:
brain_dataframe_train.info()

In [None]:
from sklearn.model_selection import train_test_split

train, test = train_test_split(brain_dataframe_train, test_size = 0.2,random_state=42)

In [None]:
from keras_preprocessing.image import ImageDataGenerator
datagen = ImageDataGenerator(rescale=1./255., validation_split = 0.1)

In [None]:
# Keyword arguments shared by the training, validation and test generators.
common_flow_args = dict(
    directory='./',
    x_col='image_path',
    y_col='mask',
    batch_size=16,
    class_mode="categorical",
    target_size=(256,256),
)

# Training and validation batches both come from `train`; `datagen` holds the
# validation_split that decides which rows land in which subset.
train_generator = datagen.flow_from_dataframe(
    dataframe=train,
    subset="training",
    shuffle=True,
    **common_flow_args,
)

valid_generator = datagen.flow_from_dataframe(
    dataframe=train,
    subset="validation",
    shuffle=True,
    **common_flow_args,
)

# Create a data generator for test images
test_datagen = ImageDataGenerator(rescale=1./255.)

# shuffle=False keeps predictions in the same order as the rows of `test`.
test_generator = test_datagen.flow_from_dataframe(
    dataframe=test,
    shuffle=False,
    **common_flow_args,
)

In [None]:
basemodel =DenseNet121(weights = 'imagenet', include_top = False, input_tensor = Input(shape=(256, 256, 3)))

In [None]:
basemodel.summary()

In [None]:
# Freeze the pretrained DenseNet121 layers.
# The flag must be set on the loop variable `layer`; assigning to `layers`
# only sets an attribute on the imported keras `layers` module and leaves
# every base layer trainable.
for layer in basemodel.layers:
  layer.trainable = False

In [None]:
# Classification head stacked on top of the DenseNet121 feature maps.
headmodel = AveragePooling2D(pool_size = (4,4))(basemodel.output)
headmodel = Flatten(name= 'flatten')(headmodel)

# Three identical Dense(256, relu) -> Dropout(0.25) stages.
for stage in range(3):
    headmodel = Dense(256, activation = "relu")(headmodel)
    headmodel = Dropout(0.25)(headmodel)

# Two-way softmax output, matching the two 'mask' classes.
headmodel = Dense(2, activation = 'softmax')(headmodel)

model = Model(inputs = basemodel.input, outputs = headmodel)

In [None]:
model.summary()

In [None]:
# Stop training when the validation loss has not improved for 25 epochs.
earlystopping = EarlyStopping(monitor='val_loss',
                              mode='min',
                              verbose=1,
                              patience=25
                             )
# Save the best classifier weights seen so far.
# Raw string: the Windows path contains backslashes that would otherwise be
# parsed as (invalid) escape sequences such as "\p" and "\D".
checkpointer = ModelCheckpoint(filepath=r"D:\project\Dataset\Model\Classifier_DenseNet_Model.hdf5",
                               verbose=1,
                               save_best_only=True
                              )
# Multiply the learning rate by 0.2 after 10 epochs without val_loss improvement.
reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                              mode='min',
                              verbose=1,
                              patience=10,
                              min_delta=0.0001,
                              factor=0.2
                             )
callbacks = [checkpointer, earlystopping, reduce_lr]

In [None]:
model.compile(loss = 'categorical_crossentropy', optimizer='adam', metrics= ["accuracy"])

In [None]:
# Number of whole batches per epoch; integer division leaves out a trailing
# partial batch.
train_steps = train_generator.n // train_generator.batch_size
valid_steps = valid_generator.n // valid_generator.batch_size

history = model.fit(
    train_generator,
    steps_per_epoch=train_steps,
    epochs=100,
    validation_data=valid_generator,
    validation_steps=valid_steps,
    callbacks=[checkpointer, earlystopping, reduce_lr],
)

In [None]:
history

In [None]:
# Training vs. validation accuracy per epoch for the classifier.
csfont = {'fontname':'Times New Roman','size':14,'weight':'bold'}
plt.figure(figsize = [15, 5])
sns.set_style('whitegrid')
for curve in ('accuracy', 'val_accuracy'):
    plt.plot(history.history[curve])
plt.title('Model Accuracy',**csfont)
plt.ylabel('Accuracy',**csfont)
plt.xlabel('Epoch',**csfont)
# Legend entries follow the plotting order above.
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
# Training vs. validation loss per epoch for the classifier.
csfont = {'fontname':'Times New Roman','size':14,'weight':'bold'}
plt.figure(figsize = [15, 5])
sns.set_style('whitegrid')
for curve in ('loss', 'val_loss'):
    plt.plot(history.history[curve])
plt.title('Model Loss',**csfont)
plt.ylabel('Loss',**csfont)
plt.xlabel('Epoch',**csfont)
# Legend entries follow the plotting order above.
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
tf.keras.utils.plot_model(
    model,
    to_file="model.png",
    show_shapes=False,
    show_dtype=False,
    show_layer_names=True,
    rankdir="TB",
    expand_nested=False,
    dpi=96,
    layer_range=None,
    show_layer_activations=False,
)

# Save Model

In [None]:
# Write the classifier architecture (no weights) to a JSON file.
model_json = model.to_json()
# Raw string keeps the backslashes of the Windows path literal instead of
# relying on "\p", "\D", "\M" and "\c" being unrecognised escape sequences.
with open(r"D:\project\Dataset\Model\classifierDensenetmodel.json", "w") as json_file:
    json_file.write(model_json)

# Assess Trained Model Performance

In [None]:
# Rebuild the classifier from the saved architecture and checkpointed weights.
# Raw strings keep the backslashes of the Windows paths literal instead of
# relying on them being unrecognised escape sequences.
with open(r'D:\project\Dataset\Model\classifierDensenetmodel.json', 'r') as json_file:
    json_savedModel= json_file.read()
# load the model
model = tf.keras.models.model_from_json(json_savedModel)
model.load_weights(r'D:\project\Dataset\Model\Classifier_DenseNet_Model.hdf5')
model.compile(loss = 'categorical_crossentropy', optimizer='adam', metrics= ["accuracy"])

In [None]:
# Predict on every test batch, including the final partial one.
# `test_generator.n // 16` hard-coded the batch size and silently dropped the
# trailing samples whenever the test set size is not a multiple of it.
test_predict = model.predict(test_generator, steps = len(test_generator), verbose =1)

In [None]:
test_predict.shape

In [None]:
test_predict[:5]

In [None]:
# For each prediction row take the index of the highest score and keep it as a
# string, so it can be compared with the string labels in test['mask'].
predict = np.asarray([str(np.argmax(scores)) for scores in test_predict])

In [None]:
predict

In [None]:
original = np.asarray(test['mask'])[:len(predict)]
len(original)

In [None]:
from sklearn.metrics import accuracy_score

accuracy = round(accuracy_score(original, predict),2)
accuracy

In [None]:
from sklearn.metrics import confusion_matrix

cm = confusion_matrix(original, predict)
plt.figure(figsize = (7,7))
sns.heatmap(cm, annot=True,fmt='d')
plt.show()

In [None]:
from sklearn.metrics import classification_report

# `original` and `predict` hold the string labels '0' and '1', so the labels
# passed here must be strings as well; the integer labels [0, 1] do not match
# the type of the data being scored.
report = classification_report(original, predict, labels = ['0', '1'])
print(report)

# BUILD A SEGMENTATION MODEL TO LOCALIZE TUMOR

In [None]:
brain_dataframe_mask = brain_dataframe[brain_dataframe['mask'] == 1]
brain_dataframe_mask.shape

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_val = train_test_split(brain_dataframe_mask, test_size=0.2,random_state=42)
X_test, X_val = train_test_split(X_val, test_size=0.5,random_state=42)

In [None]:
X_train.shape,X_test.shape, X_val.shape

In [None]:

# Plain Python lists of image and mask paths, passed to DataGenerator below.
train_ids = X_train.image_path.tolist()
train_mask = X_train.mask_path.tolist()

val_ids = X_val.image_path.tolist()
val_mask = X_val.mask_path.tolist()

In [None]:
from utilities import DataGenerator

# create image generators

training_generator = DataGenerator(train_ids,train_mask)
validation_generator = DataGenerator(val_ids,val_mask)

In [None]:
def resblock(X, f):
    """Residual block: a two-convolution main path added to a 1x1-conv shortcut.

    Args:
        X: input tensor the block is applied to.
        f: number of filters used by every convolution in the block.

    Returns:
        The tensor obtained by adding the main and shortcut paths and applying
        a ReLU activation.
    """
    shortcut = X

    # Main path: 1x1 conv -> BN -> ReLU -> 3x3 'same' conv -> BN
    # he_normal background: https://medium.com/@prateekvishnu/xavier-and-he-normal-he-et-al-initialization-8e3d7a087528
    main = Conv2D(f, kernel_size=(1, 1), strides=(1, 1), kernel_initializer='he_normal')(X)
    main = BatchNormalization()(main)
    main = Activation('relu')(main)

    main = Conv2D(f, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer='he_normal')(main)
    main = BatchNormalization()(main)

    # Shortcut path: 1x1 conv -> BN, giving the shortcut the same f channels
    # ResNet background: https://towardsdatascience.com/understanding-and-coding-a-resnet-in-keras-446d7ff84d33
    shortcut = Conv2D(f, kernel_size=(1, 1), strides=(1, 1), kernel_initializer='he_normal')(shortcut)
    shortcut = BatchNormalization()(shortcut)

    # Merge both paths, then apply the final non-linearity
    merged = Add()([main, shortcut])
    return Activation('relu')(merged)

In [None]:
# Upscale a tensor and concatenate it with the given skip tensor
def upsample_concat(x, skip):
    """Upsample `x` by a factor of 2 and concatenate it with `skip`.

    Args:
        x: tensor to upsample.
        skip: tensor concatenated after the upsampled `x`.

    Returns:
        The concatenation of the upsampled `x` and `skip`.
    """
    upsampled = UpSampling2D((2,2))(x)
    return Concatenate()([upsampled, skip])

In [None]:
# Assemble the segmentation network: an encoder of conv/residual stages with
# 2x2 max-pooling, a bottleneck, and a decoder that upsamples and concatenates
# the matching encoder output before each residual block.
# NOTE: layers are created in a fixed order; Keras auto-generates layer names
# from creation order, so keep the statement order unchanged.
input_shape = (256,256,3)

# Input tensor shape
X_input = Input(input_shape)

# Stage 1: two 3x3 'same' convolutions (16 filters), each followed by batch norm
conv1_in = Conv2D(16,3,activation= 'relu', padding = 'same', kernel_initializer ='he_normal')(X_input)
conv1_in = BatchNormalization()(conv1_in)
conv1_in = Conv2D(16,3,activation= 'relu', padding = 'same', kernel_initializer ='he_normal')(conv1_in)
conv1_in = BatchNormalization()(conv1_in)
pool_1 = MaxPool2D(pool_size = (2,2))(conv1_in)

# Stage 2: residual block with 32 filters, then 2x2 max-pool
conv2_in = resblock(pool_1, 32)
pool_2 = MaxPool2D(pool_size = (2,2))(conv2_in)

# Stage 3: residual block with 64 filters, then 2x2 max-pool
conv3_in = resblock(pool_2, 64)
pool_3 = MaxPool2D(pool_size = (2,2))(conv3_in)

# Stage 4: residual block with 128 filters, then 2x2 max-pool
conv4_in = resblock(pool_3, 128)
pool_4 = MaxPool2D(pool_size = (2,2))(conv4_in)

# Stage 5 (Bottle Neck): residual block with 256 filters, no pooling afterwards
conv5_in = resblock(pool_4, 256)

# Upscale stage 1: upsample the bottleneck, concatenate with the stage 4 output
up_1 = upsample_concat(conv5_in, conv4_in)
up_1 = resblock(up_1, 128)

# Upscale stage 2: concatenate with the stage 3 output
up_2 = upsample_concat(up_1, conv3_in)
up_2 = resblock(up_2, 64)

# Upscale stage 3: concatenate with the stage 2 output
up_3 = upsample_concat(up_2, conv2_in)
up_3 = resblock(up_3, 32)

# Upscale stage 4: concatenate with the stage 1 output
up_4 = upsample_concat(up_3, conv1_in)
up_4 = resblock(up_4, 16)

# Final Output: 1x1 convolution with a single sigmoid channel
output = Conv2D(1, (1,1), padding = "same", activation = "sigmoid")(up_4)

model_seg = Model(inputs = X_input, outputs = output )

In [None]:
model_seg.summary()

# Train a Segmentation Resunet Model to Localize Tumor

In [None]:
from utilities import tversky_loss, tversky

In [None]:
def focal_tversky(y_true,y_pred):
    """Focal Tversky loss: (1 - tversky(y_true, y_pred)) raised to the power 0.75.

    Args:
        y_true: ground-truth tensor; cast to float32 before use.
        y_pred: predicted tensor; cast to float32 before use.

    Returns:
        The loss tensor computed from the imported `tversky` helper.
    """
    gamma = 0.75
    truth = tf.cast(y_true, tf.float32)
    pred = tf.cast(y_pred, tf.float32)
    tversky_index = tversky(truth, pred)
    return K.pow((1-tversky_index), gamma)

In [None]:
# Compile the segmentation model and define its training callbacks.
# `learning_rate` is the supported keyword; `lr` is a deprecated alias that
# newer Keras releases no longer accept.
adam = tf.keras.optimizers.Adam(learning_rate = 0.05, epsilon = 0.1)
model_seg.compile(optimizer = adam,
                  loss = focal_tversky,
                  metrics = [tversky]
                 )
# callbacks
# Stop training when the validation loss has not improved for 25 epochs.
earlystopping = EarlyStopping(monitor='val_loss',
                              mode='min',
                              verbose=1,
                              patience=25
                             )
# save the best model with lower validation loss
# Raw string keeps the backslashes of the Windows path literal instead of
# relying on them being unrecognised escape sequences.
checkpointer = ModelCheckpoint(filepath=r"D:\project\Dataset\Model\DenseNet_ResUNet-weights.hdf5",
                               verbose=1,
                               save_best_only=True
                              )
# Multiply the learning rate by 0.2 after 10 epochs without val_loss improvement.
reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                              mode='min',
                              verbose=1,
                              patience=10,
                              min_delta=0.0001,
                              factor=0.2
                             )

In [None]:
history = model_seg.fit(training_generator, epochs = 100, validation_data = validation_generator,
                        callbacks = [checkpointer, earlystopping,reduce_lr])

In [None]:
# Training vs. validation loss per epoch for the segmentation model.
csfont = {'fontname':'Times New Roman','size':14,'weight':'bold'}
plt.figure(figsize = [15, 5])
sns.set_style('whitegrid')
for curve in ('loss', 'val_loss'):
    plt.plot(history.history[curve])
plt.title('Model Loss',**csfont)
plt.ylabel('Loss',**csfont)
plt.xlabel('Epoch',**csfont)
# Legend entries follow the plotting order above.
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
# Training vs. validation tversky metric per epoch for the segmentation model.
csfont = {'fontname':'Times New Roman','size':14,'weight':'bold'}
plt.figure(figsize = [15, 5])
sns.set_style('whitegrid')
for curve in ('tversky', 'val_tversky'):
    plt.plot(history.history[curve])
plt.title('Model tversky',**csfont)
plt.ylabel('tversky',**csfont)
plt.xlabel('Epoch',**csfont)
# Legend entries follow the plotting order above.
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
# save the model architecture to json file for future use

# Write the segmentation architecture (no weights) to a JSON file.
model_json = model_seg.to_json()
# Raw string keeps the backslashes of the Windows path literal instead of
# relying on them being unrecognised escape sequences.
with open(r"D:\project\Dataset\Model\DenseNet-model.json","w") as json_file:
    json_file.write(model_json)

In [None]:
# NOTE: this import rebinds `focal_tversky` to the version in utilities.py,
# replacing the function defined earlier in this notebook.
from utilities import focal_tversky, tversky_loss, tversky

# Read the saved segmentation architecture back from disk.
# Raw string keeps the backslashes of the Windows path literal instead of
# relying on them being unrecognised escape sequences.
with open(r'D:\project\Dataset\Model\DenseNet-model.json', 'r') as json_file:
    json_savedModel= json_file.read()

In [None]:
# Rebuild the segmentation model from its JSON architecture and load the
# checkpointed weights. The raw string keeps the Windows path backslashes literal.
model_seg1 = tf.keras.models.model_from_json(json_savedModel)
model_seg1.load_weights(r'D:\project\Dataset\Model\DenseNet_ResUNet-weights.hdf5')
# `learning_rate` is the supported keyword; `lr` is a deprecated alias that
# newer Keras releases no longer accept.
adam = tf.keras.optimizers.Adam(learning_rate = 0.05, epsilon = 0.1)
model_seg1.compile(optimizer = adam, loss = focal_tversky, metrics = [tversky])

In [None]:
# Utilities file contains the code for custom loss function and custom data generator
from utilities import prediction

# making prediction
image_id, mask, has_mask = prediction(test, model, model_seg1)

In [None]:
# creating a dataframe for the result
df_pred = pd.DataFrame({'image_path': image_id,'predicted_mask': mask,'has_mask': has_mask})
df_pred.head()

In [None]:
# Merge the dataframe containing predicted results with the original test data.
df_pred = test.merge(df_pred, on = 'image_path')
df_pred.head()

In [None]:
# Show up to 12 rows with has_mask == 1 as five panels each: MRI, original
# mask, predicted mask, MRI with the original mask, MRI with the predicted mask.
count = 0
fig, axs = plt.subplots(12, 5, figsize=(30,90))
for i in range(len(df_pred)):
    if count >= 12:
        break  # every row of the grid is filled; no need to scan the rest
    if df_pred['has_mask'][i] != 1:
        continue

    # Read the image and swap its first and third channels.
    # NOTE(review): skimage's io.imread normally returns RGB already, so this
    # BGR->RGB conversion may be reversing the channels - confirm it is intended.
    img = io.imread(df_pred.image_path[i])
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # Keep an untouched copy for the prediction overlay instead of reading and
    # converting the same file a second time.
    img_ = img.copy()
    axs[count][0].title.set_text("Brain MRI")
    axs[count][0].imshow(img)

    # Obtain the mask for the image
    mask = io.imread(df_pred.mask_path[i])
    axs[count][1].title.set_text("Original Mask")
    axs[count][1].imshow(mask)

    # Obtain the predicted mask for the image and round it to 0/1 values
    predicted_mask = np.asarray(df_pred.predicted_mask[i])[0].squeeze().round()
    axs[count][2].title.set_text("AI Predicted Mask")
    axs[count][2].imshow(predicted_mask)

    # Paint the ground-truth mask pixels (mask == 255) red on the MRI
    img[mask == 255] = (255, 0, 0)
    axs[count][3].title.set_text("MRI with Original Mask (Ground Truth)")
    axs[count][3].imshow(img)

    # Paint the predicted mask pixels green on the untouched copy
    img_[predicted_mask == 1] = (0, 255, 0)
    axs[count][4].title.set_text("MRI with AI Predicted Mask")
    axs[count][4].imshow(img_)
    count += 1

fig.tight_layout()