# Dataset Import

In [None]:
# Install the Roboflow client that the next cells import to fetch the dataset.
!pip install roboflow

In [None]:
import os

from roboflow import Roboflow

# Read the API key from the ROBOFLOW_API_KEY environment variable so a real
# key never has to be stored in the notebook; the original placeholder value
# is kept as the fallback.
rf = Roboflow(api_key=os.environ.get("ROBOFLOW_API_KEY", "blocked"))

# workspace() is called without a name here and the result is only printed.
# NOTE(review): presumably this is the default workspace of the key — confirm.
workspaces = rf.workspace()
print(workspaces)

In [None]:
# Select the workspace by name, then the project inside it.
workspace = rf.workspace("hkpolyu-comp4423-group-project")
project = workspace.project("canteenq-counter")
# Dataset version to fetch; the same number is used in a later cell to build
# the path to data.yaml.
VERSION = 7
dataset_version = project.version(VERSION)
# Download that version in the "yolov8" export format.
dataset = dataset_version.download("yolov8")

In [None]:
import os
# Home directory of the current user; the root of most paths in this notebook.
HOME = os.path.expanduser("~")
# Path to the dataset's data.yaml, later passed to model.tune / model.train.
# NOTE(review): this assumes the dataset was downloaded into HOME (i.e. the
# notebook's working directory was HOME during the download) — confirm.
dataset_location = f"{HOME}/CanteenQ-Counter-{VERSION}/data.yaml"

# Segmentation using DeepLab v3+

## Environment Setup

In [None]:
base_dir = './DeepLabV3Plus_Project'
os.makedirs(base_dir, exist_ok=True)
os.chdir(base_dir)

!git clone https://github.com/VainF/DeepLabV3Plus-Pytorch

os.chdir('DeepLabV3Plus-Pytorch')

!pip install -r requirements.txt

os.makedirs('checkpoints', exist_ok=True)

checkpoints_path = './checkpoints/all.zip'
if not os.path.isfile(checkpoints_path):
    !wget -O {checkpoints_path} https://www.dropbox.com/sh/w3z9z8lqpi8b2w7/AAB0vkl4F5vy6HdIhmRCTKHSa?dl=1
    !unzip {checkpoints_path} -d ./checkpoints/

In [None]:
# Run the repository's predict.py on the images in ../../Test_Image_Folder
# with the deeplabv3plus_mobilenet model and the checkpoint given by --ckpt;
# the outputs are saved to the test_results folder.
!python predict.py --input ../../Test_Image_Folder --model deeplabv3plus_mobilenet --ckpt checkpoints/best_deeplabv3plus_mobilenet_voc_os16.pth --save_val_results_to test_results

## Post-processing of the model's output

In [None]:
import numpy as np

