In [1]:
!pip install scikit-image
!pip install tqdm
!pip install tensorflow
import os
import glob
import time
import numpy as np
from PIL import Image
from pathlib import Path
from tqdm import tqdm
import matplotlib.pyplot as plt
from skimage.color import rgb2lab, lab2rgb

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Download the COCO 2017 train images, val images and train/val annotations.
# Each archive is saved under a fixed local name (-O) that the extraction cell relies on.
# NOTE: the train archive alone is about 18 GB, so this cell takes a long time.
!wget http://images.cocodataset.org/zips/train2017.zip -O coco_train2017.zip
!wget http://images.cocodataset.org/zips/val2017.zip -O coco_val2017.zip
!wget http://images.cocodataset.org/annotations/annotations_trainval2017.zip -O coco_ann2017.zip

--2023-04-15 16:11:52--  http://images.cocodataset.org/zips/train2017.zip
Resolving images.cocodataset.org (images.cocodataset.org)... 52.216.54.201, 52.216.34.209, 52.216.251.164, ...
Connecting to images.cocodataset.org (images.cocodataset.org)|52.216.54.201|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 19336861798 (18G) [application/zip]
Saving to: ‘coco_train2017.zip’

coco_train2017.zip   28%[====>               ]   5.17G  12.9MB/s    eta 17m 0s 

In [None]:
# Extract the downloaded archives into the working directory and delete the zip files
from zipfile import ZipFile, BadZipFile
import os
def extract_zip_file(extract_path):
    """Extract ``<extract_path>.zip`` into the current directory, then delete the archive.

    Args:
        extract_path: Path of the archive *without* its ``.zip`` suffix,
            e.g. ``"./coco_train2017"``.

    Returns:
        None. Problems are reported on stdout rather than raised, so a notebook
        run is not aborted by a missing or corrupt archive:

        * missing archive -> prints ``Error: <path> file not found``
        * corrupt archive -> prints ``Error: <reason>`` and keeps the file
    """
    zip_path = f"{extract_path}.zip"

    # Check up front: ZipFile() raises FileNotFoundError for a missing archive,
    # which used to escape uncaught and made the "not found" message unreachable.
    if not os.path.isfile(zip_path):
        print("Error: %s file not found" % zip_path)
        return

    try:
        with ZipFile(zip_path) as zfile:
            zfile.extractall(".")
    except BadZipFile as e:
        print("Error:", e)
        return

    # Delete the archive only once its contents are safely on disk.
    os.remove(zip_path)

# Archive stems (without the ".zip" suffix) for train images, val images and annotations.
extract_train_path, extract_val_path, extract_ann_path = (
    "./coco_train2017",
    "./coco_val2017",
    "./coco_ann2017",
)

# Unpack the archives one after another: train, then val, then annotations.
for archive_stem in (extract_train_path, extract_val_path, extract_ann_path):
    extract_zip_file(archive_stem)

In [None]:
# Clone the COCO API sources into ./data and run `make` in its PythonAPI folder.
# A later cell puts data/cocoapi/PythonAPI on sys.path to import pycocotools from there.
!mkdir data
!cd data && git clone https://github.com/cocodataset/cocoapi
!cd data/cocoapi/PythonAPI && make

In [2]:
import os
import skimage.io as io
import numpy as np
import tensorflow as tf
print('TensorFlow version:', tf.__version__)

TensorFlow version: 2.12.0


In [None]:
import matplotlib.pyplot as plt
import random
!pip install livelossplot --quiet
from livelossplot.tf_keras import PlotLossesCallback

In [None]:
# Root folder that is joined with 'annotations/...' and '<split>2017/<file>' paths
# when annotations and images are read.
# NOTE(review): absolute path - presumably the Colab working directory; confirm
# it matches the directory the archives were extracted into.
COCO_ROOT = '/content'
# Folder that contains the cloned `cocoapi` repository.
COCO_API_ROOT = './data/'
import sys
# Make the locally built pycocotools package importable.
sys.path.insert(0, os.path.join(COCO_API_ROOT, 'cocoapi/PythonAPI'))
from pycocotools.coco import COCO

