In [1]:
import tensorflow as tf
import numpy as np

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Dropout 
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import concatenate

In [None]:
import tensorflow_datasets as tfds

IMG_SIZE = 480   # side length, in pixels, that images and masks are resized to
BATCH_SIZE = 32  # number of examples grouped into one batch

def preprocess_data(example):
    """
    Convert one dataset example into an (image, mask) pair

    Arguments:
        example -- Mapping that provides an 'image' and a 'segmentation_mask' entry
    Returns:
        image, mask -- float32 image divided by 255 and uint8 mask,
                       both resized to (IMG_SIZE, IMG_SIZE)
    """
    # Extract image and segmentation mask
    image = example['image']
    mask = example['segmentation_mask']

    # Resize the image with the default (bilinear) interpolation
    image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
    # Masks hold class IDs, not intensities: bilinear interpolation would blend
    # neighbouring IDs into values that correspond to no real class, so the
    # mask must be resized with nearest-neighbour sampling.
    mask = tf.image.resize(mask, (IMG_SIZE, IMG_SIZE), method='nearest')

    # Normalize image to [0, 1] range
    image = tf.cast(image, tf.float32) / 255.0

    # Ensure mask is in integer format (not one-hot encoded)
    mask = tf.cast(mask, tf.uint8)

    return image, mask

# Fetch the COCO 2017 training split together with its metadata.
# NOTE(review): preprocess_data reads example['segmentation_mask'] -- confirm
# that this dataset configuration actually exposes that key.
dataset, info = tfds.load(
    'coco/2017',
    split='train',
    with_info=True,
    shuffle_files=True,
)

