<a href="https://colab.research.google.com/github/hector6298/Deep-Learning-Collab-notebooks/blob/master/segmentation_UNetResnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Modified U-net with ResNet50 as the encoder

This collab is made to train a segmentation model with a custom dataset, not from tensorflow APIs.
The dataset to be used is COCO along with its API for image filenames, classes and segmentation masks loading.

Thi line is just to mount your Google Drive. You can skip it if you want.

In [0]:
from google.colab import drive, files
drive.mount('/content/drive')

Remove useless data from Collab.

In [0]:
!rm -r /content/sample_data

Download and set raw COCO data

In [0]:
!wget http://images.cocodataset.org/zips/train2014.zip
!unzip train2014.zip
!rm train2014.zip

In [0]:
!wget http://images.cocodataset.org/zips/val2014.zip
!unzip val2014.zip
!rm val2014.zip

In [0]:
!wget http://images.cocodataset.org/annotations/annotations_trainval2014.zip
!unzip annotations_trainval2014.zip
!rm annotations_trainval2014.zip

Clone and install the API into the VM.

In [0]:
!git clone https://github.com/nightrome/cocostuffapi.git
!make -C cocostuffapi/PythonAPI/

Install tensorflow examples to use Pix2Pix for the upsamplers in the decoding part.

In [0]:
!pip install -q git+https://github.com/tensorflow/examples.git


##Python Code

In [0]:
%matplotlib inline
import os
import numpy as np
import skimage.io as io
import matplotlib.pyplot as plt
import pylab
import tensorflow as tf
import cv2
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from skimage.transform import resize
from tensorflow_examples.models.pix2pix import pix2pix
from cocostuffapi.PythonAPI.pycocotools.coco import COCO
from cocostuffapi.PythonAPI.pycocotools.cocostuffhelper import cocoSegmentationToSegmentationMap

pylab.rcParams['figure.figsize'] = (8.0, 10.0)

Method to retrieve all the image Ids

In [0]:
# get all images containing given categories, select one at random
def getIds4Categories(coco, categories):
  idlist = []
  classMap = dict()
  label = 1
  for category in categories:
    print(f"resolving for {category}...")
    catIds = coco.getCatIds(catNms=[category])
    classMap[catIds[0]] = label
    print(label)
    label += 1
    imgIds = coco.getImgIds(catIds=catIds )
    for id in imgIds:
      idlist.append(id)
  return idlist, classMap

Method to convert coco segmentation data into a mask for each image, given a dictionary mapping COCO classes ids to labels.

In [0]:
def getMask(coco, imgId, classMap, iscrowd=False, checkUniqueClass=False):
  shape = coco.imgs[imgId]
  shape = (shape['height'], shape['width'])
  labelMap = np.zeros(shape)

  annIds = coco.getAnnIds(imgIds=imgId, iscrowd=iscrowd)
  imgAnns = coco.loadAnns(annIds)

  for ann in imgAnns:
    labelMask = coco.annToMask(ann) == 1
    if ann['category_id'] in classMap:
      newLabel = classMap[ann['category_id']]
      labelMap[labelMask] = newLabel
  return labelMap

This is the class that will generate all our data for training. It is subclassed from keras.utils.Sequence A useful class to easily  train a Keras model using custom data.

