In [4]:
import os
import pandas as pd
import numpy as np
import imageio
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from keras import layers
from Constants import *
import outputs
%matplotlib inline

In [None]:
num_samples = 2
image = imageio.imread(image_list[num_samples])
mask = imageio.imread(mask_list[num_samples])

figure, arrar = plt.subplots(1, 2, figsize = (10, 8))
plt.axis("off")
array[0].imshow(image)
array[0].set_title("Image")
array[1].imshow(mask[:, :, 0])
array[1].set_title("Segmented image")

In [None]:
image_dataset = tf.data.Dataset.list_files(image_list, shuffle = False)
mask_dataset = tf.data.Dataset.list_files(mask_list, shuffle = False)

In [None]:
image_files = tf.constant(image_list)
mask_files = tf.constant(mask_list)

dataset = tf.data.Dataset.from_tensor_slices((image_files, mask_files))

In [6]:
def preprocessing_path(image_path, mask_path):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels = 3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels = 3)
    mask = tf.math.reduce_max(mask, axis =- 1, keepdims = True)
    
    return image, mask


def image_preprocessing(image, mask):
    image = tf.image.resize(image, (96, 128), method = 'nearest')
    mask = tf.image.resize(mask, (96, 128), method = 'nearest')
    image = image / 255.
    
    return image, mask

image_dataset = dataset.map(preprocessing_path)
processed_images = image_dataset(image_preprocessing)

In [None]:
# Unet Model

