In [None]:
# Mount Google Drive at /content/gdrive; later cells read the dataset archive
# from this mount and write model checkpoints to it.
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [None]:
# Copy the dataset archive from the mounted Drive into the current working
# directory, extract it quietly (-q), then delete the local copy of the archive.
zip_path = '/content/gdrive/MyDrive/week 8/Human segmentation/data/human.zip'
!cp "{zip_path}" .
!unzip -q human.zip
!rm human.zip

In [None]:
import tensorflow as tf
import keras
from keras.layers import *
from keras.models import *
from keras import backend as K
from keras.losses import SparseCategoricalCrossentropy
from keras.callbacks import ModelCheckpoint, LearningRateScheduler, ReduceLROnPlateau
import cv2
import numpy as np
import os
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.optimizers import Adam
import random
import imutils

In [None]:
# Sanity check: number of entries in the extracted image directory.
image_dir = "/content/human data/images"
len(os.listdir(image_dir))

2667

In [None]:
# Build the train/val/test split lists. Every line written to train.txt,
# val.txt and test.txt has the form "<image path>,<mask path>".
train_path_image = r"/content/human data/images"
train_path_mask = r"/content/human data/masks"

# Cumulative split boundaries (1-based pair index):
# pairs 1..train_end go to train, train_end+1..val_end to val, the rest to test.
train_end = 1867
val_end = 2267

# os.listdir() returns entries in arbitrary order, so two independent listings
# are not guaranteed to line up. Sorting both makes the split reproducible and
# pairs files by sorted name.
# NOTE(review): this assumes an image and its mask sort to the same position
# (e.g. identical base names) — confirm against the dataset.
image_names = sorted(os.listdir(train_path_image))
mask_names = sorted(os.listdir(train_path_mask))

# The context manager closes all three files even if the loop raises.
with open("train.txt", "w") as train_file, \
     open("val.txt", "w") as val_file, \
     open("test.txt", "w") as test_file:
    for count, (image, mask) in enumerate(zip(image_names, mask_names), start=1):
        image_path = os.path.join(train_path_image, image)
        mask_path = os.path.join(train_path_mask, mask)
        line = image_path + "," + mask_path + "\n"

        if count <= train_end:
            train_file.write(line)
        elif count <= val_end:
            val_file.write(line)
        else:
            test_file.write(line)