In [None]:
class Dataset():
    """Builds ``tf.data`` training / validation pipelines from a list of items.

    This base class has no state of its own. A subclass must provide:

    * ``self.img_list`` - the sequence of items to iterate over, and
    * ``self.read_images(item)`` - invoked through ``tf.py_function``; it must
      return a ``uint8`` array whose first three channels are the image and
      whose fourth channel is the mask.

    Both pipelines yield ``(img, mask_class)`` batches of ``float32`` tensors,
    with ``img`` scaled by 1/255 and ``mask_class`` keeping a single channel.
    """

    def crop_images(self, img, inp_size, random_crop=False):
        """Pad ``img`` up to at least ``inp_size`` per side, then crop a square.

        Args:
            img: Rank-3 tensor (presumably height x width x channels).
            inp_size: Side length of the square crop.
            random_crop: If True take a random crop, otherwise a central crop.

        Returns:
            Tensor cropped to ``inp_size`` x ``inp_size`` with all channels kept.
        """
        shape = tf.shape(img)
        # Zero-pad only at the end of the first two axes, and only as much as
        # is needed to reach inp_size; larger images are left untouched.
        pad = (
            [0, tf.maximum(inp_size - shape[0], 0)],
            [0, tf.maximum(inp_size - shape[1], 0)],
            [0, 0],
        )
        img = tf.pad(img, pad)

        if random_crop:
            img = tf.image.random_crop(img, (inp_size, inp_size, shape[2]))
        else: # central crop
            shape = tf.shape(img)  # re-read: padding may have changed the size
            ho = (shape[0] - inp_size) // 2
            wo = (shape[1] - inp_size) // 2
            img = img[ho:ho+inp_size, wo:wo+inp_size, :]

        return img

    def _item_to_images(self, item, inp_size, random_crop):
        """Load one item and split it into a normalised image and its mask.

        Shared by the training and validation pipelines, which differ only in
        ``random_crop``.
        """
        img_combined = tf.py_function(self.read_images, [item], tf.uint8)
        img_combined = self.crop_images(img_combined, inp_size, random_crop)

        img = tf.cast(img_combined[...,:3], tf.float32) / np.float32(255.)
        mask_class = tf.cast(img_combined[...,3:4], tf.float32)
        return img, mask_class

    def train_dataset(self, batch_size, epochs, inp_size):
        """Shuffled, randomly cropped pipeline repeated ``epochs`` times.

        Args:
            batch_size: Items per batch; incomplete final batches are dropped.
            epochs: Number of times the shuffled item list is repeated.
            inp_size: Side length of the square crops.

        Returns:
            ``tf.data.Dataset`` of ``(img, mask_class)`` batches.
        """
        dataset = tf.data.Dataset.from_tensor_slices(self.img_list)
        dataset = dataset.shuffle(buffer_size=len(self.img_list))
        dataset = dataset.map(
            lambda item: self._item_to_images(item, inp_size, True))
        dataset = dataset.repeat(epochs)
        dataset = dataset.batch(batch_size, drop_remainder=True)
        # Prepare upcoming batches in the background while the current one is consumed.
        dataset = dataset.prefetch(tf.data.AUTOTUNE)

        return dataset

    def val_dataset(self, batch_size, inp_size):
        """Single-pass, unshuffled, centrally cropped pipeline.

        Args:
            batch_size: Items per batch; incomplete final batches are dropped.
            inp_size: Side length of the square crops.

        Returns:
            ``tf.data.Dataset`` of ``(img, mask_class)`` batches.
        """
        dataset = tf.data.Dataset.from_tensor_slices(self.img_list)
        dataset = dataset.map(
            lambda item: self._item_to_images(item, inp_size, False))
        dataset = dataset.batch(batch_size, drop_remainder=True)
        # Prepare upcoming batches in the background while the current one is consumed.
        dataset = dataset.prefetch(tf.data.AUTOTUNE)

        return dataset

