<a href="https://colab.research.google.com/github/Sebastian352/Endoscope-Semantic-Segmentation-using-Unet/blob/master/unet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/Project')

Drive already mounted at /content/Project; to attempt to forcibly remount, call drive.mount("/content/Project", force_remount=True).


In [3]:
# !unzip /content/Project/MyDrive/Project/video_archive.zip -d /content/sample_data/video_content

In [4]:
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model
import numpy as np
import pandas as pd
import cv2
from sklearn.model_selection import train_test_split
import tensorflow as tf
import os
import shutil
from google.colab.patches import cv2_imshow
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint,ReduceLROnPlateau,CSVLogger
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import LabelEncoder

In [5]:
def conv_block(input, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(input)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x

In [6]:
def encoder_block(input, num_filters):
    x = conv_block(input, num_filters)
    p = MaxPool2D((2, 2))(x)
    return x, p

In [7]:
def decoder_block(input, skip_features, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
    x = Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    return x

In [8]:
def build_unet(input_shape, num_classes):
    inputs = Input(shape=input_shape)

    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    b1 = conv_block(p4, 1024)

    d1 = decoder_block(b1, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)

    outputs = Conv2D(num_classes, 1, padding="same", activation="softmax")(d4)

    model = Model(inputs, outputs, name="U-Net")
    return model

In [9]:
def create_dir(path):
  if not os.path.exists(path):
    os.makedirs(path)

In [10]:
def move_data(path,rawImagesPath,maskImagesPath):
  for directory in os.listdir(path):
    dirPath = os.path.join(path, directory)
    for subDir in os.listdir(dirPath):
      subSubDir = os.path.join(dirPath, subDir)
      for image in os.listdir(subSubDir):
        srcPath = os.path.join(subSubDir, image)
        newName = subDir + image
        if 'mask' not in image:
          newPath = os.path.join(rawImagesPath, newName)
          destPath = os.path.join(rawImagesPath, image)
          shutil.copy(srcPath, destPath)
          os.rename(destPath, newPath)
        if 'color' in image:
          newPath = os.path.join(maskImagesPath, newName)
          destPath = os.path.join(maskImagesPath, image)
          shutil.copy(srcPath, destPath)
          os.rename(destPath, newPath)

In [11]:
def add_path(images):
  for i in range(len(images)):
    if 'mask' not in images[i]:
      images[i] = os.path.join(rawImagesPath,images[i])
    else:
      images[i] = os.path.join(maskImagesPath,images[i])
  return images

In [12]:
def read_image(image_path):
    img = cv2.imread(image_path, cv2.IMREAD_COLOR)

    if img is None:
        raise ValueError(f"Unable to read image from path: {image_path}")

    img = cv2.resize(img, (IMG_W, IMG_H))

    return img


def read_mask(mask_path):
  img = cv2.imread(mask_path,cv2.IMREAD_COLOR)

  if img is None:
    raise ValueError(f"Unable to read image from path: {mask_path}")

  img = cv2.resize(img,(IMG_W,IMG_H))
  cv2_imshow(img)
  output = []
  for color,_ in color_class_mapping.items():
    cmap = np.all(np.equal(img,color),axis=-1)
    output.append(cmap)

  output = np.stack(output,axis=-1)
  output = output.astype(np.uint8)

  return output

In [13]:
def preprocess(image_path,mask_path):
  def read(image_path,mask_path):
    image_path = image_path.np.decode()
    mask_path = mask_path.np.decode()

    x = read_image(image_path)
    y = read_mask(mask_path)
    return x,y

  image,mask = tf.numpy_function(read,[image_path,mask_path],[tf.float32,tf.uint8])
  image.set_shape([IMG_H,IMG_W,3])
  mask.set_shape([IMG_H,IMG_W,NUM_CLASSES])

  # Add a batch dimension to the input data
  image = tf.expand_dims(image, axis=0)
  mask = tf.expand_dims(mask, axis=0)

  return image,mask

In [14]:
def tf_dataset(image_path,mask_path,batch=8):
  dataset = tf.data.Dataset.from_tensor_slices((image_path,mask_path))
  dataset = dataset.shuffle(buffer_size = 5000)
  dataset = dataset.map(preprocess)
  dataset = dataset.prefetch(2)
  return dataset

In [15]:
def load_images(images_raw,images_mask):
  images = []
  masks = []
  for i in range(len(images_raw)):
    image = read_image(images_raw[i])
    mask = read_image(images_mask[i])

    # Add a batch dimension to the input data
    images.append(image)
    masks.append(mask)
  return images,masks

In [16]:
np.random.seed(42)
tf.random.set_seed(42)

In [17]:
""" Hyperparameters """
IMG_H = 320
IMG_W = 416
NUM_CLASSES = 12
input_shape = (IMG_H,IMG_W, 3)
batch_size = 32
lr = 1e-4
epochs = 100

path = '/content/sample_data/video_content'
opPath = '/content'

model_path = os.path.join(opPath,"model.h5")
csv_path = os.path.join(opPath,"data.csv")


rawImagesPath = os.path.join(opPath, 'rawImages')
maskImagesPath = os.path.join(opPath, 'maskImages')
labelsPath = os.path.join(opPath, 'labels')

imgTrainPath = os.path.join(opPath,'images','train')
imgCrossPath=os.path.join(opPath,'images','cross')
imgTestPath=os.path.join(opPath,'images','test')

labelTrainPath=os.path.join(opPath,'labels','train')
labelCrossPath=os.path.join(opPath,'labels','cross')
labelTestPath=os.path.join(opPath,'labels','test')


In [18]:
""" Loading the dataset """

create_dir(rawImagesPath)
create_dir(maskImagesPath)
# create_dir(labelsPath)

# create_dir(imgTrainPath)
# create_dir(imgCrossPath)
# create_dir(imgTestPath)

# create_dir(labelTrainPath)
# create_dir(labelCrossPath)
# create_dir(labelTestPath)

# move_data(path,rawImagesPath,maskImagesPath)

In [None]:
rawImages = sorted(os.listdir(rawImagesPath))
maskImages = sorted(os.listdir(maskImagesPath))
add_path(rawImages)
add_path(maskImages)

In [20]:
""" Splitting data """
train_x, temp_x, train_y,temp_y = train_test_split(rawImages,maskImages,test_size=0.3,random_state=42)
val_x,test_x ,val_y, test_y = train_test_split(temp_x,temp_y,test_size=0.5,random_state=42)


In [21]:
train_x,train_y = load_images(train_x,train_y)

In [22]:
train_x = np.array(train_x)
train_y = np.array(train_y)

In [None]:
labelEncoder = LabelEncoder()

# n,h,w,c = train_x.shape
train_x = train_x.reshape(-1,1)
train_x = labelEncoder.fit_transform(train_x)
train_x = train_x.reshape(n,h,w,c)

  y = column_or_1d(y, warn=True)


In [None]:
""" Process the color map """
color_class_mapping={(127, 127, 127): 0,
                    (140, 140, 210): 1,
                    (114, 114, 255): 2,
                    (156, 70, 231): 3,
                    (75, 183, 186): 4,
                    (0, 255, 170): 5,
                    (0, 85, 255): 6,
                    (0, 0, 255): 7,
                    (0, 255, 255): 8,
                    (184, 255, 169): 9,
                    (165, 160, 255): 10,
                    (128, 50, 0): 11,
                    (0, 74, 111): 12}

classNameMapping = {
    0: 'Black Background',
    1: 'Abdominal Wall',
    2: 'Liver',
    3: 'Gastrointestinal Tract',
    4: 'Fat',
    5: 'Grasper',
    6: 'Connective Tissue',
    7: 'Blood',
    8: 'Cystic Duct',
    9: 'L-hook Electrocautery',
    10: 'Gallbladder',
    11: 'Hepatic Vein',
    12: 'Liver Ligament'
}

In [None]:
""" Dataset Pipeline """

train_dataset = tf_dataset(train_x,train_y,batch = batch_size)
valid_dataset = tf_dataset(val_x,val_y,batch=batch_size)


In [None]:
load_images(rawImages,maskImages)

In [None]:
""" Model """
model = build_unet(input_shape,NUM_CLASSES)
model.compile(
    loss = "categorical_crossentropy",
    optimizer = Adam(lr),
)

In [None]:
""" Training """

callbacks = [
    ModelCheckpoint(model_path,verbose=1,save_best_only=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
    CSVLogger(csv_path,append=True),
    EarlyStopping(monitor='val_loss',patience=20,restore_best_weights=False)
]

In [None]:
model.fit(train_dataset,validation_data=valid_dataset,epochs=epochs,callbacks=callbacks)