


# $Analysing$ $Human$ $Movement$ $Patterns$ $in$ $Diverse$ $Environments$ $using$ *``YOLOv8 \ YOLOv10``*  $\&$  *``Roboflow Supervision``*

---

<a align="">
  <img width="1200" src="https://raw.githubusercontent.com/ultralytics/assets/main/yolov8/banner-yolov8.png">
</a>

<a align="">
 




# $Setup$ ✅
---



In [86]:
import cv2
import os
import shutil
import numpy as np
import yaml
import torch


In [87]:
# Select the compute device: the first CUDA GPU when one is available,
# otherwise the CPU.
if torch.cuda.is_available():
    # Only touch the CUDA runtime after the availability check; calling
    # set_device on a machine without CUDA raises instead of falling back.
    torch.cuda.set_device(0)
    device = 'cuda'
else:
    device = 'cpu'
print(f'Using device: {device}')
  # checks

Using device: cuda


In [88]:
from ultralytics import YOLO, checks, hub
# Print the Ultralytics environment summary (library, Python, torch and
# hardware information, as shown in the cell output below).
checks()

Ultralytics YOLOv8.2.72  Python-3.10.8 torch-2.3.1+cu118 CUDA:0 (NVIDIA GeForce GTX 1650 Ti with Max-Q Design, 4096MiB)
Setup complete  (12 CPUs, 15.8 GB RAM, 311.0/337.3 GB disk)


###  $Supervision$

<a align="center">
  <img width ="1200" src="https://camo.githubusercontent.com/6b72c64ca80ade48dd90f820cb403946cde644335c05f400a939644ca0488ed0/68747470733a2f2f6d656469612e726f626f666c6f772e636f6d2f6f70656e2d736f757263652f7375706572766973696f6e2f72662d7375706572766973696f6e2d62616e6e65722e706e673f7570646174656441743d31363738393935393237353239">
</a>

In [4]:
import supervision as sv 

In [5]:
import supervision as sv
from supervision.draw.color import ColorPalette
from supervision import Detections, BoxAnnotator
# Notebook-wide smoother instance, used by the analysis cells that call
# smoother.update_with_detections(...).
# NOTE(review): presumably the smoother keeps per-track history between calls,
# so re-running a cell without re-creating it may mix runs - confirm.
smoother = sv.DetectionsSmoother()

# $Model$ $Selection$ 🍇
---


### $YOLOv8x$ $model$



In [126]:
from ultralytics import YOLO
# Load the pretrained YOLOv8x weights and move the model to the selected device.
# In this notebook the model is only used for inference on video frames.
model = YOLO("yolov8x.pt").to(device)

### $YOLOv8n$ $model$


In [125]:
# Load the pretrained YOLOv8n weights and move the model to the selected device.
# Running this cell rebinds `model`, replacing any previously loaded model.
model = YOLO("yolov8n.pt").to(device)

# $Movement$ $Analysis$ 🏃🏻‍♀️
---




The input video: *the video on which the movement analysis will be run*

In [116]:
# Absolute path of the video to analyse (raw string so the Windows
# backslashes are kept literally). Machine-specific: adjust before running.
SOURCE_VIDEO_PATH=r'C:\Users\MSI\Desktop\Upwork Projects\2. Computer Vision and Image Processing\Oussema Project (Aerport Object Detection, Persons Counting, Mouvement Analysis)\Videos\videos\Test\test1.mp4'   #  Change the path
# Read the video metadata (width, height, fps, total_frames) from the file.
video_info = sv.VideoInfo.from_video_path(video_path=SOURCE_VIDEO_PATH)
# Bare expression: displays the VideoInfo object as the cell output.
video_info

VideoInfo(width=1920, height=1080, fps=25, total_frames=341)

Utils 

In [9]:
# Dict mapping class_id -> class_name, as exposed by the loaded model
CLASS_NAMES_DICT = model.model.names
# class_ids of interest: every id from 0 to 79, i.e. no class is excluded here.
# (The analysis cells restrict detection themselves via `classes=[0]`.)
CLASS_ID = list(range(80))


