## Import libraries

In [None]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

## Prepare and explore data

In [None]:
import os

input_dir = 'Development/Screenshots'
target_dir = 'Development/ScreenshotMasks'
img_size = (480, 640)
batch_size = 16

input_img_paths = sorted(
    [
        os.path.join(input_dir, fname)
        for fname in os.listdir(input_dir)
        if fname.endswith('.png')
    ]
)
target_img_paths = sorted(
    [
        os.path.join(target_dir, fname)
        for fname in os.listdir(target_dir)
        if fname.endswith('.txt')
    ]
)

if (len(input_img_paths) != len(target_img_paths)):
    raise Exception('dataset error')

num_samples = len(input_img_paths)
print('Number of samples:', num_samples)

classes = np.array([0])
for mask_path in target_img_paths:
    mask = np.loadtxt(mask_path)
    classes = np.unique(np.concatenate((classes, np.unique(mask))))

num_classes = len(classes)
print('Number of classes:', num_classes)

for input_path, target_path in zip(input_img_paths[:5], target_img_paths[:5]):
    print(input_path, '|', target_path)


# Shuffle the data
indexes = np.arange(len(input_img_paths), dtype = int)
np.random.shuffle(indexes)

input_img_paths = [input_img_paths[i] for i in indexes]
target_img_paths = [target_img_paths[i] for i in indexes]

print('After shuffle: ')

for input_path, target_path in zip(input_img_paths[:5], target_img_paths[:5]):
    print(input_path, '|', target_path)

## Revise pixel-wise labels

In [None]:
# converters = {
#     0: 0, # Background

#     1: 1, # Field

#     2: 2, # Lines

#     3: 3, # Ball

#     4: 4, # Robots
#     5: 4, # Robots
#     6: 4, # Robots
#     7: 4, # Robots
#     8: 4, # Robots
#     9: 4, # Robots
#     10: 4, # Robots
#     11: 4, # Robots
#     12: 4, # Robots

#     13: 5, # Goals
#     14: 5  # Goals
# }

def read_mask(path):
    mask = np.loadtxt(path)
    # for key in converters.keys():
    #     mask[mask==key] = converters[key]
    return mask

## Display the data

In [None]:
plt.figure(figsize=(5, 10), dpi=300)
for i in range(2):
    img = plt.imread(input_img_paths[i])
    plt.subplot(1, 4, i*2+1)
    plt.imshow(img/img.max())
    plt.axis('off')
    plt.title('Image')
    plt.subplot(1, 4, i*2+2)
    mask = read_mask(target_img_paths[i])
    plt.imshow(mask/mask.max(), cmap='gray')
    plt.axis('off')
    plt.title('Mask')
plt.show()

## Create helper to iterete over the data

In [None]:
from tensorflow.keras.preprocessing.image import load_img

class RoboCup(tf.keras.utils.Sequence):
    """Helper to iterate over the data (as Numpy arrays)."""

    def __init__(self, batch_size, img_size, input_img_paths, target_img_paths):
        self.batch_size = batch_size
        self.img_size = img_size
        self.input_img_paths = input_img_paths
        self.target_img_paths = target_img_paths

    def __len__(self):
        return len(self.target_img_paths) // self.batch_size

    def __getitem__(self, idx):
        """Returns tuple (input, target) correspond to batch #idx."""
        i = idx * self.batch_size
        batch_input_img_paths = self.input_img_paths[i : i + self.batch_size]
        batch_target_img_paths = self.target_img_paths[i : i + self.batch_size]
        x = np.zeros((self.batch_size,) + self.img_size + (3,), dtype="float32")
        for j, path in enumerate(batch_input_img_paths):
            # img = plt.imread(path)
            img = load_img(path, target_size=self.img_size)
            x[j] = img
        y = np.zeros((self.batch_size,) + self.img_size + (1,), dtype="uint8")
        for j, path in enumerate(batch_target_img_paths):
            img = read_mask(path)
            y[j] = np.expand_dims(img, 2)
        return x, y


## Define the model