In [None]:
class COCO_Dataset(Dataset):
    """COCO 2017 images that contain the 'person' category, with binary person masks."""

    def __init__(self, sublist, percent=1):
        """Load the annotation index and select the image ids to use.

        Args:
            sublist: Split name used to build the annotation file name
                ``instances_<sublist>2017.json`` (e.g. ``'train'`` or ``'val'``).
            percent: Fraction in ``(0, 1]`` of the matching images to keep,
                drawn at random without replacement.
        """
        # Validate first so a bad argument fails before the annotation file is loaded.
        assert percent > 0 and percent <= 1, "percent must be in (0, 1]"

        ann_file_fpath = os.path.join(COCO_ROOT, 'annotations', 'instances_'+sublist+'2017.json')
        self.coco = COCO(ann_file_fpath)
        self.cat_ids = self.coco.getCatIds(catNms=['person'])
        self.img_list = self.coco.getImgIds(catIds=self.cat_ids)
        self.img_list = random.sample(self.img_list, int(len(self.img_list) * percent))

    def read_images(self, img_id):
        """Read one image and its person mask as a single 4-channel array.

        Args:
            img_id: Scalar eager tensor holding a COCO image id (``.numpy()`` is
                called on it).

        Returns:
            Array of shape ``(H, W, 4)``: three image channels followed by a
            0/1 ``uint8`` mask that is 1 wherever any selected annotation covers
            the pixel.
        """
        img_id = int(img_id.numpy())
        img_data = self.coco.loadImgs(img_id)[0]
        # Keep the last two URL components, i.e. '<folder>/<file name>'.
        img_fname = '/'.join(img_data['coco_url'].split('/')[-2:])

        img = io.imread(os.path.join(COCO_ROOT, img_fname))
        if len(img.shape) == 2:
            # Grayscale: replicate the single channel three times.
            img = np.tile(img[..., None], (1, 1, 3))
        elif img.shape[2] > 3:
            # Drop extra channels (e.g. alpha) so the mask always ends up at index 3.
            img = img[..., :3]

        ann_ids = self.coco.getAnnIds(imgIds=img_data['id'], catIds=self.cat_ids, iscrowd=None)
        anns = self.coco.loadAnns(ann_ids)
        mask_class = np.zeros((img.shape[0], img.shape[1]), dtype=np.uint8)
        for ann in anns:
            # Union of the masks; unlike `+=` on uint8 this cannot wrap around
            # when many annotations overlap the same pixel.
            np.maximum(mask_class, self.coco.annToMask(ann), out=mask_class)
        mask_class = (mask_class > 0).astype(np.uint8)

        img_combined = np.concatenate([img, mask_class[..., None]], axis=2)

        return img_combined

# ConvEncoder and ConvDecoder Blocks

In [None]:
def ConvEncoder(inputs, n_filters=32, dropout_prob=0.0, 
            weight_initializer='HeNormal', max_pooling=True):
    """Encoder stage: two 3x3 ReLU convolutions, batch norm, optional dropout and pooling.

    Args:
        inputs: Input feature map (Keras tensor).
        n_filters: Number of filters of both convolutions.
        dropout_prob: Dropout rate applied after batch norm; no dropout layer
            is added when it is not greater than 0.
        weight_initializer: Kernel initializer of both convolutions.
        max_pooling: If True the first returned tensor is 2x2 max-pooled.

    Returns:
        ``(next_layer, skip_connection)``. ``skip_connection`` is the stage
        output before pooling; ``next_layer`` is the pooled output, or the same
        tensor as ``skip_connection`` when ``max_pooling`` is False.
    """
    c1 = tf.keras.layers.Conv2D(n_filters, 3, 
                  activation='relu',
                  padding='same',
                  kernel_initializer=weight_initializer)(inputs)

    c2 = tf.keras.layers.Conv2D(n_filters, 3,
                  activation='relu',
                  padding='same',
                  kernel_initializer=weight_initializer)(c1)

    # Let Keras supply the training flag. Hard-coding training=False kept the
    # layer permanently in inference mode: it normalised with its initial
    # moving statistics and never updated them during fit().
    conv = tf.keras.layers.BatchNormalization()(c2)

    if dropout_prob > 0.0:     
        conv = tf.keras.layers.Dropout(dropout_prob)(conv)

    if max_pooling:
        next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2,2))(conv)    
    else:
        next_layer = conv

    skip_connection = conv

    return next_layer, skip_connection


