In [1]:
# === GOOGLE DRIVE ===
# Mount Google Drive at /content/drive so that the dataset folders referenced
# by the later cells are reachable from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')
# store data here for notebook access

Mounted at /content/drive


In [2]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
import cv2
import numpy as np
import os

In [13]:
# DO INITIALLY WHEN CONVERTING MASKS
# path = '/content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/multi_masks/'

#  960 pixels in width and 540 pixels in height

In [3]:
# Maps an integer class label to the mask colour that marks that class.
# NOTE(review): the labels run 1..4, while the training cell one-hot encodes
# masks with num_classes = 4, which only gives labels 0..3 a channel; label 4
# (arm) would then encode as an all-zero vector — confirm the intended range.
# NOTE(review): the tuples are described below as RGB, but the commented-out
# conversion cell compares them against cv2.imread output, which OpenCV
# returns in BGR channel order — confirm the order before re-running it.
color_dict = {1: (0, 0, 0), 2: (49, 205, 49), 3: (138, 0, 0), 4: (255, 214, 0)}



# Black: (0, 0, 0) background
# Green: (49, 205, 49) jaws
# Dark Red: (138, 0, 0) wrist
# Yellow: (255, 214, 0) arm

In [None]:
# output_path = '/content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/converted_masks/'

In [15]:
# === MASK CONVERSION ===
# Convert the colour-coded masks into single-channel class-label masks
# DO ONCE TO CONVERT MASKS



# for file_name in os.listdir(path):
#     # read in the RGB mask image
#     rgb_mask = cv2.imread(os.path.join(path, file_name))

#     # create an empty multiclass mask
#     multiclass_mask = np.zeros(rgb_mask.shape[:2], dtype=np.uint8)

#     # loop through each pixel of the RGB mask image and map its color to the corresponding class label
#     for label, color in color_dict.items():
#         indices = np.where(np.all(rgb_mask == color, axis=-1))
#         multiclass_mask[indices] = label

#     # save the multiclass mask image
#     output_folder = '/content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/converted_masks/'
#     cv2.imwrite(os.path.join(output_folder, f"multiclass_mask_{file_name}"), multiclass_mask)
    # However, because the class labels are so low, they are not visually distinct in the grayscale image. If you want to visually inspect the multiclass mask, you may need to apply a colormap or rescale the class labels to a higher range for better visibility



# multiclass_mask = np.zeros(rgb_mask.shape[:2], dtype=np.uint8)
# this code creates a NumPy array multiclass_mask filled with zeros. The shape of the array is taken from the first two dimensions (height and width) of the rgb_mask array, and the dtype argument makes every element an 8-bit unsigned integer.
# in particular, this creates a single-channel label mask (not a binary mask) for a multiclass segmentation task, where each pixel belongs to one of several classes. The resulting multiclass_mask array has the same height and width as the rgb_mask array, with all elements initialized to zero. Pixels are then assigned to a class by writing that class's integer label into the corresponding elements.

In [4]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
import os
import shutil
import random

# set up source and target directories
# SOURCE_DIR is the dataset root; the unsplit images are read from its
# 'groundtruth' sub-folder and the label masks from 'converted_masks'.
SOURCE_DIR = '/content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/'
source_images = os.path.join(SOURCE_DIR, 'groundtruth')
source_masks = os.path.join(SOURCE_DIR, 'converted_masks')

# DATA_DIR is the same folder as SOURCE_DIR; the six split folders below are
# created as its direct children.
DATA_DIR = '/content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/'

# training split (X = images, y = masks)
x_train_dir = os.path.join(DATA_DIR, 'XTrain')
y_train_dir = os.path.join(DATA_DIR, 'yTrain')

# validation split
x_valid_dir = os.path.join(DATA_DIR, 'XVal')
y_valid_dir = os.path.join(DATA_DIR, 'yVal')

# test split
x_test_dir = os.path.join(DATA_DIR, 'XTest')
y_test_dir = os.path.join(DATA_DIR, 'yTest')


In [18]:

