# CamVid (Cambridge-Driving Labeled Video Database)

## Description
The Cambridge-Driving Labeled Video Database (CamVid) provides ground-truth labels that associate each pixel with one of 32 semantic classes. The dataset is widely used in (real-time) semantic segmentation research.

The dataset is split up as follows:

* 367 training pairs
* 101 validation pairs
* 233 test pairs

For each split, the images and their masks are stored in separate directories (e.g. `train/` for images and `train_labels/` for masks).

Citations:

[1] Brostow, Shotton, Fauqueur, Cipolla. **Segmentation and Recognition Using Structure from Motion Point Clouds**, 
_European Conference on Computer Vision (ECCV)_, 2008.

[2] Brostow, Fauqueur, Cipolla. **Semantic Object Classes in Video: A High-Definition Ground Truth Database**, 
_Pattern Recognition Letters_, 2009.

The original dataset can be found here:
http://mi.eng.cam.ac.uk/research/projects/VideoRec/CamVid

Source / Contact:
http://mi.eng.cam.ac.uk/research/projects/VideoRec/CamVid

Kaggle Source:
https://www.kaggle.com/datasets/carlolepelaars/camvid

In [3]:
import tensorflow as tf
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import tarfile 
import pandas as pd
import os
import numpy as np
import seaborn as sns

from sklearn.model_selection import StratifiedKFold, train_test_split

In [4]:
# Notebook-wide constants, grouped in a single config class.
class CFG:
    """Shared configuration values for the data pipeline and the model."""

    # Seed value for random operations.
    RANDOM_SEED = 42

    # Spatial size (in pixels) that images and masks are resized to.
    IMG_HEIGHT = 256
    IMG_WIDTH = 256

    # Number of samples per batch.
    BATCH_SIZE = 16

In [5]:
# Root directory of the CamVid dataset. The trailing slash is required: every
# path derived from it is built by plain string concatenation / joining.
files_path = "../input/camvid/CamVid/"

In [6]:
# Class table: one row per semantic class with its name and RGB colour.
unique_classes = pd.read_csv(f"{files_path}class_dict.csv")
unique_classes.head()

Unnamed: 0,name,r,g,b
0,Animal,64,128,64
1,Archway,192,0,128
2,Bicyclist,0,128,192
3,Bridge,0,128,64
4,Building,128,0,0


In [7]:
# Colour lookup table: row i holds the (r, g, b) colour of class i.
# The colour columns are selected explicitly (instead of "everything except
# name") so the table is always [num_classes, 3] in RGB order, even if the CSV
# carries extra or reordered columns.
color_map = unique_classes[["r", "g", "b"]]
color_map_tf = tf.constant(color_map.to_numpy(), dtype=tf.int32)

In [8]:
# Datasets of image file paths for the training and validation splits.
train_files, val_files = (
    tf.data.Dataset.list_files(f"{files_path}{split}/*.png")
    for split in ("train", "val")
)

In [9]:
def rgb_class_convert(label_rgb, color_map, inverse=False):
    """Convert between colour-coded masks and class indices.

    Args:
        label_rgb: With ``inverse=False``, a mask whose last axis holds an
            (r, g, b) colour. With ``inverse=True``, a tensor whose last axis
            holds per-class scores (e.g. a one-hot mask).
        color_map: Colour table; it is viewed as ``[1, 1, N, 3]``, so row ``i``
            is the colour of class ``i``.
        inverse: Selects the direction of the conversion.

    Returns:
        ``inverse=False``: per-pixel class indices, taken as the argmax over
        the colour matches (a pixel matching no colour yields index 0).
        ``inverse=True``: the ``color_map`` rows gathered for each pixel's
        argmax class.
    """
    if inverse:
        winning_class = tf.argmax(label_rgb, axis=-1)
        return tf.gather(color_map, winning_class)

    # Broadcast every pixel ([H, W, 1, 3]) against every palette colour
    # ([1, 1, N, 3]); a class matches only if all three channels are equal.
    pixels = tf.expand_dims(label_rgb, axis=-2)
    palette = tf.reshape(color_map, [1, 1, -1, 3])
    is_match = tf.reduce_all(tf.equal(pixels, palette), axis=-1)
    return tf.argmax(tf.cast(is_match, tf.int32), axis=-1)

