In [2]:
import pandas as pd
import imageio
import glob
import matplotlib
import matplotlib.pyplot as plt
import os
import cv2
from copy import deepcopy
import json
import numpy as np
from skimage.transform import resize
import argparse
from keras import Input
from keras.models import Model
from keras.layers import Conv2D, MaxPool2D, Conv2DTranspose
from keras.layers.merge import concatenate
from keras.optimizers import Adam
from keras.layers.pooling import MaxPooling2D, GlobalMaxPool2D
from keras.layers import Input, BatchNormalization, Activation, Dense, Dropout

def normalization_image(image):
    """
    Standardize an image array to zero mean and unit standard deviation
    and append a trailing channel axis.

    The mean and standard deviation are computed once over the whole
    array (every image and pixel together), not per image.

    Parameters
    ----------
    image : numpy array
        It has images as numpy array.

    Returns
    ----------
    image : numpy array
        float32 array of shape ``image.shape + (1,)``. The values are
        centered on 0 with unit standard deviation, so they are NOT
        restricted to the range 0 to 1. A constant input (standard
        deviation of 0) is returned as all zeros instead of NaN.
    """

    image = image.astype('float32')
    image = image - np.mean(image)
    spread = np.std(image)
    # Dividing by a zero standard deviation would fill the array with NaN
    # (0 / 0); a constant image is already fully centered, so leave it as is.
    if spread > 0:
        image = image / spread
    image = image[..., np.newaxis]
    return image
    
def normalization_mask(mask):
    """
    Rescale a mask array by 1/255 and append a trailing channel axis.

    Parameters
    ----------
    mask : numpy array
        Mask values as a numpy array (0-255 input maps onto 0-1).

    Returns
    ----------
    mask: numpy array
        float32 array of shape ``mask.shape + (1,)`` holding the
        input values divided by 255.
    """
    scaled = mask.astype('float32') / 255
    # Same effect as indexing with [..., np.newaxis]: one new last axis.
    return np.expand_dims(scaled, axis=-1)


def mask_preprocessing(mask_dir_path):
    """
    Load every ``*color.png`` mask found below a directory and return
    the gray scaled masks as one numpy array.

    The masks are kept at their original resolution and exactly one
    entry is produced per file (in sorted path order). No resizing is
    done here: loading_json draws the annotation polygons onto these
    arrays and resizes its own output to 512x512 afterwards.

    Parameters
    ----------
    mask_dir_path : str
        Directory that is searched recursively for mask files. A
        trailing path separator is optional.

    Returns
    ----------
    gray_img: numpy array
        Gray scale masks stacked along the first axis, one per file.
    """
    # os.path.join keeps "**" as its own path component, which glob needs
    # for a recursive match; plain string concatenation silently stopped
    # recursing when the directory had no trailing separator.
    pattern = os.path.join(mask_dir_path, "**", "*color.png")
    mask_files = sorted(glob.glob(pattern, recursive=True))
    print("number of mask file :::",len(mask_files))
    gray_img = []
    for mask_path in mask_files:
        mask_arr = imageio.imread(mask_path)
        gray_img.append(cv2.cvtColor(mask_arr, cv2.COLOR_BGR2GRAY))
    return np.array(gray_img)
    