In [None]:
class DataLoader():
    """Endless batch generator for image/mask pairs listed in text files.

    The sample lists are read from ``train.txt``, ``val.txt`` and ``test.txt``
    in the current working directory; every line is ``<image path>,<mask path>``.
    Batches are laid out as (batch, height, width, channel).

    Note: a later cell of this notebook defines another ``DataLoader`` class
    with a two-argument constructor; when both cells are run, that definition
    replaces this one.
    """

    def __init__(self, train_path, val_path, test_path):
        """Store the given paths and load (and shuffle once) the three sample lists.

        The three path arguments are only stored on the instance; the sample
        lists always come from train.txt / val.txt / test.txt.
        """
        self.train_path = train_path
        self.val_path = val_path
        self.test_path = test_path

        self.train_doc = np.random.permutation(self._read_lines("train.txt"))
        self.val_doc = np.random.permutation(self._read_lines("val.txt"))
        self.test_doc = np.random.permutation(self._read_lines("test.txt"))

        self.train_num = len(self.train_doc)
        self.val_num = len(self.val_doc)
        self.test_num = len(self.test_doc)

    @staticmethod
    def _read_lines(list_file):
        """Return all lines of ``list_file``, closing the file afterwards."""
        with open(list_file) as handle:
            return handle.readlines()

    def image_property(self, image_height = 256, image_width = 256):
        """Set the target image size; the channel count is fixed to 3."""
        self.height = image_height
        self.width = image_width
        self.channel = 3

    def init_train_property(self, batch_size = 32, epochs = 100):
        """Set the batch size and the number of training epochs."""
        self.batch_size = batch_size
        self.epochs = epochs

    def generate_data(self, doc, augment = False):
        """Yield ``(batch_data, batch_labels)`` tuples forever.

        Each cycle reshuffles ``doc`` and then yields, in this order: all full
        batches, the same full batches with augmentation (only if ``augment``),
        the remainder batch (if any), and the augmented remainder batch (only
        if ``augment``).

        Raises:
            ValueError: if ``doc`` is empty (the loop would otherwise spin
                forever without yielding anything).
        """
        batch_size = self.batch_size

        if len(doc) == 0:
            raise ValueError("generate_data() needs at least one sample")

        while True:
            new = np.random.permutation(doc)
            # Number of full batches generated in one pass over the data.
            num_batch = len(new) // batch_size

            for batch in range(num_batch):
                yield self.one_batch(new, batch, batch_size)

            if augment:
                for batch in range(num_batch):
                    yield self.one_batch(new, batch, batch_size, augment = augment)

            remain_image = len(new) % batch_size

            if remain_image > 0:
                yield self.one_batch(new, num_batch, batch_size, remain_image)
                if augment:
                    yield self.one_batch(new, num_batch, batch_size, remain_image, augment = augment)

    def one_batch(self, new, batch_number, batch_size, remain_image = 0, augment = False):
        """Load one batch of images and masks.

        Args:
            new: sequence of "<image path>,<mask path>" lines.
            batch_number: index of the batch inside ``new``.
            batch_size: nominal batch size, used to compute the start offset.
            remain_image: if non-zero, the size of this (final, partial) batch.
            augment: if true, apply random contrast to each image.

        Returns:
            ``(batch_data, batch_labels)``: images scaled to [0, 1] with shape
            (n, height, width, 3) and masks with shape (n, height, width, 1).

        Raises:
            FileNotFoundError: if OpenCV cannot read an image or mask file.
        """
        batch_len = remain_image if remain_image else batch_size

        batch_data = np.zeros((batch_len, self.height, self.width, self.channel))
        batch_labels = np.zeros((batch_len, self.height, self.width, 1))

        # cv2.resize takes the target size as (width, height).
        dsize = (self.width, self.height)
        offset = batch_number * batch_size

        for idx in range(batch_len):
            # Strip only the line terminator so a last line without a trailing
            # newline does not lose a real character.
            d = new[offset + idx].rstrip("\r\n").split(",")

            # Image
            image = cv2.imread(d[0])
            if image is None:
                raise FileNotFoundError("Could not read image file: " + str(d[0]))
            image_resized = cv2.resize(image.astype(np.float32), dsize)
            if augment:
                image_resized = self._random_contrast(image_resized, 0.5, 1.5)
            batch_data[idx] = image_resized / 255.0

            # Mask
            mask = cv2.imread(d[1], 0)
            if mask is None:
                raise FileNotFoundError("Could not read mask file: " + str(d[1]))
            # Binarize: every non-zero pixel becomes 1.
            mask = (mask > 0).astype(np.float32)
            mask = cv2.medianBlur(mask, 5)
            # Nearest-neighbour keeps the labels exactly 0 or 1; interpolating
            # would create fractional class ids along the mask border.
            mask_resized = cv2.resize(mask, dsize, interpolation=cv2.INTER_NEAREST)
            batch_labels[idx, :, :, 0] = mask_resized

        return (batch_data, batch_labels)

    def augment(self, batch_data, batch_labels):
        """Apply random brightness, horizontal flip and vertical flip to a batch."""
        batch_data, batch_labels = self.random_brightness(batch_data, batch_labels)
        batch_data, batch_labels = self.random_horizontal_flip(batch_data, batch_labels)
        batch_data, batch_labels = self.random_vertical_flip(batch_data, batch_labels)
        return (batch_data, batch_labels)

    def _random_contrast(self, image, lower, upper):
        """Scale each of the 3 channels of one (H, W, 3) image around its mean.

        One factor is drawn uniformly from [lower, upper] and shared by all
        channels. The result is clipped to [0, 255] and truncated through
        uint8, so the image is expected to be in the 0-255 range. ``image`` is
        modified in place and also returned.
        """
        contrast_factor = random.uniform(lower, upper)
        for channel in range(3):
            x = image[:, :, channel]
            mean = np.mean(x)
            image[:, :, channel] = np.clip((x - mean) * contrast_factor + mean, 0, 255).astype(np.uint8)
        return image

    def random_contrast(self, batch_data, batch_labels, lower, upper):
        """Apply ``_random_contrast`` to every image of the batch (in place)."""
        for i in range(batch_data.shape[0]):
            # Pass the single image, not the whole batch.
            batch_data[i] = self._random_contrast(batch_data[i], lower, upper)
        return batch_data, batch_labels

    def random_rotate(self, batch_data, batch_labels, lower=-10, upper=10):
        """Rotate each image and its mask by the same random integer angle."""
        for i in range(batch_data.shape[0]):
            factor = random.randint(lower, upper)
            batch_data[i] = imutils.rotate(batch_data[i], factor)
            rotated_mask = imutils.rotate(batch_labels[i], factor)
            # NOTE(review): OpenCV appears to drop the trailing singleton
            # channel of an (H, W, 1) array; reshape restores it either way.
            batch_labels[i] = np.reshape(rotated_mask, batch_labels[i].shape)
        return batch_data, batch_labels

    def random_horizontal_flip(self, batch_data, batch_labels, p=0.5):
        """With probability ``p``, mirror the batch left-right (width axis)."""
        if random.random() < p:
            # Axis 2 is the width of a (batch, height, width, channel) array.
            batch_data = np.flip(batch_data, axis = 2)
            batch_labels = np.flip(batch_labels, axis = 2)
        return batch_data, batch_labels

    def random_vertical_flip(self, batch_data, batch_labels, p=0.5):
        """With probability ``p``, mirror the batch top-bottom (height axis)."""
        if random.random() < p:
            # Axis 1 is the height of a (batch, height, width, channel) array.
            batch_data = np.flip(batch_data, axis = 1)
            batch_labels = np.flip(batch_labels, axis = 1)
        return batch_data, batch_labels

    def random_brightness(self, input_img, batch_labels, brightness_range = (-100, 100)):
        """Shift the brightness by a random amount drawn from ``brightness_range``.

        The shift is expressed on a 0-255 scale (see the ``255`` constants).
        NOTE(review): ``augment`` passes a whole batch here — confirm that
        cv2.addWeighted accepts 4-D input and that the data is on a 0-255 scale.
        """
        brightness = np.random.ranf()*(brightness_range[1] - brightness_range[0]) + brightness_range[0]
        if brightness != 0:
            if brightness > 0:
                shadow = brightness
                highlight = 255
            else:
                shadow = 0
                highlight = 255 + brightness
            alpha_b = (highlight - shadow)/255
            gamma_b = shadow
            # Computes input_img * alpha_b + gamma_b (the second input has weight 0).
            buf = cv2.addWeighted(input_img, alpha_b, input_img, 0, gamma_b)
        else:
            buf = input_img.copy()
        return buf, batch_labels


