<a href="https://colab.research.google.com/github/haysnairpa/stairvision/blob/main/stairvision_main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the third-party packages imported by this notebook (ultralytics, OpenCV, NumPy)
# into the environment of the running kernel.
%pip install ultralytics opencv-python-headless numpy

Collecting ultralytics
  Downloading ultralytics-8.3.177-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.15-py3-none-any.whl.metadata (14 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading nv

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [1]:
import cv2
import numpy as np
from ultralytics import YOLO

In [2]:
# Locations of the trained YOLO weight files, relative to the working directory.
seg_model_path, pose_model_path = (
    "src/model/best_stair_handrail_model.pt",
    "src/model/best_pose_model.pt",
)

# Build both networks once so the later cells can reuse them for every frame:
# `seg_model` is used for the mask polygons, `pose_model` for the body keypoints.
seg_model, pose_model = YOLO(seg_model_path), YOLO(pose_model_path)

In [10]:
import os

# Source clip, destination folder and the prefix put in front of the result's file name.
video_path = r"D:\Aldi\stairvision\src\dataset\east\videos\Copy of Copy of IMG_3093.MOV"
output_dir = r"D:\Aldi\stairvision\src\dataset\east\videos\output_videos"
output_prefix = "output_"

os.makedirs(output_dir, exist_ok=True)

# Split the clip's file name so a numeric suffix can be inserted before the extension.
base_name = os.path.basename(video_path)
base_name_no_ext, ext = os.path.splitext(base_name)

# First candidate is "<prefix><name><ext>". While the candidate already exists on disk,
# move on to "<prefix><name> (1)<ext>", "<prefix><name> (2)<ext>", ... so that an
# earlier result is never overwritten.
new_file_name = f"{output_prefix}{base_name_no_ext}{ext}"
counter = 1
while os.path.exists(output_path := os.path.join(output_dir, new_file_name)):
    new_file_name = f"{output_prefix}{base_name_no_ext} ({counter}){ext}"
    counter += 1

print(f"output path: {output_path}")

output path: D:\Aldi\stairvision\src\dataset\east\videos\output_videos\output_Copy of Copy of IMG_3093.MOV


In [4]:
# Open the input clip and fail fast, with a message that says what went wrong,
# when OpenCV cannot open it.
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"❌ cannot open video from the path: {video_path}")


In [5]:
# (Re)open the clip so that reading starts from the first frame, and validate THIS
# handle: it is the one the processing loop reads from, so an unopened capture here
# would otherwise yield zero-valued metadata and an empty output video.
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"❌ cannot open video from the path: {video_path}")

# Stream metadata used for the progress/ETA print-out and to configure the writer.
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

In [6]:
# Writer for the annotated video: same frame rate and frame size as the input clip.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Without this check a writer that failed to open would silently discard every frame
# handed to out.write().
if not out.isOpened():
    raise RuntimeError(f"❌ cannot open video writer for the path: {output_path}")

In [7]:
import time
import cv2
import numpy as np

# Annotate every frame of `cap` with a per-person "Holding Handrail" / "Not Holding"
# label and write the result to `out`.
# Relies on names created by the earlier cells: cap, out, width, height, total_frames,
# output_path, seg_model and pose_model.

start_time = time.time()

# --- Tunable parameters ---------------------------------------------------------
DILATE_KERNEL_SIZE = 5   # side (px) of the square kernel that grows the handrail mask
CLOSE_KERNEL_SIZE = 5    # side (px) of the square kernel that closes small gaps in it
KP_CONF_THRESH = 0.25    # wrists with a lower keypoint confidence are ignored
HAND_RADIUS = max(5, int(0.015 * max(width, height)))  # radius (px) of the disc tested around a wrist
HAND_OVERLAP_RATIO = 0.3  # minimum fraction of that disc that must lie on the handrail mask
WRIST_KEYPOINTS = (9, 10)  # keypoint indices used as the two hands

# --- Loop-invariant helpers (built once instead of once per frame / per hand) ---
close_kernel = np.ones((CLOSE_KERNEL_SIZE, CLOSE_KERNEL_SIZE), np.uint8)
dilate_kernel = np.ones((DILATE_KERNEL_SIZE, DILATE_KERNEL_SIZE), np.uint8)
y_grid, x_grid = np.ogrid[:height, :width]  # open pixel-coordinate grids for the disc test

