In [1]:
import cv2
import numpy as np
import os
from datetime import datetime
from collections import deque
import random
from short import Sort
from ultralytics import YOLO

In [2]:
model = YOLO('yolov9e-seg.pt')
tracker = Sort()
video_path = r"D:\Coding\FetchFrames\20240611_135158"
# Declare coordinates for the drawn structure. Final product will have a visual tab for this.
points = {
    'DeReg1L': (626, 726),
    'DeReg1R': (1426, 653),
    'DeRegL': (650, 624),
    'DeRegR': (1323, 575),
    'RegL': (675, 548),
    'RegR': (1228, 506),
    'Reg1L': (592, 856),
    'Reg1R': (1573, 762),
}
selected_point = None 
enable_editing = False # When set to true, it allows the user to drag one of the points.   


In [3]:
# Calculating the croppage based on how the points coordinates, using a padding of 100 px
def calculate_crop(p1, p2, padding=100, rev=False):
    if p1 > p2:
        if rev:
            return p2 - padding
        return p1 + padding
    else:
        if rev:
            return p1 - padding
        return p2 + padding

crop_y1 = calculate_crop(points['RegL'][1],points['RegR'][1], rev=True)
crop_x1 = calculate_crop(points['RegL'][0],points['Reg1L'][0], rev=True)
crop_y2 = calculate_crop(points['Reg1L'][1],points['Reg1R'][1])
crop_x2 = calculate_crop(points['RegR'][0],points['Reg1R'][0])

In [4]:
# Adjusting the coordinates based on the crop, to find out where the points are on the cropped image
def adjust_points_for_crop(points, crop_x1, crop_y1):
    adjusted_points = {k: (v[0] - crop_x1, v[1] - crop_y1) for k, v in points.items()}
    return adjusted_points
adjusted_points = adjust_points_for_crop(points, crop_x1, crop_y1)

In [5]:
# Speed estimation variables
distance_real_reg =  5 # meters, estimated by the user
disntace_real_regR = 6 # meters, estimated by the user
distance_pixels_reg = np.linalg.norm(np.array(points['RegL']) - np.array(points['Reg1L']))
distance_pixels_regR = np.linalg.norm(np.array(points['RegR']) - np.array(points['Reg1R']))
conversion_factor_reg = distance_real_reg / distance_pixels_reg
conversion_factor_regR = disntace_real_regR / distance_pixels_regR
average_conversion_factor = (conversion_factor_reg + conversion_factor_regR) / 2

In [6]:
# Check if line p1->p2 intersects with predefined lines from the structure and return intersecting line tags.
def line_intersects(p1, p2):
    def ccw(A, B, C):
        return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])
    
    def intersects(q1, q2):
        return ccw(p1, q1, q2) != ccw(p2, q1, q2) and ccw(p1, p2, q1) != ccw(p1, p2, q2)
    result = []
    if intersects(adjusted_points['RegL'], adjusted_points['RegR']):
        result.append("Registered_Reg")
    if intersects(adjusted_points['Reg1L'], adjusted_points['Reg1R']):
        result.append("Registered_Reg1")
    if intersects(adjusted_points['DeRegL'], adjusted_points['DeRegR']):
        result.append("Deregistered_DeReg")
    if intersects(adjusted_points['DeReg1L'], adjusted_points['DeReg1R']):
        result.append("Deregistered_DeReg1")
    return result

def draw_regions(event, x, y, flags, param):
    global points, selected_point
    if enable_editing:
        if event == cv2.EVENT_LBUTTONDOWN:
            for pname, p in points.items():
                if np.linalg.norm(np.array([x, y]) - np.array(p)) < 10:
                    selected_point = pname
                    break
        elif event == cv2.EVENT_MOUSEMOVE and selected_point is not None:
            points[selected_point] = (x, y)
        elif event == cv2.EVENT_LBUTTONUP:
            selected_point = None
            print("points = {")
            for point, coords in sorted(points.items()):
                print(f"    '{point}': {coords},")
            print("}")