In [None]:
class DataLoader():
    """Endless batch generator for image/mask pairs listed in text files.

    The sample lists are read from ``train.txt`` and ``val.txt`` in the current
    working directory; every line is ``<image path>,<mask path>``. Batches are
    laid out as (batch, height, width, channel); images are scaled to [-1, 1].
    """

    def __init__(self, train_path, val_path):
        """Store the given paths and load (and shuffle once) both sample lists.

        The two path arguments are only stored on the instance; the sample
        lists always come from train.txt / val.txt.
        """
        self.train_path = train_path
        self.val_path = val_path

        self.train_doc = np.random.permutation(self._read_lines("train.txt"))
        self.val_doc = np.random.permutation(self._read_lines("val.txt"))

        self.train_num = len(self.train_doc)
        self.val_num = len(self.val_doc)

    @staticmethod
    def _read_lines(list_file):
        """Return all lines of ``list_file``, closing the file afterwards."""
        with open(list_file) as handle:
            return handle.readlines()

    def image_property(self, image_height = 256, image_width = 256):
        """Set the target image size; the channel count is fixed to 3."""
        self.height = image_height
        self.width = image_width
        self.channel = 3

    def init_train_property(self, batch_size = 32, epochs = 100):
        """Set the batch size and the number of training epochs."""
        self.batch_size = batch_size
        self.epochs = epochs

    def generate_data(self, doc, augment = False, shuffle = False):
        """Yield ``(batch_data, batch_labels)`` tuples forever.

        Each cycle yields, in this order: all full batches, the same full
        batches augmented (only if ``augment``), the remainder batch (if any),
        and the augmented remainder batch (only if ``augment``). With
        ``augment=True`` one cycle therefore contains
        ``2 * ceil(len(doc) / batch_size)`` batches.

        Args:
            doc: sequence of "<image path>,<mask path>" lines.
            augment: also emit an augmented copy of every batch.
            shuffle: reshuffle ``doc`` at the start of every cycle. Defaults to
                False, which keeps the order of ``doc`` fixed.

        Raises:
            ValueError: if ``doc`` is empty (the loop would otherwise spin
                forever without yielding anything).
        """
        batch_size = self.batch_size

        if len(doc) == 0:
            raise ValueError("generate_data() needs at least one sample")

        while True:
            new = np.random.permutation(doc) if shuffle else doc
            num_batch = len(new) // batch_size

            for batch in range(num_batch):
                yield self.one_batch(new, batch, batch_size)

            if augment:
                for batch in range(num_batch):
                    yield self.one_batch(new, batch, batch_size, augment = augment)

            remain_image = len(new) % batch_size

            if remain_image:
                yield self.one_batch(new, num_batch, batch_size, remain_image)
                if augment:
                    yield self.one_batch(new, num_batch, batch_size, remain_image, augment = augment)

    def one_batch(self, new, batch_number, batch_size, remain_image = 0, augment = False):
        """Load one batch of images and masks.

        Args:
            new: sequence of "<image path>,<mask path>" lines.
            batch_number: index of the batch inside ``new``.
            batch_size: nominal batch size, used to compute the start offset.
            remain_image: if non-zero, the size of this (final, partial) batch.
            augment: if true, the batch is passed through ``self.augment`` so
                it is not an exact duplicate of the plain batch.

        Returns:
            ``(batch_data, batch_labels)``: images scaled to [-1, 1] with shape
            (n, height, width, 3) and masks with shape (n, height, width, 1).

        Raises:
            FileNotFoundError: if OpenCV cannot read an image or mask file.
        """
        batch_len = remain_image if remain_image else batch_size

        batch_data = np.zeros((batch_len, self.height, self.width, self.channel))
        batch_labels = np.zeros((batch_len, self.height, self.width, 1))

        # cv2.resize takes the target size as (width, height).
        dsize = (self.width, self.height)
        offset = batch_number * batch_size

        for idx in range(batch_len):
            # Strip only the line terminator so a last line without a trailing
            # newline does not lose a real character.
            d = new[offset + idx].rstrip("\r\n").split(",")

            # Image
            image = cv2.imread(d[0])
            if image is None:
                raise FileNotFoundError("Could not read image file: " + str(d[0]))
            image_resized = cv2.resize(image.astype(np.float32), dsize)
            batch_data[idx] = image_resized/255.0*2-1

            # Mask
            mask = cv2.imread(d[1], 0)
            if mask is None:
                raise FileNotFoundError("Could not read mask file: " + str(d[1]))
            # Binarize: every non-zero pixel becomes 1.
            mask = (mask > 0).astype(np.float32)
            mask = cv2.medianBlur(mask, 5)
            # Nearest-neighbour keeps the labels exactly 0 or 1; interpolating
            # would create fractional class ids along the mask border.
            mask_resized = cv2.resize(mask, dsize, interpolation=cv2.INTER_NEAREST)
            batch_labels[idx, :, :, 0] = mask_resized

        if augment:
            batch_data, batch_labels = self.augment(batch_data, batch_labels)
        return (batch_data, batch_labels)

    def augment(self, batch_data, batch_labels):
        """Mirror a (batch, height, width, channel) batch left-right.

        Images and masks are flipped together along the width axis (axis 2);
        contiguous copies are returned.
        """
        batch_data = np.ascontiguousarray(np.flip(batch_data, axis = 2))
        batch_labels = np.ascontiguousarray(np.flip(batch_labels, axis = 2))
        return (batch_data, batch_labels)

    def random_brightness(self, input_img, brightness_range = (-30, 60)):
        """Shift the brightness by a random amount drawn from ``brightness_range``.

        The shift is expressed on a 0-255 scale (see the ``255`` constants).
        """
        brightness = np.random.ranf()*(brightness_range[1] - brightness_range[0]) + brightness_range[0]
        if brightness != 0:
            if brightness > 0:
                shadow = brightness
                highlight = 255
            else:
                shadow = 0
                highlight = 255 + brightness
            alpha_b = (highlight - shadow)/255
            gamma_b = shadow
            # Computes input_img * alpha_b + gamma_b (the second input has weight 0).
            buf = cv2.addWeighted(input_img, alpha_b, input_img, 0, gamma_b)
        else:
            buf = input_img.copy()
        return buf

    def augment1(self, tensor, batch_labels, p = 0.6):
        """Random left-right flip (probability ``p``) plus contrast and brightness.

        ``tensor`` is treated as a (batch, height, width, channel) array. The
        contrast step clips to [0, 255] and truncates through uint8, so the
        data is expected to be on a 0-255 scale. Contrast is applied in place.
        NOTE(review): brightness is applied to the whole batch at once —
        confirm that cv2.addWeighted accepts 4-D input.
        """
        a = random.random()
        if a < p:
            # Axis 2 is the width axis; axis 3 would reverse the channels.
            tensor = np.flip(tensor, axis=2)
            batch_labels = np.flip(batch_labels, axis = 2)
        # random_contrast works on a single (H, W, 3) image, so go image by image.
        for i in range(tensor.shape[0]):
            tensor[i] = self.random_contrast(tensor[i], 0.5, 1.5)
        tensor = self.random_brightness(tensor, [-100, 100])
        return (tensor, batch_labels)

    def random_contrast(self, image, lower, upper):
        """Scale each of the 3 channels of one (H, W, 3) image around its mean.

        One factor is drawn uniformly from [lower, upper] and shared by all
        channels. The result is clipped to [0, 255] and truncated through
        uint8. ``image`` is modified in place and also returned.
        """
        contrast_factor = random.uniform(lower, upper)
        for channel in range(3):
            x = image[:, :, channel]
            mean = np.mean(x)
            image[:, :, channel] = np.clip((x - mean) * contrast_factor + mean, 0, 255).astype(np.uint8)
        return image

