## **Modified-Unet**

The model in this code is inspired by the U-Net model proposed by Ronneberger et. al. with modifications to Depth, Regularization and Hyperparameters.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install h5py pyyaml



In [None]:
# Importing the necessary libraries and API's

import numpy as np 
from numpy import asarray
import tensorflow as tf 
from tensorflow.keras.optimizers import Adam,RMSprop,SGD
import os
import shutil 
import matplotlib.pyplot as plt 
import matplotlib.image as mpimg
from PIL import Image
import keras
from tensorflow.keras.models import Model, load_model
import tensorflow.keras.layers

In [None]:
# Creating variables for training and labels directory
data_directory = r'/content/drive/My Drive/weather_news'
training_dir = r'/content/drive/My Drive/weather_news/train_images'
label_dir = r'/content/drive/My Drive/weather_news/LabelboxAnnotations'

In [None]:
# converting directory images into a list
train_list = os.listdir(training_dir)
label_list = os.listdir(label_dir)

In [None]:
# checking the length of the training and label directory 
print(len(train_list))
print(len(label_list))

1576
1648


In [None]:
# We see that the label_list has some unnecessary segmentation labels not matching with the images in the training directory and hence wee need to remove them before training 

unwanted = []
train = []
label = []

for x in train_list:
    train.append(x[0:8])
    
for x in label_list:
    label.append(x[0:8])
    
for x in label:
    if x not in train:
        unwanted.append(x)

for x in unwanted:
    x = x + ".png"
    label_list.remove(x)

print(len(unwanted))
print(len(label_list))
print(len(train_list))

72
1576
1576


In [None]:
# Sanity check for the shape of the labels  

f = None
for i in range(1576):
    img = mpimg.imread(label_dir + "/" + label_list[i])
    if img.shape != (160,160,4):
        print("found")
        f = 1
if f is None:
    print("all good")

all good


In [None]:
# Sanity check for the shape of the training data

f = None
for i in range(1576):
    img = mpimg.imread(training_dir + "/" + train_list[i])
    if img.shape != (160,160,3):
        print("found")
        f = 1
        
if f is None:
    print("all good")

all good


In [None]:
# creating the numpy arrays for the images and the corresponding segmentation labels 

height , width = (160,160)
m = len(label_list)
X = np.zeros([m, height, width, 3])
Y_init = np.zeros([m, height, width, 4])

for i in range(m):
    X[i] = mpimg.imread(training_dir + "/" + train_list[i])
    Y_init[i] = mpimg.imread(label_dir + "/" + label_list[i])

In [None]:
# color_list is actually a dictionary that contains the pixel valus across the channels for the four segmentation colors. While carrying out training for the segmentation labels we need the labels 
# to be 1 or 0 across the channels .In order to do so we need to convert the color pixels into binary values before training and convert the predicted maps into corresponding color pixels after prediction.
# Hence we need the color_list dictionary to carry out such conversions.

color_list = {}

color_list['blue'] = (0.0117647061124444, 0.6078431606292725, 0.8980392217636108, 1.0)
color_list['yellow'] = (0.8941176533699036, 0.7686274647712708, 0.2549019753932953, 1.0)
color_list['green'] = (0.3803921639919281, 0.3803921639919281, 0.3803921639919281, 1.0)
color_list['purple'] = (0.5568627715110779, 0.1411764770746231, 0.6666666865348816, 1.0)
color_list['white'] = (1.0, 1.0, 1.0, 1.0)
color_list

{'blue': (0.0117647061124444, 0.6078431606292725, 0.8980392217636108, 1.0),
 'green': (0.3803921639919281, 0.3803921639919281, 0.3803921639919281, 1.0),
 'purple': (0.5568627715110779, 0.1411764770746231, 0.6666666865348816, 1.0),
 'white': (1.0, 1.0, 1.0, 1.0),
 'yellow': (0.8941176533699036, 0.7686274647712708, 0.2549019753932953, 1.0)}

In [None]:
# label_preprocess function converts the color segmentaion maaps into binary along the channels acros the height and width of the segmentation labels.
# If the segentation labels of your data are already in binary then you dont need to use this function.