In [127]:
def get_bbox_center(box):
    """Return the center point of an axis-aligned bounding box.

    Args:
        box: Indexable holding four coordinates in the order
            (xmin, ymin, xmax, ymax), e.g. one row of ``detections.xyxy``.

    Returns:
        numpy.ndarray: Array ``[center_x, center_y]``.
    """
    x_min, y_min, x_max, y_max = box[0], box[1], box[2], box[3]
    midpoint = [(x_min + x_max) / 2, (y_min + y_max) / 2]
    return np.array(midpoint)

In [11]:
def speed_interval(speed, thresh):
    """Map a speed value to a state index.

    Args:
        speed: Measured speed, or the sentinel value 99999.
        thresh: Speed above which the state index 2 is returned.

    Returns:
        int: 0 when ``speed`` equals the sentinel 99999, 2 when ``speed`` is
        strictly greater than ``thresh``, otherwise 1.
    """
    # The sentinel is tested first so that it is never compared to the threshold.
    if speed == 99999:
        return 0
    return 2 if speed > thresh else 1

In [122]:
def determine_movement_state(relative_displacement, bounding_box_width, bounding_box_height):
    """Classify a displacement as moving (2) or stopped (1).

    The displacement is compared against a threshold equal to 0.00045 times
    the larger side of the bounding box.

    Args:
        relative_displacement: Displacement value to classify.
        bounding_box_width: Width of the bounding box.
        bounding_box_height: Height of the bounding box.

    Returns:
        int: 2 when the displacement is strictly above the threshold, else 1.
    """
    larger_side = max(bounding_box_width, bounding_box_height)
    is_moving = relative_displacement > 0.00045 * larger_side
    return 2 if is_moving else 1


## $Both \space Classes  \space Detection$: *``Absolute threshold value``*

In [166]:
# Step 1: Read the video frame by frame with cv2.VideoCapture
# Step 2: Detect persons and annotate them with supervision
# Step 3: Label every tracked person as Processing / Stopped / Moving
#
#  |
#  |--->  Tracker required
#  |  |
#  |  -->  ByteTrack
#  |
#  |--->  Compare the current sample with the one taken `step` frames earlier

# The index in this list is the class_id assigned to each detection below and
# also the index of its colour in the palette.
class_names = ['Processing..', 'Stopped', 'Moving']

tracker = sv.ByteTrack(frame_rate=video_info.fps)
# Colors in class_id order: Processing, Stopped, Moving
colors = sv.ColorPalette.from_hex(['#FFDBAC', '#E87A5C', '#B6E696'])
black = sv.Color.BLACK

# Video capture - reads the source video file
cap = cv2.VideoCapture(SOURCE_VIDEO_PATH)

# Verify the video opened successfully before creating any output file.
# An exception is raised instead of calling exit(), which is not meant to be
# used from inside a notebook cell.
if not cap.isOpened():
    cap.release()
    raise RuntimeError("Error: Could not open video.")

# Video writer - saves the annotated frames; fps and frame size come from the
# source video so that the written frames always match the writer settings.
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter(
    'Both_classes_Absolute.mp4',
    fourcc,
    video_info.fps,
    (video_info.width, video_info.height),
)

threshold_speed = 10    # Displacement (pixels per `step` frames) above which a person is "Moving"
d = 0                   # Frames elapsed since the last sample
step = 10               # Number of frames between two samples used for the speed computation
first_attempt = True    # True until the first sample has been stored

# Annotators
trace_annotator = sv.TraceAnnotator(trace_length=70, thickness=2)
label_annotator = sv.LabelAnnotator(color=colors, text_color=black, text_position=sv.Position.TOP_LEFT, text_scale=0.7)
color_annotator = sv.ColorAnnotator(color=colors, opacity=0.2)
ellipse_annotator = sv.EllipseAnnotator(color=colors, thickness=4)

actl_D = {}     # tracker_id -> bbox center at the latest sample
speed_D = {}    # tracker_id -> displacement between the two latest samples (99999 = not measured yet)

