In [20]:
!pip install git+https://github.com/tensorflow/docs

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/tensorflow/docs
  Cloning https://github.com/tensorflow/docs to /tmp/pip-req-build-u4sprovf
  Running command git clone -q https://github.com/tensorflow/docs /tmp/pip-req-build-u4sprovf


In [24]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_docs as tfdocs
import tensorflow_docs.plots

from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import RMSprop

In [1]:
#Encoder and base of our network will be pretrained DenseNet121
#Decoder will be an upsample blocks of size 512,256,128,64 that will trained

In [26]:
#Load the Dataset
dataset, info = tfds.load('oxford_iiit_pet',with_info=True)

In [27]:
#Load image and mask and normalize ,additionally do augmentation during training
@tf.function
def load_image(dataset_element, train=True):
    input_image = tf.image.resize(dataset_element['image'], (256,256))
    input_mask = tf.image.resize(dataset_element['segmentation_mask'], (256,256))
    
    if train and np.random.uniform() > 0.5:
        input_image = tf.image.flip_left_right(input_image) #data augmentation during training
    
    input_image =  tf.cast(input_image, tf.float32) / 255.0 #Normalize image --> tf.cast converts the 
                                                            #image which is a tensor to float type
    input_mask = input_mask - 1 #Normalize mask
        
    return input_image,input_mask

In [28]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

#Autotuning gives better performance tf.data builds a performance 
#model of the input pipeline and runs an optimization algorithm to 
#find a good allocation of its CPU budget across all parameters 
#specified as AUTOTUNE. While the input pipeline is running, 
#tf.data tracks the time spent in each operation, so that these times 
#can be fed into the optimization algorithm.


