<a href="https://colab.research.google.com/github/abhinaynatti/hello/blob/master/Segmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# !pip install --upgrade tensorflow-gpu
# from google.colab import drive; drive.mount('/content/drive');

In [0]:
from tensorflow.keras.layers import Conv2D, DepthwiseConv2D, BatchNormalization, Dropout, ReLU, UpSampling2D
import tensorflow as tf;   from tensorflow import keras;   from tensorflow.keras.models import Model
import numpy as np; import pandas as pd;  import os, time; from zipfile import ZipFile; import glob;
import matplotlib as mpl; import matplotlib.pyplot as plt; from pathlib import Path;
from os import listdir; import scipy.misc; from IPython.display import clear_output;

mpl.rcParams['axes.grid'] = False; mpl.rcParams['figure.figsize'] = (12, 12);
get_ipython().run_line_magic('matplotlib', 'inline')

In [0]:
is_gpu_available = tf.test.is_gpu_available(cuda_only = False, min_cuda_compute_capability = None)
print(tf.__version__); print(f"GPU is available: {is_gpu_available}")

2.0.0
GPU is available: True


### Inverted Residual Block

In [0]:
class Conv2D_BN(tf.keras.Model):

    def __init__(self, nb_out_filters, kernel_sz, stride_sz, padding = "same", dilation_rate = 1, momentum = 0.99, bool_apply_act = True, 
                 bool_apply_norm = True):

        super(Conv2D_BN, self).__init__(); self.use_bias = not bool_apply_norm;
        self.bool_apply_act = bool_apply_act; self.bool_apply_norm = bool_apply_norm; 
        
        self.conv_layer = Conv2D(filters = nb_out_filters, kernel_size = kernel_sz, strides = stride_sz, padding = padding, 
                                 dilation_rate = dilation_rate, use_bias = self.use_bias);
        self.batch_norm_layer = BatchNormalization(axis = -1, momentum = momentum)
        self.relu6_layer = ReLU(max_value = 6.0, threshold = 0.0)


    def call(self, input_tensor, training = True):

        x = self.conv_layer(input_tensor);
        if self.bool_apply_norm: x = self.batch_norm_layer(x, training = training);
        if self.bool_apply_act:  x = self.relu6_layer(x)

        return x

In [0]:
class InvertedResidual(tf.keras.Model):
    
    def __init__(self, nb_out_filters, nb_inp_filters, kernel_sz = 3, stride_sz = 1, dilation_rate = 1, exp_ratio = 6, width_multiplier = 1):
        
        super(InvertedResidual, self).__init__();
        self.exp_ratio = exp_ratio; self.width_multiplier = width_multiplier;
        
        self.res_connect = True if (stride_sz == 1 and nb_out_filters == nb_inp_filters) else False;
        nb_out_filters = nb_out_filters * self.width_multiplier; nb_bottleneck_filters = self.exp_ratio * nb_inp_filters;
        
        self.conv_pxp_1 = Conv2D_BN(nb_bottleneck_filters, 1, 1);
        self.conv_pxp_2 = Conv2D_BN(nb_out_filters, 1, 1, bool_apply_act = False)

        self.conv_dxd_1 = DepthwiseConv2D(kernel_size = kernel_sz, strides = stride_sz, padding = "same", depth_multiplier = 1, 
                                          dilation_rate = dilation_rate, use_bias = False)
        
        self.batch_norm_layer = BatchNormalization(axis = -1, momentum = 0.99)
        self.relu6_layer = ReLU(max_value = 6.0, threshold = 0.0)
    
    
    def call(self, input_tensor, training = True):
        
        x = self.conv_pxp_1(input_tensor, training = training);
        x = self.conv_dxd_1(x); x = self.batch_norm_layer(x, training = training); x = self.relu6_layer(x);
        x = self.conv_pxp_2(x, training = training);
        
        if self.res_connect: x = x + input_tensor;

        return x;

