In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# 1 - Libraries

In [4]:
# for data load
import os

# for reading and processing images
import imageio
from PIL import Image
import cv2
# for visualizations
import matplotlib.pyplot as plt
import glob2
import numpy as np # for using np arrays
import os
# for bulding and running deep learning model
import tensorflow as tf
import pickle
from tensorflow.keras.utils import Sequence
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import *
from tensorflow.keras import models
from tensorflow.keras.callbacks import *
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.metrics import MeanIoU
import sklearn
from sklearn.cluster import KMeans
from sklearn.utils import shuffle

# 2 - Helper Functions for Data Processing

## 2.1 - Visualization image

In [5]:
def show_example(image, mask, model, label, inp_size, color_mode, function = None, kmean = None):
    img = cv2.cvtColor(cv2.imread(image),cv2.COLOR_BGR2RGB)
    img = tf.image.resize(img, inp_size, method ='nearest')
    pred, _pred= predict(model, image, label, color_mode, inp_size)
    if mask != None:
        msk= cv2.cvtColor(cv2.imread(mask), cv2.COLOR_BGR2RGB)
        msk= tf.image.resize(msk, inp_size, method = 'nearest')
        if function:
            msk = tf.convert_to_tensor(function(msk.numpy()))
        if kmean:
            y_true = kmean.predict(msk.numpy().reshape(-1,3)).reshape(*inp_size, 1)
        else:
            y_true = train_data.processing(msk.numpy())
        m.miou_class(y_true, _pred)
        y_true = decode_label(y_true, label)
        ground_truth = np.floor(img.numpy() * 0.7 + y_true * 0.3).astype('int')
        fig, axes = plt.subplots(1,3, figsize = (12,3))
        axes[0].imshow(img)
        axes[0].set_title('Original Image')
        axes[1].set_title('Ground truth')
        axes[1].imshow(ground_truth)
        axes[2].set_title('Prediction')
        axes[2].imshow(pred)

    else:
        fig, axes = plt.subplots(1,2, figsize = (12,3))
        axes[0].imshow(img)
        axes[0].set_title('Original Image')
        axes[1].set_title('Prediction')
        axes[1].imshow(pred)

In [6]:
def show_history(history, validation : bool = False):
    if validation:
        # Loss
        fig, axes = plt.subplots(figsize= (20,5))
        # Train
        axes.plot(history.epoch, history.history['loss'], color= 'r',  label = 'Train')
        axes.plot(history.epoch, history.history['val_loss'], color = 'b', label = 'Val')
        axes.set_xlabel('Epoch')
        axes.set_ylabel('Loss')
        axes.legend()
        # Acc
        fig, axes = plt.subplots(figsize= (20,5))
        # Train
        axes.plot(history.epoch, history.history['acc'], color= 'r',  label = 'Train')
        axes.plot(history.epoch, history.history['val_acc'], color = 'b', label = 'Val')
        axes.set_xlabel('Epoch')
        axes.set_ylabel('Acc')
        axes.legend()
        # Mean Iou
        fig, axes = plt.subplots(figsize= (20,5))
        # Train
        axes.plot(history.epoch, history.history['mean_iou'], color= 'r',  label = 'Train')
        axes.plot(history.epoch, history.history['val_mean_iou'], color = 'b', label = 'Val')
        axes.set_xlabel('Epoch')
        axes.set_ylabel('MeanIoU')
        axes.legend()
    else:
        fig, axes = plt.subplots(1,4, figsize= (20,5))
        # loss
        axes[0].plot(history.epoch, history.history['loss'])
        axes[0].set_title('Train')
        axes[0].set_xlabel('Epoch')
        axes[0].set_ylabel('Loss')
        # Acc
        axes[1].plot(history.epoch, history.history['acc'])
        axes[1].set_title('Train')
        axes[1].set_xlabel('Epoch')
        axes[1].set_ylabel('Acc')
        # Mean Iou
        axes[2].plot(history.epoch, history.history['mean_iou'])
        axes[2].set_title('Train')
        axes[2].set_xlabel('Epoch')
        axes[2].set_ylabel('MeanIoU')