In [0]:
class DataGenerator(keras.utils.Sequence):
  def __init__(self,
               coco,
               imgIds,
               datadir,
               classMap=None,
               is_training=True,
               batch_size=8,
               input_shape=(224,224),
               num_channels = 3,
               shuffle=True,
               augmentationParams=None,
               seed = 3
               ):
    self.coco = coco
    self.imgIds = imgIds
    self.batch_size = batch_size
    self.input_shape = input_shape
    self.num_channels = num_channels
    self.shuffle = shuffle
    self.datadir = datadir
    self.classMap = classMap
    self.imgObjs = self.coco.loadImgs(self.imgIds)
    self.is_training = is_training
    self.n = 0
    self.seed = seed
    if augmentationParams is not None:
      self.augmentationEngineIMG = ImageDataGenerator(**augmentationParams)
      self.augmentationEngineMASK = ImageDataGenerator(**augmentationParams)
      
  def __next__(self):
    batch_x, batch_y = self.__getitem__(self.n)
    self.n += 1
    if self.n >= self.__len__():
      self.on_epoch_end()
      self.n = 0
    return batch_x, batch_y
  
  def __len__(self):
    return int(np.floor(len(self.imgIds)/float(self.batch_size)))
  
  def reset_batch_index(self):
    self.n = 0

  def on_epoch_end(self):
    if self.shuffle == True:
      np.random.shuffle(self.imgObjs)

  def __getitem__(self, idx):
    sample = self.imgObjs[idx*self.batch_size:(idx+1)*self.batch_size]
    batch_x = np.empty((self.batch_size, self.input_shape[0], self.input_shape[1], self.num_channels), dtype=np.float32)
    batch_y = np.empty((self.batch_size, self.input_shape[0], self.input_shape[1], 1), dtype=np.float32)

    for i in range(self.batch_size):
      img = io.imread(f"{self.datadir}/{sample[i]['file_name']}")
      mask = getMask(self.coco, sample[i]['id'], self.classMap)
      mask = np.expand_dims(mask, axis=2)
      if(len(img.shape) < 3):
        img = np.expand_dims(img,axis=2)

      img = resize(img, self.input_shape)
      mask = resize(mask, self.input_shape)

      if self.is_training and hasattr(self,'augmentationEngineIMG'):
        img = self.augmentationEngineIMG.random_transform(img, seed=self.seed)
        mask = self.augmentationEngineMASK.random_transform(mask, seed=self.seed)
      
      batch_x[i] = img
      batch_y[i] = mask
 

    return batch_x, batch_y


#Main Code

##Data directories and hiperparameters.

In [0]:
dataType='train2014'
dataDirTrain='/content/train2014'
dataDirTest='/content/val2014'
annFileTrain='/content/annotations/instances_train2014.json'
annFileTest='/content/annotations/instances_val2014.json'


INPUT_SHAPE = [224,224,3]
EPOCHS = 10
VAL_SUBSPLITS = 5
BATCH_SIZE = 100
OUTPUT_CHANNELS = len(CLASSES2SEGMENT) + 1 
LEARNING_RATE = 0.001


**Please define the classes you want to train the model with** I wanted to segment vehicles, so I used ['bus','car','truck']
See what classes. Run the cell below to know which classes you can use.

In [0]:

cocoTrain=COCO(annFileTrain)
cocoTest=COCO(annFileTest)

cats = cocoTrain.loadCats(cocoTrain.getCatIds())
nms=[cat['name'] for cat in cats]
print('COCO categories: \n{}\n'.format(' '.join(nms)))


**DEFINE YOUR CLASSES HERE**

In [0]:
CLASSES2SEGMENT= ['bus', 'car', 'truck']

Load ids for the iamges on given categories and defining a dictionary that maps category ids to labels.
Instantiating DataGenerators; the one for the training will make use of data augmentation using the parameters given in this cell. Please change it to suit your needs.

In [0]:
idlistTrain, classMap = getIds4Categories(cocoTrain, CLASSES2SEGMENT)
idlistTest, classMap = getIds4Categories(cocoTest, CLASSES2SEGMENT)

#dictionary for image data augmentation
data_gen_args = dict(featurewise_center=False,
                     featurewise_std_normalization=False,
                     width_shift_range=0.1,
                     height_shift_range=0.1,
                     horizontal_flip=True)

trainImgGenerator = DataGenerator(cocoTrain, idlistTrain, dataDirTrain, classMap=classMap, batch_size=BATCH_SIZE, augmentationParams=data_gen_args)
testImgGenerator = DataGenerator(cocoTest, idlistTest, dataDirTest, classMap=classMap, batch_size=BATCH_SIZE, is_training=False)

Let's test the generator to show an image and its corresponding mask showing only desired classes.

In [0]:
total_batch = len(testImgGenerator)
batch_x, batch_y = next(testImgGenerator)
testImgGenerator.reset_batch_index()

img, mask = batch_x[0], batch_y[0]
fig, axis = plt.subplots(1,2)
axis[0].imshow(img)
axis[1].imshow(tf.keras.preprocessing.image.array_to_img(mask))
plt.show()



##Let's define the model

In [0]:
base_model = tf.keras.applications.resnet_v2.ResNet50V2(weights='imagenet', input_shape=INPUT_SHAPE, include_top=False)
layernames = ["conv1_conv",               #128 channels
              "conv2_block3_preact_relu", #256 channels
              "conv3_block4_preact_relu", #512 channels
              "conv4_block6_preact_relu", #1024 channels
              "post_relu"] #2048 channels   
layers = [base_model.get_layer(name).output for name in layernames]
downsample_stack = tf.keras.Model(inputs=base_model.input, outputs=layers)
downsample_stack.trainable = False