try:
    while cap.isOpened():
        # Capture frame-by-frame
        ret, frame = cap.read()
        if not ret:
            break

        result = model(frame, classes=[0])[0]
        d += 1

        # Process only when there are detections. The number of boxes is
        # checked directly: indexing `result[0]` selects the first detection
        # and therefore cannot express "no detections".
        if len(result.boxes) > 0:

            detections = sv.Detections.from_ultralytics(result)
            detections = tracker.update_with_detections(detections)

            annotated_frame = trace_annotator.annotate(
                scene=frame.copy(),
                detections=detections
            )

            # `>=` rather than `==`: if the frame on which d reached `step`
            # had no detections, sampling must still happen on the next
            # frame that has some.
            if d >= step:

                if first_attempt:
                    # First sample: nothing to compare with yet, only store it
                    detections_pf = detections
                    first_attempt = False

                else:
                    # Centers at the previous sample and at the current one
                    prvs_D = {
                        tid: get_bbox_center(box)
                        for tid, box in zip(detections_pf.tracker_id, detections_pf.xyxy)
                    }
                    actl_D = {
                        tid: get_bbox_center(box)
                        for tid, box in zip(detections.tracker_id, detections.xyxy)
                    }
                    speed_D = {}

                    for key in actl_D:
                        if key in prvs_D:
                            # Displacement of the center between the two samples
                            speed_D[key] = np.linalg.norm(actl_D[key] - prvs_D[key])
                        else:
                            # Person absent from the previous sample: no speed
                            # can be measured yet, so mark it as "Processing"
                            speed_D[key] = 99999

                    # The current sample becomes the previous one
                    detections_pf = detections
                d = 0

            # Persons present in the current frame but absent from the latest
            # sample are marked as "Processing"
            for key in set(detections.tracker_id.tolist()) - set(actl_D):
                speed_D[key] = 99999

            if not first_attempt:
                # Replace the detector class ids by the movement state ids
                new_classes = np.array(
                    [speed_interval(speed_D[key], threshold_speed) for key in detections.tracker_id],
                    dtype=int,
                )
                detections.class_id = new_classes

            # Annotate the frame
            labels = [
                f"ID: {tracker_id}, {class_names[class_id]}, Absolute speed: {np.round(speed_D[tracker_id], 3)} "
                for _, _, _, class_id, tracker_id, _
                in detections
            ]
            annotated_frame = label_annotator.annotate(
                scene=annotated_frame,
                labels=labels,
                detections=detections
            )
            annotated_frame = color_annotator.annotate(
                scene=annotated_frame.copy(),
                detections=detections
            )
            frame = ellipse_annotator.annotate(
                scene=annotated_frame,
                detections=detections
            )

        out.write(frame)
        cv2.imshow('window', cv2.resize(frame, (920, 560)))
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the capture, the writer and the preview window, even
    # when an exception interrupts the loop.
    cap.release()
    out.release()
    cv2.destroyAllWindows()



0: 384x640 35 persons, 302.3ms
Speed: 7.5ms preprocess, 302.3ms inference, 10.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 100.4ms
Speed: 3.6ms preprocess, 100.4ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 92.4ms
Speed: 0.0ms preprocess, 92.4ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 101.1ms
Speed: 0.0ms preprocess, 101.1ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 33 persons, 90.5ms
Speed: 9.5ms preprocess, 90.5ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 31 persons, 94.5ms
Speed: 3.0ms preprocess, 94.5ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 32 persons, 96.1ms
Speed: 0.0ms preprocess, 96.1ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 35 persons, 94.0ms
Speed: 0.0ms preprocess, 94.0ms inference, 0.0ms postprocess per i

## $Both \space Classes  \space Detection$: *``Relative threshold value``*

In [142]:
# Step 1: Read the video frame by frame with cv2.VideoCapture
# Step 2: Detect persons and annotate them with supervision
# Step 3: Label every tracked person as Processing / Stopped / Moving
#
#  |
#  |--->  Tracker required
#  |  |
#  |  -->  ByteTrack
#  |
#  |--->  Compare the current sample with the one taken `step` frames earlier