In [None]:
# Dice coefficient loss
def dice_coef(y_true, y_pred, smooth=1):
    """Smoothed Dice coefficient, reduced over the last axis only.

    Computes (2 * sum|y_true * y_pred| + smooth) divided by
    (sum(y_true^2) + sum(y_pred^2) + smooth), where every sum runs over
    axis -1.
    """
    overlap = K.sum(K.abs(y_true * y_pred), axis=-1)
    true_energy = K.sum(K.square(y_true), -1)
    pred_energy = K.sum(K.square(y_pred), -1)

    numerator = 2. * overlap + smooth
    denominator = true_energy + pred_energy + smooth
    return numerator / denominator

def dice_coef_loss(y_true, y_pred):
    """Dice loss: one minus the Dice coefficient returned by ``dice_coef``."""
    coefficient = dice_coef(y_true, y_pred)
    return 1 - coefficient

In [None]:
def my_IoU(y_true, y_pred):
    """Intersection-over-union of the foreground class over the whole batch.

    ``y_pred`` is reduced with argmax over its last axis, so the predicted
    class index (0 or 1 for a two-channel output) is used as the binary
    prediction. Both tensors are flattened before the sums are taken.

    A small epsilon is added to numerator and denominator so that an empty
    mask with an empty prediction gives 1 instead of 0/0 = NaN.
    """
    predicted = K.cast(K.argmax(y_pred), 'float32')
    predicted = K.flatten(predicted)
    # Cast the labels too so the product below never mixes dtypes.
    target = K.flatten(K.cast(y_true, 'float32'))

    intersection = K.sum(target * predicted)
    union = K.sum(target) + K.sum(predicted) - intersection
    IoU = (intersection + K.epsilon()) / (union + K.epsilon())
    return IoU