def ConvDecoder(prev_layer_input, skip_layer_input, n_filters=32, 
            weight_initializer='HeNormal'):
    """Decoder stage: upsample by 2, merge with the skip tensor, refine with two convolutions.

    Args:
        prev_layer_input: Feature map coming from the deeper stage.
        skip_layer_input: Encoder feature map concatenated on the channel axis.
        n_filters: Number of filters of the transposed and regular convolutions.
        weight_initializer: Kernel initializer of the two regular convolutions.

    Returns:
        Output tensor of the second 3x3 ReLU convolution.
    """
    layers = tf.keras.layers

    # Stride-2 transposed convolution (default initializer, no activation).
    upsampled = layers.Conv2DTranspose(
        n_filters, (3,3), strides=(2,2), padding='same')(prev_layer_input)

    # Join the upsampled features with the encoder features along the channel axis.
    features = layers.concatenate([upsampled, skip_layer_input], axis=3)

    # Two identical refinement convolutions applied back to back.
    for _ in range(2):
        features = layers.Conv2D(
            n_filters, 3,
            activation='relu',
            padding='same',
            kernel_initializer=weight_initializer)(features)

    return features

# UNetSimple Architecture

In [None]:
def UNetSimple(image_size=224, n_filters=32, weights='HeNormal', 
               dropout_prob=0.2):
    """Small U-Net: three encoder stages, two decoder stages, sigmoid mask head.

    Args:
        image_size: Side length of the square, 3-channel input.
        n_filters: Filters of the first stage; the next stages use 2x and 4x.
        weights: Kernel initializer passed to the encoder/decoder stages.
        dropout_prob: Dropout rate of the deepest encoder stage only.

    Returns:
        ``tf.keras.Model`` mapping the image to a single-channel sigmoid map.
    """
    layers = tf.keras.layers
    inputs = layers.Input((image_size, image_size, 3))

    # Contracting path: the deepest stage is not pooled.
    down1, skip1 = ConvEncoder(inputs, n_filters, dropout_prob=0.0,
                               weight_initializer=weights, max_pooling=True)
    down2, skip2 = ConvEncoder(down1, n_filters*2, dropout_prob=0.0,
                               weight_initializer=weights, max_pooling=True)
    bottleneck, _ = ConvEncoder(down2, n_filters*4, dropout_prob=dropout_prob,
                                weight_initializer=weights, max_pooling=False)

    # Expanding path: each stage consumes the skip tensor of the matching encoder.
    up = ConvDecoder(bottleneck, skip2, n_filters * 2, weight_initializer=weights)
    up = ConvDecoder(up, skip1, n_filters * 1, weight_initializer=weights)

    refined = layers.Conv2D(n_filters, 3,
                            activation='relu',
                            padding='same',
                            kernel_initializer=weights)(up)

    # One sigmoid output channel per pixel.
    mask = layers.Conv2D(1, (3, 3), padding='same',
                         activation='sigmoid')(refined)

    return tf.keras.Model(inputs=inputs, outputs=mask)


UNetSimple().summary()

# UNetDense Architecture