In [0]:
class InvertedResidual_Block(tf.keras.Model):
    
    def __init__(self, nb_repeats, nb_out_filters, nb_inp_filters, kernel_sz = 3, stride_sz = 1, dilation_rate = 1, exp_ratio = 6, 
                 width_multiplier = 1):
        
        super(InvertedResidual_Block, self).__init__(); self.nb_repeats = nb_repeats; self.model_layers = []; 
        
        inv_res_layer = InvertedResidual(nb_out_filters, nb_inp_filters, kernel_sz, stride_sz, dilation_rate, exp_ratio, width_multiplier);
        self.model_layers.append(inv_res_layer);
        
        for idx in range(1, self.nb_repeats):
            inv_res_layer = InvertedResidual(nb_out_filters, nb_out_filters, kernel_sz, 1, dilation_rate, exp_ratio, width_multiplier);
            self.model_layers.append(inv_res_layer);
        
    
    def call(self, x, training = True):
        
        for idx in range(self.nb_repeats): 
            layer = self.model_layers[idx]; x = layer(x, training = training)
        
        return x

### MobileNet_v2

In [0]:
class Dilated_MobileNet(tf.keras.Model):
    
    def __init__(self, out_stride = 8):
        
        super(Dilated_MobileNet, self).__init__(); self.out_stride = out_stride;
        
        self.conv_layer = Conv2D_BN(nb_out_filters = 32, kernel_sz = 3, stride_sz = 2)
        
        self.blk_1 = InvertedResidual_Block(nb_repeats = 1, nb_out_filters = 16, nb_inp_filters = 32, kernel_sz = 3,
                                            stride_sz = 1, dilation_rate = 1, exp_ratio = 1, width_multiplier = 1);
        
        self.blk_2 = InvertedResidual_Block(nb_repeats = 2, nb_out_filters = 24, nb_inp_filters = 16, kernel_sz = 3,
                                            stride_sz = 2, dilation_rate = 1, exp_ratio = 6, width_multiplier = 1);
        
        self.blk_3 = InvertedResidual_Block(nb_repeats = 3, nb_out_filters = 32, nb_inp_filters = 24, kernel_sz = 3,
                                            stride_sz = 2, dilation_rate = 1, exp_ratio = 6, width_multiplier = 1);
        
        self.blk_4 = InvertedResidual_Block(nb_repeats = 4, nb_out_filters = 64, nb_inp_filters = 32, kernel_sz = 3,
                                            stride_sz = 1, dilation_rate = 2, exp_ratio = 6, width_multiplier = 1);
        
        self.blk_5 = InvertedResidual_Block(nb_repeats = 3, nb_out_filters = 96, nb_inp_filters = 64, kernel_sz = 3,
                                            stride_sz = 1, dilation_rate = 2, exp_ratio = 6, width_multiplier = 1);
        
        self.blk_6 = InvertedResidual_Block(nb_repeats = 3, nb_out_filters = 160, nb_inp_filters = 96, kernel_sz = 3,
                                            stride_sz = 1, dilation_rate = 2, exp_ratio = 6, width_multiplier = 1);
        
        self.blk_7 = InvertedResidual_Block(nb_repeats = 1, nb_out_filters = 320, nb_inp_filters = 160, kernel_sz = 3,
                                            stride_sz = 1, dilation_rate = 2, exp_ratio = 6, width_multiplier = 1);
        
    
    def call(self, input_tensor, training = True):
        
        x = self.conv_layer(input_tensor, training = training);
        x = self.blk_1(x, training = training); x = self.blk_2(x, training = training);
        x = self.blk_3(x, training = training); x = self.blk_4(x, training = training);
        x = self.blk_5(x, training = training); x = self.blk_6(x, training = training);
        x = self.blk_7(x, training = training);
        
        return x

### DenseNet ASPP

