In [None]:
import tensorflow as tf
import numpy as np

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import concatenate

from test_utils import summary, comparator

<a name='2'></a>
Load and Split the Data

In [None]:
import os
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import imageio

import matplotlib.pyplot as plt
%matplotlib inline
from google.colab import drive
# Mount Google Drive so the dataset folders under MyDrive are readable.
drive.mount('/content/drive/')
path = '/content/drive/MyDrive/'

# Directories holding the RGB frames and their segmentation masks.
image_path = os.path.join(path, 'data/CameraRGB/')
mask_path = os.path.join(path, 'data/CameraMask/')

# os.listdir() returns entries in arbitrary order. Sort both listings so that
# the i-th image and the i-th mask refer to the same frame when the two
# folders use matching file names.
image_list = [os.path.join(image_path, name) for name in sorted(os.listdir(image_path))]
mask_list = [os.path.join(mask_path, name) for name in sorted(os.listdir(mask_path))]

In [None]:
# Keep only the paths that still exist on disk. Both lists are sorted so that
# positional pairing is deterministic (directory listings have no guaranteed order).
valid_image_paths = sorted(img for img in image_list if os.path.exists(img))
valid_mask_paths = sorted(mask for mask in mask_list if os.path.exists(mask))

# If the counts disagree, fall back to pairing strictly by filename.
if len(valid_image_paths) != len(valid_mask_paths):
    print(f"Mismatch after filtering: {len(valid_image_paths)} images, {len(valid_mask_paths)} masks")

    # Set membership keeps the lookup O(1) per image instead of scanning the list.
    available_masks = set(valid_mask_paths)

    # Expected mask location for each image; adjust the pattern if the naming differs.
    # A comprehension is used so that no loop variable overwrites the
    # module-level `mask_path` directory defined earlier in the notebook.
    matched_pairs = [
        (img_file, img_file.replace('CameraRGB', 'CameraMask'))
        for img_file in valid_image_paths
    ]
    matched_pairs = [pair for pair in matched_pairs if pair[1] in available_masks]

    valid_image_paths = [img_file for img_file, _ in matched_pairs]
    valid_mask_paths = [mask_file for _, mask_file in matched_pairs]

print(f"Final count - Images: {len(valid_image_paths)}, Masks: {len(valid_mask_paths)}")

<a name='2-1'></a>
Split Your Dataset into Unmasked and Masked Images

In [None]:
# Build file-name datasets in a fixed (unshuffled) order.
image_list_ds = tf.data.Dataset.list_files(image_list, shuffle=False)
mask_list_ds = tf.data.Dataset.list_files(mask_list, shuffle=False)

# Preview the first three (image, mask) file-name pairs.
# The loop variable is deliberately not called `path`, so the Drive root
# stored in `path` is not overwritten by this preview.
for path_pair in zip(image_list_ds.take(3), mask_list_ds.take(3)):
    print(path_pair)

In [None]:
# Report every image whose expected mask file is absent from mask_list.
# The mask name is derived by swapping the folder name, assuming masks follow
# the same naming pattern as the images.
known_masks = set(mask_list)  # O(1) membership tests instead of a list scan per image
missing_masks = [
    image_file
    for image_file in image_list
    if image_file.replace('CameraRGB', 'CameraMask') not in known_masks
]

print(f"Missing masks for {len(missing_masks)} images:")
for missing in missing_masks:
    print(missing)

In [None]:
# Keep only the paths that still exist on disk. Both lists are sorted so that
# the positional pairing used to build the dataset is deterministic
# (directory listings have no guaranteed order).
valid_image_paths = sorted(img for img in image_list if os.path.exists(img))
valid_mask_paths = sorted(mask for mask in mask_list if os.path.exists(mask))

# If the counts disagree, fall back to pairing strictly by filename.
if len(valid_image_paths) != len(valid_mask_paths):
    print(f"Mismatch after filtering: {len(valid_image_paths)} images, {len(valid_mask_paths)} masks")

    # Set membership keeps the lookup O(1) per image instead of scanning the list.
    available_masks = set(valid_mask_paths)

    # Expected mask location for each image; adjust the pattern if the naming differs.
    # A comprehension is used so that no loop variable overwrites the
    # module-level `mask_path` directory defined earlier in the notebook.
    matched_pairs = [
        (img_file, img_file.replace('CameraRGB', 'CameraMask'))
        for img_file in valid_image_paths
    ]
    matched_pairs = [pair for pair in matched_pairs if pair[1] in available_masks]

    valid_image_paths = [img_file for img_file, _ in matched_pairs]
    valid_mask_paths = [mask_file for _, mask_file in matched_pairs]

print(f"Final count - Images: {len(valid_image_paths)}, Masks: {len(valid_mask_paths)}")

