**U- Net Model**

In [1]:

from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, MaxPool2D, UpSampling2D, Concatenate, Add
import tensorflow as tf
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"


def conv_block(inputs, out_ch, rate=1):
    """

    With "same" padding, the necessary amount of zero-padding is added to the input so that the output has the same height and width as the input.

    """
    x = Conv2D(out_ch, 3, padding="same", dilation_rate=1)(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    return x


def RSU_L(inputs, out_ch, int_ch, num_layers, rate=2):
    """
    inputs: This is the input tensor or layer to the function. It represents the input feature map on which the operations will be performed.

    out_ch: It stands for "output channels." This is the number of filters or channels that the convolutional layers in the function will produce. It determines the depth of the output feature map.

    int_ch: It stands for "intermediate channels." This is the number of channels used in the intermediate convolutional layers of the function. It can be considered as a bottleneck layer.

    num_layers: This parameter represents the number of layers in the encoding part of the network. It determines how many times the function will apply downsampling operations (MaxPooling) followed by convolutional layers on the input.

    rate=2: This is a default parameter for the dilation rate. Dilation rate controls the spacing between the kernel points, influencing the field of view of each point. A dilation rate of 1 corresponds to standard convolution, and a rate of 2 or more introduces spaces between the kernel points.

    """

    """ Initial Conv """
    x = conv_block(inputs, out_ch)
    init_feats = x

    """ Encoder """
    """
    skip is to keep a record of the intermediate feature maps obtained after each convolutional block in the encoding part of the network. These feature maps will be used during the decoding part, where the network has to concatenate or skip connections between the corresponding feature maps from the encoding and decoding stages. This skip connection helps in retaining and utilizing low-level details during the upsampling process.
    """
    skip = []
    x = conv_block(x, int_ch)
    skip.append(x)

    for i in range(num_layers-2):
        x = MaxPool2D((2, 2))(x)
        x = conv_block(x, int_ch)
        skip.append(x)

    """ Bridge """
    x = conv_block(x, int_ch, rate=rate)

    """ Decoder """
    skip.reverse()

    x = Concatenate()([x, skip[0]])
    x = conv_block(x, int_ch)

    for i in range(num_layers-3):
        x = UpSampling2D(size=(2, 2), interpolation="bilinear")(x)
        x = Concatenate()([x, skip[i+1]])
        x = conv_block(x, int_ch)

    x = UpSampling2D(size=(2, 2), interpolation="bilinear")(x)
    x = Concatenate()([x, skip[-1]])
    x = conv_block(x, out_ch)

    """ Add """
    x = Add()([x, init_feats])
    return x


def RSU_4F(inputs, out_ch, int_ch):
    """ Initial Conv """
    x0 = conv_block(inputs, out_ch, rate=1)

    """ Encoder """
    x1 = conv_block(x0, int_ch, rate=1)
    x2 = conv_block(x1, int_ch, rate=2)
    x3 = conv_block(x2, int_ch, rate=4)

    """ Bridge """
    x4 = conv_block(x3, int_ch, rate=8)

    """ Decoder """
    x = Concatenate()([x4, x3])
    x = conv_block(x, int_ch, rate=4)

    x = Concatenate()([x, x2])
    x = conv_block(x, int_ch, rate=2)

    x = Concatenate()([x, x1])
    x = conv_block(x, out_ch, rate=1)

    """ Addition """
    x = Add()([x, x0])
    return x


def u2net(input_shape, out_ch, int_ch, num_classes=1):
    """ Input Layer """
    inputs = Input(input_shape)
    s0 = inputs

    """ Encoder """
    s1 = RSU_L(s0, out_ch[0], int_ch[0], 7)
    p1 = MaxPool2D((2, 2))(s1)

    s2 = RSU_L(p1, out_ch[1], int_ch[1], 6)
    p2 = MaxPool2D((2, 2))(s2)

    s3 = RSU_L(p2, out_ch[2], int_ch[2], 5)
    p3 = MaxPool2D((2, 2))(s3)

    s4 = RSU_L(p3, out_ch[3], int_ch[3], 4)
    p4 = MaxPool2D((2, 2))(s4)

    s5 = RSU_4F(p4, out_ch[4], int_ch[4])
    p5 = MaxPool2D((2, 2))(s5)

    """ Bridge """
    b1 = RSU_4F(p5, out_ch[5], int_ch[5])
    b2 = UpSampling2D(size=(2, 2), interpolation="bilinear")(b1)

    """ Decoder """
    d1 = Concatenate()([b2, s5])
    d1 = RSU_4F(d1, out_ch[6], int_ch[6])
    u1 = UpSampling2D(size=(2, 2), interpolation="bilinear")(d1)

    d2 = Concatenate()([u1, s4])
    d2 = RSU_L(d2, out_ch[7], int_ch[7], 4)
    u2 = UpSampling2D(size=(2, 2), interpolation="bilinear")(d2)

    d3 = Concatenate()([u2, s3])
    d3 = RSU_L(d3, out_ch[8], int_ch[8], 5)
    u3 = UpSampling2D(size=(2, 2), interpolation="bilinear")(d3)

    d4 = Concatenate()([u3, s2])
    d4 = RSU_L(d4, out_ch[9], int_ch[9], 6)
    u4 = UpSampling2D(size=(2, 2), interpolation="bilinear")(d4)

    d5 = Concatenate()([u4, s1])
    d5 = RSU_L(d5, out_ch[10], int_ch[10], 7)

    """ Side Outputs """
    y1 = Conv2D(num_classes, 3, padding="same")(d5)

    y2 = Conv2D(num_classes, 3, padding="same")(d4)
    y2 = UpSampling2D(size=(2, 2), interpolation="bilinear")(y2)

    y3 = Conv2D(num_classes, 3, padding="same")(d3)
    y3 = UpSampling2D(size=(4, 4), interpolation="bilinear")(y3)

    y4 = Conv2D(num_classes, 3, padding="same")(d2)
    y4 = UpSampling2D(size=(8, 8), interpolation="bilinear")(y4)

    y5 = Conv2D(num_classes, 3, padding="same")(d1)
    y5 = UpSampling2D(size=(16, 16), interpolation="bilinear")(y5)

    y6 = Conv2D(num_classes, 3, padding="same")(b1)
    y6 = UpSampling2D(size=(32, 32), interpolation="bilinear")(y6)

    y0 = Concatenate()([y1, y2, y3, y4, y5, y6])
    y0 = Conv2D(num_classes, 3, padding="same")(y0)

    y0 = Activation("sigmoid", name="y0")(y0)
    y1 = Activation("sigmoid", name="y1")(y1)
    y2 = Activation("sigmoid", name="y2")(y2)
    y3 = Activation("sigmoid", name="y3")(y3)
    y4 = Activation("sigmoid", name="y4")(y4)
    y5 = Activation("sigmoid", name="y5")(y5)
    y6 = Activation("sigmoid", name="y6")(y6)

    model = tf.keras.models.Model(inputs, outputs=[y0, y1, y2, y3, y4, y5, y6])
    return model


def build_u2net(input_shape, num_classes=1):
    out_ch = [64, 128, 256, 512, 512, 512, 512, 256, 128, 64, 64]
    int_ch = [32, 32, 64, 128, 256, 256, 256, 128, 64, 32, 16]
    model = u2net(input_shape, out_ch, int_ch, num_classes=num_classes)
    return model


def build_u2net_lite(input_shape, num_classes=1):
    out_ch = [64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64]
    int_ch = [16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16]
    model = u2net(input_shape, out_ch, int_ch, num_classes=num_classes)
    return model


if __name__ == "__main__":
    model = build_u2net_lite((512, 512, 3))
    model.summary()


# ...




Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 512, 512, 3)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 512, 512, 64)         1792      ['input_1[0][0]']             
                                                                                                  
 batch_normalization (Batch  (None, 512, 512, 64)         256       ['conv2d[0][0]']              
 Normalization)                                                                                   
                                                                                                  
 activation (Activation)     (None, 512, 512, 64)         0         ['batch_normalization[0][0

**Let us now train the model**

In [2]:

# from model import build_u2net_lite, build_u2net
from sklearn.model_selection import train_test_split
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
import tensorflow as tf
from sklearn.utils import shuffle
from glob import glob
import cv2
import numpy as np
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"


""" Global parameters """

# may need to change the below size of image based on GPU capacity
H = 128
W = 128


def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)