In [None]:
def UNetDense(image_size=224, n_filters=32, weights='HeNormal', 
               dropout_prob=0.3):
    """U-Net with five encoder stages, four decoder stages and a sigmoid mask head.

    Args:
        image_size: Side length of the square, 3-channel input.
        n_filters: Filters of the first stage; deeper stages double it each time.
        weights: Kernel initializer passed to the encoder/decoder stages.
        dropout_prob: Dropout rate of the two deepest encoder stages.

    Returns:
        ``tf.keras.Model`` mapping the image to a single-channel sigmoid map.

    Note:
        The second decoder stage now uses ``n_filters * 4`` (it was
        ``n_filters * 2``), so weights saved from the previous architecture do
        not load into this model.
    """
    inputs = tf.keras.layers.Input((image_size, image_size, 3))
    e1 = ConvEncoder(inputs, n_filters, dropout_prob=0.0, 
                 weight_initializer=weights, max_pooling=True)
    e2 = ConvEncoder(e1[0],n_filters*2, dropout_prob=0.0, 
                 weight_initializer=weights, max_pooling=True)
    e3 = ConvEncoder(e2[0], n_filters*4, dropout_prob=0.0, 
                 weight_initializer=weights, max_pooling=True)
    e4 = ConvEncoder(e3[0], n_filters*8, dropout_prob=dropout_prob, 
                 weight_initializer=weights, max_pooling=True)
    # Deepest stage: no pooling, its first output feeds the decoder directly.
    e5 = ConvEncoder(e4[0], n_filters*16, dropout_prob=dropout_prob, 
                 weight_initializer=weights, max_pooling=False)

    # Each decoder stage mirrors the filter count of the encoder stage whose
    # skip tensor it consumes: 8x, 4x, 2x, 1x.
    d6 = ConvDecoder(e5[0], e4[1],  n_filters * 8, weight_initializer=weights)
    d7 = ConvDecoder(d6, e3[1],  n_filters * 4, weight_initializer=weights)
    d8 = ConvDecoder(d7, e2[1],  n_filters * 2, weight_initializer=weights)
    d9 = ConvDecoder(d8, e1[1],  n_filters, weight_initializer=weights)

    conv10 = tf.keras.layers.Conv2D(n_filters,
                 3,
                 activation='relu',
                 padding='same',
                 kernel_initializer=weights)(d9)

    # One sigmoid output channel per pixel.
    conv11 = tf.keras.layers.Conv2D(1, (3, 3), padding='same', 
                                   activation='sigmoid')(conv10)

    model = tf.keras.Model(inputs=inputs, outputs=conv11)

    return model
UNetDense().summary()

In [None]:
# UNetSuperDense Architecture

In [None]:
def UNetSuperDense(image_size=256, n_filters=32, weights='HeNormal', 
               dropout_prob=0.3):
    """Deep U-Net: eight encoder stages, seven decoder stages, sigmoid mask head.

    Args:
        image_size: Side length of the square, 3-channel input.
        n_filters: Filters of the first stage; stage k uses ``n_filters * 2**k``.
        weights: Kernel initializer passed to the encoder/decoder stages.
        dropout_prob: Dropout rate of encoder stages four to eight.

    Returns:
        ``tf.keras.Model`` mapping the image to a single-channel sigmoid map.
    """
    layers = tf.keras.layers
    inputs = layers.Input((image_size, image_size, 3))

    # (filter multiplier, dropout rate) per encoder stage, shallowest first.
    encoder_plan = [(1, 0.0), (2, 0.0), (4, 0.0)]
    encoder_plan += [(mult, dropout_prob) for mult in (8, 16, 32, 64, 128)]
    deepest = len(encoder_plan) - 1

    x = inputs
    skips = []  # (multiplier, skip tensor) of every pooled stage
    for stage, (mult, rate) in enumerate(encoder_plan):
        pooled = stage != deepest  # the deepest stage is not pooled
        x, skip = ConvEncoder(x, n_filters * mult, dropout_prob=rate,
                              weight_initializer=weights, max_pooling=pooled)
        if pooled:
            skips.append((mult, skip))

    # Walk back up, pairing each decoder with the matching encoder's skip tensor.
    for mult, skip in reversed(skips):
        x = ConvDecoder(x, skip, n_filters * mult, weight_initializer=weights)

    refined = layers.Conv2D(n_filters, 3,
                            activation='relu',
                            padding='same',
                            kernel_initializer=weights)(x)

    # One sigmoid output channel per pixel.
    mask = layers.Conv2D(1, (3, 3), padding='same',
                         activation='sigmoid')(refined)

    return tf.keras.Model(inputs=inputs, outputs=mask)


