In [None]:
import os
import numpy as np
import cv2
from glob import glob
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping
import matplotlib.pyplot as plt
from tqdm import tqdm
import urllib
import IPython


In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("nikhilroxtomar/person-segmentation")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/nikhilroxtomar/person-segmentation?dataset_version_number=1...


100%|██████████| 461M/461M [00:23<00:00, 21.0MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/nikhilroxtomar/person-segmentation/versions/1


In [None]:
database_path = '/root/.cache/kagglehub/datasets/nikhilroxtomar/person-segmentation/versions/1/people_segmentation'

In [None]:
def conv_block_segNet1(inputs, num_filters, flag=False):
    x = Conv2D(num_filters, 3, padding='same')(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    if flag == True:
      x = Conv2D(num_filters, 3, padding='same')(x)
      x = BatchNormalization()(x)
      x = Activation("relu")(x)
    return x

In [None]:
def encoder_block_segNet(inputs, num_filters,flag=False):
    x = conv_block_segNet1(inputs, num_filters, flag)
    p = MaxPool2D((2, 2))(x)
    return x, p

In [None]:
def decoder_block_segNet(inputs, skip_features, num_filters, flag=False):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding='same')(inputs)
    x = Concatenate()([x, skip_features])
    x = conv_block_segNet1(x, num_filters, flag)
    return x

In [None]:
def conv_block(inputs, num_filters):
    x = Conv2D(num_filters, 3, padding='same')(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x

In [None]:
def encoder_block(inputs, num_filters):
    x = conv_block(inputs, num_filters)
    p = MaxPool2D((2, 2))(x)
    return x, p

In [None]:
def decoder_block(inputs, skip_features, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding='same')(inputs)
    x = Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    return x

In [None]:
def build_unet(input_shape):
    inputs = Input(input_shape)

    """ Encoder """
    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)
    b1 = conv_block(p4, 1024)

    """ Decoder """
    d1 = decoder_block(b1, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)

    """ Output """
    outputs = Conv2D(1, (1, 1), padding="same", activation="sigmoid")(d4)
    return Model(inputs, outputs, name="U-Net")

In [None]:
def build_segNet(inout_shape):
  inputs = Input(input_shape)

  """ Encoder """
  s1, p1 = encoder_block_segNet(inputs, 64)
  s2, p2 = encoder_block_segNet(p1, 128)
  s3, p3 = encoder_block_segNet(p2, 256, True)
  s4, p4 = encoder_block_segNet(p3, 512, True)
  b1 = conv_block_segNet1(p4, 1024,True)

  """ Decoder """
  d1 = decoder_block_segNet(b1, s4, 512,True)
  d2 = decoder_block_segNet(d1, s3, 256,True)
  d3 = decoder_block_segNet(d2, s2, 128)
  d4 = decoder_block_segNet(d3, s1, 64)

  """ Output """
  outputs = Conv2D(1, (1, 1), padding="same", activation="softmax")(d4)
  return Model(inputs, outputs, name="DeepLab")

In [None]:
def load_data(dataset_path):
    images = sorted(glob(os.path.join(dataset_path, "images/*")))
    masks = sorted(glob(os.path.join(dataset_path, "masks/*")))
    train_x, test_x = train_test_split(images, test_size=0.2, random_state=42)
    train_y, test_y = train_test_split(masks, test_size=0.2, random_state=42)
    return (train_x, train_y), (test_x, test_y)

In [None]:
def read_image(path):
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (256, 256))
    x = x/255.0
    x = x.astype(np.float32)
    # (256, 256, 3)
    return x

In [None]:
def read_mask(path):
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, (256, 256))
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)
    return x

In [None]:
def preprocess(image_path, mask_path):
    def f(image_path, mask_path):
        image_path = image_path.decode()
        mask_path = mask_path.decode()
        x = read_image(image_path)
        y = read_mask(mask_path)
        return x, y
    image, mask = tf.numpy_function(f, [image_path, mask_path], [tf.float32, tf.float32])
    image.set_shape([256, 256, 3])
    mask.set_shape([256, 256, 1])
    return image, mask

In [None]:
def tf_dataset(images, masks, batch=8):
    dataset = tf.data.Dataset.from_tensor_slices((images, masks))
    dataset = dataset.shuffle(buffer_size=5000)
    dataset = dataset.map(preprocess)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(2)
    return dataset

In [None]:
# dataset_path = database_path
# dataset_path

In [None]:
""" Hyperparameters """
# dataset_path = database_path + "/input/person-segmentation/people_segmentation"
input_shape = (256, 256, 3)
batch_size = 8
epochs = 10
lr = 1e-4
model_path = "./unet.keras"
model_path_segNet = "./segNet.keras"
csv_path = "./data.csv"
csv_path_segNet = "./dataSegNet.csv"