def image_preprocessing(image_dir_path):
    """
    Load every ``*.png`` image found below a directory, convert it to
    gray scale and resize it to 512x512.

    Parameters
    ----------
    image_dir_path : str
        Directory that is searched recursively for image files. A
        trailing path separator is optional.

    Returns
    ----------
    image_array: numpy array
        Gray scale images resized to 512x512 and stacked along the
        first axis, in sorted path order.
    """
    # os.path.join keeps "**" as its own path component, which glob needs
    # for a recursive match; plain string concatenation silently stopped
    # recursing when the directory had no trailing separator.
    pattern = os.path.join(image_dir_path, "**", "*.png")
    images_files = sorted(glob.glob(pattern, recursive=True))
    print("number of image file :::",len(images_files))
    image_array = []
    for image_path in images_files:
        img = imageio.imread(image_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # preserve_range keeps the original intensity scale instead of
        # letting skimage rescale the values.
        image_array.append(resize(img, (512, 512), preserve_range=True))
    return np.array(image_array)
    
def loading_json(json_dir_path,gray_img):
    """
    Build black and white vegetation masks from polygon annotations.

    Every ``*.json`` file found below ``json_dir_path`` is paired, in
    sorted path order, with the array at the same position in
    ``gray_img``. For each pair a fresh mask with the shape and dtype
    of the gray array is created; the annotation's polygons are drawn
    onto it in file order (255 for the label 'vegetation', 0 for any
    other label, so later polygons overwrite earlier ones) and the
    result is resized to 512x512.

    Parameters
    ----------
    json_dir_path : str
        Directory that is searched recursively for json files. A
        trailing path separator is optional.
    gray_img: numpy array
        gray scale array of mask; only the shape and dtype of each
        entry are used, and the entries are not modified.

    Returns
    ----------
    mask_final_arr: numpy array
        Return  black and white mask as numpy array, one 512x512
        entry per element of ``gray_img``.

    Raises
    ----------
    IndexError
        If fewer json files are found than there are gray images.
    """
    # os.path.join keeps "**" as its own path component, which glob needs
    # for a recursive match regardless of a trailing separator.
    pattern = os.path.join(json_dir_path, "**", "*.json")
    json_file = sorted(glob.glob(pattern, recursive=True))
    print("number of json file :::",len(json_file))

    json_array = []
    for json_path in json_file:
        # Context manager so the file handle is closed right away.
        with open(json_path) as handle:
            json_array.append(json.load(handle))

    if len(json_array) < len(gray_img):
        raise IndexError(
            "found %d json files for %d gray images"
            % (len(json_array), len(gray_img)))

    mask_final_arr = []
    for gray, annotation in zip(gray_img, json_array):
        # Start from an all-background mask: pixels outside every polygon
        # stay 0, an annotation without objects gives an empty mask, and
        # the caller's array is left untouched.
        mask = np.zeros_like(gray)
        for obj in annotation["objects"]:
            fill_value = 255 if obj['label'] == 'vegetation' else 0
            cv2.fillPoly(mask, [np.asarray(obj["polygon"], dtype=np.int32)], fill_value)
        mask_final_arr.append(resize(mask, (512, 512), preserve_range=True))
    return np.array(mask_final_arr)
    
def unet():
    '''
    Build and compile the U-Net style segmentation network.

    This model was based on the paper: https://arxiv.org/pdf/1707.06314.pdf

    The network takes a (512, 512, 1) input, runs five encoder stages
    (64 to 1024 filters) and four decoder stages (512 to 64 filters),
    and ends in a single-channel 1x1 sigmoid convolution. It is compiled
    with Adam (learning rate 1e-4), binary cross-entropy loss and the
    accuracy metric.

    Returns
    ----------
    model: Model object
        The created and compiled Model object.
    '''
    def conv_pair(tensor, filters):
        # Two 3x3 same-padded ReLU convolutions followed by batch norm.
        # The un-normalized convolution output is returned as well because
        # the last decoder stage uses it as its skip connection.
        features = Conv2D(filters, (3, 3), activation='relu', padding='same')(tensor)
        features = Conv2D(filters, (3, 3), activation='relu', padding='same')(features)
        return features, BatchNormalization()(features)

    def up_merge(tensor, skip):
        # Every up-convolution uses 512 filters, whatever the depth.
        upsampled = Conv2DTranspose(512, (2, 2), strides=(2, 2), padding='same')(tensor)
        return concatenate([upsampled, skip], axis=3)

    inputs = Input((512, 512, 1))

    # ---- encoder ----
    conv1, norm1 = conv_pair(inputs, 64)
    pool1 = MaxPooling2D(pool_size=(2, 2))(norm1)

    _, norm2 = conv_pair(pool1, 128)
    drop2 = Dropout(0.5)(norm2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(drop2)

    _, norm3 = conv_pair(pool2, 256)
    drop3 = Dropout(0.25)(norm3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(drop3)

    _, norm4 = conv_pair(pool3, 512)
    drop4 = Dropout(0.5)(norm4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    # ---- bottleneck ----
    _, norm5 = conv_pair(pool4, 1024)
    drop5 = Dropout(0.5)(norm5)

    # ---- decoder ----
    _, norm6 = conv_pair(up_merge(drop5, drop4), 512)
    drop6 = Dropout(0.5)(norm6)

    _, norm7 = conv_pair(up_merge(drop6, drop3), 256)
    drop7 = Dropout(0.5)(norm7)

    _, norm8 = conv_pair(up_merge(drop7, drop2), 128)
    drop8 = Dropout(0.5)(norm8)

    # The top-level skip is the raw second convolution of stage 1,
    # not its batch-normalized output.
    _, norm9 = conv_pair(up_merge(drop8, conv1), 64)

    conv10 = Conv2D(1, (1, 1), activation='sigmoid')(norm9)

    model = Model(inputs=[inputs], outputs=[conv10])
    model.compile(optimizer=Adam(lr=1e-4), loss='binary_crossentropy', metrics=['accuracy'])

    return model


def prediction(testing_image_arr):
    """
    Predict a mask for every test image, one image at a time.

    The network is read from the module-level name ``model``, which
    must already be defined when this function is called.

    Parameters
    ----------
    testing_image_arr : numpy array
        test image numpy array; it is iterated along its first axis.

    Returns
    ----------
    mask_pred : list
        One ``model.predict`` result per image. Each image is given a
        leading batch axis of length 1 before it is passed to the model.
    """
    return [
        model.predict(single_image[np.newaxis, ...], verbose=1)
        for single_image in testing_image_arr
    ]
        
def calculating_iou(mask_real,temp_mask,threshold=0.5):
    """
    This function calculate the mean intersection over union of
    paired ground truth and predicted masks.

    Both masks are binarized first: a pixel is foreground when its
    value is strictly greater than ``threshold``. Without this step any
    non-zero value counts as foreground, so sigmoid probabilities (which
    are almost never exactly 0) would mark nearly every pixel as
    predicted. Masks that only contain 0/1, booleans or 0/255 give the
    same score as before with the default threshold.

    Parameters
    ----------
    mask_real : array
        ground truth array, indexed along its first axis.
    temp_mask : array
        predicted mask array, indexed along its first axis; each entry
        must broadcast against the matching ground truth entry.
    threshold : float, optional
        Cut-off used to binarize both masks (default 0.5).

    Returns
    ----------
    mean_iou : decimal
        mean intersection over union. A pair whose union is empty
        (no foreground in either mask) scores 1.0.
    """
    arr_iou = []
    for i in range(len(mask_real)):
        real = np.asarray(mask_real[i]) > threshold
        predicted = np.asarray(temp_mask[i]) > threshold
        union = np.sum(np.logical_or(real, predicted))
        if union == 0:
            # Nothing to find and nothing predicted: count as a perfect
            # match instead of dividing 0 by 0.
            arr_iou.append(1.0)
        else:
            arr_iou.append(np.sum(np.logical_and(real, predicted)) / union)
    mean_iou = np.array(arr_iou).mean()
    return mean_iou
    

In [None]:
if __name__ == '__main__':
    # Script entry point: load the training and validation data, train the
    # U-Net and report the mean IoU on the validation set.

    parser = argparse.ArgumentParser(
        description=('Trains the model and outputs predictions.'),
        add_help=True, prog='model.py <args>')

    # Required arguments path of dataset folder "home/jupyter/dataset/"
    parser.add_argument("-d", "--data_path", required=True,
                        help=("Provide the path to the data folder"))

    args = vars(parser.parse_args())

    #passing the path of image and mask file to be trained and tested
    # NOTE(review): read_file is not defined in the visible cells; its
    # result is used below as a directory path - confirm where it comes from.
    mask_dir_path = read_file(args['data_path'] + '/mask/train/')
    # Trailing slash added so this path has the same form as the other three.
    image_dir_path = read_file(args['data_path'] + '/image/train/')
    test_image_dir_path = read_file(args['data_path'] + '/image/val/')
    ground_truth = read_file(args['data_path'] + '/mask/val/')

    #numpy array of the mask after normalization
    train_mask=mask_preprocessing(mask_dir_path)
    train_mask=loading_json(mask_dir_path,train_mask)
    train_mask=normalization_mask(train_mask)

    #numpy array of the image after normalization
    train_image=image_preprocessing(image_dir_path)
    train_image=normalization_image(train_image)

    #numpy array of the test mask after normalization
    test_mask=mask_preprocessing(ground_truth)
    # Build the validation masks from the validation gray images
    # (previously the already-normalized training masks were passed here).
    test_mask=loading_json(ground_truth,test_mask)
    test_mask=normalization_mask(test_mask)

    #numpy array of the test image after normalization
    test_image=image_preprocessing(test_image_dir_path)
    test_image=normalization_image(test_image)

    # calling model
    model=unet()
    # fitting the model
    model.fit(train_image, train_mask, batch_size=1, epochs=5, verbose=1, shuffle=True)
    # predicted mask
    prediction_mask=prediction(test_image)
    prediction_mask=np.array(prediction_mask)
    #calculating intersection over union
    # Compare against the validation mask array, not the directory path string.
    mean_iou=calculating_iou(test_mask,prediction_mask)
    print("mean_iou:::",mean_iou)
    
    