UNetSuperDense().summary()

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_5 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_86 (Conv2D)             (None, 256, 256, 32  896         ['input_5[0][0]']                
                                )                                                                 
                                                                                                  
 conv2d_87 (Conv2D)             (None, 256, 256, 32  9248        ['conv2d_86[0][0]']              
                                )                                                           

# CNN Architecture

In [3]:
def CNNencoder(inputs, n_filters=32, dropout_prob=0.0, 
            weight_initializer='HeNormal', max_pooling=True):
    """Encoder stage without skip output: two 3x3 ReLU convolutions, batch norm,
    optional dropout and optional 2x2 max-pooling.

    Args:
        inputs: Input feature map (Keras tensor).
        n_filters: Number of filters of both convolutions.
        dropout_prob: Dropout rate applied after batch norm; no dropout layer
            is added when it is not greater than 0.
        weight_initializer: Kernel initializer of both convolutions.
        max_pooling: If True the result is 2x2 max-pooled.

    Returns:
        The stage output tensor (pooled when ``max_pooling`` is True).
    """
    c1 = tf.keras.layers.Conv2D(n_filters, 3, 
                  activation='relu',
                  padding='same',
                  kernel_initializer=weight_initializer)(inputs)

    c2 = tf.keras.layers.Conv2D(n_filters, 3,
                  activation='relu',
                  padding='same',
                  kernel_initializer=weight_initializer)(c1)

    # Let Keras supply the training flag. Hard-coding training=False kept the
    # layer permanently in inference mode: it normalised with its initial
    # moving statistics and never updated them during fit().
    conv = tf.keras.layers.BatchNormalization()(c2)

    if dropout_prob > 0.0:     
        conv = tf.keras.layers.Dropout(dropout_prob)(conv)

    if max_pooling:
        next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2,2))(conv)    
    else:
        next_layer = conv

    return next_layer


def CNNdecoder(prev_layer_input, n_filters=32, 
            weight_initializer='HeNormal'):
    """Decoder stage without skip input: upsample by 2, then two 3x3 ReLU convolutions.

    Args:
        prev_layer_input: Feature map coming from the previous stage.
        n_filters: Number of filters of the transposed and regular convolutions.
        weight_initializer: Kernel initializer of the two regular convolutions.

    Returns:
        Output tensor of the second convolution.
    """
    layers = tf.keras.layers

    # Stride-2 transposed convolution (default initializer, no activation).
    features = layers.Conv2DTranspose(
        n_filters, (3,3), strides=(2,2), padding='same')(prev_layer_input)

    # Two identical refinement convolutions applied back to back.
    for _ in range(2):
        features = layers.Conv2D(
            n_filters, 3,
            activation='relu',
            padding='same',
            kernel_initializer=weight_initializer)(features)

    return features

def CNNSuperDense(image_size=256, n_filters=32, weights='HeNormal', 
               dropout_prob=0.3):
    """Encoder-decoder CNN without skip connections: seven encoder stages,
    six decoder stages and a sigmoid mask head.

    Args:
        image_size: Side length of the square, 3-channel input.
        n_filters: Filters of the first stage; stage k uses ``n_filters * 2**k``.
        weights: Kernel initializer passed to the encoder/decoder stages.
        dropout_prob: Dropout rate of encoder stages four to seven.

    Returns:
        ``tf.keras.Model`` mapping the image to a single-channel sigmoid map.
    """
    layers = tf.keras.layers
    inputs = layers.Input((image_size, image_size, 3))

    # (filter multiplier, dropout rate) per encoder stage, shallowest first.
    encoder_plan = [(1, 0.0), (2, 0.0), (4, 0.0)]
    encoder_plan += [(mult, dropout_prob) for mult in (8, 16, 32, 64)]
    deepest = len(encoder_plan) - 1

    x = inputs
    for stage, (mult, rate) in enumerate(encoder_plan):
        # Every stage is pooled except the deepest one.
        x = CNNencoder(x, n_filters * mult, dropout_prob=rate,
                       weight_initializer=weights, max_pooling=stage != deepest)

    # Six upsampling stages, halving the filter count each time.
    for mult in (32, 16, 8, 4, 2, 1):
        x = CNNdecoder(x, n_filters * mult, weight_initializer=weights)

    refined = layers.Conv2D(n_filters, 3,
                            activation='relu',
                            padding='same',
                            kernel_initializer=weights)(x)

    # One sigmoid output channel per pixel.
    mask = layers.Conv2D(1, (3, 3), padding='same',
                         activation='sigmoid')(refined)

    return tf.keras.Model(inputs=inputs, outputs=mask)