# The index in this list is the class_id assigned to each detection below and
# also the index of its colour in the palette.
class_names = ['Processing..', 'Stopped', 'Moving']

tracker = sv.ByteTrack(frame_rate=video_info.fps)
# A fresh smoother for this run, so that no history from a previous run of
# this or another cell is reused.
# NOTE(review): assumes DetectionsSmoother keeps per-track history - confirm.
smoother = sv.DetectionsSmoother()

# Colors in class_id order: Processing, Stopped, Moving
colors = sv.ColorPalette.from_hex(['#FFDBAC', '#E87A5C', '#B6E696'])

black = sv.Color.BLACK

# Video capture - reads the source video file
cap = cv2.VideoCapture(SOURCE_VIDEO_PATH)

# Check that the video opened successfully before creating any output file.
# An exception is raised instead of calling exit(), which is not meant to be
# used from inside a notebook cell.
if not cap.isOpened():
    cap.release()
    raise RuntimeError("Error: Could not open video.")

# Video writer - saves the annotated frames; fps and frame size come from the
# source video so that the written frames always match the writer settings.
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter(
    'Both_classes_Relative.mp4',
    fourcc,
    video_info.fps,
    (video_info.width, video_info.height),
)

d = 0                   # Frames elapsed since the last sample
step = 10               # Number of frames between two samples used for the speed computation
first_attempt = True    # True until the first sample has been stored (after `step` frames)

# Annotators
trace_annotator = sv.TraceAnnotator(trace_length=70, thickness=2)
label_annotator = sv.LabelAnnotator(color=colors, text_color=black, text_position=sv.Position.TOP_LEFT, text_scale=0.7)
color_annotator = sv.ColorAnnotator(color=colors, opacity=0.2)
ellipse_annotator = sv.EllipseAnnotator(color=colors, thickness=4)

actl_D = {}     # tracker_id -> [bbox width, bbox height, bbox center] at the latest sample
speed_D = {}    # tracker_id -> [relative displacement, state id]; [99999, 0] = not measured yet

try:
    while cap.isOpened():
        # Capture frame-by-frame
        ret, frame = cap.read()
        if not ret:
            break

        result = model(frame, classes=[0])[0]
        d += 1

        # Process only when there are detections. The number of boxes is
        # checked directly: indexing `result[0]` selects the first detection
        # and therefore cannot express "no detections".
        if len(result.boxes) > 0:
            detections = sv.Detections.from_ultralytics(result)
            detections = tracker.update_with_detections(detections)
            detections = smoother.update_with_detections(detections)

            annotated_frame = trace_annotator.annotate(
                scene=frame.copy(),
                detections=detections
            )

            # `>=` rather than `==`: if the frame on which d reached `step`
            # had no detections, sampling must still happen on the next
            # frame that has some.
            if d >= step:
                if first_attempt:
                    # First sample: nothing to compare with yet, only store it
                    detections_pf = detections
                    first_attempt = False
                else:
                    # Centers at the previous sample; size and center at the current one
                    prvs_D = {
                        tid: get_bbox_center(box)
                        for tid, box in zip(detections_pf.tracker_id, detections_pf.xyxy)
                    }
                    actl_D = {
                        tid: [box[2] - box[0], box[3] - box[1], get_bbox_center(box)]
                        for tid, box in zip(detections.tracker_id, detections.xyxy)
                    }
                    speed_D = {}

                    for key in actl_D:
                        if key in prvs_D:
                            width = actl_D[key][0]
                            height = actl_D[key][1]
                            # Center displacement expressed as a fraction of the
                            # larger side of the bounding box
                            relative_displacement = np.linalg.norm(actl_D[key][2] - prvs_D[key]) / np.max([width, height])

                            # NOTE(review): the displacement is already divided by
                            # max(width, height) and determine_movement_state scales
                            # its threshold by the same quantity - confirm intended.
                            speed_D[key] = [relative_displacement, determine_movement_state(relative_displacement, width, height)]
                        else:
                            # Person absent from the previous sample: no speed
                            # can be measured yet, so mark it as "Processing"
                            speed_D[key] = [99999, 0]

                    # The current sample becomes the previous one
                    detections_pf = detections
                d = 0

            # Persons present in the current frame but absent from the latest
            # sample get the "Processing" state
            for key in set(detections.tracker_id.tolist()) - set(actl_D):
                speed_D[key] = [99999, 0]

            if not first_attempt:
                # Replace the detector class ids by the movement state ids
                new_classes = np.array([speed_D[key][1] for key in detections.tracker_id], dtype=int)
                detections.class_id = new_classes

            # Annotation
            labels = [
                f"ID: {tracker_id}, {class_names[class_id]}, Relative speed: {np.round(speed_D[tracker_id][0], 3)}"
                for _, _, _, class_id, tracker_id, _
                in detections
            ]
            annotated_frame = label_annotator.annotate(
                scene=annotated_frame.copy(),
                labels=labels,
                detections=detections
            )
            annotated_frame = color_annotator.annotate(
                scene=annotated_frame.copy(),
                detections=detections
            )
            frame = ellipse_annotator.annotate(
                scene=annotated_frame.copy(),
                detections=detections
            )

        out.write(frame)
        cv2.imshow('window', cv2.resize(frame, (920, 560)))
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the capture, the writer and the preview window, even
    # when an exception interrupts the loop.
    cap.release()
    out.release()
    cv2.destroyAllWindows()