#3 - Constructing the ResU-Net Architecture

## 3.1 - Pre-Process Data

In [None]:
classnames = ['Chocolate cyst', 'Serous cystadenoma', 'Teratoma', 'Normal ovary',
              'Theca cell tumor', 'Simple cyst', 'Mucinous cystadenoma', 'High grade serous']

In [7]:
class DataGenerator(Sequence):
    def __init__(self, all_filenames, input_size = (256, 256), batch_size = 32, shuffle = True, seed = 123, encode: dict = None, encode_with_kmean = None, color_mode = 'hsv', function = None) -> None:
        super(DataGenerator, self).__init__()
        assert (encode != None and encode_with_kmean == None) or (encode == None and encode_with_kmean != None), 'Not empty !'
        assert color_mode == 'hsv' or color_mode == 'rgb'
        self.all_filenames = all_filenames
        self.input_size = input_size
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.color_mode = color_mode
        self.encode = encode
        self.function = function
        self.kmean = encode_with_kmean
        np.random.seed(seed)
        self.on_epoch_end()
    def processing(self, mask):
        d = list(map(lambda x: self.encode[tuple(x)], mask.reshape(-1,3)))
        return np.array(d).reshape(*self.input_size, 1)
    def __len__(self):
        return int(np.floor(len(self.all_filenames) / self.batch_size))
    def __getitem__(self, index):
        indexes = self.indexes[index * self.batch_size : (index + 1) * self.batch_size]
        all_filenames_temp = [self.all_filenames[k] for k in indexes]
        X, Y = self.__data_generation(all_filenames_temp)
        return X, Y
    def on_epoch_end(self):
        self.indexes = np.arange(len(self.all_filenames))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)
    def __data_generation(self, all_filenames_temp):
        batch = len(all_filenames_temp)
        X = np.empty(shape=(batch, *self.input_size, 3))
        Y = np.empty(shape=(batch, *self.input_size, 1))
        for i, (fn, label_fn) in enumerate(all_filenames_temp):
            # img
            img = cv2.imread(fn)
            if self.color_mode == 'hsv':
                img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
            elif self.color_mode == 'rgb':
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = tf.image.resize(img, self.input_size, method = 'nearest')
            img = tf.cast(img, tf.float32)
            img /= 255.

            #mask
            mask = cv2.imread(label_fn, 0)
            mask = cv2.cvtColor(mask, cv2.COLOR_BGR2RGB)
            mask = tf.image.resize(mask, self.input_size, method= 'nearest')
            mask = np.array(mask)
            if self.function:
                mask = self.function(mask)
            if self.encode:
                mask = self.processing(mask)
            if self.kmean:
                mask = self.kmean.predict(mask.reshape(-1,3)).reshape(*self.input_size, 1)
            mask = tf.cast(mask, tf.float32)
                    # Ensure mask is one-hot encoded
            num_classes = 8  # Assuming 8 classes
            mask_onehot = tf.one_hot(tf.cast(mask, dtype=tf.uint8), depth=num_classes, axis=-1)
            mask_onehot = np.asarray(mask_onehot, dtype=np.uint8)
            X[i,] = img
            Y[i,] = mask
        return X, Y

In [8]:
def encode_label(mask):
    # input (batch, rows, cols, channels)
    colors = np.unique(mask.reshape(-1,3), axis = 0)
    encoder = dict((tuple(j),i) for i,j in enumerate(colors)) # key is tuple
    _label = dict((j, list(i)) for i,j in encoder.items())
    with open('label.pickle', 'wb') as handel:
        pickle.dump(_label, handel, protocol= pickle.HIGHEST_PROTOCOL)
    return encoder