upsample_stack = [pix2pix.upsample(1024,3),
                  pix2pix.upsample(512,3),
                  pix2pix.upsample(256,3),
                  pix2pix.upsample(128,3),
                  pix2pix.upsample(64,3)]


In [0]:
def mod_unet_resnet(output_channels, input_shape=[224,224,3]):
  inputs = tf.keras.layers.Input(shape=input_shape)
  x = inputs
  #encoder
  skips = downsample_stack(x)
  #get last of the outputs for bottleneck
  x = skips[-1]
  skips = reversed(skips[:-1])

  #decoder
  it = 0
  for layer_enc, skip in zip(upsample_stack, skips):
    print(it)
    x = layer_enc(x)
    concat = tf.keras.layers.Concatenate()
    x = concat([x,skip])
    
    it += 1

  #last layer

  last = tf.keras.layers.Conv2DTranspose(
      output_channels, 3, strides=2, padding='same', name='outMask'
  )

  x = last(x)

  return tf.keras.Model(inputs=inputs, outputs=x)


Define Our IOU metric before compiling. Default MeanIoU class does not support results from multiple channels that serves as logits

In [0]:
class meanIoU(tf.keras.metrics.MeanIoU):
    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.argmax(y_pred, axis=-1)
        y_pred = y_pred[..., tf.newaxis]
        return super().update_state(y_true, y_pred, sample_weight=sample_weight)


Instantiate our model and plot the graph.

In [0]:
model = mod_unet_resnet(OUTPUT_CHANNELS)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
              loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=[meanIoU(4)])
tf.keras.utils.plot_model(model, show_shapes=True)


##get mask inference methods

In [0]:
def create_mask_from_inference(pred_mask):
  pred_mask = tf.argmax(pred_mask, axis=-1)
  pred_mask = pred_mask[..., tf.newaxis]
  return pred_mask[0]

In [0]:
def display(display_list):
  plt.figure(figsize=(15, 15))

  title = ['Input Image', 'True Mask', 'Predicted Mask']

  for i in range(len(display_list)):
    plt.subplot(1, len(display_list), i+1)
    plt.title(title[i])
    plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
    plt.axis('off')
  plt.show()


In [0]:
def show_predictions(generator=None):
  if generator is not None:
    batch_x, batch_y = next(generator)
    generator.reset_batch_index()
    pred_mask = model.predict(batch_x)
    for i in range(5):
      display([batch_x[i], batch_y[i], create_mask_from_inference(pred_mask[i][tf.newaxis, ...])])
  else:
    display([sample_image, sample_mask,
             create_mask_from_inference(model.predict(sample_image[tf.newaxis, ...]))])


##Show the network is working before training

In [0]:
trainImgGenerator.on_epoch_end()
batch_x, batch_y = next(trainImgGenerator)

sample_image, sample_mask = batch_x[0], batch_y[0]
show_predictions()

##Training

Do we have prior weights? You can skip this cell if you want.

In [0]:
if os.path.exists('segmentation_net_weights.h5'):
  model.load_weights('segmentation_net_weights.h5')


Train the model 

In [0]:
model_history = model.fit(x=trainImgGenerator, 
                epochs=EPOCHS,
                validation_data=testImgGenerator,
                verbose=1
                )

Display training metrics

In [0]:
def plot_metrics(model_history):

  loss = model_history.history['loss']
  val_loss = model_history.history['val_loss']

  iou = model_history.history['mean_io_u']
  val_iou = model_history.history['val_mean_io_u']
  epochs = range(EPOCHS)
  fig, axs = plt.subplots(2)

  axs[0].plot(epochs, loss, 'r', label='Training loss')
  axs[0].plot(epochs, val_loss, 'bo', label='Validation loss')
  axs[0].title('Training and Validation Loss')
  axs[0].xlabel('Epoch')
  axs[0].ylabel('Loss Value')
  axs[1].plot(epochs, iou, 'r', label='Training iou')
  axs[1].plot(epochs, val_iou, 'bo', label='Validation iou')
  axs[1].title('Training and Validation iou')
  axs[1].xlabel('Epoch')
  axs[1].ylabel('iou Value')
  plt.legend()
  plt.show()



In [0]:
plot_metrics(model_history)

##Save Model

Save weights after training and network architechture

In [0]:
json_config = model.to_json()
with open('model_config.json', 'w') as json_file:
    json_file.write(json_config)
model.save_weights('segmentation_net_weights.h5')
files.download('segmentation_net_weights.h5')
files.downlaod('model_config.json')