0: 384x640 35 persons, 318.2ms
Speed: 1.8ms preprocess, 318.2ms inference, 6.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 99.9ms
Speed: 0.0ms preprocess, 99.9ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 89.7ms
Speed: 0.0ms preprocess, 89.7ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 89.3ms
Speed: 0.0ms preprocess, 89.3ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 33 persons, 80.4ms
Speed: 1.1ms preprocess, 80.4ms inference, 9.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 31 persons, 73.9ms
Speed: 3.5ms preprocess, 73.9ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 32 persons, 85.5ms
Speed: 0.0ms preprocess, 85.5ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 35 persons, 91.4ms
Speed: 0.0ms preprocess, 91.4ms inference, 1.9ms postprocess per image 

## $Stopped$ ``Only``  $\space Persons  \space  Detection$

In [154]:
# Step 1: Read the video frame by frame with cv2.VideoCapture
# Step 2: Detect persons and annotate them with supervision
# Step 3: Label every tracked person, then keep only Processing / Stopped ones
#
#  |
#  |--->  Tracker required
#  |  |
#  |  -->  ByteTrack
#  |
#  |--->  Compare the current sample with the one taken `step` frames earlier

# The index in this list is the class_id assigned to each detection below and
# also the index of its colour in the palette.
class_names = ['Processing..', 'Stopped', 'Moving']

tracker = sv.ByteTrack(frame_rate=video_info.fps)
# A fresh smoother for this run, so that no history from a previous run of
# this or another cell is reused.
# NOTE(review): assumes DetectionsSmoother keeps per-track history - confirm.
smoother = sv.DetectionsSmoother()

# Colors in class_id order: Processing, Stopped, Moving
colors = sv.ColorPalette.from_hex(['#FFDBAC', '#E87A5C', '#B6E696'])

black = sv.Color.BLACK
ROBOFLOW = sv.Color.ROBOFLOW

# Video capture - reads the source video file
cap = cv2.VideoCapture(SOURCE_VIDEO_PATH)

# Check that the video opened successfully before creating any output file.
# An exception is raised instead of calling exit(), which is not meant to be
# used from inside a notebook cell.
if not cap.isOpened():
    cap.release()
    raise RuntimeError("Error: Could not open video.")

# Video writer - saves the annotated frames; fps and frame size come from the
# source video so that the written frames always match the writer settings.
# NOTE(review): the output file name spelling is kept unchanged on purpose.
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter(
    'Stopped_Olny.mp4',
    fourcc,
    video_info.fps,
    (video_info.width, video_info.height),
)