def label_preprocess(Y_init,num_images,height,width,channels,color_list):
      
    Y = np.zeros([num_images,height,width,channels])

    for n in range(num_images):
        for i in range(height):
            for j in range(width):
                t = tuple(Y_init[n,i,j,:])
        
                if t == color_list['blue']:
                    Y[n,i,j,:] = np.array((1,0,0,0))
                elif t == color_list['yellow']:
                    Y[n,i,j,:] = np.array((0,1,0,0))
                elif t == color_list['green']:
                    Y[n,i,j,:] = np.array((0,0,1,0))
                elif t == color_list['white']:
                    Y[n,i,j,:] = np.array((1,1,1,1))
                elif t == color_list['purple']:
                    Y[n,i,j,:] = np.array((0,0,0,1))
                    
    return Y  

In [None]:
# This function is used to predict the segmentation output for images 

def predictions(trained_model,test_dir,num_images,height,width,channels,color_list):
    
    y = np.zeros([num_images,height,width,channels])
    x = np.zeros([num_images,height,width,channels])
    
    for i in range(len(test_list)):
        x[i] = mpimg.imread(test_dir + '/' + test_list[i])
        y[i] = trained_model.predict(np.expand_dims(x[i], axis = 0))
    
    Y_final = prediction_postprocess(y,num_images,height,width,channels,color_list)
    
    for i in range(Y_final.shape[0]):
        plt.imshow(Y[i])
        plt.show()

In [None]:
# Before giving the final predicted map the image is processed to yield the segmentation in the four segmentation colors

def prediction_postprocess(trained_model,test_directory,num_images,height,width,channel,color_list):
    
    Y_pred = np.zeros([num_images,height,width,channel])
    Y_init_pred = np.zeros([num_images,height,width,channel])
    test_list = os.listdir(test_directory)
    axis_image = plt.figure(figsize = (100,100))
    t = 0

    
    for n in range(num_images):

      image = mpimg.imread(test_directory + '/' + test_list[n])
      Y_init_pred[n,:,:,:] = trained_model.predict(np.expand_dims(image, axis = 0))

      for i in range(height):
        for j in range(width):

          ch = np.argmax(Y_init_pred[n,i,j,:])

          if ch == 0:
            Y_pred[n,i,j,:] = np.array(color_list['blue'])
          elif ch == 1:
            Y_pred[n,i,j,:] = np.array(color_list['yellow'])
          elif ch == 2:
            Y_pred[n,i,j,:] = np.array(color_list['green'])
          elif ch == 3:
            Y_pred[n,i,j,:] = np.array(color_list['purple'])

      img = axis_image.add_subplot(num_images, 2,t+1, xticks = [], yticks = [])
      plt.imshow(np.squeeze(image))
      plt.savefig('/content/drive/My Drive/weather_news/model_unet_processed/Test_Predictions/Test_Image_Predictions_' + str(t+1))
      plt.show()

      img = axis_image.add_subplot(num_images, 2,t+2, xticks = [], yticks = [])
      plt.imshow(np.squeeze(Y_pred[n]))
      plt.savefig('/content/drive/My Drive/weather_news/model_unet_processed/Test_Predictions/Test_Image_Predictions_' + str(t+2))
      plt.show()

      t = t+2
                    
    return Y_pred   

In [None]:
# Function for a single resolution convolution operations

def conv_layer(input_layer, conv_channels, kernel_size = (3,3), pool_stride = (2,2), dropout_rate = 0.2, padding = 'same', activation = 'relu'):        
    
    layer_1 = tf.keras.layers.Conv2D(conv_channels, kernel_size, activation = activation, padding = padding, kernel_initializer = 'he_normal')(input_layer)
    layer_2 = tf.keras.layers.BatchNormalization()(layer_1)
    layer_3 = tf.keras.layers.Dropout(dropout_rate)(layer_2)
    layer_4 = tf.keras.layers.Conv2D(conv_channels, kernel_size, activation = activation, padding = padding, kernel_initializer = 'he_normal')(layer_3)
    layer_5 = tf.keras.layers.BatchNormalization()(layer_4)
    layer_6 = tf.keras.layers.MaxPool2D(pool_stride)(layer_5)
    
    return layer_1,layer_6

In [None]:
# Function for a single resolution convolution operation at the terminal position