def encode_label_with_Kmeans(mask, classes):
    kmean = KMeans(classes, max_iter= 400)
    kmean.fit(mask)
    pred = kmean.predict(mask)
    classes_real =  len(set(pred))
    print(f'classes: {classes_real}')
    label = dict((j, i.tolist()) for i,j in list(zip(mask, pred))) # key is tuple
    with open('label.pickle', 'wb') as handel:
        pickle.dump(label, handel, protocol= pickle.HIGHEST_PROTOCOL)
    with open('kmean.pickle', 'wb') as handle:
        pickle.dump(kmean, handle, protocol= pickle.HIGHEST_PROTOCOL)
    return kmean
def decode_label(predict, label):
    d = list(map( lambda x: label[int(x)], predict.reshape(-1,1)))
    img =  np.array(d).reshape(predict.shape[0], predict.shape[1], 3)
    return img
def DataLoader(all_train_filename, all_mask,  all_valid_filename = None, input_size = (256,256), batch_size = 16, shuffle = True, seed = 123, color_mode = 'hsv', function = None, encode_with_kmeans = False, classes = class_name) -> None:
    mask_folder = sklearn.utils.shuffle(all_mask, random_state = 47)[:16]
    mask = [tf.image.resize(cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB), (128,128), method = 'nearest') for img in mask_folder ]
    #mask = [tf.image.resize(cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB), (128,128), method = 'nearest') for img in mask_folder ]
    mask = np.array(mask)
    kmean = None
    encode = None
    if function and encode_with_kmeans == False:
        mask = function(mask)
    if encode_with_kmeans == False:
        encode = encode_label(mask)
    elif encode_with_kmeans == True:
        kmean = encode_label_with_Kmeans(mask.reshape(-1,3), classes)
    train = DataGenerator(all_train_filename, input_size, batch_size , shuffle, seed, encode, kmean, color_mode, function)
    if all_valid_filename == None:
        return train, None
    else:
        valid = DataGenerator(all_valid_filename, input_size, batch_size, shuffle, seed, encode, kmean, color_mode, function)
        return train, valid


## 3.2 - Resnet

In [10]:
def decoder_block(x, y, filters):
    x = UpSampling2D()(x)
    x = Concatenate(axis = 3)([x,y])
    x = Conv2D(filters, 3, padding= 'same')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)
    x = Conv2D(filters, 3, padding= 'same')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    return x

def resnet50_unet(input_shape, *, classes, dropout):
    """ Input """
    inputs = Input(input_shape)

    resnet50 = ResNet50(include_top=False, weights="imagenet", input_tensor=inputs)

    """ Encoder """
    s1 = resnet50.get_layer("input_1").output
    s2 = resnet50.get_layer("conv1_relu").output
    s3 = resnet50.get_layer("conv2_block3_out").output
    s4 = resnet50.get_layer("conv3_block4_out").output

    x = resnet50.get_layer("conv4_block6_out").output

    """ Decoder """
    x = decoder_block(x, s4, 512)
    x = decoder_block(x, s3, 256)
    x = decoder_block(x, s2, 128)
    x = decoder_block(x, s1, 64)
    x = Dropout(dropout)(x)

    # build the entire model
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.5)(x)

    predictions = layers.Dense(8, activation='softmax')(x)

    model = models.Model(inputs, predictions, name="ResNet50_U-Net")
    return model


## 3.3 - Calculate Mean IoU

In [11]:
class m_iou():
    def __init__(self, classes: int) -> None:
        self.classes = classes
    def mean_iou(self,y_true, y_pred):
        y_pred = np.argmax(y_pred, axis = 3)
        miou_keras = MeanIoU(num_classes= self.classes)
        miou_keras.update_state(y_true, y_pred)
        return miou_keras.result().numpy()
    def miou_class(self, y_true, y_pred):
        y_pred = np.argmax(y_pred, axis = 3)
        miou_keras = MeanIoU(num_classes= self.classes)
        miou_keras.update_state(y_true, y_pred)
        values = np.array(miou_keras.get_weights()).reshape(self.classes, self.classes)
        for i in  range(self.classes):
            class_iou = values[i,i] / (sum(values[i,:]) + sum(values[:,i]) - values[i,i])
            print(f'IoU for class{str(i + 1)} is: {class_iou}')