In [0]:
class DenseASPP_block(tf.keras.Model):
    
    def __init__(self, nb_inp_filters, nb_bottleneck_fiters, nb_out_filters, kernel_sz = 3, dilation_rate = 1, drop_rate = 0.1, 
                 momentum = 0.9997, start_bn = True):
        
        super(DenseASPP_block, self).__init__(); self.start_bn = start_bn; self.drop_rate = drop_rate;
        if self.start_bn: self.bn_layer = BatchNormalization(axis = -1, momentum = momentum);
        
        self.relu6_layer = ReLU(max_value = 6.0, threshold = 0.0);
        self.conv_layer_1 = Conv2D_BN(nb_out_filters = nb_bottleneck_fiters, kernel_sz = 1, stride_sz = 1, dilation_rate = 1, 
                                      momentum = momentum);
        
        self.conv_layer_2 = Conv2D_BN(nb_out_filters = nb_out_filters, kernel_sz = kernel_sz, stride_sz = 1, dilation_rate = dilation_rate, 
                                      bool_apply_act = False, bool_apply_norm = False);
        if drop_rate: self.dropout_layer = Dropout(rate = drop_rate)
        
    
    def call(self, x, training = True):
        
        if self.start_bn: x = self.bn_layer(x, training = training); 
        
        x = self.relu6_layer(x); x = self.conv_layer_1(x, training = training); 
        x = self.conv_layer_2(x, training = training);
        
        if self.drop_rate: x = self.dropout_layer(x, training = training);
        
        return x

In [0]:
class DenseASPP(tf.keras.Model):
    
    def __init__(self, nb_inp_filters = 16, nb_bottleneck_filters = 128, nb_growth_rate = 64, drop_rate_conv = 0.1, 
                 drop_rate_fc = 0.1):
        
        super(DenseASPP, self).__init__(); self.drop_rate_fc = drop_rate_fc; self.base_model = Dilated_MobileNet();
        
        if drop_rate_fc: self.dropout_layer = Dropout(rate = drop_rate_fc);
        self.upsample_layer = UpSampling2D(size = (8, 8), interpolation = "bilinear");
        
        self.ASPP_blk_1 = DenseASPP_block(nb_inp_filters + 0 * nb_growth_rate, nb_bottleneck_filters, nb_growth_rate, 
                                          dilation_rate = 3,  drop_rate = drop_rate_conv, start_bn = False)
        self.ASPP_blk_2 = DenseASPP_block(nb_inp_filters + 1 * nb_growth_rate, nb_bottleneck_filters, nb_growth_rate, 
                                          dilation_rate = 6,  drop_rate = drop_rate_conv, start_bn = True)
        self.ASPP_blk_3 = DenseASPP_block(nb_inp_filters + 2 * nb_growth_rate, nb_bottleneck_filters, nb_growth_rate,
                                          dilation_rate = 12, drop_rate = drop_rate_conv, start_bn = True)
        self.ASPP_blk_4 = DenseASPP_block(nb_inp_filters + 3 * nb_growth_rate, nb_bottleneck_filters, nb_growth_rate,
                                          dilation_rate = 18, drop_rate = drop_rate_conv, start_bn = True)
        self.ASPP_blk_5 = DenseASPP_block(nb_inp_filters + 4 * nb_growth_rate, nb_bottleneck_filters, nb_growth_rate,
                                          dilation_rate = 24, drop_rate = drop_rate_conv, start_bn = True)
        
        self.conv_layer = Conv2D_BN(nb_out_filters = 1, kernel_sz = 1, stride_sz = 1, bool_apply_act = False, 
                                    bool_apply_norm = False)
        
        
    def call(self, input_tensor, training = True):
        
        features = self.base_model(input_tensor, training = training);
        
        x = self.ASPP_blk_1(features, training = training)
        features = tf.keras.layers.concatenate([x, features], axis = -1);
        
        x = self.ASPP_blk_2(features, training = training)
        features = tf.keras.layers.concatenate([x, features], axis = -1);
        
        x = self.ASPP_blk_3(features, training = training)
        features = tf.keras.layers.concatenate([x, features], axis = -1);
        
        x = self.ASPP_blk_4(features, training = training)
        features = tf.keras.layers.concatenate([x, features], axis = -1);
        
        x = self.ASPP_blk_5(features, training = training)
        features = tf.keras.layers.concatenate([x, features], axis = -1);
        
        if self.drop_rate_fc: features = self.dropout_layer(features, training = training);
        x = self.conv_layer(features, training = training); 
        
        x = self.upsample_layer(x);
        
        return x

