In [20]:
from ultralytics import YOLO
from pathlib import Path
import torch
import cv2
import time
import numpy as np
import asyncio
from FeetModel import KeypointPredictor
import math
import joblib

In [24]:
class WarningZoneDetector:
	"""Interactive warning-zone demo built on a YOLOv8 pose model.

	The user left-clicks in the "YOLO Output" window to add polygon vertices.
	For every detected person the two foot keypoints (indices 15 and 16) are
	drawn, and any foot that lies strictly inside the polygon is re-drawn in
	red. When both feet are missing but shoulders and hips are detected, the
	feet are estimated by a `KeypointPredictor` network instead.
	"""

	# (width, height) every frame is resized to before pose inference.
	# Keypoints come back in this coordinate space and are scaled to the
	# original frame size before drawing.
	INFERENCE_SIZE = (640, 480)

	def __init__(self):
		"""Load the pose model onto the GPU when available, else the CPU."""
		self.device = "cuda" if torch.cuda.is_available() else "cpu"
		self.model = YOLO("./Models/yolov8x-pose.pt").to(self.device)
		# Polygon vertices (x, y) collected from mouse clicks, in click order.
		self.points = []
		# Not written anywhere in this class; kept for callers that read it.
		self.loss = []

	def onMouseClick(self, event, x, y, flags, param):
		"""OpenCV mouse callback: record (and print) each left-click as a zone vertex.

		`flags` and `param` are part of the callback signature and are unused.
		"""
		if event == cv2.EVENT_LBUTTONDOWN:
			print(x, y)
			self.points.append((x, y))

	def draw_overlap(self, polygon_points, frame, l_foot, r_foot):
		"""Highlight feet that are inside the zone polygon.

		Args:
			polygon_points: sequence of (x, y) vertices.
			frame: image that red markers are drawn onto (modified in place).
			l_foot, r_foot: (x, y) points in `frame` coordinates.

		Returns:
			True if at least one foot is strictly inside the polygon, else
			False. Always False when fewer than three vertices exist.
		"""
		# Not enough points to form a polygon
		if len(polygon_points) < 3:
			return False

		user_polygon = np.array(polygon_points, np.int32)
		is_overlap = False

		# `> 0` means strictly inside; a point exactly on the edge does not count.
		for foot in (l_foot, r_foot):
			if cv2.pointPolygonTest(user_polygon, foot, False) > 0:
				is_overlap = True
				cv2.circle(frame, foot, 5, (0, 0, 255), -1)

		return is_overlap

	def predict_foot(self, l_shoulder, r_shoulder, l_hip, r_hip, keypoint_pred_model):
		"""Estimate both foot positions from shoulder and hip positions.

		Args:
			l_shoulder, r_shoulder, l_hip, r_hip: (x, y) pairs.
			keypoint_pred_model: callable taking a float32 tensor of the eight
				coordinates and returning at least four values.

		Returns:
			((lx, ly), (rx, ry)) built from the first four model outputs, as
			0-d tensors detached from autograd.
		"""
		features = [
			l_shoulder[0], l_shoulder[1],
			r_shoulder[0], r_shoulder[1],
			l_hip[0], l_hip[1],
			r_hip[0], r_hip[1],
		]
		# Use the same device the predictor is moved to in `run`.
		inputs = torch.tensor(features, dtype=torch.float32).to(self.device)
		# Pure inference: skip building an autograd graph on every frame.
		with torch.no_grad():
			result = keypoint_pred_model(inputs)
		return (result[0], result[1]), (result[2], result[3])

	def process_frame(self, frame):
		"""Run person pose detection on a resized copy of `frame`.

		Returns the first result object produced by the model; its keypoints
		are in `INFERENCE_SIZE` coordinates.
		"""
		small_frame = cv2.resize(frame, self.INFERENCE_SIZE)
		results = self.model(source=small_frame, conf=0.3, save=False, classes=[0], verbose=False)
		return results[0]

	def run(self, video_path):
		"""Play `video_path`, overlaying feet, the clicked zone and an FPS counter.

		The loop ends when the video has no more frames (or could not be
		read), when "q" is pressed, or after 15 seconds of wall-clock time.
		The capture and the window are released even if an error occurs.
		"""
		source = cv2.VideoCapture(video_path)
		cv2.namedWindow("YOLO Output")
		cv2.setMouseCallback("YOLO Output", self.onMouseClick)

		# For FPS calculation
		prev_frame_time = 0

		# Font used to display the FPS counter
		font = cv2.FONT_HERSHEY_SIMPLEX

		small_frame_width, small_frame_height = self.INFERENCE_SIZE

		try:
			keypoint_pred_model = KeypointPredictor().to(self.device)
			# map_location lets a checkpoint saved on GPU load on a CPU-only host.
			keypoint_pred_model.load_state_dict(torch.load("./Models/FeetPredict.pt", map_location=self.device))
			# Inference behaviour for dropout / batch-norm layers, if the model has any.
			keypoint_pred_model.eval()

			start_time = time.time()
			while True:
				ret, frame = source.read()
				# End of stream or unreadable source: `frame` is None here.
				if not ret or frame is None:
					break
				frame_height, frame_width = frame.shape[:2]

				results = self.process_frame(frame)

				keypoints = results.keypoints.xy if results.keypoints is not None else []
				for person_keypoints in keypoints:
					# Skip entries that do not carry the full keypoint set
					# (indices up to 16 are read below).
					if len(person_keypoints) < 17:
						continue

					left_foot = person_keypoints[15][:2].tolist()
					right_foot = person_keypoints[16][:2].tolist()
					left_shoulder = person_keypoints[5][:2].tolist()
					right_shoulder = person_keypoints[6][:2].tolist()
					l_hip = person_keypoints[11][:2].tolist()
					r_hip = person_keypoints[12][:2].tolist()

					# A keypoint reported as [0, 0] is treated as not detected.
					torso_visible = (
						left_shoulder != [0, 0] and right_shoulder != [0, 0]
						and l_hip != [0, 0] and r_hip != [0, 0]
					)
					feet_missing = left_foot == [0, 0] and right_foot == [0, 0]

					# Predict the feet when shoulders and hips are visible but both feet are not
					if torso_visible and feet_missing:
						left_shoulder = tuple(map(int, person_keypoints[5][:2]))  # Left shoulder [x, y]
						right_shoulder = tuple(map(int, person_keypoints[6][:2]))  # Right shoulder [x, y]
						left_hip = tuple(map(int, person_keypoints[11][:2]))  # Left hip [x, y]
						right_hip = tuple(map(int, person_keypoints[12][:2]))  # Right hip [x, y]

						left_foot, right_foot = self.predict_foot(left_shoulder, right_shoulder, left_hip, right_hip, keypoint_pred_model)

					# Predicted feet are tuples, so they always pass this check;
					# detected feet pass only when both were found.
					if left_foot != [0, 0] and right_foot != [0, 0]:
						# Scale the coordinates back to the original frame size
						left_foot = (
							int(left_foot[0] * frame_width / small_frame_width),
							int(left_foot[1] * frame_height / small_frame_height)
						)

						right_foot = (
							int(right_foot[0] * frame_width / small_frame_width),
							int(right_foot[1] * frame_height / small_frame_height)
						)

						# Both markers are green in BGR; the right foot uses a marginally different shade.
						cv2.circle(frame, left_foot, 5, (0, 255, 0), -1)
						cv2.circle(frame, right_foot, 5, (9, 255, 0), -1)

						self.draw_overlap(self.points, frame, left_foot, right_foot)

				# Draw the zone: a marker per vertex and a closed outline once there are two or more
				for i, point in enumerate(self.points):
					cv2.circle(frame, point, 5, (243, 211, 74), 2)
					if len(self.points) > 1:
						next_point = self.points[i+1] if i < len(self.points)-1 else self.points[0]
						cv2.line(frame, point, next_point, (243, 211, 74), 2)

				# Time when we finish processing this frame
				new_frame_time = time.time()

				# Calculate FPS; guard against a zero interval on coarse clocks
				elapsed = new_frame_time - prev_frame_time
				fps = 1 / elapsed if elapsed > 0 else 0
				prev_frame_time = new_frame_time
				fps = "FPS: " + str(int(fps))
				# Thick black text under thin white text gives an outlined label
				cv2.putText(frame, fps, (frame.shape[1]-200, 60), font, 1.5, (0, 0, 0), 7, cv2.LINE_AA)
				cv2.putText(frame, fps, (frame.shape[1]-200, 60), font, 1.5, (255, 255, 255), 2, cv2.LINE_AA)

				cv2.imshow("YOLO Output", frame)

				if cv2.waitKey(10) & 0xFF == ord("q"):
					break

				# Stop after 15 seconds of wall-clock time
				if time.time() - start_time > 15:
					break
		finally:
			source.release()
			cv2.destroyAllWindows()