def split_data(source_images, source_masks, split=(0.7, 0.2, 0.1), seed=None):
    """Pair each mask with its image and move the pairs into train/val/test folders.

    A mask named ``multiclass_mask_p-<id>.png`` is paired with the image named
    ``s-<id>.png``; masks without a matching image are ignored. The pairs are
    shuffled and then moved (not copied) into the module-level destination
    folders ``x_train_dir``/``y_train_dir``, ``x_valid_dir``/``y_valid_dir``
    and ``x_test_dir``/``y_test_dir``.

    Args:
        source_images: Directory containing the ``s-<id>.png`` images.
        source_masks: Directory containing the ``multiclass_mask_p-<id>.png`` masks.
        split: Fractions for (train, validation, test). Only the first two are
            read; every pair left over after train and validation goes to test.
        seed: Optional seed for a reproducible shuffle. ``None`` (default)
            keeps using the global ``random`` state.

    Raises:
        OSError: If a destination folder cannot be created.
    """
    # A set makes the per-mask membership test O(1) instead of scanning a list.
    image_files = {f for f in os.listdir(source_images) if f.endswith('.png')}
    # os.listdir order is arbitrary; sorting makes a seeded shuffle reproducible.
    mask_files = sorted(f for f in os.listdir(source_masks) if f.endswith('.png'))

    print(f"Found {len(image_files)} images and {len(mask_files)} masks.")

    pairs = []
    for mask in mask_files:
        # extract the unique ID from the mask filename
        unique_id = mask.replace('multiclass_mask_p-', '').replace('.png', '')
        corresponding_image = f"s-{unique_id}.png"

        if corresponding_image in image_files:
            pairs.append((os.path.join(source_images, corresponding_image),
                          os.path.join(source_masks, mask)))

    print(f"Total pairs matched: {len(pairs)}")

    if seed is None:
        random.shuffle(pairs)
    else:
        random.Random(seed).shuffle(pairs)

    train_idx = int(len(pairs) * split[0])
    valid_idx = train_idx + int(len(pairs) * split[1])

    destinations = (
        (x_train_dir, y_train_dir),
        (x_valid_dir, y_valid_dir),
        (x_test_dir, y_test_dir),
    )
    # The folders must exist before moving: shutil.move onto a missing path
    # renames the file to that path, and the next move would overwrite it.
    for image_dir, mask_dir in destinations:
        os.makedirs(image_dir, exist_ok=True)
        os.makedirs(mask_dir, exist_ok=True)

    for i, (img_path, mask_path) in enumerate(pairs):
        if i < train_idx:
            image_dir, mask_dir = destinations[0]
        elif i < valid_idx:
            image_dir, mask_dir = destinations[1]
        else:
            image_dir, mask_dir = destinations[2]

        try:
            shutil.move(img_path, image_dir)
            shutil.move(mask_path, mask_dir)
        except OSError as e:
            # Best effort: report the failure and carry on with the next pair.
            print(f"Error moving file: {e}")

# execute the split
# NOTE: split_data MOVES files out of the source folders, so this cell is not
# idempotent — re-running it only re-splits the pairs still left in the source
# folders.
split_data(source_images, source_masks)

Found 4232 images and 4191 masks.
Total pairs matched: 4191


In [5]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
# controls the verbosity level of TensorFlow logging messages, and setting it to different values adjusts the amount of logging information that TensorFlow prints to the console
# "2": filters out both info and warning messages, only displaying error messages

In [6]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
from keras.models import Model
from keras.layers import (Input, Conv2D, BatchNormalization, Activation,
                          MaxPool2D, Conv2DTranspose, Concatenate, ZeroPadding2D, Cropping2D)

def conv_block(inputs, num_filters):
    """Apply two consecutive Conv2D(3, same) -> BatchNormalization -> ReLU stages.

    Args:
        inputs: Tensor fed to the first convolution.
        num_filters: Number of filters used by both convolutions.

    Returns:
        The output tensor of the second ReLU activation.
    """
    features = inputs
    for _ in range(2):
        features = Conv2D(num_filters, 3, padding="same")(features)
        features = BatchNormalization()(features)
        features = Activation("relu")(features)
    return features