def terminal_conv_layer(input_layer, conv_channels, kernel_size = (3,3), dropout_rate = 0.2, padding = 'same', activation = 'relu'):
    
    layer_1 = tf.keras.layers.Conv2D(conv_channels, kernel_size, activation = activation, padding = padding, kernel_initializer = 'he_normal')(input_layer)
    layer_2 = tf.keras.layers.BatchNormalization()(layer_1)
    layer_3 = tf.keras.layers.Dropout(dropout_rate)(layer_2)
    layer_4 = tf.keras.layers.Conv2D(conv_channels, kernel_size, activation = activation, padding = padding, kernel_initializer = 'he_normal')(layer_3)
    layer_5 = tf.keras.layers.BatchNormalization()(layer_4)
    
    return layer_5

In [None]:
# Function for a single resolution transpose convolution operation

def transpose_conv_layer(input_layer, skip_layer, conv_channels, kernel_size = (3,3), transpose_kernel_size = (2,2), dropout_rate = 0.2, padding = 'same', activation = 'relu', transpose_strides = (2,2)):
    
    layer_1 = tf.keras.layers.Conv2DTranspose(conv_channels, transpose_kernel_size, strides = transpose_strides, padding = padding )(input_layer)
    layer_2 = tf.keras.layers.concatenate([layer_1, skip_layer], axis = 3)
    
    layer_3 = tf.keras.layers.Conv2D(conv_channels, kernel_size, activation = activation, padding = padding, kernel_initializer = 'he_normal')(layer_2)
    layer_4 = tf.keras.layers.BatchNormalization()(layer_3)
    layer_5 = tf.keras.layers.Dropout(dropout_rate)(layer_4)
    layer_6 = tf.keras.layers.Conv2D(conv_channels, kernel_size, activation = activation, padding = padding, kernel_initializer = 'he_normal')(layer_5)
    layer_7 = tf.keras.layers.BatchNormalization()(layer_6)    
    
    return layer_7

In [None]:
# The final U-Net model of this program

def modified_unet_model(height, width, image_channels):
    
    inputs = tf.keras.layers.Input((height, width, image_channels))
    normalized_inputs = tf.keras.layers.Lambda(lambda x: x/255)(inputs)

    S1,D1 = conv_layer(normalized_inputs, 16)
    S2,D2 = conv_layer(D1, 32)
    S3,D3 = conv_layer(D2, 64)
    S4,D4 = conv_layer(D3, 128)
    S5,D5 = conv_layer(D4, 256)

    T1 = terminal_conv_layer(D5, 512)

    U1 = transpose_conv_layer(T1, S5, 256)
    U2 = transpose_conv_layer(U1, S4, 128)
    U3 = transpose_conv_layer(U2, S3, 64)
    U4 = transpose_conv_layer(U3, S2, 32)
    U5 = transpose_conv_layer(U4, S1, 16)

    outputs = tf.keras.layers.Conv2D(4,(1,1), activation = 'sigmoid')(U5)

    model = Model(inputs = [inputs], outputs = [outputs])
    
    model.summary()
    
    return model

In [None]:
# This function creates an instance of the model and compiles it with the entered optimizers, loss criterion and metrics and prints the summary of the model

def model_construction(X,Y,optimizer,loss,metrics):
    
    h = X.shape[1]
    w = X.shape[2]
    c = X.shape[3]
    
    unet_model = modified_unet_model(h,w,c)
    
    unet_model.compile(optimizer = optimizer, loss = loss, metrics = [metrics])
    unet_model.summary()
    
    return unet_model

In [None]:
# Calculation of intersection over union of predicted segmentations

def IoU(Y_pred , Y, smooth = 1):
    
    I_Area = tf.keras.backend.sum(tf.keras.backend.abs(np.multiply(Y_pred,Y)), axis = [1,2,3])
    U_Area = tf.keras.backend.sum(Y_pred, axis = [1,2,3]) + tf.keras.backend.sum(Y, axis = [1,2,3]) - I_Area
    IoU_value = I_Area/U_Area
    
    m = Y.shape[0]
    for i in range(m):   
        print("IoU of Image " + str(i+1) + " = " + str(IoU_value[i]))
        
    mean_IoU = tf.keras.backend.mean((I_Area + smooth)/(U_Area + smooth), axis = 0) 
    
    return mean_IoU

In [None]:
# processing the training image labels before training
Y = label_preprocess(Y_init,m,height,width,4,color_list)

