# Imports

In [1]:
from ultralytics import YOLO
import matplotlib.pyplot as plt
from PIL import Image
from matplotlib.patches import Polygon
import numpy as np
import cv2
import itertools
from time import time
import torch
import supervision as sv
from depth.depth_anything_v2.dpt import DepthAnythingV2
from matplotlib.colors import Normalize
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error

xFormers not available
xFormers not available


In [2]:
VIDEO_PATH = 'walking4.mp4'

# Device

In [3]:
# Check if GPU is available
if torch.cuda.is_available():
    device = torch.device('cuda')
    print("GPU is available. Using GPU.")
else:
    device = torch.device('cpu')
    print("GPU is not available. Using CPU.")

GPU is available. Using GPU.


# Object Detection Model

In [4]:
# Build a YOLOv9c model from pretrained weight
model = YOLO("yolov9c-seg.pt")
tracker = sv.ByteTrack()

model.to(device)

YOLO(
  (model): SegmentationModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(128, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): RepNCSPELAN4(
        (cv1): Conv(
          (conv): Conv2d(128, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(128, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Sequential(
          (0): RepCSP(
            (cv1): Conv(
              (conv): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
           

In [5]:

def calculate_head_and_leg_points(polygon_points, threshold=0.12):
    """
    Calculate the average points for the head and leg based on the top and bottom threshold percentage of Y-coordinates.

    Parameters:
    polygon_points (np.array): The polygon points.
    threshold (float): The percentage to consider for the top and bottom points (default is 0.07).

    Returns:
    tuple: The average points for the head and leg.
    """
    poly = np.array(polygon_points, dtype=np.int32)

    # Extract Y-coordinates
    y_coords = poly[:, 1]

    # Calculate top and bottom threshold percentage
    top_threshold_indices = np.argsort(y_coords)[:max(int(threshold * len(y_coords)), 1)]
    bottom_threshold_indices = np.argsort(y_coords)[- max (int(threshold * len(y_coords)), 1):]

    # Get average points for head and leg
    head_points = poly[top_threshold_indices]
    leg_points = poly[bottom_threshold_indices]

    head_avg = np.mean(head_points, axis=0).astype(int)
    leg_avg = np.mean(leg_points, axis=0).astype(int)

    leg_avg[1] = np.max(y_coords)
    head_avg[1] = np.min(y_coords)


    return head_avg, leg_avg

In [6]:
def process_detection_results(model, frame):
    # Convert frame to RGB
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    
    # Perform inference using the model
    results = model(frame_rgb, classes=[0], conf=0.45)[0]
    masks = results.masks
    boxes = results.boxes

    # Dictionary to store head and leg positions
    legs_and_heads = {}

    # List to store bounding boxes
    boxes_list = []

    # Iterate over each detected box and mask
    for idx, box in enumerate(boxes):
        # Convert box coordinates to list
        xyxy = box.cpu().xyxy.tolist()[0]
        boxes_list.append(xyxy)

        # Calculate head and leg positions for the current mask
        head_pos, leg_pos = calculate_head_and_leg_points(masks[idx].xy[0])
        legs_and_heads[idx] = (head_pos, leg_pos)

    return results, legs_and_heads, boxes_list

In [7]:
def get_IOU( bbox1, bbox2):
      """
      Calculate the Intersection over Union (IoU) between two bounding boxes.

      Args:
      - bbox1: Coordinates of the first bounding box in the format [x1, y1, x2, y2].
      - bbox2: Coordinates of the second bounding box in the format [x1, y1, x2, y2].

      Returns:
      - iou: Intersection over Union (IoU) score between the two bounding boxes.
      """

      x1, y1, x2, y2 = bbox1
      X1, Y1, X2, Y2 = bbox2

      # Calculate intersection area
      interArea = max(0, min(x2, X2) - max(x1, X1)) * max(0, min(y2, Y2) - max(y1, Y1))

      # Calculate areas of bounding boxes
      bbox1_area = (x2 - x1) * (y2 - y1)
      bbox2_area = (X2 - X1) * (Y2 - Y1)

      # Calculate IoU
      iou = interArea / (bbox1_area + bbox2_area - interArea)

      return iou

In [8]:
def match(xyxy, boxes):
    best_idx = 0
    best_iou = 0.0

    for idx, box in enumerate(boxes):
        iou = get_IOU(box, xyxy)

        if iou>best_iou:
            best_iou = iou
            best_idx = idx

    return best_idx


# Depth estimation model

In [9]:
def load_depth_model(device, encoder='vitl', load_from='depth/assets/depth_anything_v2_metric_vkitti_vitl.pth', max_depth=25):
    
    model_configs = {
        'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
        'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
        'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
        'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
    }
    
    depth_anything = DepthAnythingV2(**{**model_configs[encoder], 'max_depth': max_depth})
    depth_anything.load_state_dict(torch.load(load_from, map_location=device))
    depth_anything = depth_anything.to(device).eval()
    
    return depth_anything

In [10]:
depth_anything= load_depth_model(device)

# Camera calibration on the first frame

In [11]:
from CameraCalibrateApp import CameraCalibrateApp

app = CameraCalibrateApp(VIDEO_PATH)
line_data, frame = app.start()

depth_map = depth_anything.infer_image(frame)


anchors = []

for start, end, true_length in line_data:
    length_pixels = np.sqrt((end[0] - start[0]) ** 2 + (end[1] - start[1]) ** 2)
    middle_point = ((start[0] + end[0]) // 2, (start[1] + end[1]) // 2)
    depth = depth_map[middle_point[1], middle_point[0]]

    length_ratio = (length_pixels * np.sqrt (depth) )/ true_length
    
    anchors.append((length_pixels,  true_length,  depth))


# Dataset and model

In [12]:
cap = cv2.VideoCapture(VIDEO_PATH)

dic = {}
frame_count = -1

step = 90
frames_limit = 6000

while cap.isOpened():
    ret, frame = cap.read()
    frame_count += 1
    if not ret:
        break
    
    if frame_count % step != 0:
         continue
    results, legs_and_heads, boxes = process_detection_results(model, frame)

    detections = sv.Detections.from_ultralytics(results)
    detections = tracker.update_with_detections(detections)

    depth_map = depth_anything.infer_image(frame)


    for detection_idx, _ in enumerate(detections):
            xyxy = detections[detection_idx].xyxy.tolist()[0]
            obj_id = detections[detection_idx].tracker_id[0]

            best_idx = match(xyxy, boxes)


            head_pos, leg_pos = legs_and_heads[best_idx]

            #####
            start, end = head_pos, leg_pos
            length_pixels = np.sqrt((end[0] - start[0]) ** 2 + (end[1] - start[1]) ** 2)
            middle_point = ((start[0] + end[0]) // 2, (start[1] + end[1]) // 2)

            depth = depth_map[middle_point[1], middle_point[0]]

            if obj_id not in dic:
                 dic[obj_id] = [(depth, length_pixels)]
            else: 
                dic[obj_id].append((depth, length_pixels)) 

    
    cv2.waitKey(1) 

    
    if frame_count >= frames_limit:
        break
    print(f"Processed frame {frame_count}")

# Release everything when finished
cap.release()

cv2.destroyAllWindows()
print(dic)


0: 384x640 9 persons, 202.4ms
Speed: 3.5ms preprocess, 202.4ms inference, 469.7ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 0

0: 384x640 12 persons, 19.9ms
Speed: 2.1ms preprocess, 19.9ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 90

0: 384x640 15 persons, 20.3ms
Speed: 2.5ms preprocess, 20.3ms inference, 3.5ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 180

0: 384x640 15 persons, 21.6ms
Speed: 2.2ms preprocess, 21.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 270

0: 384x640 17 persons, 21.1ms
Speed: 2.5ms preprocess, 21.1ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 360

0: 384x640 12 persons, 19.1ms
Speed: 1.9ms preprocess, 19.1ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 450

0: 384x640 14 persons, 21.4ms
Speed: 2.0ms preprocess, 21.4ms inference, 2.2ms postprocess per image at shape (

In [13]:
# Function to get the data element
def get_data_element(p1, p2):
    depth_diff = p2[0] - p1[0]
    
    if depth_diff == 0:
        return None
    
    return p1[0], depth_diff, p2[1]/p1[1]

In [14]:
# Generate dataset using permutations
dataset = []
for id_data in dic.values():
    # Generate permutations of two elements
    permutations = list(itertools.permutations(id_data, 2))
    for perm in permutations:
        p1, p2 = perm
        scale_change = get_data_element(p1, p2)
        if scale_change is not None:
            dataset.append(scale_change)
print(len(dataset))

2680


In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split

In [17]:
# Prepare data
X = [[elem[0], elem[1]] for elem in dataset]
y = [elem[2] for elem in dataset]

X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32).view(-1, 1)

# Create a dataset and split into training and test sets
full_dataset = TensorDataset(X, y)
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

# Data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the neural network model
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(2, 64)
        self.fc2 = nn.Linear(64, 128)
        self.fc25 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc25(x))
        x = torch.relu(self.fc3(x))
        x = self.fc4(x)
        return x

def custom_loss(y_pred, y_true, epsilon=1e-3):
    y_true = torch.clamp(y_true, min=epsilon)  # Ensure y_true is not zero
    l = torch.mean(    (torch.log(y_pred) - torch.log(y_true))*      (torch.log(y_pred) - torch.log(y_true))  )
    

    return l


# Initialize the model, loss function, and optimizer
cam_clib_model = SimpleNN()
optimizer = optim.Adam(cam_clib_model.parameters(), lr=0.001)

# Training the cam_clib_model
num_epochs = 10
for epoch in range(num_epochs):
    cam_clib_model.train()
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        y_pred = cam_clib_model(X_batch)
        loss = custom_loss(y_pred, y_batch)
        loss.backward()
        optimizer.step()

    if (epoch+1) % 1 == 0:
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')

# Evaluate the cam_clib_model
cam_clib_model.eval()
test_loss = 0.0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        y_pred = cam_clib_model(X_batch)
        loss = custom_loss(y_pred, y_batch)
        test_loss += loss.item()

test_loss /= len(test_loader)
print(f'Test Loss: {test_loss}')

Epoch 1/10, Loss: 0.08349927514791489
Epoch 2/10, Loss: 0.08998268842697144
Epoch 3/10, Loss: 0.06479211896657944
Epoch 4/10, Loss: 0.07979784905910492
Epoch 5/10, Loss: 0.03559689596295357
Epoch 6/10, Loss: 0.10715124011039734
Epoch 7/10, Loss: 0.08736579865217209
Epoch 8/10, Loss: 0.12562216818332672
Epoch 9/10, Loss: 0.05495157837867737
Epoch 10/10, Loss: 0.0324413925409317
Test Loss: 0.05209240152993623


In [19]:
def infer_cam_calib(cam_clib_model, current_depth, depth_diff):
    current_depth = float(current_depth)
    depth_diff = float(depth_diff)
    scale_change = cam_clib_model(torch.tensor([current_depth, depth_diff], dtype=torch.float32))
    return scale_change.item()
infer_cam_calib(cam_clib_model, 4, -5)

3.4697468280792236

# Video

In [20]:
def annotate_frame(frame, head_pos, leg_pos, obj_id):          
    # Draw circles or markers for head and leg positions
    cv2.circle(frame, head_pos, 5, (0, 255, 0), -1)  # Green circle for head
    cv2.circle(frame, leg_pos, 5, (0, 0, 255), -1)   # Red circle for leg
    cv2.line(frame, head_pos, leg_pos, (255, 0, 0), 2) 
    
    # Draw the id in the center point of the head and the leg
    midpoint = ((head_pos[0] + leg_pos[0]) // 2, (head_pos[1] + leg_pos[1]) // 2)
    cv2.putText(frame, str(obj_id), midpoint, cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

    return frame

In [21]:
def estimate_hight(c_length_pixels, c_depth, anchors):
    n = len(anchors)

    av_c_tall = 0.0

    for a_length_pixels,  a_true_length,  a_depth in anchors:
        c_tranformed_length_pixels = infer_cam_calib(cam_clib_model, c_depth, a_depth - c_depth) * c_length_pixels
        c_tall = a_true_length * ( c_tranformed_length_pixels / a_length_pixels   )

        av_c_tall += c_tall/n

    return av_c_tall      
    

In [22]:
def detect_head_and_leg(video_path, output_video_path, model):
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print("Error: Couldn't open video.")
        return

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        
        results, legs_and_heads, boxes = process_detection_results(model, frame)

        detections = sv.Detections.from_ultralytics(results)
        detections = tracker.update_with_detections(detections)

        depth_map = depth_anything.infer_image(frame)

        for detection_idx, _ in enumerate(detections):
            xyxy = detections[detection_idx].xyxy.tolist()[0]
            obj_id = detections[detection_idx].tracker_id

            best_idx = match(xyxy, boxes)


            head_pos, leg_pos = legs_and_heads[best_idx]

            #####
            start, end = head_pos, leg_pos
            length_pixels = np.sqrt((end[0] - start[0]) ** 2 + (end[1] - start[1]) ** 2)
            middle_point = ((start[0] + end[0]) // 2, (start[1] + end[1]) // 2)

            depth = depth_map[middle_point[1], middle_point[0]]
            ##         

           
            obj_id =round (estimate_hight(length_pixels, depth, anchors), 2)


            frame = annotate_frame(frame, head_pos, leg_pos, obj_id)
       

        # Display the frame
        cv2.imshow('Frame', frame)

        # Write the frame into the output video file
        out.write(frame)

        # Exit if 'Esc' key is pressed
        if cv2.waitKey(1) & 0xFF == 27:  # 27 is the Esc key
            break

        frame_count += 1
        print(f"Processed frame {frame_count}/{total_frames}")

    # Release everything when finished
    cap.release()
    out.release()
    
    cv2.destroyAllWindows()

output_video_path = 'output_video.mp4'


detect_head_and_leg(VIDEO_PATH, output_video_path, model)


0: 384x640 9 persons, 29.5ms
Speed: 13.3ms preprocess, 29.5ms inference, 3.2ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 1/3493

0: 384x640 9 persons, 22.2ms
Speed: 2.0ms preprocess, 22.2ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 2/3493

0: 384x640 9 persons, 21.3ms
Speed: 2.2ms preprocess, 21.3ms inference, 2.7ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 3/3493

0: 384x640 10 persons, 22.1ms
Speed: 2.5ms preprocess, 22.1ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 4/3493

0: 384x640 10 persons, 22.1ms
Speed: 2.2ms preprocess, 22.1ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 5/3493

0: 384x640 10 persons, 19.8ms
Speed: 1.8ms preprocess, 19.8ms inference, 2.5ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 6/3493

0: 384x640 10 persons, 20.8ms
Speed: 1.9ms preprocess, 20.8ms inference, 2.5ms postprocess per 