# Input pipeline: per-example preprocessing -> batching -> prefetching
dataset = (
    dataset
    .map(preprocess_data, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Sanity check: show the dataset metadata and the shapes of the first batch
print(info)

for images, masks in dataset.take(1):
    print(f'Image batch shape: {images.shape}')
    print(f'Mask batch shape: {masks.shape}')



In [2]:
def conv_block(inputs=None, n_filters=32, dropout_prob=0, max_pooling=True):
    """
    Contracting-path block: two 3x3 ReLU convolutions, then optional dropout
    and optional 2x2 max pooling

    Arguments:
        inputs -- Input tensor
        n_filters -- Number of filters used by both convolutions
        dropout_prob -- Dropout rate; dropout is only added when this is > 0
        max_pooling -- Whether to downsample the result with MaxPooling2D((2, 2))
    Returns:
        next_layer, skip_connection -- Tensor passed on to the next block
        (pooled when max_pooling is true) and the un-pooled tensor kept for
        the skip connection
    """
    features = inputs
    # Two identical 'same'-padded convolutions applied back to back
    for _ in range(2):
        features = Conv2D(
            n_filters,
            3,
            activation="relu",
            padding="same",
            kernel_initializer='he_normal',
        )(features)

    if dropout_prob > 0:
        features = Dropout(dropout_prob)(features)

    # The skip connection always carries the tensor from before pooling
    next_layer = MaxPooling2D((2, 2))(features) if max_pooling else features

    return next_layer, features

In [3]:
def upsampling_block(expansive_input, contractive_input, n_filters=32):
    """
    Expanding-path block: stride-2 transposed convolution, concatenation with
    the skip tensor, then two 3x3 ReLU convolutions

    Arguments:
        expansive_input -- Tensor coming from the previous (deeper) layer
        contractive_input -- Skip-connection tensor from the contracting path
        n_filters -- Number of filters used by every convolution in the block
    Returns:
        conv -- Output tensor of the block
    """
    # Transposed convolution with stride 2 and 'same' padding
    upsampled = Conv2DTranspose(
        n_filters,
        3,
        strides=(2, 2),
        padding="same",
    )(expansive_input)

    # Join the upsampled tensor and the skip tensor along axis 3
    refined = concatenate([upsampled, contractive_input], axis=3)

    for _ in range(2):
        refined = Conv2D(
            n_filters,
            (3, 3),
            activation="relu",
            padding="same",
            kernel_initializer='he_normal',
        )(refined)

    return refined

In [4]:
def unet_model(input_size=(96, 128, 3), n_filters=32, n_classes=23):
    """
    Unet model

    Arguments:
        input_size -- Input shape (height, width, channels); known height and
                      width must be multiples of 16
        n_filters -- Number of filters for the first block; doubled at each
                     contracting step
        n_classes -- Number of output classes
    Returns:
        model -- tf.keras.Model whose output has n_classes channels and no
                 final activation (raw per-pixel scores)
    Raises:
        ValueError -- If a known height or width is not a multiple of 16
    """
    # Four blocks apply 2x2 max pooling, so each spatial size is halved four
    # times and later doubled four times. Unless the size is a multiple of
    # 2**4 the upsampled tensors no longer match their skip connections and
    # concatenate fails with a hard-to-read shape error; fail early instead.
    downsample_factor = 2 ** 4
    for axis_name, dim in zip(("height", "width"), input_size[:2]):
        if dim is not None and dim % downsample_factor != 0:
            raise ValueError(
                f"input {axis_name} must be a multiple of {downsample_factor}, got {dim}"
            )

    inputs = Input(input_size)

    # Contracting path: each block returns (next_layer, skip_connection) and
    # the number of filters is doubled at every step.
    down1, skip1 = conv_block(inputs, n_filters)
    down2, skip2 = conv_block(down1, n_filters * 2)
    down3, skip3 = conv_block(down2, n_filters * 4)
    down4, skip4 = conv_block(down3, n_filters * 8, dropout_prob=0.3)
    # Bottleneck: dropout of 0.3 and no max pooling
    bottleneck, _ = conv_block(down4, n_filters * 16, dropout_prob=0.3, max_pooling=False)

    # Expanding path: each block consumes the previous output together with
    # the skip connection of the matching contracting block.
    up6 = upsampling_block(bottleneck, skip4, n_filters * 8)
    up7 = upsampling_block(up6, skip3, n_filters * 4)
    up8 = upsampling_block(up7, skip2, n_filters * 2)
    up9 = upsampling_block(up8, skip1, n_filters)

    conv9 = Conv2D(n_filters, 3, activation='relu', padding='same', kernel_initializer='he_normal')(up9)

    # A Conv2D layer with n_classes filters, kernel size of 1 and 'same' padding
    conv10 = Conv2D(n_classes, 1, padding="same")(conv9)

    model = tf.keras.Model(inputs=inputs, outputs=conv10)

    return model

In [5]:
# Input geometry of the network: 480 x 480 images with 3 channels
img_height, img_width, num_channels = 480, 480, 3

# Build the U-Net for that input shape; the remaining arguments keep their defaults
input_shape = (img_height, img_width, num_channels)
unet = unet_model(input_shape)

In [6]:
# Print the layer-by-layer description of the model (output shapes, parameter counts, connections)
unet.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 480, 480, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 480, 480, 32  896         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 conv2d_1 (Conv2D)              (None, 480, 480, 32  9248        ['conv2d[0][0]']                 
                                )                                                             

In [None]:
# Integer-label cross-entropy computed on raw scores (from_logits=True)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

unet.compile(
    optimizer='adam',
    loss=loss_fn,
    metrics=['accuracy'],
)

In [None]:
EPOCHS = 5
BUFFER_SIZE = 500
BATCH_SIZE = 32

# `dataset` was already batched when it was built, so calling .batch() on it
# again would stack batches of batches and feed 5-D tensors to the model.
# Undo the earlier batching first; this also makes shuffle() mix individual
# examples rather than whole batches.
# NOTE(review): cache() keeps every preprocessed example in memory -- confirm
# the dataset fits, or cache to a file / drop the call.
train_dataset = (
    dataset
    .unbatch()
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# Show the structure of what is actually passed to fit()
print(train_dataset.element_spec)
model_history = unet.fit(train_dataset, epochs=EPOCHS)

### In the following cells, I load the pretrained HRNetV2 model and use it to predict segmentation masks.

##### - The model I built above has 8,640,471 trainable parameters. The COCO dataset has more than 25000 images. If I were to train the model from scratch on my CPU, it would take days, if not weeks, to fit the COCO data. Instead, I will load a pretrained model from TensorFlow Hub (`tensorflow-hub`).

In [None]:
!pip install tensorflow tensorflow-hub
!pip install --upgrade tensorflow_hub

In [None]:
import tensorflow_hub as hub
import cv2
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import tensorflow as tf
model_url =  'https://tfhub.dev/google/HRNet/camvid-hrnetv2-w48/1'

print('loading model: ', model_url)

model = hub.load(model_url)
print('\nmodel loaded!')

# Palette for 32 classes: one (R, G, B) triple per class ID, each channel in 0-255
colors = [
    (0, 0, 0), (128, 64, 128), (244, 35, 232), (70, 70, 70),
    (102, 102, 156), (190, 153, 153), (153, 153, 153), (250, 170, 30),
    (220, 220, 0), (107, 142, 35), (152, 251, 152), (250, 0, 12),
    (70, 130, 180), (220, 20, 60), (255, 0, 0), (0, 0, 142),
    (0, 0, 70),(0, 60, 100), (0, 80, 100), (0, 0, 230),
    (119, 11, 32), (250, 0, 0), (13, 8, 135), (17, 35, 175),
    (18, 66, 218), (14, 102, 241), (25, 136, 252), (60, 164, 252),
    (112, 186, 245), (183, 206, 236), (238, 219, 225), (254, 236, 198)]

# a colormap from the list of colors
# matplotlib requires RGB components in [0, 1]; handing it the raw 0-255
# triples makes the colormap raise "RGBA values should be within 0-1 range"
# the first time it is used, so scale each channel here. `colors` itself is
# left in 0-255 form.
colormap = ListedColormap(
    [tuple(channel / 255.0 for channel in rgb) for rgb in colors]
)

def preprocess_image(image_path, target_size=(512, 512)):
    """
    Load an image from disk and turn it into a batched float32 RGB array

    Arguments:
        image_path -- Path of the image file to read
        target_size -- Size passed to cv2.resize
    Returns:
        image -- float32 array with a leading batch axis of length 1 and
                 values scaled to [0, 1]
    Raises:
        FileNotFoundError -- If the file is missing or cannot be decoded
    """
    image = cv2.imread(image_path, cv2.IMREAD_COLOR)
    if image is None:
        # cv2.imread reports failure by returning None rather than raising,
        # which would otherwise surface as a cryptic error inside cv2.resize.
        raise FileNotFoundError(f"Could not read image file: {image_path!r}")

    image = cv2.resize(image, target_size)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR
    image = image.astype(np.float32) / 255.0  # Normalize to [0, 1]
    image = np.expand_dims(image, axis=0)  # Add batch dimension
    return image

def predict_mask(model, image):
    """
    Run the model on an image and reduce its output to per-position class IDs

    Arguments:
        model -- Callable applied directly to `image`
        image -- Model input (presumably a batched image array -- confirm
                 against the caller)
    Returns:
        predicted_mask -- Index of the largest value along the last axis of
                          the model output
    """
    scores = model(image)
    print(scores.shape)

    # Pick the highest-scoring entry along the last axis
    class_map = tf.argmax(scores, axis=-1)
    print(np.max(class_map))
    print(class_map.shape)

    return class_map


# Input image for the demo.
# NOTE(review): absolute, machine-specific path -- point this at a local file.
#image_path = r"C:\Users\robel\Downloads\pedestrian.jpg"
image_path = r"C:\Users\robel\Downloads\cycle.png"
#image_path = r"C:\Users\robel\Downloads\voc_ex.jpg"
image = preprocess_image(image_path)
predicted_mask = predict_mask(model, image)

# Drop the batch axis of the prediction
predicted_mask = tf.squeeze(predicted_mask, axis=0)
print(predicted_mask.shape)

# Drop the batch axis of the input and convert it back to 8-bit [0, 255]
image = np.squeeze(np.asarray(image), axis=0)
image = np.clip(np.rint(image * 255.0), 0, 255).astype(np.uint8)

# Colour the mask by using the class IDs as indices into the 0-255 palette.
# (Going through a [0, 1] float colormap and then casting to an integer type
# truncates every channel to 0, i.e. an all-black mask.)
num_classes = len(colors)
palette = np.asarray(colors, dtype=np.uint8)
# IDs outside the palette are clamped to its ends instead of raising
class_ids = np.clip(np.asarray(predicted_mask).astype(np.int64), 0, num_classes - 1)
colored_mask = palette[class_ids]

# Resize colored_mask to match the size of the image; nearest-neighbour keeps
# the palette colours exact
colored_mask = cv2.resize(colored_mask, (image.shape[1], image.shape[0]), interpolation=cv2.INTER_NEAREST)

# Blend images
alpha = 0.4
print(image.shape, colored_mask.shape)
blended_image = cv2.addWeighted(image, alpha, colored_mask, 1 - alpha, 0)

# Display the result: input, coloured mask and overlay side by side in one figure
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
panels = [
    (image, "Input Image"),
    (colored_mask, "Predicted Mask"),
    (blended_image, "Predicted Mask with Colors"),
]
for ax, (panel, title) in zip(axes, panels):
    ax.imshow(panel)
    ax.set_title(title)
    ax.axis("off")
fig.tight_layout()
plt.show()