In [12]:
def predict(model, image_test, label, color_mode, size):
    image = cv2.imread(image_test)
    if color_mode == 'hsv':
        image_cvt = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    elif color_mode == 'rgb':
        image_cvt = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    elif color_mode == 'gray':
        image_cvt = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        image_cvt = tf.expand_dims(image_cvt, axis = 2)
    image_cvt = tf.image.resize(image_cvt, size, method= 'nearest')
    image_cvt = tf.cast(image_cvt, tf.float32)
    image_norm = image_cvt / 255.
    image_norm = tf.expand_dims(image_norm, axis= 0)
    new_image = model(image_norm)
    image_argmax = np.argmax(tf.squeeze(new_image, axis = 0), axis = 2)
    image_decode = decode_label(image_argmax, label)
    predict_img = tf.cast(tf.image.resize(cv2.cvtColor(image, cv2.COLOR_BGR2RGB), size, method = 'nearest'), tf.float32) * 0.7 + image_decode * 0.3
    return np.floor(predict_img).astype('int'), new_image

# 4 - Executing above Functions to Train the Model

## 4.1 - Load Data

In [None]:
#Dataset path
datapath = "/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/"

#Reading file names
#Masks
masks = [os.path.join(datapath,i) for i in os.listdir(datapath) if "mask" in i]

masks.sort()
print (masks)

#Images
images = [i.replace("_mask","") for i in masks]
images.sort()
print (images)

['/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0001_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0002_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0003_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0004_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0005_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0006_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0007_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0008_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0009_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0010_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0011_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0012_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0013_mask.PNG', '/content/drive/MyDrive/Colab Notebooks/UTBT/OTU2d/both/0014_ma

In [None]:
data = list(zip(images, masks))
data = shuffle(data, random_state= 42)
split = int(0.9 * len(data))
all_train_filenames = data[:split]
all_valid_filenames = data[split:]

## 4.2 - Split Train and Test Set

In [None]:
train_data, valid_data = DataLoader(all_train_filenames, masks, all_valid_filenames, (128, 128), 8, True, 47, 'rgb', None, True, 8)

## 4.3 - Build ResU-Net Architecture

In [None]:
inp_size = (128, 128, 3)
unet = resnet50_unet(inp_size, classes= 8, dropout= 0.3)

In [None]:
m = m_iou(2)

In [None]:
checkpoint = ModelCheckpoint('resunet.h5', monitor= 'val_mean_iou', save_best_only= True, verbose= 1, mode = 'max')
lr_R = ReduceLROnPlateau(monitor= 'loss', patience= 3, verbose= 1, factor= 0.3, min_lr= 0.00001)

In [None]:
# Check the summary to better interpret how the output dimensions change in each layer
unet.summary()

# 5 - Train

In [None]:
unet.compile(loss= tf.keras.losses.SparseCategoricalCrossentropy(), optimizer = tf.keras.optimizers.Adam(learning_rate = 0.01), metrics= [m.mean_iou, 'acc'], run_eagerly= True)
history = unet.fit(train_data, validation_data= valid_data, epochs= 1, verbose= 1, callbacks = [checkpoint, lr_R])

# 6 - Predict and show history

In [None]:
with open('label.pickle', 'rb') as handel:
    label = pickle.load(handel)

In [None]:
with open('kmean.pickle', 'rb') as handel:
    kmean = pickle.load(handel)

In [None]:
show_history(history, True)

In [None]:
from tensorflow.keras.models import load_model
model = load_model('resunet.h5', custom_objects = {'mean_iou': m.mean_iou})

In [None]:
# Assuming you want to resize the masks to a specific size (e.g., 256x256)
desired_size = (256, 256)

# Process masks
processed_masks = []
for mask_path in masks:
    # Read the mask image
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

    # Resize the mask to the desired size
    resized_mask = cv2.resize(mask, desired_size)

    # Append the resized mask to the list
    processed_masks.append(resized_mask)

In [None]:
for i in range (0,10):
  show_example(*all_valid_filenames[i], model, label, (128,128), 'rgb', kmean= kmean)