In [10]:
@tf.function
def load_image(img_path: tf.Tensor, ds_type: str):
    """Load one image and its one-hot encoded segmentation mask.

    Args:
        img_path: Path of the input PNG image.
        ds_type: ``"train"`` or ``"val"``; any other value selects the test
            label directory.

    Returns:
        ``(image, label)``: ``image`` is float32 RGB resized to
        ``(CFG.IMG_HEIGHT, CFG.IMG_WIDTH)``; ``label`` is the float32 one-hot
        mask of the same spatial size with ``len(unique_classes)`` channels.
    """
    # Input image: PNG -> float32 (convert_image_dtype rescales integer
    # images to [0, 1]) -> resized.
    raw_image = tf.io.read_file(img_path)
    image = tf.image.decode_png(raw_image, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, (CFG.IMG_HEIGHT, CFG.IMG_WIDTH))

    # File name without directory and extension; the mask shares this stem.
    file_name = tf.strings.split(img_path, os.sep)[-1]
    image_file = tf.strings.split(file_name, ".")[0]

    if ds_type == "train":
        label_dir = "train_labels/"
    elif ds_type == "val":
        label_dir = "val_labels/"
    else:
        label_dir = "test_labels/"
    label_path = tf.strings.join([files_path, label_dir, image_file, "_L.png"])

    # Mask: nearest-neighbour resizing keeps the colours exact, so every pixel
    # can still be matched against the colour table.
    mask_rgb = tf.image.decode_png(tf.io.read_file(label_path), channels=3)
    mask_rgb = tf.image.resize(
        mask_rgb,
        (CFG.IMG_HEIGHT, CFG.IMG_WIDTH),
        method=tf.image.ResizeMethod.NEAREST_NEIGHBOR,
    )
    class_ids = rgb_class_convert(tf.cast(mask_rgb, tf.int32), color_map_tf)
    label = tf.cast(tf.one_hot(class_ids, depth=len(unique_classes)), tf.float32)

    return image, label