### Training Pipeline

In [0]:
root_dir = "/content/drive/My Drive/Samsung_Ishaara/";  BATCH_SIZE = 128;  IMG_SIZE = 128;
training_data_dir = root_dir + "Training_Data/"; test_data_dir  = root_dir + "Test_Data/";
ckpt_dir = root_dir + "Cache/tf_ckpts/"; output_dir = root_dir + "Cache/Outputs/";

def extract(src_file_path = None, des_file_path = None):

    with ZipFile(src_file_path, 'r') as zip:
        print(f'Extracting the files located at: {src_file_path}');
        zip.extractall(path = des_file_path); print('Done!');

# Extracting Labels
# extract(training_data_dir + "Labels.zip", training_data_dir);

# # Extracting Images
# extract(training_data_dir + "Images/Set_1.zip", training_data_dir + "Images/");
# extract(training_data_dir + "Images/Set_2.zip", training_data_dir + "Images/");
# extract(training_data_dir + "Images/Set_3.zip", training_data_dir + "Images/");
# extract(training_data_dir + "Images/Set_4.zip", training_data_dir + "Images/");

In [0]:
def key_to_sort(x): return int(x[-12:-4])

def check(file_path = None):

    filenames = glob.glob(file_path + '*/*.jpg', recursive = True)
    print(f"Total files at the {file_path}: {len(filenames)}!!")

    return sorted(filenames, key = key_to_sort);

label_filenames = check(training_data_dir + "Labels/"); 
set1_image_filenames = check(training_data_dir + "Images/Set_1/"); set2_image_filenames = check(training_data_dir + "Images/Set_2/");
set3_image_filenames = check(training_data_dir + "Images/Set_3/"); set4_image_filenames = check(training_data_dir + "Images/Set_4/");

Total files at the /content/drive/My Drive/Samsung_Ishaara/Training_Data/Labels/: 32560!!
Total files at the /content/drive/My Drive/Samsung_Ishaara/Training_Data/Images/Set_1/: 32560!!
Total files at the /content/drive/My Drive/Samsung_Ishaara/Training_Data/Images/Set_2/: 32560!!
Total files at the /content/drive/My Drive/Samsung_Ishaara/Training_Data/Images/Set_3/: 32560!!
Total files at the /content/drive/My Drive/Samsung_Ishaara/Training_Data/Images/Set_4/: 32560!!


In [0]:
class Helper:
    
    def __init__(self): pass
    
    @staticmethod
    def display(display_list, itr, epoch, save_dir):
    
        plt.figure(figsize = (15, 15)); title = ['Input Image', 'True Mask', 'Predicted Mask']
        for i in range(len(display_list)):

            plt.subplot(1, len(display_list), i + 1); plt.title(title[i])
            plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
            plt.axis('off')
            
        plt.savefig(save_dir + str(itr) + "_" + str(epoch) + '_output.png', bbox_inches = 'tight');
        
    
    @staticmethod
    def create_mask(x):

        mask = 1/(1 + np.exp(-x)); mask = (mask > 0.5).astype(np.uint8);
        return mask;


    def show_predictions(self, model, image, mask, itr, epoch, save_dir, num = 1):

            rand_int = np.random.randint(low = 0, high = image.shape[-1], size = 1)[0];
            sample_img  = image[rand_int]; sample_mask = mask[rand_int];
            
            predictions = model(sample_img[tf.newaxis, ...]);
            predictions = self.create_mask(predictions);
            self.display([sample_img, sample_mask, predictions[0]], itr, epoch, save_dir)