d = 0                   # Frames elapsed since the last sample
step = 10               # Number of frames between two samples used for the speed computation
first_attempt = True    # True until the first sample has been stored (after `step` frames)

# Annotators
trace_annotator = sv.TraceAnnotator(color=ROBOFLOW, trace_length=70, thickness=3)
label_annotator = sv.LabelAnnotator(color=colors, text_color=black, text_position=sv.Position.TOP_LEFT, text_scale=0.7)
color_annotator = sv.ColorAnnotator(color=colors, opacity=0.3)
ellipse_annotator = sv.EllipseAnnotator(color=colors, thickness=4)

actl_D = {}     # tracker_id -> [bbox width, bbox height, bbox center] at the latest sample
speed_D = {}    # tracker_id -> [relative displacement, state id]; [99999, 0] = not measured yet

# State ids kept for display: Processing (0) and Stopped (1)
selected_classes = [0, 1]

try:
    while cap.isOpened():
        # Capture frame-by-frame
        ret, frame = cap.read()
        if not ret:
            break

        result = model(frame, classes=[0])[0]
        d += 1

        # Process only when there are detections. The number of boxes is
        # checked directly: indexing `result[0]` selects the first detection
        # and therefore cannot express "no detections".
        if len(result.boxes) > 0:
            detections = sv.Detections.from_ultralytics(result)
            detections = tracker.update_with_detections(detections)
            detections = smoother.update_with_detections(detections)

            # `>=` rather than `==`: if the frame on which d reached `step`
            # had no detections, sampling must still happen on the next
            # frame that has some.
            if d >= step:
                if first_attempt:
                    # First sample: nothing to compare with yet, only store it
                    detections_pf = detections
                    first_attempt = False
                else:
                    # Centers at the previous sample; size and center at the current one
                    prvs_D = {
                        tid: get_bbox_center(box)
                        for tid, box in zip(detections_pf.tracker_id, detections_pf.xyxy)
                    }
                    actl_D = {
                        tid: [box[2] - box[0], box[3] - box[1], get_bbox_center(box)]
                        for tid, box in zip(detections.tracker_id, detections.xyxy)
                    }
                    speed_D = {}

                    for key in actl_D:
                        if key in prvs_D:
                            width = actl_D[key][0]
                            height = actl_D[key][1]
                            # Center displacement expressed as a fraction of the
                            # larger side of the bounding box
                            relative_displacement = np.linalg.norm(actl_D[key][2] - prvs_D[key]) / np.max([width, height])

                            # NOTE(review): the displacement is already divided by
                            # max(width, height) and determine_movement_state scales
                            # its threshold by the same quantity - confirm intended.
                            speed_D[key] = [relative_displacement, determine_movement_state(relative_displacement, width, height)]
                        else:
                            # Person absent from the previous sample: no speed
                            # can be measured yet, so mark it as "Processing"
                            speed_D[key] = [99999, 0]

                    # The current sample becomes the previous one
                    detections_pf = detections
                d = 0

            # Persons present in the current frame but absent from the latest
            # sample get the "Processing" state
            for key in set(detections.tracker_id.tolist()) - set(actl_D):
                speed_D[key] = [99999, 0]

            if not first_attempt:
                # Replace the detector class ids by the movement state ids
                new_classes = np.array([speed_D[key][1] for key in detections.tracker_id], dtype=int)
                detections.class_id = new_classes

            # Filtering: drop every detection whose state is not selected
            detections = detections[np.isin(detections.class_id, selected_classes)]

            # Annotation
            annotated_frame = trace_annotator.annotate(
                scene=frame.copy(),
                detections=detections
            )

            labels = [
                f"ID: {tracker_id}, {class_names[class_id]}, Relative speed: {np.round(speed_D[tracker_id][0], 3)}"
                for _, _, _, class_id, tracker_id, _
                in detections
            ]
            annotated_frame = label_annotator.annotate(
                scene=annotated_frame.copy(),
                labels=labels,
                detections=detections
            )
            annotated_frame = color_annotator.annotate(
                scene=annotated_frame.copy(),
                detections=detections
            )
            frame = ellipse_annotator.annotate(
                scene=annotated_frame.copy(),
                detections=detections
            )

        out.write(frame)
        cv2.imshow('window', cv2.resize(frame, (920, 560)))
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the capture, the writer and the preview window, even
    # when an exception interrupts the loop.
    cap.release()
    out.release()
    cv2.destroyAllWindows()



