In [None]:
# Import Libraries & Modules

import os
import random
import glob
from PIL import Image, ImageOps
import numpy as np
from tensorflow.keras.models import load_model
import matplotlib
import matplotlib.pyplot as plt
import cv2
import os
import imageio as io
from skimage.transform import rotate
import random
from google.colab import drive

In [None]:
# Utility Functions

def display_image(img):
    """Draw ``img`` with matplotlib and show the resulting figure.

    Args:
        img: Image data in any form accepted by ``plt.imshow``.
    """
    plt.imshow(img)
    plt.show()

def remove_alpha_channel(img):
    """Drop the alpha channel from a 4-channel image array.

    Args:
        img: NumPy array holding an image, either 2-D (H, W) or 3-D (H, W, C).

    Returns:
        A view of the first three channels when ``img`` is 3-D with exactly
        four channels; otherwise ``img`` itself, unchanged.
    """
    # Check ndim first: a 2-D (greyscale) array has no shape[2], so indexing
    # it directly would raise IndexError.
    if img.ndim == 3 and img.shape[2] == 4:
        img = img[:, :, :3]
    return img

def image_resizer_cv2(img,resize_len):
    """Resize ``img`` to a square of ``resize_len`` x ``resize_len`` using OpenCV.

    Args:
        img: Image array accepted by ``cv2.resize``.
        resize_len: Side length used for both output dimensions.

    Returns:
        The resized image returned by ``cv2.resize``.
    """
    target_size = (resize_len, resize_len)
    return cv2.resize(img, target_size)

def convert_rgb_greyscale(img):
    """Return a greyscale copy of a PIL image.

    Args:
        img: PIL image to convert.

    Returns:
        The result of ``ImageOps.grayscale(img)``.
    """
    return ImageOps.grayscale(img)

def mkdir(dir):
    """Create directory ``dir`` (including missing parents) if it is absent.

    Args:
        dir: Path of the directory to create.

    Raises:
        FileExistsError: If ``dir`` already exists but is not a directory.
    """
    # exist_ok=True makes the call idempotent and avoids the race between a
    # separate "exists?" check and the actual creation.
    os.makedirs(dir, exist_ok=True)

def compute_dice_score(x,y):
    """Compute the Dice coefficient between two masks.

    Args:
        x: NumPy array for the first mask.
        y: NumPy array of the same shape; entries equal to 1 count as
            foreground.

    Returns:
        ``2 * sum(x where y == 1) / (sum(x) + sum(y))``, or ``1`` when both
        sums add up to zero (two empty masks are treated as a perfect match).
    """
    total = np.sum(x) + np.sum(y)
    if total == 0:
        return 1
    overlap = np.sum(x[y == 1])
    return overlap * 2.0 / total

def horizontal_flip(img):
    """Mirror ``img`` left-to-right (reverse the order of its columns).

    Args:
        img: Array with at least two dimensions.

    Returns:
        A view of ``img`` with axis 1 reversed.
    """
    return np.flip(img, axis=1)

def vertical_flip(img):
    """Mirror ``img`` top-to-bottom (reverse the order of its rows).

    Args:
        img: Array with at least one dimension.

    Returns:
        A view of ``img`` with axis 0 reversed.
    """
    return np.flip(img, axis=0)

def rotate_img(img,angle):
    """Rotate ``img`` by ``angle`` using ``skimage.transform.rotate``.

    Both arguments are forwarded unchanged and every other option of
    ``rotate`` keeps its library default.
    NOTE(review): skimage documents ``angle`` as degrees, counter-clockwise —
    confirm against the installed version.
    """
    return rotate(img,angle)

def random_crop(img, width, height):
    """Cut a randomly positioned ``height`` x ``width`` window out of ``img``.

    Args:
        img: Array whose first two axes are (rows, columns).
        width: Number of columns in the crop.
        height: Number of rows in the crop.

    Returns:
        The slice ``img[top:top + height, left:left + width]``, with the
        offsets drawn from the ``random`` module (left first, then top).
    """
    img_rows, img_cols = img.shape[0], img.shape[1]
    left = random.randint(0, img_cols - width)
    top = random.randint(0, img_rows - height)
    return img[top:top + height, left:left + width]