def draw_all_regions(img, points):
    regions = {
        'Region1': ['RegL', 'RegR', 'DeRegR', 'DeRegL'],
        'Region3': ['DeReg1L', 'DeReg1R', 'Reg1R', 'Reg1L'],
        'Region2': ['DeRegL', 'DeRegR', 'DeReg1R', 'DeReg1L']
    }
    colors = [(255, 0, 0), (0, 0, 255), ( 0, 255, 0)]
    for color, region in zip(colors, regions.values()):
        pts = np.array([points[pt] for pt in region], np.int32)
        pts = pts.reshape((-1, 1, 2))
        cv2.polylines(img, [pts], isClosed=True, color=color, thickness=2)

def calculate_speed(positions, conversion_factor, fps):
    if len(positions) > 1:
        pixel_distance = np.linalg.norm(np.array(positions[-1]) - np.array(positions[-2]))
        # Convert distance from pixels to meters
        real_distance = pixel_distance * conversion_factor
        # Speed = distance/time (time between frames is 1/fps), this will be calculated inside of the final product
        speed = real_distance * fps
        return speed
    return 0   

In [7]:
# Declaring the output for the video writer
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"D:\Coding\FetchFrames\Video_outputs\{timestamp}.mp4"
use_video = not os.path.isdir(video_path)


# Differentiate between video or a folder of image frames
if use_video:
    cap = cv2.VideoCapture(video_path)
    assert cap.isOpened(), "Error reading video file"
    w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))
else:
    frame_files = sorted([os.path.join(video_path, f) for f in os.listdir(video_path) if f.endswith('.png')])
    first_frame = cv2.imread(frame_files[0])
    h, w = first_frame.shape[:2]
    fps = 0.8725868725868726
video_writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))

cv2.namedWindow('Live Tracking')
cv2.setMouseCallback('Live Tracking', draw_regions)
track_positions = {}
frame_count = 0
count = 0 