In [25]:
# Launch the interactive demo on the sample CCTV clip.
# Left-click inside the "YOLO Output" window to add warning-zone vertices.
demo_video = "./Media/HD CCTV Camera video 3MP 4MP iProx CCTV HDCCTVCameras.net retail store.mp4"
zone_det = WarningZoneDetector()
zone_det.run(demo_video)

722 372
913 422
928 591
604 656


# Loading YOLOv8

In [2]:
# Load the YOLOv8x pose model: directly from the checkpoint when the file is
# present, otherwise build the architecture from YAML and transfer the weights.
path = "./Models/yolov8x-pose.pt"
model = (
    YOLO(path)
    if Path(path).exists()
    else YOLO("yolov8x-pose.yaml").load(path)
)

# Prefer the GPU when torch can see one.
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

YOLO(
  (model): PoseModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 80, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(80, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(80, 160, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C2f(
        (cv1): Conv(
          (conv): Conv2d(160, 160, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(400, 160, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_runnin

In [3]:
# Convert to TensorRT.
# The export calls below are one-off conversions and are left disabled;
# uncomment the variant you need (INT8, FP32 or FP16) to regenerate an engine.
# model.export(format="engine", int8=True)
# model.export(format="engine", half=False)
# model.export(format="engine", half=True)

# NOTE: this rebinds the global `model` (previously the .pt model moved to
# `device`) to a prebuilt engine file; `process_frame` below runs whatever
# `model` points to at call time.
# Presumably this file came from the `half=False` export above; confirm.
model = YOLO("./Models/yolov8x-pose-f32.engine")

# Helper functions

In [4]:
def onMouseClick(event, x, y, flags, param):
	"""OpenCV mouse callback that collects zone vertices.

	On a left-button press the click position is printed and appended to the
	global `points` list; every other event is ignored. `flags` and `param`
	belong to the callback signature and are not used.
	"""
	global points

	# Guard clause: only left-button presses add a vertex.
	if event != cv2.EVENT_LBUTTONDOWN:
		return

	print(x, y)
	points.append((x, y))


def draw_overlap(polygon_points, frame, l_foot, r_foot):
	"""Mark feet that fall inside the user-drawn zone.

	Args:
		polygon_points: sequence of (x, y) zone vertices.
		frame: image to draw on (modified in place).
		l_foot, r_foot: (x, y) foot positions in `frame` coordinates.

	Returns:
		True when at least one foot is strictly inside the polygon; False
		otherwise, including when fewer than three vertices are given.
	"""
	# A polygon needs at least three vertices.
	if len(polygon_points) < 3:
		return False

	zone = np.array(polygon_points, np.int32)
	any_inside = False

	# Left foot is tested (and drawn) before the right foot.
	for foot in (l_foot, r_foot):
		# Positive result = strictly inside; on-edge (0) does not count.
		if cv2.pointPolygonTest(zone, foot, False) > 0:
			any_inside = True
			cv2.circle(frame, foot, 5, (0, 0, 255), -1)

	return any_inside

def predict_foot(l_shoulder, r_shoulder, l_hip, r_hip, keypoint_pred_model, model="torch"):
	"""Estimate both foot positions from shoulder and hip positions.

	Args:
		l_shoulder, r_shoulder, l_hip, r_hip: (x, y) pairs.
		keypoint_pred_model: for `model == "torch"`, a callable taking a
			float32 tensor of the eight coordinates; otherwise an object with
			a `predict` method taking a (1, 8) array (sklearn-style).
		model: "torch" selects the PyTorch path; any other value selects the
			`predict` path.

	Returns:
		((lx, ly), (rx, ry)) taken from the first four predicted values.
		On the torch path these are 0-d tensors detached from autograd.
	"""
	# Same feature order for both back-ends.
	features = [
		l_shoulder[0], l_shoulder[1],
		r_shoulder[0], r_shoulder[1],
		l_hip[0], l_hip[1],
		r_hip[0], r_hip[1],
	]

	if model == "torch":
		# Pytorch deep learning
		device = "cuda" if torch.cuda.is_available() else "cpu"
		inputs = torch.tensor(features, dtype=torch.float32).to(device)
		# Inference only: avoid building an autograd graph on every call.
		with torch.no_grad():
			result = keypoint_pred_model(inputs)

	else:
		# sklearn linear regression: expects a 2-D (n_samples, n_features) array
		inputs = np.array(features)
		result = keypoint_pred_model.predict(inputs.reshape(1, -1))[0]

	return (result[0], result[1]), (result[2], result[3])


async def process_frame(frame):
	"""Run the global pose `model` on a 640x480 copy of `frame`.

	The model call is pushed to a worker thread with `asyncio.to_thread` and
	awaited. Returns the first element of what the model returns.
	"""
	resized = cv2.resize(frame, (640, 480))
	# Persons only (class 0), confidence threshold 0.3, no saving, no logging.
	inference_kwargs = dict(source=resized, conf=0.3, save=False, classes=[0], verbose=False)
	detections = await asyncio.to_thread(model, **inference_kwargs)
	return detections[0]

In [7]:
async def main():
	"""Play the sample video and measure the foot predictor's error.

	For every detected person whose shoulders, hips and feet are all visible,
	the feet are re-predicted from the shoulders and hips and the mean
	distance to the detected feet is appended to the global `loss` list
	(measured in the 640x480 inference frame, before scaling). The predicted
	feet, the debug shoulder/hip markers, the clicked zone (global `points`)
	and an FPS counter are drawn on each frame.

	The loop ends when the video has no more frames (or could not be read),
	when "q" is pressed, or after 15 seconds of wall-clock time. The capture
	and the window are released even if an error occurs.
	"""
	global points

	global loss
	loss = []

	# Testing with a local video file; swap for cv2.VideoCapture(0) to use a webcam
	source = cv2.VideoCapture("./Media/HD CCTV Camera video 3MP 4MP iProx CCTV HDCCTVCameras.net retail store.mp4")
	cv2.namedWindow("YOLO Output")
	cv2.setMouseCallback("YOLO Output", onMouseClick)

	# Points of zone
	points = []

	# For FPS calculation
	prev_frame_time = 0

	# Frame skipper
	frame_skip = 1 # Not skipping frames at the moment
	frame_count = 0

	# Font used to display the FPS counter
	font = cv2.FONT_HERSHEY_SIMPLEX

	small_frame_height, small_frame_width = 480, 640  # The resized small frame used in process_frame

	try:
		model_type = "torch"
		if model_type == "torch":
			keypoint_pred_model = KeypointPredictor().to(device)
			# map_location lets a checkpoint saved on GPU load on a CPU-only host.
			keypoint_pred_model.load_state_dict(torch.load("./Models/FeetPredict.pt", map_location=device))
			# Inference behaviour for dropout / batch-norm layers, if the model has any.
			keypoint_pred_model.eval()

		else:
			# NOTE: joblib files are pickles; only load files you trust.
			keypoint_pred_model = joblib.load("./Models/LinearRegression_FeetPredict.pkl")

		start_time = time.time()
		while True:
			ret, frame = source.read()
			# End of stream or unreadable source: `frame` is None here.
			if not ret or frame is None:
				break
			frame_height, frame_width = frame.shape[:2]

			# Detect objects every n frames
			if frame_count % frame_skip == 0:
				results = await process_frame(frame)
				frame_count = 0

			frame_count += 1

			keypoints = results.keypoints.xy if results.keypoints is not None else []
			for person_keypoints in keypoints:
				# Skip entries that do not carry the full keypoint set
				# (indices up to 16 are read below).
				if len(person_keypoints) < 17:
					continue

				left_foot = person_keypoints[15][:2].tolist()
				right_foot = person_keypoints[16][:2].tolist()
				left_shoulder = person_keypoints[5][:2].tolist()
				right_shoulder = person_keypoints[6][:2].tolist()
				l_hip = person_keypoints[11][:2].tolist()
				r_hip = person_keypoints[12][:2].tolist()

				# Loss-calculation mode: require shoulders, hips AND both feet
				# so the prediction can be compared with the detected feet.
				# (The deployed condition instead predicts only when both feet
				# are [0, 0], i.e. not detected.)
				all_visible = (
					left_shoulder != [0, 0] and right_shoulder != [0, 0]
					and l_hip != [0, 0] and r_hip != [0, 0]
					and left_foot != [0, 0] and right_foot != [0, 0]
				)
				if not all_visible:
					continue

				left_shoulder = tuple(map(int, person_keypoints[5][:2]))  # Left shoulder [x, y]
				right_shoulder = tuple(map(int, person_keypoints[6][:2]))  # Right shoulder [x, y]
				left_hip = tuple(map(int, person_keypoints[11][:2]))  # Left hip [x, y]
				right_hip = tuple(map(int, person_keypoints[12][:2]))  # Right hip [x, y]

				# For testing: show the model inputs (shoulders red, hips magenta),
				# scaled from the inference frame to the original frame size
				debug_markers = (
					(left_shoulder, (0, 0, 255)),
					(right_shoulder, (0, 0, 255)),
					(left_hip, (255, 0, 255)),
					(right_hip, (255, 0, 255)),
				)
				for joint, color in debug_markers:
					cv2.circle(frame, (
						int(joint[0] * frame_width / small_frame_width),
						int(joint[1] * frame_height / small_frame_height)
					), 5, color, -1)

				ground_truth_l_foot = left_foot
				ground_truth_r_foot = right_foot

				left_foot, right_foot = predict_foot(left_shoulder, right_shoulder, left_hip, right_hip, keypoint_pred_model, model=model_type)

				# Loss calculation
				# Distance between predicted feet and detected feet, in inference-frame pixels
				l_dist = math.dist(left_foot, ground_truth_l_foot)
				r_dist = math.dist(right_foot, ground_truth_r_foot)
				loss.append(np.mean([l_dist, r_dist]))

				# Scale the coordinates back to the original frame size
				left_foot = (
					int(left_foot[0] * frame_width / small_frame_width),
					int(left_foot[1] * frame_height / small_frame_height)
				)

				right_foot = (
					int(right_foot[0] * frame_width / small_frame_width),
					int(right_foot[1] * frame_height / small_frame_height)
				)

				# Both markers are green in BGR; the right foot uses a marginally different shade.
				cv2.circle(frame, left_foot, 5, (0, 255, 0), -1)
				cv2.circle(frame, right_foot, 5, (9, 255, 0), -1)

				draw_overlap(points, frame, left_foot, right_foot)

			# Draw the zone: a marker per vertex and a closed outline once there are two or more
			for i, point in enumerate(points):
				cv2.circle(frame, point, 5, (243, 211, 74), 2)
				if len(points) > 1:
					next_point = points[i+1] if i < len(points)-1 else points[0]
					cv2.line(frame, point, next_point, (243, 211, 74), 2)

			# Time when we finish processing this frame
			new_frame_time = time.time()

			# Calculate FPS; guard against a zero interval on coarse clocks
			elapsed = new_frame_time - prev_frame_time
			fps = 1 / elapsed if elapsed > 0 else 0
			prev_frame_time = new_frame_time
			fps = "FPS: " + str(int(fps))
			# Thick black text under thin white text gives an outlined label
			cv2.putText(frame, fps, (frame.shape[1]-200, 60), font, 1.5, (0, 0, 0), 7, cv2.LINE_AA)
			cv2.putText(frame, fps, (frame.shape[1]-200, 60), font, 1.5, (255, 255, 255), 2, cv2.LINE_AA)

			cv2.imshow("YOLO Output", frame)

			if cv2.waitKey(10) & 0xFF == ord("q"):
				break

			# Stop after 15 seconds of wall-clock time
			if time.time() - start_time > 15:
				break
	finally:
		source.release()
		cv2.destroyAllWindows()
	
# Top-level `await` is used here because the notebook kernel already runs an
# event loop; in a plain Python script this line would need `asyncio.run(main())`.
await main()

In [8]:
# Average per-person foot prediction error collected by main(): the mean
# distance between predicted and detected feet, measured in pixels of the
# 640x480 inference frame (distances are taken before scaling to full size).
np.asarray(loss).mean()

11.471382819123939