def detect_people(image):
    """Keep the red-hued regions of a BGR image and return the blobs counted as people.

    Pixels outside the HSV range [0, 50, 50]..[10, 255, 255] are painted black
    directly in ``image`` (the input array is modified in place). The in-range
    mask is eroded twice with a 7x7 kernel, its external contours are
    extracted, and only contours with an area above 100 and an
    area-to-perimeter ratio above 0.5 are kept.

    Args:
        image: BGR image array, e.g. as returned by ``cv2.imread``.

    Returns:
        A tuple ``(people_contours, image)``: the list of kept contours and
        the same, now masked, image array.
    """
    lower_bound = np.array([0, 50, 50])
    upper_bound = np.array([10, 255, 255])
    mask = cv2.inRange(cv2.cvtColor(image, cv2.COLOR_BGR2HSV), lower_bound, upper_bound)

    # Black out every pixel that is not inside the colour range.
    outside_range = cv2.bitwise_not(mask) == 255
    image[outside_range] = [0, 0, 0]

    # Erode the mask before extracting contours.
    # NOTE(review): presumably this separates touching blobs and removes
    # specks — confirm against sample outputs.
    eroded_mask = cv2.erode(mask, np.ones((7, 7), np.uint8), iterations=2)
    contours, _ = cv2.findContours(eroded_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    people_contours = []
    for contour in contours:
        area = cv2.contourArea(contour)
        if area <= 100:
            continue
        # Area / perimeter filters out thin, elongated shapes.
        if area / cv2.arcLength(contour, True) > 0.5:
            people_contours.append(contour)

    return people_contours, image

In [None]:
import os
import cv2
from matplotlib import pyplot as plt

def test_on_img_detect_people(image_path):
    """Run ``detect_people`` on one image file, print the count and plot the result.

    Args:
        image_path: Path of the image to read with ``cv2.imread``.

    Returns:
        None. If the image cannot be read, a message is printed and nothing
        is plotted.
    """
    loaded = cv2.imread(image_path)
    if loaded is None:
        print(f"Image not found: {image_path}")
        return

    people_contours, annotated = detect_people(loaded)

    # Outline every kept contour in green.
    for contour in people_contours:
        cv2.drawContours(annotated, [contour], -1, (0, 255, 0), 2)

    print(f"Number of Humans in {image_path}: {len(people_contours)}")

    # Shrink to 20% per axis for display; matplotlib expects RGB, OpenCV
    # images are BGR.
    preview = cv2.resize(annotated, (0, 0), fx=0.2, fy=0.2)
    preview_rgb = cv2.cvtColor(preview, cv2.COLOR_BGR2RGB)
    plt.figure(figsize=(10, 10))
    plt.imshow(preview_rgb)
    plt.show()

# Run the people detector on every PNG found in the test_results folder.
folder_path=f"{HOME}/DeepLabV3Plus_Project/DeepLabV3Plus-Pytorch/test_results"
for filename in os.listdir(folder_path):
    # Skip anything that is not a PNG file.
    if not filename.endswith(".png"):
        continue
    image_path = os.path.join(folder_path, filename)
    test_on_img_detect_people(image_path)

## Saved images

In [None]:
def predict_list_img(frameList, frameCountList):
    cwd = os.getcwd()
    save_path = os.path.join(cwd, "images")
    if not os.path.exists(save_path):
        os.mkdir(save_path)
    os.chdir(save_path) # Change directory to /images

    # Save current images in the cwd/image
    for frame, count in zip(frameList, frameCountList):
        cv2.imwrite(f"{save_path}/{count}.jpg", frame)

    os.chdir(cwd) # Change directory back to the model's path
    !python predict.py --input {save_path} --model deeplabv3plus_mobilenet --ckpt checkpoints/best_deeplabv3plus_mobilenet_voc_os16.pth --save_val_results_to test_results

# Load the model's output picture from package test_results
def load_img_draw(image_path):
    """Load an image, outline the detected people and write the count onto it.

    Args:
        image_path: Path of the image to read with ``cv2.imread``.

    Returns:
        The annotated image array, or ``None`` (after printing a message)
        when the image cannot be read.
    """
    loaded = cv2.imread(image_path)
    if loaded is None:
        print(f"Image not found: {image_path}")
        return

    people_contours, annotated = detect_people(loaded)

    # Outline every kept contour in green.
    for contour in people_contours:
        cv2.drawContours(annotated, [contour], -1, (0, 255, 0), 2)

    # Stamp the count in the top-left corner, also in green.
    label = f"Number of Humans: {len(people_contours)}"
    cv2.putText(annotated, label, (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return annotated

## Video Object Detection Counter

In [None]:
# In progress
# Pipeline: sample frames from the video, run predict.py on them through
# predict_list_img, then annotate each predicted image with load_img_draw and
# write the annotated images to the output video.
video_file_path = f"{HOME}/VSQ_4.30_OrgVideo.mov"
cap = cv2.VideoCapture(video_file_path)
assert cap.isOpened(), "Error reading video file"

# Get video properties
w, h, fps = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), cap.get(cv2.CAP_PROP_FPS)

# Number of frames to skip
# Speed up video 2 times - Set 0.05
# Clamped to at least 1: for a frame rate below 10 fps int(0.1 * fps) is 0
# and the modulo in the loop below would raise ZeroDivisionError.
skip_frames = max(1, int(0.1 * fps))

frameList = [] # Frame object, type: frame
frameCountList = [] # Frame count, type: int
frame_count = 0

# Collect every skip_frames-th frame together with its frame number.
# NOTE(review): all sampled frames are held in memory until predict_list_img
# has saved them — this may be heavy for long videos.
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    if frame_count % skip_frames == 0:
        frameList.append(frame)
        frameCountList.append(frame_count)

    frame_count += 1

cap.release()

predict_list_img(frameList, frameCountList) # Save the frames and run the prediction on them.

# The predicted images are read back from <cwd>/test_results/<frame number>.png.
save_path = os.getcwd()
save_path = os.path.join(save_path, "test_results")

# Video writer. Only the sampled frames are written, at the source frame rate.
video_writer = cv2.VideoWriter(
    "Segmentation_VSQ_Obeject_Counter_4.30.mp4",
    cv2.VideoWriter_fourcc(*'mp4v'),
    fps,
    (w, h)
)

try:
    # The frame numbers recorded above identify the predicted images, so the
    # video does not need to be decoded a second time.
    for frame_count in frameCountList:
        img_path = f"{save_path}/{frame_count}.png"
        processed_frame = load_img_draw(img_path)
        if processed_frame is None:
            # load_img_draw already reported the missing image; writing
            # None to the video writer would raise.
            continue
        # The writer was opened for (w, h) frames; resize if the predicted
        # image has a different size.
        if processed_frame.shape[1] != w or processed_frame.shape[0] != h:
            processed_frame = cv2.resize(processed_frame, (w, h))
        video_writer.write(processed_frame)
finally:
    # Release the writer even if processing fails, so the file is closed.
    # The output video is written relative to the current working directory.
    video_writer.release()
# cv2.destroyAllWindows()

# YOLOv8 Object Detection Counter

## Installation of the YOLOv8 model

In [None]:
# Install the Ultralytics package, imported in the next cells as `ultralytics`.
# NOTE(review): `yolov8` is requested as a second, separate package and is
# never imported in this notebook — confirm it is actually needed.
!pip install ultralytics yolov8

In [None]:
from IPython import display
# Clear this cell's output area.
display.clear_output()

import ultralytics
# Run Ultralytics' built-in environment check.
ultralytics.checks()

## Hyperparameter Tuning

In [None]:
# Switch the working directory to HOME; the later cells read results from
# HOME/runs/detect/... and HOME/testWithAImg.jpeg.
os.chdir(HOME)

In [None]:
from ultralytics import YOLO
# Load the yolov8n.pt weights (the tuning cell below loads the same weights again).
model = YOLO("yolov8n.pt")

In [None]:
from ultralytics import YOLO

# Load a model
model = YOLO("yolov8n.pt")

# Hyperparameter tuning on the dataset: 100 iterations of 10 epochs each with
# the AdamW optimizer; plots, saving and validation are switched off. The
# remaining keyword arguments are the hyperparameter values passed to tune.
model.tune(
    data=f"{dataset_location}",
    epochs=10,
    iterations=100,
    optimizer='AdamW',
    plots=False,
    save=False,
    val=False,
    lr0=0.01027,
    lrf=0.01033,
    momentum=0.93038,
    weight_decay=0.00051,
    warmup_epochs=2.89715,
    warmup_momentum=0.60719,
    box=7.45953,
    cls=0.44658,
    dfl=1.39703,
    hsv_h=0.01204,
    hsv_s=0.7691,
    hsv_v=0.46525,
    translate=0.09319,
    scale=0.40937,
    fliplr=0.5084,
    mosaic=0.83653,
)

In [None]:
# Showcase Tuning Result
# Name of the tuning run directory under HOME/runs/detect.
tuneDir = "tune2"
tune_files = os.listdir(f"{HOME}/runs/detect/{tuneDir}")
files_dict = {tuneDir: tune_files}

# Print the directory name, one line per file in it, then a blank line.
for dir_name, documents in files_dict.items():
    print(f"File Name: {dir_name}")
    for doc in documents:
        print(f"Document Name: {doc}")
    print()

In [None]:
from IPython.display import display
import os
from PIL import Image

def is_image(file_name):
    """Return True if ``file_name`` ends in .png, .jpg or .jpeg (case-insensitive)."""
    lowered = file_name.lower()
    return any(lowered.endswith(extension) for extension in (".png", ".jpg", ".jpeg"))

# Display every image file found in the tuning run directory.
# Only tuneDir is listed here: trainDir is not assigned until the training
# section further down (referencing it here raises NameError when the
# notebook is run top to bottom), and that section displays the training
# images itself.
for dir_name in [tuneDir]:
    path = f"{HOME}/runs/detect/{dir_name}"
    files = os.listdir(path)
    img_files = [file for file in files if is_image(file)]

    for img in img_files:
        img_path = os.path.join(path, img)
        res_img = Image.open(img_path)

        # Convert RGBA images to RGB before displaying them.
        if res_img.mode == "RGBA":
            res_img = res_img.convert("RGB")
        print(f"Image path: {path}/{img}")
        display(res_img)

## YOLOv8 Model Training

In [None]:
# Use the model
# Train for 100 epochs on the dataset with the hyperparameter values below.
model.train(
    data=f"{dataset_location}",
    epochs=100,
    lr0=0.00622,
    lrf=0.00618,
    momentum=0.80774,
    weight_decay=0.00056,
    warmup_epochs=1.8395,
    warmup_momentum=0.52433,
    box=8.45512,
    cls=0.37275,
    dfl=1.18532,
    hsv_h=0.01438,
    hsv_s=0.75444,
    hsv_v=0.37201,
    translate=0.09335,
    scale=0.29729,
    fliplr=0.38751,
    mosaic=0.92436,
)

# Evaluate model performance on the validation set.
metrics = model.val()

In [None]:
# Showcase Training Result

# Name of the training run directory under HOME/runs/detect.
# Need to change to best hyperparameter train
trainDir = "train103"
train_files = os.listdir(f"{HOME}/runs/detect/{trainDir}")
files_dict = {trainDir: train_files}

# Print the directory name, one line per file in it, then a blank line.
for dir_name, documents in files_dict.items():
    print(f"File Name: {dir_name}")
    for doc in documents:
        print(f"Document Name: {doc}")
    print()

In [None]:
from IPython.display import display
import os
from PIL import Image

def is_image(file_name):
    """Tell whether ``file_name`` has a .png, .jpg or .jpeg suffix, ignoring case."""
    image_suffixes = (".png", ".jpg", ".jpeg")
    normalized = file_name.lower()
    return normalized.endswith(image_suffixes)

# Display every image file found in the training run directory.
for dir_name in [trainDir]:
    path = f"{HOME}/runs/detect/{dir_name}"
    files = os.listdir(path)
    img_files = list(filter(is_image, files))

    for img in img_files:
        img_path = os.path.join(path, img)
        res_img = Image.open(img_path)

        # Convert RGBA images to RGB before displaying them; other modes are
        # shown unchanged.
        res_img = res_img.convert("RGB") if res_img.mode == "RGBA" else res_img
        print(f"Image path: {path}/{img}")
        display(res_img)

## CanteenQ Object Counter

## Image Testing

In [None]:
# Run the model on a single test image and display the annotated result.
img_path = f"{HOME}/Test_Image_Folder/VS_150.jpeg"

# Save to an explicit path under HOME so that the file re-opened below is the
# one just written, whatever the current working directory is.
saved_img_path = f"{HOME}/testWithAImg.jpeg"

results = model(img_path)
for result in results:
    # Kept for interactive inspection in the notebook; not used further here.
    boxes = result.boxes
    probs = result.probs
    result.save(filename=saved_img_path)

saved_img = Image.open(saved_img_path)
display(saved_img)

In [None]:
import cv2
from statistics import mode

# Count humans and ordering kiosks on sampled video frames with the YOLO
# model, overlay the counts and an estimated waiting time, and write the
# annotated frames to a new video.
cap = cv2.VideoCapture(f"{HOME}/VSQ_4.30_OrgVideo.mov")
assert cap.isOpened(), "Error reading video file"

# Get video properties
w, h, fps = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), cap.get(cv2.CAP_PROP_FPS)

# Number of frames to skip
# Speed up video 2 times - Set 0.05
# Clamped to at least 1: for a frame rate below 20 fps int(0.05 * fps) is 0
# and the modulo in the loop below would raise ZeroDivisionError.
skip_frames = max(1, int(0.05 * fps))

# Video writer. Only the sampled frames are written, at the source frame rate.
video_writer = cv2.VideoWriter(
    "YOLOv8_VSQ_Object_Counter_Res.mp4",
    cv2.VideoWriter_fourcc(*'mp4v'),
    fps,
    (w, h)
)

# Factor of the waiting-time formula: humans * unit / kiosks.
# NOTE(review): presumably minutes of service per person (the overlay labels
# the result "in Minutes") — confirm.
unit = 0.6

frame_count = 0
# Kiosk counts of the frames in which at least one kiosk was detected. Zero
# counts are left out so the fallback below can never pick 0 as the typical
# count and divide by it.
kiosks_array = []
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Skip frames
        if frame_count % skip_frames == 0:
            results = model(frame)
            for result in results:
                num_humans = 0
                num_orderKiosks = 0
                class_ids = result.boxes.cls.tolist()
                corners = result.boxes.xyxy.tolist()
                for class_id, (x1, y1, x2, y2) in zip(class_ids, corners):
                    # Class 0 is counted as a human; every other class is
                    # counted as an ordering kiosk.
                    if int(class_id) == 0:
                        num_humans += 1
                        color = (0, 255, 0)  # Green for humans
                    else:
                        num_orderKiosks += 1
                        color = (0, 0, 255)  # Red for kiosks

                    # Draw bounding boxes using the xyxy attribute
                    cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)

                if num_orderKiosks > 0:
                    kiosks_array.append(num_orderKiosks)
                elif kiosks_array:
                    # No kiosk detected in this frame: fall back to the most
                    # common kiosk count seen so far.
                    num_orderKiosks = mode(kiosks_array)

                # Calculate average wait time; it is undefined while no kiosk
                # has been detected in any frame yet.
                if num_orderKiosks > 0:
                    avg_waitT = round(num_humans * unit / num_orderKiosks, 3)
                else:
                    avg_waitT = "N/A"

                cv2.putText(frame, f"Number of Humans: {num_humans}", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.putText(frame, f"Number of Ordering Kiosks: {num_orderKiosks}", (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                cv2.putText(frame, f"Average Waiting Time (in Minutes): {avg_waitT}", (20, 180), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

            video_writer.write(frame)

        frame_count += 1
finally:
    # Release everything when the job is finished, also if processing fails.
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()