In [None]:
!pip install ultralytics opencv-python -q
!conda install conda-forge::ultralytics
# !pip install supervision==0.21.0.rc5

### Sample test footage

In [None]:
import cv2
from ultralytics import YOLO, solutions
# Load the YOLO model
model = YOLO("yolov9e.pt")

# Open the source video and stop immediately if it cannot be read
cap = cv2.VideoCapture("resources/741755_Kazakhstan Traffic Cars Road_By_Danil_Nevsky_Artlist_HD.mp4")
assert cap.isOpened(), "Error reading video file"
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as the float OpenCV reports. Truncating it (e.g. 29.97 -> 29)
# would make the written video play at a different speed than the source.
fps = cap.get(cv2.CAP_PROP_FPS)

classes_to_count = list(range(0, 23))  # class ids 0..22 are passed to the tracker

# Counting lines, each given as two (x, y) pixel end points
line_points_1 = [(50, 300), (1000, 250)]  # first line
line_points_2 = [(1400, 750), (1920, 650)]  # second line

# Video writer: same frame size and frame rate as the input
video_writer = cv2.VideoWriter("sample_object_counting_output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))

# Options that are identical for every counter in this cell
counter_shared_kwargs = dict(
    view_img=True,
    classes_names=model.names,
    draw_tracks=True,
    line_thickness=2,
)

# One counter per line; each gets its own colour and text offset
counter_1 = solutions.ObjectCounter(
    reg_pts=line_points_1,
    text_offset=(0, 0),
    count_reg_color=(229, 255, 204),
    count_bg_color=(229, 255, 204),
    **counter_shared_kwargs,
)

# Constructed for the second line; its use in the processing loop is currently commented out
counter_2 = solutions.ObjectCounter(
    reg_pts=line_points_2,
    text_offset=(-250, 0),
    count_reg_color=(153, 153, 255),
    count_bg_color=(153, 153, 255),
    **counter_shared_kwargs,
)

# The same model object performs tracking for every frame
tracker = model

try:
    while cap.isOpened():
        success, im0 = cap.read()
        if not success:
            print("Video frame is empty or video processing has been successfully completed.")
            break

        # No explicit `device` here: "gpu" is not one of the device strings
        # Ultralytics documents (e.g. "cpu", "cuda", 0, "mps"). Leaving it unset
        # lets the library choose the device itself, as the other cells do.
        # Optional tracker selection: tracker="bytetrack.yaml"
        tracks = tracker.track(im0, persist=True, show=False, classes=classes_to_count)

        # Count crossings of the first line; no ids are treated as stationary here
        im0 = counter_1.start_counting(im0, tracks, stationary_ids=set())

        # Second counter is currently disabled
        # im0 = counter_2.start_counting(im0, tracks)

        # Write the annotated frame to the output video
        video_writer.write(im0)
finally:
    # Release even when tracking/counting raises, so the capture is closed
    # and the output file is not left open.
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

### Real World Footage

In [None]:
import cv2
from ultralytics import YOLO, solutions

# Load the YOLO model
model = YOLO("yolov9e.pt")

# Open the source video and stop immediately if it cannot be read
cap = cv2.VideoCapture("resources/DJI_0505.MP4")
assert cap.isOpened(), "Error reading video file"
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as the float OpenCV reports. Truncating it (e.g. 23.976 -> 23)
# would make the written video play at a different speed than the source.
fps = cap.get(cv2.CAP_PROP_FPS)

classes_to_count = list(range(0, 23))  # class ids 0..22 are passed to the tracker

# Counting lines, each given as two (x, y) pixel end points
line_points_1 = [(300, 600), (700, 550)]  # first line
line_points_2 = [(800, 550), (1700, 350)]  # second line
line_points_3 = [(174, 1150), (174, 1800)]  # third line
line_points_4 = [(640, 1674), (1006, 1578)]  # fourth line

# Video writer: same frame size and frame rate as the input
video_writer = cv2.VideoWriter("out_object_counting_output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))

# Options that are identical for every counter in this cell
counter_shared_kwargs = dict(
    view_img=True,
    classes_names=model.names,
    draw_tracks=True,
    line_thickness=2,
)

# One counter per line; each gets its own colour and text offset
counter_1 = solutions.ObjectCounter(
    reg_pts=line_points_1,
    text_offset=(0, 0),
    count_reg_color=(229, 255, 204),  # Very light green
    count_bg_color=(229, 255, 204),  # Very light green
    **counter_shared_kwargs,
)

counter_2 = solutions.ObjectCounter(
    reg_pts=line_points_2,
    text_offset=(-250, 0),
    count_reg_color=(153, 204, 255),  # Light blue
    count_bg_color=(153, 204, 255),  # Light blue
    **counter_shared_kwargs,
)

counter_3 = solutions.ObjectCounter(
    reg_pts=line_points_3,
    text_offset=(-500, 0),
    count_reg_color=(255, 204, 204),  # Light red
    count_bg_color=(255, 204, 204),  # Light red
    **counter_shared_kwargs,
)

counter_4 = solutions.ObjectCounter(
    reg_pts=line_points_4,
    text_offset=(-750, 0),
    count_reg_color=(210, 180, 140),  # Light brown
    count_bg_color=(210, 180, 140),  # Light brown
    **counter_shared_kwargs,
)


# A single tracker (the model itself) feeds all four counters
tracker = model

try:
    while cap.isOpened():
        success, im0 = cap.read()
        if not success:
            print("Video frame is empty or video processing has been successfully completed.")
            break

        # Track once per frame; the result is shared by every counter below.
        # Optional tracker selection: tracker="bytetrack.yaml"
        tracks = tracker.track(im0, persist=True, show=False, classes=classes_to_count)

        # Each counter receives the frame returned by the previous one
        for counter in (counter_1, counter_2, counter_3, counter_4):
            im0 = counter.start_counting(im0, tracks)

        # Write the annotated frame to the output video
        video_writer.write(im0)
finally:
    # Release even when tracking/counting raises, so the capture is closed
    # and the output file is not left open.
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

### Calculate Speed and Prevent the detection of stationary objects

In [18]:
import cv2
from custom.ultralytics import YOLO, solutions
import time

def blur_region(frame, x_min, y_min, x_max, y_max):
    """Gaussian-blur a rectangular region of ``frame`` in place.

    Args:
        frame: Image array indexed as ``frame[row, column]``; it is modified.
        x_min: Left bound of the rectangle, in pixels.
        y_min: Top bound of the rectangle, in pixels.
        x_max: Right bound of the rectangle (exclusive), in pixels.
        y_max: Bottom bound of the rectangle (exclusive), in pixels.

    Returns:
        The same ``frame`` object. It is returned untouched when the rectangle,
        after being clamped to the frame, contains no pixels.
    """
    frame_h, frame_w = frame.shape[:2]

    # Clamp to the frame: a negative bound would otherwise be interpreted as
    # "count from the end" by slicing and select the wrong pixels.
    x_min, x_max = max(0, x_min), min(frame_w, x_max)
    y_min, y_max = max(0, y_min), min(frame_h, y_max)

    # Nothing to blur; also avoids handing an empty array to cv2.GaussianBlur.
    if x_max <= x_min or y_max <= y_min:
        return frame

    roi = frame[y_min:y_max, x_min:x_max]
    frame[y_min:y_max, x_min:x_max] = cv2.GaussianBlur(roi, (51, 51), 0)
    return frame

# Custom-trained weights used for this cell
model = YOLO("best_v3.pt")

# Alternative model:
# model = YOLO("yolov10x.pt")

# Open the source video and stop immediately if it cannot be read
cap = cv2.VideoCapture("resources/DJI_0505.MP4")
assert cap.isOpened(), "Error reading video file"
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as the float OpenCV reports. Truncating it (e.g. 23.976 -> 23)
# would make the written video play at a different speed than the source.
fps = cap.get(cv2.CAP_PROP_FPS)

classes_to_count = list(range(0, 8))  # class ids 0..7 are passed to the tracker

# classes_for_heatmap = [0, 1, 3]

# Heatmap object; the call that would use it in the processing loop is currently commented out
heatmap_obj = solutions.Heatmap(
    colormap=cv2.COLORMAP_AUTUMN,
    view_img=True,
    shape="circle",
    classes_names=model.names,
    decay_factor=1,
    # heatmap_alpha=0.8,
)

# Counting lines, each given as two (x, y) pixel end points
# Four-point (region) alternative for the first counter:
# line_points_1 = [(300, 600), (700, 600), (700, 500), (300, 500)]
line_points_1 = [(300, 600), (700, 550)]
line_points_2 = [(800, 550), (1700, 350)]
line_points_3 = [(174, 1150), (174, 1800)]
line_points_4 = [(640, 1674), (1006, 1578)]

# Video writer: same frame size and frame rate as the input
video_writer = cv2.VideoWriter("v4.1_stationay_object_counting_output.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))

# Options that are identical for every counter in this cell
counter_shared_kwargs = dict(
    view_img=True,
    classes_names=model.names,
    draw_tracks=True,
)

# One counter per line; each gets its own colour and text offset
counter_1 = solutions.ObjectCounter(
    reg_pts=line_points_1,
    text_offset=(0, 0),
    count_reg_color=(229, 255, 204),
    count_bg_color=(229, 255, 204),
    direction_wise_count=False,
    **counter_shared_kwargs,
)

counter_2 = solutions.ObjectCounter(
    reg_pts=line_points_2,
    text_offset=(-250, 0),
    count_reg_color=(153, 204, 255),
    count_bg_color=(153, 204, 255),
    direction_wise_count=False,
    **counter_shared_kwargs,
)

# NOTE(review): unlike the other three counters, this one does not pass
# direction_wise_count and so uses the library default — confirm that is intended.
counter_3 = solutions.ObjectCounter(
    reg_pts=line_points_3,
    text_offset=(-500, 0),
    count_reg_color=(255, 204, 204),
    count_bg_color=(255, 204, 204),
    **counter_shared_kwargs,
)

counter_4 = solutions.ObjectCounter(
    reg_pts=line_points_4,
    text_offset=(-750, 0),
    count_reg_color=(210, 180, 140),
    count_bg_color=(210, 180, 140),
    direction_wise_count=False,
    **counter_shared_kwargs,
)

# Per-track motion state
last_positions = {}  # track id -> (center_x, center_y, box_w, box_h, frame_count) when last seen
stationary_ids = set()  # track ids currently flagged as stationary
frame_interval = 30  # movement and speed are evaluated on every 30th frame
frame_count = 0  # number of frames read so far
object_speed_ids = {}  # track id -> most recent speed estimate in km/h

movement_threshold = 2  # displacement in pixels below which a track is flagged stationary
# Metres represented by one pixel. The speed calculation has always scaled by 0.1;
# the previous value of this constant (0.5) was only ever used as the square-root
# exponent of the distance formula, so 0.1 keeps the reported speeds unchanged.
# NOTE(review): confirm 0.1 m/px against the real ground resolution of the footage.
pixel_to_meter = 0.1
# Use the frame rate reported by the video; fall back to the former hard-coded
# value when OpenCV cannot determine it (it then reports 0).
frame_rate = cap.get(cv2.CAP_PROP_FPS) or 23

# The same model object performs tracking for every frame
tracker = model

try:
    while cap.isOpened():
        success, im0 = cap.read()
        if not success:
            print("Video frame is empty or video processing has been successfully completed.")
            break

        frame_count += 1

        # vid_stride=5
        tracks = tracker.track(im0, persist=True, show=False, classes=classes_to_count, device='mps')

        for result in tracks:
            for track in result.boxes:
                if track.id is None:
                    continue  # detection without a track id
                if track.xyxy.numel() != 4:
                    # Without exactly one box there is no geometry to work with;
                    # skipping avoids reusing coordinates from a previous track.
                    continue

                track_id = int(track.id)

                # Plain Python floats, so no tensors are kept in the state dicts
                x_min, y_min, x_max, y_max = track.xyxy[0].tolist()

                # Box centre and size. Dedicated names keep the frame-size
                # variables w/h from being overwritten.
                center_x = (x_min + x_max) / 2
                center_y = (y_min + y_max) / 2
                box_w = x_max - x_min
                box_h = y_max - y_min

                # Class 0 is treated as "person": blur the contents of its box
                if track.cls.item() == 0.0:
                    im0 = blur_region(im0, int(x_min), int(y_min), int(x_max), int(y_max))

                if frame_count % frame_interval == 0 and track_id in last_positions:
                    last_x, last_y, _, _, last_frame_count = last_positions[track_id]

                    # Euclidean displacement since the track was last seen
                    distance_pixels = ((last_x - center_x) ** 2 + (last_y - center_y) ** 2) ** 0.5
                    delta_frames = frame_count - last_frame_count
                    delta_time = delta_frames / frame_rate

                    if distance_pixels < movement_threshold:
                        stationary_ids.add(track_id)
                    else:
                        stationary_ids.discard(track_id)

                    distance_meters = distance_pixels * pixel_to_meter
                    speed_meters_per_second = distance_meters / delta_time

                    # Convert speed to kilometers per hour
                    speed_km_per_hour = speed_meters_per_second * 3.6

                    print(f"Object {track_id} moved with speed {speed_km_per_hour:.2f} km/h")

                    object_speed_ids[track_id] = speed_km_per_hour

                # The stored position is refreshed on EVERY frame, not only on
                # check frames, so each check compares against the last frame
                # in which this track was seen.
                last_positions[track_id] = (center_x, center_y, box_w, box_h, frame_count)

        # Run every counter on the frame, handing each the current stationary ids
        for counter in (counter_1, counter_2, counter_3, counter_4):
            im0 = counter.start_counting(im0, tracks, stationary_ids=stationary_ids)

        # im0 = heatmap_obj.generate_heatmap(im0, tracks)

        # Write the annotated frame to the output video
        video_writer.write(im0)
finally:
    # Release even when tracking/counting raises, so the capture is closed
    # and the output file is not left open.
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

None
Line Counter Initiated.
Line Counter Initiated.
Line Counter Initiated.
Line Counter Initiated.


0: 640x384 7 persons, 3 bicycles, 9 cars, 1 truck, 58.7ms
0: 640x384 7 persons, 3 bicycles, 9 cars, 1 truck, 58.7ms
Speed: 2.9ms preprocess, 58.7ms inference, 7.8ms postprocess per image at shape (1, 3, 640, 384)
Speed: 2.9ms preprocess, 58.7ms inference, 7.8ms postprocess per image at shape (1, 3, 640, 384)


0: 640x384 7 persons, 3 bicycles, 9 cars, 1 truck, 67.2ms
0: 640x384 7 persons, 3 bicycles, 9 cars, 1 truck, 67.2ms
Speed: 2.0ms preprocess, 67.2ms inference, 80.6ms postprocess per image at shape (1, 3, 640, 384)
Speed: 2.0ms preprocess, 67.2ms inference, 80.6ms postprocess per image at shape (1, 3, 640, 384)


0: 640x384 7 persons, 3 bicycles, 9 cars, 1 truck, 63.6ms
0: 640x384 7 persons, 3 bicycles, 9 cars, 1 truck, 63.6ms
Speed: 1.9ms preprocess, 63.6ms inference, 7.9ms postprocess per image at shape (1, 3, 640, 384)
Speed: 1.9ms preprocess, 63.6ms inference, 7.9ms postproce