CNNSuperDense().summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 256, 256, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 256, 256, 32)      896       
                                                                 
 conv2d_1 (Conv2D)           (None, 256, 256, 32)      9248      
                                                                 
 batch_normalization (BatchN  (None, 256, 256, 32)     128       
 ormalization)                                                   
                                                                 
 max_pooling2d (MaxPooling2D  (None, 128, 128, 32)     0         
 )                                                               
                                                                 
 conv2d_2 (Conv2D)           (None, 128, 128, 64)      18496 

In [None]:
# Define Dataset
# One instance per split; 1.0 keeps every image returned for the 'person' category.
# Each constructor call loads that split's annotation file.
COCO_dataset_train = COCO_Dataset('train', 1.0)
COCO_dataset_val = COCO_Dataset('val', 1.0)

loading annotations into memory...
Done (t=35.19s)
creating index...
index created!
loading annotations into memory...
Done (t=0.56s)
creating index...
index created!


In [None]:
# Shared hyper-parameters for every training run below.
BATCH_SIZE = 40   # images per batch
IMAGE_SIZE = 256  # side length of the square crops fed to the models
EPOCHS = 40       # repetitions of the training set and `epochs` passed to fit()

# Training pipeline: shuffled, randomly cropped, repeated EPOCHS times.
train_ds = COCO_dataset_train.train_dataset(batch_size=BATCH_SIZE, epochs=EPOCHS, inp_size=IMAGE_SIZE)
# Validation pipeline: single pass, central crops.
val_ds = COCO_dataset_val.val_dataset(batch_size=BATCH_SIZE, inp_size=IMAGE_SIZE)

In [None]:
from pathlib import Path

In [None]:
# Folder that receives the best checkpoint of this run.
path = Path("model_1")
path.mkdir(exist_ok=True, parents=True) # relative path: created under the current working directory
assert path.exists()
# Alternative per-epoch file name pattern, currently unused:
# cpt_filename = "checkpoint-{epoch:02d}-{val_loss:.2f}-{val_accuracy:.4f}.hdf5"
cpt_filename = "best1.hdf5"
cpt_path =str(path / cpt_filename)

# UNetSimple Model Training
model = UNetSimple(image_size=IMAGE_SIZE)
# save_best_only + mode='max': the file is rewritten only when val_accuracy improves.
checkpoint = tf.keras.callbacks.ModelCheckpoint(cpt_path, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
optimizer = tf.keras.optimizers.Adam(0.0001)

model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=optimizer)

# steps_per_epoch=50: each Keras epoch consumes 50 batches of train_ds.
# NOTE: `model` and `hist` are overwritten by every later training cell.
hist = model.fit(
    train_ds, 
    epochs=EPOCHS,
    steps_per_epoch=50,
    validation_data=val_ds,
    callbacks=[PlotLossesCallback(), checkpoint])

In [None]:
# Folder that receives the best checkpoint of this run.
path = Path("model_2")
path.mkdir(exist_ok=True, parents=True) # relative path: created under the current working directory
assert path.exists()
cpt_filename = "best2.hdf5"
cpt_path =str(path / cpt_filename)

# UNetDense Model Training
model = UNetDense(image_size=IMAGE_SIZE)
# save_best_only + mode='max': the file is rewritten only when val_accuracy improves.
checkpoint = tf.keras.callbacks.ModelCheckpoint(cpt_path, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
optimizer = tf.keras.optimizers.Adam(0.0002)

model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=optimizer)

# steps_per_epoch=50: each Keras epoch consumes 50 batches of train_ds.
# NOTE: `model` and `hist` are overwritten by every later training cell.
hist = model.fit(
    train_ds, 
    epochs=EPOCHS,
    steps_per_epoch=50,
    validation_data=val_ds,
    callbacks=[PlotLossesCallback(), checkpoint])