In [None]:
# loading the earlier trained model

trained_model = model_construction(X,Y,'sgd','binary_crossentropy','acc')
checkpoint_filepath = '/content/drive/My Drive/weather_news/model_unet_processed/checkpoint'
checkpoint_directory = os.path.dirname(checkpoint_filepath)
latest = tf.train.latest_checkpoint(checkpoint_directory)
trained_model.load_weights(latest)

Model: "functional_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 160, 160, 3) 0                                            
__________________________________________________________________________________________________
lambda (Lambda)                 (None, 160, 160, 3)  0           input_1[0][0]                    
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 160, 160, 16) 448         lambda[0][0]                     
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 160, 160, 16) 64          conv2d[0][0]                     
_______________________________________________________________________________________

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f5b28319320>

In [40]:
# predictions over test images

testing_directory = r'/content/drive/My Drive/weather_news/test_images'
prediction_postprocess(trained_model,testing_directory,len(os.listdir(testing_directory)),160,160,4,color_list)

Output hidden; open in https://colab.research.google.com to view.

In [None]:
# Predictions for calculating the mean IoU

Y_pred_init = trained_model.predict(X)
Y_prediction = np.zeros([X.shape[0],height,width,4]) 

for n in range(X.shape[0]):
  for i in range(height):
    for j in range(width):

      ch = np.argmax(Y_pred_init[n,i,j,:])

      if ch == 0:
        Y_prediction[n,i,j,:] = np.array((1,0,0,0))
      elif ch == 1:
        Y_prediction[n,i,j,:] = np.array((0,1,0,0))
      elif ch == 2:
        Y_prediction[n,i,j,:] = np.array((0,0,1,0))
      elif ch == 3:
        Y_prediction[n,i,j,:] = np.array((0,0,0,1))


In [None]:
# mean IoU calculations

mean_iou = IoU(Y_prediction,Y)
print("\n\n Mean intersection over Union is " + str(mean_iou*100) )

IoU of Image 1 = tf.Tensor(0.860127157129882, shape=(), dtype=float64)
IoU of Image 2 = tf.Tensor(0.6139200605220023, shape=(), dtype=float64)
IoU of Image 3 = tf.Tensor(0.7939144392490753, shape=(), dtype=float64)
IoU of Image 4 = tf.Tensor(0.8156672222419235, shape=(), dtype=float64)
IoU of Image 5 = tf.Tensor(0.7817371937639198, shape=(), dtype=float64)
IoU of Image 6 = tf.Tensor(0.4245737016328688, shape=(), dtype=float64)
IoU of Image 7 = tf.Tensor(0.9013542514615556, shape=(), dtype=float64)
IoU of Image 8 = tf.Tensor(0.6805619378979847, shape=(), dtype=float64)
IoU of Image 9 = tf.Tensor(0.48634133596539614, shape=(), dtype=float64)
IoU of Image 10 = tf.Tensor(0.8137376456835169, shape=(), dtype=float64)
IoU of Image 11 = tf.Tensor(0.8951030832438834, shape=(), dtype=float64)
IoU of Image 12 = tf.Tensor(0.8637692668664043, shape=(), dtype=float64)
IoU of Image 13 = tf.Tensor(0.9460281261877613, shape=(), dtype=float64)
IoU of Image 14 = tf.Tensor(0.8817614678899083, shape=(), dt

## **Further Training**

If wished one can further train the model here by changing the file path to the place where the latest parameters are saved and further save the whole model in the desired directory by changing the path in the "trained_model.save" line.

In [None]:
checkpoint_filepath = '/content/drive/My Drive/weather_news/model_unet_processed/checkpoint'

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_filepath,
                                                               save_weights_only = True,
                                                               verbose = 1,
                                                               save_freq = 10
                                                               )

checkpoint_directory = os.path.dirname(checkpoint_filepath)

trained_model_ = model_construction(X,Y,'sgd','binary_crossentropy','acc')
latest = tf.train.latest_checkpoint(checkpoint_directory)
trained_model.load_weights(latest)
history = trained_model.fit(X,Y,epochs = 2000, validation_split = 0.01, callbacks = [model_checkpoint_callback], verbose = 2)
trained_model.save('/content/drive/My Drive/weather_news/model_unet_processed')