In [11]:
def configure_performance(ds, is_train_ds: bool):
    """Cache, optionally shuffle, batch and prefetch a dataset.

    Args:
        ds: Dataset to configure.
        is_train_ds: When true, a shuffle with a buffer of 100 elements is
            inserted between caching and batching.

    Returns:
        The configured dataset, batched with ``CFG.BATCH_SIZE``.
    """
    ds = ds.cache()
    if is_train_ds:
        ds = ds.shuffle(100)
    return ds.batch(CFG.BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [12]:
def display_imgs(display_list, unique_classes):
    """Plot an image, its ground-truth mask and, optionally, a prediction.

    Args:
        display_list: ``[image, true_mask]`` or ``[image, true_mask, pred]``.
            ``true_mask`` and ``pred`` hold per-class scores on their last
            axis (one-hot ground truth / predicted probabilities).
        unique_classes: DataFrame with ``name``, ``r``, ``g`` and ``b``
            columns whose row position is the class index; used for the legend.
    """
    image, true_mask, *predictions = display_list

    def to_class_ids(mask):
        # Collapse the class axis to a single-channel class-index image.
        return tf.expand_dims(tf.argmax(mask, axis=-1), axis=-1)

    true_ids = to_class_ids(true_mask)

    # Each panel is (tensor, rescale). The colour mask must NOT be rescaled:
    # array_to_img's default min/max stretching would alter the colours so
    # they no longer match the legend patches.
    panels = [
        (image, True),
        (rgb_class_convert(true_mask, color_map_tf, inverse=True), False),
        (true_ids, True),
    ]
    # The true mask is always converted to class ids above, so supplying a
    # prediction no longer leaves a one-hot tensor in the panel list.
    panels.extend((to_class_ids(pred), True) for pred in predictions)

    fig, axes = plt.subplots(1, len(panels), figsize=(14, 10))

    titles = ['Input Image', 'True Mask', 'Class-indexed Mask', 'Predicted Mask']

    for ax, (panel, rescale), title in zip(axes, panels, titles):
        ax.set_title(title)
        ax.imshow(tf.keras.utils.array_to_img(panel, scale=rescale))
        ax.axis('off')

    # Legend lists only the classes present in the ground-truth mask.
    present_ids = np.unique(true_ids.numpy())
    present_classes = unique_classes.iloc[present_ids]

    handles = [
        mpatches.Patch(
            color=(row['r'] / 255.0, row['g'] / 255.0, row['b'] / 255.0),
            label=f"{row['name']}",
        )
        for _, row in present_classes.iterrows()
    ]

    max_cols = 5
    fig.legend(handles=handles,
               title="True Mask Classes Colors (only)",
               loc='lower center',
               bbox_to_anchor=(0.5, 0.2),
               ncol=min(len(handles), max_cols),
               borderaxespad=0.)

    plt.show()

In [13]:
# Decode every file path into an (image, one-hot mask) pair, in parallel.
# tf.data.AUTOTUNE is used (rather than the tf.data.experimental alias) to
# match the rest of the notebook.
train_ds = train_files.map(
    lambda img_path: load_image(img_path, ds_type="train"),
    num_parallel_calls=tf.data.AUTOTUNE,
)
val_ds = val_files.map(
    lambda img_path: load_image(img_path, ds_type="val"),
    num_parallel_calls=tf.data.AUTOTUNE,
)

In [14]:
# Pull the first element of the training dataset for a visual sanity check.
# The dataset has not been batched yet at this point, so this is a single
# (image, mask) pair.
sample_img, sample_label = next(iter(train_ds))

In [None]:
# Show the sample image next to its colour-coded and class-indexed masks.
display_imgs([sample_img, sample_label], unique_classes)

In [16]:
# Cache, batch and prefetch both datasets; only the training set is shuffled.
train_ds = configure_performance(train_ds, is_train_ds=True)
val_ds = configure_performance(val_ds, is_train_ds=False)

# Define a U-Net with a Pretrained MobileNetV2 Encoder

In [None]:
# References for the MobileNetV2 encoder and the Conv2DTranspose decoder
# (1): https://www.tensorflow.org/api_docs/python/tf/keras/applications/MobileNetV2
# (2): https://www.tensorflow.org/tutorials/images/segmentation
# (3): https://keras.io/api/applications/mobilenet/

def unet_mobilenetv2(input_shape=(256, 256, 3), num_classes=None):
    """Build a U-Net with a frozen, ImageNet-pretrained MobileNetV2 encoder.

    Args:
        input_shape: ``(height, width, 3)`` of the input images. The encoder
            downsamples by 32 and the decoder upsamples by 2 five times, so
            height and width should be multiples of 32 for the skip
            connections to line up.
        num_classes: Number of segmentation classes (channels of the softmax
            output). Required.

    Returns:
        A ``tf.keras.Model`` mapping images to per-pixel class probabilities.

    Raises:
        ValueError: If ``num_classes`` is not provided.
    """
    if num_classes is None:
        raise ValueError("num_classes must be provided")

    # The encoder must be built for the same 3-channel shape the model is fed
    # with; a 2-tuple such as (224, 224) is rejected by MobileNetV2.
    # NOTE(review): the ImageNet weights expect inputs scaled to [-1, 1]
    # (mobilenet_v2.preprocess_input) — confirm the input pipeline's scaling.
    base_model = tf.keras.applications.MobileNetV2(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet'
    )

    # Encoder activations used as skip connections, from highest resolution
    # (1/2 of the input) down to the bottleneck (1/32 of the input).
    layer_names = [
        'block_1_expand_relu',
        'block_3_expand_relu',
        'block_6_expand_relu',
        'block_13_expand_relu',
        'block_16_project'
    ]

    layers_outputs = [base_model.get_layer(name).output for name in layer_names]
    encoder = tf.keras.Model(inputs=base_model.input, outputs=layers_outputs)
    encoder.trainable = False

    inputs = tf.keras.Input(shape=input_shape)
    # The last output is the bottleneck; the others are the skip connections.
    *skips, x = encoder(inputs)

    decoder_filters = [512, 256, 128, 64]

    # Walk back up the encoder: upsample, merge with the skip of matching
    # resolution (deepest first), then refine with two convolutions.
    for filters, skip_connection in zip(decoder_filters, reversed(skips)):
        x = tf.keras.layers.Conv2DTranspose(filters, kernel_size=3, strides=2, padding='same', activation='relu')(x)
        x = tf.keras.layers.Concatenate()([x, skip_connection])
        x = tf.keras.layers.Conv2D(filters, kernel_size=3, padding='same', activation='relu')(x)
        x = tf.keras.layers.Conv2D(filters, kernel_size=3, padding='same', activation='relu')(x)

    # Final upsampling step back to the input resolution (no skip available).
    x = tf.keras.layers.Conv2DTranspose(32, kernel_size=3, strides=2, padding='same', activation='relu')(x)
    x = tf.keras.layers.Conv2D(32, kernel_size=3, padding='same', activation='relu')(x)
    x = tf.keras.layers.Conv2D(32, kernel_size=3, padding='same', activation='relu')(x)

    # 1x1 convolution producing per-pixel class probabilities.
    outputs = tf.keras.layers.Conv2D(num_classes, kernel_size=1, activation='softmax')(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model


In [None]:
# Build the model and display its summary. The input shape is derived from CFG
# so it always matches the size the data pipeline resizes images to.
model = unet_mobilenetv2(
    input_shape=(CFG.IMG_HEIGHT, CFG.IMG_WIDTH, 3),
    num_classes=len(unique_classes),
)
model.summary()

In [None]:
def dice_loss(y_true, y_pred, smooth=1e-6):
    """Soft Dice loss, averaged over all remaining axes.

    Args:
        y_true: Ground-truth tensor; cast to float32.
        y_pred: Predicted tensor of the same shape; cast to float32.
        smooth: Constant added to numerator and denominator to avoid division
            by zero.

    Returns:
        ``1 - mean(dice)``, where the Dice coefficient is computed by summing
        over axes 1 and 2 (height and width for the
        ``[batch, H, W, classes]`` tensors used in this notebook).
    """
    spatial_axes = [1, 2]
    targets = tf.cast(y_true, tf.float32)
    probs = tf.cast(y_pred, tf.float32)

    overlap = tf.reduce_sum(targets * probs, axis=spatial_axes)
    # Sum of both masses; the factor 2 in the numerator compensates for the
    # overlap being counted twice here.
    total_mass = tf.reduce_sum(targets, axis=spatial_axes) + tf.reduce_sum(probs, axis=spatial_axes)
    dice_score = (2 * overlap + smooth) / (total_mass + smooth)

    return 1 - tf.reduce_mean(dice_score)

In [None]:
def total_loss(y_true, y_pred):
    """Combined loss: categorical cross-entropy plus Dice loss.

    The cross-entropy term keeps its per-element shape; the Dice term returned
    by ``dice_loss`` is added to every element.
    """
    cross_entropy = tf.keras.losses.categorical_crossentropy(y_true, y_pred)
    return cross_entropy + dice_loss(y_true, y_pred)

In [None]:
# Cosine-decay learning-rate schedule with linear warm-up: the rate ramps from
# 1e-5 to 1e-3 over the first 1605 optimizer steps and is then decayed over the
# following 14495 steps (alpha=0.0, i.e. towards zero).
# NOTE(review): with 367 training pairs, BATCH_SIZE=16 and EPOCHS=10 the model
# is trained for roughly 230 steps, so the schedule would never leave the
# warm-up phase — confirm these step counts are intended.
lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=1e-5,
    warmup_target=0.001,
    warmup_steps=1605,
    decay_steps=14495,
    alpha=0.0
)

In [None]:
# Compile with Adam driven by the learning-rate schedule, the combined
# cross-entropy + Dice loss, and accuracy as the monitored metric.
model.compile(
    optimizer=tf.keras.optimizers.Adam(lr_schedule),
    loss=total_loss,
    metrics=['accuracy']
)

In [None]:
# Number of passes over the training dataset.
EPOCHS = 10

# Train the model, evaluating on the validation dataset after every epoch;
# the returned History object is kept in `history`.
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
)