In [None]:
dataset_path = database_path

In [None]:
""" Loading the dataset """
(train_x, train_y), (test_x, test_y) = load_data(dataset_path)

train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
valid_dataset = tf_dataset(test_x, test_y, batch=batch_size)


In [None]:
""" Model Unet"""
model = build_unet(input_shape)
model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(lr),
    metrics=[
        tf.keras.metrics.MeanIoU(num_classes=2),
        tf.keras.metrics.Recall(),
        tf.keras.metrics.Precision()
    ]
)

callbacks = [
    ModelCheckpoint(model_path, monitor="val_loss", verbose=1),
    ReduceLROnPlateau(monitor="val_loss", patience=5, factor=0.1, verbose=1),
    CSVLogger(csv_path),
    EarlyStopping(monitor="val_loss", patience=10)
]

train_steps = len(train_x)//batch_size
if len(train_x) % batch_size != 0:
    train_steps += 1

valid_steps = len(test_x)//batch_size
if len(test_x) % batch_size != 0:
    valid_steps += 1

model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs = epochs,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    callbacks=callbacks
)

Epoch 1/10
[1m568/568[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 555ms/step - loss: 0.4691 - mean_io_u: 0.3699 - precision: 0.5834 - recall: 0.6099
Epoch 1: saving model to ./unet.keras
[1m568/568[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m424s[0m 634ms/step - loss: 0.4690 - mean_io_u: 0.3700 - precision: 0.5835 - recall: 0.6100 - val_loss: 0.6769 - val_mean_io_u: 0.3743 - val_precision: 0.5049 - val_recall: 0.8979 - learning_rate: 1.0000e-04
Epoch 2/10
[1m312/568[0m [32m━━━━━━━━━━[0m[37m━━━━━━━━━━[0m [1m1:56[0m 454ms/step - loss: 0.3191 - mean_io_u: 0.3700 - precision: 0.7628 - recall: 0.6974

KeyboardInterrupt: 

In [None]:
images_path = '/content/images/*'

In [36]:
!mkdir /content/images/
!mkdir /content/save_imagesUnet/
!mkdir /content/save_imagesSegNet/

mkdir: cannot create directory ‘/content/images/’: File exists
mkdir: cannot create directory ‘/content/save_imagesUnet/’: File exists
mkdir: cannot create directory ‘/content/save_imagesSegNet/’: File exists


In [None]:
""" Load the test images """
test_images = glob(images_path)

""" Load the model """
model = tf.keras.models.load_model("unet.keras")

for path in tqdm(test_images, total=len(test_images)):
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    original_image = x
    h, w, _ = x.shape

    x = cv2.resize(x, (256, 256))
    x = x/255.0
    x = x.astype(np.float32)

    x = np.expand_dims(x, axis=0)
    pred_mask = model.predict(x)[0]

    pred_mask = np.concatenate(
        [
            pred_mask,
            pred_mask,
            pred_mask
        ], axis=2)
    pred_mask = (pred_mask > 0.5) * 255
    pred_mask = pred_mask.astype(np.float32)
    pred_mask = cv2.resize(pred_mask, (w, h))

    original_image = original_image.astype(np.float32)

    alpha = 0.6
    cv2.addWeighted(pred_mask, alpha, original_image, 1-alpha, 0, original_image)

    name = path.split("/")[-1]
    cv2.imwrite(f"save_imagesUnet/{name}", original_image)

ValueError: File not found: filepath=unet.keras. Please ensure the file is an accessible `.keras` zip file.

In [None]:
!mkdir /content/videoUnet/
!mkdir /content/videosegNet/

In [None]:
""" Video Path """
video_path = "Elon_Musk.mp4"

""" Load the model """
model = tf.keras.models.load_model("unet.keras")

""" Reading frames """
vs = cv2.VideoCapture(video_path)
_, frame = vs.read()
H, W, _ = frame.shape
vs.release()

fourcc = cv2.VideoWriter_fourcc('M','J','P','G')
out = cv2.VideoWriter('outputUnet.avi', fourcc, 10, (W, H), True)

cap = cv2.VideoCapture(video_path)
idx = 0
while True:
    ret, frame = cap.read()
    if ret == False:
        cap.release()
        out.release()
        break

    H, W, _ = frame.shape
    ori_frame = frame
    frame = cv2.resize(frame, (256, 256))
    frame = np.expand_dims(frame, axis=0)
    frame = frame / 255.0

    mask = model.predict(frame)[0]
    mask = mask > 0.5
    mask = mask.astype(np.float32)
    mask = cv2.resize(mask, (W, H))
    mask = np.expand_dims(mask, axis=-1)

    combine_frame = ori_frame * mask
    combine_frame = combine_frame.astype(np.uint8)

    cv2.imwrite(f"videoUnet/{idx}.png", combine_frame)
    idx += 1

    out.write(combine_frame)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 64ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 64ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 67ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 66ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 66ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 69ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms

In [33]:
""" Model SegNet"""
modelSegNet = build_segNet(input_shape)
modelSegNet.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(lr),
    metrics=[
        tf.keras.metrics.MeanIoU(num_classes=2),
        tf.keras.metrics.Recall(),
        tf.keras.metrics.Precision()
    ]
)

callbacks = [
    ModelCheckpoint(model_path_segNet, monitor="val_loss", verbose=1),
    ReduceLROnPlateau(monitor="val_loss", patience=5, factor=0.1, verbose=1),
    CSVLogger(csv_path_segNet),
    EarlyStopping(monitor="val_loss", patience=10)
]

train_steps = len(train_x)//batch_size
if len(train_x) % batch_size != 0:
    train_steps += 1

valid_steps = len(test_x)//batch_size
if len(test_x) % batch_size != 0:
    valid_steps += 1

modelSegNet.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs = epochs,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    callbacks=callbacks
)

Epoch 1/10




[1m568/568[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 665ms/step - loss: 0.4188 - mean_io_u: 0.1264 - precision: 0.2527 - recall: 1.0000
Epoch 1: saving model to ./segNet.keras
[1m568/568[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m496s[0m 747ms/step - loss: 0.4187 - mean_io_u: 0.1264 - precision: 0.2527 - recall: 1.0000 - val_loss: 0.3404 - val_mean_io_u: 0.1279 - val_precision: 0.2557 - val_recall: 1.0000 - learning_rate: 1.0000e-04
Epoch 2/10
[1m568/568[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 583ms/step - loss: 0.2894 - mean_io_u: 0.1283 - precision: 0.2565 - recall: 1.0000
Epoch 2: saving model to ./segNet.keras
[1m568/568[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m380s[0m 669ms/step - loss: 0.2894 - mean_io_u: 0.1283 - precision: 0.2565 - recall: 1.0000 - val_loss: 0.3186 - val_mean_io_u: 0.1279 - val_precision: 0.2557 - val_recall: 1.0000 - learning_rate: 1.0000e-04
Epoch 3/10
[1m568/568[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 

KeyboardInterrupt: 

In [35]:
""" Load the test images """
test_images = glob(images_path)

""" Load the model """
modelSegNet = tf.keras.models.load_model("segNet.keras")

for path in tqdm(test_images, total=len(test_images)):
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    original_image = x
    h, w, _ = x.shape

    x = cv2.resize(x, (256, 256))
    x = x/255.0
    x = x.astype(np.float32)

    x = np.expand_dims(x, axis=0)
    pred_mask = modelSegNet.predict(x)[0]

    pred_mask = np.concatenate(
        [
            pred_mask,
            pred_mask,
            pred_mask
        ], axis=2)
    pred_mask = (pred_mask > 0.5) * 255
    pred_mask = pred_mask.astype(np.float32)
    pred_mask = cv2.resize(pred_mask, (w, h))

    original_image = original_image.astype(np.float32)

    alpha = 0.6
    cv2.addWeighted(pred_mask, alpha, original_image, 1-alpha, 0, original_image)

    name = path.split("/")[-1]
    cv2.imwrite(f"save_imagesSegNet/{name}", original_image)

  0%|          | 0/2 [00:00<?, ?it/s]

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step


 50%|█████     | 1/2 [00:02<00:02,  2.63s/it]

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 74ms/step


100%|██████████| 2/2 [00:02<00:00,  1.38s/it]


In [None]:
""" Video Path """
video_path = "Elon_Musk.mp4"

""" Load the model """
modelSegNet = tf.keras.models.load_model("segNet.keras")

""" Reading frames """
vs = cv2.VideoCapture(video_path)
_, frame = vs.read()
H, W, _ = frame.shape
vs.release()

fourcc = cv2.VideoWriter_fourcc('M','J','P','G')
out = cv2.VideoWriter('outputSegNet.avi', fourcc, 10, (W, H), True)

cap = cv2.VideoCapture(video_path)
idx = 0
while True:
    ret, frame = cap.read()
    if ret == False:
        cap.release()
        out.release()
        break

    H, W, _ = frame.shape
    ori_frame = frame
    frame = cv2.resize(frame, (256, 256))
    frame = np.expand_dims(frame, axis=0)
    frame = frame / 255.0

    mask = modelSegNet.predict(frame)[0]
    mask = mask > 0.5
    mask = mask.astype(np.float32)
    mask = cv2.resize(mask, (W, H))
    mask = np.expand_dims(mask, axis=-1)

    combine_frame = ori_frame * mask
    combine_frame = combine_frame.astype(np.uint8)

    cv2.imwrite(f"videosegNet/{idx}.png", combine_frame)
    idx += 1

    out.write(combine_frame)