while True:
    if use_video:
        success, im0 = cap.read()
        if not success:
            break
    else:
        if frame_count >= len(frame_files):
            break
        im0 = cv2.imread(frame_files[frame_count])
    draw_all_regions(im0, points)  # Draw the structure
    original = im0.copy()
    im0 = im0[crop_y1:crop_y2, crop_x1:crop_x2] # Crop the frame before running the model, for optimization
    im0 = cv2.putText(im0, f"Count: {count}", (10,100), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA) 
    overlay = im0.copy()
    results = model(im0, classes=0, line_width=1, show_boxes=True, retina_masks=True, verbose=False)
    current_ids = set()

    # Iterate over the segmenations
    for res in results:
        boxes = res.boxes.xyxy.cpu().numpy().astype(int)
        masks = res.masks.xy if hasattr(res, 'masks') else []
        tracks = tracker.update(boxes)
        
        for box, mask in zip(tracks.astype(int), masks):
            xmin, ymin, xmax, ymax, track_id = box
            # Calculate the coordinates of the center of the mask to draw a dot, which will be used to track the mask 
            x_center = (xmin + xmax) // 2
            y_center = (ymin + ymax) // 2
            current_ids.add(track_id)
            new_position = (x_center, y_center)

            if track_id not in track_positions:
                random_color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
                track_positions[track_id] = {'positions': deque(maxlen=5), 'color': random_color, 'registrations' : [], 'counted': False, 'speed': "?"}
            
            pt = (int(round(x_center)), int(round(y_center)))
            track_positions[track_id]['positions'].append(pt)

            # Draw a valid mask red 
            if mask.size > 0:
                mask_poly = np.array(mask, dtype=np.int32).reshape((-1, 1, 2))
                mask_poly = np.append(mask_poly, [mask_poly[0]], axis=0)
                if cv2.pointPolygonTest(mask_poly, pt, False) == 0:
                    track_positions[track_id]['color'] = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
                cv2.fillPoly(overlay, [mask_poly], (0,0,255))


            # Logic for registering when a line is crossed
            if len(track_positions[track_id]['positions']) > 1:
                last_position = track_positions[track_id]['positions'][-2]
                intersections = line_intersects(last_position, new_position)
                for tag in intersections:
                    track_positions[track_id]['registrations'].append(tag)
                if len(track_positions[track_id]['registrations']) > 0:
                    print(track_positions[track_id]['registrations'])
                    #Debugging print statement
                # Once both registers have been completed, the person is counted and cannot be counted again. 
                if "Registered_Reg" in track_positions[track_id]['registrations'] and "Deregistered_DeReg" in track_positions[track_id]['registrations'] and track_positions[track_id]['counted'] == False:
                    track_positions[track_id]['counted'] = True
                    count += 1
                elif "Registered_Reg1" in track_positions[track_id]['registrations'] and "Deregistered_DeReg1" in track_positions[track_id]['registrations'] and track_positions[track_id]['counted'] == False:
                    track_positions[track_id]['counted'] = True
                    count += 1
    
    # Draw tracks behind the masks, a line between the coordinates of the center, stored in track_position
    for track_id, data in track_positions.items():
        for i in range(1, len(data['positions'])):
            cv2.line(im0, data['positions'][i - 1], data['positions'][i], data['color'], 2)
        if len(data['positions']) > 1:
            speed = calculate_speed(data['positions'], average_conversion_factor, fps)
            data['speed'] = speed  # Store the speed in track_positions
            cv2.putText(im0, f"Speed: {data['speed']:.2f} m/s", (positions[0], positions[1] + 20), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 0), 2)

    
    alpha = 0.6
    im0 = cv2.addWeighted(im0, alpha, overlay, 1 - alpha, 0)
    
    for track_id in current_ids:
        positions = track_positions[track_id]['positions'][-1]
        cv2.circle(im0, positions, radius=4, color=(255, 105, 180), thickness=-1)
        cv2.putText(im0, f"Id:{track_id}", (positions[0], positions[1] - 10), cv2.FONT_HERSHEY_PLAIN, 2, (0, 0, 255), 2)
    frame_count += 1
    original[crop_y1:crop_y2, crop_x1:crop_x2] = im0
    original = cv2.putText(original, f"Count: {count}", (10,100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2, cv2.LINE_AA) 

    video_writer.write(original)
    cv2.imshow('Live Tracking', original)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    #print(points)
    track_positions = {key: val for key, val in track_positions.items() if key in current_ids}

if use_video:
    cap.release()
video_writer.release()
cv2.destroyAllWindows()


['Deregistered_DeReg1']
['Deregistered_DeReg1']
['Deregistered_DeReg1']
['Deregistered_DeReg1']
['Registered_Reg1']
['Deregistered_DeReg']
['Deregistered_DeReg1']
['Deregistered_DeReg1', 'Deregistered_DeReg']
['Registered_Reg1']
['Registered_Reg1']
['Deregistered_DeReg']
['Deregistered_DeReg1', 'Deregistered_DeReg']
['Deregistered_DeReg1', 'Deregistered_DeReg']
['Registered_Reg1']
['Registered_Reg1']
['Deregistered_DeReg1', 'Deregistered_DeReg']
['Deregistered_DeReg1', 'Deregistered_DeReg']
['Registered_Reg1', 'Deregistered_DeReg1']
['Registered_Reg1', 'Deregistered_DeReg1']
['Deregistered_DeReg1', 'Deregistered_DeReg', 'Registered_Reg']
['Registered_Reg1', 'Deregistered_DeReg1']
['Registered_Reg1', 'Deregistered_DeReg1']
['Deregistered_DeReg1', 'Deregistered_DeReg', 'Registered_Reg']
['Registered_Reg1', 'Deregistered_DeReg1']
['Registered_Reg1', 'Deregistered_DeReg1', 'Deregistered_DeReg']
['Deregistered_DeReg1', 'Deregistered_DeReg', 'Registered_Reg']
['Registered_Reg1', 'Deregistere