In [None]:
# Wrap the validated path lists as string tensors and zip them into one
# dataset whose elements are (image path, mask path) pairs.
image_filenames, masks_filenames = (
    tf.constant(valid_image_paths),
    tf.constant(valid_mask_paths),
)
dataset = tf.data.Dataset.from_tensor_slices((image_filenames, masks_filenames))

# Peek at the first pair to confirm that image and mask line up.
for image, mask in dataset.take(1):
    print(image, mask, sep="\n")

In [None]:
# Show the repr of a one-element view of the dataset (no elements are read here).
dataset.take(1)

<a name='2-2'></a>
Preprocess Data

In [None]:
def process_path(image_path, mask_path):
    """Load one image/mask pair from disk.

    Args:
        image_path: path of the PNG image file.
        mask_path: path of the PNG mask file.

    Returns:
        (img, mask) where `img` is the 3-channel image converted to float32
        with tf.image.convert_image_dtype, and `mask` is the channel-wise
        maximum of the decoded 3-channel mask, kept as a single trailing channel.
    """
    raw_image = tf.io.read_file(image_path)
    img = tf.image.convert_image_dtype(
        tf.image.decode_png(raw_image, channels=3), tf.float32
    )

    raw_mask = tf.io.read_file(mask_path)
    # Collapse the three decoded channels into one by taking their maximum.
    mask = tf.math.reduce_max(
        tf.image.decode_png(raw_mask, channels=3), axis=-1, keepdims=True
    )
    return img, mask

def preprocess(image, mask):
    """Resize an image and its mask to 96x128 with nearest-neighbour sampling.

    Args:
        image: image tensor to resize.
        mask: mask tensor to resize.

    Returns:
        (input_image, input_mask), both resized to (96, 128).
    """
    target_size = (96, 128)
    return (
        tf.image.resize(image, target_size, method='nearest'),
        tf.image.resize(mask, target_size, method='nearest'),
    )

# Decode every (image path, mask path) pair into tensors at their stored size.
image_ds = dataset.map(process_path)
# Resize each decoded pair to the 96x128 resolution used by preprocess().
processed_image_ds = image_ds.map(preprocess)

In [None]:
def conv_block(inputs=None, n_filters=32, dropout_prob=0, max_pooling=True):
    """Encoder block: two 3x3 ReLU convolutions, optional dropout and pooling.

    Args:
        inputs: input tensor for the block.
        n_filters: number of filters in each convolution.
        dropout_prob: dropout rate; dropout is applied only when this is > 0.
        max_pooling: when True, the block output is max-pooled 2x2 with stride 2.

    Returns:
        (next_layer, skip_connection): the (optionally pooled) output for the
        next block, and the un-pooled features for the matching decoder block.
    """
    features = inputs
    # Two identical 3x3 convolutions applied back to back.
    for _ in range(2):
        features = Conv2D(
            n_filters,
            3,
            activation='relu',
            padding='same',
            kernel_initializer='he_normal',
        )(features)

    if dropout_prob > 0:
        features = tf.keras.layers.Dropout(dropout_prob)(features)

    # The skip connection always carries the features from before pooling.
    if not max_pooling:
        return features, features

    pooled = tf.keras.layers.MaxPooling2D(
        pool_size=(2, 2), strides=2, padding='same'
    )(features)
    return pooled, features

In [None]:
def upsampling_block(expansive_input, contractive_input, n_filters=32):
    """Decoder block: upsample, merge with the skip connection, convolve twice.

    Args:
        expansive_input: tensor coming from the previous (deeper) layer.
        contractive_input: skip-connection tensor from the encoder.
        n_filters: number of filters for the transpose and regular convolutions.

    Returns:
        Output tensor of the second 3x3 convolution.
    """
    # Stride-2 transposed convolution upsamples the deeper features.
    upsampled = Conv2DTranspose(n_filters, 3, strides=2, padding='same')(expansive_input)

    # Join upsampled features and encoder features along axis 3.
    features = concatenate([upsampled, contractive_input], axis=3)

    for _ in range(2):
        features = Conv2D(
            n_filters,
            3,
            activation='relu',
            padding='same',
            kernel_initializer='he_normal',
        )(features)
    return features