0: 384x640 35 persons, 386.3ms
Speed: 14.2ms preprocess, 386.3ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 101.9ms
Speed: 4.0ms preprocess, 101.9ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 85.1ms
Speed: 2.9ms preprocess, 85.1ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 99.8ms
Speed: 0.0ms preprocess, 99.8ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 33 persons, 96.8ms
Speed: 0.0ms preprocess, 96.8ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 31 persons, 85.9ms
Speed: 0.0ms preprocess, 85.9ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 32 persons, 82.5ms
Speed: 0.0ms preprocess, 82.5ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 35 persons, 84.7ms
Speed: 0.0ms preprocess, 84.7ms inference, 16.0ms postprocess per im

## $Moving$ ``Only``$\space Persons \space Detection$

In [155]:
# Step 1: Read the video frame by frame with cv2.VideoCapture
# Step 2: Detect persons and annotate them with supervision
# Step 3: Label every tracked person, then keep only Processing / Moving ones
#
#  |
#  |--->  Tracker required
#  |  |
#  |  -->  ByteTrack
#  |
#  |--->  Compare the current sample with the one taken `step` frames earlier

# The index in this list is the class_id assigned to each detection below and
# also the index of its colour in the palette.
class_names = ['Processing..', 'Stopped', 'Moving']

tracker = sv.ByteTrack(frame_rate=video_info.fps)
# A fresh smoother for this run, so that no history from a previous run of
# this or another cell is reused.
# NOTE(review): assumes DetectionsSmoother keeps per-track history - confirm.
smoother = sv.DetectionsSmoother()

# Colors in class_id order: Processing, Stopped, Moving
colors = sv.ColorPalette.from_hex(['#FFDBAC', '#E87A5C', '#B6E696'])

black = sv.Color.BLACK
ROBOFLOW = sv.Color.ROBOFLOW

# Video capture - reads the source video file
cap = cv2.VideoCapture(SOURCE_VIDEO_PATH)

# Check that the video opened successfully before creating any output file.
# An exception is raised instead of calling exit(), which is not meant to be
# used from inside a notebook cell.
if not cap.isOpened():
    cap.release()
    raise RuntimeError("Error: Could not open video.")

# Video writer - saves the annotated frames; fps and frame size come from the
# source video so that the written frames always match the writer settings.
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter(
    'Moving_only.mp4',
    fourcc,
    video_info.fps,
    (video_info.width, video_info.height),
)

d = 0                   # Frames elapsed since the last sample
step = 10               # Number of frames between two samples used for the speed computation
first_attempt = True    # True until the first sample has been stored (after `step` frames)

# Annotators
trace_annotator = sv.TraceAnnotator(color=ROBOFLOW, trace_length=70, thickness=3)
label_annotator = sv.LabelAnnotator(color=colors, text_color=black, text_position=sv.Position.TOP_LEFT, text_scale=0.7)
color_annotator = sv.ColorAnnotator(color=colors, opacity=0.3)
ellipse_annotator = sv.EllipseAnnotator(color=colors, thickness=4)

actl_D = {}     # tracker_id -> [bbox width, bbox height, bbox center] at the latest sample
speed_D = {}    # tracker_id -> [relative displacement, state id]; [99999, 0] = not measured yet

# State ids kept for display: Processing (0) and Moving (2)
selected_classes = [0, 2]