def conv_block(inputs = None, NUM_FILTERS, dropout = 0, max_pooling = True):
    
    conv = layers.Conv2D(NUM_FILTERS, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(inputs)
    conv = layers.Conv2D(NUM_FILTERS, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv)
    
    if dropout_prob > 0:
        conv = layers.Dropout(dropout_prob)(conv)
        
    if max_pooling2D:
        next_layer = layers.MaxPooling2D(pool_size = (2, 2))(conv)
    else:
        next_layer = conv
        
    skip_connection = conv
    
    return next_layer, skip_connection

In [None]:
INPUT_SHAPE = (96, 128, 3)
inputs = layers.Input(INPUT_SHAPE)

conv_block1 = conv_block(inputs, NUM_FILTERS * 1)
model_1 = keras.Model(inputs = inputs, outputs = conv_block1)

output_1 = [['InputLayer', [(None, 96, 128, 3)], 0],
           ['Conv2D', (None, 96, 128, 32), 896, 'same', 'relu', 'HeNormal'],
           ['Conv2D', (None, 96, 128, 32), 9248, 'same', 'relu', 'HeNormal'],
           ['MaxPooling2D', (None, 48, 64, 32), 0, (2, 2)]]

output_2 = [['InputLayer', [(None, 96, 128, 3)], 0],
           ['Conv2D', (None, 96, 128, 1024), 28672, 'same', 'relu', 'HeNormal'],
           ['Conv2D', (None, 96, 128, 1024), 9438208, 'same', 'relu', 'HeNormal'],
           ['Dropout', (None, 96, 128, 1024), 0, 0.1],
           ['MaxPooling2D', (None, 48, 64, 1024), 0, (2, 2)]]

In [None]:
def upsampling_block(expansive_input, contractive_input, NUM_FILTERS):
    
    upsample = layers.Conv2DTranspose(NUM_FILTERS, 3, strides = 2, padding = 'same')(expansive_input)
    merge = layers.Concatenate([upsample, contractive_input], axis = 3)
    
    conv = layers.Conv2D(NUM_FILTERS, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge)
    conv = layers.Conv2D(NUM_FILTERS, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv)
    
    return conv

In [None]:
INPUT_SHAPE_1 = (12, 16, 256)
INPUT_SHAPE_2 = (24, 32, 128)

expansive_inputs = layers.Input(INPUT_SHAPE_1)
contractive_inputs = layers.Input(INPUT_SHAPE_2)

conv_block1 = upsampling_block(expansive_inputs, contractive_inputs, NUM_FILTERS * 1)
model_1 = keras.Model(inputs = [expansive_inputs, contractive_inputs], outputs = conv_block1)

output_1 = [['InputLayer', [(None, 12, 16, 256)], 0],
           ['Conv2DTranspose', (None, 24, 32, 32), 73760],
           ['InputLayer', (None, 24, 32, 128), 0],
           ['Concatenate', (None, 24, 32, 160), 0],
           ['Conv2D', (None, 24, 32, 32), 46112, 'same', 'relu', 'HeNormal'],
           ['Conv2D', (None, 24, 32, 32), 9248, 'same', 'relu', 'HeNormal']]

In [None]:
def Unet_model(input_size = (96, 128, 3), NUM_FILTERS, NUM_CLASSES = 23):
    
    # Encoding block
    inputs = layers.Input(input_size)
    conv_block1 = conv_block(inputs, NUM_FILTERS)
    conv_block2 = conv_block(conv_block1[0], NUM_FILTERS * 2)
    conv_block3 = conv_block(conv_block2[0], NUM_FILTERS * 4)
    conv_block4 = conv_block(conv_block3[0], NUM_FILTERS * 8, dropout_prob = 0.3)
    conv_block5 = conv_block(conv_block4[0], NUM_FILTERS * 16, dropout_prob = 0.3, max_pooling = False)
    
    # Decoding block
    upsampling_block1 = upsampling_block(conv_block5[0], conv_block4[1], NUM_FILTERS * 8)
    upsampling_block2 = upsampling_block(upsampling_block1, conv_block3[1], NUM_FILTERS * 4)
    upsampling_block3 = upsampling_block(upsampling_block2, conv_block2[1], NUM_FILTERS * 2)
    upsampling_block4 = upsampling_block(upsampling_block3, conv_block1[1], NUM_FILTERS)
    
    conv_block6 = layers.Conv2D(NUM_FILTERS, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(upsampling_block4)
    conv_block7 = layers.Conv2(NUM_CLASSES, 1, padding = 'same')(conv_block6)
    
    model = keras.Model(inputs = inputs, output = conv_block7)
    
    return model

In [None]:
unet_model = Unet_model((IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS))
unet_model

In [None]:
unet_model.compile(optimizer = 'adam', 
                   loss = keras.losses.SparseCategoricalCrossentropy(from_logits = True),
                   metrics = ['accuracy'])

In [None]:
def display_image(display_list):
    
    plt.figure(figsize = (12, 12))
    title = ["Input Image", "True Mask", "Predicted Mask"]
    
    for i in range(len(display_list)):
        plt.subplot(1, len(display_list), i + 1)
        plt.title(title[i])
        plt.imshow(keras.preprocessing.image.array_to_img(display_list[i]))
        plt.axis("off")
    plt.show()

In [None]:
for image, mask in image_dataset.take(1):
    sample_image, sample_mask = imahe, mask
    print(mask.shape)
display([sample_image, sample_mask])

In [None]:
for image, mask in processed_images.take(1):
    sample_image, sample_mask = imahe, mask
    print(mask.shape)
display([sample_image, sample_mask])

In [None]:
processed_images.batch(BATCH_SIZE)
train_set = processed_images.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

In [None]:
history = unet_model.fit(train_set,
                         epochs = EPOCHS)

In [None]:
plt.plt(history.history['accuracy'])

In [None]:
def make_mask(predicted_mask):
    predicted_mask = tf.argmax(predicted_mask, axis =- 1)
    predicted_mask = predicted_mask[..., tf.newaxis]
    
    return predicted_mask[0]

In [None]:
def display_prediction(dataset = None, num = 1):
    
    if dataset:
        for image, mask in dataset.take(num):
            predicted_mask = unet_model.predict(image)
            display([image[0], mask[0], make_mask(predicted_mask)])
    else:
        display([sample_image,
                 sample_mask,
                 make_mask(unet_model.predict(sample_image[tf.newaxis, ...]))])

In [None]:
display_prediction(train_set, 2)