In [None]:
def get_unet_mod(num_classes, img_size = (640, 480), learning_rate = 1e-3,\
                 learning_decay = 1e-6, drop_out = 0.1, nchannels = 3,kshape = (3,3)):
    ''' Get U-Net model with gaussian noise and dropout'''
    
    inputs = tf.keras.Input(shape=img_size + (nchannels,))

    ### [First half of the network: downsampling inputs] ###

    # Entry block
    x = tf.keras.layers.Conv2D(32, 3, strides=2, padding="same")(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    # Blocks 1, 2, 3 are identical apart from the feature depth.
    for filters in [64, 128, 256]:
        x = tf.keras.layers.Activation("relu")(x)
        x = tf.keras.layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)

        x = tf.keras.layers.Activation("relu")(x)
        x = tf.keras.layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)

        x = tf.keras.layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = tf.keras.layers.Conv2D(filters, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = tf.keras.layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    ### [Second half of the network: upsampling inputs] ###

    for filters in [256, 128, 64, 32]:
        x = tf.keras.layers.Activation("relu")(x)
        x = tf.keras.layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)

        x = tf.keras.layers.Activation("relu")(x)
        x = tf.keras.layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = tf.keras.layers.BatchNormalization()(x)

        x = tf.keras.layers.UpSampling2D(2)(x)

        # Project residual
        residual = tf.keras.layers.UpSampling2D(2)(previous_block_activation)
        residual = tf.keras.layers.Conv2D(filters, 1, padding="same")(residual)
        x = tf.keras.layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    # Add a per-pixel classification layer
    outputs = tf.keras.layers.Conv2D(num_classes, 3, activation="softmax", padding="same")(x)

    # Define the model
    model = tf.keras.Model(inputs, outputs)
    opt = tf.keras.optimizers.Adam(lr= learning_rate, decay = learning_decay)
    model.compile(optimizer=opt, loss="sparse_categorical_crossentropy", metrics=["accuracy"])

    return model

tf.keras.backend.clear_session()

# Build model
model = get_unet_mod(num_classes, img_size)
model.summary()

## Split data into train / validation / test

In [None]:
import random

# Split our img paths into a training and a validation set
val_samples = int(0.15 * num_samples)
test_samples = int(0.15 * num_samples)
train_samples = num_samples - val_samples - test_samples

train_input_img_paths = input_img_paths[:train_samples]
train_target_img_paths = target_img_paths[:train_samples]

val_input_img_paths = input_img_paths[train_samples:train_samples + val_samples]
val_target_img_paths = target_img_paths[train_samples:train_samples + val_samples]

test_input_img_paths = input_img_paths[train_samples + val_samples:train_samples + val_samples + test_samples]
test_target_img_paths = target_img_paths[train_samples + val_samples:train_samples + val_samples + test_samples]

# Instantiate data Sequences for each split
train_gen = RoboCup(batch_size, img_size, train_input_img_paths, train_target_img_paths)
val_gen = RoboCup(batch_size, img_size, val_input_img_paths, val_target_img_paths)
test_gen = RoboCup(batch_size, img_size, test_input_img_paths, test_target_img_paths)

## Define callbacks

In [None]:
# Configure the model for training.
# We use the "sparse" version of categorical_crossentropy
# because our target data is integers.

model_name = "unet_seg.h5"
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience = 20)

monitor = tf.keras.callbacks.ModelCheckpoint(model_name, monitor='val_loss',\
                                             verbose=0,save_best_only=True,\
                                             save_weights_only=True,\
                                             mode='min')
# Learning rate schedule
def scheduler(epoch, lr):
    if epoch%3 == 0 and epoch!= 0:
        lr = lr/2
    return lr

lr_schedule = tf.keras.callbacks.LearningRateScheduler(scheduler,verbose = 0)

callbacks = [early_stop, monitor, lr_schedule]

## Train the model

In [None]:
# Train the model, doing validation at the end of each epoch.
epochs = 3
model.fit(train_gen, epochs=epochs, validation_data=(val_gen,), callbacks=callbacks)

## Visualize test results

In [None]:
val_preds = model.predict(test_gen)


plt.figure(figsize=(12,18), dpi=300)
for i in range(2):
    img = plt.imread(val_input_img_paths[i])
    plt.subplot(2, 3, i*3+1)
    plt.imshow(img/img.max())
    plt.axis("off")
    plt.title("Image")
    plt.subplot(2, 3, i*3+2)
    predicted_mask = np.argmax(val_preds[i], axis=-1)
    predicted_mask = np.expand_dims(predicted_mask, axis=-1)
    plt.imshow(predicted_mask/predicted_mask.max(), cmap='gray')
    plt.axis("off")
    plt.title("Predicted Mask")
    plt.subplot(2, 3, i*3+3)
    actual_mask = read_mask(val_target_img_paths[i])
    plt.imshow(actual_mask/actual_mask.max(), cmap='gray')
    plt.axis("off")
    plt.title("Actual Mask")
plt.show()