In [None]:
!pip install patchify

Collecting patchify
  Downloading patchify-0.2.3-py3-none-any.whl (6.6 kB)
Installing collected packages: patchify
Successfully installed patchify-0.2.3


In [None]:
import os
from math import log2
from glob import glob
import numpy as np
import cv2
from tqdm import tqdm
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.layers import (Dense, Dropout, Input, ReLU, Reshape,
                                        BatchNormalization, MaxPool2D, Embedding,
                                        LayerNormalization, MultiHeadAttention,
                                        Activation, Add, Concatenate, Conv2D,
                                        Conv2DTranspose)
from tensorflow.keras.callbacks import (ModelCheckpoint, CSVLogger, ReduceLROnPlateau,
                                        EarlyStopping)
from tensorflow.keras.optimizers import Adam, SGD
from sklearn.model_selection import train_test_split
from patchify import patchify
from tensorflow.keras.models import Model, load_model

In [None]:
import tarfile

# Unpack the gzipped LaPa archive stored on Google Drive.
with tarfile.open('/content/drive/MyDrive/LaPa.tar.gz', mode='r:gz') as tar:
    # "." (the current working directory) is also extractall's default target.
    tar.extractall(path=".")


In [None]:
# Global configuration read by the data-loading and inference cells.
cf = {
    "image_size": 256,     # images and masks are resized to image_size x image_size
    "num_classes": 11,     # depth of the one-hot encoded mask
    "num_layers": 12,      # number of transformer encoder blocks
    "hidden_dim": 128,     # token embedding width
    "mlp_dim": 32,         # width of the first Dense layer inside mlp()
    "num_heads": 6,        # attention heads per encoder block
    "dropout_rate": 0.1,
    "patch_size": 16,      # side length of each square patch
}

# Derived entries, inserted in the same order as before.
cf["num_patches"] = (cf["image_size"] ** 2) // (cf["patch_size"] ** 2)
cf["num_channels"] = 3
cf["flat_patches_shape"] = (
    cf["num_patches"],
    cf["patch_size"] * cf["patch_size"] * cf["num_channels"],
)