In [None]:
def conv_block(x, filter_out):
  """Apply two consecutive 3x3 Conv -> BatchNormalization -> ReLU stages.

  Both convolutions use ``filter_out`` filters, 'same' padding and the
  'he_normal' initializer.
  """
  for _ in range(2):
    x = Conv2D(filter_out, kernel_size=(3, 3), padding = 'same', kernel_initializer = 'he_normal')(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

  return x

In [None]:
def block(x, filter_out):
  """Apply two 3x3 ReLU convolutions followed by one BatchNormalization.

  Both convolutions use ``filter_out`` filters, 'same' padding and the
  'he_normal' initializer.
  """
  conv_options = dict(kernel_size=(3, 3), activation = "relu", padding = 'same', kernel_initializer = 'he_normal')

  first = Conv2D(filter_out, **conv_options)(x)
  second = Conv2D(filter_out, **conv_options)(first)

  return BatchNormalization()(second)

In [None]:
def UnetPlus(input_shape = (256, 256, 3)):
  """Build the nested U-Net style model, print its summary and return it.

  Nodes are named x<depth><column>. The backbone (column 0) uses ``block``
  with 16/32/48/64 filters and 2x2 max pooling between depths. Every other
  node upsamples the node one depth below it, concatenates it with one skip
  tensor from the same depth and runs ``conv_block``. The output layer is a
  1x1 convolution with 2 channels and softmax activation.
  """

  def merge_up(deeper, skip, filters, skip_first = True):
    # Upsample the deeper node, join it with the skip tensor, then convolve.
    # ``skip_first`` selects the concatenation order of the two tensors.
    upsampled = UpSampling2D((2, 2))(deeper)
    pair = [skip, upsampled] if skip_first else [upsampled, skip]
    return conv_block(Concatenate()(pair), filters)

  inputs = Input(shape=input_shape)

  # Depth 0 and 1 of the backbone.
  x00 = block(inputs, 16)
  down0 = MaxPooling2D((2, 2))(x00)

  x10 = block(down0, 32)
  down1 = MaxPooling2D((2, 2))(x10)

  x01 = merge_up(x10, x00, 16)

  # Depth 2 of the backbone.
  x20 = block(down1, 48)
  down2 = MaxPooling2D((2, 2))(x20)

  x11 = merge_up(x20, x10, 32)
  x02 = merge_up(x11, x01, 16)

  # Depth 3 (bottleneck) and the last decoder path.
  x30 = block(down2, 64)

  x21 = merge_up(x30, x20, 48, skip_first = False)
  x12 = merge_up(x21, x11, 32, skip_first = False)
  x03 = merge_up(x12, x02, 16)

  outputs = Conv2D(2, (1, 1), padding="same", activation = "softmax")(x03)

  model = Model(inputs = inputs, outputs = outputs)
  model.summary()
  return model

In [None]:
def Unet(input_shape =(256, 256, 3)):
  """Build the U-Net model, print its summary and return it.

  Encoder: ``block`` with 16, 32 and 48 filters, each followed by 2x2 max
  pooling, then a 64-filter ``block`` as bottleneck. Decoder: for 48, 32 and
  16 filters, upsample by 2, concatenate with the matching encoder output
  (skip tensor first) and run ``conv_block``. The output layer is a 1x1
  convolution with 2 channels and softmax activation.
  """

  inputs = Input(shape=input_shape)

  # Encoder: remember every pre-pooling tensor together with its filter count.
  skips = []
  x = inputs
  for filters in (16, 32, 48):
    skip = block(x, filters)
    skips.append((skip, filters))
    x = MaxPooling2D((2, 2))(skip)

  # Bottleneck.
  x = block(x, 64)

  # Decoder: walk the stored skips from deepest to shallowest.
  for skip, filters in reversed(skips):
    x = UpSampling2D((2, 2))(x)
    x = Concatenate()([skip, x])
    x = conv_block(x, filters)

  outputs = Conv2D(2, (1, 1), padding="same", activation = "softmax")(x)
  model = Model(inputs = inputs, outputs = outputs)
  model.summary()
  return model


In [None]:
# Build the nested U-Net for 256x256 three-channel inputs (UnetPlus prints the summary).
model = UnetPlus(input_shape=(256, 256, 3))

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 256, 256, 16  448         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 conv2d_1 (Conv2D)              (None, 256, 256, 16  2320        ['conv2d[0][0]']                 
                                )                                                             

In [None]:
# Configure the data loader and create the two endless batch generators.
train_dir = r"E:\train"
val_dir = r"E:\val"

data = DataLoader(train_path=train_dir, val_path=val_dir)
data.image_property()
data.init_train_property(batch_size=32, epochs=300)
print(data.train_num, " ", data.val_num)

# Generators are lazy: nothing is loaded until the first batch is requested.
val_gen = data.generate_data(data.val_doc, augment=False)
train_gen = data.generate_data(data.train_doc, augment=True)

1867   400


In [None]:
# Compile with Adam and sparse categorical cross-entropy; my_IoU is reported as a metric.
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss=SparseCategoricalCrossentropy(), metrics=[my_IoU])