In [None]:
#Mount dataset

#Dataset link: https://drive.google.com/drive/folders/105YRKK7UwMImQi-m1_HgQu1EX8w7dexC?usp=sharing
#Link previously passed to drive.mount: https://drive.google.com/drive/folders/1saQwzNH4wd5oZLJvd4cNZ89j1UVSjtrS?usp=drive_link

# drive.mount expects a local mount point inside the Colab VM, not a Drive
# share URL (passing the URL raises ValueError). The dataset folder must be
# available in the mounted account's "My Drive" (e.g. via a shortcut) so that
# it shows up under /content/drive/MyDrive.
drive.mount('/content/drive')

ValueError: ignored

In [None]:
# Paths and flags

# Dataset root on the mounted Drive; load_data reads its 'images' and 'masks'
# subfolders.
data_folder = "/content/drive/MyDrive/GSDataset/GSDataset"

# Training hyper-parameters handed to model.fit.
epochs, batch_size = 1, 20

# When True, load_data adds flipped and rotated copies of every training sample.
traditional_augment = False

# Split selector; the cells visible here pass the split to load_data explicitly.
mode = "train"

In [None]:
# Function to read images

def read_images(image_path,mask_path):
    """Load one image/mask pair from disk as normalized NumPy arrays.

    Args:
        image_path: Path of the input image file.
        mask_path: Path of the matching segmentation mask file.

    Returns:
        Tuple ``(image, mask)``:
        * ``image`` - float array of shape (H, W, 3) with values in [0, 1].
        * ``mask`` - float array of shape (H, W) containing only 0.0 and 1.0
          (greyscale mask values >= 128 become 1.0, the rest 0.0).
    """
    # Context managers close the file handles once the pixels are copied into
    # NumPy arrays; otherwise loading a large dataset keeps every file open.
    with Image.open(image_path) as image_pil:
        # convert("RGB") drops an alpha channel and also expands greyscale or
        # palette images to three channels, so every image has the same layout.
        image = np.array(image_pil.convert("RGB"))

    with Image.open(mask_path) as mask_pil:
        # Convert the mask to a single greyscale channel before thresholding.
        mask = np.array(convert_rgb_greyscale(mask_pil))

    # Normalize image values to (0..1).
    image = image / 255.0

    # Binarize the mask: equivalent to snapping values to 0/255 and dividing by 255.
    mask = (mask >= 128).astype(np.float64)

    return image,mask

In [None]:
from sklearn.model_selection import train_test_split

# Function to load complete data