In [None]:
def create_dir(path):
    """Create the directory `path`, including any missing parent directories.

    Calling this for a directory that already exists is a no-op. The
    existence check and the creation happen in one call, so there is no
    window between "check" and "create" in which another process could
    create the directory and make this function fail.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If `path` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

In [None]:
def load_dataset(path):
    """Collect sorted image and label file paths for the three dataset splits.

    For each of "train", "val" and "test", this globs
    `<path>/<split>/images/*.jpg` and `<path>/<split>/labels/*.png` and sorts
    each list independently.

    Args:
        path: Root directory containing the split sub-directories.

    Returns:
        ((X_train, y_train), (X_val, y_val), (X_test, y_test)), where every
        element is a sorted list of file paths.
    """
    def _split_files(split):
        # Images and labels are sorted separately, not matched by name.
        images = sorted(glob(os.path.join(path, split, "images", "*.jpg")))
        labels = sorted(glob(os.path.join(path, split, "labels", "*.png")))
        return images, labels

    return _split_files("train"), _split_files("val"), _split_files("test")

In [None]:
def mlp(x, cf):
    """Feed-forward sub-block: Dense(gelu) -> Dropout -> Dense -> Dropout.

    Args:
        x: Input Keras tensor.
        cf: Config dict; reads "mlp_dim", "hidden_dim" and "dropout_rate".

    Returns:
        Keras tensor whose last dimension is cf["hidden_dim"].
    """
    drop_rate = cf["dropout_rate"]

    expanded = Dense(cf["mlp_dim"], activation="gelu")(x)
    expanded = Dropout(drop_rate)(expanded)

    # Project back to the token width so the caller can add a residual.
    projected = Dense(cf["hidden_dim"])(expanded)
    return Dropout(drop_rate)(projected)

In [None]:
def transformer_encoder(x, cf):
    """One pre-norm transformer encoder block with two residual connections.

    The block applies LayerNormalization -> self-attention -> residual add,
    then LayerNormalization -> mlp() -> residual add.

    Args:
        x: Token tensor; its last dimension must match cf["hidden_dim"] for
            the second residual add to work.
        cf: Config dict; reads "num_heads" and "hidden_dim" here, plus the
            keys consumed by mlp().

    Returns:
        Keras tensor produced by the second residual add.
    """
    # Self-attention sub-block (query and value are the same tensor).
    normed = LayerNormalization()(x)
    self_attention = MultiHeadAttention(
        num_heads=cf["num_heads"], key_dim=cf["hidden_dim"]
    )
    attended = self_attention(normed, normed)
    attention_out = Add()([attended, x])

    # Feed-forward sub-block.
    normed = LayerNormalization()(attention_out)
    feed_forward = mlp(normed, cf)
    return Add()([feed_forward, attention_out])

In [None]:
def conv_block(x, num_filters, kernel_size=3):
    """Apply Conv2D ("same" padding) -> BatchNormalization -> ReLU.

    Args:
        x: Input feature map.
        num_filters: Number of output channels of the convolution.
        kernel_size: Convolution kernel size (default 3).

    Returns:
        The activated feature map.
    """
    convolution = Conv2D(num_filters, kernel_size=kernel_size, padding="same")
    normalization = BatchNormalization()
    activation = ReLU()
    return activation(normalization(convolution(x)))

In [None]:
def deconv_block(x, num_filters, strides=2):
    """Upsample `x` with a single 2x2 transposed convolution ("same" padding).

    Args:
        x: Input feature map.
        num_filters: Number of output channels.
        strides: Stride of the transposed convolution (default 2).

    Returns:
        The output of the Conv2DTranspose layer.
    """
    upsample = Conv2DTranspose(
        num_filters, kernel_size=2, padding="same", strides=strides
    )
    return upsample(x)

In [None]:
def build_unetr_2d(cf):
    """Build the 2D UNETR segmentation model.

    The model takes a sequence of flattened image patches, runs it through
    cf["num_layers"] transformer encoder blocks, and decodes the outputs of
    blocks 3, 6, 9 and 12 with a U-Net style convolutional decoder. A
    full-resolution skip branch is computed from the input image itself.

    Args:
        cf: Config dict. Reads "image_size", "patch_size", "num_patches",
            "num_channels", "hidden_dim", "num_layers" and "num_classes";
            transformer_encoder()/mlp() additionally read "num_heads",
            "mlp_dim" and "dropout_rate". Because exactly four skip tensors
            are unpacked, "num_layers" must be at least 12.

    Returns:
        A Keras Model named "UNETR_2D". Its input has shape
        (num_patches, patch_size * patch_size * num_channels); its output is
        a per-pixel softmax over cf["num_classes"] classes.
    """
    # Permute is not in the notebook's import cell, so import it locally.
    from tensorflow.keras.layers import Permute

    grid_size = cf["image_size"] // cf["patch_size"]

    input_shape = (cf["num_patches"], cf["patch_size"] * cf["patch_size"] * cf["num_channels"])
    inputs = Input(input_shape)

    # Linear patch embedding plus a learned position embedding.
    patch_embed = Dense(cf["hidden_dim"])(inputs)

    positions = tf.range(start=0, limit=cf["num_patches"], delta=1)
    pos_embed = Embedding(input_dim=cf["num_patches"], output_dim=cf["hidden_dim"])(positions)
    x = patch_embed + pos_embed

    # Encoder: keep the outputs of blocks 3, 6, 9 and 12 as decoder skips.
    skip_connection_index = [3, 6, 9, 12]
    skip_connections = []

    for i in range(1, cf["num_layers"] + 1, 1):
        x = transformer_encoder(x, cf)

        if i in skip_connection_index:
            skip_connections.append(x)

    z3, z6, z9, z12 = skip_connections

    # Rebuild the image from the patch sequence for the full-resolution skip.
    # The input is ordered (grid_row, grid_col) x (patch_row, patch_col, ch),
    # so a direct reshape to (H, W, C) would turn every flattened patch into
    # one image row. Patch rows must be interleaved with grid rows first:
    # (gr, gc, pr, pc, ch) -> (gr, pr, gc, pc, ch) -> (H, W, ch).
    z0 = Reshape((
        grid_size, grid_size,
        cf["patch_size"], cf["patch_size"], cf["num_channels"]
    ))(inputs)
    z0 = Permute((1, 3, 2, 4, 5))(z0)
    z0 = Reshape((cf["image_size"], cf["image_size"], cf["num_channels"]))(z0)

    # Token sequences are already in row-major grid order, so a plain
    # reshape restores their 2D layout.
    shape = (grid_size, grid_size, cf["hidden_dim"])

    z3 = Reshape(shape)(z3)
    z6 = Reshape(shape)(z6)
    z9 = Reshape(shape)(z9)
    z12 = Reshape(shape)(z12)

    # The decoder below always upsamples by 2 four times (x16 in total).
    # For other patch sizes the token grid is rescaled first so the decoder
    # output still lines up with z0.
    total_upsample_factor = int(log2(cf["patch_size"]))
    upscale = total_upsample_factor - 4

    if upscale >= 1:
        z3 = deconv_block(z3, z3.shape[-1], strides=2**upscale)
        z6 = deconv_block(z6, z6.shape[-1], strides=2**upscale)
        z9 = deconv_block(z9, z9.shape[-1], strides=2**upscale)
        z12 = deconv_block(z12, z12.shape[-1], strides=2**upscale)

    if upscale < 0:
        p = 2 ** abs(upscale)
        z3 = MaxPool2D((p, p))(z3)
        z6 = MaxPool2D((p, p))(z6)
        z9 = MaxPool2D((p, p))(z9)
        z12 = MaxPool2D((p, p))(z12)

    # Decoder stage 1: z12 merged with z9 (both upsampled once).
    x = deconv_block(z12, 128)

    s = deconv_block(z9, 128)
    s = conv_block(s, 128)

    x = Concatenate()([x, s])

    x = conv_block(x, 128)
    x = conv_block(x, 128)

    # Decoder stage 2: merge with z6 (upsampled twice).
    x = deconv_block(x, 64)

    s = deconv_block(z6, 64)
    s = conv_block(s, 64)
    s = deconv_block(s, 64)
    s = conv_block(s, 64)

    x = Concatenate()([x, s])
    x = conv_block(x, 64)
    x = conv_block(x, 64)

    # Decoder stage 3: merge with z3 (upsampled three times).
    x = deconv_block(x, 32)

    s = deconv_block(z3, 32)
    s = conv_block(s, 32)
    s = deconv_block(s, 32)
    s = conv_block(s, 32)
    s = deconv_block(s, 32)
    s = conv_block(s, 32)

    x = Concatenate()([x, s])
    x = conv_block(x, 32)
    x = conv_block(x, 32)

    # Decoder stage 4: merge with the full-resolution image branch.
    x = deconv_block(x, 16)

    s = conv_block(z0, 16)
    s = conv_block(s, 16)

    x = Concatenate()([x, s])
    x = conv_block(x, 16)
    x = conv_block(x, 16)

    # Classes are mutually exclusive (one-hot targets trained with
    # categorical cross-entropy), so the head is a softmax over classes.
    outputs = Conv2D(cf["num_classes"], kernel_size=1, padding="same", activation="softmax")(x)

    return Model(inputs, outputs, name="UNETR_2D")

In [None]:
if __name__ == "__main__":

    # Stand-alone configuration used only to build the network and print its
    # summary; it is separate from the global `cf` dict.
    config = {
        "image_size": 256,
        "num_classes": 11,
        "num_layers": 12,
        "hidden_dim": 64,
        "mlp_dim": 128,
        "num_heads": 6,
        "dropout_rate": 0.1,
        "patch_size": 16,
    }
    config["num_patches"] = (config["image_size"] ** 2) // (config["patch_size"] ** 2)
    config["num_channels"] = 3

    model = build_unetr_2d(config)
    model.summary()

Model: "UNETR_2D"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 256, 768)]           0         []                            
                                                                                                  
 dense (Dense)               (None, 256, 64)              49216     ['input_1[0][0]']             
                                                                                                  
 tf.__operators__.add (TFOp  (None, 256, 64)              0         ['dense[0][0]']               
 Lambda)                                                                                          
                                                                                                  
 layer_normalization (Layer  (None, 256, 64)              128       ['tf.__operators__.add[

In [None]:
def read_image(path):
    """Load an image file and return it as a sequence of flattened patches.

    The image is read in colour with OpenCV, resized to
    cf["image_size"] x cf["image_size"], scaled by 1/255 and cut into
    non-overlapping cf["patch_size"] x cf["patch_size"] patches.

    Args:
        path: Image file path as bytes (it is decoded with `.decode()`).

    Returns:
        float32 numpy array of shape cf["flat_patches_shape"].
    """
    filename = path.decode()
    side = cf["image_size"]
    patch = cf["patch_size"]

    pixels = cv2.imread(filename, cv2.IMREAD_COLOR)
    pixels = cv2.resize(pixels, (side, side)) / 255.0

    # A step equal to the patch size yields non-overlapping patches.
    grid = patchify(pixels, (patch, patch, cf["num_channels"]), patch)
    flat = np.reshape(grid, cf["flat_patches_shape"])
    return flat.astype(np.float32)

In [None]:
def read_mask(path):
    """Load a label mask and resize it without creating new label values.

    Args:
        path: Mask file path as bytes (it is decoded with `.decode()`).

    Returns:
        int32 numpy array of shape (cf["image_size"], cf["image_size"])
        holding the per-pixel values read from the mask file.
    """
    path = path.decode()
    mask = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    # Nearest-neighbour keeps every pixel equal to one of the original class
    # ids. The default bilinear interpolation would average neighbouring ids
    # at region borders and produce unrelated labels.
    mask = cv2.resize(
        mask,
        (cf["image_size"], cf["image_size"]),
        interpolation=cv2.INTER_NEAREST,
    )
    mask = mask.astype(np.int32)
    return mask

In [None]:
def tf_parse(x, y):
    """tf.data map function: turn an (image path, mask path) pair into tensors.

    The loading runs in Python through tf.numpy_function, so the static
    shapes are set explicitly afterwards.

    Args:
        x: Image path tensor.
        y: Mask path tensor.

    Returns:
        (patches, one_hot_mask): float32 tensors of shape
        cf["flat_patches_shape"] and
        (cf["image_size"], cf["image_size"], cf["num_classes"]).
    """
    def _load_pair(image_path, mask_path):
        patches = read_image(image_path)
        labels = read_mask(mask_path)
        return patches, tf.one_hot(labels, cf["num_classes"])

    x, y = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.float32])

    # numpy_function outputs carry no static shape information.
    side = cf["image_size"]
    x.set_shape(cf["flat_patches_shape"])
    y.set_shape([side, side, cf["num_classes"]])
    return x, y

In [None]:
def tf_dataset(X, Y, batch=2):
    """Build a batched, prefetching tf.data pipeline from parallel path lists.

    Args:
        X: Image file paths.
        Y: Mask file paths, aligned element-wise with `X`.
        batch: Batch size (default 2).

    Returns:
        A tf.data.Dataset that maps each pair through tf_parse, batches the
        results and prefetches 10 batches.
    """
    path_pairs = tf.data.Dataset.from_tensor_slices((X, Y))
    parsed = path_pairs.map(tf_parse)
    batched = parsed.batch(batch)
    return batched.prefetch(10)

In [None]:
if __name__ == "__main__":
    # Seed NumPy and TensorFlow.
    np.random.seed(42)
    tf.random.set_seed(42)
    dataset_path = "/content/LaPa"

    # Training hyper-parameters.
    batch_size = 8
    learning_rate = 0.001
    num_epochs = 15

    # Checkpoint and log destinations (original note: "epoches till now is 39").
    model_path = os.path.join("/content/drive/MyDrive/Data" + "/files", "Multiclass_segmentation_using_UNETR_50_Epochs_model.h5")
    csv_path = os.path.join("/content/drive/MyDrive/Data" + "/files", "Multiclass_segmentation_using_UNETR_50_Epochs_log.csv")

    # Colour assigned to each class id; the list index is the class id.
    rgb_codes = [
        [0, 0, 0], [0, 153, 255], [102, 255, 153], [0, 204, 153],
        [255, 255, 102], [255, 255, 204], [255, 153, 0], [255, 102, 255],
        [102, 0, 51], [255, 204, 255], [255, 0, 102]
    ]

    classes = [
        "background", "skin", "left eyebrow", "right eyebrow",
        "left eye", "right eye", "nose", "upper lip", "inner mouth",
        "lower lip", "hair"
    ]

    (X_train, y_train), (X_val, y_val), (X_test, y_test) = load_dataset(dataset_path)
    print(f"Trianing : \t{len(X_train)} - {len(y_train)}")
    print(f"Validation : \t{len(X_val)} - {len(y_val)}")
    print(f"Test : \t{len(X_test)} - {len(y_test)}")

    # NOTE(review): load_dataset returns sorted paths and tf_dataset does not
    # shuffle, so training batches are seen in filename order every epoch.
    train_dataset = tf_dataset(X_train, y_train, batch=batch_size)
    val_dataset = tf_dataset(X_val, y_val, batch=batch_size)

    # Resume from a previously saved model. To train from scratch instead,
    # build a fresh network with: model = build_unetr_2d(cf)
    model = load_model("/content/drive/MyDrive/Data/files/Multiclass_segmentation_using_UNETR_48_Epochs_model.h5", compile=False)
    model.compile(loss="categorical_crossentropy", optimizer=SGD(learning_rate=learning_rate))

    # NOTE(review): EarlyStopping's patience (20) is larger than num_epochs
    # (15), so it cannot trigger during this run.
    callbacks = [
        # `verbose` was previously misspelled as `varbose`.
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        CSVLogger(csv_path),
        EarlyStopping(monitor="val_loss", patience=20, restore_best_weights=False)
    ]

Trianing : 	18168 - 18168
Validation : 	2000 - 2000
Test : 	2000 - 2000


In [None]:
# Train with the datasets, epoch count and callbacks prepared in the previous cell.
model.fit(train_dataset, validation_data=val_dataset, epochs=num_epochs, callbacks=callbacks)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15

In [None]:
def _grayscale_to_rgb(mask, rgb_codes):
    """Map an (H, W) or (H, W, 1) class-id mask to an (H, W, 3) colour image."""
    palette = np.asarray(rgb_codes)
    height, width = mask.shape[0], mask.shape[1]
    class_ids = np.reshape(mask, (height, width)).astype(np.int32)
    # Integer-array indexing looks up one palette row per pixel.
    return palette[class_ids]


def save_results(image, mask, pred, save_path):
    """Write a side-by-side comparison image: input | ground truth | prediction.

    The three panels are separated by 10-pixel-wide white bars. Masks are
    colourised with the module-level `rgb_codes` palette, which must be
    defined before this function is called.

    Args:
        image: Input image array of shape (H, W, 3).
        mask: Ground-truth class-id array of shape (H, W).
        pred: Predicted class-id array of shape (H, W).
        save_path: Destination file path passed to cv2.imwrite.
    """
    # A private helper is used because the visible notebook never defines
    # `grayscale_to_rgb`.
    mask = _grayscale_to_rgb(mask, rgb_codes)
    pred = _grayscale_to_rgb(pred, rgb_codes)

    line = np.ones((image.shape[0], 10, 3)) * 255

    cat_images = np.concatenate([image, line, mask, line, pred], axis=1)
    # Write to the path given by the caller, not to a global variable.
    cv2.imwrite(save_path, cat_images)

In [None]:
if __name__ == "__main__":
    # Seed NumPy and TensorFlow.
    np.random.seed(42)
    tf.random.set_seed(42)

    # NOTE: `dataset_path` and `model` are expected to exist already; in this
    # notebook they are created by the training setup cell.
    create_dir(dataset_path + "/results")

    # The sub-directory must be relative: os.path.join discards everything
    # before an absolute component such as "/files".
    model_path = os.path.join(dataset_path, "files", "Multiclass_segmentation_using_UNETR_500_Epochs_model.h5")
    csv_path = os.path.join(dataset_path, "files", "Multiclass_segmentation_using_UNETR_500_Epochs_log.csv")

    # Colour assigned to each class id; the list index is the class id.
    rgb_codes = [
        [0, 0, 0], [0, 153, 255], [102, 255, 153], [0, 204, 153],
        [255, 255, 102], [255, 255, 204], [255, 153, 0], [255, 102, 255],
        [102, 0, 51], [255, 204, 255], [255, 0, 102]
    ]

    classes = [
        "background", "skin", "left eyebrow", "right eyebrow",
        "left eye", "right eye", "nose", "upper lip", "inner mouth",
        "lower lip", "hair"
    ]

    (X_train, y_train), (X_val, y_val), (X_test, y_test) = load_dataset(dataset_path)
    print(f"Trianing : \t{len(X_train)} - {len(y_train)}")
    print(f"Validation : \t{len(X_val)} - {len(y_val)}")
    print(f"Test : \t{len(X_test)} - {len(y_test)}")

    patch_shape = (cf["patch_size"], cf["patch_size"], cf["num_channels"])

    for image_path, mask_path in tqdm(zip(X_test, y_test), total=len(X_test)):
        # File stem used to name the result image.
        name = os.path.splitext(os.path.basename(image_path))[0]

        # Same preprocessing as read_image(): resize, scale by 1/255, patchify.
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        image = cv2.resize(image, (cf["image_size"], cf["image_size"]))
        scaled = image / 255.0

        patches = patchify(scaled, patch_shape, cf["patch_size"])
        patches = np.reshape(patches, cf["flat_patches_shape"])
        patches = patches.astype(np.float32)
        patches = np.expand_dims(patches, axis=0)  # add the batch dimension

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        # Nearest-neighbour keeps label values intact; bilinear interpolation
        # would blend neighbouring class ids into unrelated ones.
        mask = cv2.resize(
            mask,
            (cf["image_size"], cf["image_size"]),
            interpolation=cv2.INTER_NEAREST,
        )
        mask = mask.astype(np.int32)

        # The most probable class per pixel is the predicted label.
        pred = model.predict(patches, verbose=0)[0]
        pred = np.argmax(pred, axis=-1)
        pred = pred.astype(np.int32)

        save_image_path = dataset_path + f"/results/{name}.png"
        save_results(image, mask, pred, save_image_path)