In [None]:
def unet_model(input_size=(96, 128, 3), n_filters=32, n_classes=23):
    """Assemble the U-Net as a tf.keras.Model.

    Args:
        input_size: shape passed to the Keras Input layer.
        n_filters: filter count of the first encoder block; it is doubled at
            each deeper encoder block and mirrored on the way back up.
        n_classes: number of output channels of the final 1x1 convolution.

    Returns:
        A tf.keras.Model mapping the input to the output of the final 1x1
        convolution, which has no activation.
    """
    inputs = Input(input_size)

    # Contracting path: each block yields (output for next block, skip connection).
    down1, skip1 = conv_block(inputs, n_filters)
    down2, skip2 = conv_block(down1, n_filters * 2)
    down3, skip3 = conv_block(down2, n_filters * 4)
    down4, skip4 = conv_block(down3, n_filters * 8, dropout_prob=0.3)
    # Deepest block: dropout but no pooling, so its skip output is not needed.
    bottleneck, _ = conv_block(down4, n_filters * 16, dropout_prob=0.3, max_pooling=False)

    # Expanding path: consume the skip connections in reverse order.
    up6 = upsampling_block(bottleneck, skip4, n_filters * 8)
    up7 = upsampling_block(up6, skip3, n_filters * 4)
    up8 = upsampling_block(up7, skip2, n_filters * 2)
    up9 = upsampling_block(up8, skip1, n_filters)

    head = Conv2D(
        n_filters,
        3,
        activation='relu',
        padding='same',
        kernel_initializer='he_normal',
    )(up9)

    # 1x1 convolution producing one channel per class.
    class_scores = Conv2D(n_classes, 1, padding='same')(head)
    return tf.keras.Model(inputs=inputs, outputs=class_scores)

<a name='3-5'></a>
Set Model Dimensions

In [None]:
# Input dimensions for the network: height, width and channel count.
img_height, img_width, num_channels = 96, 128, 3

# Build the U-Net for that input shape, keeping the default filter and class counts.
unet = unet_model(input_size=(img_height, img_width, num_channels))

In [None]:
# Print the layer-by-layer summary of the assembled model.
unet.summary()

In [None]:
# The loss is configured with from_logits=True, matching the activation-free
# final convolution of the model; the masks supply integer class ids.
unet.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

In [None]:
def display(display_list):
    """Plot the given tensors side by side in a single row.

    Args:
        display_list: tensors to show, in the order input image, true mask,
            predicted mask. Panel titles are assigned by position.
    """
    plt.figure(figsize=(15, 15))

    panel_titles = ['Input Image', 'True Mask', 'Predicted Mask']
    n_panels = len(display_list)

    for idx, tensor in enumerate(display_list):
        plt.subplot(1, n_panels, idx + 1)
        plt.title(panel_titles[idx])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(tensor))
        plt.axis('off')
    plt.show()

In [None]:
# Grab one decoded (not yet resized) pair and show it.
for image, mask in image_ds.take(1):
    sample_image = image
    sample_mask = mask
    print(mask.shape)
display([sample_image, sample_mask])

In [None]:
# Grab one resized pair and show it. sample_image / sample_mask are kept as
# globals because show_predictions() falls back to them.
for image, mask in processed_image_ds.take(1):
    sample_image = image
    sample_mask = mask
    print(mask.shape)
display([sample_image, sample_mask])

<a name='4'></a>
Train the Model

In [None]:
# Training hyper-parameters.
EPOCHS = 40
VAL_SUBSPLITS = 5
BUFFER_SIZE = 500
BATCH_SIZE = 32

# Dataset transformations return a new dataset rather than modifying in place,
# so batching has to be part of the chain that is assigned to train_dataset.
train_dataset = processed_image_ds.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
print(processed_image_ds.element_spec)
model_history = unet.fit(train_dataset, epochs=EPOCHS)

(TensorSpec(shape=(96, 128, 3), dtype=tf.float32, name=None), TensorSpec(shape=(96, 128, 1), dtype=tf.uint8, name=None))
Epoch 1/40


<a name='4-1'></a>
Create Predicted Masks

In [None]:
def create_mask(pred_mask):
    """Turn batched per-class scores into a class-id mask for the first element.

    Args:
        pred_mask: batched prediction whose last axis holds the class scores.

    Returns:
        The argmax over the last axis with a trailing singleton axis added,
        for the first element of the batch only.
    """
    class_ids = tf.argmax(pred_mask, axis=-1)
    return class_ids[..., tf.newaxis][0]

In [None]:
# Plot the "accuracy" values recorded in the training history.
plt.plot(model_history.history["accuracy"])

In [None]:
def show_predictions(dataset=None, num=1):
    """Display input image, true mask and predicted mask side by side.

    Args:
        dataset: batched dataset of (image, mask) pairs. When None, the
            notebook-level `sample_image` / `sample_mask` are shown instead.
        num: number of batches to draw from `dataset`; the first element of
            each batch is displayed.
    """
    # Explicit None check: whether a dataset was supplied should not depend on
    # how the dataset object evaluates in a boolean context.
    if dataset is not None:
        for image, mask in dataset.take(num):
            pred_mask = unet.predict(image)
            display([image[0], mask[0], create_mask(pred_mask)])
    else:
        # Add a leading batch axis so the single sample can go through predict().
        display([sample_image, sample_mask,
                 create_mask(unet.predict(sample_image[tf.newaxis, ...]))])

In [None]:
# Show predictions for the first element of six training batches.
show_predictions(train_dataset, 6)