In [101]:
#Model Definition 
from tensorflow.keras.applications import DenseNet121
class DenseUnet(object):
    def __init__(self, input_size=(256,256,3),output_channels=3):
        self.pretrained_model = DenseNet121(input_shape=input_size,
                                           include_top=False,
                                           weights='imagenet')
        
        #The encoding part of the network which will be frozen
        self.target_layers = [
            'conv1/relu', #1st encoder block going down to 2nd
            'conv2_block1_0_relu', #2nd encoder block going down to 3rd
            'conv3_block1_0_relu', #3rd encoder block going down to 4th
            'conv4_block1_0_relu', #4th encoder block going down to base
            'conv5_block8_1_relu' #base
        ]
        
        self.input_size = input_size
        self.output_channels = output_channels
        
        self.model = self._create_model()
        loss = SparseCategoricalCrossentropy(from_logits=True)
        self.model.compile(optimizer=RMSprop(),loss=loss,metrics=['accuracy'])

        #_create_model() creates the model as defined
        #self.target_layers will be used for transfer learning       
        #output_channels is 3 , because each pixel can be categorized into
        #one of three classes black white and grey 
        #Trainer->RMSprop Loss->SparseCategoricalCrossentropy
    
    #Define our decoder 1.e. upsample block
    @staticmethod
    def _upsample(filters, size, dropout=False):
       
        init = tf.random_normal_initializer(0.0, 0.03) #Kernel weight initializer
        #Upsample block will be made of transposed convolutions
        layers = Sequential()
        layers.add(Conv2DTranspose(filters=filters,
                              kernel_size=size,
                              strides=2,
                              padding='same',
                              kernel_initializer=init,
                              use_bias = False))
        layers.add(BatchNormalization())
        if dropout: layers.add(Dropout(rate=0.65))
        layers.add(ReLU())
    
        return layers
    
    #create the model
    def _create_model(self):
        layers = []
        for i in self.target_layers:
            layers.append(self.pretrained_model.get_layer(i).output)
            
        down_stack = Model(inputs=self.pretrained_model.input, outputs=layers)
        down_stack.trainable = False #don't train the encoder
        
        up_stack = []
        for filters in (512, 256, 128, 64): #upsample to blocks of these sizes
            up_block = self._upsample(filters, 4)
            up_stack.append(up_block)
            
        #Upsampling from block of size 512 going up 
        #to block 256 to 128 to finally 64
        
        inputs = Input(shape=self.input_size)
        x = inputs
        
        #Adding skip connection to enable flow of gradient
        skip_layers = down_stack(x) #the output of encoder blocks
        x = skip_layers[-1]         #taking the last output i.e. base output that will serve as the input to 1st upsampling layer  
        skip_layers = reversed(skip_layers[:-1]) #reverse the encoder ouput(-1) indicates base layer not taken since we will concatenate 
                                                 #the last encoder output with the first upsampled layer
         
        for up, skip_connection in zip(up_stack,skip_layers):
            x = up(x)                                  #upsample input to the defined upsample stack
            x = Concatenate()([x, skip_connection])    #concatenate the encoder output
        
        #output of upsampled+concatenated goes to a transposed convolution
        init = tf.random_normal_initializer(0.0, 0.03) #weight initialization for the kernel
        output = Conv2DTranspose(
            filters=self.output_channels,
            kernel_size=3,
            strides=2,
            padding='same',
            kernel_initializer=init)(x)
            
        return Model(inputs, outputs=output)
    
    #Plot training plots
    @staticmethod
    def _plot_model_history(model_history, metric, ylim=None):
        plt.style.use('seaborn-darkgrid')
        plotter = tfdocs.plots.HistoryPlotter()
        plotter.plot({'Model': model_history}, metric=metric)
        plt.title(f'{metric.upper()}')
        if ylim is None:
            plt.ylim([0, 1])
        else:
            plt.ylim(ylim)
        plt.savefig(f'{metric}.png')
        plt.close()
        
    def train(self, train_dataset, epochs, steps_per_epoch,
              validation_dataset, validation_steps):
        hist = self.model.fit(train_dataset,epochs=epochs,
                              steps_per_epoch=steps_per_epoch,
                              validation_steps=validation_steps,
                              validation_data=validation_dataset)
        
        self._plot_model_history(hist, 'loss', [0.,2.0])
        self._plot_model_history(hist, 'accuracy')
        

    #Create a mask from model prediction
    @staticmethod
    def _create_mask(prediction_mask):
        prediction_mask = tf.argmax(prediction_mask,axis=-1)
        prediction_mask = prediction_mask[...,tf.newaxis]
        return prediction_mask[15]

    #Open segmentation mask with opencv to later save and view  
    @staticmethod
    def _process_mask(mask):
        mask = (mask.numpy() * 127.5).astype('uint8')
        mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB)
        
        return mask

    #Save and Visualization iamage and masks
    def _save_image_and_masks(self, image,ground_truth_mask,
                              prediction_mask, image_id):
        
        image = (image.numpy() * 255.0).astype('uint8')
        gt_mask = self._process_mask(ground_truth_mask)
        pred_mask = self._process_mask(prediction_mask)
        mosaic = np.hstack([image, gt_mask, pred_mask])
        mosaic = cv2.cvtColor(mosaic, cv2.COLOR_RGB2BGR)
        cv2.imwrite(f'segimg_{image_id}.jpg', mosaic)
 
    #save the predicted mask 
    def _save_predictions(self, dataset,sample_size=1):
        for id, (image, mask) in enumerate(dataset.take(sample_size),
                                           start=1):
            pred_mask = self.model.predict(image)
            pred_mask = self._create_mask(pred_mask)
            image = image[15]
            ground_truth_mask = mask[15]
            self._save_image_and_masks(image,ground_truth_mask,
                                       pred_mask,image_id=id)
            
    #Compute the accuracy
    def evaluate(self, test_dataset, sample_size=5):
        result = self.model.evaluate(test_dataset)
        print(f'Accuracy: {result[1] * 100:.2f}%')
        self._save_predictions(test_dataset, sample_size)

In [39]:
#Hperparameters
TRAIN_SIZE = info.splits['train'].num_examples
VALIDATION_SIZE = info.splits['test'].num_examples
BATCH_SIZE = 64
STEPS_PER_EPOCH = TRAIN_SIZE // BATCH_SIZE
VALIDATION_SUBSPLITS = 5
VALIDATION_STEPS = VALIDATION_SIZE // BATCH_SIZE
VALIDATION_STEPS //= VALIDATION_SUBSPLITS
BUFFER_SIZE = 1000

In [40]:
#Define the training and testing datasets
train_dataset = (dataset['train'].map(load_image, num_parallel_calls=AUTOTUNE)
                 .cache()
                 .shuffle(BUFFER_SIZE)
                 .batch(BATCH_SIZE)
                 .repeat()
                 .prefetch(buffer_size=AUTOTUNE))
test_dataset = (dataset['test']
                 .map(lambda d: load_image(d,
                 train=False),
                 num_parallel_calls=AUTOTUNE)
                 .batch(BATCH_SIZE))

In [103]:
#Run model 
unet = DenseUnet()
unet.train(train_dataset,epochs=15,steps_per_epoch=STEPS_PER_EPOCH,
           validation_steps=VALIDATION_STEPS,
           validation_dataset=test_dataset)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


In [104]:
#Model Evaluation
unet.evaluate(test_dataset)

Accuracy: 88.64%