In [None]:
# Folder that receives the best checkpoint of this run.
path = Path("model_3")
path.mkdir(exist_ok=True, parents=True) # relative path: created under the current working directory
assert path.exists()
cpt_filename = "best3.hdf5"
cpt_path =str(path / cpt_filename)

# UNetSuperDense Model Training
model = UNetSuperDense(image_size=IMAGE_SIZE)
# save_best_only + mode='max': the file is rewritten only when val_accuracy improves.
checkpoint = tf.keras.callbacks.ModelCheckpoint(cpt_path, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
optimizer = tf.keras.optimizers.Adam(0.00005)

model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=optimizer)

# steps_per_epoch=50: each Keras epoch consumes 50 batches of train_ds.
# NOTE: `model` and `hist` are overwritten by every later training cell.
hist = model.fit(
    train_ds, 
    epochs=EPOCHS,
    steps_per_epoch=50,
    validation_data=val_ds,
    callbacks=[PlotLossesCallback(), checkpoint]
    )

In [None]:
# Folder that receives the best checkpoint of this run.
path = Path("model_4")
path.mkdir(exist_ok=True, parents=True) # relative path: created under the current working directory
assert path.exists()
cpt_filename = "best4.hdf5"
cpt_path =str(path / cpt_filename)

# CNNSuperDense Model Training
model = CNNSuperDense(image_size=IMAGE_SIZE)
# save_best_only + mode='max': the file is rewritten only when val_accuracy improves.
checkpoint = tf.keras.callbacks.ModelCheckpoint(cpt_path, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
optimizer = tf.keras.optimizers.Adam(0.0001)

model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=optimizer)

# steps_per_epoch=50: each Keras epoch consumes 50 batches of train_ds.
# NOTE: after this cell `model` refers to the CNNSuperDense network.
hist = model.fit(
    train_ds, 
    epochs=EPOCHS,
    steps_per_epoch=50,
    validation_data=val_ds,
    callbacks=[PlotLossesCallback(), checkpoint]
    )

In [None]:
import matplotlib.pyplot as plt
import random
########## Model Results ##########

def draw_sub_image(i, sample, mask, threshold=0.2):
    """Draw one image blended with its binarised mask into cell ``i`` of a 4x4 grid.

    Args:
        i: Zero-based cell index in the 4x4 subplot grid. Even cells are titled
            "Real", odd cells "Predicted".
        sample: Image array; blended 50/50 with the coloured mask.
        mask: Array whose channel 0 (last axis) holds the mask values.
        threshold: Values of channel 0 strictly above this count as foreground.
            Defaults to 0.2, the value that was previously hard-coded.
    """
    plt.subplot(4,4, i + 1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)

    # Binarise channel 0, then colour the two classes with the viridis colormap
    # (dropping the alpha channel the colormap returns).
    mask = (mask[..., 0] > threshold).astype(np.float32)
    mask_clr = plt.get_cmap('viridis')(mask)[..., :3]
    plt.imshow(sample*0.5 + mask_clr*0.5)

    title_object = plt.title("Predicted" if i % 2 else "Real")
    plt.setp(title_object, color="b")

# 8 (real, predicted) pairs fill the 4x4 grid drawn by draw_sub_image.
N_PAIRS = 8

plt.figure(figsize=(10,10))

# Take one batch to visualise; switch to the commented line to inspect
# validation samples instead of training samples.
imgs, masks = next(train_ds.as_numpy_iterator())
# imgs, masks = next(val_ds.as_numpy_iterator())

# Predict once for all displayed samples instead of calling model.predict per
# image (the old loop also ran one extra prediction before its break check).
pred_masks = model.predict(imgs[:N_PAIRS])

for i, (sample, mask, pred_mask) in enumerate(zip(imgs[:N_PAIRS], masks, pred_masks)):
    draw_sub_image(2*i, sample, mask)           # ground-truth mask
    draw_sub_image(2*i + 1, sample, pred_mask)  # model output