def load_data(folder, mode='train'):
    """Load the images and masks of one dataset split.

    Masks are discovered as ``<folder>/masks/*.png`` and each image is read
    from ``<folder>/images`` under the same file name. The mask paths are
    split 80/20 with a fixed seed; ``mode == 'train'`` selects the 80% part
    and any other value selects the 20% part.

    When ``mode == 'train'`` and the module-level flag ``traditional_augment``
    is true, every sample is followed by a horizontally flipped, a vertically
    flipped and a randomly rotated copy.

    Args:
        folder: Dataset root containing the ``images`` and ``masks`` folders.
        mode: ``'train'`` for the training split, anything else for the test split.

    Returns:
        Tuple ``(images, masks)`` of equally long lists of NumPy arrays.

    Raises:
        ValueError: If no mask files are found in ``<folder>/masks``.
    """
    images_folder = os.path.join(folder,'images')
    masks_folder = os.path.join(folder,'masks')

    # glob returns paths in filesystem order; sort them so the seeded split
    # assigns the same files to train/test on every machine and run.
    mask_paths = sorted(glob.glob(os.path.join(masks_folder,"*.png")))
    if not mask_paths:
        # Fail with an actionable message instead of an obscure splitter error.
        raise ValueError(
            f"No mask PNG files found in {masks_folder!r}; "
            "check that the dataset is mounted and the folder path is correct."
        )

    random_seed = 42
    train_mask_paths, test_mask_paths = train_test_split(mask_paths, test_size=0.2, random_state=random_seed)

    mask_paths = train_mask_paths if mode == 'train' else test_mask_paths

    augment = (mode == 'train' and traditional_augment)

    images = []
    masks = []

    for mask_path in mask_paths:

        # Image and mask share the same file name.
        imname = os.path.basename(mask_path)
        image_path = os.path.join(images_folder,imname)
        image,mask = read_images(image_path,mask_path)
        images.append(image)
        masks.append(mask)

        if not augment:
            continue

        # Horizontal flip
        images.append(horizontal_flip(image))
        masks.append(horizontal_flip(mask))

        # Vertical flip
        images.append(vertical_flip(image))
        masks.append(vertical_flip(mask))

        # Rotation by a random angle, identical for image and mask
        angle = random.randint(10,180)
        images.append(rotate_img(image,angle))
        # Rotation interpolates, leaving fractional values along mask edges;
        # threshold again so the target stays strictly binary.
        mask_r = (rotate_img(mask,angle) >= 0.5).astype(mask.dtype)
        masks.append(mask_r)

    return images,masks

In [None]:
#Define model

import numpy as np
import os
import skimage.io as io
import skimage.transform as trans
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
from tensorflow.keras import backend as keras