# Keep only the weights of the epoch with the highest validation IoU.
checkpoint_path = r"/content/gdrive/MyDrive/week 8/Human segmentation/best model/Unet+_{val_loss: .4f}_{val_my_IoU: .5f}.hdf5"
best_checkpoint = ModelCheckpoint(
    checkpoint_path,
    monitor="val_my_IoU",
    mode="max",
    save_best_only=True,
    save_weights_only=True,
)

# Divide the learning rate by 10 after 20 epochs without val_loss improvement.
lr_on_plateau = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=20, min_lr=0.0000001, verbose=1)

callbacks = [best_checkpoint, lr_on_plateau]

In [None]:
#train_gen = data.generate_data(data.train_doc, augment = True)
#val_gen = data.generate_data(data.val_doc, augment = False)

# Number of batches the generators emit per pass over the data (ceil division
# written with integers, so no extra import is needed).
train_batches_per_pass = -(-data.train_num // data.batch_size)
validation_steps = -(-data.val_num // data.batch_size)

# train_gen was created with augment=True, so one generator cycle is the plain
# pass followed by the augmented pass, each including its own remainder batch:
# 2 * ceil(N / batch_size) batches. Using ceil(2 * N / batch_size) instead
# undercounts by one whenever the remainder is more than half a batch
# (117 instead of 118 for N=1867, batch_size=32), which makes the epochs
# drift against the data passes.
steps_per_epoch = 2 * train_batches_per_pass

# batch_size is not passed to fit(): the generators already yield whole
# batches, and Keras documents that it must not be specified for generators.
history = model.fit(
    train_gen,
    epochs=data.epochs,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_gen,
    validation_steps=validation_steps,
    callbacks=callbacks,
    initial_epoch=0,
)

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300