In [0]:
class DataLoader:

    def __init__(self):
 
        self.train_img_dir = training_data_dir + "Images/"; self.test_img_dir = test_data_dir + "Images/";
        self.train_lab_dir = training_data_dir + "Labels/";


    @staticmethod
    def key_to_sort(x): return int(x[-12:-4]);
 
    def get_filenames(self, file_path): return list(sorted(glob.glob(file_path + '*/*.jpg', recursive = True), key = self.key_to_sort))


    def get_training_data_filenames(self):

        set1_img_filenames = tf.data.Dataset.from_tensor_slices(tf.constant(self.get_filenames(self.train_img_dir + "Set_1/")));
        set2_img_filenames = tf.data.Dataset.from_tensor_slices(tf.constant(self.get_filenames(self.train_img_dir + "Set_2/")));
        set3_img_filenames = tf.data.Dataset.from_tensor_slices(tf.constant(self.get_filenames(self.train_img_dir + "Set_3/")));
        set4_img_filenames = tf.data.Dataset.from_tensor_slices(tf.constant(self.get_filenames(self.train_img_dir + "Set_4/")));
        lab_filenames = tf.data.Dataset.from_tensor_slices(tf.constant(self.get_filenames(self.train_lab_dir)));

        return set1_img_filenames, set2_img_filenames, set3_img_filenames, set4_img_filenames, lab_filenames


    def get_test_data_filenames(self): return tf.data.Dataset.from_tensor_slices(tf.constant(self.get_filenames(self.test_img_dir)))


    @tf.function
    def training_data_parser(self, image_path, label_path):
        
        image = tf.image.decode_jpeg(tf.io.read_file(filename = image_path), channels = 3)
        image = tf.divide(tf.cast(image, dtype = tf.float32), 255.0) 
        image = tf.image.resize(image, size = (IMG_SIZE, IMG_SIZE));
        
        label = tf.image.decode_jpeg(tf.io.read_file(filename = label_path), channels = 1)
        label = tf.divide(tf.cast(label, dtype = tf.float32), 255.0)
        label = tf.image.resize(label, size = (IMG_SIZE, IMG_SIZE));
        
        if tf.random.uniform(()) > 0.5:
            image = tf.image.flip_left_right(image); label = tf.image.flip_left_right(label);
        
        return image, label
    

    @tf.function
    def test_data_parser(self, image_path):
        
        image = tf.image.decode_jpeg(tf.io.read_file(filename = image_path), channels = 3)
        image = tf.divide(tf.cast(image, dtype = tf.float32), 255.0)
        image = tf.image.resize(image, size = (IMG_SIZE, IMG_SIZE))
        
        return image


    def create_training_dataset(self):
        
        set1_img_files, set2_img_files, set3_img_files, set4_img_files, lab_files = self.get_training_data_filenames();
        
        set1_train_dataset = tf.data.Dataset.zip((set1_img_files, lab_files))
        set2_train_dataset = tf.data.Dataset.zip((set2_img_files, lab_files))
        set3_train_dataset = tf.data.Dataset.zip((set3_img_files, lab_files))
        set4_train_dataset = tf.data.Dataset.zip((set4_img_files, lab_files))

        train_dataset = set1_train_dataset; train_dataset = train_dataset.concatenate(set2_train_dataset); 
        train_dataset = train_dataset.concatenate(set3_train_dataset); train_dataset = train_dataset.concatenate(set4_train_dataset);
        
        train_dataset = train_dataset.shuffle(buffer_size = 50000).repeat(1);
        train_dataset = train_dataset.map(self.training_data_parser, num_parallel_calls = tf.data.experimental.AUTOTUNE)
        train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE);
        
        return train_dataset


    def create_test_dataset(self):
        
        test_dataset = self.get_test_data_filenames();
        test_dataset = test_dataset.map(self.test_data_parser, num_parallel_calls = tf.data.experimental.AUTOTUNE);
        test_dataset = test_dataset.batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE);
        
        return test_dataset