def unet(input_size = (256,256,3)):
    """Build and compile a U-Net that predicts one probability per pixel.

    The network has four 2x down-sampling stages, a bottleneck and four 2x
    up-sampling stages with skip connections. The final layer is a
    one-channel sigmoid convolution whose channel axis is removed, so the
    output has shape ``(batch, H, W)``.

    Args:
        input_size: ``(H, W, C)`` of the input images. ``H`` and ``W`` must be
            concrete integers and divisible by 16, otherwise the skip
            connections cannot be concatenated.

    Returns:
        The compiled Keras ``Model`` (Adam with AMSGrad, learning rate 1e-4,
        binary cross-entropy loss). The layer summary is printed as a side
        effect.
    """
    # Arguments shared by every ReLU convolution in the network.
    conv_args = dict(activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')

    inputs = Input(input_size)

    # ---- Encoder ----
    conv1 = Conv2D(16, 3, **conv_args)(inputs)
    conv1 = Conv2D(16, 3, **conv_args)(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = Conv2D(32, 3, **conv_args)(pool1)
    conv2 = Conv2D(32, 3, **conv_args)(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(64, 3, **conv_args)(pool2)
    conv3 = Conv2D(64, 3, **conv_args)(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv4 = Conv2D(128, 3, **conv_args)(pool3)
    conv4 = Conv2D(128, 3, **conv_args)(conv4)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    # ---- Bottleneck ----
    # NOTE(review): filters jump from 128 to 512 here (the doubling pattern
    # would give 256); kept as-is because changing it alters the trained
    # architecture — confirm whether this is intentional.
    conv5 = Conv2D(512, 3, **conv_args)(pool4)
    conv5 = Conv2D(512, 3, **conv_args)(conv5)
    drop5 = Dropout(0.5)(conv5)

    # ---- Decoder (each stage: upsample, concat skip connection, two convs) ----
    up6 = Conv2D(128, 2, **conv_args)(UpSampling2D(size = (2,2))(drop5))
    merge6 = concatenate([drop4,up6], axis = 3)
    conv6 = Conv2D(128, 3, **conv_args)(merge6)
    conv6 = Conv2D(128, 3, **conv_args)(conv6)

    up7 = Conv2D(64, 2, **conv_args)(UpSampling2D(size = (2,2))(conv6))
    merge7 = concatenate([conv3,up7], axis = 3)
    conv7 = Conv2D(64, 3, **conv_args)(merge7)
    conv7 = Conv2D(64, 3, **conv_args)(conv7)

    # NOTE(review): 128 filters breaks the 128/64/32/16 decoder pattern (32
    # would be symmetric); kept as-is to preserve the architecture — confirm.
    up8 = Conv2D(128, 2, **conv_args)(UpSampling2D(size = (2,2))(conv7))
    merge8 = concatenate([conv2,up8], axis = 3)
    conv8 = Conv2D(32, 3, **conv_args)(merge8)
    conv8 = Conv2D(32, 3, **conv_args)(conv8)

    up9 = Conv2D(16, 2, **conv_args)(UpSampling2D(size = (2,2))(conv8))
    merge9 = concatenate([conv1,up9], axis = 3)
    conv9 = Conv2D(16, 3, **conv_args)(merge9)
    conv9 = Conv2D(16, 3, **conv_args)(conv9)

    # One-channel sigmoid output: per-pixel foreground probability.
    conv9 = Conv2D(1, 3, activation = 'sigmoid', padding = 'same', kernel_initializer = 'he_normal')(conv9)

    # Drop the singleton channel axis so predictions match the (H, W) masks.
    # A Reshape layer is used instead of a raw tf.squeeze call because newer
    # Keras versions reject TensorFlow ops applied to symbolic Keras tensors.
    outputs = Reshape(tuple(input_size[:2]))(conv9)

    model = Model(inputs = inputs, outputs = outputs)

    # `learning_rate` replaces the deprecated `lr` alias, which current Keras
    # releases no longer accept.
    model.compile(optimizer = Adam(learning_rate = 1e-4, amsgrad=True), loss = tf.keras.losses.BinaryCrossentropy())

    model.summary()

    return model

In [None]:
# Initialize the model

# Path handed to the ModelCheckpoint callback in the training cell.
# NOTE(review): newer Keras releases may reject the '.hdf5' suffix for
# checkpoints — confirm against the installed version.
model_file = './model.hdf5'

# Build and compile the U-Net with its default input size; unet() also prints
# the layer summary.
model = unet()




Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 256, 256, 16)         448       ['input_1[0][0]']             
                                                                                                  
 conv2d_1 (Conv2D)           (None, 256, 256, 16)         2320      ['conv2d[0][0]']              
                                                                                                  
 max_pooling2d (MaxPooling2  (None, 128, 128, 16)         0         ['conv2d_1[0][0]']            
 D)                                                                                           

In [None]:
# Load the training split and stack each list of samples into a NumPy array

train_images, train_masks = (
    np.array(samples) for samples in load_data(data_folder, mode='train')
)

ValueError: ignored

In [None]:
# Visualize some images

# Randomly sample up to 10 images; sampling without replacement fails when
# more samples are requested than exist, so cap the count at the dataset size.
num_samples = min(10, len(train_images))
selected_indices = np.random.choice(len(train_images), size=num_samples, replace=False)
selected_images = train_images[selected_indices]

# Set up the figure with multiple subplots
fig, axs = plt.subplots(2, 5, figsize=(12, 6))
fig.suptitle('Randomly sampled tissue images')

# Display the randomly sampled images
for i, ax in enumerate(axs.flatten()):
    ax.axis('off')  # Turn off axis labels
    if i >= num_samples:
        continue  # Leave unused grid cells blank
    ax.imshow(selected_images[i])
    ax.set_title(f'Image {selected_indices[i] + 1}')

plt.show()

In [None]:
# Visualize some masks

# Randomly sample up to 10 masks; sampling without replacement fails when
# more samples are requested than exist, so cap the count at the dataset size.
num_samples = min(10, len(train_masks))
selected_indices = np.random.choice(len(train_masks), size=num_samples, replace=False)
selected_images = train_masks[selected_indices]

# Set up the figure with multiple subplots
fig, axs = plt.subplots(2, 5, figsize=(12, 6))
fig.suptitle('Randomly sampled masks')

# Display the randomly sampled binary images
for i, ax in enumerate(axs.flatten()):
    ax.axis('off')  # Turn off axis labels
    if i >= num_samples:
        continue  # Leave unused grid cells blank
    ax.imshow(selected_images[i], cmap='gray', vmin=0, vmax=1)  # Masks hold values in [0, 1]
    ax.set_title(f'Image {selected_indices[i] + 1}')

plt.show()


In [None]:
#Train the model

# Save only the weights that generalize best: track the validation loss, the
# same metric EarlyStopping uses. Tracking the training loss would keep
# overwriting the checkpoint as the model overfits.
model_checkpoint = ModelCheckpoint(model_file, monitor='val_loss', mode='min', verbose=1, save_best_only=True)

# Stop when the validation loss has not improved for 10 consecutive epochs.
es = tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

# validation_split=0.1 holds out 10% of the training arrays to compute val_loss.
model.fit(x=train_images,
          y=train_masks,
          batch_size=batch_size,
          epochs=epochs,
          shuffle=True,
          validation_split=0.1,
          callbacks=[model_checkpoint,es])

In [None]:
# Load the held-out test split and stack each list of samples into a NumPy array

test_images, test_masks = (
    np.array(samples) for samples in load_data(data_folder, mode='test')
)

In [None]:
#Test the model

def binarize_segmentation_outputs(x):
    """Threshold probabilities at 0.5 into a binary mask.

    Args:
        x: Array-like of predicted probabilities.

    Returns:
        A new array with the dtype and shape of ``x`` holding 1 where
        ``x >= 0.5`` and 0 elsewhere (NaN entries map to 0). The input is not
        modified, so the raw predictions stay available to the caller.
    """
    x = np.asarray(x)
    return (x >= 0.5).astype(x.dtype)

def net_dice_score(ground_truth_images, predicted_images):
    """Average the per-image Dice score over a set of mask pairs.

    Args:
        ground_truth_images: Sequence of reference masks.
        predicted_images: Sequence of predicted masks, aligned by position
            with ``ground_truth_images``.

    Returns:
        Mean of ``compute_dice_score`` over all pairs.

    Raises:
        ValueError: If the two sequences differ in length or are empty (the
            mean is undefined and would otherwise fail with a division by zero).
    """
    count = len(ground_truth_images)
    if count != len(predicted_images):
        raise ValueError(
            f"Expected as many predictions as ground-truth masks, "
            f"got {len(predicted_images)} and {count}."
        )
    if count == 0:
        raise ValueError("Cannot compute a mean Dice score for zero images.")

    total = sum(
        compute_dice_score(truth, prediction)
        for truth, prediction in zip(ground_truth_images, predicted_images)
    )
    return total / count

# Predict one probability map per test image, one image per batch.
pred_masks = model.predict(test_images, verbose=1, batch_size=1)

# Threshold every probability map into a binary mask.
pred_masks = list(map(binarize_segmentation_outputs, pred_masks))

# Mean Dice score between ground-truth and predicted masks over the test set.
dice_score = net_dice_score(test_masks, pred_masks)

print("DICE Score: ", dice_score)

In [None]:
#Visualize some predictions


l = len(test_masks)

# Pick a valid index in [0, l). random.randint(0, l) includes l itself and
# would occasionally index one past the end of the arrays.
random_number = random.randrange(l)

# Set up the figure with two subplots
fig, axs = plt.subplots(1, 2, figsize=(10, 5))

# Display the ground-truth mask with a caption
axs[0].imshow(test_masks[random_number], cmap='gray', vmin=0, vmax=1)
axs[0].set_title('Real Mask')

# Display the predicted mask for the same sample with a caption
axs[1].imshow(pred_masks[random_number], cmap='gray', vmin=0, vmax=1)
axs[1].set_title('Predicted Mask')

# Show the figure
plt.show()