frame_num = 0
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_orig = frame.copy()

        # Segmentation: rasterise every class-0 polygon into a single binary mask.
        seg_results = seg_model.predict(frame, conf=0.4, verbose=False)
        handrail_mask = np.zeros((height, width), dtype=np.uint8)
        for r in seg_results:
            if r.masks is None:
                continue
            for mask_poly, cls in zip(r.masks.xy, r.boxes.cls):
                if int(cls) == 0:
                    poly = np.array(mask_poly, dtype=np.int32)
                    cv2.fillPoly(handrail_mask, [poly], 255)

        # Close small gaps, then grow the mask by a few pixels.
        handrail_mask = cv2.morphologyEx(handrail_mask, cv2.MORPH_CLOSE, close_kernel)
        handrail_mask = cv2.dilate(handrail_mask, dilate_kernel, iterations=1)
        handrail_on = handrail_mask > 0  # boolean view, computed once per frame

        pose_results = pose_model.predict(frame_orig, conf=0.25, verbose=False)

        frame_vis = frame.copy()

        for r in pose_results:
            if r.keypoints is None:
                continue

            kpts_xy = r.keypoints.xy.cpu().numpy()
            # When the result carries no per-keypoint confidence, treat every keypoint
            # as fully confident (explicit check instead of a blanket `except`).
            conf_data = r.keypoints.conf
            if conf_data is None:
                kpts_conf = np.ones((kpts_xy.shape[0], kpts_xy.shape[1]))
            else:
                kpts_conf = conf_data.cpu().numpy()

            for person_idx in range(kpts_xy.shape[0]):
                person_kpts = kpts_xy[person_idx]
                person_conf = kpts_conf[person_idx]

                holding = False
                for hid in WRIST_KEYPOINTS:
                    if person_kpts.shape[0] <= hid:
                        continue
                    if person_conf[hid] < KP_CONF_THRESH:
                        continue
                    x, y = int(person_kpts[hid][0]), int(person_kpts[hid][1])
                    if not (0 <= x < width and 0 <= y < height):
                        continue

                    # Fraction of the disc around the wrist that lies on the handrail mask.
                    # The disc always contains its own in-frame centre, so the divisor is >= 1.
                    mask_circle = (x_grid - x) ** 2 + (y_grid - y) ** 2 <= HAND_RADIUS ** 2
                    inside_mask = np.logical_and(mask_circle, handrail_on)
                    overlap_ratio = inside_mask.sum() / mask_circle.sum()

                    if overlap_ratio >= HAND_OVERLAP_RATIO:
                        holding = True
                        cv2.circle(frame_vis, (x, y), 6, (0, 255, 0), -1)
                    else:
                        cv2.circle(frame_vis, (x, y), 6, (0, 0, 255), -1)

                # The label is anchored slightly above keypoint 0 of the person.
                label_pos = (int(person_kpts[0][0]), int(person_kpts[0][1]) - 10) if person_kpts.shape[0] > 0 else (10, 30)
                if holding:
                    cv2.putText(frame_vis, "Holding Handrail", label_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                else:
                    cv2.putText(frame_vis, "Not Holding", label_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

        out.write(frame_vis)
        frame_num += 1

        # Progress report: average processing speed so far and estimated time remaining.
        elapsed = time.time() - start_time
        fps_proc = frame_num / elapsed if elapsed > 0 else 0
        eta = (total_frames - frame_num) / fps_proc if fps_proc > 0 else 0
        print(f"Frame {frame_num}/{total_frames} | {fps_proc:.2f} FPS | ETA: {eta/60:.1f} min", flush=True)
finally:
    # Always release both handles, even when the loop raises or is interrupted;
    # otherwise the output container is never finalised.
    cap.release()
    out.release()

print(f"video saved to: {output_path}")

Frame 1/303 | 0.23 FPS | ETA: 22.0 min
Frame 2/303 | 0.42 FPS | ETA: 12.0 min
Frame 3/303 | 0.58 FPS | ETA: 8.6 min
Frame 4/303 | 0.73 FPS | ETA: 6.9 min
Frame 5/303 | 0.85 FPS | ETA: 5.8 min
Frame 6/303 | 0.96 FPS | ETA: 5.2 min
Frame 7/303 | 1.06 FPS | ETA: 4.7 min
Frame 8/303 | 1.15 FPS | ETA: 4.3 min
Frame 9/303 | 1.21 FPS | ETA: 4.1 min
Frame 10/303 | 1.26 FPS | ETA: 3.9 min
Frame 11/303 | 1.32 FPS | ETA: 3.7 min
Frame 12/303 | 1.36 FPS | ETA: 3.6 min
Frame 13/303 | 1.41 FPS | ETA: 3.4 min
Frame 14/303 | 1.45 FPS | ETA: 3.3 min
Frame 15/303 | 1.49 FPS | ETA: 3.2 min
Frame 16/303 | 1.52 FPS | ETA: 3.1 min
Frame 17/303 | 1.55 FPS | ETA: 3.1 min
Frame 18/303 | 1.58 FPS | ETA: 3.0 min
Frame 19/303 | 1.60 FPS | ETA: 3.0 min
Frame 20/303 | 1.63 FPS | ETA: 2.9 min
Frame 21/303 | 1.66 FPS | ETA: 2.8 min
Frame 22/303 | 1.69 FPS | ETA: 2.8 min
Frame 23/303 | 1.72 FPS | ETA: 2.7 min
Frame 24/303 | 1.74 FPS | ETA: 2.7 min
Frame 25/303 | 1.75 FPS | ETA: 2.6 min
Frame 26/303 | 1.77 FPS | ETA: 2

### Below is the code to process video with mask output

In [27]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
import os
import time

# Self-contained cell: loads both models, opens one clip and writes an annotated copy
# showing the handrail mask, each person's skeleton and a smoothed HOLDING / NOT HOLDING
# label.

# --- Models -----------------------------------------------------------------------
seg_model_path = "src/model/best_stair_handrail_model.pt"
pose_model_path = "src/model/best_pose_model.pt"
seg_model = YOLO(seg_model_path)
pose_model = YOLO(pose_model_path)

# --- Input / output locations ------------------------------------------------------
video_path = r"D:\Aldi\stairvision\src\dataset\west\videos\Copy of IMG_4567.MOV"
output_dir = r"D:\Aldi\stairvision\src\dataset\west\videos\output_videos"
output_prefix = "output_with_mask_"

os.makedirs(output_dir, exist_ok=True)

# Pick an output file name that does not exist yet: "<prefix><name><ext>", then
# "<prefix><name> (1)<ext>", "<prefix><name> (2)<ext>", ...
base_name = os.path.basename(video_path)
base_name_no_ext, ext = os.path.splitext(base_name)
new_file_name = f"{output_prefix}{base_name_no_ext}{ext}"
output_path = os.path.join(output_dir, new_file_name)
counter = 1
while os.path.exists(output_path):
    new_file_name = f"{output_prefix}{base_name_no_ext} ({counter}){ext}"
    output_path = os.path.join(output_dir, new_file_name)
    counter += 1

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"❌ cannot open video from the path: {video_path}")

total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
    # A writer that failed to open would silently discard every frame.
    cap.release()
    raise RuntimeError(f"❌ cannot open video writer for the path: {output_path}")

# --- Tunable parameters ------------------------------------------------------------
DILATE_KERNEL_SIZE = 4
CLOSE_KERNEL_SIZE = 4
ERODE_KERNEL_SIZE = 3
KP_CONF_THRESH = 0.25     # keypoints with a lower confidence are neither drawn nor tested
HAND_RADIUS = max(5, int(0.05 * max(width, height)))  # radius (px) of the disc tested around a wrist
HAND_OVERLAP_RATIO = 0.2  # minimum fraction of that disc that must lie on the handrail mask
SMOOTH_FRAMES = 3         # length of the position / status history kept per hand
WRIST_KEYPOINTS = (9, 10)  # left & right wrists

# Keypoint connections for drawing the skeleton (COCO 17-point format)
SKELETON_CONNECTIONS = [
    (0, 1), (0, 2), (1, 3), (2, 4), (5, 6), (5, 7), (7, 9), (6, 8),
    (8, 10), (11, 12), (5, 11), (6, 12), (11, 13), (13, 15), (12, 14), (14, 16)
]

# Smoothing memory, keyed by (person index, wrist keypoint index).
# NOTE(review): the person index is the detection's position within the current frame,
# not a tracked identity, so the histories of two people can mix when the detection
# order changes between frames.
hand_history = {}
status_memory = {}

# --- Loop-invariant helpers (built once instead of once per frame / per hand) ------
close_kernel = np.ones((CLOSE_KERNEL_SIZE, CLOSE_KERNEL_SIZE), np.uint8)
dilate_kernel = np.ones((DILATE_KERNEL_SIZE, DILATE_KERNEL_SIZE), np.uint8)
erode_kernel = np.ones((ERODE_KERNEL_SIZE, ERODE_KERNEL_SIZE), np.uint8)
y_grid, x_grid = np.ogrid[:height, :width]  # open pixel-coordinate grids for the disc test

start_time = time.time()
frame_num = 0

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_orig = frame.copy()

        # Stair handrail segmentation: rasterise every class-0 polygon into one mask.
        seg_results = seg_model.predict(frame, conf=0.4, verbose=False)
        handrail_mask = np.zeros((height, width), dtype=np.uint8)
        for r in seg_results:
            if r.masks is None:
                continue
            for mask_poly, cls in zip(r.masks.xy, r.boxes.cls):
                if int(cls) == 0:
                    poly = np.array(mask_poly, dtype=np.int32)
                    cv2.fillPoly(handrail_mask, [poly], 255)

        # Morphological adjustments: close gaps, grow, then shrink slightly.
        handrail_mask = cv2.morphologyEx(handrail_mask, cv2.MORPH_CLOSE, close_kernel)
        handrail_mask = cv2.dilate(handrail_mask, dilate_kernel, iterations=1)
        handrail_mask = cv2.erode(handrail_mask, erode_kernel, iterations=1)
        handrail_on = handrail_mask > 0  # boolean view, computed once per frame

        # Person detection
        pose_results = pose_model.predict(frame_orig, conf=0.25, verbose=False)

        # Visualization: blend a cyan copy of the mask over the frame (40 % / 60 %).
        frame_vis = frame.copy()
        mask_overlay = frame_vis.copy()
        mask_overlay[handrail_on] = (255, 255, 0)
        frame_vis = cv2.addWeighted(mask_overlay, 0.4, frame_vis, 0.6, 0)

        for r in pose_results:
            if r.keypoints is None:
                continue

            kpts_xy = r.keypoints.xy.cpu().numpy()
            # When the result carries no per-keypoint confidence, treat every keypoint
            # as fully confident (explicit check instead of a blanket `except`).
            conf_data = r.keypoints.conf
            if conf_data is None:
                kpts_conf = np.ones((kpts_xy.shape[0], kpts_xy.shape[1]))
            else:
                kpts_conf = conf_data.cpu().numpy()

            for person_idx in range(kpts_xy.shape[0]):
                person_kpts = kpts_xy[person_idx]
                person_conf = kpts_conf[person_idx]
                n_kpts = person_kpts.shape[0]

                # Skeleton: a dot per confident keypoint ...
                for i in range(n_kpts):
                    if person_conf[i] > KP_CONF_THRESH:
                        x, y = int(person_kpts[i][0]), int(person_kpts[i][1])
                        cv2.circle(frame_vis, (x, y), 3, (200, 200, 200), -1)

                # ... and a line per connection whose two ends are both confident.
                for start_idx, end_idx in SKELETON_CONNECTIONS:
                    if n_kpts <= max(start_idx, end_idx):
                        continue
                    if person_conf[start_idx] > KP_CONF_THRESH and person_conf[end_idx] > KP_CONF_THRESH:
                        # Plain Python ints: accepted as point coordinates by every OpenCV build.
                        start_point = (int(person_kpts[start_idx][0]), int(person_kpts[start_idx][1]))
                        end_point = (int(person_kpts[end_idx][0]), int(person_kpts[end_idx][1]))
                        cv2.line(frame_vis, start_point, end_point, (255, 255, 255), 2)

                # Hand holding logic (drawn on top of the skeleton)
                holding_status_for_person = False
                for hid in WRIST_KEYPOINTS:
                    if n_kpts <= hid or person_conf[hid] < KP_CONF_THRESH:
                        continue

                    hx, hy = person_kpts[hid]

                    # Position smoothing: mean of the last SMOOTH_FRAMES wrist positions.
                    positions = hand_history.setdefault((person_idx, hid), deque(maxlen=SMOOTH_FRAMES))
                    positions.append((hx, hy))
                    avg_hx = int(np.mean([p[0] for p in positions]))
                    avg_hy = int(np.mean([p[1] for p in positions]))

                    if not (0 <= avg_hx < width and 0 <= avg_hy < height):
                        continue

                    # Fraction of the disc around the wrist that lies on the handrail mask.
                    mask_circle = (x_grid - avg_hx) ** 2 + (y_grid - avg_hy) ** 2 <= HAND_RADIUS ** 2
                    inside_mask = np.logical_and(mask_circle, handrail_on)
                    overlap_ratio = inside_mask.sum() / (mask_circle.sum() + 1e-6)

                    # Status smoothing: the hand counts as holding once more than half of
                    # SMOOTH_FRAMES recorded checks were positive.
                    statuses = status_memory.setdefault((person_idx, hid), deque(maxlen=SMOOTH_FRAMES))
                    statuses.append(overlap_ratio >= HAND_OVERLAP_RATIO)
                    is_hand_holding = sum(statuses) >= (SMOOTH_FRAMES // 2 + 1)

                    # One holding hand is enough for the whole person.
                    if is_hand_holding:
                        holding_status_for_person = True

                    # Circle on the wrist (green for holding, red for not)
                    cv2.circle(frame_vis, (avg_hx, avg_hy), 8, (0, 255, 0) if is_hand_holding else (0, 0, 255), -1)

                # Text label for the person, anchored 20 px above keypoint 5.
                label_pos = (int(person_kpts[5][0]), int(person_kpts[5][1]) - 20) if n_kpts > 5 else (10, 30)
                text = "HOLDING" if holding_status_for_person else "NOT HOLDING"
                color = (0, 255, 0) if holding_status_for_person else (0, 0, 255)
                cv2.putText(frame_vis, text, label_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        out.write(frame_vis)
        frame_num += 1

        # Progress report: average processing speed so far and estimated time remaining.
        elapsed = time.time() - start_time
        fps_proc = frame_num / elapsed if elapsed > 0 else 0
        eta = (total_frames - frame_num) / fps_proc if fps_proc > 0 else 0
        print(f"Frame {frame_num}/{total_frames} | {fps_proc:.2f} FPS | ETA: {eta/60:.1f} min", flush=True)
finally:
    # Always release both handles, even when the loop raises or is interrupted;
    # otherwise the output container is never finalised.
    cap.release()
    out.release()

print(f"video saved to: {output_path}")

Frame 1/266 | 0.57 FPS | ETA: 7.7 min
Frame 2/266 | 1.03 FPS | ETA: 4.3 min
Frame 3/266 | 1.43 FPS | ETA: 3.1 min
Frame 4/266 | 1.76 FPS | ETA: 2.5 min
Frame 5/266 | 2.06 FPS | ETA: 2.1 min
Frame 6/266 | 2.33 FPS | ETA: 1.9 min
Frame 7/266 | 2.56 FPS | ETA: 1.7 min
Frame 8/266 | 2.77 FPS | ETA: 1.6 min
Frame 9/266 | 2.95 FPS | ETA: 1.5 min
Frame 10/266 | 3.12 FPS | ETA: 1.4 min
Frame 11/266 | 3.26 FPS | ETA: 1.3 min
Frame 12/266 | 3.39 FPS | ETA: 1.2 min
Frame 13/266 | 3.51 FPS | ETA: 1.2 min
Frame 14/266 | 3.62 FPS | ETA: 1.2 min
Frame 15/266 | 3.72 FPS | ETA: 1.1 min
Frame 16/266 | 3.77 FPS | ETA: 1.1 min
Frame 17/266 | 3.81 FPS | ETA: 1.1 min
Frame 18/266 | 3.86 FPS | ETA: 1.1 min
Frame 19/266 | 3.90 FPS | ETA: 1.1 min
Frame 20/266 | 3.94 FPS | ETA: 1.0 min
Frame 21/266 | 3.97 FPS | ETA: 1.0 min
Frame 22/266 | 4.00 FPS | ETA: 1.0 min
Frame 23/266 | 4.03 FPS | ETA: 1.0 min
Frame 24/266 | 4.05 FPS | ETA: 1.0 min
Frame 25/266 | 4.08 FPS | ETA: 1.0 min
Frame 26/266 | 4.10 FPS | ETA: 1.0

### Wrist mask display code

In [28]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
import os
import time

# Self-contained cell: same pipeline as the mask-overlay cell, but with a more tolerant
# holding test (extra mask dilation, larger wrist disc, lower overlap threshold) and a
# yellow circle showing the area that is tested around each wrist.

# --- Models -----------------------------------------------------------------------
seg_model_path = "src/model/best_stair_handrail_model.pt"
pose_model_path = "src/model/best_pose_model.pt"
seg_model = YOLO(seg_model_path)
pose_model = YOLO(pose_model_path)

# --- Input / output locations ------------------------------------------------------
video_path = r"D:\Aldi\stairvision\src\dataset\west\videos\Copy of IMG_4567.MOV"
output_dir = r"D:\Aldi\stairvision\src\dataset\west\videos\output_videos"
output_prefix = "output_with_mask_"

os.makedirs(output_dir, exist_ok=True)

# Pick an output file name that does not exist yet: "<prefix><name><ext>", then
# "<prefix><name> (1)<ext>", "<prefix><name> (2)<ext>", ...
base_name = os.path.basename(video_path)
base_name_no_ext, ext = os.path.splitext(base_name)
new_file_name = f"{output_prefix}{base_name_no_ext}{ext}"
output_path = os.path.join(output_dir, new_file_name)
counter = 1
while os.path.exists(output_path):
    new_file_name = f"{output_prefix}{base_name_no_ext} ({counter}){ext}"
    output_path = os.path.join(output_dir, new_file_name)
    counter += 1

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"❌ cannot open video from the path: {video_path}")

total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
    # A writer that failed to open would silently discard every frame.
    cap.release()
    raise RuntimeError(f"❌ cannot open video writer for the path: {output_path}")

# Morphological params
DILATE_KERNEL_SIZE = 4
CLOSE_KERNEL_SIZE = 4
ERODE_KERNEL_SIZE = 3
EXTRA_DILATE_FOR_TOLERANCE = 6  # side (px) of the kernel for the final, additional dilation

# Thresholds & smoothing
KP_CONF_THRESH = 0.25      # keypoints with a lower confidence are neither drawn nor tested
HAND_RADIUS = max(6, int(0.06 * max(width, height)))  # radius (px) of the disc tested around a wrist
HAND_OVERLAP_RATIO = 0.08  # minimum fraction of that disc that must lie on the handrail mask
SMOOTH_FRAMES = 3          # length of the position / status history kept per hand
WRIST_KEYPOINTS = (9, 10)  # wrists

# Skeleton connections
SKELETON_CONNECTIONS = [
    (0, 1), (0, 2), (1, 3), (2, 4), (5, 6), (5, 7), (7, 9), (6, 8),
    (8, 10), (11, 12), (5, 11), (6, 12), (11, 13), (13, 15), (12, 14), (14, 16)
]

# History buffers, keyed by (person index, wrist keypoint index).
# NOTE(review): the person index is the detection's position within the current frame,
# not a tracked identity, so the histories of two people can mix when the detection
# order changes between frames.
hand_history = {}
status_memory = {}

# --- Loop-invariant helpers (built once instead of once per frame / per hand) ------
close_kernel = np.ones((CLOSE_KERNEL_SIZE, CLOSE_KERNEL_SIZE), np.uint8)
dilate_kernel = np.ones((DILATE_KERNEL_SIZE, DILATE_KERNEL_SIZE), np.uint8)
erode_kernel = np.ones((ERODE_KERNEL_SIZE, ERODE_KERNEL_SIZE), np.uint8)
tolerance_kernel = np.ones((EXTRA_DILATE_FOR_TOLERANCE, EXTRA_DILATE_FOR_TOLERANCE), np.uint8)
y_grid, x_grid = np.ogrid[:height, :width]  # open pixel-coordinate grids for the disc test

start_time = time.time()
frame_num = 0

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_orig = frame.copy()

        # --- Segmentation: rasterise every handrail (class 0) polygon into one mask ---
        seg_results = seg_model.predict(frame, conf=0.4, verbose=False)
        handrail_mask = np.zeros((height, width), dtype=np.uint8)
        for r in seg_results:
            if r.masks is None:
                continue
            for mask_poly, cls in zip(r.masks.xy, r.boxes.cls):
                if int(cls) == 0:  # handrail class
                    poly = np.array(mask_poly, dtype=np.int32)
                    cv2.fillPoly(handrail_mask, [poly], 255)

        # Morphology: close gaps, grow, shrink slightly, then grow again for tolerance.
        handrail_mask = cv2.morphologyEx(handrail_mask, cv2.MORPH_CLOSE, close_kernel)
        handrail_mask = cv2.dilate(handrail_mask, dilate_kernel, iterations=1)
        handrail_mask = cv2.erode(handrail_mask, erode_kernel, iterations=1)
        handrail_mask = cv2.dilate(handrail_mask, tolerance_kernel, iterations=1)
        handrail_on = handrail_mask > 0  # boolean view, computed once per frame

        # --- Pose estimation ---
        pose_results = pose_model.predict(frame_orig, conf=0.25, verbose=False)

        # Visualization: blend a cyan copy of the mask over the frame (40 % / 60 %).
        frame_vis = frame.copy()
        mask_overlay = frame_vis.copy()
        mask_overlay[handrail_on] = (255, 255, 0)
        frame_vis = cv2.addWeighted(mask_overlay, 0.4, frame_vis, 0.6, 0)

        # --- Per person ---
        for r in pose_results:
            if r.keypoints is None:
                continue

            kpts_xy = r.keypoints.xy.cpu().numpy()
            # When the result carries no per-keypoint confidence, treat every keypoint
            # as fully confident (explicit check instead of a blanket `except`).
            conf_data = r.keypoints.conf
            if conf_data is None:
                kpts_conf = np.ones((kpts_xy.shape[0], kpts_xy.shape[1]))
            else:
                kpts_conf = conf_data.cpu().numpy()

            for person_idx in range(kpts_xy.shape[0]):
                person_kpts = kpts_xy[person_idx]
                person_conf = kpts_conf[person_idx]
                n_kpts = person_kpts.shape[0]

                # Draw skeleton: a dot per confident keypoint ...
                for i in range(n_kpts):
                    if person_conf[i] > KP_CONF_THRESH:
                        x, y = int(person_kpts[i][0]), int(person_kpts[i][1])
                        cv2.circle(frame_vis, (x, y), 3, (200, 200, 200), -1)

                # ... and a line per connection whose two ends are both confident.
                for start_idx, end_idx in SKELETON_CONNECTIONS:
                    if n_kpts <= max(start_idx, end_idx):
                        continue
                    if person_conf[start_idx] > KP_CONF_THRESH and person_conf[end_idx] > KP_CONF_THRESH:
                        # Plain Python ints: accepted as point coordinates by every OpenCV build.
                        start_point = (int(person_kpts[start_idx][0]), int(person_kpts[start_idx][1]))
                        end_point = (int(person_kpts[end_idx][0]), int(person_kpts[end_idx][1]))
                        cv2.line(frame_vis, start_point, end_point, (255, 255, 255), 2)

                # --- Hand holding logic ---
                holding_status_for_person = False
                for hid in WRIST_KEYPOINTS:
                    if n_kpts <= hid or person_conf[hid] < KP_CONF_THRESH:
                        continue

                    hx, hy = person_kpts[hid]

                    # Smooth position: mean of the last SMOOTH_FRAMES wrist positions.
                    positions = hand_history.setdefault((person_idx, hid), deque(maxlen=SMOOTH_FRAMES))
                    positions.append((hx, hy))
                    avg_hx = int(np.mean([p[0] for p in positions]))
                    avg_hy = int(np.mean([p[1] for p in positions]))

                    if not (0 <= avg_hx < width and 0 <= avg_hy < height):
                        continue

                    # Overlap check: fraction of the wrist disc that lies on the handrail mask.
                    mask_circle = (x_grid - avg_hx) ** 2 + (y_grid - avg_hy) ** 2 <= HAND_RADIUS ** 2
                    inside_mask = np.logical_and(mask_circle, handrail_on)
                    overlap_ratio = inside_mask.sum() / (mask_circle.sum() + 1e-6)

                    # Smooth status: the hand counts as holding once more than half of
                    # SMOOTH_FRAMES recorded checks were positive.
                    statuses = status_memory.setdefault((person_idx, hid), deque(maxlen=SMOOTH_FRAMES))
                    statuses.append(overlap_ratio >= HAND_OVERLAP_RATIO)
                    is_hand_holding = sum(statuses) >= (SMOOTH_FRAMES // 2 + 1)

                    # One holding hand is enough for the whole person.
                    if is_hand_holding:
                        holding_status_for_person = True

                    # Debug circles: yellow outline = tested area, filled dot = verdict.
                    cv2.circle(frame_vis, (avg_hx, avg_hy), HAND_RADIUS, (0, 255, 255), 2)
                    cv2.circle(frame_vis, (avg_hx, avg_hy), 8, (0, 255, 0) if is_hand_holding else (0, 0, 255), -1)

                # Label, anchored 20 px above keypoint 5.
                label_pos = (int(person_kpts[5][0]), int(person_kpts[5][1]) - 20) if n_kpts > 5 else (10, 30)
                text = "HOLDING" if holding_status_for_person else "NOT HOLDING"
                color = (0, 255, 0) if holding_status_for_person else (0, 0, 255)
                cv2.putText(frame_vis, text, label_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        out.write(frame_vis)
        frame_num += 1

        # Progress report: average processing speed so far and estimated time remaining.
        elapsed = time.time() - start_time
        fps_proc = frame_num / elapsed if elapsed > 0 else 0
        eta = (total_frames - frame_num) / fps_proc if fps_proc > 0 else 0
        print(f"Frame {frame_num}/{total_frames} | {fps_proc:.2f} FPS | ETA: {eta/60:.1f} min", flush=True)
finally:
    # Always release both handles, even when the loop raises or is interrupted;
    # otherwise the output container is never finalised.
    cap.release()
    out.release()

print(f"video saved to: {output_path}")


Frame 1/266 | 0.64 FPS | ETA: 6.9 min
Frame 2/266 | 1.14 FPS | ETA: 3.9 min
Frame 3/266 | 1.55 FPS | ETA: 2.8 min
Frame 4/266 | 1.91 FPS | ETA: 2.3 min
Frame 5/266 | 2.23 FPS | ETA: 2.0 min
Frame 6/266 | 2.51 FPS | ETA: 1.7 min
Frame 7/266 | 2.76 FPS | ETA: 1.6 min
Frame 8/266 | 2.96 FPS | ETA: 1.5 min
Frame 9/266 | 3.15 FPS | ETA: 1.4 min
Frame 10/266 | 3.32 FPS | ETA: 1.3 min
Frame 11/266 | 3.48 FPS | ETA: 1.2 min
Frame 12/266 | 3.61 FPS | ETA: 1.2 min
Frame 13/266 | 3.72 FPS | ETA: 1.1 min
Frame 14/266 | 3.84 FPS | ETA: 1.1 min
Frame 15/266 | 3.93 FPS | ETA: 1.1 min
Frame 16/266 | 4.02 FPS | ETA: 1.0 min
Frame 17/266 | 4.11 FPS | ETA: 1.0 min
Frame 18/266 | 4.19 FPS | ETA: 1.0 min
Frame 19/266 | 4.25 FPS | ETA: 1.0 min
Frame 20/266 | 4.27 FPS | ETA: 1.0 min
Frame 21/266 | 4.29 FPS | ETA: 1.0 min
Frame 22/266 | 4.32 FPS | ETA: 0.9 min
Frame 23/266 | 4.36 FPS | ETA: 0.9 min
Frame 24/266 | 4.39 FPS | ETA: 0.9 min
Frame 25/266 | 4.42 FPS | ETA: 0.9 min
Frame 26/266 | 4.45 FPS | ETA: 0.9

### stair mask display

In [22]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
import os
import time

# Self-contained cell: like the mask-overlay cell, but a person is only analysed (and
# drawn) when BOTH ankles lie on the stair mask. Stair and handrail masks are overlaid
# in red and cyan for debugging.

# --- Models -----------------------------------------------------------------------
seg_model_path = "src/model/best_stair_handrail_model.pt"
pose_model_path = "src/model/best_pose_model.pt"
seg_model = YOLO(seg_model_path)
pose_model = YOLO(pose_model_path)

# --- Input / output locations ------------------------------------------------------
video_path = r"D:\Aldi\stairvision\src\dataset\west\videos\Copy of IMG_4566.MOV"
output_dir = r"D:\Aldi\stairvision\src\dataset\west\videos\output_videos"
output_prefix = "output_with_mask_"

os.makedirs(output_dir, exist_ok=True)

# Pick an output file name that does not exist yet: "<prefix><name><ext>", then
# "<prefix><name> (1)<ext>", "<prefix><name> (2)<ext>", ...
base_name = os.path.basename(video_path)
base_name_no_ext, ext = os.path.splitext(base_name)
new_file_name = f"{output_prefix}{base_name_no_ext}{ext}"
output_path = os.path.join(output_dir, new_file_name)
counter = 1
while os.path.exists(output_path):
    new_file_name = f"{output_prefix}{base_name_no_ext} ({counter}){ext}"
    output_path = os.path.join(output_dir, new_file_name)
    counter += 1

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"❌ cannot open video from the path: {video_path}")

total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
    # A writer that failed to open would silently discard every frame.
    cap.release()
    raise RuntimeError(f"❌ cannot open video writer for the path: {output_path}")

# --- Tunable parameters ------------------------------------------------------------
DILATE_KERNEL_SIZE = 4
CLOSE_KERNEL_SIZE = 4
ERODE_KERNEL_SIZE = 3
KP_CONF_THRESH = 0.25     # keypoints with a lower confidence are neither drawn nor tested
HAND_RADIUS = max(5, int(0.05 * max(width, height)))  # radius (px) of the disc tested around a wrist
HAND_OVERLAP_RATIO = 0.2  # minimum fraction of that disc that must lie on the handrail mask
SMOOTH_FRAMES = 3         # length of the position / status history kept per hand
WRIST_KEYPOINTS = (9, 10)  # wrists
LEFT_ANKLE_IDX = 15
RIGHT_ANKLE_IDX = 16

SKELETON_CONNECTIONS = [
    (0, 1), (0, 2), (1, 3), (2, 4), (5, 6), (5, 7), (7, 9), (6, 8),
    (8, 10), (11, 12), (5, 11), (6, 12), (11, 13), (13, 15), (12, 14), (14, 16)
]

# Smoothing memory, keyed by (person index, wrist keypoint index).
# NOTE(review): the person index is the detection's position within the current frame,
# not a tracked identity, so the histories of two people can mix when the detection
# order changes between frames.
hand_history = {}
status_memory = {}

# --- Loop-invariant helpers (built once instead of once per frame / per hand) ------
close_kernel = np.ones((CLOSE_KERNEL_SIZE, CLOSE_KERNEL_SIZE), np.uint8)
dilate_kernel = np.ones((DILATE_KERNEL_SIZE, DILATE_KERNEL_SIZE), np.uint8)
erode_kernel = np.ones((ERODE_KERNEL_SIZE, ERODE_KERNEL_SIZE), np.uint8)
y_grid, x_grid = np.ogrid[:height, :width]  # open pixel-coordinate grids for the disc test

start_time = time.time()
frame_num = 0

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_orig = frame.copy()

        # Segmentation: class 0 polygons go to the handrail mask, class 1 to the stair mask.
        seg_results = seg_model.predict(frame, conf=0.4, verbose=False)
        handrail_mask = np.zeros((height, width), dtype=np.uint8)
        stair_mask = np.zeros((height, width), dtype=np.uint8)
        for r in seg_results:
            if r.masks is None:
                continue
            for mask_poly, cls in zip(r.masks.xy, r.boxes.cls):
                poly = np.array(mask_poly, dtype=np.int32)
                if int(cls) == 0:  # handrail
                    cv2.fillPoly(handrail_mask, [poly], 255)
                elif int(cls) == 1:  # stair
                    cv2.fillPoly(stair_mask, [poly], 255)

        # Morph ops, applied in place to both masks: close gaps, grow, shrink slightly.
        for mask in [handrail_mask, stair_mask]:
            mask[:] = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, close_kernel)
            mask[:] = cv2.dilate(mask, dilate_kernel, iterations=1)
            mask[:] = cv2.erode(mask, erode_kernel, iterations=1)
        handrail_on = handrail_mask > 0  # boolean view, computed once per frame

        # Pose detection
        pose_results = pose_model.predict(frame_orig, conf=0.25, verbose=False)

        frame_vis = frame.copy()

        # Stair & handrail mask overlay for debugging: both colour layers are summed and
        # then added onto the frame at 40 % strength.
        stair_mask_colored = np.zeros_like(frame_vis)
        stair_mask_colored[stair_mask > 0] = (0, 0, 255)  # red for stairs
        handrail_mask_colored = np.zeros_like(frame_vis)
        handrail_mask_colored[handrail_on] = (255, 255, 0)  # cyan for handrail
        combined_mask_colored = cv2.addWeighted(stair_mask_colored, 1, handrail_mask_colored, 1, 0)
        frame_vis = cv2.addWeighted(frame_vis, 1, combined_mask_colored, 0.4, 0)

        for r in pose_results:
            if r.keypoints is None:
                continue

            kpts_xy = r.keypoints.xy.cpu().numpy()
            # When the result carries no per-keypoint confidence, treat every keypoint
            # as fully confident (explicit check instead of a blanket `except`).
            conf_data = r.keypoints.conf
            if conf_data is None:
                kpts_conf = np.ones((kpts_xy.shape[0], kpts_xy.shape[1]))
            else:
                kpts_conf = conf_data.cpu().numpy()

            for person_idx in range(kpts_xy.shape[0]):
                person_kpts = kpts_xy[person_idx]
                person_conf = kpts_conf[person_idx]
                n_kpts = person_kpts.shape[0]

                # Gate: skip the person unless both ankles are confidently detected ...
                if (n_kpts <= max(LEFT_ANKLE_IDX, RIGHT_ANKLE_IDX) or
                        person_conf[LEFT_ANKLE_IDX] < KP_CONF_THRESH or
                        person_conf[RIGHT_ANKLE_IDX] < KP_CONF_THRESH):
                    continue

                lx, ly = int(person_kpts[LEFT_ANKLE_IDX][0]), int(person_kpts[LEFT_ANKLE_IDX][1])
                rx, ry = int(person_kpts[RIGHT_ANKLE_IDX][0]), int(person_kpts[RIGHT_ANKLE_IDX][1])

                # Clamp to the frame so the mask look-up below cannot go out of bounds.
                lx = max(0, min(lx, width - 1))
                ly = max(0, min(ly, height - 1))
                rx = max(0, min(rx, width - 1))
                ry = max(0, min(ry, height - 1))

                # ... and BOTH ankles are inside the stair mask.
                if stair_mask[ly, lx] == 0 or stair_mask[ry, rx] == 0:
                    continue

                # Draw skeleton: a dot per confident keypoint ...
                for i in range(n_kpts):
                    if person_conf[i] > KP_CONF_THRESH:
                        x, y = int(person_kpts[i][0]), int(person_kpts[i][1])
                        cv2.circle(frame_vis, (x, y), 3, (200, 200, 200), -1)

                # ... and a line per connection whose two ends are both confident.
                for start_idx, end_idx in SKELETON_CONNECTIONS:
                    if n_kpts <= max(start_idx, end_idx):
                        continue
                    if person_conf[start_idx] > KP_CONF_THRESH and person_conf[end_idx] > KP_CONF_THRESH:
                        # Plain Python ints: accepted as point coordinates by every OpenCV build.
                        start_point = (int(person_kpts[start_idx][0]), int(person_kpts[start_idx][1]))
                        end_point = (int(person_kpts[end_idx][0]), int(person_kpts[end_idx][1]))
                        cv2.line(frame_vis, start_point, end_point, (255, 255, 255), 2)

                # Holding logic
                holding_status_for_person = False
                for hid in WRIST_KEYPOINTS:
                    if n_kpts <= hid or person_conf[hid] < KP_CONF_THRESH:
                        continue

                    hx, hy = person_kpts[hid]

                    # Position smoothing: mean of the last SMOOTH_FRAMES wrist positions.
                    positions = hand_history.setdefault((person_idx, hid), deque(maxlen=SMOOTH_FRAMES))
                    positions.append((hx, hy))
                    avg_hx = int(np.mean([p[0] for p in positions]))
                    avg_hy = int(np.mean([p[1] for p in positions]))

                    if not (0 <= avg_hx < width and 0 <= avg_hy < height):
                        continue

                    # Fraction of the disc around the wrist that lies on the handrail mask.
                    mask_circle = (x_grid - avg_hx) ** 2 + (y_grid - avg_hy) ** 2 <= HAND_RADIUS ** 2
                    inside_mask = np.logical_and(mask_circle, handrail_on)
                    overlap_ratio = inside_mask.sum() / (mask_circle.sum() + 1e-6)

                    # Status smoothing: the hand counts as holding once more than half of
                    # SMOOTH_FRAMES recorded checks were positive.
                    statuses = status_memory.setdefault((person_idx, hid), deque(maxlen=SMOOTH_FRAMES))
                    statuses.append(overlap_ratio >= HAND_OVERLAP_RATIO)
                    is_hand_holding = sum(statuses) >= (SMOOTH_FRAMES // 2 + 1)

                    # One holding hand is enough for the whole person.
                    if is_hand_holding:
                        holding_status_for_person = True

                    cv2.circle(frame_vis, (avg_hx, avg_hy), 8,
                               (0, 255, 0) if is_hand_holding else (0, 0, 255), -1)

                # Label, anchored 20 px above keypoint 5.
                label_pos = (int(person_kpts[5][0]), int(person_kpts[5][1]) - 20) if n_kpts > 5 else (10, 30)
                text = "HOLDING" if holding_status_for_person else "NOT HOLDING"
                color = (0, 255, 0) if holding_status_for_person else (0, 0, 255)
                cv2.putText(frame_vis, text, label_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        out.write(frame_vis)
        frame_num += 1

        # Progress report: average processing speed so far and estimated time remaining.
        elapsed = time.time() - start_time
        fps_proc = frame_num / elapsed if elapsed > 0 else 0
        eta = (total_frames - frame_num) / fps_proc if fps_proc > 0 else 0
        print(f"Frame {frame_num}/{total_frames} | {fps_proc:.2f} FPS | ETA: {eta/60:.1f} min", flush=True)
finally:
    # The original cell never released these handles, so the output container was never
    # finalised. Release them even when the loop raises or is interrupted.
    cap.release()
    out.release()

print(f"video saved to: {output_path}")

In [None]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
import time
import torch
import platform

# --- Model weights (paths are relative to the current working directory) ---
seg_model_path = "src/model/best_stair_handrail_model.pt"
pose_model_path = "src/model/best_pose_model.pt"

# --- Instantiate the two YOLO models: segmentation first, then pose ---
# (swap in lighter weights here if inference is too slow)
seg_model = YOLO(seg_model_path)
pose_model = YOLO(pose_model_path)

# --- Video input ---
# 0 = internal webcam, 1 = external webcam, or an RTSP / IP-camera URL string.
CAMERA_SOURCE = 0

# --- Throughput knobs ---
FRAME_SKIP = 1                          # 1 = process every frame; 2 = every other frame, ...
RESIZE_WIDTH, RESIZE_HEIGHT = 640, 360  # every captured frame is resized to this before inference

# --- Hand-on-rail decision parameters ---
KP_CONF_THRESH = 0.25      # keypoints below this confidence are ignored
HAND_RADIUS = 20           # px; radius of the disc drawn around each wrist for the overlap test
HAND_OVERLAP_RATIO = 0.2   # fraction of that disc that must lie on the handrail mask
SMOOTH_FRAMES = 3          # length of the per-wrist position / decision history

# --- Structuring-element sizes (px) for cleaning up the handrail mask ---
DILATE_KERNEL_SIZE = 4
CLOSE_KERNEL_SIZE = 4
ERODE_KERNEL_SIZE = 3

# Keypoint index pairs joined by a line when drawing the skeleton.
# The region labels below assume the COCO-17 keypoint order -- TODO confirm
# against the pose model's training config.
SKELETON_CONNECTIONS = [
    # head
    (0, 1), (0, 2), (1, 3), (2, 4),
    # shoulders and arms (indices 9 and 10 are the wrists used by the holding test)
    (5, 6), (5, 7), (7, 9), (6, 8), (8, 10),
    # torso
    (11, 12), (5, 11), (6, 12),
    # legs
    (11, 13), (13, 15), (12, 14), (14, 16),
]

# Rolling per-wrist state, keyed by (person_idx, keypoint_id).
hand_history, status_memory = {}, {}

# --- Open the camera ---
cap = cv2.VideoCapture(CAMERA_SOURCE)
if not cap.isOpened():
    raise RuntimeError(f"❌ Cannot open camera: {CAMERA_SOURCE}")

print("Press 'q' to exit.")
frame_num = 0          # frames read from the camera; drives the FRAME_SKIP schedule
processed_frames = 0   # frames that actually went through inference; drives the FPS readout
start_time = time.time()

# Guard against FRAME_SKIP <= 0 (modulo by zero / never-matching schedule).
frame_stride = max(1, int(FRAME_SKIP))
# A wrist counts as "holding" when a strict majority of its remembered decisions agree.
vote_threshold = SMOOTH_FRAMES // 2 + 1

# Loop-invariant helpers: every frame is resized to RESIZE_WIDTH x RESIZE_HEIGHT,
# so the kernels and the coordinate grids never change between iterations.
close_kernel = np.ones((CLOSE_KERNEL_SIZE, CLOSE_KERNEL_SIZE), np.uint8)
dilate_kernel = np.ones((DILATE_KERNEL_SIZE, DILATE_KERNEL_SIZE), np.uint8)
erode_kernel = np.ones((ERODE_KERNEL_SIZE, ERODE_KERNEL_SIZE), np.uint8)
y_grid, x_grid = np.ogrid[:RESIZE_HEIGHT, :RESIZE_WIDTH]
hand_radius_sq = HAND_RADIUS ** 2

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Frame not read, exiting...")
            break

        # Resize for lighter processing
        frame = cv2.resize(frame, (RESIZE_WIDTH, RESIZE_HEIGHT))
        height, width = frame.shape[:2]

        # Count EVERY frame read. Incrementing only inside the processing branch
        # would freeze the counter on the first skipped frame when FRAME_SKIP > 1,
        # so no later frame would ever be processed.
        process_this_frame = frame_num % frame_stride == 0
        frame_num += 1

        if process_this_frame:
            # --- Segmentation: rasterize class-0 polygons into a binary handrail mask ---
            with torch.no_grad():
                seg_results = seg_model.predict(frame, conf=0.4, verbose=False)
            handrail_mask = np.zeros((height, width), dtype=np.uint8)
            for r in seg_results:
                if r.masks is not None:
                    for mask_poly, cls in zip(r.masks.xy, r.boxes.cls):
                        if int(cls) == 0:
                            poly = np.array(mask_poly, dtype=np.int32)
                            cv2.fillPoly(handrail_mask, [poly], 255)

            # Morphology: close small gaps, then dilate and erode the mask
            handrail_mask = cv2.morphologyEx(handrail_mask, cv2.MORPH_CLOSE, close_kernel)
            handrail_mask = cv2.dilate(handrail_mask, dilate_kernel, iterations=1)
            handrail_mask = cv2.erode(handrail_mask, erode_kernel, iterations=1)

            # --- Pose Estimation ---
            with torch.no_grad():
                pose_results = pose_model.predict(frame, conf=0.25, verbose=False)

            # --- Visualization: tint the handrail mask onto a copy of the frame ---
            frame_vis = frame.copy()
            mask_overlay = frame_vis.copy()
            mask_overlay[handrail_mask > 0] = (255, 255, 0)
            frame_vis = cv2.addWeighted(mask_overlay, 0.4, frame_vis, 0.6, 0)

            for r in pose_results:
                if r.keypoints is None:
                    continue
                kpts_xy = r.keypoints.xy.cpu().numpy()
                # Fall back to full confidence when the result carries no
                # per-keypoint confidence, instead of swallowing every exception.
                conf_tensor = getattr(r.keypoints, "conf", None)
                if conf_tensor is not None:
                    kpts_conf = conf_tensor.cpu().numpy()
                else:
                    kpts_conf = np.ones(kpts_xy.shape[:2])

                for person_idx in range(kpts_xy.shape[0]):
                    person_kpts = kpts_xy[person_idx]
                    person_conf = kpts_conf[person_idx]

                    # Skeleton: confident keypoints as dots ...
                    for i in range(person_kpts.shape[0]):
                        if person_conf[i] > KP_CONF_THRESH:
                            x, y = int(person_kpts[i][0]), int(person_kpts[i][1])
                            cv2.circle(frame_vis, (x, y), 3, (200, 200, 200), -1)

                    # ... and limbs as lines, only when both endpoints are confident.
                    for start_idx, end_idx in SKELETON_CONNECTIONS:
                        if person_kpts.shape[0] <= max(start_idx, end_idx):
                            continue
                        if person_conf[start_idx] > KP_CONF_THRESH and person_conf[end_idx] > KP_CONF_THRESH:
                            start_point = (int(person_kpts[start_idx][0]), int(person_kpts[start_idx][1]))
                            end_point = (int(person_kpts[end_idx][0]), int(person_kpts[end_idx][1]))
                            cv2.line(frame_vis, start_point, end_point, (255, 255, 255), 2)

                    # Hand Holding Logic
                    # NOTE(review): state is keyed by the per-frame detection index,
                    # not a tracker ID, so histories presumably mix people if the
                    # detection order changes between frames -- confirm.
                    holding_status_for_person = False
                    for hid in [9, 10]:  # Left & right wrists
                        if person_kpts.shape[0] <= hid or person_conf[hid] < KP_CONF_THRESH:
                            continue
                        key = (person_idx, hid)

                        # Smooth the wrist position over the last SMOOTH_FRAMES samples.
                        hx, hy = person_kpts[hid]
                        history = hand_history.setdefault(key, deque(maxlen=SMOOTH_FRAMES))
                        history.append((hx, hy))
                        avg_hx = int(np.mean([p[0] for p in history]))
                        avg_hy = int(np.mean([p[1] for p in history]))
                        if not (0 <= avg_hx < width and 0 <= avg_hy < height):
                            continue

                        # Fraction of the disc around the wrist that lies on the handrail mask.
                        mask_circle = (x_grid - avg_hx) ** 2 + (y_grid - avg_hy) ** 2 <= hand_radius_sq
                        inside_mask = np.logical_and(mask_circle, handrail_mask > 0)
                        overlap_ratio = inside_mask.sum() / (mask_circle.sum() + 1e-6)

                        # Majority vote over the recent per-frame decisions.
                        votes = status_memory.setdefault(key, deque(maxlen=SMOOTH_FRAMES))
                        votes.append(overlap_ratio >= HAND_OVERLAP_RATIO)
                        is_hand_holding = sum(votes) >= vote_threshold
                        if is_hand_holding:
                            holding_status_for_person = True
                        cv2.circle(frame_vis, (avg_hx, avg_hy), 8,
                                   (0, 255, 0) if is_hand_holding else (0, 0, 255), -1)

                    # Label is anchored 20 px above keypoint 5, or at a fixed
                    # corner position when that keypoint index does not exist.
                    label_pos = (int(person_kpts[5][0]), int(person_kpts[5][1]) - 20) if person_kpts.shape[0] > 5 else (10, 30)
                    text = "HOLDING" if holding_status_for_person else "NOT HOLDING"
                    color = (0, 255, 0) if holding_status_for_person else (0, 0, 255)
                    cv2.putText(frame_vis, text, label_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

            # --- FPS Monitoring (processed frames per wall-clock second) ---
            processed_frames += 1
            elapsed = time.time() - start_time
            fps_proc = processed_frames / elapsed if elapsed > 0 else 0
            cv2.putText(frame_vis, f"FPS: {fps_proc:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,255), 2)

            cv2.imshow("Stairvision Realtime", frame_vis)

        # Press 'q' to exit (waitKey also services the GUI event loop)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if inference raises
    # or the kernel is interrupted; otherwise the device stays locked.
    cap.release()
    cv2.destroyAllWindows()