try:
    while cap.isOpened():
        # Capture frame-by-frame
        ret, frame = cap.read()
        if not ret:
            break

        result = model(frame, classes=[0])[0]
        d += 1

        # Process only when there are detections. The number of boxes is
        # checked directly: indexing `result[0]` selects the first detection
        # and therefore cannot express "no detections".
        if len(result.boxes) > 0:
            detections = sv.Detections.from_ultralytics(result)
            detections = tracker.update_with_detections(detections)
            detections = smoother.update_with_detections(detections)

            # `>=` rather than `==`: if the frame on which d reached `step`
            # had no detections, sampling must still happen on the next
            # frame that has some.
            if d >= step:
                if first_attempt:
                    # First sample: nothing to compare with yet, only store it
                    detections_pf = detections
                    first_attempt = False
                else:
                    # Centers at the previous sample; size and center at the current one
                    prvs_D = {
                        tid: get_bbox_center(box)
                        for tid, box in zip(detections_pf.tracker_id, detections_pf.xyxy)
                    }
                    actl_D = {
                        tid: [box[2] - box[0], box[3] - box[1], get_bbox_center(box)]
                        for tid, box in zip(detections.tracker_id, detections.xyxy)
                    }
                    speed_D = {}

                    for key in actl_D:
                        if key in prvs_D:
                            width = actl_D[key][0]
                            height = actl_D[key][1]
                            # Center displacement expressed as a fraction of the
                            # larger side of the bounding box
                            relative_displacement = np.linalg.norm(actl_D[key][2] - prvs_D[key]) / np.max([width, height])

                            # NOTE(review): the displacement is already divided by
                            # max(width, height) and determine_movement_state scales
                            # its threshold by the same quantity - confirm intended.
                            speed_D[key] = [relative_displacement, determine_movement_state(relative_displacement, width, height)]
                        else:
                            # Person absent from the previous sample: no speed
                            # can be measured yet, so mark it as "Processing"
                            speed_D[key] = [99999, 0]

                    # The current sample becomes the previous one
                    detections_pf = detections
                d = 0

            # Persons present in the current frame but absent from the latest
            # sample get the "Processing" state
            for key in set(detections.tracker_id.tolist()) - set(actl_D):
                speed_D[key] = [99999, 0]

            if not first_attempt:
                # Replace the detector class ids by the movement state ids
                new_classes = np.array([speed_D[key][1] for key in detections.tracker_id], dtype=int)
                detections.class_id = new_classes

            # Filtering: drop every detection whose state is not selected
            detections = detections[np.isin(detections.class_id, selected_classes)]

            # Annotation
            annotated_frame = trace_annotator.annotate(
                scene=frame.copy(),
                detections=detections
            )

            labels = [
                f"ID: {tracker_id}, {class_names[class_id]}, Relative speed: {np.round(speed_D[tracker_id][0], 3)}"
                for _, _, _, class_id, tracker_id, _
                in detections
            ]
            annotated_frame = label_annotator.annotate(
                scene=annotated_frame.copy(),
                labels=labels,
                detections=detections
            )
            annotated_frame = color_annotator.annotate(
                scene=annotated_frame.copy(),
                detections=detections
            )
            frame = ellipse_annotator.annotate(
                scene=annotated_frame.copy(),
                detections=detections
            )

        out.write(frame)
        cv2.imshow('window', cv2.resize(frame, (920, 560)))
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the capture, the writer and the preview window, even
    # when an exception interrupts the loop.
    cap.release()
    out.release()
    cv2.destroyAllWindows()



0: 384x640 35 persons, 404.6ms
Speed: 4.8ms preprocess, 404.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 103.9ms
Speed: 2.8ms preprocess, 103.9ms inference, 1.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 83.3ms
Speed: 0.0ms preprocess, 83.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 34 persons, 83.0ms
Speed: 0.0ms preprocess, 83.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 33 persons, 88.7ms
Speed: 3.0ms preprocess, 88.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 31 persons, 87.3ms
Speed: 3.0ms preprocess, 87.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 32 persons, 88.4ms
Speed: 2.0ms preprocess, 88.4ms inference, 5.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 35 persons, 82.2ms
Speed: 0.0ms preprocess, 82.2ms inference, 0.0ms postprocess per imag

# .
© 2024 Nassim, NAFCAI VISION. All rights reserved.

[Linkedin Link](https://www.linkedin.com/in/nassim-hammami-771015217/)