In [0]:
class Train:
    
    def __init__(self, nb_epochs, learning_rate = 1e-3, epsilon = 1e-7):
        
        self.nb_epochs = nb_epochs; self.epsilon = epsilon; self.learning_rate = learning_rate; self.model = DenseASPP();
        self.train_opt = self.get_optimizer(); dataloader = DataLoader(); self.train_dataset = dataloader.create_training_dataset();
        
        if not os.path.exists(output_dir): os.makedirs(output_dir);
        if not os.path.exists(ckpt_dir): os.makedirs(ckpt_dir);
        
        self.ckpt = tf.train.Checkpoint(step = tf.Variable(1), optimizer = self.train_opt, net = self.model)
        self.manager = tf.train.CheckpointManager(self.ckpt, ckpt_dir, max_to_keep = 10);
        self.ckpt.restore(self.manager.latest_checkpoint);
        
        if self.manager.latest_checkpoint: print("Restored from {}".format(self.manager.latest_checkpoint))
        else: print("Initializing from scratch.")
    
    
    @staticmethod
    def get_loss(y_true, y_pred, pos_weight = 1.0):
        return tf.nn.weighted_cross_entropy_with_logits(labels = y_true, logits = y_pred, pos_weight = pos_weight)
    
    
    def get_optimizer(self):
        return tf.keras.optimizers.Adam(learning_rate = self.learning_rate, epsilon = self.epsilon)
    
    
    @tf.function
    def train_one_step(self, x, y):
        
        with tf.GradientTape() as tape:
            y_hat = self.model(x); train_loss = self.get_loss(y, y_hat, 3.0);
            grads = tape.gradient(train_loss, self.model.trainable_variables);
        
        self.train_opt.apply_gradients(zip(grads, self.model.trainable_variables))
        self.ckpt.step.assign_add(1);
        
        return train_loss
        
    
    def __call__(self):
        
        train_loss_results = []; train_acc_results = []; helper = Helper(); glob_itr = 0;
        for epoch in range(18, self.nb_epochs + 1):
            
            # if epoch % 2 == 0: self.learning_rate /= 2;
            epoch_loss_avg = tf.keras.metrics.Mean(); epoch_accuracy = tf.keras.metrics.BinaryAccuracy();
            
            for x, y in self.train_dataset:
                
                train_loss = self.train_one_step(x, y); glob_itr += 1;
                epoch_loss_avg(train_loss); epoch_accuracy(y, self.model(x));
                
                if glob_itr % 100 == 0: 
                    self.learning_rate /= 2;
                    print("Iteration {:03d}: Loss: {:.3f}, Accuracy: {:.3%}".format(glob_itr, epoch_loss_avg.result(), epoch_accuracy.result()))
                    helper.show_predictions(self.model, x, y, glob_itr, epoch, output_dir);
            
            train_loss_results.append(epoch_loss_avg.result()); train_acc_results.append(epoch_accuracy.result());

            print("Saved checkpoint for epoch {}: {}".format(epoch, self.manager.save()))
            print("Epoch {:03d}: Loss: {:.3f}, Accuracy: {:.3%}".format(epoch, epoch_loss_avg.result(), epoch_accuracy.result()))
            
        return train_loss_results, train_acc_results, self.model

In [0]:
train = Train(nb_epochs = 25, learning_rate = 1e-3); train_loss, train_acc, model = train();

In [0]:
# weighted binary cross entropy loss
# tf.nn.weighted_cross_entropy_with_logits(labels, logits, pos_weight, name = None)
# generate mask instead of heat maps
# change in learning rate (callback)