def load_dataset(path, split=0.1):
    train_x = sorted(
        glob(os.path.join(path, "train", "blurred_image", "*.jpg")))
    train_y = sorted(glob(os.path.join(path, "train", "mask", "*.png")))

    valid_x = sorted(glob(os.path.join(path, "validation",
                     "P3M-500-NP", "original_image", "*.jpg")))
    valid_y = sorted(
        glob(os.path.join(path, "validation", "P3M-500-NP", "mask", "*.png")))

    return (train_x, train_y), (valid_x, valid_y)


def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (W, H))
    x = x / 255.0
    x = x.astype(np.float32)
    return x


def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, (W, H))
    x = x / 255.0
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)
    return x


def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([H, W, 3])
    y.set_shape([H, W, 1])
    return x, y


def tf_dataset(X, Y, batch=2):
    ds = tf.data.Dataset.from_tensor_slices((X, Y))
    ds = ds.map(tf_parse).batch(batch).prefetch(10)
    return ds


if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)
    files="/kaggle/working/files"

    """ Directory for storing files """
    create_dir(files)

    """ Hyperparameters """
    # may need to change the below parameters(esp. batch size) based on GPU capacity
    batch_size = 4
    lr = 1e-4
    num_epochs = 2
    model_path = os.path.join(files, "model2.h5")
    csv_path = os.path.join(files, "log.csv")

    """ Dataset """
    # dataset_path = "P3M-10k"
    dataset_path = "/kaggle/input/p3mzip/P3M-10k"
    (train_x, train_y), (valid_x, valid_y) = load_dataset(dataset_path)

    print(f"Train: {len(train_x)} - {len(train_y)}")
    print(f"Valid: {len(valid_x)} - {len(valid_y)}")

    train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

    """ Model """
    model = build_u2net((H, W, 3))
    # model.load_weights(model_path)
    # model.compile(loss="binary_crossentropy", optimizer=Adam(lr))
    model.compile(loss="binary_crossentropy",
                  optimizer=Adam(lr), run_eagerly=True)

    callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1,
                          patience=5, min_lr=1e-7, verbose=1),
        CSVLogger(csv_path),
        EarlyStopping(monitor='val_loss', patience=20,
                      restore_best_weights=False),
    ]

    model.fit(
        train_dataset,
        epochs=num_epochs,
        validation_data=valid_dataset,
        callbacks=callbacks
    )
    # print("Training code done")