def encoder_block(inputs, num_filters):
    """Run one encoder stage: a conv_block followed by 2x2 max pooling.

    Returns:
        A ``(skip, pooled)`` tuple: the conv_block output (kept for the skip
        connection) and the same tensor after max pooling.
    """
    skip = conv_block(inputs, num_filters)
    pooled = MaxPool2D((2, 2))(skip)
    return skip, pooled

def decoder_block(inputs, skip, num_filters):
    """Run one decoder stage: upsample, merge with the skip tensor, then conv_block.

    Args:
        inputs: Tensor coming from the previous (coarser) stage.
        skip: Encoder tensor concatenated with the upsampled input.
        num_filters: Filters for the transposed convolution and the conv_block.

    Returns:
        The conv_block output for the merged tensor.
    """
    upsampled = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(inputs)
    merged = Concatenate()([upsampled, skip])
    return conv_block(merged, num_filters)

def build_unet(input_shape, num_classes):
    """Build a four-level U-Net that accepts inputs of arbitrary height and width.

    The input is zero-padded so that height and width are multiples of 16
    (four 2x2 poolings), passed through the encoder/bottleneck/decoder, and the
    softmax output is cropped by exactly the padding that was added, so the
    output has the same height and width as the input.

    Args:
        input_shape: ``(height, width, channels)`` of a single input image.
        num_classes: Number of output channels of the final softmax layer.

    Returns:
        A Keras ``Model`` mapping the input to per-pixel class probabilities.
    """
    inputs = Input(input_shape)

    # padding for input_shape to be divisible by 2^4
    pad_height = (16 - input_shape[0] % 16) % 16
    pad_width = (16 - input_shape[1] % 16) % 16
    pad_top, pad_left = pad_height // 2, pad_width // 2
    # When the total padding is odd, the extra pixel goes on the bottom/right.
    pad_bottom, pad_right = pad_height - pad_top, pad_width - pad_left
    x = ZeroPadding2D(padding=((pad_top, pad_bottom), (pad_left, pad_right)))(inputs)

    s1, p1 = encoder_block(x, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    b1 = conv_block(p4, 1024)

    d1 = decoder_block(b1, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)

    outputs = Conv2D(num_classes, 1, padding="same", activation="softmax")(d4)

    # Crop exactly what was padded. Cropping pad // 2 on both sides would leave
    # the output one pixel too large whenever the total padding is odd.
    outputs = Cropping2D(cropping=((pad_top, pad_bottom), (pad_left, pad_right)))(outputs)

    model = Model(inputs, outputs)
    return model

# Smoke test: build the network at the dataset resolution and print its layer
# summary. The names assigned here are module-level; the training cell later
# assigns input_shape, num_classes and model again.
if __name__ == "__main__":
    input_shape = (540, 960, 3)  # original input shape
    num_classes = 4  # set the number of classes as needed
    model = build_unet(input_shape, num_classes)
    model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 540, 960, 3)]        0         []                            
                                                                                                  
 zero_padding2d (ZeroPaddin  (None, 544, 960, 3)          0         ['input_1[0][0]']             
 g2D)                                                                                             
                                                                                                  
 conv2d (Conv2D)             (None, 544, 960, 64)         1792      ['zero_padding2d[0][0]']      
                                                                                                  
 batch_normalization (Batch  (None, 544, 960, 64)         256       ['conv2d[0][0]']          

In [7]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
import numpy as np
import cv2
from glob import glob
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, CSVLogger
# NOTE(review): `global` statements at module (cell) level have no effect —
# names assigned at this level are already module globals. image_h, image_w
# and num_classes are assigned in the training cell; classes and rgb_codes are
# only assigned in the commented-out evaluation cell.
global image_h
global image_w
global num_classes
global classes
global rgb_codes

In [8]:
def create_dir(path):
    """Create directory ``path`` (including missing parents) if it does not exist.

    Calling it again for an existing directory is a no-op. Creation is done in
    a single call rather than check-then-create, so there is no window in which
    another process can create the directory between the check and the call.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

In [9]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping
from glob import glob

def preprocess(image_path, mask_path):
    """Load one image/mask pair from PNG files and prepare it for training.

    Reads the module-level ``image_h``, ``image_w`` and ``num_classes``.

    Args:
        image_path: Path of the RGB image PNG.
        mask_path: Path of the single-channel class-label mask PNG.

    Returns:
        ``(image, mask)``: the image resized to ``[image_h, image_w]`` and
        divided by 255, and the mask resized to the same size and one-hot
        encoded along a new last axis of length ``num_classes``.
    """
    # read and preprocess the image
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)
    image = tf.image.resize(image, [image_h, image_w])
    image = image / 255.0

    # read and preprocess the mask
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=1)
    # Nearest-neighbour keeps every pixel a valid class id; the default
    # bilinear interpolation would blend neighbouring labels into other values.
    mask = tf.image.resize(mask, [image_h, image_w], method="nearest")
    # Drop only the channel axis so a height or width of 1 is never squeezed away.
    mask = tf.squeeze(mask, axis=-1)
    mask = tf.cast(mask, tf.int32)
    # NOTE(review): one_hot only gives labels 0..num_classes-1 a channel; any
    # other label value encodes as an all-zero vector — confirm the label range
    # stored in the mask files matches.
    mask = tf.one_hot(mask, num_classes, axis=-1)

    return image, mask

def tf_dataset(X, Y, batch=3):
    """Build a shuffled, batched and prefetched tf.data pipeline from path lists.

    Args:
        X: Image file paths.
        Y: Mask file paths, aligned index-by-index with ``X``.
        batch: Batch size.

    Returns:
        A ``tf.data.Dataset`` of batches produced by ``preprocess``.
    """
    autotune = tf.data.experimental.AUTOTUNE
    path_pairs = tf.data.Dataset.from_tensor_slices((X, Y))
    # Shuffle the (cheap) path pairs before decoding them.
    shuffled = path_pairs.shuffle(buffer_size=5000)
    decoded = shuffled.map(preprocess, num_parallel_calls=autotune)
    return decoded.batch(batch).prefetch(autotune)

def load_dataset(base_dataset_path):
    """Collect the sorted PNG paths of the train, validation and test splits.

    Args:
        base_dataset_path: Folder containing the ``XTrain``/``yTrain``,
            ``XVal``/``yVal`` and ``XTest``/``yTest`` sub-folders.

    Returns:
        ``(train_x, train_y), (valid_x, valid_y), (test_x, test_y)`` where each
        element is a sorted list of ``*.png`` paths from the matching folder.
    """
    def png_paths(folder):
        return sorted(glob(os.path.join(base_dataset_path, folder, "*.png")))

    split_folders = (('XTrain', 'yTrain'), ('XVal', 'yVal'), ('XTest', 'yTest'))
    train, valid, test = (
        (png_paths(x_folder), png_paths(y_folder))
        for x_folder, y_folder in split_folders
    )
    return train, valid, test

# Training entry point: seeds NumPy/TensorFlow, builds the data pipelines and
# the U-Net, trains it, and writes the checkpoints and CSV log under ./files.
if __name__ == "__main__":
    """ seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ directory for storing files """
    create_dir("files")

    """ hyperparameters """
    # image_h, image_w and num_classes are module-level names that preprocess()
    # reads, so they must be assigned before the datasets are iterated.
    image_h = 540
    image_w = 960
    num_classes = 4
    input_shape = (image_h, image_w, 3)
    batch_size = 3
    lr = 1e-4
    num_epochs = 5

    """ paths """
    base_dataset_path = "/content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/"
    # Relative paths: both files are written under the current working directory.
    model_path = os.path.join("files", "model.h5")
    csv_path = os.path.join("files", "data.csv")

    """ loading the dataset """
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(base_dataset_path)
    print(f"Train: {len(train_x)}/{len(train_y)} - Valid: {len(valid_x)}/{len(valid_y)} - Test: {len(test_x)}/{len(test_y)}")
    print("")

    """ dataset pipeline """
    # tf_dataset shuffles its input, so the validation set is shuffled as well.
    train_ds = tf_dataset(train_x, train_y, batch=batch_size)
    valid_ds = tf_dataset(valid_x, valid_y, batch=batch_size)

    """ model """
    model = build_unet(input_shape, num_classes)
    model.compile(loss="categorical_crossentropy", optimizer=tf.keras.optimizers.Adam(lr))

    """ training """
    # NOTE(review): with num_epochs = 5, the patience values below (5 and 20)
    # look too large for either callback to trigger — confirm this is intended.
    callbacks = [
        # keeps only the checkpoint with the lowest val_loss seen so far
        ModelCheckpoint(model_path, verbose=1, save_best_only=True, monitor='val_loss'),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        # append=True: re-running this cell adds rows to the existing CSV
        CSVLogger(csv_path, append=True),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False)
    ]

    model.fit(train_ds, validation_data=valid_ds, epochs=num_epochs, callbacks=callbacks)

    # save the final model state
    # This is the state after the last epoch (restore_best_weights=False); the
    # best-val_loss weights are the ones ModelCheckpoint wrote to model_path.
    final_model_path = os.path.join("files", "multiclass_challenge_model.h5")
    model.save(final_model_path)
    print(f"Model saved at {final_model_path}")


# files/model.h5/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py:3079
# https://colab.research.google.com/signup/pricing

Train: 2933/2933 - Valid: 838/838 - Test: 420/420

Epoch 1/5
Epoch 1: val_loss improved from inf to 0.04348, saving model to files/model.h5


  saving_api.save_model(


Epoch 2/5
Epoch 2: val_loss improved from 0.04348 to 0.02526, saving model to files/model.h5
Epoch 3/5
Epoch 3: val_loss improved from 0.02526 to 0.01816, saving model to files/model.h5
Epoch 4/5
Epoch 4: val_loss improved from 0.01816 to 0.01233, saving model to files/model.h5
Epoch 5/5
Epoch 5: val_loss improved from 0.01233 to 0.00867, saving model to files/model.h5
Model saved at files/multiclass_challenge_model.h5


In [10]:
print("Current Working Directory:", os.getcwd())

Current Working Directory: /content


In [11]:
# Locate the training-output folder under the current working directory and
# report what it contains.
# NOTE: in Colab the working directory is /content, which lives on the Colab
# virtual machine, not in Google Drive. Files saved there are temporary and are
# not synchronized with Google Drive automatically, which is why they do not
# show up in Drive.
files_dir = os.path.join(os.getcwd(), "files")
if not os.path.exists(files_dir):
    print(f"'files' directory does not exist at: {files_dir}")
else:
    print(f"'files' directory exists at: {files_dir}")
    # list files in the 'files' directory
    print("Files in 'files' directory:", os.listdir(files_dir))

'files' directory exists at: /content/files
Files in 'files' directory: ['multiclass_challenge_model.h5', 'model.h5', 'data.csv']


In [13]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
import shutil

# Persist the training outputs: copy everything in files_dir (set by the
# previous cell) into the dataset folder on Google Drive.
# specify google Drive path
drive_target_path = '/content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/files/'

# create target directory in Google Drive if it doesn't exist
os.makedirs(drive_target_path, exist_ok=True)

# copy files from '/content/files' to the Google Drive directory
# (a file already present in the target under the same name is overwritten)
for file_name in os.listdir(files_dir):
    shutil.copy(os.path.join(files_dir, file_name), drive_target_path)
    print(f"Copied {file_name} to {drive_target_path}")

Copied multiclass_challenge_model.h5 to /content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/files/
Copied model.h5 to /content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/files/
Copied data.csv to /content/drive/My Drive/Colab Notebooks/images_challenge_multiclass/files/


In [None]:
##################################### TEST THE MODEL

In [None]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
import tensorflow as tf
import numpy as np
import cv2
import os
from glob import glob

# function to preprocess images (similar to training preprocessing)
def preprocess_image(image_path, image_h, image_w):
    """Load a PNG as a 3-channel image, resize it, and divide it by 255.

    Args:
        image_path: Path of the PNG file to read.
        image_h: Target height passed to the resize.
        image_w: Target width passed to the resize.

    Returns:
        The resized image tensor divided by 255.0.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_png(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, [image_h, image_w])
    return resized / 255.0

# function to save mask to a file
def save_mask(mask, save_path):
    """Write a per-class score map to disk as a grayscale label image.

    The class with the highest score is chosen per pixel and the class ids are
    spread evenly over 0..255 (e.g. 0, 85, 170, 255 for four classes) so each
    class gets a distinct gray level. Multiplying the ids by 255 instead would
    wrap around in uint8 for every id above 1 (2 -> 254, 3 -> 253).

    Args:
        mask: Array whose last axis holds one score per class.
        save_path: Destination file path handed to ``cv2.imwrite``.
    """
    scores = np.asarray(mask)
    num_classes = scores.shape[-1]
    labels = np.argmax(scores, axis=-1)
    # max(..., 1) guards the single-channel case against division by zero.
    scale = 255 // max(num_classes - 1, 1)
    cv2.imwrite(save_path, (labels * scale).astype(np.uint8))

# Inference script: load a saved model, predict a mask for every PNG in the
# test-image folder, and write each mask to the output folder.
# NOTE: the three paths below are placeholders and must be replaced. This cell
# also rebinds model_path and model, which the training cell assigned earlier.
# load the model
model_path = "/path/to/saved/model.h5"
model = tf.keras.models.load_model(model_path)

# set parameters
image_h, image_w = 540, 960  # same as during training
test_images_path = "/path/to/test/images/"
output_masks_path = "/path/to/save/masks/"

# create output directory if it doesn't exist
if not os.path.exists(output_masks_path):
    os.makedirs(output_masks_path)

# process each test image
for image_file in glob(os.path.join(test_images_path, "*.png")):
    # preprocess the image
    image = preprocess_image(image_file, image_h, image_w)
    image = np.expand_dims(image, axis=0)  # add batch dimension

    # predict mask (one predict call per image)
    predicted_mask = model.predict(image)

    # post-process and save mask
    # output name: "<image name>_mask.png" inside output_masks_path
    mask_filename = os.path.basename(image_file).replace('.png', '_mask.png')
    save_mask_path = os.path.join(output_masks_path, mask_filename)
    save_mask(predicted_mask[0], save_mask_path)  # [0] to remove batch dimension

    print(f"Mask saved to: {save_mask_path}")



# Update model_path, test_images_path, and output_masks_path with the appropriate paths.
# The preprocess_image function should mirror the preprocessing done during training.
# The save_mask function converts the predicted mask (assuming one-hot encoded output) back to a single-channel image and saves it.
# The script processes each image in the test images directory, predicts the mask, and saves it in the specified output directory.
# This script assumes that the saved model outputs a one-hot encoded mask. If your model's output format is different, you will need to modify the save_mask function accordingly.

In [None]:
# load this model later using tf.keras.models.load_model(final_model_path) for further usage or testing

In [None]:
##################################################################################################################################

In [None]:
# TEST MODEL

In [None]:
# === IMPORTS ===
# Import all necessary libraries for model, training, and utilities
# import numpy as np
# import cv2
# import pandas as pd
# from glob import glob
# from tqdm import tqdm
# import tensorflow as tf
# from sklearn.metrics import f1_score, jaccard_score
# # from train import create_dir, load_dataset: use when train is it's own notebook

# import os
# os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

In [None]:
# def grayscale_to_rgb(mask, rgb_codes):
#     h, w = mask.shape[0], mask.shape[1]
#     mask = mask.astype(np.int32)
#     output = []

#     for i, pixel in enumerate(mask.flatten()):
#         output.append(rgb_codes[pixel])

#     output = np.reshape(output, (h, w, 3))
#     return output

In [None]:
# === VISUALIZATION ===
# Visualize predictions, masks, or training outputs
# def save_results(image_x, mask, pred, save_image_path):
#     mask = np.expand_dims(mask, axis=-1)
#     mask = grayscale_to_rgb(mask, rgb_codes)

#     pred = np.expand_dims(pred, axis=-1)
#     pred = grayscale_to_rgb(pred, rgb_codes)

#     line = np.ones((image_x.shape[0], 10, 3)) * 255

#     cat_images = np.concatenate([image_x, line, mask, line, pred], axis=1)
#     cv2.imwrite(save_image_path, cat_images)

In [None]:
# === EVALUATION ===
# Load the saved model, predict on the test set, and report per-class F1 and Jaccard scores
# if __name__ == "__main__":
#     """ seeding """
#     np.random.seed(42)
#     tf.random.set_seed(42)

#     """ directory for storing files """
#     create_dir("results")

#     """ hyperparameters """
#     image_h = 512
#     image_w = 512
#     num_classes = 3

#     """ paths """
#     dataset_path = "USE"
#     model_path = os.path.join("files", "model.h5")

#     """ RGB code and classes """
#     rgb_codes = [
#         [0, 0, 0], [255, 214, 0], [49, 205, 49]
#     ]


#     classes = [
#         "background", "shaft", "end-effector"
#     ]

#     """ loading the dataset """
#     (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)
#     print(f"Train: {len(train_x)}/{len(train_y)} - Valid: {len(valid_x)}/{len(valid_y)} - Test: {len(test_x)}/{len(test_x)}")
#     print("")

#     """ load the saved model """
#     model = tf.keras.models.load_model(model_path)

#     """ prediction & evaluation """
#     SCORE = []
#     for x, y in tqdm(zip(test_x, test_y), total=len(test_x)):
#         """ extract the name """
#         name = x.split(" ")[1] #[-1].split(".")[0] #.split(" ")[-1] #.split("_")[-1] #[0]

#         """ reading the image """
#         image = cv2.imread(x, cv2.IMREAD_COLOR) # from test x
#         image = cv2.resize(image, (image_w, image_h))
#         image_x = image
#         image = image/255.0 # (H, W, 3) normalizes
#         image = np.expand_dims(image, axis=0) # [1, H, W, 3]
#         image = image.astype(np.float32)

#         """ reading the mask """
#         mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE) # from test y
#         mask = cv2.resize(mask, (image_w, image_h))
#         mask = mask.astype(np.int32)

#         """ prediction """
#         pred = model.predict(image, verbose=0)[0]
#         pred = np.argmax(pred, axis=-1) # [0.1, 0.2, 0.1, 0.6] -> 3
#         pred = pred.astype(np.int32)


#         """ save the results """
#         save_image_path = f"results/{name}.png"
#         save_results(image_x, mask, pred, save_image_path)


#         """ flatten the array """
#         mask = mask.flatten()
#         pred = pred.flatten()

#         labels = [i for i in range(num_classes)]

#         """ calculating the metrics values """
#         f1_value = f1_score(mask, pred, labels=labels, average=None, zero_division=0)
#         jac_value = jaccard_score(mask, pred, labels=labels, average=None, zero_division=0)

#         SCORE.append([f1_value, jac_value])

#     score = np.array(SCORE)
#     score = np.mean(score, axis=0)

#     f = open("files/score.csv", "w")
#     f.write("Class,F1,Jaccard\n")

#     l = ["Class", "F1", "Jaccard"]
#     print(f"{l[0]:15s} {l[1]:10s} {l[2]:10s}")
#     print("-"*35)

#     for i in range(num_classes):
#         class_name = classes[i]
#         f1 = score[0, i]
#         jac = score[1, i]
#         dstr = f"{class_name:15s}: {f1:1.5f} - {jac:1.5f}"
#         print(dstr)
#         f.write(f"{class_name:15s},{f1:1.5f},{jac:1.5f}\n")

#     print("-"*35)
#     class_mean = np.mean(score, axis=-1)
#     class_name = "Mean"

#     f1 = class_mean[0]
#     jac = class_mean[1]

#     dstr = f"{class_name:15s}: {f1:1.5f} - {jac:1.5f}"
#     print(dstr)
#     f.write(f"{class_name:15s},{f1:1.5f},{jac:1.5f}\n")

#     f.close()

Train: 1241/1241 - Valid: 100/100 - Test: 10/10



100%|██████████| 10/10 [00:20<00:00,  2.02s/it]

Class           F1         Jaccard   
-----------------------------------
background     : 0.99654 - 0.99310
shaft          : 0.99911 - 0.99823
end-effector   : 0.98329 - 0.96716
-----------------------------------
Mean           : 0.99298 - 0.98616



