In [1]:
import cv2
import ultralytics
import numpy as np
import pandas as pd

from collections import defaultdict
import time

In [6]:
# Load the YOLOv8x weights. Per the original author's note, Ultralytics
# downloads the file on first use when it is not already present.
weights_path = 'models/yolov8x.pt'
model = ultralytics.models.YOLO(weights_path)
# Fine-tuning on custom data is possible but deliberately not used here:
# model.train(data='datasets/data.yaml', plots=True, save=True)

In [7]:
# Resize the video input size to improve fps
def resize_frame(frame, scale):
    """Return ``frame`` resized by ``scale`` along both axes.

    Args:
        frame: Image array; ``frame.shape[0]`` is read as the height and
            ``frame.shape[1]`` as the width.
        scale: Positive multiplier applied to both sides (0.5 halves each).

    Returns:
        The image returned by ``cv2.resize`` using ``cv2.INTER_AREA``.

    Raises:
        ValueError: If ``scale`` is not greater than zero.
    """
    if scale <= 0:
        raise ValueError(f"scale must be positive, got {scale!r}")

    # int() truncates, so a very small scale could yield a 0-pixel side,
    # which is not a valid target size; always keep at least one pixel.
    width = max(1, int(frame.shape[1] * scale))
    height = max(1, int(frame.shape[0] * scale))

    # cv2.resize takes the target size as (width, height).
    return cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)

In [None]:
# Flags
VIDEO_PATH = "input1.mp4"
# A frame counts as crowded once at least this many people are detected
MAX_PERSON_ALLOWED = 8
# In seconds: how long the crowd may persist before the timed warning is shown
MAX_TIME_ALLOWED = 3
# Max frame saved per person, could be less than 30 if the person exits early
MAX_FRAME_SAVED = 30

# Time controls
# warning_timer: time.time() stamp taken when the crowd threshold was first
#                reached (0 means "not currently crowded").
# running_timer: seconds elapsed since warning_timer, rounded to 2 decimals.
# NOTE: this is wall-clock time of the processing loop, not video time.
warning_timer = 0
running_timer = 0

cap = cv2.VideoCapture(VIDEO_PATH)

# Per-person histories keyed by tracker id: box positions (x, y) and box
# sizes (w, h) taken from results[0].boxes.xywh. Both lists are appended to
# and trimmed together so index i in one matches index i in the other.
track_history_xy = defaultdict(list)
track_history_wh = defaultdict(list)

# try/finally guarantees the capture and the preview window are released even
# if inference or drawing raises part-way through the video.
try:
    # Video frame loop
    while cap.isOpened():
        # Read the data stream
        success, frame = cap.read()

        # When the stream ends (or a read fails) there is no usable frame, so
        # stop before anything touches `frame`.
        if not success:
            break

        # Source video sometimes are too big in resolution and fill the whole
        # screen, and sometimes it's just slow
        frame = resize_frame(frame, 0.5)

        # Actual inference process happens here
        # Capped to class 0 for human tracking only
        results = model.track(frame, classes=[0], persist=True, save=True, project='./result')
        # Dump the tracking frame into a ndarray
        annotated_frame = results[0].plot()

        person_count = len(results[0])
        # Default overlay text; replaced by the timed warning below when the
        # crowd has persisted long enough. Only one string is drawn so the
        # two messages never overlap at the same position.
        status_text = f'{person_count} person detected'

        # Checks to detect whether there's too many people on the screen
        if person_count >= MAX_PERSON_ALLOWED:
            # Timer start
            if warning_timer == 0:
                warning_timer = time.time()

            # Running timer calculation
            running_timer = round(time.time() - warning_timer, 2)

            # Check whether too many people have been lounging for too long
            if running_timer >= MAX_TIME_ALLOWED:
                # Text info with timers
                status_text = f'{person_count} person detected for {running_timer} seconds'
        else:
            # Reset timer calculation
            warning_timer = 0
            running_timer = 0

        annotated_frame = cv2.putText(annotated_frame, status_text, (50, 50), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255))

        # boxes.id is None when nothing trackable is in sight; skip the
        # history update in that case instead of crashing.
        if results[0].boxes.id is not None:
            # Extract boxes and ids. .cpu() makes this work when inference
            # runs on a GPU as well (it is a no-op for CPU tensors).
            boxes = results[0].boxes.xywh.cpu()
            track_ids = results[0].boxes.id.int().cpu().numpy()

            # Draw the trailing effect for every tracked box
            for box, track_id in zip(boxes, track_ids):
                x, y, w, h = box
                track = track_history_xy[track_id]
                sizes = track_history_wh[track_id]
                track.append((float(x), float(y)))
                sizes.append((float(w), float(h)))
                # Keep at most MAX_FRAME_SAVED entries per person; the oldest
                # entry is dropped from both histories so they stay aligned.
                if len(track) > MAX_FRAME_SAVED:
                    track.pop(0)
                    sizes.pop(0)

                # Convert the stored (x, y) history into the (N, 1, 2) int32
                # layout that cv2.polylines expects
                points = np.array(track).astype(np.int32).reshape((-1, 1, 2))
                # Draw the tracking lines
                cv2.polylines(annotated_frame, [points], isClosed=False, color=(0, 225, 225), thickness=2)

        # Show frames
        cv2.imshow("YOLO Inference", annotated_frame)

        # cv2 window exit
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Obligatory cv2 window destroy
    cap.release()
    cv2.destroyAllWindows()

In [6]:
# Column buffers for the dataframe: one entry per saved observation
ids = []
frame = []
xs = []
ys = []
ws = []
hs = []

frame_counter = 0
# Loop every person in track_history
for track_id, xy_history in track_history_xy.items():
    # Empty entry sometimes pass through as the first frame
    if not xy_history:
        continue

    # The size history may be longer than the position history if it was not
    # trimmed in step with it; the most recent len(xy_history) sizes are the
    # ones that belong to the retained positions.
    wh_history = track_history_wh[track_id][-len(xy_history):]

    # `frame` is the index within this person's retained history (restarting
    # at 0 for each person), not the frame number of the source video.
    for frame_counter, ((x, y), (w, h)) in enumerate(zip(xy_history, wh_history)):
        ids.append(track_id)
        frame.append(frame_counter)
        xs.append(x)
        ys.append(y)
        # Use the size recorded with this observation rather than the first
        # size ever stored for the person.
        ws.append(w)
        hs.append(h)
frame_counter = 0

In [7]:
# Assemble the collected columns into one table and write it out as CSV
df = pd.DataFrame(
    {
        'id': ids,
        'frame': frame,
        'x': xs,
        'y': ys,
        'w': ws,
        'h': hs,
    }
)
df.to_csv('data.csv')