Train: 9421 - 9421
Valid: 500 - 500
Epoch 1/2
Epoch 1: val_loss improved from inf to 1.37008, saving model to /kaggle/working/files/model2.h5


  saving_api.save_model(


Epoch 2/2
Epoch 2: val_loss improved from 1.37008 to 1.12853, saving model to /kaggle/working/files/model2.h5
Training code done


In [None]:
# def load_dataset(path, split=0.1):
#     train_x = sorted(
#         glob(os.path.join(path, "train", "blurred_image", "*.jpg")))
#     train_y = sorted(glob(os.path.join(path, "train", "mask", "*.png")))

#     valid_x = sorted(glob(os.path.join(path, "validation",
#                      "P3M-500-NP", "original_image", "*.jpg")))
#     valid_y = sorted(
#         glob(os.path.join(path, "validation", "P3M-500-NP", "mask", "*.png")))

#     return (train_x, train_y), (valid_x, valid_y)


**Let us now test the model**

In [5]:

import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import numpy as np
import cv2
import pandas as pd
from glob import glob
from tqdm import tqdm
import tensorflow as tf
# from train import load_dataset

""" Global parameters """
H = 128
W = 128

""" Creating a directory """
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)
    
    """ Load the model """
    files="/kaggle/working/files"
    model_path = os.path.join(files, "model2.h5")
    model = tf.keras.models.load_model(model_path)

    """ Directory for storing files """
    results="/kaggle/working/results"
    for item in ["joint", "mask"]:
        create_dir(f"{results}/{item}")

    """ Dataset """
    images = glob("/kaggle/input/git-repo/U2-Net-for-Image-Matting-in-TensorFlow-main/test/*")
    print(f"Images: {len(images)}")

    """ Prediction """
    for x in tqdm(images, total=len(images)):
        """ Extracting the name """
        name = x.split("/")[-1]

        """ Reading the image """
        image = cv2.imread(x, cv2.IMREAD_COLOR)
        x = cv2.resize(image, (W, H))
        x = x/255.0
        x = np.expand_dims(x, axis=0)

        """ Prediction """
        pred = model.predict(x, verbose=0)

        line = np.ones((H, 10, 3)) * 255

        """ Joint and save mask """
        pred_list = []
        for item in pred:
            p = item[0] * 255
            p = np.concatenate([p, p, p], axis=-1)

            pred_list.append(p)
            pred_list.append(line)

        save_image_path = os.path.join(results, "mask", name)
        cat_images = np.concatenate(pred_list, axis=1)
        cv2.imwrite(save_image_path, cat_images)

        """ Save final mask """
        image_h, image_w, _ = image.shape

        y0 = pred[0][0]
        y0 = cv2.resize(y0, (image_w, image_h))
        y0 = np.expand_dims(y0, axis=-1)
        y0 = np.concatenate([y0, y0, y0], axis=-1)

        line = line = np.ones((image_h, 10, 3)) * 255

        cat_images = np.concatenate([image, line, y0*255, line, image*y0], axis=1)
        save_image_path = os.path.join(results, "joint", name)
        cv2.imwrite(save_image_path, cat_images)

Images: 19


100%|██████████| 19/19 [00:14<00:00,  1.30it/s]


In [6]:
# Copy the folder
!cp -r /kaggle/working/results /kaggle/working/result1

# Create a zip archive of the copied folder


In [7]:
!zip -r /kaggle/working/result2.zip /kaggle/working/result1


  adding: kaggle/working/result1/ (stored 0%)
  adding: kaggle/working/result1/joint/ (stored 0%)
  adding: kaggle/working/result1/joint/photo-1599566150163-29194dcaad36.jpg (deflated 5%)
  adding: kaggle/working/result1/joint/photo-1473830394358-91588751b241.jpg (deflated 18%)
  adding: kaggle/working/result1/joint/photo-1544005313-94ddf0286df2.jpg (deflated 3%)
  adding: kaggle/working/result1/joint/photo-1500648767791-00dcc994a43e.jpg (deflated 3%)
  adding: kaggle/working/result1/joint/photo-1494790108377-be9c29b29330.jpg (deflated 11%)
  adding: kaggle/working/result1/joint/photo-1554151228-14d9def656e4.jpg (deflated 1%)
  adding: kaggle/working/result1/joint/photo-1547425260-76bcadfb4f2c.jpg (deflated 7%)
  adding: kaggle/working/result1/joint/photo-1438761681033-6461ffad8d80.jpg (deflated 11%)
  adding: kaggle/working/result1/joint/photo-1552058544-f2b08422138a.jpg (deflated 5%)
  adding: kaggle/working/result1/joint/photo-1554727242-741c14fa561c.jpg (deflated 3%)
  adding: kagg